How to choose the No. of Partitions for a Kafka Topic? Horizontal Scaling for Kafka Consumer


This video explains how to choose the number of partitions for a Kafka topic.
It also explains how to scale a Kafka consumer horizontally by adding extra partitions and new consumers to the consumer group.
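One commonly cited rule of thumb (an outside heuristic, not taken from this description) is to measure the throughput a single partition can sustain on the producer side (p) and on the consumer side (c). For a target throughput t, you then use at least max(t/p, t/c) partitions. Because a consumer group can have at most one active consumer per partition, this number also caps how far the group can scale out. The sketch below shows that arithmetic. The function name `estimate_partitions` and the MB/s figures are hypothetical, not measurements:

```python
import math


def estimate_partitions(target_mb_s: float,
                        producer_mb_s_per_partition: float,
                        consumer_mb_s_per_partition: float) -> int:
    """Hypothetical helper: rule-of-thumb partition count max(t/p, t/c), rounded up."""
    if min(target_mb_s, producer_mb_s_per_partition, consumer_mb_s_per_partition) <= 0:
        raise ValueError("throughput values must be positive")
    # Partitions needed so the producers can keep up with the target rate
    by_producer = target_mb_s / producer_mb_s_per_partition
    # Partitions needed so one consumer per partition can keep up with the target rate
    by_consumer = target_mb_s / consumer_mb_s_per_partition
    return math.ceil(max(by_producer, by_consumer))


# Made-up numbers: 100 MB/s target, 20 MB/s per partition when producing,
# 10 MB/s per partition when consuming -> the consumer side dominates.
print(estimate_partitions(100, 20, 10))  # 10
```

In practice people often round up further to leave headroom, because partitions can be added later but never removed.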
Prerequisites:
-------------------
Consumer & Consumer Group in Kafka
kzread.info/dash/bejne/g4idwdGKmNungtY.html
Kafka Consumer Groups CLI Demo | Kafka-Python
kzread.info/dash/bejne/q3-uxtemj9m-Z84.html
Manual Offset Commits & Exactly-Once Processing in Kafka Consumer using Python
kzread.info/dash/bejne/ioaG27Fvkpy9oco.html
Start Kafka:
-------------------
F:/kafka_2.12-3.3.1/bin/windows/zookeeper-server-start.bat F:/kafka_2.12-3.3.1/config/zookeeper.properties
F:/kafka_2.12-3.3.1/bin/windows/kafka-server-start.bat F:/kafka_2.12-3.3.1/config/server.properties
F:/kafka_2.12-3.3.1/bin/windows/kafka-topics.bat --create --topic sensor_data_consumer --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
Producer Code:
------------------------
from time import sleep
from json import dumps
from kafka import KafkaProducer

def custom_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key (bytes, after key serialization)
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    print("The key is : {}".format(key))
    print("All partitions : {}".format(all_partitions))
    print("After decoding of the key : {}".format(key.decode('UTF-8')))
    # Numeric key modulo the partition count gives an index into all_partitions
    return all_partitions[int(key.decode('UTF-8')) % len(all_partitions)]

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         partitioner=custom_partitioner)
topic_name = 'sensor_data_consumer'
for e in range(0, 100):
    data = {"number": e}
    # The key is the number as bytes, so the custom partitioner can route it
    producer.send(topic_name, key=str(e).encode(), value=data)
    sleep(10)

# send() is asynchronous; flush() blocks until buffered records are delivered
producer.flush()
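Because the partitioner is a plain function, you can dry-run it without a broker to see where each key will land before and after the topic is altered. This sketch reuses the same modulo rule as the producer code. `preview_layout` is a hypothetical helper, and the partition lists stand in for the lists kafka-python passes to the partitioner at send time:

```python
def custom_partitioner(key: bytes, all_partitions: list, available: list) -> int:
    # Same rule as the producer: numeric key modulo the number of partitions
    return all_partitions[int(key.decode("utf-8")) % len(all_partitions)]


def preview_layout(keys, partitions):
    """Hypothetical helper: group keys by the partition the partitioner would pick."""
    layout = {p: [] for p in partitions}
    for k in keys:
        # Keys are encoded the same way as producer.send(key=str(e).encode())
        layout[custom_partitioner(str(k).encode(), partitions, partitions)].append(k)
    return layout


print(preview_layout(range(6), [0]))     # {0: [0, 1, 2, 3, 4, 5]}
print(preview_layout(range(6), [0, 1]))  # {0: [0, 2, 4], 1: [1, 3, 5]}
```

With one partition, every record goes to partition 0. For that reason, a second consumer in the same group stays idle until the topic has a second partition.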
Consumer Code:
------------------
from kafka import KafkaConsumer
from kafka import TopicPartition, OffsetAndMetadata
import kafka
import json

class MyConsumerRebalanceListener(kafka.ConsumerRebalanceListener):
    # Called during a group rebalance, e.g. when a consumer joins or leaves the group
    def on_partitions_revoked(self, revoked):
        print("Partitions %s revoked" % revoked)

    def on_partitions_assigned(self, assigned):
        print("Partitions %s assigned" % assigned)

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         group_id='demo112215sgtrjwrykvjh', auto_offset_reset='earliest',
                         enable_auto_commit=False)
listener = MyConsumerRebalanceListener()
consumer.subscribe(['sensor_data_consumer'], listener=listener)
for message in consumer:
    print(message)
    print("The value is : {}".format(message.value))
    # Commit the offset of the *next* record to read from this partition;
    # the second field is free-form commit metadata (here, the message timestamp)
    tp = TopicPartition(message.topic, message.partition)
    om = OffsetAndMetadata(message.offset + 1, message.timestamp)
    consumer.commit({tp: om})
    print('*' * 100)
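When a second consumer starts with the same group_id, the group rebalances, and the listener above prints the partitions each consumer was given. Kafka consumers commonly default to a range-style assignor. The sketch below is a simplified, single-topic model of range assignment. `range_assign` is a hypothetical helper, not the library's implementation. It shows why an extra consumer only helps when there is a spare partition for it:

```python
def range_assign(partitions, consumers):
    """Hypothetical, simplified single-topic range assignment (illustration only)."""
    partitions = sorted(partitions)
    consumers = sorted(consumers)
    # Each consumer gets a contiguous block; the first `extra` consumers get one more
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment


print(range_assign([0], ["c1", "c2"]))       # {'c1': [0], 'c2': []}
print(range_assign([0, 1], ["c1", "c2"]))    # {'c1': [0], 'c2': [1]}
print(range_assign(range(5), ["c1", "c2"]))  # {'c1': [0, 1, 2], 'c2': [3, 4]}
```

Every partition is owned by exactly one consumer in the group. Records within a partition are therefore still read in order even when one consumer owns several partitions. Kafka gives no ordering guarantee across partitions.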
Code to change no. of Partitions in a Topic:
----------------------------------------------------------------
F:/kafka_2.12-3.3.1/bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic sensor_data_consumer --partitions 2
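`--alter` can increase the partition count of a topic but cannot decrease it. Increasing it also changes the key-to-partition mapping. With the modulo partitioner used by the producer above, and likewise with a hash-modulo default partitioner, many keys start landing on a different partition. Records for the same key written before and after the change can then sit in different partitions, so they are no longer guaranteed to be read in order relative to each other. The sketch below lists which keys move. `moved_keys` and the 2 → 3 scenario are hypothetical:

```python
def moved_keys(keys, old_count, new_count):
    """Hypothetical helper: keys whose partition changes under a key % partition_count rule."""
    return [k for k in keys if k % old_count != k % new_count]


print(moved_keys(range(10), 1, 2))  # [1, 3, 5, 7, 9]
print(moved_keys(range(10), 2, 3))  # [2, 3, 4, 5, 8, 9]
```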
Check this playlist for more Data Engineering related videos:
kzread.info/head/PLjfRmoYoxpNopPjdACgS5XTfdjyBcuGku
Apache Kafka from scratch
kzread.info/head/PLjfRmoYoxpNrs0VmIq6mOTqXP52RfZdRf
Snowflake Complete Course from scratch with End-to-End Project with in-depth explanation--
doc.clickup.com/37466271/d/h/13qc4z-104/d4346819bd8d510

Comments: 9

  • @randomstuffsofdayandnight
    a month ago

    Is there a default size (in KB/MB/GB) that a consumer can process per second in Kafka?

  • @navaneethbusha5134
    a year ago

    Great knowledge sharing! If possible, please share a video about how to work with one producer and multiple consumers.

  • @KnowledgeAmplifier1
    a year ago

    Hello Navaneeth Busha, thank you for your kind words! Regarding your request, you can refer to the videos below:
    Multiple Producer & Multiple Consumer in a Kafka Topic: kzread.info/dash/bejne/Y3yimZatlJTcpZs.html
    Kafka Consumer Groups CLI Demo | Kafka-Python: kzread.info/dash/bejne/q3-uxtemj9m-Z84.html
    Manual Offset Commits & Exactly-Once Processing in Kafka Consumer using Python: kzread.info/dash/bejne/ioaG27Fvkpy9oco.html
    These videos have some prerequisites, which you can find in this playlist where all Kafka concepts are explained in a systematic order: kzread.info/head/PLjfRmoYoxpNrs0VmIq6mOTqXP52RfZdRf
    Hope this will be helpful! Happy Learning

  • @akbarmunwar5435
    a year ago

    Great, thanks!

  • @KnowledgeAmplifier1
    a year ago

    You are welcome, Akbar Munwar! Happy Learning

  • @anthonya880
    a year ago

    If possible, please make Kafka videos for Java developers too. We could also benefit from your knowledge.

  • @KnowledgeAmplifier1
    a year ago

    Hello Anthony A, I actually come from a Python background. You can get the theoretical understanding from my videos, and for a practical implementation in Java, you can refer to Stephane Maarek's or Prashant Kumar Pandey's lectures.

  • @ramagaur8413
    a year ago

    Nice explanation! I have one question: suppose we have 200 partitions that a producer is writing records to, but on the consumer side we can't scale to 200 consumers in a consumer group. In that case, will the order of messages be maintained?

  • @MrMadmaggot
    4 months ago

    One thing is important here: shouldn't this be automatic?