Source, operator and sink in DataStream API

A DataStream represents the data records and the operators. There are pre-implemented sources and sinks for Flink, and you can also use custom defined connectors to maintain the dataflow with other functions.

DataStream<String> source = env
        .fromSource(
                kafkaSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source")
        .uid("kafka-source")
        .map(record -> record.getId()
                + "," + record.getName()
                + "," + record.getDescription())
        .name("To Output String")
        .uid("to-output-string");

FileSink<String> sink = FileSink
        .forRowFormat(
                new Path(params.getRequired(K_HDFS_OUTPUT)),
                new SimpleStringEncoder<String>("UTF-8"))
        .build();

source.sinkTo(sink)
		.name("FS Sink")
		.uid("fs-sink");
source.print();
source = env.from_source(source=kafka_source,
                         watermark_strategy=WatermarkStrategy.no_watermarks(),
                         source_name='Kafka Source')
source = source.map(lambda record:
                    record.get_id() + ',' +
                    record.get_name() + ',' +
                    record.get_description(),
                    output_type=Types.STRING())
source = source.name('To Output String').uid('to-output-string')

sink = FileSink.for_row_format(
    base_path=output_path,
    encoder=Encoder.simple_string_encoder())

source.sink_to(sink).name('FS Sink').uid('fs-sink')
source.print()
Choosing the sources and sinks depends on the purpose of the application. As Flink can be implemented in any kind of an environment, various connectors are available. In most cases, Kafka is used as a connector as it has streaming capabilities and can be easily integrated with other services.
Sources
Sources are where your program reads its input from. You can attach a source to your program by using StreamExecutionEnvironment.addSource(sourceFunction). Flink comes with a number of pre-implemented source functions. For the list of sources, see the Apache Flink documentation.
Streaming Analytics in Cloudera supports the following sources:
  • HDFS
  • Kafka
Operators
Operators transform one or more DataStreams into a new DataStream. When choosing the operator, you need to decide what type of transformation you need on your data. The following are some basic transformation:
  • Map
    Takes one element and produces one element.
    dataStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 2 * value;
        }
    });
    
    data_stream.map(lambda value: 2 * value, output_type=Types.INT())
  • FlatMap
    Takes one element and produces zero, one, or more elements.
    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out)
            throws Exception {
            for (String word: value.split(" ")) {
                out.collect(word);
            }
       }
    });
    
    data_stream.flat_map(lambda value: value.split(' '), output_type=Types.STRING())
  • Filter
    Evaluates a boolean function for each element and retains those for which the function returns true.
    dataStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 0;
        }
    });
    data_stream.filter(lambda value: value != 0)
  • KeyBy
    Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. This transformation returns a KeyedStream
    dataStream.keyBy(value -> value.getSomeKey());
    dataStream.keyBy(value -> value.f0);
    data_stream.key_by(lambda value: value.get_some_key())
    data_stream.key_by(lambda value: value[0], key_type=Types.STRING())
  • Window
    Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (for example, the data that arrived within the last 5 seconds).
    dataStream
        .keyBy(value -> value.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)));
    data_stream.key_by(lambda value: value[0], key_type=Types.STRING()) \
               .window(TumblingEventTimeWindows.of(Time.minutes(1)))

For the full list of operators, see the Apache Flink documentation.

Sinks
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams. For the list of sources, see the Apache Flink documentation.
Streaming Analytics in Cloudera supports the following sinks:
  • Kafka
  • HBase
  • Kudu
  • HDFS