import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;
/**
 * Streaming word count over a text socket.
 *
 * <p>Reads text from a socket, splits every record into words, keys the stream by word and
 * sums the per-word counter (tuple field 1) over tumbling processing-time windows. The
 * resulting {@code (word, count)} tuples are printed.
 */
public class WindowWordCount {

    /** Host of the text socket the job reads from. */
    private static final String SOURCE_HOST = "localhost";

    /** Port of the text socket the job reads from. */
    private static final int SOURCE_PORT = 9999;

    /** Length, in seconds, of each tumbling processing-time window. */
    private static final long WINDOW_SECONDS = 5;

    /**
     * Builds and runs the windowed word-count job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream(SOURCE_HOST, SOURCE_PORT)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(WINDOW_SECONDS)))
                .sum(1);

        dataStream.print();
        env.execute("Window WordCount");
    }

    /** Splits a sentence into words and emits one {@code (word, 1)} tuple per word. */
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * Emits {@code (word, 1)} for every non-empty whitespace-separated token.
         *
         * @param sentence the input text to tokenize
         * @param out collector receiving one tuple per word
         * @throws Exception declared by the {@link FlatMapFunction} contract
         */
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out)
                throws Exception {
            // Split on runs of whitespace so tabs and repeated spaces separate words too.
            for (String word : sentence.split("\\s+")) {
                // Leading whitespace (or an empty sentence) still yields one empty token;
                // it is not a word and must not be counted.
                if (word.isEmpty()) {
                    continue;
                }
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
import logging
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
# Built-in sample text: the list of lines fed to the word-count job.
word_count_data = [
    "Lorem ipsum dolor sit amet, consectetur adipiscing elit,",
    " sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
    " Eget est lorem ipsum dolor sit amet.",
    " Venenatis lectus magna fringilla urna porttitor rhoncus dolor purus non.",
    " Scelerisque felis imperdiet proin fermentum leo vel orci porta non.",
    " A iaculis at erat pellentesque adipiscing commodo elit at.",
    " Bibendum neque egestas congue quisque egestas.",
    " Massa enim nec dui nunc. Tellus mauris a diam maecenas sed enim ut sem.",
    " Mauris in aliquam sem fringilla ut morbi tincidunt augue interdum.",
    " Eget sit amet tellus cras adipiscing enim. Amet porttitor eget dolor morbi non.",
    " Lacus suspendisse faucibus interdum posuere lorem ipsum.",
    " Diam phasellus vestibulum lorem sed risus ultricies.",
    " Nulla facilisi nullam vehicula ipsum a arcu.",
    " Diam in arcu cursus euismod quis. Tempor commodo ullamcorper a lacus vestibulum.",
]
def word_count():
    """Count how often each whitespace-separated token occurs in ``word_count_data``.

    Builds a job on a ``StreamExecutionEnvironment`` switched to
    ``RuntimeExecutionMode.BATCH``, prints the resulting ``(word, count)``
    tuples and then executes the job.
    """
    execution_env = StreamExecutionEnvironment.get_execution_environment()
    execution_env.set_runtime_mode(RuntimeExecutionMode.BATCH)

    def tokenize(line):
        # str.split() without arguments splits on whitespace runs and never
        # produces empty strings.
        for token in line.split():
            yield token

    def pair_with_one(token):
        return (token, 1)

    def key_of(pair):
        return pair[0]

    def add_counts(left, right):
        # Both tuples share the same key; keep it and add the counters.
        return (left[0], left[1] + right[1])

    lines = execution_env.from_collection(word_count_data)
    tokens = lines.flat_map(tokenize)
    pairs = tokens.map(
        pair_with_one,
        output_type=Types.TUPLE([Types.STRING(), Types.INT()]),
    )
    counts = pairs.key_by(key_of).reduce(add_counts)

    counts.print()
    execution_env.execute()
if __name__ == '__main__':
    # Send log records to stdout as bare messages, INFO level and above,
    # then run the job.
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(message)s",
    )
    word_count()