系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel



前言

DataX的Reader组件负责从数据源中读取数据,并将这些数据转换成DataX框架可以处理的数据格式。DataX的Reader组件采用了插件化的设计,使得添加新的数据源类型变得相对容易。只需要实现相应的Reader接口或抽象类,并提供必要的配置参数,就可以将新的数据源集成到DataX框架中。这种可扩展性使得DataX能够适应不断变化的数据环境。Reader通常与特定的数据源绑定,每种数据源类型可能都需要一个独立的Reader实现。

以下是一个简化的源码分析步骤,以DataX的MySQLReader为例:

初始化:
在DataX的任务配置文件中,会指定使用哪种Reader,并配置相应的参数,如MySQL的连接信息、查询SQL等。这些信息会被解析并传递给Reader。

构建Reader:
根据配置文件中指定的Reader类型,DataX会动态地创建相应的Reader实例。对于MySQLReader,它会调用MysqlReader.Builder来构建Reader对象。

任务准备:
Reader会执行一些准备工作,如建立与数据源的连接、准备查询语句等。对于MySQLReader,这通常包括调用openConnection方法建立数据库连接,以及调用prepare方法准备SQL查询。

读取数据:
Reader的核心功能是从数据源中读取数据。对于MySQLReader,这通常涉及到执行SQL查询,并遍历查询结果集。Reader可能会使用多线程或分批处理的方式来提高读取效率。

数据转换:
读取到的原始数据可能需要进行一些转换,以满足DataX框架或目标Writer的要求。这可能包括数据类型转换、数据清洗等。

发送数据:
读取并转换后的数据会发送给DataX的Framework,由Framework负责将数据写入目标Writer。

关闭资源:
在读取任务完成后,Reader会负责关闭与数据源相关的资源,如数据库连接等。


Reader组件如何处理各类数据源

DataX的Reader组件处理不同的数据源类型主要是通过抽象和扩展的机制来实现的。具体来说,DataX框架为每种数据源类型定义了一个Reader接口或抽象类,并为每种具体的数据源实现了相应的Reader类。

以下是DataX的Reader组件如何处理不同数据源类型的基本步骤:

抽象定义:
DataX首先定义了一个抽象的Reader接口或抽象类,该接口或抽象类定义了一组通用的方法,如init(初始化)、prepare(准备)、post(读取数据)和close(关闭资源)等。这些方法为Reader提供了统一的生命周期和数据处理流程。

具体实现:
对于每种数据源类型,DataX会创建一个具体的Reader类来实现上述接口或抽象类。例如,对于MySQL数据源,会有一个MysqlReader类;对于Oracle数据源,会有一个OracleReader类。这些具体的Reader类会根据数据源的特性来实现接口中定义的方法。

配置文件解析:
当DataX启动一个数据同步任务时,它会首先解析任务配置文件(通常是JSON格式)。配置文件中包含了任务的各种参数,包括数据源类型、Reader类型、Writer类型以及各自的配置参数。

动态加载:
DataX框架会根据配置文件中的Reader类型动态加载相应的Reader实现类。这通常是通过反射机制实现的,即根据Reader类型的字符串名称,在运行时动态加载并实例化对应的Reader类。

调用Reader方法:
一旦Reader类被加载并实例化,DataX框架会按照定义的生命周期方法调用Reader的相应方法。例如,首先调用init方法进行初始化,然后调用prepare方法准备数据源连接和查询,接着调用post方法读取数据,并在任务完成后调用close方法关闭资源。

数据转换:
在读取数据的过程中,Reader可能需要对数据进行一些转换或适配,以便与DataX框架的数据处理流程兼容。这可能包括数据类型转换、字段重命名、数据清洗等。

错误处理与日志记录:
Reader实现类还需要处理可能出现的错误和异常,并记录必要的日志信息。这有助于在数据同步过程中出现问题时进行故障排查和问题定位。

通过以上步骤,DataX的Reader组件能够灵活处理不同类型的数据源,并实现了数据从数据源到DataX框架的顺畅传输。同时,这种抽象和扩展的机制也使得DataX框架易于扩展,可以方便地添加对新数据源类型的支持。

源码


/**
 * 每个Reader插件在其内部内部实现Job、Task两个内部类。
 * 
 * 
 * */
public abstract class Reader extends BaseObject {

	/**
	 * 每个Reader插件必须实现Job内部类。
	 * 
	 * */
	public static abstract class Job extends AbstractJobPlugin {

