Testing Micronaut Kafka Listener using Testcontainers
Learn how to create a Micronaut application with a Kafka listener that persists data in MySQL, then test it using Testcontainers Kafka and MySQL modules with Awaitility.
In this guide, you'll learn how to:
- Create a Micronaut application with Kafka integration
- Implement a Kafka listener and persist data in a MySQL database
- Test the Kafka listener using Testcontainers and Awaitility
Prerequisites
- Java 17+
- Maven or Gradle
- A Docker environment supported by Testcontainers
Note: If you're new to Testcontainers, visit the Testcontainers overview to learn more about Testcontainers and the benefits of using it.
Create the Micronaut project
Set up the project
Create a Micronaut project from Micronaut Launch by selecting the kafka, data-jpa, mysql, awaitility, assertj, and testcontainers features.
Alternatively, clone the guide repository.
You'll use the Awaitility library to assert the expectations of an asynchronous process flow.
The key dependencies in pom.xml are:
<parent>
    <groupId>io.micronaut.platform</groupId>
    <artifactId>micronaut-parent</artifactId>
    <version>4.1.4</version>
</parent>
<dependencies>
    <dependency>
        <groupId>io.micronaut.data</groupId>
        <artifactId>micronaut-data-hibernate-jpa</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>io.micronaut.kafka</groupId>
        <artifactId>micronaut-kafka</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>io.micronaut.serde</groupId>
        <artifactId>micronaut-serde-jackson</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>io.micronaut.sql</groupId>
        <artifactId>micronaut-jdbc-hikari</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.awaitility</groupId>
        <artifactId>awaitility</artifactId>
        <version>4.2.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-junit-jupiter</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-kafka</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-mysql</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
The Micronaut parent POM manages the Testcontainers BOM, so you don't need to specify versions for Testcontainers modules individually.
Create the JPA entity
The application listens to a topic called product-price-changes. When a
message arrives, it extracts the product code and price from the event payload
and updates the price for that product in the MySQL database.
Create Product.java:
package com.testcontainers.demo;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.math.BigDecimal;
@Entity
@Table(name = "products")
public class Product {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    @Column(nullable = false, unique = true)
    private String code;
    @Column(nullable = false)
    private String name;
    @Column(nullable = false)
    private BigDecimal price;
    public Product() {}
    public Product(Long id, String code, String name, BigDecimal price) {
        this.id = id;
        this.code = code;
        this.name = name;
        this.price = price;
    }
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getCode() {
        return code;
    }
    public void setCode(String code) {
        this.code = code;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public BigDecimal getPrice() {
        return price;
    }
    public void setPrice(BigDecimal price) {
        this.price = price;
    }
}
Create the Micronaut Data JPA repository
Create a repository interface for the Product entity with a method to find a
product by code and a method to update the price for a given product code:
package com.testcontainers.demo;
import io.micronaut.data.annotation.Query;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.jpa.repository.JpaRepository;
import java.math.BigDecimal;
import java.util.Optional;
@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {
    Optional<Product> findByCode(String code);
    @Query("update Product p set p.price = :price where p.code = :productCode")
    void updateProductPrice(String productCode, BigDecimal price);
}
Unlike Spring Data JPA, Micronaut Data uses compile-time annotation processing to implement repository methods, avoiding runtime reflection.
Create the event payload
Create a record named ProductPriceChangedEvent that represents the structure
of the event payload received from the Kafka topic:
package com.testcontainers.demo;
import io.micronaut.serde.annotation.Serdeable;
import java.math.BigDecimal;
@Serdeable
public record ProductPriceChangedEvent(String productCode, BigDecimal price) {}
The @Serdeable annotation tells Micronaut Serialization that this type can be serialized and deserialized.
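As a rough illustration of how the record behaves as a plain value type, the sketch below redeclares it without @Serdeable so that it compiles with the JDK alone. That omission, the EventRecordSketch wrapper class, and the samePrice helper are assumptions made for this example, not part of the guide's code. It shows the generated accessors and that record equality delegates to BigDecimal.equals, which is scale-sensitive.

```java
import java.math.BigDecimal;

// Hypothetical wrapper class; the record mirrors the guide's type minus @Serdeable.
class EventRecordSketch {

    record ProductPriceChangedEvent(String productCode, BigDecimal price) {}

    // Compares two events by numeric price value, ignoring BigDecimal scale.
    static boolean samePrice(ProductPriceChangedEvent a, ProductPriceChangedEvent b) {
        return a.productCode().equals(b.productCode())
                && a.price().compareTo(b.price()) == 0;
    }

    public static void main(String[] args) {
        var a = new ProductPriceChangedEvent("P100", new BigDecimal("25.0"));
        var b = new ProductPriceChangedEvent("P100", new BigDecimal("25.00"));

        System.out.println(a.productCode() + " -> " + a.price());
        // Record equals() uses BigDecimal.equals(), so differing scales are unequal.
        System.out.println("equals:    " + a.equals(b));
        System.out.println("samePrice: " + samePrice(a, b));
    }
}
```

If scale differences should be ignored when asserting on prices, AssertJ's isEqualByComparingTo applies the same compareTo semantics as samePrice.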
The sender and receiver agree on the following JSON format:
{
    "productCode": "P100",
    "price": 25.0
}
Implement the Kafka listener
Create ProductPriceChangedEventHandler.java, which handles messages from the
product-price-changes topic and updates the product price in the database:
package com.testcontainers.demo;
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import jakarta.inject.Singleton;
import jakarta.transaction.Transactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
@Transactional
class ProductPriceChangedEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ProductPriceChangedEventHandler.class);
    private final ProductRepository productRepository;
    ProductPriceChangedEventHandler(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }
    @Topic("product-price-changes")
    @KafkaListener(offsetReset = EARLIEST, groupId = "demo")
    public void handle(ProductPriceChangedEvent event) {
        LOG.info("Received a ProductPriceChangedEvent with productCode: {}", event.productCode());
        productRepository.updateProductPrice(event.productCode(), event.price());
    }
}
Key details:
- The @KafkaListener annotation marks this class as a Kafka message listener. Setting offsetReset to EARLIEST makes the listener start consuming from the beginning of the partition when its consumer group has no committed offset, which is useful during testing.
- The @Topic annotation specifies which topic to subscribe to.
- Micronaut handles JSON deserialization of the ProductPriceChangedEvent automatically using Micronaut Serialization.
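To see why EARLIEST helps in tests, consider a consumer group with no committed offset that subscribes after a message has already been published. The following is a simplified in-memory model, not Kafka client code: OffsetResetSketch, startOffset, readFrom, and the list standing in for a partition are all hypothetical.

```java
import java.util.List;

// Simplified model of a single partition; all names are illustrative only.
class OffsetResetSketch {

    enum OffsetReset { EARLIEST, LATEST }

    // Where a consumer with no committed offset begins reading.
    // Offset 0 stands in for the earliest retained offset of the partition.
    static int startOffset(OffsetReset reset, int logEndOffset) {
        return reset == OffsetReset.EARLIEST ? 0 : logEndOffset;
    }

    // Records visible to a consumer that starts at the given offset.
    static List<String> readFrom(List<String> partition, int offset) {
        return partition.subList(offset, partition.size());
    }

    public static void main(String[] args) {
        // The test published one event before the listener finished subscribing.
        List<String> partition = List.of("P100 -> 14.50");

        for (OffsetReset reset : OffsetReset.values()) {
            int start = startOffset(reset, partition.size());
            System.out.println(reset + " sees " + readFrom(partition, start));
        }
    }
}
```

In this model the EARLIEST consumer still receives the event that was published before it subscribed, while the LATEST consumer would only see events published afterwards.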
Configure the datasource
Add the following properties to src/main/resources/application.properties:
micronaut.application.name=tc-guide-testing-micronaut-kafka-listener
datasources.default.db-type=mysql
datasources.default.dialect=MYSQL
jpa.default.properties.hibernate.hbm2ddl.auto=update
jpa.default.entity-scan.packages=com.testcontainers.demo
datasources.default.driver-class-name=com.mysql.cj.jdbc.Driver
Hibernate's hbm2ddl.auto=update creates and updates the database schema automatically. For testing, you'll override this to create-drop in the test properties file.
Create src/test/resources/application-test.properties:
jpa.default.properties.hibernate.hbm2ddl.auto=create-drop
Write tests with Testcontainers
To test the Kafka listener, you need a running Kafka broker and a MySQL
database, plus a started Micronaut application context. Testcontainers spins up
both services in Docker containers and the TestPropertyProvider interface
connects them to Micronaut.
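Because Testcontainers maps container ports to random free ports on the host, connection settings such as the bootstrap servers are only known once the container is running, which is why they are supplied programmatically rather than hard-coded in a properties file. The sketch below imitates that idea with the JDK alone: a ServerSocket bound to port 0 stands in for a started container, and DynamicPropertiesSketch and propertiesFor are hypothetical names, not Micronaut or Testcontainers API.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Map;

// Hypothetical illustration of runtime-computed test properties.
class DynamicPropertiesSketch {

    // Builds a property map from an address that is only known at runtime,
    // in the spirit of a TestPropertyProvider-style hook.
    static Map<String, String> propertiesFor(String host, int port) {
        return Map.of("kafka.bootstrap.servers", host + ":" + port);
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any free port, similar in spirit to a mapped container port.
        try (ServerSocket fakeBroker = new ServerSocket(0)) {
            Map<String, String> props = propertiesFor("localhost", fakeBroker.getLocalPort());
            System.out.println(props);
        }
    }
}
```

The printed port changes from run to run, which is the situation a runtime property hook is designed to handle.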
Create a Kafka client for testing
First, create a @KafkaClient interface to publish events in the test:
package com.testcontainers.demo;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
@KafkaClient
public interface ProductPriceChangesClient {
    @Topic("product-price-changes")
    void send(@KafkaKey String productCode, ProductPriceChangedEvent event);
}
Key details:
- The @KafkaClient annotation designates this interface as a Kafka producer.
- The @Topic annotation specifies the target topic.
- The @KafkaKey annotation marks the parameter used as the Kafka message key. If no such parameter exists, Micronaut sends the record with a null key.
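The key matters because Kafka's default partitioner routes records with the same non-null key to the same partition, which preserves ordering per key. The sketch below illustrates the idea only: it uses String.hashCode as a stand-in for the murmur2 hash that Kafka applies to the serialized key, and KeyPartitionSketch and partitionFor are hypothetical names.

```java
// Hypothetical illustration of hash-based key partitioning; not Kafka's actual partitioner.
class KeyPartitionSketch {

    // Maps a key to a partition index; floorMod keeps the result non-negative.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        for (String key : new String[] {"P100", "P200", "P100"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Both P100 records map to the same partition index, so using the product code as the key keeps price changes for one product in order.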
Write the test
Create ProductPriceChangedEventHandlerTest.java:
package com.testcontainers.demo;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import io.micronaut.context.annotation.Property;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.test.support.TestPropertyProvider;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
@MicronautTest(transactional = false)
@Property(name = "datasources.default.driver-class-name", value = "org.testcontainers.jdbc.ContainerDatabaseDriver")
@Property(name = "datasources.default.url", value = "jdbc:tc:mysql:8.0.32:///db")
@Testcontainers(disabledWithoutDocker = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ProductPriceChangedEventHandlerTest implements TestPropertyProvider {
    @Container
    static final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0");
    @Override
    public @NonNull Map<String, String> getProperties() {
        if (!kafka.isRunning()) {
            kafka.start();
        }
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());
    }
    @Test
    void shouldHandleProductPriceChangedEvent(
            ProductPriceChangesClient productPriceChangesClient, ProductRepository productRepository) {
        Product product = new Product(null, "P100", "Product One", BigDecimal.TEN);
        Long id = productRepository.save(product).getId();
        ProductPriceChangedEvent event = new ProductPriceChangedEvent("P100", new BigDecimal("14.50"));
        productPriceChangesClient.send(event.productCode(), event);
        await().pollInterval(Duration.ofSeconds(3)).atMost(10, SECONDS).untilAsserted(() -> {
            Optional<Product> optionalProduct = productRepository.findByCode("P100");
            assertThat(optionalProduct).isPresent();
            assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
            assertThat(optionalProduct.get().getPrice()).isEqualTo(new BigDecimal("14.50"));
        });
        productRepository.deleteById(id);
    }
}
Here's what the test does:
- @MicronautTest initializes the Micronaut application context and the embedded server. Setting transactional to false prevents each test method from running inside a rolled-back transaction, which is necessary because the Kafka listener processes messages in a separate thread and would not see data written inside an uncommitted test transaction.
- The @Property annotations override the datasource driver and URL to use the Testcontainers special JDBC URL (jdbc:tc:mysql:8.0.32:///db). This spins up a MySQL container and configures it as the datasource automatically.
- @Testcontainers and @Container manage the Kafka container lifecycle. The TestPropertyProvider interface registers the Kafka bootstrap servers with Micronaut so that the producer and consumer connect to the test container. The isRunning() check in getProperties() starts the container on demand in case Micronaut requests the properties before the Testcontainers extension has started it.
- @TestInstance(TestInstance.Lifecycle.PER_CLASS) creates a single test instance for all test methods, which is required when implementing TestPropertyProvider.
- The test saves a Product in the database, then sends a ProductPriceChangedEvent to the product-price-changes topic using the ProductPriceChangesClient.
- Because Kafka message processing is asynchronous, the test uses Awaitility to poll every 3 seconds (up to a maximum of 10 seconds) until the product price in the database matches the expected value.
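The polling behaviour described in the last point can be pictured with a small JDK-only loop. This is a simplified stand-in, not Awaitility's implementation: PollingSketch and untilAsserted are hypothetical names, and the short intervals in main are chosen only so the demo finishes quickly.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical poll-until-asserted helper illustrating the idea behind the Awaitility call.
class PollingSketch {

    // Re-runs the assertion every pollInterval until it passes or atMost would be exceeded.
    static void untilAsserted(Duration pollInterval, Duration atMost, Runnable assertion) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() + pollInterval.toNanos() > deadline) {
                    throw e; // the next poll would exceed the time budget
                }
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", ie);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<String> price = new AtomicReference<>("10");

        // Simulates the listener updating the price on another thread after a delay.
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(150);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            price.set("14.50");
        });
        listener.start();

        untilAsserted(Duration.ofMillis(50), Duration.ofSeconds(2), () -> {
            if (!"14.50".equals(price.get())) {
                throw new AssertionError("price not updated yet: " + price.get());
            }
        });
        System.out.println("price updated to " + price.get());
        listener.join();
    }
}
```

With a 3-second interval and a 10-second limit, the assertion in the test runs only a handful of times. A shorter poll interval would typically let the test finish sooner once the listener has updated the row.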
Run tests and next steps
Run the tests
$ ./mvnw test
Or with Gradle:
$ ./gradlew test
You should see the Kafka and MySQL Docker containers start and all tests pass. After the tests finish, the containers stop and are removed automatically.
Summary
Testing with real Kafka and MySQL instances gives you more confidence in the correctness of your code than mocks or embedded alternatives. The Testcontainers library manages the container lifecycle so that your integration tests run against the same services you use in production.
To learn more about Testcontainers, visit the Testcontainers overview.