更新時(shí)間:2021-10-19 來(lái)源:黑馬程序員 瀏覽量:
我們可以讓Kafka根據(jù)消費(fèi)組中的消費(fèi)者動(dòng)態(tài)地為topic分配要消費(fèi)的分區(qū)。但在某些時(shí)候,我們需要指定要消費(fèi)的分區(qū),例如:
如果某個(gè)程序?qū)⒛硞€(gè)指定分區(qū)的數(shù)據(jù)保存到外部存儲(chǔ)中,例如:Redis、MySQL,那么保存數(shù)據(jù)的時(shí)候,只需要消費(fèi)該指定的分區(qū)數(shù)據(jù)即可
如果某個(gè)程序是高可用的,在程序出現(xiàn)故障時(shí)將自動(dòng)重啟(例如:后面我們將學(xué)習(xí)的Flink、Spark程序)。這種情況下,程序?qū)闹付ǖ姆謪^(qū)重新開(kāi)始消費(fèi)數(shù)據(jù)。
如何進(jìn)行手動(dòng)消費(fèi)分區(qū)中的數(shù)據(jù)呢?
1. 不再使用之前的 subscribe 方法訂閱主題,而使用 「assign」方法指定想要消費(fèi)的消息
String topic = "test"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
2. 一旦指定了分區(qū),就可以就像前面的示例一樣,在循環(huán)中調(diào)用「poll」方法消費(fèi)消息
注意
1. 當(dāng)手動(dòng)管理消費(fèi)分區(qū)時(shí),即使GroupID是一樣的,Kafka的組協(xié)調(diào)器都將不再起作用
2. 如果消費(fèi)者失敗,也將不再自動(dòng)進(jìn)行分區(qū)重新分配
IOC底層實(shí)現(xiàn)原理介紹,手動(dòng)實(shí)現(xiàn)IOC容器
怎樣能確保Kafka儲(chǔ)存的數(shù)據(jù)不丟失?
Kafka是什么?kafka有什么優(yōu)點(diǎn)?