Testing and validating Flink applications
After you have built your Flink streaming application, you can write a simple test method to validate that it behaves correctly.
When the pipeline logic is extracted into a static method, it can be tested easily with the JUnit framework.
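As a rough illustration of the static-method pattern, the sketch below uses java.util.stream as a stand-in for a Flink DataStream so that it runs without Flink on the classpath. The class name PipelineSketch, the method computeAlerts, and the threshold rule are hypothetical and are not part of the application described here. The point is only that the processing logic takes its input as a parameter, so production code and a test can feed it different sources.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineSketch {

    // Hypothetical processing logic: keep only the readings above the threshold.
    // The input is a parameter, so the caller decides where the data comes from.
    static Stream<Double> computeAlerts(Stream<Double> readings, double threshold) {
        return readings.filter(r -> r > threshold);
    }

    public static void main(String[] args) {
        // A test would call the same static method with a small, fixed input,
        // while the application main class would pass its real source.
        List<Double> alerts = computeAlerts(Stream.of(0.2, 0.95, 0.5, 0.99), 0.9)
                .collect(Collectors.toList());
        System.out.println(alerts); // prints [0.95, 0.99]
    }
}
```

In the Flink application the same role is played by a static method that accepts a DataStream and returns the transformed stream, which is what lets the test reuse it unchanged.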
A simple JUnit test can verify the core application logic. Because the test runs the whole pipeline rather than a single function, it should be regarded as an integration test of the application flow.
The test mimics the application main class with only minor differences:
- Create the StreamExecutionEnvironment the same way.
- Use the env.fromElements(..) method to pre-populate a DataStream with some testing data.
- Feed the testing data to the static data processing logic as before.
- Verify the correctness once the test is finished.
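// Imports for Flink, JUnit, Sets and the application classes are omitted here.
public class HeapMonitorPipelineTest {

    // The original excerpt does not show the class declaration or this field;
    // the class name and the field definition are assumptions.
    // The set is static so that the sink, which Flink serializes before running it,
    // writes to the same collection that the assertion reads.
    private static final Set<HeapAlert> testOutput = new HashSet<>();

    @Test
    public void testPipeline() throws Exception {
        final String alertMask = "42";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two records that should raise an alert and two that should not.
        HeapMetrics alert1 = testStats(0.42);
        HeapMetrics regular1 = testStats(0.452);
        HeapMetrics regular2 = testStats(0.245);
        HeapMetrics alert2 = testStats(0.9423);

        DataStreamSource<HeapMetrics> testInput = env.fromElements(alert1, alert2, regular1, regular2);

        // Run the same static pipeline logic as the application and collect its output.
        HeapMonitorPipeline.computeHeapAlerts(testInput, ParameterTool.fromArgs(new String[]{"--alertMask", alertMask}))
                .addSink(new SinkFunction<HeapAlert>() {
                    @Override
                    public void invoke(HeapAlert value) {
                        testOutput.add(value);
                    }
                })
                .setParallelism(1);

        // Blocks until the bounded test input has been fully processed.
        env.execute();

        assertEquals(Sets.newHashSet(HeapAlert.maskRatioMatch(alertMask, alert1),
                HeapAlert.maskRatioMatch(alertMask, alert2)), testOutput);
    }

    private HeapMetrics testStats(double ratio) {
        return new HeapMetrics(HeapMetrics.OLD_GEN, 0, 0, ratio, 0, "testhost");
    }
}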
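The test expects alerts for the ratios 0.42 and 0.9423 only. One reading consistent with this data is that a record matches when the decimal representation of its ratio contains the mask string "42". This rule is inferred from the test values and is not confirmed by the text above. The following plain Java sketch, with the hypothetical names MaskMatchSketch, matchesMask and expectedAlerts, works through the four test ratios under that assumption.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class MaskMatchSketch {

    // Assumed rule (not confirmed by the source): a ratio matches when its
    // decimal string, for example "0.9423", contains the mask as a substring.
    static boolean matchesMask(double ratio, String mask) {
        return Double.toString(ratio).contains(mask);
    }

    // Returns the ratios that would raise an alert, in input order.
    static Set<Double> expectedAlerts(String mask, double... ratios) {
        Set<Double> matches = new LinkedHashSet<>();
        for (double ratio : ratios) {
            if (matchesMask(ratio, mask)) {
                matches.add(ratio);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Same ratios and mask as the JUnit test.
        System.out.println(expectedAlerts("42", 0.42, 0.9423, 0.452, 0.245));
        // prints [0.42, 0.9423]
    }
}
```

Under this reading, 0.452 and 0.245 contain the digits 4 and 2 but never the adjacent sequence "42", which is why they serve as the non-alert records in the test.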