Testing and validating Flink applications

After you have built your Flink streaming application, you can write a simple test method to validate that it behaves correctly.

If you extract a pipeline into a static method, you can test it easily with the JUnit framework.
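The idea can be sketched in plain Java, with java.util.stream.Stream standing in for Flink's DataStream. This is an analogy only, not Flink API: the class MaskPipelineSketch, the method computeAlerts and the "alert:" output format are hypothetical, and the rule that an alert fires when the decimal form of the ratio contains the mask is an assumption suggested by the test data later in this section.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class MaskPipelineSketch {

    // The processing logic depends only on its arguments, so the application
    // and the test can call it with different sources.
    static Stream<String> computeAlerts(Stream<Double> ratios, String alertMask) {
        return ratios
                // Assumed alert rule: the ratio's decimal string contains the mask.
                .filter(ratio -> Double.toString(ratio).contains(alertMask))
                .map(ratio -> "alert:" + ratio);
    }

    public static void main(String[] args) {
        // A test replaces the real source with a fixed set of elements.
        List<String> alerts = computeAlerts(Stream.of(0.42, 0.452, 0.245, 0.9423), "42")
                .collect(Collectors.toList());
        System.out.println(alerts); // prints [alert:0.42, alert:0.9423]
    }
}
```

Because the method never creates its own source or sink, the same call works with production input and with a handful of hand-picked test values.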

A simple JUnit test can verify the core application logic. The test is implemented in the test class and, because it runs the whole pipeline, should be regarded as an integration test of the application flow.

The test mimics the application main class with only minor differences:
  1. Create the StreamExecutionEnvironment the same way.
  2. Use the env.fromElements(..) method to pre-populate a DataStream with some testing data.
  3. Feed the testing data to the static data processing logic as before.
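  4. Verify the output once the job has finished.

    // Shared collection that the test sink below writes to.
    static Set<HeapAlert> testOutput = new HashSet<>();

    @Test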
    public void testPipeline() throws Exception {

        final String alertMask = "42";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

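        // With the mask "42", alert1 and alert2 are expected to raise alerts;
        // regular1 and regular2 are not.
        HeapMetrics alert1 = testStats(0.42);
        HeapMetrics regular1 = testStats(0.452);
        HeapMetrics regular2 = testStats(0.245);
        HeapMetrics alert2 = testStats(0.9423);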

        DataStreamSource<HeapMetrics> testInput = env.fromElements(alert1, alert2, regular1, regular2);
        HeapMonitorPipeline.computeHeapAlerts(testInput, ParameterTool.fromArgs(new String[]{"--alertMask", alertMask}))
                .addSink(new SinkFunction<HeapAlert>() {
                    @Override
                    public void invoke(HeapAlert value) {
                        testOutput.add(value);
                    }
                })
                .setParallelism(1);

        // Run the job; execute() blocks until the bounded test input has been processed.
        env.execute();

        assertEquals(Sets.newHashSet(HeapAlert.maskRatioMatch(alertMask, alert1),
                HeapAlert.maskRatioMatch(alertMask, alert2)), testOutput);
    }

    private HeapMetrics testStats(double ratio) {
        return new HeapMetrics(HeapMetrics.OLD_GEN, 0, 0, ratio, 0, "testhost");
    }
}
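
The sink in the test writes to testOutput, a collection that lives outside the test method. Test sinks for Flink commonly use a static collection for this, because Flink serializes user functions before it runs them, so instance state held by the function would be filled in on a copy. The following sketch shows that effect with plain Java serialization only; StaticSinkSketch, CollectingSink and copyViaSerialization are hypothetical names and no Flink classes are involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class StaticSinkSketch {

    // Hypothetical stand-in for a sink function; it is Serializable, as Flink's SinkFunction is.
    static class CollectingSink implements Serializable {
        private static final long serialVersionUID = 1L;

        // Static state belongs to the class, so every copy in this JVM sees the same list.
        static final List<String> SHARED = new ArrayList<>();

        // Instance state travels with the object and is duplicated by serialization.
        final List<String> local = new ArrayList<>();

        void invoke(String value) {
            SHARED.add(value);
            local.add(value);
        }
    }

    // Serializes and deserializes the sink, mimicking a framework that ships functions as bytes.
    static CollectingSink copyViaSerialization(CollectingSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CollectingSink) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        CollectingSink original = new CollectingSink();
        CollectingSink shipped = copyViaSerialization(original);
        shipped.invoke("alert");

        System.out.println(original.local);        // prints []
        System.out.println(CollectingSink.SHARED); // prints [alert]
    }
}
```

The copy that actually receives the record updates its own instance list, so the original object stays empty, while the static list is visible to the code that makes the assertions. This only holds while the job and the test share one JVM, as they do in a local test run.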