DataX Plugin Development: A Custom Transformer
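Background
Our company is rebuilding a system, which means initializing historical data and backfilling various fields. The new schema adds a bill number field made of 10 random digits, and the total data volume is about 45 million rows. Writing Java code for this would be simple enough, but it needs a fair amount of setup, has to handle multiple data sources, and would probably not perform well overall. Too much hassle! That brought me to DataX, a high-performance data synchronization tool. But how do you fill in the bill number while DataX is moving the data? The official documentation shows that DataX has a Transformer feature that supports replacement, Groovy scripts, and so on, but none of the built-in transformers fit my case, so I wrote my own plugin.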
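I. Transformer
1. Transformer definition
During data synchronization and transfer, users sometimes need to customize how the data is handled, for example by trimming or converting columns. This is the T (Transformer) step of ETL. DataX provides complete support for E (Extract), T (Transformer), and L (Load).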
Runtime model
For more details, see the Transformer section of the official DataX documentation.
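2. Transformer plugin development
- How do you develop the plugin, and where does the code go? I won't cover how to get the source here; my previous article explains it in detail. Once you have the source, open DataX's core module, find the transformer package, and create the new class there.

- Create the FiledFillTransformer class, which performs the field conversion. My requirement is simple: generate codes with a different prefix depending on the record type. I originally planned to use Redis's increment() method, but that means integrating Redis, which is extra work and adds network overhead that hurts performance, so I used the AtomicLong atomic class instead.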
package com.alibaba.datax.core.transport.transformer;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.transformer.Transformer;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Replaces the value of one column with a generated bill number.
 * Date: 2024/7/29
 */
public class FiledFillTransformer extends Transformer {
    // Prefix used when the source value equals NATURE
    private static final String BILL_NO_PREFIX = "AR";
    // Prefix used for every other non-null value
    private static final String NO_BILL_NO_PREFIX = "BR";
    private static final String NATURE = "0";
    // One in-process counter per prefix; both start from 0 each time the job's JVM starts
    private static final AtomicLong arAtomicLong = new AtomicLong(0);
    private static final AtomicLong brAtomicLong = new AtomicLong(0);

    public FiledFillTransformer() {
        // Name referenced by the "name" key in the job's transformer section
        setTransformerName("dx_filed");
    }

    @Override
    public Record evaluate(Record record, Object... paras) {
        int columnIndex;
        try {
            // The only expected argument is the column index
            if (paras.length != 1) {
                throw new RuntimeException("dx_filed paras must be 1");
            }
            columnIndex = (Integer) paras[0];
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER, "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
        }
        Column column = record.getColumn(columnIndex);
        try {
            String oriValue = column.asString();
            // If the source value is null, leave the record unchanged
            if (oriValue == null) {
                return record;
            }
            // Prefix plus a zero-padded 10-digit sequence number
            String newValue = NATURE.equals(oriValue)
                    ? BILL_NO_PREFIX.concat(String.format("%010d", arAtomicLong.incrementAndGet()))
                    : NO_BILL_NO_PREFIX.concat(String.format("%010d", brAtomicLong.incrementAndGet()));
            record.setColumn(columnIndex, new StringColumn(newValue));
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
        }
        return record;
    }
}
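The numbering rule inside evaluate() can be tried on its own, without building DataX. The following is a minimal standalone sketch under that assumption: BillNoGenerator and next() are hypothetical names used only for illustration, and the counters here are instance fields rather than the static fields used in the transformer.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Standalone sketch of the numbering rule; hypothetical class, no DataX types involved. */
class BillNoGenerator {
    // One counter per prefix, mirroring arAtomicLong and brAtomicLong in the transformer
    private final AtomicLong arCounter = new AtomicLong(0);
    private final AtomicLong brCounter = new AtomicLong(0);

    /** Returns "AR" plus a 10-digit sequence when nature is "0", otherwise "BR" plus a 10-digit sequence. */
    String next(String nature) {
        if ("0".equals(nature)) {
            return "AR" + String.format("%010d", arCounter.incrementAndGet());
        }
        return "BR" + String.format("%010d", brCounter.incrementAndGet());
    }

    public static void main(String[] args) {
        BillNoGenerator gen = new BillNoGenerator();
        System.out.println(gen.next("0")); // AR0000000001
        System.out.println(gen.next("1")); // BR0000000001
        System.out.println(gen.next("0")); // AR0000000002
    }
}
```

Because the counters live only in memory, the sequence is unique within a single run and starts over when the process restarts, which is worth keeping in mind if the job is ever split across several runs.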
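3. Plugin registration
Once the class is written it cannot be used directly; it has to be registered with the program. In the core module, find the TransformerRegistry class and register FiledFillTransformer there. After that, compile and package DataX, and the transformer is ready to use.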
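In the DataX sources I have seen, the built-in transformers are added in a static block of TransformerRegistry with calls such as registTransformer(new SubstrTransformer()), so the new line would presumably be registTransformer(new FiledFillTransformer()). Treat that as an assumption and check the method name in your own checkout. The idea behind registration is just a lookup from the job-file name to an instance, which the toy model below illustrates. MiniRegistry and NamedTransformer are hypothetical names, not DataX classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy registry mapping a job-file "name" to a transformer instance. Not DataX code. */
class MiniRegistry {
    /** Hypothetical stand-in for a transformer: it only exposes its registered name. */
    interface NamedTransformer {
        String name();
    }

    private static final Map<String, NamedTransformer> REGISTERED = new ConcurrentHashMap<>();

    /** Registers a transformer and rejects a second registration under the same name. */
    static void register(NamedTransformer t) {
        if (REGISTERED.putIfAbsent(t.name(), t) != null) {
            throw new IllegalStateException("duplicate transformer name: " + t.name());
        }
    }

    /** Resolves the name used in the job file; fails if nothing was registered under it. */
    static NamedTransformer lookup(String name) {
        NamedTransformer t = REGISTERED.get(name);
        if (t == null) {
            throw new IllegalArgumentException("transformer not registered: " + name);
        }
        return t;
    }

    public static void main(String[] args) {
        register(() -> "dx_filed");
        System.out.println(lookup("dx_filed").name()); // dx_filed
    }
}
```

This also shows why an unregistered class has no effect: the job file refers to the transformer only by name, so a name that was never registered cannot be resolved.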
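4. Job definition
I won't repeat how to use the job template here; see my previous article, "DataX Plugin Development: Integrating EasyExcel for Data Export".
This is the example from the official documentation; adapt it as needed.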
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "DataX",
                "type": "string"
              },
              {
                "value": 1724154616370,
                "type": "long"
              },
              {
                "value": "2024-01-01 00:00:00",
                "type": "date"
              },
              {
                "value": true,
                "type": "bool"
              },
              {
                "value": "TestRawData",
                "type": "bytes"
              }
            ],
            "sliceRecordCount": 100
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": false,
            "encoding": "UTF-8"
          }
        },
        "transformer": [
          {
            "name": "dx_substr",
            "parameter": {
              "columnIndex": 5,
              "paras": [
                "1",
                "3"
              ]
            }
          },
          {
            "name": "dx_replace",
            "parameter": {
              "columnIndex": 4,
              "paras": [
                "3",
                "4",
                "****"
              ]
            }
          },
          {
            "name": "dx_digest",
            "parameter": {
              "columnIndex": 3,
              "paras": [
                "md5",
                "toLowerCase"
              ]
            }
          },
          {
            "name": "dx_groovy",
            "parameter": {
              "code": "//groovy code//",
              "extraPackage": [
                "import somePackage1;",
                "import somePackage2;"
              ]
            }
          }
        ]
      }
    ]
  }
}
Here is my demo. In the transformer section, name is the plugin name (dx_filed) and columnIndex is the index of the column to process; in the writer, table is the target table name. JSON does not allow comments, so these notes are kept out of the file itself.
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "username",
            "password": "password",
            "isTableMode": true,
            "connection": [
              {
                "querySql": [
                  "select nature, city_code, city, area_code, area, town_code, town from table;"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://{database connection}?useUnicode=true&characterEncoding=UTF8&net_write_timeout=720000"
                ]
              }
            ]
          }
        },
        "transformer": [
          {
            "name": "dx_filed",
            "parameter": {
              "columnIndex": 0
            }
          }
        ],
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "username",
            "password": "password",
            "column": [
              "bill_no",
              "city_code",
              "city",
              "area_code",
              "area",
              "town_code",
              "town"
            ],
            "connection": [
              {
                "table": [
                  "table"
                ],
                "jdbcUrl": "jdbc:mysql://{database connection}"
              }
            ]
          }
        }
      }
    ]
  }
}
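The check paras.length != 1 in the transformer makes sense once you see how the job parameters reach evaluate(). As I understand it (an assumption worth verifying against your DataX version), the framework passes columnIndex as the first argument and appends any "paras" entries after it. The sketch below models only that assembly step; FinalParasSketch and buildFinalParas are hypothetical names, not DataX code.

```java
import java.util.Arrays;

/** Models how columnIndex and "paras" could be combined into the evaluate() arguments. */
class FinalParasSketch {
    /** Puts columnIndex first and copies the optional string parameters after it. */
    static Object[] buildFinalParas(Integer columnIndex, String[] paras) {
        int extra = (paras == null) ? 0 : paras.length;
        Object[] finalParas = new Object[extra + 1];
        finalParas[0] = columnIndex;
        if (extra > 0) {
            System.arraycopy(paras, 0, finalParas, 1, extra);
        }
        return finalParas;
    }

    public static void main(String[] args) {
        // dx_filed: only columnIndex is configured, so evaluate() sees one argument
        System.out.println(Arrays.toString(buildFinalParas(0, null))); // [0]
        // dx_substr from the official example: columnIndex 5 plus paras "1" and "3"
        System.out.println(Arrays.toString(buildFinalParas(5, new String[] {"1", "3"}))); // [5, 1, 3]
    }
}
```

Under this model, adding a "paras" list to the dx_filed entry would make the length check fail, so the demo job leaves it out.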
That's all for this article. If you have any questions, feel free to discuss them.