Apache Kafka Consumer Lag Analysis: In-Depth Intuition


Kafka consumer lag indicates how far a consumer group is behind the producers writing to a topic. It is one of the basic monitoring metrics for any Kafka application.
Let's explore this topic with in-depth intuition.
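Concretely, lag is measured per partition. It is the partition's log-end offset (the offset the next produced message will receive) minus the consumer group's committed offset (the offset of the next message the group will read). The group's total lag is the sum over its partitions.

Below is a minimal sketch of that arithmetic in plain Python. It assumes the offsets have already been fetched. The helper name `consumer_lag` and the sample offsets are hypothetical.

```python
from typing import Dict, Optional


def consumer_lag(log_end_offsets: Dict[int, int],
                 committed_offsets: Dict[int, Optional[int]]) -> Dict[int, int]:
    """Per-partition lag = log-end offset - committed offset.

    Assumption: a partition with no committed offset (missing or None) is
    treated as committed at 0, i.e. its whole log is still unread. This only
    holds if retention has not deleted the start of the log.
    """
    lag = {}
    for partition, end_offset in log_end_offsets.items():
        committed = committed_offsets.get(partition)
        if committed is None:
            committed = 0
        lag[partition] = end_offset - committed
    return lag


# Hypothetical offsets for the two partitions of hello_world1
end_offsets = {0: 500, 1: 500}
committed = {0: 420, 1: 180}
per_partition = consumer_lag(end_offsets, committed)
print(per_partition)                # {0: 80, 1: 320}
print(sum(per_partition.values()))  # 400
```

Looking at per-partition values, not just the total, shows whether one partition (and therefore one consumer) is the bottleneck.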
Prerequisite:
--------------------------
Manual Offset Commits & Exactly-Once Processing in Kafka Consumer using Python
• Manual Offset Commits ...
Launch Kafka:
-------------------
F:/kafka_2.12-3.3.1/bin/windows/zookeeper-server-start.bat F:/kafka_2.12-3.3.1/config/zookeeper.properties
F:/kafka_2.12-3.3.1/bin/windows/kafka-server-start.bat F:/kafka_2.12-3.3.1/config/server.properties
F:/kafka_2.12-3.3.1/bin/windows/kafka-topics.bat --create --topic hello_world1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2
Producer Code:
-------------------------
from time import sleep
from json import dumps
from kafka import KafkaProducer

topic_name = 'hello_world1'


def custom_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key (bytes, after key serialization)
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    print("The key is : {}".format(key))
    print("All partitions : {}".format(all_partitions))
    print("After decoding of the key : {}".format(key.decode('UTF-8')))
    # The key is the message number, so with 2 partitions even numbers go
    # to partition 0 and odd numbers to partition 1.
    return all_partitions[int(key.decode('UTF-8')) % len(all_partitions)]


producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         partitioner=custom_partitioner)

for e in range(1000):
    data = {'number': e}
    print(data)
    # Send the message number as the key so custom_partitioner can route it
    producer.send(topic_name, key=str(e).encode(), value=data)
    sleep(0.4)  # about 2.5 messages per second

# Deliver any buffered messages before the script exits
producer.flush()
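The key is the message number and `hello_world1` has two partitions. As a result, the partitioner above sends even numbers to partition 0 and odd numbers to partition 1, so each partition receives half of the roughly 2.5 messages per second the producer writes.

That routing can be checked offline, without a broker, by calling the same modulo rule directly. This sketch copies the rule without the debug prints and hard-codes the partition list `[0, 1]` as an assumption.

```python
from collections import Counter


def custom_partitioner(key, all_partitions, available):
    """Same modulo routing rule as the producer's partitioner, minus the prints."""
    return all_partitions[int(key.decode('UTF-8')) % len(all_partitions)]


# Assumption: hello_world1 has partitions [0, 1] (created with --partitions 2)
partitions = [0, 1]

# Route the first 10 keys the way the producer encodes them: str(e).encode()
routing = {e: custom_partitioner(str(e).encode(), partitions, partitions)
           for e in range(10)}
print(routing)  # {0: 0, 1: 1, 2: 0, 3: 1, 4: 0, 5: 1, 6: 0, 7: 1, 8: 0, 9: 1}

# Across all 1000 messages, each partition receives exactly half
counts = Counter(custom_partitioner(str(e).encode(), partitions, partitions)
                 for e in range(1000))
print(dict(counts))  # {0: 500, 1: 500}
```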
Consumer Code (the same code can be used to launch another consumer in the same consumer group):
-----------------------------------------------------------------------------------------------
from kafka import KafkaConsumer
from kafka import TopicPartition, OffsetAndMetadata
from time import sleep
import json

consumer = KafkaConsumer('hello_world1',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         group_id='demo112215sgtrjwrykvjh',
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)

for message in consumer:
    print(message)
    tp = TopicPartition(message.topic, message.partition)
    # Commit the offset of the *next* message to read, hence offset + 1
    om = OffsetAndMetadata(message.offset + 1, message.timestamp)
    consumer.commit({tp: om})
    sleep(0.8)  # simulate slow processing: about 1.25 messages per second
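The producer sleeps 0.4 s per message (about 2.5 messages/s), while this consumer sleeps 0.8 s per message (about 1.25 messages/s). A single consumer therefore falls further behind every second. A second consumer in the same group takes over one of the two partitions and roughly keeps pace with the producer.

Below is a back-of-the-envelope model of that behavior. It assumes processing time is dominated by the `sleep()` calls, there are no rebalance pauses, and the producer is still running. `estimated_lag` is a hypothetical helper, not part of Kafka.

```python
def estimated_lag(elapsed_ms: int, consumers: int, partitions: int = 2,
                  produce_every_ms: int = 400, consume_every_ms: int = 800) -> int:
    """Rough total lag of the group after elapsed_ms of running the demo.

    Assumptions: processing time is dominated by the sleep() calls, keys are
    spread evenly over the partitions, there are no rebalance pauses, and the
    producer is still running (it stops after 1000 messages, about 400 s).
    """
    produced = elapsed_ms // produce_every_ms
    # A partition is read by at most one consumer in a group, so consumers
    # beyond the partition count sit idle.
    active_consumers = min(consumers, partitions)
    consumed = active_consumers * (elapsed_ms // consume_every_ms)
    return max(produced - consumed, 0)


for n in (1, 2, 3):
    print(n, [estimated_lag(seconds * 1000, n) for seconds in (10, 60, 300)])
# 1 [13, 75, 375]
# 2 [1, 0, 0]
# 3 [1, 0, 0]
```

Under this model, there are only two ways to shrink the lag: process each message faster, or add consumers to the group. Adding consumers only helps up to the number of partitions, which is why the third consumer changes nothing.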
To get information about consumer lag:
------------------------------------------------------------------------------
F:/kafka_2.12-3.3.1/bin/windows/kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group demo112215sgtrjwrykvjh --describe
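In that output, the LAG column is LOG-END-OFFSET minus CURRENT-OFFSET for each partition of the group. To script a lag check instead of reading the table by eye, one option is to parse the text output.

The sketch below uses only the standard library. It assumes a single group in the output and relies on the column names GROUP, TOPIC, PARTITION and LAG printed by Kafka 3.x. The sample numbers are made up for illustration.

```python
def parse_lag(describe_output: str) -> dict:
    """Map (topic, partition) -> lag from `kafka-consumer-groups --describe` text.

    Assumptions: the output describes a single group, whitespace separates
    the columns, and a LAG of '-' (no committed offset yet) becomes None.
    """
    rows = [line.split() for line in describe_output.splitlines() if line.strip()]
    # Skip anything printed before the column header
    start = next(i for i, cols in enumerate(rows) if cols[:1] == ["GROUP"])
    header = rows[start]
    topic_col = header.index("TOPIC")
    partition_col = header.index("PARTITION")
    lag_col = header.index("LAG")
    lags = {}
    for cols in rows[start + 1:]:
        lag = cols[lag_col]
        lags[(cols[topic_col], int(cols[partition_col]))] = None if lag == "-" else int(lag)
    return lags


# Hypothetical output; real offsets depend on how long the demo has run
SAMPLE = """
GROUP                   TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
demo112215sgtrjwrykvjh  hello_world1  0          120             200             80   -            -     -
demo112215sgtrjwrykvjh  hello_world1  1          95              200             105  -            -     -
"""

lags = parse_lag(SAMPLE)
print(lags)  # {('hello_world1', 0): 80, ('hello_world1', 1): 105}
print(sum(lag for lag in lags.values() if lag is not None))  # 185
```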
Check this playlist for more Data Engineering related videos:
• Demystifying Data Engi...
Apache Kafka from scratch:
• Apache Kafka for Pytho...
Snowflake Complete Course from scratch with an End-to-End Project and in-depth explanation:
doc.clickup.com/37466271/d/h/...
🙏🙏🙏🙏🙏🙏🙏🙏
YOU JUST NEED TO DO
3 THINGS to support my channel
LIKE
SHARE
&
SUBSCRIBE
TO MY KZread CHANNEL

Comments: 16

  • @akashshelke5330 · 10 months ago

    You are nailing it, buddy. Kafka expert.

  • @KnowledgeAmplifier1 · 9 months ago

    Glad to hear that!

  • @desiCineMan · 1 year ago

    You're awesome, brother. Salute.

  • @KnowledgeAmplifier1 · 1 year ago

    I appreciate that 😊🙏 Happy Learning!

  • @arnabganguly4962 · 1 year ago

    Very nice explanation.

  • @KnowledgeAmplifier1 · 1 year ago

    Thank you 😄

  • @ssidhpura05 · 8 months ago

    Superb explanation along with a demo. 👏

  • @KnowledgeAmplifier1 · 8 months ago

    Glad to hear this, Sagar! Happy Learning!

  • @chamathjayasekara5057 · 1 month ago

    How do I reduce the consumer lag?

  • @MrMadmaggot · 1 year ago

    I ran into an issue: it says that the consumer group doesn't exist.

  • @KnowledgeAmplifier1 · 1 year ago

    Hello MAD MAGGOT, if you see an error saying the consumer group does not exist, the group name you specified may be misspelled, or the group may never have been created. Check the spelling and case of the consumer group name and make sure it matches the group_id used by the Kafka consumer. If you still face the issue, the consumer group may have been deleted, or its committed offsets may have expired after a period of inactivity. In that case, start the consumer again so the group is recreated and begins consuming messages. It's also possible that the Kafka topic consumed by the group has been deleted or does not exist. Check that the topic exists and that its name is spelled correctly. If the topic has been deleted, you may need to recreate it or choose a different topic to consume.

  • @nikhilpachkawade7564 · 1 year ago

    Awesome work, thank you so much 😊

  • @KnowledgeAmplifier1 · 1 year ago

    You're welcome! I'm glad I could help, Nikhil Pachkawade! Happy Learning!

  • @lisarascarpa3047 · 1 year ago

    Thank you for your amazing work. Can I ask you some questions? Do you have an email?

  • @KnowledgeAmplifier1 · 1 year ago

    Hello li sara scarpa, please post your question here. I will try to reply as soon as possible if I have an idea on that topic.

  • @lisarascarpa3047 · 1 year ago

    m.kzread.info/dash/bejne/q5WmyappfJaoZps.html&pp=ygUXSG93IHRvIGZpbHRlciBvbiBtYXRsYWI%3D I was talking about this video. Is filtering an ode related to the filter function in MATLAB?
