更新时间:2021-10-19 来源:黑马程序员 浏览量:
我们可以让Kafka根据消费组中的消费者动态地为topic分配要消费的分区。但在某些时候,我们需要指定要消费的分区,例如:
如果某个程序将某个指定分区的数据保存到外部存储中,例如:Redis、MySQL,那么保存数据的时候,只需要消费该指定的分区数据即可
如果某个程序是高可用的,在程序出现故障时将自动重启(例如:后面我们将学习的Flink、Spark程序)。这种情况下,程序将从指定的分区重新开始消费数据。
如何进行手动消费分区中的数据呢?
1. 不再使用之前的 subscribe 方法订阅主题,而使用 「assign」方法指定想要消费的消息
String topic = "test"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
2. 一旦指定了分区,就可以就像前面的示例一样,在循环中调用「poll」方法消费消息
注意
1. 当手动管理消费分区时,即使GroupID是一样的,Kafka的组协调器都将不再起作用
2. 如果消费者失败,也将不再自动进行分区重新分配