Syncing PostgreSQL to SQL Server with Flink CDC

By qq84628151

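1. Use Linux. On Windows, older Flink versions have bugs, and newer versions changed the API so much, with so little documentation, that development is hard.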

2. I use Docker with a CentOS 7 image, and the Flink version is 1.16.0. https://archive.apache.org/dist/flink/flink-1.16.1/

3. Install Java, following https://www.saoniuhuo.com/article/detail-92.html. The only drawback of that tutorial is that the environment variables are lost after a restart.

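4. Enable remote connections in SQL Server. After configuring it, remember to reboot the machine; restarting the service alone is not enough.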

5. For PostgreSQL I downloaded the latest version from the official site. It also needs remote connections enabled.

6. Remote connections are enabled so that CentOS inside the Docker container can reach the databases on the host.
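Before going further, it can save time to confirm from inside the container that both database ports on the host are actually reachable. Below is a minimal sketch using only the JDK. The class name `PortCheck` and the method `isReachable` are made up for this illustration and are not part of the project; the host address and ports are the ones used later in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the original project.
class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unroutable: treat all as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the host address used elsewhere in this post.
        String host = args.length > 0 ? args[0] : "192.168.0.148";
        System.out.println("PostgreSQL 5432 reachable: " + isReachable(host, 5432, 2000));
        System.out.println("SQL Server 1433 reachable: " + isReachable(host, 1433, 2000));
    }
}
```

A `false` here only says the TCP connection failed. It does not tell you whether the cause is a firewall, the listen address, or the database's own access rules.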

7. Create a table in SQL Server and in PostgreSQL. The id column in SQL Server does not need to be a primary key.

SQL Server table
PostgreSQL table design: the same column types as in SQL Server, but id is a primary key and auto-increments

7. Edit PostgreSQL's pg_hba.conf and add one more entry to allow remote access.

8. Edit PostgreSQL's postgresql.conf so that Flink CDC can capture changes.
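The screenshot of the postgresql.conf change is not reproduced here. The setting that logical decoding plugins such as pgoutput depend on is `wal_level = logical`, so that is most likely the one to check. As a rough sketch, the helper below reads the text of a postgresql.conf file and reports whether `wal_level` ends up as `logical`. `PgConfCheck` and its methods are hypothetical names for this illustration, and the parsing is deliberately simplified.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the original project.
class PgConfCheck {
    /**
     * Parses "name = value" lines. '#' starts a comment and later entries
     * override earlier ones. Simplification: a '#' inside a quoted value
     * is also treated as a comment start.
     */
    static Map<String, String> parse(String confText) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String rawLine : confText.split("\\R")) {
            int hash = rawLine.indexOf('#');
            String line = (hash >= 0 ? rawLine.substring(0, hash) : rawLine).trim();
            if (line.isEmpty()) {
                continue;
            }
            // The '=' between name and value is optional in postgresql.conf.
            String[] parts = line.split("\\s*=\\s*|\\s+", 2);
            if (parts.length < 2) {
                continue;
            }
            String value = parts[1].trim();
            if (value.length() >= 2 && value.startsWith("'") && value.endsWith("'")) {
                value = value.substring(1, value.length() - 1);
            }
            settings.put(parts[0].toLowerCase(), value);
        }
        return settings;
    }

    /** True when the effective wal_level in the given text is "logical". */
    static boolean isLogicalDecodingEnabled(String confText) {
        return "logical".equalsIgnoreCase(parse(confText).get("wal_level"));
    }

    public static void main(String[] args) {
        String sample = "listen_addresses = '*'\nwal_level = logical\n";
        System.out.println("logical decoding enabled: " + isLogicalDecodingEnabled(sample));
    }
}
```

Changing `wal_level` only takes effect after PostgreSQL is restarted.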

9. Create a Maven project in IDEA.

10. Put the jars you use into Flink's lib directory, then add references to the same jars in the Java project.

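11. One thing I have to stress: the API differs a lot between older and newer Flink versions, so keep the jar versions consistent with the Flink version!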

12. Create FlinkCdcDataStream.java and write the Java code that listens for changes on the PostgreSQL table.

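package org.example;

// Imports assume the Flink CDC 2.x connector (com.ververica packages).
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkCdcDataStream {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("192.168.0.148").port(5432)
                .database("test_db_11") // database to monitor
                .schemaList("public")  // schema to monitor
                .tableList("public.test_table_11") // table to monitor, as schema.table
                .decodingPluginName("pgoutput") // logical decoding plugin
                .slotName("t_table_slot") // replication slot name
                .username("postgres")
                .password("123")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // checkpoint every 3000 ms

        env.addSource(sourceFunction)
           .addSink(new ClickHouseSink("test_table_mssql1", "id")); // target table and id column

        env.execute();
    }
}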

13. Create ClickHouseSink.java, which writes each captured change to SQL Server. There is too much code for a screenshot, so here it is in full:

package org.example;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class ClickHouseSink extends RichSinkFunction<String> {
    private Connection connection;
    private String targetTableName;
    private String idName;

    public ClickHouseSink(String targetTableName, String idName) {
        this.targetTableName = targetTableName;
        this.idName = idName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        if (this.connection == null) {
            connection = getConnection();
        }
        // Fail fast: without a connection every later invoke() call would fail.
        if (this.connection == null) {
            throw new IllegalStateException("Could not connect to SQL Server");
        }
    }

    @Override
    public void close() {
        try {
            // Close the connection and release resources
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    @Override
    public void invoke(String json, Context context) {
        // Each record is a Debezium change event serialized as JSON.
        JSONObject res = JSON.parseObject(json);
        System.out.println(res);
        try {
            JSONObject after = res.getJSONObject("after");
            JSONObject before = res.getJSONObject("before");

            switch (res.getString("op")) {
                case "r":
                    // Snapshot read of an existing row: only logged, not written to the target.
                    System.out.println("Read row");
                    break;
                case "d":
                    System.out.println("Delete row");
                    if (before != null) {
                        try (PreparedStatement ps = this.connection.prepareStatement("delete from " + targetTableName + " where " + idName + "=?")) {
                            ps.setInt(1, before.getInteger(idName));
                            ps.execute();
                        }
                    }
                    break;
                case "u":
                    System.out.println("Update row");
                    if (after != null) {
                        try (PreparedStatement ps = this.connection.prepareStatement("update " + targetTableName + " set name=?, age=? where " + idName + "=?")) {
                            ps.setString(1, after.getString("name"));
                            ps.setInt(2, after.getInteger("age"));
                            ps.setInt(3, after.getInteger(idName));
                            ps.execute();
                        }
                    }
                    break;
                case "c":
                    System.out.println("Insert row");
                    if (after != null) {
                        try (PreparedStatement ps = this.connection.prepareStatement("insert into " + targetTableName + "(id, name, age) values(?,?,?)")) {
                            ps.setInt(1, after.getInteger("id"));
                            ps.setString(2, after.getString("name"));
                            ps.setInt(3, after.getInteger("age"));
                            ps.execute();
                        }
                    }
                    break;
                default:
                    break;
            }
        } catch (Exception ex) {
            System.out.println("Exception: " + ex.getMessage());
        }
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            // Loading the class registers the driver; the deprecated newInstance() call is not needed.
            Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
            con = DriverManager.getConnection("jdbc:sqlserver://192.168.0.148:1433;database=test_db_mssql;user=test;password=123;encrypt=true;trustServerCertificate=true;loginTimeout=30;");
        } catch (Exception e) {
            System.out.println("-----------sqlserver get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
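The sink above decides what to do from the `op` field of each Debezium change event: `c` is an insert, `u` an update, `d` a delete, and `r` a row read during the initial snapshot. To make that mapping easier to see and to test without a database, here is a small sketch that only builds the SQL text for each code. `ChangeSql` and `forOp` are hypothetical names for this illustration; the `name` and `age` columns are the ones used by the sink.

```java
// Hypothetical helper, not part of the original project.
class ChangeSql {
    /**
     * Returns the parameterized SQL for a Debezium op code,
     * or null when nothing should be written to the target table.
     */
    static String forOp(String op, String table, String idName) {
        if (op == null) {
            return null;
        }
        switch (op) {
            case "c": // create
                return "insert into " + table + "(" + idName + ", name, age) values(?,?,?)";
            case "u": // update
                return "update " + table + " set name=?, age=? where " + idName + "=?";
            case "d": // delete
                return "delete from " + table + " where " + idName + "=?";
            default:  // "r" (snapshot read) and any other code are skipped, as in the sink
                return null;
        }
    }

    public static void main(String[] args) {
        for (String op : new String[] {"c", "u", "d", "r"}) {
            System.out.println(op + " -> " + forOp(op, "test_table_mssql1", "id"));
        }
    }
}
```

Because `r` events are skipped, rows that already exist in PostgreSQL when the job starts are not copied. Mapping `r` to the insert statement would be one way to change that, if it is what you want.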

15. Add an artifact in IDEA and select the entry class.

16. Build the artifact to get a jar.

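17. Start Docker with port 8081 mapped, then start Flink 1.16.0. Remember to put the jars into Flink's lib directory beforehand; otherwise Flink cannot load them and the upload will fail.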

18. Submit the jar you just built and run it.

The job runs successfully.

That completes the sync setup.

Now let's look at the result:

1. Insert a row in PostgreSQL, and SQL Server is synced in real time.

By default PostgreSQL does not allow modifying the table, and I am not going to create a new one for this.

With MySQL I tested insert, update, and delete, and all of them synced in real time. The code is the same.