Syncing PostgreSQL to SQL Server with Flink CDC
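1. You must use Linux. On Windows, older Flink versions have bugs, and newer versions changed the API so much, with so little material available, that development is difficult.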
2. I ran a CentOS 7 image in Docker, with Flink 1.16.0: https://archive.apache.org/dist/flink/flink-1.16.1/
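3. Install Java by following https://www.saoniuhuo.com/article/detail-92.html. The only drawback of that tutorial is that the environment variables are lost after a restart; putting the export lines in a shell startup file such as ~/.bashrc makes them persistent.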
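A quick way to confirm which JDK the container actually uses, and whether JAVA_HOME survived a restart, is a tiny Java program. This is a minimal sketch: the class and method names are hypothetical, and the "Java 8 or 11" rule reflects my understanding of what Flink 1.16 supports, so check the Flink documentation for your exact version.

```java
public class JavaVersionCheck {
    // Major version from strings like "1.8.0_362" (Java 8) or "11.0.18" (Java 11)
    public static int majorVersion(String version) {
        String[] parts = version.split("[._+-]");
        int first = Integer.parseInt(parts[0]);
        // Java 8 and earlier report "1.x"; Java 9+ report the major version first
        return first == 1 ? Integer.parseInt(parts[1]) : first;
    }

    // Assumption: Flink 1.16 runs on Java 8 (deprecated) and Java 11
    public static boolean isSupportedByFlink116(int major) {
        return major == 8 || major == 11;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        int major = majorVersion(version);
        System.out.println("java.version = " + version + " (major " + major
                + ", Flink 1.16 OK: " + isSupportedByFlink116(major) + ")");
        // JAVA_HOME is null here if the exports from the install tutorial were lost
        String javaHome = System.getenv("JAVA_HOME");
        System.out.println("JAVA_HOME    = " + (javaHome == null ? "<not set>" : javaHome));
    }
}
```

Run it with the same `java` binary Flink will use; a missing JAVA_HOME after a container restart is exactly the symptom described above.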
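4. Allow remote connections in SQL Server, and make sure TCP/IP is enabled on port 1433, which the JDBC URL later in this article uses. After configuring it, remember to restart the computer; restarting only the SQL Server service did not work.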

5. I downloaded the latest PostgreSQL from the official website; it also needs remote connections enabled.
6. Remote connections are needed so that the CentOS container in Docker can reach the databases running on the host.
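Before touching Flink, it can help to confirm from inside the container that the two database ports on the host are reachable at all. A minimal sketch using plain TCP sockets; the host address and ports come from the connection settings used later in this article, and the class name is hypothetical. A `true` result only means the port accepts connections, not that the login or the pg_hba.conf rules are correct.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Host IP from the article; inside the container "localhost" is the container itself, not the host
        String host = args.length > 0 ? args[0] : "192.168.0.148";
        System.out.println("PostgreSQL 5432: " + canConnect(host, 5432, 3000));
        System.out.println("SQL Server 1433: " + canConnect(host, 1433, 3000));
    }
}
```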
7. Create the table in both SQL Server and PostgreSQL; on the SQL Server side, id does not need to be a primary key.
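The article does not show the table definitions, so the following is a sketch under assumptions: the columns id, name and age are inferred from the sink code, and the types and lengths are guesses. It creates both tables through JDBC, so the PostgreSQL and SQL Server JDBC drivers must be on the classpath. Giving the PostgreSQL table a primary key is my choice, not the author's: with a primary key, the default replica identity lets PostgreSQL keep accepting UPDATE and DELETE once the table is part of a logical-replication publication.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTables {
    // Column types are assumptions inferred from the sink code (id/age as int, name as string)
    public static final String POSTGRES_DDL =
            "CREATE TABLE public.test_table_11 (id INT PRIMARY KEY, name VARCHAR(50), age INT)";
    // No primary key on the SQL Server side, as described in the article
    public static final String SQLSERVER_DDL =
            "CREATE TABLE test_table_mssql1 (id INT, name NVARCHAR(50), age INT)";

    static void execute(String url, String user, String password, String ddl) throws Exception {
        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement stmt = conn.createStatement()) {
            stmt.execute(ddl);
        }
    }

    public static void main(String[] args) throws Exception {
        // Connection details taken from the article's source and sink code
        execute("jdbc:postgresql://192.168.0.148:5432/test_db_11", "postgres", "123", POSTGRES_DDL);
        execute("jdbc:sqlserver://192.168.0.148:1433;database=test_db_mssql;encrypt=true;trustServerCertificate=true",
                "test", "123", SQLSERVER_DDL);
    }
}
```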


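8. Edit PostgreSQL's pg_hba.conf and add one more entry that allows remote access, i.e. a host line whose address range covers the Docker container.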

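9. Edit PostgreSQL's postgresql.conf so that Flink CDC can capture changes. The PostgreSQL connector relies on logical decoding, so wal_level must be set to logical, and listen_addresses must accept connections from outside localhost (for example '*'). A change to wal_level only takes effect after PostgreSQL is restarted.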
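Whether the pg_hba.conf and postgresql.conf changes took effect can be checked over JDBC from the container. A minimal sketch: the connection details come from the source code in this article, the class name is hypothetical, and the settings checked (wal_level, max_replication_slots, max_wal_senders) are my assumption of the minimum that logical decoding needs. A connection error mentioning pg_hba.conf points at the first file; a wal_level other than logical points at the second.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PgCdcSettingsCheck {
    // Returns a human-readable problem for each setting that blocks logical decoding
    public static List<String> problems(Map<String, String> settings) {
        List<String> problems = new ArrayList<>();
        if (!"logical".equals(settings.get("wal_level"))) {
            problems.add("wal_level must be 'logical' (found " + settings.get("wal_level") + ")");
        }
        if (parseIntOrZero(settings.get("max_replication_slots")) < 1) {
            problems.add("max_replication_slots must be at least 1");
        }
        if (parseIntOrZero(settings.get("max_wal_senders")) < 1) {
            problems.add("max_wal_senders must be at least 1");
        }
        return problems;
    }

    private static int parseIntOrZero(String s) {
        try {
            return Integer.parseInt(s);
        } catch (NumberFormatException e) { // also thrown for null
            return 0;
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> settings = new LinkedHashMap<>();
        // Requires the PostgreSQL JDBC driver on the classpath
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://192.168.0.148:5432/test_db_11", "postgres", "123");
             Statement stmt = conn.createStatement()) {
            for (String name : new String[] {"wal_level", "max_replication_slots", "max_wal_senders"}) {
                try (ResultSet rs = stmt.executeQuery("SHOW " + name)) {
                    rs.next();
                    settings.put(name, rs.getString(1));
                }
            }
        }
        System.out.println(settings);
        problems(settings).forEach(System.out::println);
    }
}
```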


10. Create a Maven project in IntelliJ IDEA.
11. Put the jars the job uses into Flink's lib directory, then add the same jars as references in the Java project.

12. One important point: Flink's APIs differ a lot between older and newer versions, so keep your jar versions consistent with your Flink version!
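Missing jars in Flink's lib directory are a common cause of failed submissions. The sketch below lists what is in $FLINK_HOME/lib and reports which expected jars are absent. The jar-name prefixes are hypothetical, since the article does not list its jars: they assume the fat flink-sql-connector-postgres-cdc jar, Microsoft's mssql-jdbc driver, and fastjson2.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlinkLibCheck {
    // Hypothetical jar-name prefixes for this job; adjust to the jars you actually use
    public static final List<String> REQUIRED_PREFIXES = Arrays.asList(
            "flink-sql-connector-postgres-cdc", // Flink CDC PostgreSQL connector (fat jar)
            "mssql-jdbc",                        // Microsoft SQL Server JDBC driver
            "fastjson2");                        // JSON parsing used by the sink

    // Returns the prefixes that no .jar file name starts with
    public static List<String> missing(List<String> jarNames, List<String> prefixes) {
        List<String> result = new ArrayList<>();
        for (String prefix : prefixes) {
            boolean found = jarNames.stream().anyMatch(n -> n.startsWith(prefix) && n.endsWith(".jar"));
            if (!found) {
                result.add(prefix);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumes FLINK_HOME points at the unpacked Flink directory; falls back to the current directory
        String flinkHome = System.getenv().getOrDefault("FLINK_HOME", ".");
        String[] names = new File(flinkHome, "lib").list();
        List<String> jars = names == null ? new ArrayList<>() : Arrays.asList(names);
        System.out.println("missing in lib: " + missing(jars, REQUIRED_PREFIXES));
    }
}
```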
13. Create FlinkCdcDataStream.java with the Java code that listens for changes to the PostgreSQL table:
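```java
package org.example;

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FlinkCdcDataStream {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("192.168.0.148").port(5432)   // PostgreSQL on the Docker host
                .database("test_db_11")                  // database to monitor
                .schemaList("public")                    // schema to monitor
                .tableList("public.test_table_11")       // table to monitor, as schema.table
                .decodingPluginName("pgoutput")          // logical decoding plugin built into PostgreSQL 10+
                .slotName("t_table_slot")                // replication slot used by this job
                .username("postgres")
                .password("123")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON string
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // checkpoint every 3 seconds
        env.addSource(sourceFunction)
                .addSink(new ClickHouseSink("test_table_mssql1", "id")) // target table and key column
                .setParallelism(1); // a single sink instance keeps the change events in order
        env.execute("PostgreSQL to SQL Server CDC");
    }
}
```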
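The source above registers the replication slot t_table_slot. A replication slot stays in PostgreSQL after the job stops and keeps retaining WAL until it is consumed or dropped, and only one client can use a slot at a time, so leftover slots are worth checking. A minimal sketch that lists slots from the pg_replication_slots view and reports the inactive ones; the class name is hypothetical and the connection details come from this article.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplicationSlotCheck {
    // Returns the names of slots that no client is currently consuming, in input order
    public static List<String> inactiveSlots(Map<String, Boolean> slots) {
        List<String> inactive = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : slots.entrySet()) {
            if (Boolean.FALSE.equals(e.getValue())) {
                inactive.add(e.getKey());
            }
        }
        return inactive;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Boolean> slots = new LinkedHashMap<>();
        // Requires the PostgreSQL JDBC driver on the classpath
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://192.168.0.148:5432/test_db_11", "postgres", "123");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT slot_name, active FROM pg_replication_slots")) {
            while (rs.next()) {
                slots.put(rs.getString("slot_name"), rs.getBoolean("active"));
            }
        }
        System.out.println("slots: " + slots);
        // An inactive slot still retains WAL; once its job is retired for good, drop it with
        // SELECT pg_drop_replication_slot('t_table_slot');
        System.out.println("inactive: " + inactiveSlots(slots));
    }
}
```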

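14. Create ClickHouseSink.java, which writes the captured changes to SQL Server (despite its name, it has nothing to do with ClickHouse). The code is too long for a screenshot, so here it is in full: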
```java
package org.example;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Despite the name, this sink writes to SQL Server over JDBC
public class ClickHouseSink extends RichSinkFunction<String> {
    // JDBC connections are not serializable, so each parallel instance opens its own in open()
    private transient Connection connection;
    private final String targetTableName;
    private final String idName;

    public ClickHouseSink(String targetTableName, String idName) {
        this.targetTableName = targetTableName;
        this.idName = idName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Fail fast: if the connection cannot be opened, the task fails instead of hitting a NullPointerException later
        connection = getConnection();
    }

    @Override
    public void close() throws Exception {
        // Close the connection and release resources
        if (connection != null) {
            connection.close();
        }
        super.close();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // value is one change event serialized by JsonDebeziumDeserializationSchema
        JSONObject res = JSON.parseObject(value);
        System.out.println(res);
        JSONObject after = res.getJSONObject("after");
        JSONObject before = res.getJSONObject("before");
        String op = res.getString("op");
        if (op == null) {
            return;
        }
        // SQL errors propagate so Flink restarts from the last checkpoint instead of silently dropping the change
        switch (op) {
            case "r":
                // Snapshot read of a row that existed before the job started: only logged
                System.out.println("read");
                break;
            case "c":
                System.out.println("insert");
                if (after != null) {
                    try (PreparedStatement ps = connection.prepareStatement(
                            "insert into " + targetTableName + " (" + idName + ", name, age) values (?, ?, ?)")) {
                        ps.setObject(1, after.getInteger(idName));
                        ps.setString(2, after.getString("name"));
                        ps.setObject(3, after.getInteger("age"));
                        ps.executeUpdate();
                    }
                }
                break;
            case "u":
                System.out.println("update");
                if (after != null) {
                    try (PreparedStatement ps = connection.prepareStatement(
                            "update " + targetTableName + " set name = ?, age = ? where " + idName + " = ?")) {
                        ps.setString(1, after.getString("name"));
                        ps.setObject(2, after.getInteger("age"));
                        ps.setObject(3, after.getInteger(idName));
                        ps.executeUpdate();
                    }
                }
                break;
            case "d":
                System.out.println("delete");
                if (before != null) {
                    try (PreparedStatement ps = connection.prepareStatement(
                            "delete from " + targetTableName + " where " + idName + " = ?")) {
                        ps.setObject(1, before.getInteger(idName));
                        ps.executeUpdate();
                    }
                }
                break;
            default:
                break;
        }
    }

    private static Connection getConnection() throws Exception {
        // Loading the class explicitly gives a clear ClassNotFoundException if mssql-jdbc is missing from Flink's lib
        Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
        return DriverManager.getConnection("jdbc:sqlserver://192.168.0.148:1433;database=test_db_mssql;user=test;password=123;encrypt=true;trustServerCertificate=true;loginTimeout=30;");
    }
}
```
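The sink hardcodes the name and age columns. As a sketch of how this could be generalized, the hypothetical helper below builds the parameterized statement for each Debezium op code from a key column and a list of other columns: c (insert), u (update by key), and d (delete by key, whose value comes from before). Snapshot reads (r) return null, matching the sink, which only logs them. Binding the values in the matching order is left to the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ChangeSql {
    // Builds a parameterized statement for one Debezium op code;
    // key is the key column, columns are the remaining columns in bind order
    public static String forOp(String op, String table, String key, List<String> columns) {
        switch (op) {
            case "c": { // insert: key first, then the other columns
                List<String> all = new ArrayList<>();
                all.add(key);
                all.addAll(columns);
                String marks = all.stream().map(c -> "?").collect(Collectors.joining(", "));
                return "INSERT INTO " + table + " (" + String.join(", ", all) + ") VALUES (" + marks + ")";
            }
            case "u": { // update every non-key column, matched on the key (bind the key last)
                String sets = columns.stream().map(c -> c + " = ?").collect(Collectors.joining(", "));
                return "UPDATE " + table + " SET " + sets + " WHERE " + key + " = ?";
            }
            case "d": // delete by key; the key value comes from "before"
                return "DELETE FROM " + table + " WHERE " + key + " = ?";
            default: // "r" (snapshot read) and unknown codes: no statement
                return null;
        }
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("name", "age");
        for (String op : new String[] {"c", "u", "d", "r"}) {
            System.out.println(op + " -> " + forOp(op, "test_table_mssql1", "id", cols));
        }
    }
}
```

For the article's table, `forOp("c", "test_table_mssql1", "id", cols)` yields `INSERT INTO test_table_mssql1 (id, name, age) VALUES (?, ?, ?)`, the same shape as the sink's hardcoded insert.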
15. In IDEA, add a JAR artifact and select the entry (main) class.

16. Build the artifact to produce a JAR file.


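17. Start the Docker container with port 8081 mapped to the host, then start Flink 1.16.0. Remember to copy the required jars into Flink's lib directory beforehand; otherwise Flink cannot load their classes and the upload fails.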
18. Submit the JAR you just built and run it.
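Submitting and running the job can also be checked from code through Flink's REST API, which the Web UI on port 8081 is built on. A minimal sketch, assuming Java 11+ (for java.net.http) and the `/overview` and `/jars/{jarid}/run` endpoints of the Flink REST API; the jar id is whatever `GET /jars` reports after uploading through the Web UI, and the class name is hypothetical.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class FlinkRestSubmit {
    // Builds the URL of Flink's "run an uploaded jar" REST endpoint
    public static String runUrl(String base, String jarId, String entryClass) {
        return base + "/jars/" + jarId + "/run?entry-class="
                + URLEncoder.encode(entryClass, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081"; // port 8081 mapped from the container to the host
        HttpClient client = HttpClient.newHttpClient();
        // 1) Is the cluster reachable? /overview reports the Flink version and task slots
        HttpResponse<String> overview = client.send(
                HttpRequest.newBuilder(URI.create(base + "/overview")).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(overview.body());
        // 2) Optionally run an already uploaded jar; args[0] is its id as listed by GET /jars
        if (args.length > 0) {
            HttpRequest run = HttpRequest.newBuilder(
                            URI.create(runUrl(base, args[0], "org.example.FlinkCdcDataStream")))
                    .POST(HttpRequest.BodyPublishers.noBody())
                    .build();
            System.out.println(client.send(run, HttpResponse.BodyHandlers.ofString()).body());
        }
    }
}
```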



That completes the sync setup.
Now let's look at the result:
1. Insert data in PostgreSQL, and SQL Server picks it up in real time.


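By default, PostgreSQL would not let me modify this table, and I didn't bother creating a new one, so only the insert case is shown here.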
With MySQL I tested inserts, updates, and deletes, and all of them synced in real time with the same code.
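A likely reason PostgreSQL refuses to modify the table (my interpretation, not something the article confirms): once Flink CDC runs with pgoutput, the table typically becomes part of a logical-replication publication, and PostgreSQL rejects UPDATE and DELETE on a published table that has no replica identity, for example one without a primary key, with a hint to set REPLICA IDENTITY using ALTER TABLE. A minimal sketch of the fix through JDBC; adding a primary key is the alternative. REPLICA IDENTITY FULL also makes the "before" image contain the whole old row. The table and connection details come from this article, while the class name and the test UPDATE/DELETE on id = 1 are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ReplicaIdentityFix {
    // Builds the statement that makes PostgreSQL log the whole old row for UPDATE/DELETE
    public static String replicaIdentityFull(String qualifiedTable) {
        return "ALTER TABLE " + qualifiedTable + " REPLICA IDENTITY FULL";
    }

    public static void main(String[] args) throws Exception {
        // Requires the PostgreSQL JDBC driver on the classpath
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://192.168.0.148:5432/test_db_11", "postgres", "123");
             Statement stmt = conn.createStatement()) {
            stmt.execute(replicaIdentityFull("public.test_table_11"));
            // Hypothetical test row id = 1: exercises the sink's "u" and "d" branches
            stmt.executeUpdate("UPDATE public.test_table_11 SET age = age + 1 WHERE id = 1");
            stmt.executeUpdate("DELETE FROM public.test_table_11 WHERE id = 1");
        }
    }
}
```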