Kafka Producer failure handling at multiple levels of Spring abstractions

This post is co-authored by Swathi Kurella.

Excessive abstraction in frameworks can sometimes make it painful for developers to use the underlying functionality of the core libraries.


In this write-up, you will learn how to elegantly handle publisher failures while publishing to Kafka through Spring abstractions.

At ThoughtWorks, we built a data ingestion platform as part of a project. We chose Kafka as the streaming platform and the Spring messaging framework, with spring-cloud-stream’s Kafka binder, to ingest payloads.

The reader is assumed to have some basic knowledge of the Spring framework, the Kafka streaming platform, callback mechanisms, and asynchronous communication.

While publishing payloads to Kafka, we found a few false positives in the acknowledgement status we relied on to confirm that a payload had been published successfully. The application logs showed that payloads had been published successfully in spite of a few failures, and we never had access to the failed payloads.

For exception handling, we depended solely on the flag returned by the send(…) method of Spring messaging to validate the status of payloads published to Kafka. The source code of send() follows.

<script src="https://gist.github.com/akhil-ghatiki/ed4d11cde874cef878e3a139a6e423f4.js"></script>

Everything was smooth until we noticed a mismatch between the published and consumed payload counts in the Splunk logs. On digging deeper, we realized that a few payloads had failed to publish in spite of the success returned by the send method above.

This method delegates the payload to the subscribed message handler, which internally adds the message to a group. If this process succeeds, the method returns success; otherwise it throws an exception, which signals a failure.

Can we now rely on this status alone to check whether the message got published to Kafka?

The answer is NO. This method returns success as soon as the payload has been delegated to the handler, which says nothing about whether it reached Kafka. It did, however, help us by throwing a runtime exception for non-recoverable errors.
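To make the false positive concrete, here is a minimal, library-free sketch in plain Java. It is not the Spring implementation: `DelegatingChannel`, its `send` method, and the rule that payloads starting with "bad" fail are all assumptions made for illustration. It only models the behaviour described above, where the return value reflects the hand-off and not the eventual write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Library-free model: every name below is hypothetical, none is a Spring or Kafka API.
class DelegatingChannel {

    // Asynchronous writes, kept only so the demo can inspect their outcome later.
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    // Returns true as soon as the payload is handed off; the write itself runs later.
    boolean send(String payload) {
        inFlight.add(CompletableFuture.runAsync(() -> {
            // Assumption for the demo: payloads starting with "bad" fail at the broker.
            if (payload.startsWith("bad")) {
                throw new IllegalStateException("broker rejected " + payload);
            }
        }));
        return true;
    }

    // Waits for every write and counts the ones that completed exceptionally.
    long awaitFailures() {
        long failures = 0;
        for (CompletableFuture<Void> write : inFlight) {
            try {
                write.join();
            } catch (CompletionException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        DelegatingChannel channel = new DelegatingChannel();
        System.out.println("send returned: " + channel.send("order-1"));
        System.out.println("send returned: " + channel.send("bad-order-2"));
        System.out.println("writes that actually failed: " + channel.awaitFailures());
    }
}
```

Both calls report success at hand-off time, yet one write fails afterwards. A caller that only checks the returned flag never sees that failure, which mirrors the mismatch between published and consumed counts.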

The send method abstracts the send method of KafkaTemplate in Spring Kafka, which in turn abstracts the send method of Producer in Apache Kafka. So there were three levels at which we could use send. We observed that callbacks were available at each of the inner layers, i.e., in Spring Kafka and Apache Kafka, and by now we were sure that callbacks were going to be our rescuers.

Implementing Apache Kafka callback:

<script src="https://gist.github.com/akhil-ghatiki/ad009f6b77b9003eb29023ade18a7c6e.js"></script>

This is the simplest implementation, and it is good enough to know that there was a failure while producing the payload. It does not give access to the failed payload, which makes it impossible to investigate why the publish failed. It just logs whenever there is a failure, and one can get alerts from these logs using any log analysis tool.

This is best suited to systems that stream behavioral data such as clicks, logs, or in-game player activity from upstream systems, where missing a few events for a moment is not a great loss. If the system deals with data such as inventory picture or air traffic data, which can affect near-real-time business, this implementation is a big NO, as one cannot afford to lose the ability to investigate that data.
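As a rough illustration of what a logging-only completion callback gives you, the following library-free Java sketch models an asynchronous send whose callback receives only an offset and an error. The names `CallbackSketch`, `send`, `loggingCallback` and `demo`, and the rule that an empty payload fails, are hypothetical. In kafka-clients the corresponding hook is `Callback.onCompletion(RecordMetadata, Exception)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.logging.Logger;

// Library-free model of a per-send completion callback; all names are hypothetical.
class CallbackSketch {

    private static final Logger LOG = Logger.getLogger(CallbackSketch.class.getName());

    // The callback sees (offset, error) only; exactly one of the two is non-null.
    static CompletableFuture<Void> send(String payload, BiConsumer<Long, Exception> callback) {
        return CompletableFuture.runAsync(() -> {
            // Assumption for the demo: an empty payload stands in for a failed write.
            if (payload.isEmpty()) {
                callback.accept(null, new IllegalStateException("write failed"));
            } else {
                callback.accept(0L, null);
            }
        });
    }

    // Logs each failure and counts it, which is enough to drive an alert.
    static BiConsumer<Long, Exception> loggingCallback(AtomicInteger failures) {
        return (offset, error) -> {
            if (error != null) {
                failures.incrementAndGet();
                LOG.warning("Publish failed: " + error.getMessage());
            }
        };
    }

    // Sends one good and one failing payload and returns the number of logged failures.
    static int demo() {
        AtomicInteger failures = new AtomicInteger();
        BiConsumer<Long, Exception> callback = loggingCallback(failures);
        CompletableFuture.allOf(send("click", callback), send("", callback)).join();
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failures logged: " + demo());
    }
}
```

The log line says that something failed and why the write was rejected, but the callback in this sketch never sees the payload, so there is nothing to inspect or replay afterwards.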

Implementing Spring Kafka Producer Listener:

<script src="https://gist.github.com/akhil-ghatiki/9ca4d4aafaf7fd0455077041e8441c29.js"></script>
<script src="https://gist.github.com/akhil-ghatiki/039a470fb8bd9a035a58b6ab5a15c51f.js"></script>

For the above reasons, we looked for another solution one level of abstraction higher. This implementation was good enough: it gave us failure logs for alerting along with access to the failed payloads.

We had one issue with this implementation, though. It demanded a lot of configuration changes in the existing code base, as we had come a long way using the spring-messaging and Spring Cloud Stream frameworks. We could not afford that at this stage.
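The difference a listener makes can be sketched without any Spring dependency. In the model below, `TemplateSketch`, `FailureListener` and the length-based failure rule are assumptions for illustration, not Spring Kafka types. The point is only that a listener registered once on the template is handed the failed payload, so it can be kept for investigation or replay.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Library-free model of a listener registered once on a template; all names are hypothetical.
class TemplateSketch {

    // Unlike a bare completion callback, the listener is handed the failed payload itself.
    interface FailureListener {
        void onError(String topic, String payload, Exception error);
    }

    // Default listener ignores failures until a real one is registered.
    private FailureListener listener = (topic, payload, error) -> { };

    void setListener(FailureListener listener) {
        this.listener = listener;
    }

    CompletableFuture<Void> send(String topic, String payload) {
        return CompletableFuture.runAsync(() -> {
            // Assumption for the demo: payloads longer than 10 characters are rejected.
            if (payload.length() > 10) {
                listener.onError(topic, payload, new IllegalStateException("record too large"));
            }
        });
    }

    // Publishes two payloads and returns the ones the listener saw fail.
    static List<String> demo() {
        List<String> failedPayloads = new CopyOnWriteArrayList<>();
        TemplateSketch template = new TemplateSketch();
        template.setListener((topic, payload, error) -> failedPayloads.add(payload));
        CompletableFuture.allOf(
                template.send("inventory", "sku-1"),
                template.send("inventory", "sku-with-a-very-long-body")).join();
        return failedPayloads;
    }

    public static void main(String[] args) {
        System.out.println("payloads available for replay: " + demo());
    }
}
```

Because the listener is set once on the template, every send goes through it, which is also why adopting this approach touches how the template itself is configured.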

Implementing in Spring Cloud Stream:

<script src="https://gist.github.com/akhil-ghatiki/9577fed392d9aa18db7584d06fc35418.js"></script>
<script src="https://gist.github.com/akhil-ghatiki/a8373ee67fc7c0c39a301168ed0ed1a4.js"></script>

We then looked one level of abstraction higher still. We figured out that Spring was using a producer callback. Thanks to Spring’s autowiring by name, creating a producer listener with the same name and overriding the implementation gave us what we wanted with minimal code changes.

The bean created above is injected directly into the Kafka template by the spring-cloud-stream-binder-kafka library, as shown in the classes below.

<script src="https://gist.github.com/akhil-ghatiki/f25d7cb8a0e7b1b12b5fe196c864e4ac.js"></script>

See more for complete library source code

<script src="https://gist.github.com/akhil-ghatiki/16701746970e83c90aa5b071f11eceb3.js"></script>

See more for complete library source code

It’s always helpful to be able to read framework code and be able to think like a framework developer.

Thanks to Unmesh Joshi for the above line and for his inspiration to dive into framework source code.

Developer at ThoughtWorks. Sometimes ENTP-T and sometimes ESTP-A, not sure which one. Loves to talk about tech, code, data privacy, and the environment.