背景

由于公司系统重构,涉及到历史数据初始化,各种字段补充,新的里面增加一个账单编号字段,10位随机数字,总数据量大约4500w,通过编写java代码实现此功能也是比较简单,但是写代码还要各种准备,还要整合多数据源处理,整体跑起来性能方面也不会很好,麻烦!!!,那怎么办呢,我想到了DataX这个数据同步的利器,高性能,但使用DataX进行数据处理时怎么把这个账单编号补充上呢?让我陷入了沉思,通过查看官方文档,知道DataX有Transformer功能,可以进行替换、写groovy脚本等,但是没我可以用的,于是乎,自己动写插件,自己动手丰衣足食。

一、Transformer

1、Transformer定义

在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。
运行模型
在这里插入图片描述
想要了解更多,查看官方手册,传送门---->官方文档

2、Transformer插件开发

  1. 如何开发插件,拿到源码后在哪里写代码,在哪里开发,这里怎么获取源码我这里就不细说了,请看我上一篇文章,有详细介绍。拿到源码后,找到DataX的core模块,找到transformer包在此下面创建相关类
    在这里插入图片描述
  2. 创建FiledFillTransformer类,此类就是用来进行字段转换的,我这里的需求比较简单,根据不同的类型创建不同前缀的编码,本来是想用redis的increment()方法,但是还需要整合redis,一是麻烦,二是有网络开销,影响性能,故采用AtomicLong 原子类进行替代
package com.alibaba.datax.core.transport.transformer;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.core.util.RedisUitl;
import com.alibaba.datax.transformer.Transformer;
import com.alibaba.fastjson2.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * @Description:
 * @Date: 2024/7/29
 **/
public class FiledFillTransformer extends Transformer {

    private static final Logger LOG = LoggerFactory.getLogger(RedisUitl.class);
    private static final String  BILL_NO_PREFIX = "AR";
    private static final String  NO_BILL_NO_PREFIX = "BR";
    private static final String NATURE = "0";

    static AtomicLong arAtomicLong = new AtomicLong(0);
    static AtomicLong brAtomicLong = new AtomicLong(0);

    private static final String  BILL_NO = "finance:dts:bill_no";

    public FiledFillTransformer() {
        setTransformerName("dx_filed");
    }
    @Override
    public Record evaluate(Record record, Object... paras) {
        int columnIndex;
//        long start = System.currentTimeMillis();
        try {
            if (paras.length != 1) {
                throw new RuntimeException("dx_filed paras must be 1");
            }

            columnIndex = (Integer) paras[0];
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER, "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
        }

        Column column = record.getColumn(columnIndex);

        try {
            String oriValue = column.asString();

            //如果字段为空,跳过replace处理
            if(oriValue == null){
                return  record;
            }
            String newValue = NATURE.equals(oriValue) ? BILL_NO_PREFIX.concat(String.format("%010d", arAtomicLong.incrementAndGet()))
                                                                : NO_BILL_NO_PREFIX.concat(String.format("%010d", brAtomicLong.incrementAndGet()));
            record.setColumn(columnIndex, new StringColumn(newValue));
//            LOG.info("filedTransformer 耗时:" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION, e.getMessage(),e);
        }
        return record;
    }
}

3、插件注册

开发完成后,不能直接使用,需要注册到程序中,在core模块中,找到TransformerRegistry 注册类,将FiledFillTransformer进行注册。这样就可以编译打包使用了。
在这里插入图片描述

4、Job定义

模板具体怎么使用,这里不再赘述,请看我上一篇文章《Datax插件开发之整合easy Excel进行数据导出》
这里是官方给出的案例,自己可以修改下

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 1724154616370,
                                "type": "long"
                            },
                            {
                                "value": "2024-01-01 00:00:00",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "TestRawData",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 100
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false,
                        "encoding": "UTF-8"
                    }
                },
                "transformer": [
                    {
                        "name": "dx_substr",
                        "parameter": {
                            "columnIndex": 5,
                            "paras": [
                                "1",
                                "3"
                            ]
                        }
                    },
                    {
                        "name": "dx_replace",
                        "parameter": {
                            "columnIndex": 4,
                            "paras": [
                                "3",
                                "4",
                                "****"
                            ]
                        }
                    },
                    {
                        "name": "dx_digest",
                        "parameter": {
                            "columnIndex": 3,
                            "paras": [
                                "md5",
                                "toLowerCase"
                            ]
                        }
                    },
                    {
                        "name": "dx_groovy",
                        "parameter": {
                            "code": "//groovy code//",
                            "extraPackage": [
                                "import somePackage1;",
                                "import somePackage2;"
                            ]
                        }
                    }
                ]
            }
        ]
    }
}

这里是我的Demo案例

{
	"job": {
		"setting": {
			"speed": {
				"channel": 1
			},
			"errorLimit": {
				"record": 0,
				"percentage": 0.02
			}
		},
		"content": [{
			"reader": {
				"name": "mysqlreader",
				"parameter": {
					"username": "username",
					"password": "password",
					"isTableMode": true,
					"connection": [{
						"querySql": [
							"
							select
								nature,
								city_code,
								city,
								area_code,
								area,
								town_code,
								town
							from table;
							"
						],
						"jdbcUrl": [
							"jdbc:mysql://{数据库连接}?useUnicode=true&characterEncoding=UTF8&net_write_timeout=720000"
						]
					}]
				}
			},
			"transformer": [  
                    {
						"name": "dx_filed", #插件名称
						"parameter": {
							"columnIndex":0, #要处理的字段下标
						}
					}
                ],  
			"writer": {
				"name": "mysqlwriter",
				"parameter": {
					"username": "username",
					"password": "password",
					"column": [
						"bill_no"
						"city_code",
						"city",
						"area_code",
						"area",
						"town_code",
						"town"
					],
					"connection": [
					  {
						"table": [
						  "table" #表名
						],
						"jdbcUrl": "jdbc:mysql:{数据库连接}//" 
					  }
					]
				}
			}
		}]
	}
}

到这里就结束了,有什么问题可以交流探讨。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