
tidb-095 - TiDB and Kafka Data Integration

  • Configure TiCDC

    # cdc-task.yaml
    name: "tidb-to-kafka"
    mode: "full"
    source:
      host: "192.168.1.20"
      port: 4000
      user: "root"
      password: "password"
    target:
      type: "kafka"
      kafka-config:
        broker-addrs: ["192.168.1.11:9092", "192.168.1.12:9092", "192.168.1.13:9092"]
        topic: "tidb-data"
        partition-num: 3
        replication-factor: 2
        message-format: "canal-json"
    

  • Start the replication task (a consumer-side sketch of the resulting canal-json messages follows this step)
    tiup cdc cli changefeed create --server=http://192.168.1.10:8300 --sink-uri="kafka://192.168.1.11:9092/tidb-data?partition-num=3&replication-factor=2&protocol=canal-json" --config=cdc-task.yaml
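    With the changefeed running, downstream applications read canal-json envelopes from the tidb-data topic. The consumer below is a minimal sketch and is not part of the original setup: the broker address and topic name come from the configuration above, the consumer group id is hypothetical, and a Jackson (com.fasterxml.jackson.databind) dependency is assumed for JSON parsing; the fields read here (type, database, table, data) follow TiCDC's canal-json message layout.

      // CanalJsonPeek.java - illustrative sketch only
      import com.fasterxml.jackson.databind.JsonNode;
      import com.fasterxml.jackson.databind.ObjectMapper;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;

      public class CanalJsonPeek {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.11:9092");
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "canal-json-peek"); // hypothetical group id
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

              ObjectMapper mapper = new ObjectMapper();
              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                  consumer.subscribe(Collections.singletonList("tidb-data")); // topic written by the changefeed
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                      records.forEach(record -> {
                          try {
                              JsonNode msg = mapper.readTree(record.value());
                              // each canal-json message carries the event type and the changed rows
                              System.out.printf("%s %s.%s rows=%d%n",
                                      msg.path("type").asText(),     // INSERT / UPDATE / DELETE / DDL
                                      msg.path("database").asText(),
                                      msg.path("table").asText(),
                                      msg.path("data").size());
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                      });
                  }
              }
          }
      }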
  • 3.1.2 Using TiDB Binlog

    1. Install TiDB Binlog (Pump and Drainer)
      # Pump/Drainer are added by scaling out the cluster with a topology file
      # (here an example name, scale-out.yaml) that declares pump_servers and drainer_servers
      tiup cluster scale-out fgedudb scale-out.yaml
    2. Configure Drainer
      # drainer.toml (key names follow the TiDB Binlog Drainer configuration reference)
      addr = "0.0.0.0:8249"
      data-dir = "/tidb/app/drainer/data"

      [syncer]
      db-type = "kafka"

      [syncer.to]
      kafka-addrs = "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
      topic-name = "tidb-binlog"
      # partition count and replication factor are properties of the Kafka topic itself,
      # set when the topic is created rather than in drainer.toml
      

    3. Start the services (a byte-level consumer sketch for the tidb-binlog topic follows this list)
      tiup cluster start fgedudb -R pump,drainer
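    Messages written by Drainer to the tidb-binlog topic are protobuf-encoded (the TiDB Binlog slave-protocol format), so they cannot be read as plain strings. The sketch below is only an assumption-labelled illustration: it consumes the topic as raw bytes and reports offsets and payload sizes; actually decoding the rows would require the protobuf classes generated from the TiDB Binlog proto definitions, which are not shown here.

      // BinlogTopicPeek.java - illustrative sketch only
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.ByteArrayDeserializer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;

      public class BinlogTopicPeek {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.11:9092");
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "binlog-peek"); // hypothetical group id
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
              props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

              try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
                  consumer.subscribe(Collections.singletonList("tidb-binlog")); // topic from drainer.toml
                  while (true) {
                      ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
                      records.forEach(r -> System.out.printf(
                              "partition=%d offset=%d payload=%d bytes%n",
                              r.partition(), r.offset(), r.value().length));
                  }
              }
          }
      }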

    3.2 Kafka → TiDB Integration

    3.2.1 Using Kafka Connect

    1. Install Kafka Connect
      # Download Kafka (Kafka Connect is bundled with the Kafka distribution)
      wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
      tar -xzf kafka_2.13-3.4.0.tgz
    2. Configure Kafka Connect
      # connect-standalone.properties
      bootstrap.servers=192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=true
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      

    3. Configure the TiDB sink connector (the Confluent JDBC sink connector plugin and a MySQL-compatible JDBC driver must be available to Kafka Connect; a matching producer sketch follows step 4)
      # tidb-sink.properties
      name=tidb-sink
      connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
      tasks.max=10
      connection.url=jdbc:mysql://192.168.1.20:4000/test?useSSL=false
      connection.user=root
      connection.password=password
      topics=kafka-data
      auto.create=true
      auto.evolve=true
      insert.mode=upsert
      pk.fields=id
      pk.mode=record_key
      


    4. Start Kafka Connect
      bin/connect-standalone.sh config/connect-standalone.properties config/tidb-sink.properties
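    Because both converters are JsonConverter with schemas.enable=true, every record on the kafka-data topic must wrap its data in a schema/payload envelope, and pk.mode=record_key with pk.fields=id means the upsert key is taken from the record key. The producer below is an illustrative sketch rather than part of the original tutorial: the column names (id, name, value) simply mirror the consumer example in the next section, and the envelope layout follows Kafka Connect's JsonConverter format.

      // SchemaEnvelopeProducer.java - illustrative sketch only
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.StringSerializer;
      import java.util.Properties;

      public class SchemaEnvelopeProducer {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092");
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

              // key carries the primary key (pk.mode=record_key, pk.fields=id), also as a schema envelope
              String key = "{\"schema\":{\"type\":\"struct\",\"fields\":["
                      + "{\"type\":\"int64\",\"field\":\"id\",\"optional\":false}]},"
                      + "\"payload\":{\"id\":1}}";

              // value carries the row; JsonConverter with schemas.enable=true expects schema + payload
              String value = "{\"schema\":{\"type\":\"struct\",\"fields\":["
                      + "{\"type\":\"int64\",\"field\":\"id\",\"optional\":false},"
                      + "{\"type\":\"string\",\"field\":\"name\",\"optional\":true},"
                      + "{\"type\":\"string\",\"field\":\"value\",\"optional\":true}]},"
                      + "\"payload\":{\"id\":1,\"name\":\"demo\",\"value\":\"hello\"}}";

              try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                  producer.send(new ProducerRecord<>("kafka-data", key, value));
                  producer.flush();
              }
          }
      }

    With auto.create=true, the sink connector derives the target table from this declared schema the first time it sees the topic; by default the table name follows the topic name, which can be overridden with the connector's table.name.format setting.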

    3.2.2 Using a Custom Consumer

    // Java Kafka consumer example: read messages from Kafka and upsert them into TiDB
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class KafkaToTiDBConsumer {
        public static void main(String[] args) {
            // Kafka configuration
            Properties kafkaProps = new Properties();
            kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092");
            kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-tidb-consumer");
            kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            // TiDB connection (TiDB speaks the MySQL protocol)
            String jdbcUrl = "jdbc:mysql://192.168.1.20:4000/test?useSSL=false";
            String username = "root";
            String password = "password";

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
                 Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
                consumer.subscribe(Collections.singletonList("kafka-data"));

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    records.forEach(record -> {
                        try {
                            // Parse the message
                            String value = record.value();
                            // Extract the fields
                            // ...

                            // Upsert into TiDB ("table" is a reserved word, so it is quoted)
                            String sql = "INSERT INTO test.`table` (id, name, value) VALUES (?, ?, ?) "
                                    + "ON DUPLICATE KEY UPDATE name=?, value=?";
                            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                                // Bind the parameters
                                // ...
                                pstmt.executeUpdate();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
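    As written, the consumer relies on automatic offset commits and issues a single-row upsert per message. For higher throughput or stricter delivery guarantees, common adjustments (not shown in the original example) are to disable auto commit (enable.auto.commit=false) and call consumer.commitSync() only after a batch has been written to TiDB, to group rows with PreparedStatement.addBatch()/executeBatch(), and to lean on the ON DUPLICATE KEY UPDATE upsert so that reprocessed messages stay idempotent.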


