更新時(shí)間:2023年10月20日10時(shí)32分 來源:傳智教育 瀏覽次數(shù):
在Apache Kafka中,消費(fèi)者(Consumers)和消費(fèi)者組(Consumer Groups)是核心概念,用于處理消息的訂閱和處理。接下來筆者將詳細(xì)解釋它們之間的關(guān)系,并提供一個(gè)簡(jiǎn)單的代碼示例來演示它們的用法。
消費(fèi)者是Kafka中的客戶端應(yīng)用程序,它負(fù)責(zé)訂閱主題并處理從主題中生產(chǎn)的消息。消費(fèi)者可以獨(dú)立訂閱一個(gè)或多個(gè)主題,并且可以以不同的速度處理消息。它們可以在不同的分區(qū)中并行地處理消息。
消費(fèi)者組是消費(fèi)者的邏輯集合,它們一起協(xié)作處理主題中的消息。每個(gè)消費(fèi)者組可以包含一個(gè)或多個(gè)消費(fèi)者。消費(fèi)者組的關(guān)鍵特性是它可以協(xié)調(diào)多個(gè)消費(fèi)者來消費(fèi)主題中的消息,確保每個(gè)分區(qū)的消息只被組內(nèi)的一個(gè)消費(fèi)者處理。這有助于實(shí)現(xiàn)負(fù)載均衡和提高容錯(cuò)性。
接下來我們看一個(gè)使用Java語(yǔ)言的Kafka消費(fèi)者和消費(fèi)者組示例:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Properties; public class MyKafkaConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-broker-list"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName); Consumer<String, String> consumer = new KafkaConsumer<>(props); // 訂閱一個(gè)主題 consumer.subscribe(Collections.singletonList("my-topic")); while (true) { // 拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 處理消息 System.out.printf("消費(fèi)者: key=%s, value=%s%n", record.key(), record.value()); } } } }
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class MyKafkaConsumerGroup { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-broker-list"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消費(fèi)者組名稱 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName); Consumer<String, String> consumer = new KafkaConsumer<>(props); // 訂閱一個(gè)主題 consumer.subscribe(Collections.singletonList("my-topic")); while (true) { // 拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 處理消息 System.out.printf("消費(fèi)者組成員: key=%s, value=%s%n", record.key(), record.value()); } } } }
在上述示例中,兩個(gè)消費(fèi)者(可以是同一消費(fèi)者組的成員)訂閱了同一個(gè)主題,但消費(fèi)者組確保每個(gè)分區(qū)的消息只被一個(gè)消費(fèi)者處理,實(shí)現(xiàn)了負(fù)載均衡和高可用性。這是Kafka中消費(fèi)者和消費(fèi)者組的基本關(guān)系和用法。
北京校區(qū)