1. 首页 > 国产数据库教程 > TiDB教程 > 正文

tidb-093-TiDB CDC数据同步配置

  • 安装Kafka
    # 安装Kafka
    wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
    tar -xzf kafka_2.13-3.4.0.tgz
    cd kafka_2.13-3.4.0风哥提示:
    # 启动Kafka
    bin/zookeeper-server-start.sh config/zookeeper.properties &
    bin/kafka-server-start.sh config/server.properties &
  • 创建Kafka主题
    # 创建主题
    bin/kafka-topics.sh –create –topic tidb-cdc –bootstrap-server localhost:9092 –partitions 3 –replication-factor 2
  • 3.2 TiCDC配置

    3.2.1 配置文件

    # ticdc.toml
    
    # TiCDC服务器配置
    [server]
    addr = "0.0.0.0:8300"
    advertise-addr = "192.168.1.10:8300"
    log-level = "info"
    log-file = "/tidb/app/ticdc/logs/ticdc.log"
    
    # 安全配置
    [security]
    ca-path = "/tidb/app/cert/ca.pem"
    cert-path = "/tidb/app/cert/ticdc.pem"
    key-path = "/tidb/app/cert/ticdc-key.pem"
    
    # Kafka配置
    [kafka]
    broker-addrs = ["192.168.1.11:9092", "192.168.1.12:9092", "192.168.1.13:9092"]
    topic = "tidb-cdc"
    partition-num = 3
    replication-factor = 2
    max-message-bytes = 1048576
    

    3.2.2 启动TiCDC

    # 启动TiCDC服务
    tiup cluster start fgedudb -R ticdc

    3.3 创建同步任务

    3.3.1 配置同步任务

    # cdc-task.yaml
    
    # 任务名称
    name: "tidb-to-kafka"
    
    # 同步模式:full(全量+增量)、incremental(仅增量)
    mode: "full"
    
    # 源TiDB配置
    source:
      host: "192.168.1.20"
      port: 4000
      user: "root"
      password: "password"
      # 过滤规则
      filter:
        rules:
          - database: "test"
            table: "%"
    
    # 目标Kafka配置
    target:
      type: "kafka"
      kafka-config:
        broker-addrs: ["192.168.1.11:9092", "192.168.1.12:9092", "192.168.1.13:9092"]
        topic: "tidb-cdc"
        partition-num: 3
        replication-factor: 2
        # 消息格式:canal-json、avro、maxwell
        message-format: "canal-json"
    
    # 同步配置
    sync:
      worker-count: 4
      batch-size: 1000
      max-message-size: 1048576
    

    3.3.2 启动同步任务

    # 启动同步任务
    tiup cdc cli changefeed create –server=http://192.168.1.10:8300 –sink-uri=”kafka://192.168.1.11:9092/tidb-cdc?partition-num=3&replication-factor=2&max-message-bytes=1048576&message-format=canal-json” –config=cdc-task.yaml

    3.4 监控同步状态

    # 查看同步任务状态
    tiup cdc cli changefeed list –server=http://192.168.1.10:8300
    学习交流加群风哥QQ113257174
    # 查看同步任务详情
    tiup cdc cli changefeed query –server=http://192.168.1.10:8300 –changefeed-id=tidb-to-kafka

    # 查看TiCDC日志
    tail -f /tidb/app/ticdc/logs/ticdc.log

    3.5 消费变更数据

    3.5.1 使用Kafka消费者

    // Java Kafka消费者示例
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    public class CdcConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "tidb-cdc-consumer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
            KafkaConsumer consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("tidb-cdc"));
    
            try {
                while (true) {
                    ConsumerRecords records = consumer.poll(100);
                    records.forEach(record -> {
                        System.out.println("Topic: " + record.topic() + ", Partition: " + record.partition() + ", Offset: " + record.offset() + ", Key: " + record.key() + ", Value: " + record.value());
                        // 处理变更数据
                    });
                }
            } finally {
                consumer.close();
            }
        }
    }
    

    更多视频教程www.fgedu.net.cn

    本文由风哥教程整理发布,仅用于学习测试使用,转载注明出处:http://www.fgedu.net.cn/10327.html

    联系我们

    在线咨询:点击这里给我发消息

    微信号:itpux-com

    工作日:9:30-18:30,节假日休息