Write tests with Testcontainers
Table of contents
To test the Kafka listener, you need a running Kafka broker and a MySQL
database, plus a started Micronaut application context. Testcontainers spins up
both services in Docker containers and the TestPropertyProvider interface
connects them to Micronaut.
Create a Kafka client for testing
First, create a @KafkaClient interface to publish events in the test:
package com.testcontainers.demo;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
@KafkaClient
public interface ProductPriceChangesClient {
@Topic("product-price-changes")
void send(@KafkaKey String productCode, ProductPriceChangedEvent event);
}Key details:
- The
@KafkaClientannotation designates this interface as a Kafka producer. - The
@Topicannotation specifies the target topic. - The
@KafkaKeyannotation marks the parameter used as the Kafka message key. If no such parameter exists, Micronaut sends the record with a null key.
Write the test
Create ProductPriceChangedEventHandlerTest.java:
package com.testcontainers.demo;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import io.micronaut.context.annotation.Property;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.test.support.TestPropertyProvider;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
@MicronautTest(transactional = false)
@Property(name = "datasources.default.driver-class-name", value = "org.testcontainers.jdbc.ContainerDatabaseDriver")
@Property(name = "datasources.default.url", value = "jdbc:tc:mysql:8.0.32:///db")
@Testcontainers(disabledWithoutDocker = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ProductPriceChangedEventHandlerTest implements TestPropertyProvider {
@Container
static final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0");
@Override
public @NonNull Map<String, String> getProperties() {
if (!kafka.isRunning()) {
kafka.start();
}
return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());
}
@Test
void shouldHandleProductPriceChangedEvent(
ProductPriceChangesClient productPriceChangesClient, ProductRepository productRepository) {
Product product = new Product(null, "P100", "Product One", BigDecimal.TEN);
Long id = productRepository.save(product).getId();
ProductPriceChangedEvent event = new ProductPriceChangedEvent("P100", new BigDecimal("14.50"));
productPriceChangesClient.send(event.productCode(), event);
await().pollInterval(Duration.ofSeconds(3)).atMost(10, SECONDS).untilAsserted(() -> {
Optional<Product> optionalProduct = productRepository.findByCode("P100");
assertThat(optionalProduct).isPresent();
assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
assertThat(optionalProduct.get().getPrice()).isEqualTo(new BigDecimal("14.50"));
});
productRepository.deleteById(id);
}
}Here's what the test does:
@MicronautTestinitializes the Micronaut application context and the embedded server. Settingtransactionaltofalseprevents each test method from running inside a rolled-back transaction, which is necessary because the Kafka listener processes messages in a separate thread.- The
@Propertyannotations override the datasource driver and URL to use the Testcontainers special JDBC URL (jdbc:tc:mysql:8.0.32:///db). This spins up a MySQL container and configures it as the datasource automatically. @Testcontainersand@Containermanage the Kafka container lifecycle. TheTestPropertyProviderinterface registers the Kafka bootstrap servers with Micronaut so that the producer and consumer connect to the test container.@TestInstance(TestInstance.Lifecycle.PER_CLASS)creates a single test instance for all test methods, which is required when implementingTestPropertyProvider.- The test creates a
Productrecord in the database, then sends aProductPriceChangedEventto theproduct-price-changestopic using theProductPriceChangesClient. - Because Kafka message processing is asynchronous, the test uses Awaitility to poll every 3 seconds (up to a maximum of 10 seconds) until the product price in the database matches the expected value.