Source, operator and sink in DataStream API
A DataStream represents the data records and the operators applied to them. Flink comes with pre-implemented sources and sinks, and you can also use custom-defined connectors to maintain the dataflow with other functions.
Java:
    DataStream<String> source = env
        .fromSource(
            kafkaSource,
            WatermarkStrategy.noWatermarks(),
            "Kafka Source")
        .uid("kafka-source")
        .map(record -> record.getId()
            + "," + record.getName()
            + "," + record.getDescription())
        .name("To Output String")
        .uid("to-output-string");

    FileSink<String> sink = FileSink
        .forRowFormat(
            new Path(params.getRequired(K_HDFS_OUTPUT)),
            new SimpleStringEncoder<String>("UTF-8"))
        .build();

    source.sinkTo(sink)
        .name("FS Sink")
        .uid("fs-sink");
    source.print();
Python:
    source = env.from_source(
        source=kafka_source,
        watermark_strategy=WatermarkStrategy.no_watermarks(),
        source_name='Kafka Source')
    source = source.map(
        lambda record: record.get_id() + ',' +
                       record.get_name() + ',' +
                       record.get_description(),
        output_type=Types.STRING())
    source = source.name('To Output String').uid('to-output-string')

    # for_row_format() returns a builder; build() creates the sink
    sink = FileSink.for_row_format(
        base_path=output_path,
        encoder=Encoder.simple_string_encoder()).build()
    source.sink_to(sink).name('FS Sink').uid('fs-sink')
    source.print()
The choice of sources and sinks depends on the purpose of the application. Because Flink can be
deployed in any kind of environment, various connectors are available. In most cases,
Kafka is used as a connector because it has streaming capabilities and integrates easily
with other services.
- Sources
- Sources are where your program reads its input from. You can attach a source to your
program by using StreamExecutionEnvironment.addSource(sourceFunction). Flink comes with a
number of pre-implemented source functions. For the list of sources, see the Apache Flink documentation.
- Operators
- Operators transform one or more DataStreams into a new DataStream. When choosing an
operator, decide what type of transformation your data needs. The following are some
basic transformations:
- Map: Takes one element and produces one element.
- FlatMap: Takes one element and produces zero, one, or more elements.
  Java:
    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });
  Python:
    data_stream.flat_map(lambda value: value.split(' '), output_type=Types.STRING())
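- Filter: Evaluates a boolean function for each element and retains those for which the function returns true.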
- KeyBy: Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. This transformation returns a KeyedStream.
- Window: Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (for example, the data that arrived within the last 5 seconds).
For the full list of operators, see the Apache Flink documentation.
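The KeyBy and Window descriptions above can be illustrated with a minimal plain-Python sketch that does not use the Flink API. It assumes a CRC32 hash for choosing the partition and fixed, non-overlapping (tumbling) 5-second windows. Flink's actual key-to-partition mapping and window assigners are more involved. The helper names `key_by` and `tumbling_windows` and the sample events are hypothetical.

```python
import zlib
from collections import defaultdict

# Hypothetical sample events: (key, event time in seconds, value)
EVENTS = [
    ("sensor-a", 1, 10),
    ("sensor-b", 2, 20),
    ("sensor-a", 4, 30),
    ("sensor-a", 6, 40),
    ("sensor-b", 11, 50),
]


def key_by(records, key_selector, num_partitions):
    """Assign every record to a partition derived from its key.

    Simplified stand-in for KeyBy: records that share a key always end up
    in the same partition. This is not Flink's real assignment algorithm.
    """
    partitions = defaultdict(list)
    for record in records:
        key = str(key_selector(record)).encode("utf-8")
        # A stable hash keeps the key-to-partition mapping deterministic.
        partitions[zlib.crc32(key) % num_partitions].append(record)
    return dict(partitions)


def tumbling_windows(records, key_selector, time_selector, size_seconds):
    """Group records per key into fixed, non-overlapping time windows."""
    windows = defaultdict(list)
    for record in records:
        timestamp = time_selector(record)
        # Start of the window that contains this timestamp.
        window_start = timestamp - (timestamp % size_seconds)
        windows[(key_selector(record), window_start)].append(record)
    return dict(windows)


partitions = key_by(EVENTS, lambda e: e[0], num_partitions=4)
windows = tumbling_windows(EVENTS, lambda e: e[0], lambda e: e[1], size_seconds=5)

# Sum the values of each (key, window) group.
for (key, start), records in sorted(windows.items()):
    print(key, f"[{start}, {start + 5})", sum(r[2] for r in records))
```

In this sketch, the two sensor-a events at seconds 1 and 4 share the window starting at 0, while the sensor-a event at second 6 falls into the next window. Windows are kept separate per key, which mirrors the idea that windows are defined on an already keyed stream.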
- Sinks
- Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams. For the list of sinks, see the Apache Flink documentation.
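The source, operator and sink roles described above can be tried out without a Flink cluster. The following is a plain-Python analogy built on generators, not Flink code: every function name (`source`, `flat_map_stream`, `map_stream`, `filter_stream`, `collecting_sink`) and the sample input are hypothetical. It is meant only to show how records flow from a source through chained transformations into a sink.

```python
def source(lines):
    """Source: emits records one at a time (here from an in-memory list)."""
    for line in lines:
        yield line


def flat_map_stream(stream, fn):
    """FlatMap: each input element yields zero, one, or more outputs."""
    for element in stream:
        yield from fn(element)


def map_stream(stream, fn):
    """Map: each input element yields exactly one output."""
    for element in stream:
        yield fn(element)


def filter_stream(stream, predicate):
    """Filter: keeps only the elements for which the predicate is true."""
    for element in stream:
        if predicate(element):
            yield element


def collecting_sink(stream, output):
    """Sink: consumes the stream and forwards each record to a target list."""
    for element in stream:
        output.append(element)


# Hypothetical input; the empty line produces zero elements in the FlatMap step.
LINES = ["to be or", "", "not to be"]

collected = []
words = flat_map_stream(source(LINES), lambda line: line.split())
upper = map_stream(words, str.upper)
kept = filter_stream(upper, lambda word: word != "TO")
collecting_sink(kept, collected)

print(collected)
```

Nothing runs until the sink starts consuming the chained generators. This loosely resembles how a DataStream program first describes the dataflow and only processes records once it is executed.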