Flink application example

The following is an example of a Flink application: a streaming window word count that counts words read from a text socket in five-second windows.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;

/**
 * Streaming word count over tumbling processing-time windows.
 *
 * <p>Reads lines of text from a socket on {@code localhost:9999}, splits each
 * line into words, and prints the per-word count for every five-second window.
 */
public class WindowWordCount {

    /**
     * Builds the pipeline and submits it under the job name
     * {@code "Window WordCount"}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            // Key by the word (field 0) so each word is counted separately.
            .keyBy(value -> value.f0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            // Add up field 1 (the 1 emitted per occurrence) within each window.
            .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    /**
     * Splits a line into words and emits a {@code (word, 1)} pair for each one.
     */
    public static class Splitter
        implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * Emits one {@code (word, 1)} tuple per whitespace-separated token.
         *
         * <p>Tokens are separated by runs of whitespace, and empty tokens are
         * skipped, so blank lines, leading whitespace and repeated separators
         * never produce an empty-string "word".
         *
         * @param sentence the input line
         * @param out collector receiving the emitted tuples
         * @throws Exception declared by the implemented interface method
         */
        @Override
        public void flatMap(String sentence,
                            Collector<Tuple2<String, Integer>> out)
            throws Exception {

            for (String word : sentence.split("\\s+")) {
                // split() still yields a leading "" when the line starts with
                // whitespace, and a single "" for an empty line.
                if (word.isEmpty()) {
                    continue;
                }
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }

}
import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode


# Sample input for the word-count job: one text fragment per list element.
word_count_data = [
    "Lorem ipsum dolor sit amet, consectetur adipiscing elit,",
    " sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
    " Eget est lorem ipsum dolor sit amet.",
    " Venenatis lectus magna fringilla urna porttitor rhoncus dolor purus non.",
    " Scelerisque felis imperdiet proin fermentum leo vel orci porta non.",
    " A iaculis at erat pellentesque adipiscing commodo elit at.",
    " Bibendum neque egestas congue quisque egestas.",
    " Massa enim nec dui nunc. Tellus mauris a diam maecenas sed enim ut sem.",
    " Mauris in aliquam sem fringilla ut morbi tincidunt augue interdum.",
    " Eget sit amet tellus cras adipiscing enim. Amet porttitor eget dolor morbi non.",
    " Lacus suspendisse faucibus interdum posuere lorem ipsum.",
    " Diam phasellus vestibulum lorem sed risus ultricies.",
    " Nulla facilisi nullam vehicula ipsum a arcu.",
    " Diam in arcu cursus euismod quis. Tempor commodo ullamcorper a lacus vestibulum.",
]


def word_count():
    """Count word occurrences in ``word_count_data`` with a PyFlink job.

    Sets the runtime mode to BATCH, splits every line on whitespace, pairs
    each token with 1, keys the pairs by token and adds up the counts, then
    prints the resulting stream and executes the job.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)

    def tokenize(text):
        # Emit one element per whitespace-separated token.
        for token in text.split():
            yield token

    def to_pair(token):
        return (token, 1)

    def add_counts(left, right):
        # Both pairs come from the same key; keep it and add the counts.
        return (left[0], left[1] + right[1])

    pair_type = Types.TUPLE([Types.STRING(), Types.INT()])

    lines = env.from_collection(word_count_data)
    tokens = lines.flat_map(tokenize)
    pairs = tokens.map(to_pair, output_type=pair_type)
    counts = pairs.key_by(lambda pair: pair[0]).reduce(add_counts)

    counts.print()

    env.execute()


if __name__ == "__main__":
    # Write bare log messages (no prefix) at INFO level and above to stdout.
    logging.basicConfig(format="%(message)s", level=logging.INFO, stream=sys.stdout)
    word_count()