• DataX更新null值到ElasticSearch不生效的问题


    一、问题现象

    我们用的 DataX 版本比较老,在推送数据到 Elasticsearch ,根据主键更新数据时,发现有 null 不能更新到 Elasticsearch 中的问题,Elasticsearch 中还保持原来的值。

    具体情况如下:
    1、Elasticsearch 索引中有个 double 类型的字段,比如字段名叫 guar_fee_rate (担保费率),原来是有值的,比如值为1。

    ## 查询索引结构
    GET my_test_indice/_mapping
    
    {
      "my_test_indice" : {
        "mappings" : {
          "properties" : {
            "guar_fee_rate" : {
              "type" : "long"
            },
            "guar_fee_rate " : {
              "type" : "double"  ## double类型的字段
            }
          }
        }
      }
    }
    
    ## 查询数据
    GET my_test_indice/_search
    
    {
         ...... 
        "hits" : [
          {
            "_index" : "my_test_indice",
            "_type" : "_doc",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "guar_fee_rate" : 1  ## 当前有值,为1
            }
          }
        ]
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    2、现在这个字段值变为 null,使用 DataX 推送更新到 Elasticsearch 中,预期是将ES中的 guar_fee_rate 值改为 null。

    ## 预期的值
    "hits" : [
          {
            "_index" : "my_test_indice",
            "_type" : "_doc",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "guar_fee_rate" : null  ## 想把字段值改为 null
            }
          }
        ]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3、但是发现, Elasticsearch 中的值没有变,还是保持原来的 1。
    4、如果在 source 端将 null 值转换为空字符串后,则 DataX 运行时候会报错,报脏数据。

    二、先说解决办法

    经过一番排查问题,最终的解决方法是:

    1、如果只是想解决 double 类型字段的问题,可以 在 DataX source 端将为 null 的字段值转换为字符串 “NaN”,DataX 才会将对应 ES 字段值更新为 null
    2、如果要解决 integer、keyword 等其他类型字段问题,升级 DataX 版本,或者二次开发。

    三、排查过程 & 原因分析

    针对于这个 null 值不能更到到 Elasticsearch 中的问题,排查分析过程如下:

    1、先排查是 ES 的问题还是 DataX 的问题

    测试一下给 ES double 类型字段赋 null,能更新成功吗?

    ## 创建索引
    PUT my_test_indice
    {
      "mappings": {
        "properties": {
          "guar_fee_rate ": {
            "type":       "double"
          }
        }
      }
    }
    ## 插入值1
    PUT my_test_indice/_doc/1
    {
      "guar_fee_rate":1
    }
    ## 查询目前值为1
    GET my_test_indice/_search
    {
    	"hits" : [
    	 {
    	   "_index" : "my_test_indice",
    	   "_type" : "_doc",
    	   "_id" : "1",
    	   "_score" : 1.0,
    	   "_source" : {
    	     "guar_fee_rate" : 1
    	   }
    	 }
    	]
    }
    
    ## 更新为 null
    POST my_test_indice/_update_by_query
    {
      "script": {
        "source": "ctx._source['guar_fee_rate'] = null"
      }
    }
    
    ## 查询目前值为null
    GET my_test_indice/_search
    {
    	"hits" : [
    	 {
    	   "_index" : "my_test_indice",
    	   "_type" : "_doc",
    	   "_id" : "1",
    	   "_score" : 1.0,
    	   "_source" : {
    	     "guar_fee_rate" : null
    	   }
    	 }
    	]
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    至此,说明 ES 的 double 类型字段是可以赋值为 null 的,那肯定是 DataX 的问题导致不能更新。

    2、查找资料

    先后翻看了 DataX 官方文档、百度、谷歌,都没有找到解决方案,甚至连问这个问题的都没有。

    3、查看 DataX ElasticsearchWriter 源码 double 字段类型更新部分

    查看写入 Elasticsearch 数据的类:
    最新版本的链接如下,(我生产环境用的不是最新版本,但是可以参考): ElasticSearchWriter.java

    查看这个方法,

    void doBatchInsert(final List<Record> writerBuffer)
    
    • 1

    找到字段类型为 double 时的代码,column调用 asDouble() 方法获取值。

    case DOUBLE:
         data.put(columnName, column.asDouble());
         break;
    
    • 1
    • 2
    • 3

    找到对应重载的实现方法,在 com.alibaba.datax.common.element.DoubleColumn 类中,

    @Override
    public Double asDouble() {
    	if (null == this.getRawData()) {
    		return null;
    	}
    
    	String string = (String) this.getRawData();
    
    	boolean isDoubleSpecific = string.equals("NaN")
    			|| string.equals("-Infinity") || string.equals("+Infinity");
    	if (isDoubleSpecific) {
    		return Double.valueOf(string);
    	}
    
    	BigDecimal result = this.asBigDecimal();
    	OverFlowUtil.validateDoubleNotOverFlow(result);
    
    	return result.doubleValue();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    这里当值为 “NaN”、“-Infinity”、“+Infinity”,分别表示 不是数字、负无穷、正无穷,如果是这三个字符串,不会返回null,是可以返回值的,这时候我将DataX 源端值为 null 的情况下转换为字符串 “NaN”,发现可以 将 Elasticsearch 里的字段值更新为 null 了,实现了目标。

    4、查看 DataX ElasticsearchWriter 源码 序列化 JSON 串部分

    double 类型字段值的问题解决了,Integer和其他类型 null 值肯定也存在不能更新的问题,怎么解决呢,继续看代码。

    void doBatchInsert(final List<Record> writerBuffer)
    
    • 1

    在这个方法下面,当 DataX 配置为 Updata 模式时,序列化的地方。

    case UPDATE:
        Map<String, Object> updateDoc = new HashMap<String, Object>();
        updateDoc.put("doc", data);
        updateDoc.put("doc_as_upsert", true);
        Update.Builder update = null;
        if (this.enableWriteNull)  //这个 if 是最新版本DataX的功能,我用的版本没有
        {
                // write: {a:"1",b:null}
            update = new Update.Builder(
                    JSONObject.toJSONString(updateDoc, SerializerFeature.WriteMapNullValue,
                            SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
                            SerializerFeature.WriteEnumUsingToString, SerializerFeature.SortField));
            // 在DEFAULT_GENERATE_FEATURE基础上,只增加了SerializerFeature.WRITE_MAP_NULL_FEATURES
        } else {
            // write: {"a":"1"}
            update = new Update.Builder(JSONObject.toJSONString(updateDoc));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这里的将对象序列化为JSON串用的 fastjson 工具,而 fastjson 对于值为 null 的节点,默认是丢弃掉了。比如:

        @Test
        public void test22() {
            Map<String, Object> objectMap = new HashMap<>();
            objectMap.put("a", 2);
            objectMap.put("b", null);
            objectMap.put("c", "abc");
    
            String jsonStr = JSON.toJSONString(objectMap);
            System.out.println(jsonStr);
            // 输出内容为:{"a":2,"c":"abc"},b节点会丢弃掉。
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    我用的 DataX 版本是直接用 JSONObject.toJSONString(updateDoc) 序列化的,下一步用这个 json 串去更新 Elasticsearch 时,没有的字段,就会造成不更新,根本问题原因就在这里。

    四、如何解决

    彻底解决问题的办法就是修改对象序列化逻辑,让为 null 的节点也能序列化出来。我想到的解决方法有两个:

    1、升级 DataX 版本

    最新的 DataX 版本已经修复了这个 bug,就像前面代码里一样,增加了一个参数 enableWriteNull,默认是true,用这个参数控制是否将 null 值更新到 Elasticsearch 中,具体代码是这一段:

    if (this.enableWriteNull)
    {
            // write: {a:"1",b:null}
        update = new Update.Builder(
                JSONObject.toJSONString(updateDoc, SerializerFeature.WriteMapNullValue,
                        SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
                        SerializerFeature.WriteEnumUsingToString, SerializerFeature.SortField));
        // 在DEFAULT_GENERATE_FEATURE基础上,只增加了SerializerFeature.WRITE_MAP_NULL_FEATURES
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在 toJSONString() 时指定 SerializerFeature.WriteMapNullValue 参数,这个是 fastjson 参数,指定是否将 null,也序列化出来。

    2、修改源码

    因为我们生产环境的 DataX 本身就是二次开发的,所以我们采用自己修改源码的方式,仿照最新版本的处理方式,解决这个问题。

    至此,这个 DataX 更新 null 值到 Elasticsearch 不生效的问题算是完美解决了。

  • 相关阅读:
    互联网快讯:腾讯会议应用市场正式上线;Soul赴港递交上市申请书
    monkeyrunner环境搭建和初步用法
    读明朝那些事儿有感:书生的骨
    【期权系列】常见期权定价模型与策略概览
    使用舵机和超声波模块实现小车自动避障
    软磁材料种类、特点和应用范围
    android--屏幕适配
    何为博弈心理学?
    使用Pyhton执行JavaScript-pyexecjs
    支持JDK19虚拟线程的web框架,之一:体验
  • 原文地址:https://blog.csdn.net/chybin500/article/details/127782216