Micronaut, Kafka and Testcontainers - test all the things with an example


I have been using Micronaut for the past year. It is always refreshing to learn something new, and deep-diving into Micronaut was a nice change. It is a solid framework, I think it has a future and will gain momentum, and my guess is that over time it will attract a lot of Spring-oriented developers. Being an early adopter always has its challenges, though, including the pain of dealing with bugs or missing documentation. There were moments in these past months where I missed being able to just Google or StackOverflow my way out of a configuration issue or a problem, relying on the vast resources and cases documented by people using, for example, Spring.

This was not the case with Micronaut, where sometimes I ended up reading the source code trying to figure out whether I had missed a config here and there, or trying to understand why things did not work the way I expected them to. After all these months, especially working with functionality that requires Apache Kafka, I felt it would be nice to share some tips and tricks with people who are now starting to use this particular Micronaut module. The more we write about our experiences, the more we help newcomers adopt the technology.

So I hope this post helps you get past some small config traps! If you find it helpful you can always buy me a coffee! Note that my post and the provided repo are there mostly to help wire things together and should be used as a template for writing tests. My test code is fairly simple, and the same applies to the code under test.

If you can't be bothered reading the post you can jump directly to the repo. I promise I will try to keep it updated with the latest dependencies over time. At the moment the latest Micronaut version is 2.1.0 and Testcontainers is at version 1.15.rc2. Dependencies and their versions were a pain point I ran into, so sometimes you need to be very careful with them (tip for the future). At the end of the page you will find a section with useful references and links; I highly recommend you check them out.

Objectives

Provide a simple skeleton for a Micronaut microservice that features:

  • Config and code for Kafka Producers/Consumers.
  • Generating and using Avro Messages and the relevant Kafka Producers/Consumers.
  • Simple Kafka Streams implementation and relevant configs.
  • Last but not least, provide the config and code for a simple integration-like test that uses the Testcontainers framework. The test uses Docker to spin up a Kafka and Schema Registry combo and exercises the code above.
  • This post does not involve any code around using Embedded Kafka.

Create a new project

I started by using the Micronaut Launch wizard (similar to start.spring.io). I selected version 2.1.0 and added the following modules:

  • kafka
  • kafka-streams
  • testcontainers

Along with the basic config that you get with the generated project, you will need to add the following dependencies. I am providing a snippet below, but it is better to check the complete build.gradle.

annotationProcessor "org.projectlombok:lombok:$lombokVersion"
testAnnotationProcessor "org.projectlombok:lombok:$lombokVersion"
compileOnly "org.projectlombok:lombok:$lombokVersion"
testCompileOnly "org.projectlombok:lombok:$lombokVersion"

//micronaut inject
implementation("io.micronaut:micronaut-inject-java")

//avro serdes
implementation "io.confluent:kafka-streams-avro-serde:${confluentVersion}"

//avro
implementation "org.apache.avro:avro:$avroVersion"

//test containers
testImplementation(platform("org.testcontainers:testcontainers-bom:$testContainersVersion"))
testImplementation "org.testcontainers:kafka"
testImplementation "org.testcontainers:junit-jupiter"
testImplementation "org.awaitility:awaitility:$awaitilityVersion"

How to compile Avro schemas and add them to your classpath

A somewhat separate issue, which applies to any Gradle-based Java project that wants to compile Avro schemas into classes on its classpath and use them, is the following configuration.

Add your Avro schema definition to a relevant package inside your src folder.
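
For reference, a minimal schema could look something like the following. The namespace and field names here are only illustrative assumptions; check the repo for the actual MyEvent definition.

{
  "namespace": "com.example.events",
  "type": "record",
  "name": "MyEvent",
  "doc": "Illustrative schema only - the fields below are assumptions, see the repo for the real MyEvent",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "payload", "type": "string" }
  ]
}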

In your build.gradle, add an entry for this plugin:

1id "com.commercehub.gradle.plugin.avro" version "0.21.0"

And make sure you add the relevant entry in settings.gradle so that Gradle can resolve the plugin.

pluginManagement {
    repositories {
        gradlePluginPortal()
        jcenter()
        maven {
            name "JCenter Gradle Plugins"
            url "https://dl.bintray.com/gradle/gradle-plugins"
        }
        mavenLocal()
    }
}

Finally, add this to your build.gradle to tune the avro plugin. There are several other options you can use; see the documentation here.

avro { fieldVisibility = "PRIVATE" }

You are done; the next time you build, a new class called MyEvent will be added to your classpath and can be used in your Kafka consumers/producers.
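
As an illustration, a producer interface that sends the generated MyEvent to the Avro topic could look roughly like this. The interface and method names are my own; the repo may differ slightly, and the "avroproducer" name ties into the config shown further down.

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

// bound to the "avroproducer" config section, which configures the Avro value serializer
@KafkaClient("avroproducer")
public interface AvroProducer {

    // MyEvent is the class generated from the Avro schema by the gradle-avro plugin
    @Topic("test-avro-topic")
    void sendEvent(@KafkaKey String key, MyEvent event);
}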

Defining Kafka Consumers, Producers and Kafka Streams using Micronaut

What you will find in the code is the following cases:

  • A Kafka consumer & producer that use the topic test-topic - the keys and values of the messages are plain Strings.
  • A Kafka consumer & producer that use the topic test-avro-topic, which contains messages with a String key and our custom Avro-based message (see MyEvent above) as the value.
  • A Kafka Streams definition that reads (streams) messages from test-topic, performs a small transformation plus logging, and emits new messages to an output topic called test-output-topic. The output topic does not contain Avro-based messages (similar to test-topic). A rough sketch follows below.
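
A minimal sketch of such a stream definition in Micronaut is shown below. The factory class name, bean name and the upper-casing transformation are my own assumptions; only the topic names come from the example above, and the String serdes are the defaults configured under kafka.streams.default in the config further down.

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.streams.kstream.KStream;

import javax.inject.Named;
import javax.inject.Singleton;

@Factory
public class TestTopicStreamFactory {

    @Singleton
    @Named("test-stream")
    KStream<String, String> testStream(ConfiguredStreamBuilder builder) {
        // read from the input topic (String serdes come from kafka.streams.default)
        KStream<String, String> source = builder.stream("test-topic");
        source
                .peek((key, value) -> System.out.println("processing key: " + key)) // the "logging" step
                .mapValues(value -> value.toUpperCase())                             // illustrative transformation
                .to("test-output-topic");
        return source;
    }
}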

The most important thing when you do all of the above in any Micronaut Kafka project is to look out for the properties. I think they could be documented a bit better, or more examples (like this one) should be available. I had a hard time in the first months, especially when dealing with the default behaviour or with beans being initialised for me automatically by the framework.

You will notice that when configuring the different consumers and producers (or streams) in the code, I group them by name. This name is actually the bean name in Micronaut, which automatically binds it to the related config section. For each bean I need to configure the ser/des; since I have different beans with different options, I cannot just use the defaults.

kafka:
  bootstrap:
    servers: localhost:9092
  streams:
    default:
      client.id: ${random.uuid}
      application.id: "${micronaut.application.name}-stream"
      auto.offset.reset: "earliest"
      default:
        key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        deserialization:
          exception:
            handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
  producers:
    simpleproducer:
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
    avroproducer:
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer
  consumers:
    simpleconsumer:
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
    streamconsumer:
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
    avroconsumer:
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer

The above config makes sense if you look at one of the bean definitions in the code, for example the simple producer.

@KafkaClient("simpleproducer")
public interface SimpleProducer {

    @Topic(SIMPLE_TOPIC)
    void sendMsg(@KafkaKey String key, String name);
}

Notice how the bean name "simpleproducer" is referenced in the config section above. The same applies to all the rest of the beans in the example. Also, the classes referenced as serializers and deserializers can be found in one of the dependencies above.

implementation "io.confluent:kafka-streams-avro-serde:${confluentVersion}"
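
To make the wiring a bit clearer, here is a rough sketch of what the consumer matching the "simpleconsumer" config section could look like (class and method names are my own; the listener in the repo may differ, e.g. it would typically collect messages so the test can assert on them):

import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;

// consumes the plain String messages from test-topic, bound to the "simpleconsumer" config section
@KafkaListener(groupId = "simpleconsumer")
public class SimpleConsumer {

    @Topic("test-topic")
    public void receive(@KafkaKey String key, String value) {
        // in a test you would typically store the received messages here and assert on them later
    }
}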

Integration Testing using Testcontainers

When it comes to testing a microservice that mostly features code interacting with Kafka, there are several options:

  • Attempt to mock the Kafka infrastructure (very difficult, especially if your Kafka code is wrapped with a framework like Spring or Micronaut, where there are layers between the core Kafka Java libraries and your actual implementation).
  • Use the Embedded Kafka project and spin up Kafka and the related dependencies inside your test (this is still very common in many projects, and it works OK to be honest).
  • Use Testcontainers, which means you need Docker running on your machine or build server; your test then relies on specific Docker containers being started prior to the actual test, and your code (service) connects to a Docker stack.

Of the three options above, this post focuses on the last one, because I feel it better separates the complexity (which you have to deal with anyway) of the infrastructure from your code. Also, you get to test your code against something that is very close to the real thing!

What you need in order to add the Testcontainers capabilities and write this sort of integration test is the following:

  • Docker installed and running on your machine
  • The appropriate Gradle dependencies
  • Some tips and init code that make Testcontainers, JUnit 5 and a fully fledged @MicronautTest work together

Dependencies

testImplementation(platform("org.testcontainers:testcontainers-bom:$testContainersVersion"))
testImplementation "org.testcontainers:kafka"
testImplementation "org.testcontainers:junit-jupiter"

Glue code

Currently, there are two main challenges in getting the integration test to work:

  • Create the appropriate Testcontainers glue code that spins up the required containers and has them running before your Micronaut service is ready to run. Remember that when you use @MicronautTest, the framework brings up your application context - it is like spinning up the real thing.
  • Make your @MicronautTest aware of the containers being available, and make your code dynamically read the ports and paths of the containers before starting! The lifecycles of a @MicronautTest and a Testcontainers test plus JUnit 5 are not synced (at least for now), which may lead to some frustrating attempts.

After some research and tips, I concluded that it is simpler to have a base test class with some static init code that brings up all the required containers. This base class is then extended by the main IT test (the actual @MicronautTest). The only thing you need to remember is to make your @MicronautTest aware of the containers; luckily there is a nice interface you can implement - TestPropertyProvider. This is the place where you bridge the Testcontainers world with your Micronaut test and bring over the ports and URLs of the containers you are about to use.
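
A rough sketch of what such a base class could look like is below. This is a minimal outline under a few assumptions - the Confluent image versions, the network alias and the schema-registry environment variables are mine - so check the repo for the exact setup.

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

public abstract class BaseTestContainer {

    private static final Network NETWORK = Network.newNetwork();

    // Kafka broker, reachable by other containers on the network under the alias "kafka"
    protected static final KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.1"))
                    .withNetwork(NETWORK)
                    .withNetworkAliases("kafka");

    // Confluent Schema Registry pointing at the broker above (env vars per the Confluent image docs)
    protected static final GenericContainer<?> schemaRegistry =
            new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry:5.5.1"))
                    .withNetwork(NETWORK)
                    .withExposedPorts(8081)
                    .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
                    .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092");

    static {
        // start the containers once, before any @MicronautTest application context is created
        kafka.start();
        schemaRegistry.start();
    }

    protected static String getSchemaRegistryUrl() {
        return "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getFirstMappedPort();
    }
}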

Here is how the TestPropertyProvider on the TestContainersKafkaIT test looks:

@NotNull
@Override
public Map<String, String> getProperties() {
    return Map.of(
            "kafka.bootstrap.servers", "localhost:" + kafka.getFirstMappedPort(),
            "kafka.schema.registry.url", getSchemaRegistryUrl()
    );
}

As you can see, I statically reference the containers, which are already initialised in the BaseTestContainer class, and I get their ports and URLs. These are then used to populate my application-test property context!
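
Putting it all together, the IT class could then be declared roughly like this (a sketch; note that Micronaut Test expects the JUnit 5 PER_CLASS lifecycle when TestPropertyProvider is used):

import java.util.Map;

import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.test.support.TestPropertyProvider;
import org.junit.jupiter.api.TestInstance;

@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // needed so getProperties() is picked up before the context starts
class TestContainersKafkaIT extends BaseTestContainer implements TestPropertyProvider {

    @Override
    public Map<String, String> getProperties() {
        // bridge the container ports/urls into the Micronaut application context, as shown above
        return Map.of(
                "kafka.bootstrap.servers", "localhost:" + kafka.getFirstMappedPort(),
                "kafka.schema.registry.url", getSchemaRegistryUrl()
        );
    }

    // actual test methods go here, using the injected producers/consumers against the running containers
}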

Hope that helps! I am happy to receive any improvements or fixes on the repo!

References & links: