Flink application example

The following example shows the application logic of a Flink job from the Secure Tutorial. The application uses Kafka as a source and writes its output to an HDFS sink.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// The tutorial-specific classes (Utils, Message, the parameter key constants
// K_KAFKA_TOPIC and K_HDFS_OUTPUT) and the Cloudera Schema Registry
// deserialization schema are provided by the Secure Tutorial project.
public class KafkaToHDFSAvroJob {
	private static final Logger LOG = LoggerFactory.getLogger(KafkaToHDFSAvroJob.class);
	public static void main(String[] args) throws Exception {
		ParameterTool params = Utils.parseArgs(args);

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Deserialization schema that reads Avro records from Kafka
		// using the Cloudera Schema Registry
		KafkaDeserializationSchema<Message> schema = ClouderaRegistryKafkaDeserializationSchema
				.builder(Message.class)
				.setConfig(Utils.readSchemaRegistryProperties(params))
				.build();

		FlinkKafkaConsumer<Message> consumer = new FlinkKafkaConsumer<>(
				params.getRequired(K_KAFKA_TOPIC), schema, Utils.readKafkaProperties(params));

		// Map each incoming Message to a comma-separated output line
		DataStream<String> source = env.addSource(consumer)
				.name("Kafka Source")
				.uid("Kafka Source")
				.map(record -> record.getId() + "," + record.getName() + "," + record.getDescription())
				.name("ToOutputString");

		// Row-format file sink that writes the lines to the configured HDFS path
		StreamingFileSink<String> sink = StreamingFileSink
				.forRowFormat(new Path(params.getRequired(K_HDFS_OUTPUT)), new SimpleStringEncoder<String>("UTF-8"))
				.build();

		source.addSink(sink)
				.name("FS Sink")
				.uid("FS Sink");

		// Also print each record to the TaskManager's standard output for debugging
		source.print();

		env.execute("Flink Streaming Secured Job Sample");
	}
}
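
In the example above, Utils is a helper class that ships with the tutorial and is not shown here. The following is an illustrative sketch only of how its argument parsing and Kafka property handling could be implemented; the "kafka." key prefix and the method bodies are assumptions, not the tutorial's actual code.

import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.java.utils.ParameterTool;

public final class Utils {

	// Parse --key value pairs from the command line into a ParameterTool.
	public static ParameterTool parseArgs(String[] args) {
		return ParameterTool.fromArgs(args);
	}

	// Collect every parameter starting with the assumed "kafka." prefix,
	// strip the prefix, and pass the rest to the Kafka consumer as-is.
	public static Properties readKafkaProperties(ParameterTool params) {
		Properties kafkaProps = new Properties();
		for (Map.Entry<String, String> entry : params.toMap().entrySet()) {
			if (entry.getKey().startsWith("kafka.")) {
				kafkaProps.setProperty(entry.getKey().substring("kafka.".length()), entry.getValue());
			}
		}
		return kafkaProps;
	}
}

Utils.readSchemaRegistryProperties can follow the same prefix pattern for the Schema Registry client configuration. Passing the Kafka and Schema Registry settings as prefixed command line parameters keeps security-related properties, such as security.protocol, out of the job code itself.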