更新時間:2022年01月12日15時35分 來源:傳智教育 瀏覽次數(shù):
在Kafka的topic 「ods_user」中有一些用戶數(shù)據(jù),數(shù)據(jù)格式如下:
| 姓名,性別,出生日期
| 張三,1,1980-10-09
| 李四,0,1985-11-01
我們需要編寫程序,將用戶的性別轉換為男、女(1-男,0-女),轉換后將數(shù)據(jù)寫入到topic 「dwd_user」中。要求使用事務保障,要么消費了數(shù)據(jù)同時寫入數(shù)據(jù)到 topic,提交offset。要么全部失敗。
# 創(chuàng)建名為ods_user和dwd_user的主題 bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic ods_user bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic dwd_user # 生產(chǎn)數(shù)據(jù)到 ods_user bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic ods_user # 從dwd_user消費數(shù)據(jù) bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic dwd_user --from-beginning --isolation-level read_committed
編寫一個方法 createConsumer,該方法中返回一個消費者,訂閱「ods_user」主題。注意:需要配置事務隔離級別、關閉自動提交。
實現(xiàn)步驟:
1. 創(chuàng)建Kafka消費者配置
Properties props = new Properties(); props.setProperty("bootstrap.servers", "node1.itcast.cn:9092"); props.setProperty("group.id", "ods_user"); props.put("isolation.level","read_committed"); props.setProperty("enable.auto.commit", "false"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
2. 創(chuàng)建消費者,并訂閱 ods_user 主題
//1.創(chuàng)建消費者 publicstaticConsumer<String,String>createConsumer(){ //1.創(chuàng)建Kafka消費者配置 Propertiesprops=newProperties(); props.setProperty("bootstrap.servers","node1.itcast.cn:9092"); props.setProperty("group.id","ods_user"); props.put("isolation.level","read_committed"); props.setProperty("enable.auto.commit","false"); props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //2.創(chuàng)建Kafka消費者 KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props); //3.訂閱要消費的主題 consumer.subscribe(Arrays.asList("ods_user")); returnconsumer; }
編寫一個方法 createProducer,返回一個生產(chǎn)者對象。注意:需要配置事務的id,開啟了事務會默認開啟冪等性。
1. 創(chuàng)建生產(chǎn)者配置
Propertiesprops=newProperties(); props.put("bootstrap.servers","node1.itcast.cn:9092"); props.put("transactional.id","dwd_user"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
2.創(chuàng)建生產(chǎn)者對象
publicstaticProducer < String, String > createProduceer() { //1.創(chuàng)建生產(chǎn)者配置 Propertiesprops = newProperties(); props.put("bootstrap.servers", "node1.itcast.cn:9092"); props.put("transactional.id", "dwd_user"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //2.創(chuàng)建生產(chǎn)者 Producer < String, String > producer = newKafkaProducer < > (props); returnproducer; }
實現(xiàn)步驟:
1. 調用之前實現(xiàn)的方法,創(chuàng)建消費者、生產(chǎn)者對象
2. 生產(chǎn)者調用initTransactions初始化事務
3. 編寫一個while死循環(huán),在while循環(huán)中不斷拉取數(shù)據(jù),進行處理后,再寫入到指定的topic
(1) 生產(chǎn)者開啟事務
(2) 消費者拉取消息
(3) 遍歷拉取到的消息,并進行預處理(將1轉換為男,0轉換為女)
(4) 生產(chǎn)消息到dwd_user topic中
(5) 提交偏移量到事務中
(6) 提交事務
(7) 捕獲異常,如果出現(xiàn)異常,則取消事務
publicstaticvoidmain(String[] args) { Consumer < String, String > consumer = createConsumer(); Producer < String, String > producer = createProducer(); //初始化事務 producer.initTransactions(); while (true) { try { //1.開啟事務 producer.beginTransaction(); //2.定義Map結構,用于保存分區(qū)對應的offset Map < TopicPartition, OffsetAndMetadata > offsetCommits = newHashMap < > (); //2.拉取消息 ConsumerRecords < String, String > records = consumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord < String, String > record: records) { //3.保存偏移量 offsetCommits.put(newTopicPartition(record.topic(), record.partition()), newOffsetAndMetadata(record.offset() + 1)); //4.進行轉換處理 String[] fields = record.value().split(","); fields[1] = fields[1].equalsIgnoreCase("1") ? "男" : "女"; Stringmessage = fields[0] + "," + fields[1] + "," + fields[2]; //5.生產(chǎn)消息到dwd_user producer.send(newProducerRecord < > ("dwd_user", message)); } //6.提交偏移量到事務 producer.sendOffsetsToTransaction(offsetCommits, "ods_user"); //7.提交事務 producer.commitTransaction(); } catch (Exceptione) { //8.放棄事務 producer.abortTransaction(); } } }
往之前啟動的console-producer中寫入消息進行測試,同時檢查console-consumer是否能夠接收到消息:
逐個測試一下消息:
//3.保存偏移量 offsetCommits.put(newTopicPartition(record.topic(),record.partition()), newOffsetAndMetadata(record.offset()+1)); //4.進行轉換處理 String[]fields=record.value().split(","); fields[1]=fields[1].equalsIgnoreCase("1")?"男":"女"; Stringmessage=fields[0]+","+fields[1]+","+fields[2]; //模擬異常 inti=1/0; //5.生產(chǎn)消息到dwd_user producer.send(newProducerRecord<>("dwd_user",message));
啟動程序一次,拋出異常。
再啟動程序一次,還是拋出異常。
直到我們處理該異常為止。
我們發(fā)現(xiàn),可以消費到消息,但如果中間出現(xiàn)異常的話,offset是不會被提交的,除非消費、生產(chǎn)消息都成功,才會提交事務。