Kafka Integration Testing: Partition Assignment

Author:  Ken Stevens

Integration Testing with Kafka can be really challenging.  There are so many moving parts and so much concurrency that it’s easy to inadvertently introduce intermittent test failures into your test bed.

If…

  • you run Kafka Integration Tests in Continuous Integration,
  • your system creates Kafka consumers using a Kafka Listener Container, and
  • your tests use @EmbeddedKafka or EmbeddedKafkaRule

then this post is for you!

The Problem

We added Kafka Integration tests about a year ago and have been living with intermittent failures in these tests until a couple of weeks ago when we finally figured out the solution.

The intermittent failure scenarios look like this:

Scenario I

  1. Thread A starts listening to a Kafka topic
  2. Thread B sends a message to the topic
  3. Thread A waits for the message to arrive
  4. Ugh: sometimes the test times out waiting for the message

Scenario II

  1. Thread B sends a message to a Kafka topic
  2. Thread A starts listening to the topic
  3. Thread A waits for the message to arrive
  4. Ugh: sometimes the test times out waiting for the message

What Doesn’t Work

The first thing I tried was eliminating the possibility that one test was interfering with another, so I “dirtied” the Application Context, shut down all listeners and destroyed all topics between tests.  Slow and cumbersome.  And it didn’t fix the problem.

The next thing I tried was Googling some specific warnings I saw in the logs concerning partition assignment.  I found a number of posts suggesting that this is a source of intermittent test failures:  When you start up your Kafka Consumer, there can be a time lag until the broker assigns a partition to your consumer.  If your consumer starts listening before a partition is assigned to it, the consumer will often not get the message, especially in Scenario II.

Some posts suggest resolving this partition assignment issue by adding a wait in your test until your consumer has a partition assigned.  So Scenario I now looks like the following.

Scenario Ia

  1. Thread A starts listening to a Kafka topic
  2. Thread A waits until its consumer has a partition assigned
  3. Thread B sends a message to the topic
  4. Thread A waits for the message to arrive

Our system dynamically creates Kafka consumers in response to asynchronous events, so setting up the infrastructure to do this properly was time-consuming and complicated.  The core of this code looked something like this:

// Wait until a partition has been assigned to our Kafka consumer.
// Returns true once a partition is assigned, false if the wait timed out.

if (myListenerContainer.getAssignedPartitions().size() > 0) {
  return true;
}

CountDownLatch latch = new CountDownLatch(1);
ConsumerRebalanceListener latchListener = new LatchConsumerRebalanceListener(latch);
// LatchConsumerRebalanceListener calls latch.countDown() when onPartitionsAssigned() is called

myListenerContainer.setConsumerRebalanceListener(latchListener);

// waitTime is 2 * value of session.timeout.ms
return latch.await(waitTime, TimeUnit.MILLISECONDS) || myListenerContainer.getAssignedPartitions().size() > 0;
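
The LatchConsumerRebalanceListener referenced above is not shown in this post. Here is a minimal sketch of the same latch pattern that runs without Kafka on the classpath. AssignmentListener is a hypothetical stand-in for the onPartitionsAssigned() callback of Kafka's ConsumerRebalanceListener, and all class and method names below are illustrative assumptions rather than our actual code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the onPartitionsAssigned() callback of
// Kafka's ConsumerRebalanceListener, so the sketch runs without Kafka.
interface AssignmentListener {
    void onPartitionsAssigned(int partitionCount);
}

// Opens the latch the first time a non-empty assignment arrives.
class LatchAssignmentListener implements AssignmentListener {
    private final CountDownLatch latch;

    LatchAssignmentListener(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void onPartitionsAssigned(int partitionCount) {
        if (partitionCount > 0) {
            latch.countDown();
        }
    }
}

class AssignmentWaiter {
    // Returns true if the latch opened within waitMillis; returns false on
    // timeout or if the waiting thread was interrupted.
    static boolean awaitAssignment(CountDownLatch latch, long waitMillis) {
        try {
            return latch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

In a real test, the listener would implement ConsumerRebalanceListener instead, and it would need to be registered before the container starts so that the first assignment is not missed.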

In order to get Kafka to reassign partitions quickly, you need to reduce your consumer config values to something like this:

session.timeout.ms: 600
heartbeat.interval.ms: 200
fetch.max.wait.ms: 600

And you need to reduce the broker properties group.min.session.timeout.ms and group.max.session.timeout.ms in your EmbeddedKafkaRule to align with these config values.
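
These settings constrain each other. The broker rejects a consumer whose session.timeout.ms falls outside the range set by group.min.session.timeout.ms and group.max.session.timeout.ms (the broker default minimum is 6000 ms, which is why it has to be lowered). Kafka's documentation also recommends keeping heartbeat.interval.ms at no more than a third of session.timeout.ms. The sketch below is a small sanity check of those two rules. The consumer values come from this post. The broker values of 500 and 1000 are assumptions chosen for illustration, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class FastRebalanceConfig {
    // Consumer settings listed in this post.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("session.timeout.ms", 600);
        props.put("heartbeat.interval.ms", 200);
        props.put("fetch.max.wait.ms", 600);
        return props;
    }

    // Broker settings; the numeric values are assumed for illustration.
    static Map<String, Object> brokerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("group.min.session.timeout.ms", 500);
        props.put("group.max.session.timeout.ms", 1000);
        return props;
    }

    // True when the session timeout lies inside the broker's allowed range
    // and the heartbeat interval is at most a third of the session timeout.
    static boolean isConsistent(Map<String, Object> consumer, Map<String, Object> broker) {
        int session = (Integer) consumer.get("session.timeout.ms");
        int heartbeat = (Integer) consumer.get("heartbeat.interval.ms");
        int min = (Integer) broker.get("group.min.session.timeout.ms");
        int max = (Integer) broker.get("group.max.session.timeout.ms");
        return session >= min && session <= max && heartbeat * 3 <= session;
    }
}
```

With the values above, FastRebalanceConfig.isConsistent(consumerProps(), brokerProps()) returns true. Leaving the broker minimum at its default of 6000 makes the check fail, which mirrors the broker refusing the consumer.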

This helped a lot.  It reduced our intermittent failure rate from 50% to maybe 20%.  But 20% is still super annoying, especially when the Kafka integration tests aren’t run until 45 min into your build…  And when you’re having an unlucky day and it happens three times in a row…

What Does Work

The solution to the problem is actually quite simple.  You don’t need to wait for Kafka to assign a partition to your consumer.  You just assign the partition directly yourself!

In your production code, you will create your listener container like this:

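// consumerFactory is your existing ConsumerFactory; the String key/value types are placeholders
ContainerProperties containerProperties = new ContainerProperties(theTopicName);
ConcurrentMessageListenerContainer<String, String> container =
    new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);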

However, in your tests, you should configure all your topics to have only 1 partition and create your listener container like this:

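// Assign partition 0 explicitly; consumerFactory and the String key/value types are placeholders
ContainerProperties containerProperties = new ContainerProperties(new TopicPartitionOffset(theTopicName, 0));
ConcurrentMessageListenerContainer<String, String> container =
    new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);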

You will also likely want to set the following to ensure your consumer gets the message in both Scenario I and Scenario II.

 auto.offset.reset: earliest 
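
Why does this setting matter for Scenario II? A consumer with no committed offset starts at the position chosen by auto.offset.reset, and the Kafka default is latest, which skips anything already in the log. The toy model below illustrates that reasoning with a single in-memory partition. It is not Kafka code, and ToyPartition and its methods are hypothetical names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a single partition; not Kafka code.
class ToyPartition {
    private final List<String> log = new ArrayList<>();

    // Appends a message to the end of the log.
    void send(String message) {
        log.add(message);
    }

    // Starting offset for a consumer that has no committed offset:
    // "earliest" starts at the beginning, anything else at the current end.
    int startOffset(String autoOffsetReset) {
        return "earliest".equals(autoOffsetReset) ? 0 : log.size();
    }

    // Everything from the given offset to the current end of the log.
    List<String> readFrom(int offset) {
        return new ArrayList<>(log.subList(offset, log.size()));
    }
}
```

In Scenario II the message is sent before the consumer starts. In this model, startOffset("latest") then points past the message and readFrom returns an empty list, while startOffset("earliest") returns 0 and the message is read. In Scenario I the consumer picks its start offset on an empty log, so both settings see the message.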

Since we made this change, we have not had a single Kafka intermittent test failure!