# 安装Kafka
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz
cd kafka_2.13-3.4.0风哥提示:
# 启动Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz
cd kafka_2.13-3.4.0风哥提示:
# 启动Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
# 创建主题
bin/kafka-topics.sh –create –topic tidb-cdc –bootstrap-server localhost:9092 –partitions 3 –replication-factor 2
bin/kafka-topics.sh –create –topic tidb-cdc –bootstrap-server localhost:9092 –partitions 3 –replication-factor 2
3.2 TiCDC配置
3.2.1 配置文件
# ticdc.toml # TiCDC服务器配置 [server] addr = "0.0.0.0:8300" advertise-addr = "192.168.1.10:8300" log-level = "info" log-file = "/tidb/app/ticdc/logs/ticdc.log" # 安全配置 [security] ca-path = "/tidb/app/cert/ca.pem" cert-path = "/tidb/app/cert/ticdc.pem" key-path = "/tidb/app/cert/ticdc-key.pem" # Kafka配置 [kafka] broker-addrs = ["192.168.1.11:9092", "192.168.1.12:9092", "192.168.1.13:9092"] topic = "tidb-cdc" partition-num = 3 replication-factor = 2 max-message-bytes = 1048576
3.2.2 启动TiCDC
# 启动TiCDC服务
tiup cluster start fgedudb -R ticdc
tiup cluster start fgedudb -R ticdc
3.3 创建同步任务
3.3.1 配置同步任务
# cdc-task.yaml
# 任务名称
name: "tidb-to-kafka"
# 同步模式:full(全量+增量)、incremental(仅增量)
mode: "full"
# 源TiDB配置
source:
host: "192.168.1.20"
port: 4000
user: "root"
password: "password"
# 过滤规则
filter:
rules:
- database: "test"
table: "%"
# 目标Kafka配置
target:
type: "kafka"
kafka-config:
broker-addrs: ["192.168.1.11:9092", "192.168.1.12:9092", "192.168.1.13:9092"]
topic: "tidb-cdc"
partition-num: 3
replication-factor: 2
# 消息格式:canal-json、avro、maxwell
message-format: "canal-json"
# 同步配置
sync:
worker-count: 4
batch-size: 1000
max-message-size: 1048576
3.3.2 启动同步任务
# 启动同步任务
tiup cdc cli changefeed create –server=http://192.168.1.10:8300 –sink-uri=”kafka://192.168.1.11:9092/tidb-cdc?partition-num=3&replication-factor=2&max-message-bytes=1048576&message-format=canal-json” –config=cdc-task.yaml
tiup cdc cli changefeed create –server=http://192.168.1.10:8300 –sink-uri=”kafka://192.168.1.11:9092/tidb-cdc?partition-num=3&replication-factor=2&max-message-bytes=1048576&message-format=canal-json” –config=cdc-task.yaml
3.4 监控同步状态
# 查看同步任务状态
tiup cdc cli changefeed list –server=http://192.168.1.10:8300
学习交流加群风哥QQ113257174
# 查看同步任务详情
tiup cdc cli changefeed query –server=http://192.168.1.10:8300 –changefeed-id=tidb-to-kafka
tiup cdc cli changefeed list –server=http://192.168.1.10:8300
学习交流加群风哥QQ113257174
# 查看同步任务详情
tiup cdc cli changefeed query –server=http://192.168.1.10:8300 –changefeed-id=tidb-to-kafka
# 查看TiCDC日志
tail -f /tidb/app/ticdc/logs/ticdc.log
3.5 消费变更数据
3.5.1 使用Kafka消费者
// Java Kafka消费者示例
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class CdcConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "tidb-cdc-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("tidb-cdc"));
try {
while (true) {
ConsumerRecords records = consumer.poll(100);
records.forEach(record -> {
System.out.println("Topic: " + record.topic() + ", Partition: " + record.partition() + ", Offset: " + record.offset() + ", Key: " + record.key() + ", Value: " + record.value());
// 处理变更数据
});
}
} finally {
consumer.close();
}
}
}
更多视频教程www.fgedu.net.cn
本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html
