一、前言
SETL是otter的核心组件,我们将逐个模块分析和了解otter的实现机制。我们先对E(extract)的组件做分析。
二、核心组件
ExtractTask
是extract的工作线程。
GlobalTask
是SETL的父线程。封装了工作线程需要的一些公共属性。
public abstract class GlobalTask extends Thread {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
//控制线程运行状态
protected volatile boolean running = true;
//从源端到目标端的整个过程描述,主要由一些同步映射过程组成。简单来说,一个同步规则就是一个pipeline
protected Pipeline pipeline;
//pipeline对应的唯一ID
protected Long pipelineId;
//仲裁事件相关控制(HA)
protected ArbitrateEventService arbitrateEventService;
//行数据(binlog)?
protected RowDataPipeDelegate rowDataPipeDelegate;
//线程池
protected ExecutorService executorService;
//客户端配置?
protected ConfigClientService configClientService;
protected StageAggregationCollector stageAggregationCollector;
protected Map<Long, Future> pendingFuture;
三、源码分析
ExtractTask
。extrack任务的核心流程,我这里把非核心的代码逻辑已经去掉,只保留最核心的业务逻辑。
public void run() {
...
while (running) {
try {
final EtlEventData etlEventData = arbitrateEventService.extractEvent().await(pipelineId);
Runnable task = new Runnable() {
public void run() {
...
try {
pipeline = configClientService.findPipeline(pipelineId);
List<PipeKey> keys = (List<PipeKey>) etlEventData.getDesc();
long nextNodeId = etlEventData.getNextNid();
//select过程中的数据会丢到rowDataPipeDelegate,这里我们通过keys拿到selectTask中得到的数据
DbBatch dbBatch = rowDataPipeDelegate.get(keys);
...
otterExtractorFactory.extract(dbBatch);// 重新装配一下数据
...
List<PipeKey> pipeKeys = rowDataPipeDelegate.put(dbBatch, nextNodeId);
etlEventData.setDesc(pipeKeys);
...
}
}
};
// 构造pending任务,可在关闭线程时退出任务
SetlFuture extractFuture = new SetlFuture(StageType.EXTRACT, etlEventData.getProcessId(),
pendingFuture, task);
executorService.execute(extractFuture);
} ...
}
}
3.1 otterExtractorFactory.extract(dbBatch)
从otterExtractorFactory这个工厂类选择extractor进行处理。这里我们重点关注DatabaseExtractor。
DatabaseExtractor
@Override
public void extract(DbBatch dbBatch) throws ExtractException {
...
// 读取配置
Pipeline pipeline = getPipeline(dbBatch.getRowBatch().getIdentity().getPipelineId());
boolean mustDb = pipeline.getParameters().getSyncConsistency().isMedia();
boolean isRow = pipeline.getParameters().getSyncMode().isRow();// 如果是行记录是必须进行数据库反查
// 读取一次配置
adjustPoolSize(pipeline.getParameters().getExtractPoolSize()); // 调整下线程池,Extractor会被池化处理
ExecutorCompletionService completionService = new ExecutorCompletionService(executor);
// 进行并发提交
ExtractException exception = null;
// 每个表进行处理
List<DataItem> items = new ArrayList<DataItem>();
List<Future> futures = new ArrayList<Future>();
List<EventData> eventDatas = dbBatch.getRowBatch().getDatas();
for (EventData eventData : eventDatas) {
//DDL语句忽略
if (eventData.getEventType().isDdl()) {
continue;
}
DataItem item = new DataItem(eventData);
// 针对row模式,需要去检查一下当前是否已经包含row记录的所有字段,如果发现字段不足,则执行一次数据库查询
boolean flag = mustDb
|| (eventData.getSyncConsistency() != null && eventData.getSyncConsistency().isMedia());
...
if (flag && (eventData.getEventType().isInsert() || eventData.getEventType().isUpdate())) {// 判断是否需要反查
Future future = completionService.submit(new DatabaseExtractWorker(pipeline, item), null); // 提交进行并行查询
...
futures.add(future);// 记录一下添加的任务
}
items.add(item);// 按顺序添加
}
// 开始处理结果
int index = 0;
while (index < futures.size()) { // 循环处理发出去的所有任务
try {
Future future = completionService.take();// 它也可能被打断
future.get();
} ...
index++;
}
if (index < futures.size()) {
// 小于代表有错误,需要对未完成的记录进行cancel操作,对已完成的结果进行收集,做重复录入过滤记录
cancel(futures);
throw exception;
} else {
// 全部成功分支, 构造返回结果也要保证原始的顺序
for (int i = 0; i < items.size(); i++) {
DataItem item = items.get(i);
if (item.filter) { // 忽略需要被过滤的数据,比如数据库反查时记录已经不存在
eventDatas.remove(item.getEventData());
}
}
}
}
3.2 数据并行查询
Future future = completionService.submit(new DatabaseExtractWorker(pipeline, item), null); // 提交进行并行查询
DatabaseExtractor.rum()
public void run() {
try {
...
// 获取数据表信息
DataMedia dataMedia = ConfigHelper.findDataMedia(pipeline, eventData.getTableId());
DbDialect dbDialect = dbDialectFactory.getDbDialect(pipeline.getId(),
(DbMediaSource) dataMedia.getSource());
Table table = dbDialect.findTable(eventData.getSchemaName(), eventData.getTableName());
TableData keyTableData = buildTableData(table, eventData.getKeys());
...
boolean needAll = pipeline.getParameters().getSyncMode().isRow()
|| (eventData.getSyncMode() != null && eventData.getSyncMode().isRow());
...
List<DataMediaPair> mediaParis = ConfigHelper.findDataMediaPairByMediaId(pipeline, dataMedia.getId());
List<String> viewColumnNames = buildMaxColumnsFromColumnPairs(mediaParis, eventData.getKeys());
// modified by ljh at 2012-11-04
// 反查数据时只反查带update=true标识的数据,因为update=false的记录可能只是进行filter需要用到的数据,不需要反查
TableData columnTableData = buildTableData(table,
eventData.getUpdatedColumns(),
needAll,
viewColumnNames);
if (columnTableData.columnNames.length == 0) {
// 全主键,不需要进行反查
} else {
//这里会根据主键到数据库查询数据
List<String> newColumnValues = select(dbDialect,
eventData.getSchemaName(),
eventData.getTableName(),
keyTableData,
columnTableData);
if (newColumnValues == null) {
...
} else {
// 构造反查的返回结果
List<EventColumn> newEventColumns = new ArrayList<EventColumn>();
for (int i = 0; i < newColumnValues.size(); i++) {
EventColumn column = new EventColumn();
column.setIndex(columnTableData.indexs[i]);
column.setColumnName(columnTableData.columnNames[i]);
column.setColumnType(columnTableData.columnTypes[i]);
column.setNull(newColumnValues.get(i) == null);
column.setColumnValue(newColumnValues.get(i));
column.setUpdate(true);
newEventColumns.add(column);
}
// 处理下columns中不在反查字段内的字段列表
for (EventColumn column : eventData.getColumns()) {
boolean override = false;
for (EventColumn newEventColumn : newEventColumns) {
if (StringUtils.equalsIgnoreCase(newEventColumn.getColumnName(), column.getColumnName())) {
override = true;
break;
}
}
if (!override) {// 针对newcolumns不存在的记录进行添加
newEventColumns.add(column);
}
}
Collections.sort(newEventColumns, new EventColumnIndexComparable()); // 重新排个序
eventData.setColumns(newEventColumns);
}
}
} ...
}
问题
- extract的核心功能?
- 反查数据库的作用在哪里?为什么要这么做?