Flink streaming application structure
To build and develop a Flink streaming application, you need to understand its main structural parts. The core logic of an application is built around an execution environment, sources, processing operators, and sinks:
- Application main class
- Data sources
- Processing operators
- Data sinks
The application main class defines the execution environment and creates the data pipeline. The data pipeline is the business logic of a Flink application where one or more operators are chained together. These processing operators apply transformations on the input data that comes from the data sources. After the transformation, the application forwards the transformed data to the data sinks.
The StreamExecutionEnvironment class is needed to create the DataStream and to configure important job parameters that control the behavior of the application. The getExecutionEnvironment() static call guarantees that the pipeline always uses the correct environment based on where it is executed: when running from the IDE, it returns a local execution environment, and when running from the client for cluster submission, it returns the YARN execution environment. The rest of the main class defines the application sources, processing flow, and the sinks, followed by the execute() call. The execute call triggers the actual execution of the pipeline, either locally or on the cluster.
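The following is a minimal sketch of this structure. It uses the built-in fromElements source and print sink purely for illustration; these are not part of the full example that follows.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Simple built-in source, used only for illustration
DataStream<String> input = env.fromElements("a", "b", "c");

// Processing operator: transform each element
DataStream<String> upperCased = input
        .map(String::toUpperCase)
        .name("ToUpperCase");

// Built-in sink that prints each record to stdout
upperCased.print();

// Triggers the actual execution of the pipeline
env.execute("Minimal Structure Sketch");

The complete example below shows the same structure with a Kafka source, a Schema Registry deserialization schema, a transformation operator, and an HDFS StreamingFileSink.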
public class KafkaToHDFSAvroJob {

    private static Logger LOG = LoggerFactory.getLogger(KafkaToHDFSAvroJob.class);

    public static void main(String[] args) throws Exception {
        ParameterTool params = Utils.parseArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Deserialization schema that reads Avro Message records using Schema Registry
        KafkaDeserializationSchema<Message> schema = ClouderaRegistryKafkaDeserializationSchema
                .builder(Message.class)
                .setConfig(Utils.readSchemaRegistryProperties(params))
                .build();

        // Kafka source reading Message records from the configured topic
        FlinkKafkaConsumer<Message> consumer = new FlinkKafkaConsumer<Message>(
                params.getRequired(K_KAFKA_TOPIC), schema, Utils.readKafkaProperties(params));

        // Processing flow: convert each record to a comma-separated output string
        DataStream<String> source = env.addSource(consumer)
                .name("Kafka Source")
                .uid("Kafka Source")
                .map(record -> record.getId() + "," + record.getName() + "," + record.getDescription())
                .name("ToOutputString");

        // File sink writing the transformed records to HDFS as plain text rows
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(params.getRequired(K_HDFS_OUTPUT)),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        source.addSink(sink)
                .name("FS Sink")
                .uid("FS Sink");

        source.print();

        env.execute("Flink Streaming Secured Job Sample");
    }
}
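Note that the StreamingFileSink only finalizes its in-progress part files when a checkpoint completes, so for the example above to produce committed output files on HDFS, checkpointing should be enabled on the execution environment. A minimal sketch of such a configuration follows; the 10 second interval is only an illustrative value.

// Assumption: the 10 second checkpoint interval is chosen purely for illustration
env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);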