cp/src/main/data-mask/maskingJobs/MaskingJob.java

337 lines
13 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package maskingJobs;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import com.alibaba.datax.core.transport.transformer.TransformerErrorCode;
import com.alibaba.datax.core.transport.transformer.TransformerExecution;
import com.alibaba.datax.core.util.TransformerUtil;
import com.alibaba.datax.core.util.container.ClassLoaderSwapper;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.reader.util.SingleTableSplitUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.google.gson.JsonPrimitive;
import org.apache.commons.lang3.Validate;
import org.bdware.sc.util.JsonUtil;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
 * Produces a masked preview of a database table using DataX transformers.
 *
 * <p>The job configuration is a DataX job JSON. Connection settings come from
 * {@code job.content[0].reader.parameter}. The transformers come from {@code job.content[0]}.
 * At most {@code PREVIEW_ROW_LIMIT} rows are read. Each row is passed through the configured
 * transformers, and the surviving rows are returned as a JSON array with one object per row,
 * keyed by column name.
 *
 * <p>Per-call state (connection settings, buffered rows) lives in instance fields and is reset
 * by {@link #init(String)}.
 */
public class MaskingJob {
    /** Maximum number of rows read for a preview; appended to the query as a MySQL LIMIT. */
    private static final int PREVIEW_ROW_LIMIT = 100;

    /** Database dialect used for connecting and for error translation. */
    DataBaseType dataBaseType = DataBaseType.MySql;
    private String username;
    private String password;
    private String jdbcUrl;
    private String table;
    private Configuration maskConf;
    private Configuration readerPara;
    /** Rows that were read successfully and survived every transformer, in read order. */
    private List<Record> buffer;
    private List<TransformerExecution> transformerExecs;

    /**
     * Parses the job configuration, extracts the connection settings and builds the transformers.
     *
     * <p>Also clears the buffered rows, so one instance can be reused for several jobs.
     *
     * @param confContent DataX job configuration as a JSON string
     * @throws NullPointerException if the configuration has no reader parameter section
     */
    public void init(String confContent) {
        maskConf = Configuration.from(confContent);
        // Deliberately not printing maskConf / readerPara: they contain the database password.
        readerPara = maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER);
        Validate.notNull(readerPara, "job configuration has no reader parameter section (%s)",
                CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER);
        username = readerPara.getString(Key.USERNAME);
        password = readerPara.getString(Key.PASSWORD);
        jdbcUrl = readerPara.getString(Key.JDBC_URL);
        table = readerPara.getString(Key.TABLE);
        buffer = new ArrayList<>();
        transformerExecs = TransformerUtil.buildTransformerInfo(
                maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT + "[0]"));
    }

    /**
     * Builds the preview query.
     *
     * <p>The query selects all columns of the configured table, applies the optional {@code where}
     * reader parameter, and is capped at {@code PREVIEW_ROW_LIMIT} rows.
     *
     * @return the SQL statement to execute
     */
    public String buildQuerySql() {
        String where = readerPara.getString(Key.WHERE, null);
        // LIMIT syntax matches the MySQL dialect used by this job.
        return SingleTableSplitUtil.buildQuerySql("*", readerPara.getString(Key.TABLE), where)
                + " limit " + PREVIEW_ROW_LIMIT;
    }

    /**
     * Initializes this job from {@code confContent} and returns the masked preview.
     *
     * @param confContent DataX job configuration as a JSON string
     * @return the masked rows as a JSON array string wrapped in a {@link JsonPrimitive}
     */
    public JsonPrimitive getMaskedData(String confContent) {
        init(confContent);
        return startRead();
    }

    /**
     * Runs the preview query and pushes every row through the transformers.
     *
     * <p>The connection, result set and statement are always closed.
     *
     * @return the masked rows as a JSON array string wrapped in a {@link JsonPrimitive}
     * @throws DataXException (via {@code RdbmsException.asQueryException}) if reading or
     *     transforming fails
     */
    public JsonPrimitive startRead() {
        String querySql = buildQuerySql();
        // Never print the connection settings here: they include the password.
        Connection conn = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password);
        ResultSet rs = null;
        Statement stmt = null;
        ArrayList<String> columnName = new ArrayList<>();
        try {
            rs = DBUtil.query(conn, querySql);
            stmt = rs.getStatement();
            ResultSetMetaData metaData = rs.getMetaData();
            int columnNumber = metaData.getColumnCount();
            // JDBC column indexes are 1-based.
            for (int i = 1; i <= columnNumber; i++) {
                columnName.add(metaData.getColumnName(i));
            }
            while (rs.next()) {
                transportOneRecord(rs, metaData, columnNumber);
            }
        } catch (Exception e) {
            throw RdbmsException.asQueryException(dataBaseType, e, querySql, table, username);
        } finally {
            DBUtil.closeDBResources(rs, stmt, conn);
        }
        return new JsonPrimitive(getResult(columnName));
    }

    /**
     * Converts the current row into a record and forwards it to the buffer.
     *
     * @return the built record, or {@code null} if the row was dirty and skipped
     */
    private Record transportOneRecord(ResultSet rs, ResultSetMetaData metaData, int columnNumber) {
        Record record = buildRecord(rs, metaData, columnNumber);
        if (record != null) {
            sendToWriter(record);
        }
        return record;
    }

    /**
     * Applies the transformers to {@code record} and buffers the result.
     *
     * <p>Records dropped by a transformer (a {@code null} result) are not buffered.
     */
    private void sendToWriter(Record record) {
        Validate.notNull(record, "record不能为空.");
        record = doTransformer(record);
        if (record == null) {
            return;
        }
        this.buffer.add(record);
    }

    /**
     * Runs every configured transformer over {@code record}, in configuration order.
     *
     * <p>Unmasked records are intentionally never printed, since they may hold sensitive data.
     *
     * @return the transformed record; {@code null} if a transformer filtered it out or failed
     *     (the record is then treated as dirty data)
     * @throws DataXException if a transformer's column index is out of range for the record
     */
    private Record doTransformer(Record record) {
        if (transformerExecs == null || transformerExecs.isEmpty()) {
            return record;
        }
        ClassLoaderSwapper classLoaderSwapper =
                ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();
        Record result = record;
        for (TransformerExecution transformerInfoExec : transformerExecs) {
            // Transformer parameters are validated lazily, on the first record. An invalid
            // parameter throws immediately instead of being treated as dirty data. Generic
            // parameters are checked here; plugin-specific ones (e.g. argument count) are
            // checked inside the plugin. The check runs before the class loader is swapped so
            // that a failure here cannot leave the swapped loader installed.
            if (!transformerInfoExec.isChecked()) {
                if (transformerInfoExec.getColumnIndex() != null
                        && transformerInfoExec.getColumnIndex() >= record.getColumnNumber()) {
                    throw DataXException.asDataXException(
                            TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
                            String.format("columnIndex[%s] out of bound[%s]. name=%s",
                                    transformerInfoExec.getColumnIndex(), record.getColumnNumber(),
                                    transformerInfoExec.getTransformerName()));
                }
                transformerInfoExec.setIsChecked(true);
            }
            if (transformerInfoExec.getClassLoader() != null) {
                classLoaderSwapper
                        .setCurrentThreadClassLoader(transformerInfoExec.getClassLoader());
            }
            try {
                result = transformerInfoExec.getTransformer().evaluate(result,
                        transformerInfoExec.gettContext(), transformerInfoExec.getFinalParas());
            } catch (Exception e) {
                // Dirty data: skip the remaining transformers and filter out this record,
                // but report why instead of failing silently.
                System.err.println(String.format("transformer(%s) has Exception(%s)",
                        transformerInfoExec.getTransformerName(), e.getMessage()));
                return null;
            } finally {
                if (transformerInfoExec.getClassLoader() != null) {
                    classLoaderSwapper.restoreCurrentThreadClassLoader();
                }
            }
            if (result == null) {
                // A transformer filtered the record out.
                break;
            }
        }
        return result;
    }

    /**
     * Converts the current result-set row into a DataX record.
     *
     * <p>The column type is chosen from the JDBC type of each column.
     *
     * @return the record, or {@code null} if a column could not be read (the row is treated as
     *     dirty data and skipped by the caller)
     * @throws DataXException if a column has a JDBC type that cannot be mapped
     */
    protected Record buildRecord(ResultSet rs, ResultSetMetaData metaData, int columnNumber) {
        Record record = new DefaultRecord();
        try {
            for (int i = 1; i <= columnNumber; i++) {
                switch (metaData.getColumnType(i)) {
                    case Types.CHAR:
                    case Types.NCHAR:
                    case Types.VARCHAR:
                    case Types.LONGVARCHAR:
                    case Types.NVARCHAR:
                    case Types.LONGNVARCHAR:
                    case Types.CLOB:
                    case Types.NCLOB:
                        record.addColumn(new StringColumn(rs.getString(i)));
                        break;
                    case Types.SMALLINT:
                    case Types.TINYINT:
                    case Types.INTEGER:
                    case Types.BIGINT:
                        record.addColumn(new LongColumn(rs.getString(i)));
                        break;
                    case Types.NUMERIC:
                    case Types.DECIMAL:
                        // Read as a string to keep the exact decimal representation.
                        record.addColumn(new DoubleColumn(rs.getString(i)));
                        break;
                    case Types.FLOAT:
                    case Types.REAL:
                    case Types.DOUBLE:
                        record.addColumn(new DoubleColumn(rs.getString(i)));
                        break;
                    case Types.TIME:
                        record.addColumn(new DateColumn(rs.getTime(i)));
                        break;
                    // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
                    case Types.DATE:
                        if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) {
                            record.addColumn(new LongColumn(rs.getInt(i)));
                        } else {
                            record.addColumn(new DateColumn(rs.getDate(i)));
                        }
                        break;
                    case Types.TIMESTAMP:
                        record.addColumn(new DateColumn(rs.getTimestamp(i)));
                        break;
                    case Types.BINARY:
                    case Types.VARBINARY:
                    case Types.BLOB:
                    case Types.LONGVARBINARY:
                        record.addColumn(new BytesColumn(rs.getBytes(i)));
                        break;
                    // warn: bit(1) -> Types.BIT, can use BoolColumn
                    // warn: bit(>1) -> Types.VARBINARY, can use BytesColumn
                    case Types.BOOLEAN:
                    case Types.BIT:
                        record.addColumn(new BoolColumn(rs.getBoolean(i)));
                        break;
                    case Types.NULL:
                        Object value = rs.getObject(i);
                        record.addColumn(new StringColumn(value == null ? null : value.toString()));
                        break;
                    default:
                        throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE,
                                String.format(
                                        "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段类型:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",
                                        metaData.getColumnName(i), metaData.getColumnType(i),
                                        metaData.getColumnClassName(i)));
                }
            }
        } catch (DataXException e) {
            throw e;
        } catch (Exception e) {
            // Previously this was swallowed and a half-filled record was buffered, which later
            // failed in getResult. Treat the row as dirty data instead.
            System.err.println("MaskingJob: skipping unreadable row: " + e);
            return null;
        }
        return record;
    }

    /**
     * Serializes the buffered records as a JSON array of row objects.
     *
     * <p>Each object maps a column name to that column's string value, in column order. A record
     * with fewer columns than {@code columnName} yields {@code null} for the missing columns.
     *
     * @param columnName column names in result-set order
     * @return JSON text produced by {@code JsonUtil.toJson}
     */
    public String getResult(ArrayList<String> columnName) {
        int columnCount = columnName.size();
        List<Object> dataList = new ArrayList<>(buffer.size());
        for (Record record : buffer) {
            // LinkedHashMap keeps the table's column order in the JSON output.
            Map<Object, Object> rowData = new LinkedHashMap<>();
            for (int i = 0; i < columnCount; ++i) {
                Column column = record.getColumn(i);
                rowData.put(columnName.get(i), column == null ? null : column.asString());
            }
            dataList.add(rowData);
        }
        return JsonUtil.toJson(dataList);
    }
}