Test and validation

Flink applications can be tested to validate the correctness of the application.

Pipelines can be extracted to static methods and can be easily tested with the JUnit framework.

A simple JUnit test can be written to verify the core application logic. The test is implemented in the test class and should be regarded as an integration test of the application flow.

The test mimics the application main class with only minor differences:
  1. Create the StreamExecutionEnvironment the same way.
  2. Use the env.fromElements(..) method to pre-populate a DataStream with some testing data.
  3. Feed the testing data to the static data processing logic as before.
  4. Verify the correctness once the test is finished.
    @Test
    public void testPipeline() throws Exception {

        final String alertMask = "42";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        HeapMetrics alert1 = testStats(0.42);
        HeapMetrics regular1 = testStats(0.452);
        HeapMetrics regular2 = testStats(0.245);
        HeapMetrics alert2 = testStats(0.9423);

        DataStreamSource<HeapMetrics> testInput = env.fromElements(alert1, alert2, regular1, regular2);
        HeapMonitorPipeline.computeHeapAlerts(testInput, ParameterTool.fromArgs(new String[]{"--alertMask", alertMask}))
                .addSink(new SinkFunction<HeapAlert>() {
                    @Override
                    public void invoke(HeapAlert value) {
                        testOutput.add(value);
                    }
                })
                .setParallelism(1);

        env.execute();

        assertEquals(Sets.newHashSet(HeapAlert.maskRatioMatch(alertMask, alert1),
                HeapAlert.maskRatioMatch(alertMask, alert2)), testOutput);
    }

    private HeapMetrics testStats(double ratio) {
        return new HeapMetrics(HeapMetrics.OLD_GEN, 0, 0, ratio, 0, "testhost");
    }
}