Apache Flink is a powerful distributed stream processing engine used to build real-time, stateful applications in Java and Scala. But for developers building production-grade Flink applications, one recurring challenge stands out: testing.
Between time-based event processing, complex operator chains, and the distributed nature of Flink jobs, creating robust and maintainable tests can quickly become a bottleneck.
In this article, we walk through how to effectively test Apache Flink applications—unit and integration, and where tools like Diffblue Cover can help automate and accelerate your testing pipeline.
Apache Flink’s Testing Framework
Apache Flink offers a robust testing framework specifically designed for unit testing Flink applications. This framework allows developers to:
- Test individual operators and user-defined functions in isolation
- Simulate streaming data inputs and verify the outputs
- Validate state management and event time processing
The testing utilities are available through Maven dependencies like flink-test-utils
and flink-runtime
with the “tests
” classifier
.
Test Harnesses
Flink provides specialized test harnesses for different types of operators and functions:
OneInputStreamOperatorTestHarness
for testing operators onDataStream
sKeyedOneInputStreamOperatorTestHarness
for operators onKeyedStream
sTwoInputStreamOperatorTestHarness
for operators on connected streamsProcessFunctionTestHarnesses
for easier testing ofProcessFunction
s
These tools are indispensable—but they require manual effort and don’t help with testing your pure Java logic (e.g., MapFunctions or helper classes). That’s where Diffblue Cover can complement the workflow.
MiniClusterWithClientResource
The testing framework provides the MiniClusterWithClientResource
class for integration testing of complete Flink jobs. This class allows testing against a local, embedded Flink mini cluster, simulating a more realistic execution environment.
The lifecycle of this cluster can be managed manually, with a JUnit 4 rule or a JUnit Jupiter extension.
Benefits for Java Developers
- Isolation: Developers can test individual components of their Flink applications without dependencies on the full runtime environment.
- Comprehensive Testing: The framework enables testing of stateful operations, event time processing, and watermark handling, which are crucial aspects of stream processing applications.
- Ease of Use: Flink’s testing utilities integrate well with popular Java testing frameworks like JUnit.
- Realistic Simulations: Test harnesses allow developers to simulate complex stream processing scenarios, including out-of-order events and state management.
- Performance Optimization: By enabling thorough testing of individual components, developers can identify and resolve performance issues early in the development cycle.
- Confidence in Production: Comprehensive unit testing helps ensure that Flink applications behave as expected when deployed to production environments.
Apache Flink is widely known for its powerful real-time stream processing capabilities, making it ideal for large-scale data processing applications. Its event-driven processing model, fault tolerance, and ability to maintain state across distributed environments enable developers to handle complex data pipelines effectively.
Its architecture is based on a controller-worker model with a central JobManager that coordinates distributed TaskManagers for parallel execution. Flink’s programming model supports both streaming and batch data through the DataStream and DataSet APIs, enabling Java, Scala, and Python developers to apply transformations and manage state effectively.
For deployment, Flink can run in standalone mode or on resource managers like Kubernetes or YARN, and it integrates with major cloud platforms and the big data ecosystem. With rich connectors for systems like Kafka, Cassandra, and Elasticsearch, Flink is versatile for diverse data pipelines and real-time processing needs.
However, as Flink applications grow in complexity, testing becomes crucial to ensure data accuracy, fault tolerance, and overall system reliability. Whether working with stateless or stateful operators, or testing end-to-end pipelines, a solid testing strategy can save us from an outage production.
This article covers both unit testing and integration testing in Flink, offering simple yet practical examples to illustrate how you can test various components in your Flink applications.
Challenges of Testing Flink Applications
Testing Apache Flink applications is different from testing a typical microservice application. Focussing on a single microservice, testing primarily involves request-response verification, service logic validation, and database interactions. These tests are usually deterministic, ensuring the service handles API requests correctly, processes business logic, and interacts with databases as expected.
Since microservices are often stateless and synchronous, unit tests and integration tests are relatively straightforward, isolating components like controllers, services, or repositories.
In contrast, Flink applications are designed for continuous, real-time data stream processing, often involving stateful operations that track data across events.
The distributed, parallelized nature of Flink means that tests must account for distributed state management, parallel task execution, and fault tolerance through checkpointing.
Additionally, Flink’s time-dependent operators (e.g., event-time or processing-time triggers) introduce asynchronous behavior, making it harder to write deterministic tests. These complexities make testing Flink applications more challenging, requiring a specialized test framework to simulate real-world environments and ensure correctness across various failure scenarios.
Unit Testing Flink Applications
Unit Testing Stateless UDFs
In data processing, it’s common to encounter raw text embedded with HTML or XML tags, especially when extracting content from web pages, emails, or other markup-heavy sources.
In many cases, we don’t need the HTML tags themselves; rather, we’re interested in the plain text that lies within. A great example of this is cleaning up text for further analysis in a data pipeline.
Let’s take a look at a simple stateless User-Defined Function (UDF) in Apache Flink that removes HTML and XML tags. This UDF, RemoveHtmlTagsFunction
, uses a regular expression to strip all the tags, leaving behind the clean text.
Here’s how the UDF looks:
import org.apache.flink.api.common.functions.MapFunction;
public class RemoveHtmlTagsFunction implements MapFunction<String, String> {
@Override
public String map(String value) {
// Use regex to remove HTML or XML tags
return value.replaceAll("<[^>]*>", "").trim();
}
}
In this function, we use a regular expression to match anything between < and > and remove it from the string. After removing the tags, we also call trim()
to clean up any extra spaces that might be left behind.
As a next step, we now want to test and verify our implementation.
The beauty of stateless functions like this one is that they are extremely simple to test. Since the function doesn’t depend on any external state, databases, or network calls, we can test it by feeding in different inputs and checking the output.
All we need is our standard testing framework — in this case, JUnit 5 — to write assertions and validate the behavior of our function.
Here’s how a basic test class might look for RemoveHtmlTagsFunction
:
class RemoveHtmlTagsFunctionTest {
private final RemoveHtmlTagsFunction udf =
new RemoveHtmlTagsFunction();
@Test
void shouldRemoveTagsFromHtml() {
String input = "<p>Hello World</p>";
assertEquals("Hello World", udf.map(input));
}
@Test
void shouldRemoveNestedTagsFromHtml() {
String input = "<div><p>Flink is <strong>great</strong>!</p></div>";
String expectedOutput2 = "Flink is great!";
assertEquals(expectedOutput2, udf.map(input));
}
@Test
void shouldTrimCleanedHtml() {
String input = "<h1> Heading </h1>";
assertEquals("Heading", udf.map(input));
}
}
Testing a stateless function like this is straightforward. We provide an input, call the UDF, and verify the output using assertions. There is no need for complex test setups. If our function depends on another class, we can use a mocking framework like Mockito to mock the behavior of any dependent class of our function.
The function behaves deterministically, meaning it will always produce the same output given the same input. This makes it easy to write comprehensive tests that cover different scenarios, ensuring that the UDF works as expected in various cases.
Unit Testing Stateful UDFs
For our next unit test example, we move on to stateful UDFs and stay in the realm of data scraping and web analysis.
Consider a scenario where we need to analyze logs of HTML content to ensure that potentially harmful tags, such as <script>
tags, do not exceed a certain threshold. This is especially important for applications that scrape web pages or aggregate user-generated content. For this use case, we implement a stateful function using Apache Flink to detect surges in the frequency of specific HTML tags.
The following Java class demonstrates the implementation of a stateful function that tracks the frequency of <script>
tags in incoming HTML content. The function is designed to collect and monitor occurrences of the specified HTML tag and emit a surge detection message when the count exceeds a predefined threshold:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class HtmlTagFrequencySurgeFunction
extends KeyedProcessFunction<String, String, String> {
private final int threshold;
private ValueState<Long> tagCountState;
public HtmlTagFrequencySurgeFunction(int threshold) {
this.threshold = threshold;
}
@Override
public void open(Configuration parameters) throws Exception {
tagCountState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("tagCount", Long.class, 0L));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
long tagCount = value.split("<script>").length - 1;
System.out.println(tagCount +" "+ tagCountState.value());
Long currentCount = tagCountState.value();
currentCount += tagCount;
tagCountState.update(currentCount);
if (currentCount >= threshold) {
out.collect("Surge detected: " + currentCount + " occurrences of <script>");
}
out.collect(value);
}
}
The HtmlTagFrequencySurgeFunction
extends the KeyedProcessFunction
class, allowing it to maintain state across incoming records. The open method initializes a state variable tagCountState
, which keeps track of the count of <script>
tags processed.
The processElement
method updates this count with each incoming HTML string and emits a surge detection message if the count surpasses the specified threshold.
To ensure the reliability and correctness of our function, we need to test it. The testing process requires using a test harness that simulates the stream processing environment.
Below is an example of how to implement a unit test for the HtmlTagFrequencySurgeFunction
using Apache Flink’s testing utilities:
class HtmlTagFrequencySurgeFunctionTest {
@Test
void testSurgeDetection() throws Exception {
HtmlTagFrequencySurgeFunction surgeFunction = new HtmlTagFrequencySurgeFunction(3);
KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>
(new KeyedProcessOperator<>(surgeFunction), string -> string, TypeInformation.of(String.class));
testHarness.open();
String[] inputs = new String[]{
"<p>Hello World</p>",
"<script>console.log('test');</script>",
"<div><script>alert('Surge!');</script></div>",
"<script>Another script tag</script>",
"<script>Final script tag</script>"
};
for (String input : inputs) {
testHarness.processElement(new StreamRecord<>(input, 100));
}
Collection<StreamRecord<String>> output = testHarness.getRecordOutput();
assertTrue(output.stream().map(StreamRecord::getValue).anyMatch(
message -> message.contains("Surge detected: 4 occurrences of <script>")
));
testHarness.close();
}
}
In this test, we create an instance of the HtmlTagFrequencySurgeFunction
with a threshold of 3 for detecting <script>
tag occurrences. The test harness, KeyedOneInputStreamOperatorTestHarness
, is then set up to facilitate the testing of a keyed stream operator.
We simulate the processing of a series of HTML strings by calling processElement
on the test harness. After processing, we retrieve the output and verify that a surge detection message has been emitted when the number of <script>
tags exceeds the threshold.
To reuse the testing harness setup and keep our code maintainable, we can outsource the setup part to a JUnit 5 @BeforeEach
lifecycle method and the closing to @AfterEach
if our test class contains more than one test.
Unit Testing Time-Dependent UDFs
Staying in the context of web analysis, this time, we want to test an Apache Flink function that tracks and regularly emits the occurrence of HTML tags to identify the adoption of newer HTML standards and elements.
The following code implements the AverageTagOccurrenceRateFunction
, which captures and processes incoming HTML strings to calculate the average occurrence of <img>
tags over a specified time window.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class AverageTagOccurrenceRateFunction extends KeyedProcessFunction<String, String, String> {
private final long windowSize;
private ValueState<Long> totalTagCountState;
private ValueState<Long> totalEventsState;
public AverageTagOccurrenceRateFunction(long windowSize) {
this.windowSize = windowSize;
}
@Override
public void open(Configuration parameters) throws Exception {
totalTagCountState = getRuntimeContext().getState(new ValueStateDescriptor<>("totalTagCount", Long.class, 0L));
totalEventsState = getRuntimeContext().getState(new ValueStateDescriptor<>("totalEvents", Long.class, 0L));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
long tagCount = value.split("<img>").length - 1;
Long currentTagCount = totalTagCountState.value();
Long currentEvents = totalEventsState.value();
totalTagCountState.update(currentTagCount + tagCount);
totalEventsState.update(currentEvents + 1);
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + windowSize);
out.collect(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
Long totalTagCount = totalTagCountState.value();
Long totalEvents = totalEventsState.value();
double averageRate = (totalEvents > 0) ? (double) totalTagCount / totalEvents : 0.0;
out.collect("Average occurrence rate of <img> tags in the last window: " + averageRate);
totalTagCountState.clear();
totalEventsState.clear();
}
}
In this implementation, the AverageTagOccurrenceRateFunction
tracks two states: the total count of <img>
tags encountered and the total number of events processed. Each time an HTML string is processed, the tag count is updated, and an event time timer is set to trigger after the specified window size.
When the timer fires, the function calculates and emits the average occurrence rate of <img>
tags during that period.
To ensure our function behaves correctly, we need to conduct thorough testing. Apart from the stateful nature of our function, we also depend on time to trigger code this time.
Similar to the previous stateful UDF, we utilize the KeyedOneInputStreamOperatorTestHarness
, which allows us to simulate event processing and validate the output of our function in a controlled environment:
class AverageTagOccurrenceRateFunctionTest {
@Test
void test() throws Exception {
long oneMinute = 60_000;
AverageTagOccurrenceRateFunction function = new AverageTagOccurrenceRateFunction(oneMinute);
KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(
new KeyedProcessOperator<>(function),
key -> key,
TypeInformation.of(String.class)
);
testHarness.open();
String[] inputs = new String[]{
"<p>Hello World</p>",
"<img src='image1.png' alt='Image 1'>",
"<div><img src='image2.jpg' alt='Image 2'></div>",
"<img src='image3.gif' alt='Image 3'><img src='image4.bmp' alt='Image 4'>",
"<img src='image5.png' alt='Image 5'>"
};
for (String input : inputs) {
testHarness.processElement(new StreamRecord<>(input, System.currentTimeMillis()));
}
testHarness.processWatermark(new Watermark(System.currentTimeMillis() + oneMinute));
Collection<StreamRecord<String>> output = testHarness.getRecordOutput();
assertTrue(output.stream().map(StreamRecord::getValue).anyMatch(
message -> message.contains("Average occurrence rate of <img> tags in the last window")
));
testHarness.close();
}
}
In this test, we create an instance of AverageTagOccurrenceRateFunction
with a one-minute window size. The test harness is set up to process a series of input strings containing varying occurrences of <img>
tags. After processing the inputs, we simulate the emission of a watermark, which triggers our function’s timer functionality.
The test then verifies that the expected output message about the average occurrence rate of <img>
tags is produced. By using the watermark, we effectively simulate the event time mechanics of our function without having to wait and slow down our entire test suite.
Integration Testing for Flink Applications
When working with Apache Flink, it’s essential to ensure that our data processing jobs behave as expected in real-world environments. This is where integration testing comes in handy.
In this section, we’ll explore how to write integration tests for Flink jobs using a MiniCluster to simulate Flink’s distributed behavior. This approach allows us to validate the entire job’s functionality in an environment similar to production without having to run a full Flink cluster.
Setting up the MiniCluster for Integration Testing
To begin, we include the necessary test dependency for using the MiniCluster
, which is part of Flink’s flink-test-utils
module:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>1.19.0</version>
<scope>test</scope>
</dependency>
This dependency provides the utilities needed to spin up a Flink MiniCluster, allowing us to run our integration tests against a small local cluster.
Example Streaming Job
In our test case, we’re working with an example Flink streaming job that processes HTML documents in the context of our web scraping analysis domain. The job consists of chained UDFs (user-defined functions) that we previously showcased.
We pass a collection of HTML strings as input, representing the documents our Flink job will process. These inputs could come from external sources like Kafka topics in a production setup, but for testing, we’ll use a predefined set of strings.
Here is the job setup:
public class StreamingJob {
private final Collection<String> dataSource;
private final Sink<String> sink;
public StreamingJob(Collection<String> dataSource, Sink<String> sink) {
this.dataSource = dataSource;
this.sink = sink;
}
public void execute() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stringListStream =
env.fromData(dataSource);
stringListStream
.keyBy(string -> string)
.process(new HtmlTagFrequencySurgeFunction(3))
.keyBy(string -> string)
.process(new AverageTagOccurrenceRateFunction(60_000))
.map(new RemoveHtmlTagsFunction())
.sinkTo(sink);
env.execute();
}
}
The processed data is then sent to the defined Sink<String>
(a customizable sink that could be anything from a file sink to a custom test sink for verification purposes).
Writing the Integration Test
For our integration test, we use JUnit 5 and the MiniClusterExtension
, which helps manage the lifecycle of the MiniCluster
. The test spawns a local Flink cluster to simulate the behavior of a production environment.
Here’s how the integration test is structured:
class StreamingJobIT {
@RegisterExtension
public static MiniClusterExtension flinkCluster =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder().build());
@Test
public void shouldProcessHtmlDocuments() throws Exception {
Collection<String> input = Arrays.asList("<p>Hello World</p>",
"<script>console.log('test');</script>",
"<div><script>alert('Surge!');</script></div>",
"<script>Another script tag</script>",
"<script>Final script tag</script>");
TestSink testSink = new TestSink();
StreamingJob job = new StreamingJob(input, testSink);
job.execute();
assertTrue(
testSink.getValues().stream().noneMatch(tag -> tag.contains("<"))
);
}
}
Key Components of the Integration Test
- MiniCluster Extension: The test uses
MiniClusterExtension
, provided by Flink’s test utilities, to create a small Flink cluster. The cluster lifecycle is managed by JUnit’s@RegisterExtension
annotation, ensuring that the cluster is started before the test and properly shut down afterward.
@RegisterExtension
public static MiniClusterExtension flinkCluster =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder().build());
- Test Input Data: We pass a collection of hardcoded HTML strings as the input to simulate real-world data ingestion. The
StreamingJob
constructor takes this data, and during the test, it acts as the source for the Flink pipeline.
Collection<String> input = Arrays.asList("<p>Hello World</p>",
"<script>console.log('test');</script>",
"<div><script>alert('Surge!');</script></div>",
"<script>Another script tag</script>",
"<script>Final script tag</script>");
- Test Sink: For testing purposes, we use a custom
TestSink
to collect the output of the Flink job. This allows us to inspect the results after the job has been executed. The test verifies that the final output contains no remaining HTML tags.
public class TestSink implements Sink<String> {
private static final List<String> values = Collections.synchronizedList(new ArrayList<>());
@Override
public SinkWriter<String> createWriter(Sink.InitContext context) {
return new CollectingSinkWriter(values);
}
public List<String> getValues() {
return values;
}
public void clear() {
values.clear();
}
private record CollectingSinkWriter(List<String> values) implements SinkWriter<String> {
@Override
public void write(String element, Context context) {
values.add(element);
}
@Override
public void flush(boolean b) {
}
@Override
public void close() {
}
}
}
- Assertions: After executing the job, we assert that none of the processed strings contain HTML tags by checking the output collected in the TestSink.
By running this test, we are simulating how Flink processes data in a distributed environment. The MiniCluster allows us to test the full data pipeline, including the UDFs we implemented earlier. This approach provides a higher confidence level that the job will behave correctly in production.
Bottom line
Apache Flink’s combination of stream and batch processing capabilities, in addition to its robust features for state management, fault tolerance, and scalability, make it a powerful tool for building real-time data processing applications across various industries.
Testing Apache Flink applications involves addressing challenges like stateful processing, distributed environments, and parallelism. With Flink’s testing harness and frameworks, we can effectively write unit tests for stateless, stateful, and time-dependent operators.
Integration tests using Flink’s MiniCluster
are essential to validate the full pipeline behavior, ensuring that your application behaves correctly in real-world scenarios.
By taking advantage of Flink’s testing capabilities, Java developers working with Apache Flink can create more reliable, maintainable, and performant stream processing applications while adhering to best practices in software testing. Naturally, Diffblue Cover supports Apache Flink projects.
Try Diffblue Cover Free
Want to see how many Flink UDFs Diffblue can test for you?
- Download Diffblue Cover
- Run
dcover create
on your Java classes - Review auto-generated, human-readable JUnit tests
No LLMs. No prompting. Just tests that work.