Published August 1, 2023 (author: )

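Using Flink: Oracle CDC

Related documents: Flink usage series index

Preparation

This step configures Oracle. It mainly involves:

1. Enabling the archive log
2. Enabling supplemental logging for the database and the tables
3. Creating a CDC user and granting it privileges

Note: do not use Oracle's SYS or SYSTEM user as the CDC user. These two users capture a large volume of changes internal to the Oracle database, which are irrelevant to business data. Debezium filters out the changes captured by these two users.

The configuration steps follow. On the machine where Oracle is installed, run:

    su - oracle
    sqlplus / as sysdba

This opens SQL*Plus. Next, enable the archive log:

    alter system set db_recovery_file_dest_size = 10G;
    alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;
    -- Check whether the archive log was enabled successfully
    archive log list;

Notes:

1. This step restarts the database, so pick a suitable time for it.
2. The oracle user needs read and write permission on the /opt/oracle/oradata/recovery_area directory used in the example.
3. If running alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile; fails with "ORA-32001: write to SPFILE requested but no SPFILE is in use", check the spfile:

    show parameter spfile;
    -- If the output value is empty, no spfile has been created; create one with the SQL below
    create spfile from pfile;
    -- Shut down and restart
    shutdown immediate;
    startup;
    -- Check whether the spfile was created successfully
    show parameter spfile;

Enable supplemental logging for the database and for each table that needs CDC:

    ALTER TABLE ers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

Here ers is the target table for CDC, in the format _name.

Finally, create a dedicated CDC user and grant it privileges:

    -- Example path /opt/oracle/oradata/SID/; create it beforehand and grant permissions on it
    CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
    GRANT CREATE SESSION TO flinkuser;
    GRANT SET CONTAINER TO flinkuser;
    GRANT SELECT ON V_$DATABASE to flinkuser;
    GRANT FLASHBACK ANY TABLE TO flinkuser;
    GRANT SELECT ANY TABLE TO flinkuser;
    GRANT SELECT_CATALOG_ROLE TO flinkuser;
    GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
    GRANT SELECT ANY TRANSACTION TO flinkuser;
    GRANT LOGMINING TO flinkuser;
    GRANT CREATE TABLE TO flinkuser;
    GRANT LOCK ANY TABLE TO flinkuser;
    GRANT ALTER ANY TABLE TO flinkuser;
    GRANT CREATE SEQUENCE TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    GRANT SELECT ON V_$LOG TO flinkuser;
    GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
    GRANT SELECT ON V_$LOGFILE TO flinkuser;
    GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

Note: on Oracle 11g, GRANT LOGMINING TO flinkuser; fails because the LOGMINING role does not exist. The error can be ignored and does not affect usage. On 12c the grant statements are somewhat different; see Debezium Connector for Oracle :: Debezium Documentation.

One last point: the Oracle CDC program may report the following error at runtime.

    ORA-12514, TNS:listener does not currently know of service requested in connect descriptor

Edit the listener configuration file (use find to locate it if you do not know the path) and add:

    SID_LIST_LISTENER =
      (SID_LIST =
        (SID_DESC =
          (SID_NAME = ora11g)
          (ORACLE_HOME = /data/oracle/product/11.2.0/dbhome_1)
        )
      )

Change SID_NAME and ORACLE_HOME to the real values; ORACLE_HOME can be read from the environment variable. After editing, remember to reload the listener:

    lsnrctl reload

At this point the Oracle database environment is fully configured.

Project dependencies

Add the following dependencies to the project:

    ica  flink-connector-oracle-cdc  2.1.0
    flink-table-api-scala-bridge_${n}  ${n}
    flink-table-planner-blink_${n}  ${n}

Oracle CDC, SQL approach

Here is the example program:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnvironment = TableEnvironment.create(bsSettings)
    val sql =
      """
        |CREATE TABLE test (
        |  ID INT,
        |  NAME STRING,
        |  AGE INT
        |  ) WITH (
        |  'connector' = 'oracle-cdc',
        |  'hostname' = '',
        |  'port' = '1521',
        |  'username' = 'flinkuser',
        |  'password' = 'flinkpw',
        |  'database-name' = 'ora11g',
        |  'schema-name' = 'INVENTORY',
        |  'table-name' = 'CUSTOMERS'
        |  )
        |""".stripMargin
    tableEnvironment.executeSql(sql)

    // Either of the two ways below can be used to print the data
    // Method 1
    // val result = tableEnvironment.executeSql("select * from test")
    // result.print()

    // Method 2
    tableEnvironment.executeSql("CREATE TABLE sink_table (ID INT, NAME STRING, AGE INT) WITH ( 'connector' = 'print')")
    tableEnvironment.executeSql("INSERT INTO sink_table SELECT ID, NAME, AGE FROM test")

Note: Oracle converts column names to upper case by default. If the column names were not quoted in the Oracle create table statement, they are stored in upper case, and the columns in the Flink create table statement must then be upper case too. Otherwise the affected columns come back as null and no data is read. To view a table's DDL in Oracle, run:

    SELECT DBMS_METADATA.GET_DDL('TABLE','TABLE_NAME') FROM DUAL;

Oracle CDC, API approach

Besides the SQL approach, the DataStream API can also be used.

    import com.ververica.cdc.connectors.oracle.OracleSource
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._

    val sourceFunction: SourceFunction[String] = OracleSource
      .builder[String]
      .hostname("")
      .port(1521)
      .database("ora11g")
      .schemaList("INVENTORY")
      .tableList("ERS")
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // use parallelism 1 for sink to keep message ordering
    env.addSource(sourceFunction).print().setParallelism(1)
    env.execute()

Note: the tableList parameter has a pitfall. It must be configured in the -name format, otherwise the table will not be found. This differs from how table-name is configured in SQL.

References

Oracle CDC Connector - Flink CDC 2.0.0 documentation
Debezium Connector for Oracle :: Debezium Documentation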
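The tableList pitfall in the DataStream API note can be guarded against with a small helper. The sketch below is hypothetical and not part of Flink CDC: the names TableListEntries, qualify and qualifyAll are invented for illustration. It assumes that the format truncated in the note is the schema-qualified form SCHEMA.TABLE, and that the tables were created without quoted identifiers, so Oracle stores their names in upper case.

```scala
import java.util.Locale

// Hypothetical helper (not a Flink CDC API): builds schema-qualified
// entries intended for OracleSource.builder().tableList(...).
object TableListEntries {

  // Joins schema and table as SCHEMA.TABLE. Both parts are upper-cased on the
  // assumption that the table was created with unquoted identifiers.
  def qualify(schema: String, table: String): String = {
    require(schema.trim.nonEmpty, "schema must not be empty")
    require(table.trim.nonEmpty, "table must not be empty")
    val s = schema.trim.toUpperCase(Locale.ROOT)
    val t = table.trim.toUpperCase(Locale.ROOT)
    s"$s.$t"
  }

  // Qualifies several tables that belong to the same schema.
  def qualifyAll(schema: String, tables: Seq[String]): Seq[String] =
    tables.map(qualify(schema, _))
}
```

With this in place, the builder call could be written as .tableList(TableListEntries.qualifyAll("inventory", Seq("customers")): _*). For tables created with quoted, case-sensitive names the upper-casing would be wrong and should be dropped.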
