Preface
We recently implemented a big-data platform for a client. This article details the process of fully synchronizing an Oracle database to Kafka.
📣 1. Deployment Environment
✨ 1.1 Architecture Planning
![Architecture diagram](https://img-blog.csdnimg.cn/37dd7893fde44a73a8753f432363c8e5.png)
✨ 1.2 Service Planning
📣 2. Oracle Configuration
✨ 2.1 Parameter Adjustments
-- Oracle database configuration
1. Enable archive log mode (if not already enabled)
2. Enable database-level supplemental logging (if minimal supplemental logging is not yet enabled)
3. Enable force logging (if not already enabled)
4. Set the ENABLE_GOLDENGATE_REPLICATION parameter to TRUE
5. Create the OGG users: the source-side user, the target-side user, and the OGG extract user
alter database add supplemental log data;
alter database add supplemental log data (all) columns;
alter database force logging;
alter system set enable_goldengate_replication=TRUE;
## Enable archive log mode
mkdir -p /u01/app/oracle/arch
SYS@JEM11GR2> shutdown immediate;
Database closed.
Database dismounted.
ORACLE instance shut down.
SYS@JEM11GR2> startup mount;
ORACLE instance started.
Total System Global Area 409194496 bytes
Fixed Size 2253744 bytes
Variable Size 310381648 bytes
Database Buffers 92274688 bytes
Redo Buffers 4284416 bytes
Database mounted.
SYS@JEM11GR2> alter database archivelog;
Database altered.
SYS@JEM11GR2> alter system set log_archive_dest_1='location=/u01/app/oracle/arch';
System altered.
SYS@JEM11GR2> archive log list
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /u01/app/oracle/arch
Oldest online log sequence 3
Next log sequence to archive 5
Current log sequence 5
SYS@JEM11GR2> alter database open;
Database altered.
SYS@JEM11GR2> select name,supplemental_log_data_min , force_logging, log_mode from v$database;
NAME SUPPLEMENTAL_LOG FORCE_ LOG_MODE
------------------ ---------------- ------ ------------------------
JEM11GR2 YES YES ARCHIVELOG
## Disable the recycle bin (SCOPE = SPFILE means the change takes effect only after the next instance restart)
SQL> SHOW PARAMETER recyclebin;
SQL> ALTER SYSTEM SET recyclebin = OFF SCOPE = SPFILE;
SQL> show recyclebin;
ORIGINAL NAME RECYCLEBIN NAME OBJECT TYPE DROP TIME
---------------- ------------------------------ ------------ -------------------
IMAGE_LOB BIN$CWQtAJlQCH3gYx4MEqylGQ==$0 TABLE 2023-11-05:08:03:41
SQL> PURGE recyclebin;
✨ 2.2 Create Users
-- OGG administration user
CREATE USER ogg identified by oracle;
GRANT DBA to ogg;
grant SELECT ANY DICTIONARY to ogg;
GRANT EXECUTE ON SYS.DBMS_LOCK TO ogg;
grant select any transaction to ogg;
grant select any table to ogg;
grant flashback any table to ogg;
grant alter any table to ogg;
-- Business user
CREATE USER rptuser identified by oracle;
alter user sys identified by oracle;
GRANT DBA to rptuser ;
grant SELECT ANY DICTIONARY to rptuser;
GRANT EXECUTE ON SYS.DBMS_LOCK TO rptuser;
-- Start the listener
[oracle@jemora11204 ~]$ lsnrctl start
[oracle@jemora11204 ~]$ lsnrctl status
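For a scripted check, the `lsnrctl status` output can be parsed instead of read by eye. A minimal sketch: `listener_ready` is a hypothetical helper, matching the `status READY` string that appears in the per-instance service summary lines of `lsnrctl status`.

```shell
# Hypothetical helper: read `lsnrctl status` output on stdin and report
# whether any service handler is in the READY state.
listener_ready() {
  if grep -q 'status READY'; then
    echo yes
  else
    echo no
  fi
}

# Sketch of intended use (requires a running listener):
# lsnrctl status | listener_ready
```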
📣 3. Kafka Environment
1. Start the processes
/usr/local/kafka/kafka_2.13-3.5.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/kafka_2.13-3.5.1/config/zookeeper.properties
/usr/local/kafka/kafka_2.13-3.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.13-3.5.1/config/server.properties
[root@kafka /]# netstat -nta|grep 2181
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN
tcp 0 0 127.0.0.1:2181 127.0.0.1:57458 TIME_WAIT
[root@kafka /]# netstat -tna|grep -e 9092 -e 2181
tcp 0 0 0.0.0.0:9092 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN
tcp 0 0 172.18.12.31:9092 172.18.12.31:56690 ESTABLISHED
tcp 0 0 127.0.0.1:37252 127.0.0.1:2181 ESTABLISHED
tcp 0 0 172.18.12.31:56690 172.18.12.31:9092 ESTABLISHED
tcp 0 0 127.0.0.1:2181 127.0.0.1:37252 ESTABLISHED
[root@kafka /]# jps
1179 Jps
443 QuorumPeerMain
[root@kafka /]# netstat -tulnp | grep java
tcp 0 0 0.0.0.0:9092 0.0.0.0:* LISTEN 2231/java
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 443/java
tcp 0 0 0.0.0.0:42854 0.0.0.0:* LISTEN 2231/java
tcp 0 0 0.0.0.0:43337 0.0.0.0:* LISTEN 443/java
Kafka listens on port 9092 by default, and ZooKeeper on port 2181.
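Rather than grepping netstat by hand, a small helper can poll those ports until the daemons are up. This is a sketch only: `wait_for_port` is a hypothetical function and relies on bash's `/dev/tcp` pseudo-device, which is not available in all shells.

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll a TCP port until it accepts connections,
# printing "up" on success or "down" after the given number of attempts.
wait_for_port() {
  local host=$1 port=$2 tries=${3:-10}
  local i
  for ((i = 0; i < tries; i++)); do
    # The subshell opens (and, on exit, closes) fd 3 against /dev/tcp
    if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
      echo up
      return 0
    fi
    sleep 1
  done
  echo down
  return 1
}

# Sketch of intended use after starting the daemons:
# wait_for_port localhost 2181 30   # ZooKeeper
# wait_for_port localhost 9092 30   # Kafka
```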
2. Kafka logs
/usr/local/kafka/kafka_2.13-3.5.1/kafka_log
3. Test Kafka
# Create a topic
cd /usr/local/kafka/kafka_2.13-3.5.1
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic --partitions 1
The --partitions option specifies the partition count. In a real production environment, a replication factor of 2 or 3 is recommended to improve availability.
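Following that advice, topic creation on a multi-broker cluster might look like the command fragment below. This is a sketch only: it assumes a cluster of at least three brokers, since the replication factor cannot exceed the broker count.

```shell
# Sketch: create a topic with 3 partitions and replication factor 3
# (assumes a cluster with at least 3 brokers)
./bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic topic --partitions 3 --replication-factor 3

# Inspect the resulting partition/replica layout
./bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic topic
```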
# List topics
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
## Send messages
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic
>hello
>world
## Consume messages
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic --from-beginning
hello
world
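The interactive round trip above can also be scripted as a smoke test. A sketch, assuming the broker on localhost:9092; it uses the console consumer's --max-messages and --timeout-ms options so the consumer exits instead of blocking.

```shell
# Sketch: non-interactive produce/consume smoke test (requires Kafka running)
cd /usr/local/kafka/kafka_2.13-3.5.1

# Produce two records without an interactive prompt
printf 'hello\nworld\n' | ./bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 --topic topic

# Read back at most two records, giving up after 10 seconds
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic topic --from-beginning --max-messages 2 --timeout-ms 10000
```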
📣 4. OGG for Oracle
✨ 4.1 Create the Credential Store
✨ 4.2 Add Table-level Supplemental Logging
✨ 4.3 Process Configuration
📣 5. OGG for Big Data
✨ 5.1 Configure the Manager Process
✨ 5.2 Kafka-side Configuration
📣 6. Full Synchronization
📣 7. Kafka Verification