Write tests with Testcontainers
To test the Kafka listener, you need a running Kafka broker, a MySQL
database, and a started Spring context. Testcontainers spins up both services
in Docker containers. @DynamicPropertySource points Spring at the Kafka broker,
and the Testcontainers JDBC URL supplies the MySQL datasource.
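The test in this section registers the broker address with a method reference (kafka::getBootstrapServers) rather than a fixed string, because Testcontainers maps the broker to a random host port that is only known once the container has started. The following plain-Java sketch illustrates that lazy lookup. It is not Spring's implementation: LazyPropertySketch, FakeBroker, Registry, and the port number are hypothetical stand-ins chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

class LazyPropertySketch {

    /** Hypothetical stand-in for a container whose address is unknown until it starts. */
    static class FakeBroker {
        private Integer port; // null until start() is called

        void start() {
            port = 49153; // hypothetical mapped port; a real container picks a free one
        }

        String getBootstrapServers() {
            if (port == null) {
                throw new IllegalStateException("broker not started");
            }
            return "localhost:" + port;
        }
    }

    /** Hypothetical registry that stores suppliers and evaluates them only on read. */
    static class Registry {
        private final Map<String, Supplier<Object>> suppliers = new LinkedHashMap<>();

        void add(String name, Supplier<Object> valueSupplier) {
            suppliers.put(name, valueSupplier);
        }

        Object resolve(String name) {
            return suppliers.get(name).get();
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        Registry registry = new Registry();

        // Registering before start is safe: the supplier is not called yet.
        registry.add("spring.kafka.bootstrap-servers", broker::getBootstrapServers);

        broker.start();

        // The value is computed now, after the port is known.
        System.out.println(registry.resolve("spring.kafka.bootstrap-servers"));
    }
}
```

Passing broker.getBootstrapServers() as an eager value in this sketch would fail with "broker not started", which is why a supplier is used.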
Write the test
Create ProductPriceChangedEventHandlerTest.java:
```java
package com.testcontainers.demo;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.TestPropertySource;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@SpringBootTest
@TestPropertySource(
    properties = {
        "spring.kafka.consumer.auto-offset-reset=earliest",
        "spring.datasource.url=jdbc:tc:mysql:8.0.32:///db",
    }
)
@Testcontainers
class ProductPriceChangedEventHandlerTest {

    @Container
    static final ConfluentKafkaContainer kafka =
        new ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0");

    @DynamicPropertySource
    static void overrideProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private ProductRepository productRepository;

    @BeforeEach
    void setUp() {
        Product product = new Product(null, "P100", "Product One", BigDecimal.TEN);
        productRepository.save(product);
    }

    @Test
    void shouldHandleProductPriceChangedEvent() {
        ProductPriceChangedEvent event = new ProductPriceChangedEvent(
            "P100",
            new BigDecimal("14.50")
        );

        kafkaTemplate.send("product-price-changes", event.productCode(), event);

        await()
            .pollInterval(Duration.ofSeconds(3))
            .atMost(10, SECONDS)
            .untilAsserted(() -> {
                Optional<Product> optionalProduct = productRepository.findByCode(
                    "P100"
                );
                assertThat(optionalProduct).isPresent();
                assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
                assertThat(optionalProduct.get().getPrice())
                    .isEqualTo(new BigDecimal("14.50"));
            });
    }
}
```

Here's what the test does:
- @SpringBootTest starts the full Spring application context.
- The Testcontainers special JDBC URL (jdbc:tc:mysql:8.0.32:///db) in @TestPropertySource spins up a MySQL container and configures it as the datasource automatically.
- @Testcontainers and @Container manage the lifecycle of the Kafka container.
- @DynamicPropertySource registers the Kafka bootstrap servers with Spring so that the producer and consumer connect to the test container.
- @BeforeEach creates a Product record in the database before each test.
- The test sends a ProductPriceChangedEvent to the product-price-changes topic using KafkaTemplate. Spring Boot converts the object to JSON using JsonSerializer.
- Because Kafka message processing is asynchronous, the test uses Awaitility to poll every 3 seconds (up to a maximum of 10 seconds) until the product price in the database matches the expected value.
- The property spring.kafka.consumer.auto-offset-reset is set to earliest so that the listener consumes messages even if they're sent to the topic before the listener is ready. This setting is helpful when running tests.
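
The Awaitility call in the test can be read as a retry loop: run the assertions, and if they fail, wait for the poll interval and try again until the time limit is reached. The plain-Java sketch below shows that idea under simplifying assumptions. It is not Awaitility's implementation: PollingSketch, untilAsserted, and the in-memory price holder that stands in for the database are hypothetical, and the intervals are shortened so the sketch finishes quickly.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

class PollingSketch {

    /**
     * Re-runs the assertion every pollInterval until it passes,
     * or rethrows the last AssertionError once atMost has elapsed.
     */
    public static void untilAsserted(Duration pollInterval, Duration atMost, Runnable assertion) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // time limit reached, report the last failure
                }
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", ie);
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for the product price stored in the database.
        AtomicReference<BigDecimal> price = new AtomicReference<>(BigDecimal.TEN);

        // Stand-in for the Kafka listener, which updates the price a little later.
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            price.set(new BigDecimal("14.50"));
        });
        listener.start();

        untilAsserted(Duration.ofMillis(50), Duration.ofSeconds(2), () -> {
            // compareTo ignores scale differences such as 14.5 versus 14.50.
            if (price.get().compareTo(new BigDecimal("14.50")) != 0) {
                throw new AssertionError("price not updated yet: " + price.get());
            }
        });

        System.out.println("price updated to " + price.get());
    }
}
```

A fixed Thread.sleep before the assertions would also work, but it always waits the full duration. Polling returns as soon as the listener has processed the event.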