# cdc-task.yaml
name: "tidb-to-kafka"
mode: "full"
source:
  host: "192.168.1.20"
  port: 4000
  user: "root"
  password: "password"
target:
  type: "kafka"
  kafka-config:
    broker-addrs: ["192.168.1.11:9092", "192.168.1.12:9092", "192.168.1.13:9092"]
    topic: "tidb-data"
    partition-num: 3
    replication-factor: 2
    message-format: "canal-json"
tiup cdc cli changefeed create --server=http://192.168.1.10:8300 --sink-uri="kafka://192.168.1.11:9092/tidb-data?protocol=canal-json&partition-num=3&replication-factor=2" --config=cdc-task.yaml
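With the changefeed running, each row change arrives on the tidb-data topic as a canal-json envelope (type, database, table, data; the old field carries the pre-update image for UPDATE events). Below is a minimal sketch of decoding one such message, assuming jackson-databind is on the classpath; the class and method names are illustrative, not part of TiCDC.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CanalJsonPeek {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Decode one canal-json message from the tidb-data topic and print
    // the event type, the table it belongs to, and each changed row.
    public static void handle(String message) throws Exception {
        JsonNode event = MAPPER.readTree(message);
        String type = event.path("type").asText();   // INSERT / UPDATE / DELETE
        String db = event.path("database").asText();
        String table = event.path("table").asText();
        for (JsonNode row : event.path("data")) {    // one object per changed row
            System.out.printf("%s %s.%s -> %s%n", type, db, table, row);
        }
    }
}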
3.1.2 Using TiDB Binlog
- Install TiDB Binlog (Pump and Drainer are deployed by scaling out the cluster with a topology file that declares pump_servers and drainer_servers):
tiup cluster scale-out fgedudb scale-out.yaml
- Configure Drainer:
# drainer.toml
addr = "0.0.0.0:8249"
data-dir = "/tidb/app/drainer/data"

[syncer]
db-type = "kafka"

[syncer.to]
# Partition count and replication factor are properties of the Kafka topic
# itself, set when the topic is created (e.g. partitions=3, replication=2).
kafka-addrs = "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
topic-name = "tidb-binlog"
- Start the services:
tiup cluster start fgedudb -R pump,drainer
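To confirm that Drainer is actually publishing, a throwaway consumer can poll the tidb-binlog topic. Drainer's Kafka output is protobuf-encoded binlog, so this sketch (illustrative names throughout) only verifies delivery; real decoding needs the binlog .proto definitions from tidb-tools.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class BinlogTopicPeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.11:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "binlog-peek");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("tidb-binlog"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> r : records) {
                // Payloads are protobuf-encoded binlog events; this only
                // proves they are arriving.
                System.out.printf("partition=%d offset=%d bytes=%d%n",
                        r.partition(), r.offset(), r.value().length);
            }
        }
    }
}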
3.2 Kafka → TiDB Integration
3.2.1 Using Kafka Connect
- Install Kafka Connect:
# Download Kafka (Connect ships with the Kafka distribution)
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz
- Configure Kafka Connect:
# connect-standalone.properties
bootstrap.servers=192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
- Configure the TiDB connector (the JDBC sink connector is a Confluent plugin installed separately from Apache Kafka):
# tidb-sink.properties
name=tidb-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=10
connection.url=jdbc:mysql://192.168.1.20:4000/test?useSSL=false
connection.user=root
connection.password=password
topics=kafka-data
auto.create=true
auto.evolve=true
insert.mode=upsert
pk.fields=id
pk.mode=record_key
- Start Kafka Connect:
bin/connect-standalone.sh config/connect-standalone.properties config/tidb-sink.properties
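Because both converters have schemas.enable=true, the JDBC sink expects every key and value to be a JSON schema+payload envelope, and pk.mode=record_key means the id column is taken from the record key. Below is a minimal producer sketch for a smoke test; the class name and the id/name columns are assumptions about the target table, not part of the connector.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ConnectSinkSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.11:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // JsonConverter with schemas.enable=true requires a schema+payload envelope.
        String key = "{\"schema\":{\"type\":\"int32\",\"optional\":false},\"payload\":1}";
        String value = "{\"schema\":{\"type\":\"struct\",\"optional\":false,\"fields\":["
                + "{\"field\":\"id\",\"type\":\"int32\",\"optional\":false},"
                + "{\"field\":\"name\",\"type\":\"string\",\"optional\":true}]},"
                + "\"payload\":{\"id\":1,\"name\":\"hello\"}}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("kafka-data", key, value));
            producer.flush();
        }
    }
}

If the sink is healthy, a row with id=1 should appear in the auto-created table (named after the topic by default) shortly after the next offset flush.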
3.2.2 Using a Custom Consumer
// Java Kafka consumer example
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaToTiDBConsumer {
    public static void main(String[] args) {
        // Kafka configuration
        Properties kafkaProps = new Properties();
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092");
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-tidb-consumer");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
        consumer.subscribe(Collections.singletonList("kafka-data"));
        // TiDB connection
        String jdbcUrl = "jdbc:mysql://192.168.1.20:4000/test?useSSL=false";
        String username = "root";
        String password = "password";
        try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    // Parse the message
                    String value = record.value();
                    // Extract the fields
                    // ...
                    // Upsert into TiDB ("table" is a reserved word, hence the backticks)
                    String sql = "INSERT INTO test.`table` (id, name, value) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name=?, value=?";
                    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                        // Bind the parameters
                        // ...
                        pstmt.executeUpdate();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
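One caveat with the example above: offsets are auto-committed in the background regardless of whether the TiDB write succeeded, so a crash can drop or double-process records unpredictably. Below is a minimal sketch of at-least-once delivery, reusing the same topic and group; writeToTiDB is a hypothetical stand-in for the JDBC logic above.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-tidb-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit so offsets advance only after TiDB writes succeed.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("kafka-data"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    writeToTiDB(record);
                }
                // Commit only after the whole batch is persisted: at-least-once.
                consumer.commitSync();
            }
        }
    }

    // Hypothetical stand-in for the JDBC upsert logic shown above.
    private static void writeToTiDB(ConsumerRecord<String, String> record) {
        // ... parse record.value() and upsert into TiDB ...
    }
}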
