tidb-in-action

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,支持包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX 采用了框架 + 插件 的模式。目前已开源,代码托管在 GitHub。

DataX 数据同步效率较高,满足大多数场景下的异构数据库间的数据同步需求。同时其配置灵活,支持字段级别的配置,可以轻松应对因异构数据库迁移到 TiDB 而产生的一些改动。

方案设计如图所示

第一阶段:切换支持 TiDB 的应用上线之前,把 SQL Server 数据库中的全量数据用 DataX 同步到 TiDB 库中。为避免对线上业务产生影响,可以选择备份库,或者在业务低峰期操作。

第二阶段:把全量同步改为增量同步,利用 UpdateTime 字段(或其它条件,根据实际业务灵活调整)作为同步 Where 条件,进行增量覆盖式同步。 t_sync_record 表中记录每张表上次增量任务的执行时间。

第三阶段:支持 TiDB 的应用上线以后,增量同步切换读写数据源改为逆向增量同步,将新数据近实时地写回 SQL Server 数据库。一旦上线之后出现需要回退的情况,可随时切回 SQL Server,待修复之后再次上线。

具体操作步骤:

第一步:部署 DataX

下载

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

解压

tar -zxvf datax.tar.gz

自检

python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json

第二步:编写 DataX 数据同步 Job(Json格式)

(1)全量同步Job

vi full.json

{

    "job": {

        "setting": {

            "speed": {

                #数据分片,分片数据可同时进行同步

                "channel": 128 

            }

        },

        "content": [{

             #SQL Server配置

            "reader": {

                "name": "sqlserverreader",

                "parameter": {

                    #数据库用户名

                    "username": "${srcUserName}",

                    #数据库密码

                    "password": "${srcPassword}",

                     #需要迁移的列名,* 表示全部列

                    "column": ["*"]

                    "connection": [{

                        #需要迁移的表名

                        "table": ["${tableName}"],

                         数据库 jdbc 连接

                        "jdbcUrl": ["${srcUrl}"]

                    }]

                }

        },

        #TiDB配置

        "writer": {

                "name": "tidbwriter",

                "parameter": {

                    #数据库用户名

                    "username": "${desUserName}",

                     #数据库密码

                    "password": "${desPassword}",

                     #使用 Replace 语法

                    "writeMode": "replace",

                    #目标表列名,* 表示全部列

                    "column": ["*"],

                    "connection": [{

                        #数据库 jdbc 连接

                        "jdbcUrl": "${desUrl}",

                        #目标表名

                        "table": ["${tableName}"]

                    }],

                     #本次迁移开始前执行的sql-作数据迁移日志使用

                    "preSql": [

                       "replace into t_sync_record(table_name,start_date,end_date) values('@table',now(),null)"],

                    #本次迁移完成后执行的 sql- 作数据迁移日志使用

                    "postSql": [

                       "update t_sync_record set end_date=now() where table_name='@table' " ]

                }

            }

        }]

    }

}

(2)增量同步 Job

vi increase.json

{

    "job": {

        "setting": {

            "speed": {

                #数据分片,分片数据可同时进行同步

                "channel": 128 

            }

        },

        "content": [{

             #SQL Server配置

            "reader": {

                "name": "sqlserverreader",

                "parameter": {

                    #数据库用户名

                    "username": "${srcUserName}",

                    #数据库密码

                    "password": "${srcPassword}",

                     #需要迁移的列名,* 表示全部列

                    "column": ["*"],

                    "connection": [{

                        #需要迁移的表名

                        "table": ["${tableName}"],

                         数据库 jdbc 连接

                        "jdbcUrl": ["${srcUrl}"]

                    }],

                    #抓取一个时间窗口的增量数据

                    "where": "updateTime >= '${syncTime}'"

                }

        },

        #TiDB配置

        "writer": {

                "name": "tidbwriter",

                "parameter": {

                    #数据库用户名

                    "username": "${desUserName}",

                     #数据库密码

                    "password": "${desPassword}",

                     #使用 Replace 语法

                    "writeMode": "replace",

                    #目标表列名,* 表示全部列

                    "column": ["*"],

                    "connection": [{

                        #数据库 jdbc 连接

                        "jdbcUrl": "${desUrl}",

                        #目标表名

                        "table": ["${tableName}"]

                    }],

                     #本次迁移开始前执行的sql-作数据迁移日志使用

                    "preSql": [

                       "replace into t_sync_record(table_name,start_date,end_date) values('@table',now(),null)"],

                    #本次迁移完成后执行的 sql- 作数据迁移日志使用

                    "postSql": [

                       "update t_sync_record set end_date=now() where table_name='@table' " ]

                }

            }

        }]

    }

}

(3)编写运行DataX Job的Shell执行脚本

vi datax_excute_job.sh

#!/bin/bash

source /etc/profile

srcUrl="Reader Sql Server 地址"

srcUserName="Sql Server 账号"

srcPassword="Sql Server 密码"

desUrl="Writer TiDB 地址"

desUserName="TiDB 账号"

desPassword="TiDB 密码"

# 同步开始时间

defaultsyncUpdateTime="2020-03-03 18:00:00.000"

# 同步周期(秒)

sleepTime="N"

tableName="Table1,Table2,..."

# 循环次数标识,-1为一直循环,其他按输入次数循环

flg=-1

while [ "$flg" -gt 0 -o "$flg" -eq -1 ]

do

        #更新时间设置为上次循序执行的时间

        if [ "" = "$preRunTime" ]; then

                syncTime=$defaultsyncUpdateTime

        else

                syncTime=$preRunTime

        fi

        #记录下本次循环执行时间,供下次循环使用

        preRunTime=$(date -d +"%Y-%m-%d %T")".000";

        echo $syncTime

        echo $preRunTime

        echo $flg

        python {YOUR_DATAX_HOME}/bin/datax.py -p "-DsyncTime='${syncTime}' -DtableName='${tableName}' -DsrcUrl='${srcUrl}' -DsrcUserName='${srcUserName}' -DsrcPassword='${srcPassword}' -DdesUrl='${desUrl}' -DdesUserName='${desUserName}' -DdesPassword='${desPassword}'" {YOUR_DATAX_HOME}/job/increase.json

        if [ -1  -lt  "$flg" ]; then

               let "flg-=1"

        fi

        sleep $sleepTime

done

(4)执行 Shell 脚本

chmod +x datax_excute_job.sh

nohup ./datax_excute_job.sh > info.file 2>&1 &

至此,核心脚本和操作都已完成,可通过修改参数配置,达到自己不同的需求,还可以配合数据对比服务,以期达到将应用程序从 SQL Server 顺利迁移到 TiDB 的目的。