
Can console consumer be used to clear bad message in Kafka which consuming application gave error fo

 
Monica Shiralkar
Ranch Hand
Posts: 2934
13
I was consuming messages from a Kafka topic using a streaming application. When I send messages from a Kafka producer, they are consumed by the application. When I sent a bad message, the streaming application threw an exception for it, which is the expected behaviour. But even when I then sent good messages, it kept throwing the exception, because it was still trying to read the old bad message, which could not be consumed properly. I ran a Kafka console consumer, thinking it would clear this bad message from Kafka, but it didn't work, and I had to delete the Kafka topic and recreate it. Why can't a Kafka console consumer be used to clear a bad message stuck in a Kafka topic?
Thanks
 
Himai Minh
Bartender
Posts: 2437
13
You can use Spring Kafka's @RetryableTopic on the Kafka listener to retry a failed message a number of times. Once the maximum number of attempts has been reached, the message is moved to a separate dead-letter topic.
Here is an example:
https://www.baeldung.com/spring-retry-kafka-consumer
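
The retry-then-dead-letter flow can also be sketched in plain Java, without Spring or a broker. This is only an illustration of the idea, not how @RetryableTopic is implemented: the class RetrySketch, the methods handle and consumeAll, the constant MAX_ATTEMPTS, and the in-memory deadLetters list are all made up for the example, and a "bad message" is assumed to be any string that is not an integer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, in-memory illustration of "retry N times, then dead-letter".
// No real Kafka or Spring classes are used here.
class RetrySketch {
    static final int MAX_ATTEMPTS = 3;            // assumed retry limit

    // Stand-in for business logic: fails on anything that is not an integer.
    static int handle(String message) {
        return Integer.parseInt(message.trim());
    }

    // Processes every message; a message that still fails after
    // MAX_ATTEMPTS goes to deadLetters and the loop moves on.
    static List<Integer> consumeAll(List<String> topic, List<String> deadLetters) {
        List<Integer> results = new ArrayList<>();
        for (String message : topic) {
            boolean done = false;
            for (int attempt = 1; attempt <= MAX_ATTEMPTS && !done; attempt++) {
                try {
                    results.add(handle(message));
                    done = true;
                } catch (NumberFormatException e) {
                    // retry; a real listener would usually wait between attempts
                }
            }
            if (!done) {
                deadLetters.add(message);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        List<Integer> results = consumeAll(List.of("1", "oops", "3"), deadLetters);
        System.out.println("processed=" + results + " deadLetters=" + deadLetters);
    }
}
```

The point of the sketch is that the good messages on either side of the bad one are still processed, because the bad one is set aside instead of blocking the loop.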
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13

Himai Minh wrote:You can use Spring Kafka's @RetryableTopic on the Kafka listener to retry a failed message a number of times. Once the maximum number of attempts has been reached, the message is moved to a separate dead-letter topic.
Here is an example:
https://www.baeldung.com/spring-retry-kafka-consumer



Thanks. Understood that after the maximum number of attempts it will be moved to the dead-letter topic and removed from this topic.

But I am trying to understand this: suppose one had not configured any retries and the message is stuck in the topic. Why can't it simply be removed by starting a console consumer?
 
Himai Minh
Bartender
Posts: 2437
13
If I understand your question correctly, the consumer cannot continue to read after it encounters a bad message without @RetryableTopic.
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13

Himai Minh wrote:If I understand your question correctly, the consumer cannot continue to read after it encounters a bad message without @RetryableTopic.



The consumer application cannot continue to read after it encounters a bad message. I have a way of solving it: delete the topic and recreate it. But why can't it be cleared using a console consumer (I am not talking about the consumer application)?
 
Himai Minh
Bartender
Posts: 2437
13
In Kafka, a message that has been published to a topic cannot be deleted by a consumer. It stays in the log until the topic's retention policy removes it.
Reference:
https://stackoverflow.com/questions/65902121/removing-one-message-from-a-topic-in-kafka
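
A small in-memory model may make this clearer. It is only a sketch, not Kafka client code: the class LogSketch, its methods, and the group names "streaming-app" and "console" are invented for the example, and it assumes the console consumer runs under a different consumer group than the application, which is the usual case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one topic partition: an append-only list
// plus one read position ("offset") per consumer group.
class LogSketch {
    private final List<String> records = new ArrayList<>();
    private final Map<String, Integer> offsets = new HashMap<>();

    void append(String record) {
        records.add(record);
    }

    // Returns the next unread record for this group, or null at the end.
    // Reading advances only this group's offset; nothing is removed.
    String poll(String group) {
        int offset = offsets.getOrDefault(group, 0);
        if (offset >= records.size()) {
            return null;
        }
        offsets.put(group, offset + 1);
        return records.get(offset);
    }

    int offsetOf(String group) {
        return offsets.getOrDefault(group, 0);
    }

    int size() {
        return records.size();
    }

    public static void main(String[] args) {
        LogSketch topic = new LogSketch();
        topic.append("{\"ok\": 1}");
        topic.append("{bad json");
        topic.append("{\"ok\": 2}");

        topic.poll("streaming-app");   // the app has read the first record only
        // A second group (standing in for the console consumer) reads everything.
        while (topic.poll("console") != null) { }

        System.out.println("records still in log: " + topic.size());
        System.out.println("streaming-app offset: " + topic.offsetOf("streaming-app"));
        System.out.println("console offset: " + topic.offsetOf("console"));
    }
}
```

In this model the second group reads all three records, yet the log still holds three records and the first group's position has not moved. Reading is not the same as removing, and one group's progress does not affect another's.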
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13

Himai Minh wrote:In Kafka, a message that has been published to a topic cannot be deleted by a consumer. It stays in the log until the topic's retention policy removes it.
Reference:
https://stackoverflow.com/questions/65902121/removing-one-message-from-a-topic-in-kafka



OK, thanks. Understood that it is an append-only log. My question is not about deleting. The consumer application cannot consume the message because of the exception, but why can't a console consumer consume it and so remove it from the queue?
 
Himai Minh
Bartender
Posts: 2437
13
When a consumer fails on a message and does not skip it, it keeps reading the same message again and cannot continue. That follows from the design of Kafka.
Once a message has been published, a consumer cannot remove it. That is part of the design of Kafka as well.
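
Here is a rough sketch of why the consumer gets stuck and what unblocks it. It is not real Kafka client code: StuckConsumerSketch, process, and run are hypothetical names, a "good" record is assumed to be any integer string, and the offset is a plain int that is advanced only after a record has been processed successfully.

```java
import java.util.List;

// Hypothetical sketch of a consumer that advances its offset only after a
// record was processed successfully. Not real Kafka client code.
class StuckConsumerSketch {
    // Stand-in for parsing: only integers are "good" records.
    static void process(String record) {
        Integer.parseInt(record);
    }

    // Tries up to maxPolls times, starting at startOffset, and returns the
    // offset reached. A failing record is re-read on every poll.
    static int run(List<String> log, int startOffset, int maxPolls) {
        int offset = startOffset;
        for (int i = 0; i < maxPolls && offset < log.size(); i++) {
            try {
                process(log.get(offset));
                offset++;                 // "commit" only on success
            } catch (NumberFormatException e) {
                // offset unchanged: the next poll sees the same bad record
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        List<String> log = List.of("10", "bad", "20", "30");
        int stuckAt = run(log, 0, 100);
        System.out.println("stuck at offset " + stuckAt);
        int afterSkip = run(log, stuckAt + 1, 100);   // skip one record
        System.out.println("after skipping: offset " + afterSkip);
    }
}
```

In a real cluster, the counterpart of the manual skip would be moving the consumer group's committed offset past the bad record, for example with the --reset-offsets option of the kafka-consumer-groups tool, rather than deleting and recreating the topic.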
 
Himai Minh
Bartender
Posts: 2437
13
One more note. Kafka is intended for publishing real-time data, such as stock prices, heart rate, blood pressure... at every moment.
So, if some invalid data has been published, Kafka is not supposed to delete it.
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13

Himai Minh wrote:One more note. Kafka is intended for publishing real-time data, such as stock prices, heart rate, blood pressure... at every moment.
So, if some invalid data has been published, Kafka is not supposed to delete it.



Thanks
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13

Himai Minh wrote:When a consumer fails on a message and does not skip it, it keeps reading the same message again and cannot continue. That follows from the design of Kafka.
Once a message has been published, a consumer cannot remove it. That is part of the design of Kafka as well.



Yes. I want to know about the case below:

Suppose a streaming application consumes from a Kafka topic and parses the JSON. If malformed JSON arrives instead of valid JSON, the application will throw an exception while parsing.

The consumer in this streaming application cannot consume it and cannot continue (due to the exception while parsing).
But why can't the Kafka console consumer consume (and in this way remove) this bad JSON?
 
Himai Minh
Bartender
Posts: 2437
13
Hi, Monica,
If the JSON is in a bad format, the consumer cannot deserialize it properly.
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13
But deserialization will be by the Consumer application. I am talking about console consumer, which we start by using


From the Kafka console we can start a consumer, and anything published to this topic from anywhere will be shown by this consumer. I was thinking this does not mean it will also deserialize the message the way application code does. Does the console consumer (not the consumer application) also do deserialization?
 
Himai Minh
Bartender
Posts: 2437
13
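Yes. The console consumer also deserializes the messages in the topic, but by default only from bytes to text, so it does not validate JSON. If the console producer sends malformed JSON, the error message will show up in your application, where the JSON is parsed.
The console consumer can print the message, but reading it does not remove it from the topic.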
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13
Thanks. I am trying to understand. The job of the console consumer is simply to consume. So why does it deserialize?

The job of the consumer in an application can be to deserialize as required by the business logic, for example by defining a POJO and deserializing the JSON into a POJO object. But why does the console consumer deserialize? That's what I am trying to understand.
 
Himai Minh
Bartender
Posts: 2437
13
In this example you can see a customized serde (serializer/deserializer) defined for the input and output topics:
https://medium.com/@agvillamizar/implementing-custom-serdes-for-java-objects-using-json-serializer-and-deserializer-in-kafka-streams-d794b66e7c03

The consumer needs to use the deserialization mechanism to parse the JSON.
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13
Thanks. So does it mean that the Kafka console consumer first dynamically checks what kind of message it is consuming, and if it is a string like "hello world", it does nothing and consumes it as it is, and if it is JSON, it parses it and throws an error if it is not proper JSON?
 
Himai Minh
Bartender
Posts: 2437
13
The console consumer does deserialization too, but only to text; it does not check what kind of message it is. If the message in the topic is not valid JSON, it is the app that cannot consume it. The app will throw an exception.
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13
And it can also be a string, where no deserialization is required.
 
Himai Minh
Bartender
Posts: 2437
13
If the message is a string, the console consumer can usually deserialize it without issue. Even a string is stored as bytes in the topic, so turning it back into text is still deserialization.
If the message is malformed JSON, a consumer that parses JSON cannot deserialize it. The error is usually thrown in our app that reads the messages from the topic.
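
The difference between deserializing to text and deserializing to a typed object can be sketched like this. It assumes a console consumer with default settings only turns the stored bytes into text. DeserializerSketch, asText, and asJsonObject are hypothetical names, and the JSON check is a deliberately crude stand-in (it only looks at the outer braces); a real application would use a JSON library.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: the same bytes, two different "deserializers".
class DeserializerSketch {
    // Bytes to text: works for any UTF-8 payload, valid JSON or not.
    static String asText(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Very rough stand-in for a JSON-to-POJO step. It only checks that the
    // text looks like a JSON object; it is not a real JSON parser.
    static String asJsonObject(byte[] payload) {
        String text = asText(payload).trim();
        if (!(text.startsWith("{") && text.endsWith("}"))) {
            throw new IllegalArgumentException("not a JSON object: " + text);
        }
        return text;
    }

    public static void main(String[] args) {
        byte[] bad = "{\"price\": 12".getBytes(StandardCharsets.UTF_8);
        System.out.println("as text: " + asText(bad));
        try {
            asJsonObject(bad);
        } catch (IllegalArgumentException e) {
            System.out.println("typed consumer failed: " + e.getMessage());
        }
    }
}
```

The same malformed payload is readable as plain text but fails the typed step, which is why the failure tends to show up in the application that expects JSON.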
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13
Thanks. I was thinking that deserialization is required only for JSON or XML. What I understand by deserialization is converting JSON to a POJO object.
 
Himai Minh
Bartender
Posts: 2437
13
JSON and XML can be deserialized into POJO.
 
Monica Shiralkar
Ranch Hand
Posts: 2934
13

Himai Minh wrote:JSON and XML can be deserialized into POJO.



Yes, but that's in custom application code that we write. In the console consumer, where is the application code to deserialize?
 
Himai Minh
Bartender
Posts: 2437
13
Some developers write a customized consumer app to listen to the Kafka topic and deserialize the message.
Some developers just simply use a console consumer from Kafka to read and deserialize the message.
Either way, the serialization/deserialization code lives on the client side: in the producer that writes the message and in the consumer that reads it.
 