Application structure

To build and develop a Flink streaming application, you first need to design the structure of the application, then implement its core logic with the execution environment, sources, and sinks.

A Flink application consists of the following structural parts:
  • Application main class
  • Data sources
  • Processing operators
  • Data sinks

The application main class defines the execution environment and creates the data pipeline. The data pipeline is the business logic of a Flink application where one or more operators are chained together. These processing operators apply transformations on the input data that comes from the data sources. After the transformation, the application forwards the transformed data to the data sinks.

The StreamExecutionEnvironment class is needed to create DataStreams and to configure the important job parameters that control the behavior of the application. The getExecutionEnvironment() static call guarantees that the pipeline always uses the correct environment for the location it is executed on: when running from the IDE, it returns a local execution environment, and when running from the client for cluster submission, it returns the YARN execution environment. The rest of the main class defines the application sources, processing flow, and sinks, followed by the execute() call. The execute() call triggers the actual execution of the pipeline, either locally or on the cluster.
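The structure described above can be sketched as a minimal main class. The skeleton below is illustrative only: the fromElements() source, the map() transformation, and the print() sink stand in for whatever connectors and business logic a real application would use.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkeletonJob {
	public static void main(String[] args) throws Exception {
		// Returns a local environment in the IDE, the cluster environment on submission
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Source: a fixed set of elements stands in for a real connector such as Kafka
		DataStream<String> input = env.fromElements("a", "b", "c");

		// Processing operator: a simple transformation as placeholder business logic
		DataStream<String> transformed = input.map(s -> s.toUpperCase());

		// Sink: printing to stdout stands in for a real sink such as HDFS
		transformed.print();

		// Nothing runs until execute() is called
		env.execute("Skeleton Job");
	}
}
```

Each of the four structural parts maps to one step of the skeleton: the main class hosts the environment, addSource()/fromElements() define the sources, chained operators form the pipeline, and the sink terminates it before execute() submits the job.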

A DataStream represents the stream of data records, and operators can be used to apply transformations that create new DataStreams. Flink provides pre-implemented sources and sinks, and you can also use custom-defined connectors to exchange data with other systems. Choosing the sources and sinks depends on the purpose of the application. As Flink can be deployed in many kinds of environments, various connectors are available. In most cases, Kafka is used as a connector, as it has streaming capabilities and can be easily integrated with other services.

The following is an example of the logic of a Flink application:
public class KafkaToHDFSAvroJob {

	private static Logger LOG = LoggerFactory.getLogger(KafkaToHDFSAvroJob.class);

	public static void main(String[] args) throws Exception {
		ParameterTool params = Utils.parseArgs(args);

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Deserialize the Kafka records into Message objects using the Schema Registry
		KafkaDeserializationSchema<Message> schema = ClouderaRegistryKafkaDeserializationSchema
				.builder(Message.class)
				.setConfig(Utils.readSchemaRegistryProperties(params))
				.build();

		// Source: consume Message records from the configured Kafka topic
		FlinkKafkaConsumer<Message> consumer =
				new FlinkKafkaConsumer<>(params.getRequired(K_KAFKA_TOPIC), schema, Utils.readKafkaProperties(params));

		// Processing operator: map each Message to a comma-separated output string
		DataStream<String> source = env.addSource(consumer)
				.name("Kafka Source")
				.uid("Kafka Source")
				.map(record -> record.getId() + "," + record.getName() + "," + record.getDescription())
				.name("ToOutputString");

		// Sink: write the strings to HDFS as row-formatted files
		StreamingFileSink<String> sink = StreamingFileSink
				.forRowFormat(new Path(params.getRequired(K_HDFS_OUTPUT)), new SimpleStringEncoder<String>("UTF-8"))
				.build();
		source.addSink(sink)
				.name("FS Sink")
				.uid("FS Sink");
		source.print();

		// Trigger the actual execution of the pipeline
		env.execute("Flink Streaming Secured Job Sample");
	}
}