		/**
		 * 切分任务
		 * 
		 * @param adviceNumber
		 * 
		 *            着重说明下,adviceNumber是框架建议插件切分的任务数,插件开发人员最好切分出来的任务数>=
		 *            adviceNumber。<br>
		 * <br>
		 *            之所以采取这个建议是为了给用户最好的实现,例如框架根据计算认为用户数据存储可以支持100个并发连接,
		 *            并且用户认为需要100个并发。 此时,插件开发人员如果能够根据上述切分规则进行切分并做到>=100连接信息,
		 *            DataX就可以同时启动100个Channel,这样给用户最好的吞吐量 <br>
		 *            例如用户同步一张Mysql单表,但是认为可以到10并发吞吐量,插件开发人员最好对该表进行切分,比如使用主键范围切分,
		 *            并且如果最终切分任务数到>=10,我们就可以提供给用户最大的吞吐量。 <br>
		 * <br>
		 *            当然,我们这里只是提供一个建议值,Reader插件可以按照自己规则切分。但是我们更建议按照框架提供的建议值来切分。 <br>
		 * <br>
		 *            对于ODPS写入OTS而言,如果存在预排序预切分问题,这样就可能只能按照分区信息切分,无法更细粒度切分,
		 *            这类情况只能按照源头物理信息切分规则切分。 <br>
		 * <br>
		 * 
		 * 
		 * */
		public abstract List<Configuration> split(int adviceNumber);
	}

	public static abstract class Task extends AbstractTaskPlugin {
		public abstract void startRead(RecordSender recordSender);
	}
}

public class MysqlReader extends Reader {

    private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql;

    public static class Job extends Reader.Job {
        private static final Logger LOG = LoggerFactory
                .getLogger(Job.class);

        private Configuration originalConfig = null;
        private CommonRdbmsReader.Job commonRdbmsReaderJob;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();

            Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
            if (userConfigedFetchSize != null) {
                LOG.warn("对 mysqlreader 不需要配置 fetchSize, mysqlreader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");
            }

            this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);

            this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
            this.commonRdbmsReaderJob.init(this.originalConfig);
        }

        @Override
        public void preCheck(){
            init();
            this.commonRdbmsReaderJob.preCheck(this.originalConfig,DATABASE_TYPE);

        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderJob.post(this.originalConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderJob.destroy(this.originalConfig);
        }

    }

    public static class Task extends Reader.Task {

        private Configuration readerSliceConfig;
        private CommonRdbmsReader.Task commonRdbmsReaderTask;

        @Override
        public void init() {
            this.readerSliceConfig = super.getPluginJobConf();
            this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE,super.getTaskGroupId(), super.getTaskId());
            this.commonRdbmsReaderTask.init(this.readerSliceConfig);

        }

        @Override
        public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);

            this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
                    super.getTaskPluginCollector(), fetchSize);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderTask.post(this.readerSliceConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
        }

    }

}

public class RdbmsReader extends Reader {
    private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;
    static {
    	//加载插件下面配置的驱动类
        DBUtil.loadDriverClass("reader", "rdbms");
    }
    public static class Job extends Reader.Job {

        private Configuration originalConfig;
        private CommonRdbmsReader.Job commonRdbmsReaderMaster;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            int fetchSize = this.originalConfig.getInt(
                    com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,
                    Constant.DEFAULT_FETCH_SIZE);
            if (fetchSize < 1) {
                throw DataXException
                        .asDataXException(
                                DBUtilErrorCode.REQUIRED_VALUE,
                                String.format(
                                        "您配置的fetchSize有误,根据DataX的设计,fetchSize : [%d] 设置值不能小于 1.",
                                        fetchSize));
            }
            this.originalConfig.set(
                    com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE,
                    fetchSize);

            this.commonRdbmsReaderMaster = new SubCommonRdbmsReader.Job(
                    DATABASE_TYPE);
            this.commonRdbmsReaderMaster.init(this.originalConfig);
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            return this.commonRdbmsReaderMaster.split(this.originalConfig,
                    adviceNumber);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderMaster.post(this.originalConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderMaster.destroy(this.originalConfig);
        }

    }

    public static class Task extends Reader.Task {

        private Configuration readerSliceConfig;
        private CommonRdbmsReader.Task commonRdbmsReaderSlave;

        @Override
        public void init() {
            this.readerSliceConfig = super.getPluginJobConf();
            this.commonRdbmsReaderSlave = new SubCommonRdbmsReader.Task(
                    DATABASE_TYPE);
            this.commonRdbmsReaderSlave.init(this.readerSliceConfig);
        }

        @Override
        public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig
                    .getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);

            this.commonRdbmsReaderSlave.startRead(this.readerSliceConfig,
                    recordSender, super.getTaskPluginCollector(), fetchSize);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderSlave.post(this.readerSliceConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderSlave.destroy(this.readerSliceConfig);
        }
    }
}
02-12 07:43