flink 教程 Window「建议收藏」

flink 教程 Window「建议收藏」WindowKeyedStream→WindowedStream可以在已分区的KeyedStreams上定义Windows。Windows根据某些特征(例如,最近5秒内到达的数据)对每个键中的数据进行分组。importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming

大家好,欢迎来到IT知识分享网。

在这里插入图片描述

Window

KeyedStream → WindowedStream

可以在已分区的 KeyedStreams 上定义 Windows。Windows 根据某些特征(例如,最近 5 秒内到达的数据)对每个键中的数据进行分组。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1);

        dataStream.print();

        env.execute("WindowExample");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

IT知识分享网

要运行示例程序,首先从终端使用 netcat 启动输入流:

IT知识分享网nc -lk 9999

然后输入word回车

WindowAll

DataStream → AllWindowedStream

可以在常规 DataStreams 上定义 Windows。Windows 根据某些特征(例如,最近 5 秒内到达的数据)对所有流事件进行分组。

package quick;

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import quick.source.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;

public class WindowAllExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       
        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple3<String, String, Integer>> dataStream = inStream
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                //注意:计算变量为f2
                .maxBy(2);
        dataStream.print();
        env.execute("WindowAllExample job");
    }

    /**
     * 模拟数据持续输出
     */
    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {
        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            for (Tuple3 tuple3 : tuple3List){
                ctx.collect(tuple3);
                System.out.println("----"+tuple3);
                //1秒钟输出一个
                Thread.sleep(1 * 1000);
            }
        }
        @Override
        public void cancel() {
            try{
                super.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

定义数据

IT知识分享网package quick.source;

import org.apache.flink.api.java.tuple.Tuple3;
import java.util.Arrays;
import java.util.List;

/**
 * @Description 公共示例数据
 */
public class DataSource {

    /**
     * 示例数据集合
     * Tuple3 是一个固定3个属性变量的实体类,分别用f0,f1,f2表示三个构造传参与变量
     * @return
     */
    public static List<Tuple3<String,String,Integer>> getTuple3ToList(){
        //Tuple3<f0,f1,f2> = Tuple3<姓名,性别(man男,girl女),年龄>
        return Arrays.asList(
                new Tuple3<>("张三", "man", 20),
                new Tuple3<>("李四", "girl", 24),
                new Tuple3<>("王五", "man", 29),
                new Tuple3<>("刘六", "girl", 32),
                new Tuple3<>("伍七", "girl", 18),
                new Tuple3<>("吴八", "man", 30)
        );
    }

}

输出:

----(张三,man,20)
----(李四,girl,24)
----(王五,man,29)
----(刘六,girl,32)
----(伍七,girl,18)
(刘六,girl,32)
----(吴八,man,30)

说明:

max/min 操作 会根据用户指定的字段取最小值(而字段外的其他值 并不能保证正确) 而maxBy/minBy 指的是自己本身的这条数据。

源码地址:
在这里插入图片描述
在这里插入图片描述

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/6274.html

(0)
上一篇 2022-12-16 21:10
下一篇 2022-12-16 21:30

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信