Json Diff&Merge

Json 的 diff 和 merge 功能实现

需求背景

scaleph 为了减少用户配置 flink 任务的工作,将一些常用、重复性的配置做成 template,自动应用到新建任务中。当 template 与新建任务进行 merge 时,需要比较 template 和新建任务配置项,新建任务会应用、新增或覆盖 template 配置项。

scaleph 的 flink 任务管理基于 flink kubernetes operator 实现,需要提供符合 flink kubernetes operator CRD 格式的对象,格式为 json 或 yaml。

scaleph 实现 template 和新建任务配置 merge 时,也同样采用了 json 生态相关库。

功能调研

在很多场景都需要用到数据的 diff 和 merge 功能:

  • 优化网络传输。网络传输大对象的时候会消耗大量网络流量,此时可以只传输变动的数据,减少对象体积。如 Elasticsearch 优化集群信息同步。
  • 配置变更。当推送新配置时,对比当前生效中的配置和新配置区别,应用发生变更的配置。如 kubernetes 的声明式配置特性。

收集到的开源项目实现如下:

实现一

基于 jackson 实现

package cn.sliew.scaleph.engine.flink.kubernetes.operator.util;

import cn.sliew.milky.common.util.JacksonUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.flipkart.zjsonpatch.DiffFlags;
import com.flipkart.zjsonpatch.JsonDiff;
import com.flipkart.zjsonpatch.JsonPatch;

import java.util.EnumSet;

public enum TemplateMerger {
    ;

    public static <T> T merge(T template, T target, Class<T> clazz) {
        JsonNode templateNode = JacksonUtil.toJsonNode(template);
        JsonNode targetNode = JacksonUtil.toJsonNode(target);
        JsonNode merged = doMerge(templateNode, targetNode);
        return JacksonUtil.toObject(merged, clazz);
    }

    public static JsonNode doMerge(final JsonNode template, final JsonNode target) {
        ObjectNode targetObject = (ObjectNode) target;
        ObjectNode templateObject;
        if (template instanceof ObjectNode) {
            templateObject = (ObjectNode) template;
        } else {
            templateObject = targetObject.objectNode();
        }

        target.fields().forEachRemaining(field -> {
            String key = field.getKey();
            JsonNode value = field.getValue();
            if (value.isNull()) {
                templateObject.remove(key);
            } else {
                JsonNode existingValue = templateObject.get(key);
                JsonNode mergeResult = doMerge(existingValue, value);
                templateObject.replace(key, mergeResult);
            }
        });
        return templateObject;
    }

}

实现二

基于 zjsonpatch 实现,取消 remove 类型。

package cn.sliew.scaleph.engine.flink.kubernetes.operator.util;

import cn.sliew.milky.common.util.JacksonUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.flipkart.zjsonpatch.DiffFlags;
import com.flipkart.zjsonpatch.JsonDiff;
import com.flipkart.zjsonpatch.JsonPatch;

import java.util.EnumSet;

public enum TemplateMerger {
    ;

    public static <T> T merge(T template, T target, Class<T> clazz) {
        JsonNode templateNode = JacksonUtil.toJsonNode(template);
        JsonNode targetNode = JacksonUtil.toJsonNode(target);
        JsonNode merged = doMerge(templateNode, targetNode);
        return JacksonUtil.toObject(merged, clazz);
    }

    private static JsonNode doMerge(JsonNode source, JsonNode target) {
        EnumSet<DiffFlags> flags = DiffFlags.dontNormalizeOpIntoMoveAndCopy().clone();
        JsonNode patch = disableRemove((ArrayNode) JsonDiff.asJson(source, target, flags));
        return JsonPatch.apply(patch, source);
    }

    private static JsonNode disableRemove(ArrayNode patch) {
        if (patch.isEmpty()) {
            return patch;
        }

        ArrayNode arrayNode = JacksonUtil.createArrayNode();
        for (JsonNode op : patch) {
            ObjectNode objectNode = (ObjectNode) op;
            if (objectNode.path("op").asText().equals("remove") == false) {
                arrayNode.add(objectNode);
            }
        }

        return arrayNode;
    }
}