Flink streaming application structure
To build and develop a Flink streaming application, you need to understand its basic structure: the core logic is defined on an execution environment and connects data sources, processing operators, and data sinks. A Flink application consists of the following parts:
- Application main class
- Data sources
- Processing operators
- Data sinks
The application main class defines the execution environment and creates the data pipeline. The data pipeline is the business logic of a Flink application where one or more operators are chained together. These processing operators apply transformations on the input data that comes from the data sources. After the transformation, the application forwards the transformed data to the data sinks.
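The following minimal sketch (not part of the example application later in this section) shows how these parts typically line up in the main class; the in-memory source, the toUpperCase transformation, and the print() sink are placeholders standing in for real connectors and business logic:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSkeleton {
    public static void main(String[] args) throws Exception {
        // Application main class: obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data source: a bounded in-memory source stands in for a real connector
        DataStream<String> source = env.fromElements("first", "second", "third");

        // Processing operator: transform every incoming record
        DataStream<String> transformed = source.map(record -> record.toUpperCase());

        // Data sink: printing stands in for a real connector
        transformed.print();

        // Nothing runs until execute() is called
        env.execute("Pipeline skeleton");
    }
}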
The StreamExecutionEnvironment class is needed to create DataStreams and to configure the important job parameters that control the behavior of the application. The getExecutionEnvironment() static call guarantees that the pipeline always uses the correct environment for the location it is executed from: when running from the IDE, it returns a local execution environment, and when running from the client for cluster submission, it returns the YARN execution environment. The rest of the main class defines the application sources, processing flow, and sinks, followed by the execute() call. The execute() call triggers the actual execution of the pipeline, either locally or on the cluster.
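As a brief, illustrative sketch of configuring such job parameters on the environment (parallelism and checkpointing are only two common examples, and the values below are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentConfigSketch {
    public static void main(String[] args) throws Exception {
        // Local environment in the IDE, cluster environment when submitted through the client
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Examples of job parameters that shape the runtime behavior of the application
        env.setParallelism(2);             // illustrative value
        env.enableCheckpointing(30_000);   // checkpoint every 30 seconds, illustrative value

        // A trivial pipeline so that execute() has something to run
        env.fromElements(1, 2, 3).print();

        // execute() submits the pipeline; nothing is processed before this call
        env.execute("Environment configuration sketch");
    }
}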
A DataStream represents a stream of data records, and operators can be applied to create new, transformed DataStreams. Flink provides pre-implemented sources and sinks, and you can also use custom connectors to exchange data with other systems. Choosing the sources and sinks depends on the purpose of the application. Because Flink can run in many kinds of environments, a wide range of connectors is available. In most cases, Kafka is used as a connector, as it offers streaming capabilities and integrates easily with other services.
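The following example, KafkaToHDFSAvroJob, puts these parts together: it reads Avro Message records from a Kafka topic using a Schema Registry deserialization schema, maps each record to a comma-separated string, and writes the result to HDFS through a StreamingFileSink.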
public class KafkaToHDFSAvroJob {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaToHDFSAvroJob.class);

    public static void main(String[] args) throws Exception {
        ParameterTool params = Utils.parseArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Deserialize Avro messages from Kafka using the schema stored in Schema Registry
        KafkaDeserializationSchema<Message> schema = ClouderaRegistryKafkaDeserializationSchema
                .builder(Message.class)
                .setConfig(Utils.readSchemaRegistryProperties(params))
                .build();

        // Source: consume Message records from the configured Kafka topic
        FlinkKafkaConsumer<Message> consumer =
                new FlinkKafkaConsumer<Message>(params.getRequired(K_KAFKA_TOPIC), schema, Utils.readKafkaProperties(params));

        // Operator: convert each record to a comma-separated output string
        DataStream<String> source = env.addSource(consumer)
                .name("Kafka Source")
                .uid("Kafka Source")
                .map(record -> record.getId() + "," + record.getName() + "," + record.getDescription())
                .name("ToOutputString");

        // Sink: write the output strings to HDFS in row format
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(params.getRequired(K_HDFS_OUTPUT)), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        source.addSink(sink)
                .name("FS Sink")
                .uid("FS Sink");

        // Also print the transformed records for inspection
        source.print();

        env.execute("Flink Streaming Secured Job Sample");
    }
}
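In the example, the source and sink are given an explicit name and uid. The name is what appears on the Flink Dashboard and in logs, while a stable uid allows Flink to map operator state back to the right operator when the job is restored from a savepoint.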