Hello everyone, and welcome to IT知识分享网.
Window
KeyedStream → WindowedStream
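Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (for example, the data that arrived within the last 5 seconds).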
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                // read lines of text from the socket opened by netcat
                .socketTextStream("localhost", 9999)
                // split every line into (word, 1) pairs
                .flatMap(new Splitter())
                // partition the stream by word
                .keyBy(value -> value.f0)
                // 5-second tumbling windows based on processing time; an event-time window
                // would fail here because the socket records carry no event-time timestamps
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // sum field f1 (the counts) per word and per window
                .sum(1);

        dataStream.print();

        env.execute("WindowExample");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
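To run the example program, first start an input stream from a terminal with netcat:
nc -lk 9999
Then type some words and press Enter. When each 5-second window closes, the program prints the count of every word typed during that window.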
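To make the per-key grouping concrete, here is a plain-Java sketch with no Flink dependency that mimics what keyBy(...).window(...).sum(1) computes for word counts. It assumes tumbling windows aligned to the epoch with no offset and non-negative millisecond timestamps; the Event class, the sample words and the timestamps are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java illustration (no Flink) of a keyed tumbling window: every event is
 * assigned to the window [start, start + size) containing its timestamp, and
 * counts are kept separately for each (key, window) pair.
 * The Event class and the sample data are hypothetical.
 */
public class KeyedTumblingWindowSketch {

    /** Hypothetical event: a word plus a timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Start of the epoch-aligned tumbling window containing the timestamp (assumes timestamp >= 0, no offset). */
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    /** Counts events per "key@windowStart", like summing (word, 1) pairs per key and window. */
    static Map<String, Integer> countPerKeyAndWindow(List<Event> events, long sizeMillis) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            String slot = e.key + "@" + windowStart(e.timestamp, sizeMillis);
            counts.merge(slot, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("hello", 1_000));
        events.add(new Event("world", 2_000));
        events.add(new Event("hello", 4_999));
        events.add(new Event("hello", 5_000)); // falls into the next 5-second window
        System.out.println(countPerKeyAndWindow(events, 5_000));
    }
}
```

Running main prints {hello@0=2, hello@5000=1, world@0=1}: the event at 5000 ms opens a new window, so "hello" is counted separately in each window, just as a keyed window keeps separate results per key and per window.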
WindowAll
DataStream → AllWindowedStream
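Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (for example, the data that arrived within the last 5 seconds). In many cases this is a non-parallel transformation: all records are gathered in a single task for the windowAll operator.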
package quick;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import quick.source.DataSource;

import java.util.List;

public class WindowAllExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());

        DataStream<Tuple3<String, String, Integer>> dataStream = inStream
                // 5-second processing-time windows over the whole (non-keyed) stream
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // Note: the field being compared is f2 (age); maxBy emits the whole record with the largest age
                .maxBy(2);

        dataStream.print();

        env.execute("WindowAllExample job");
    }

    /**
     * Simulates a continuous stream by emitting the sample records one at a time.
     */
    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {

        // set to false by cancel() so that run() can stop emitting
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            for (Tuple3<String, String, Integer> tuple3 : tuple3List) {
                if (!isRunning) {
                    break;
                }
                ctx.collect(tuple3);
                System.out.println("----" + tuple3);
                // emit one record per second
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
Define the sample data
package quick.source;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.Arrays;
import java.util.List;

/**
 * @Description Shared sample data
 */
public class DataSource {

    /**
     * Sample data list.
     * Tuple3 is a tuple with exactly three fields, f0, f1 and f2, which correspond to its three constructor arguments.
     * @return the sample records
     */
    public static List<Tuple3<String, String, Integer>> getTuple3ToList() {
        // Tuple3<f0, f1, f2> = Tuple3<name, gender (man = male, girl = female), age>
        return Arrays.asList(
                new Tuple3<>("张三", "man", 20),
                new Tuple3<>("李四", "girl", 24),
                new Tuple3<>("王五", "man", 29),
                new Tuple3<>("刘六", "girl", 32),
                new Tuple3<>("伍七", "girl", 18),
                new Tuple3<>("吴八", "man", 30)
        );
    }
}
Output:
----(张三,man,20)
----(李四,girl,24)
----(王五,man,29)
----(刘六,girl,32)
----(伍七,girl,18)
(刘六,girl,32)
----(吴八,man,30)
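Which records end up in the same window depends on when the job starts relative to the window boundaries, not on how many records have arrived. The plain-Java sketch below (no Flink dependency) imitates a non-keyed tumbling window: since there is no key, every record whose timestamp falls inside the same 5-second interval joins one single group. The start time of 2.5 seconds and the one-second spacing are assumptions chosen for illustration, the spacing mirroring the source's Thread.sleep(1000).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java illustration (no Flink) of a non-keyed tumbling window such as
 * windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))): with no key,
 * every record whose timestamp falls into [start, start + size) lands in one
 * single group. The timestamps are hypothetical millisecond values.
 */
public class WindowAllSketch {

    /** Groups record indexes by the start of their epoch-aligned tumbling window (no offset). */
    static Map<Long, List<Integer>> assignToWindows(long[] timestamps, long sizeMillis) {
        Map<Long, List<Integer>> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = timestamps[i] - (timestamps[i] % sizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(i);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Six records emitted one second apart, starting 2.5 s into a window (hypothetical start time).
        long[] timestamps = new long[6];
        for (int i = 0; i < timestamps.length; i++) {
            timestamps[i] = 2_500 + i * 1_000L;
        }
        // Prints {0=[0, 1, 2], 5000=[3, 4, 5]}
        System.out.println(assignToWindows(timestamps, 5_000));
    }
}
```

With this start time each window holds only three of the six records; with a different start time the split changes, which is why the number of records per window in the output above may differ from run to run.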
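Notes:
max/min return the maximum/minimum of the specified field, but the values of the other fields in the result are not guaranteed to come from the record that holds that value. maxBy/minBy, by contrast, return the complete record that contains the maximum/minimum value; in this example, maxBy(2) emits the whole record (刘六,girl,32).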
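The difference can be illustrated without Flink. In this plain-Java sketch, a hypothetical Person class stands in for Tuple3<String, String, Integer>, and the first five sample records are assumed to fall into the same window. The max method assumes the non-aggregated fields are taken from the first record of the window, which is only one possible outcome of the unguaranteed behavior described above; maxBy returns the record with the largest age, keeping the first one on ties.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java illustration (no Flink) of max(field) versus maxBy(field) on the
 * records of one window. Assumption: "max" keeps the other fields of the first
 * record; this is an illustration, not Flink's implementation.
 */
public class MaxVsMaxBySketch {

    /** Hypothetical stand-in for Tuple3<String, String, Integer>: (name, gender, age). */
    static final class Person {
        final String name;
        final String gender;
        final int age;

        Person(String name, String gender, int age) {
            this.name = name;
            this.gender = gender;
            this.age = age;
        }

        @Override
        public String toString() {
            return "(" + name + "," + gender + "," + age + ")";
        }
    }

    /** max-style: the first record's other fields, with age replaced by the maximum age. */
    static Person max(List<Person> window) {
        Person first = window.get(0);
        int maxAge = first.age;
        for (Person p : window) {
            maxAge = Math.max(maxAge, p.age);
        }
        return new Person(first.name, first.gender, maxAge);
    }

    /** maxBy-style: the complete record holding the maximum age (the first one wins on ties). */
    static Person maxBy(List<Person> window) {
        Person best = window.get(0);
        for (Person p : window) {
            if (p.age > best.age) {
                best = p;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // The first five sample records, assumed to fall into the same window.
        List<Person> window = Arrays.asList(
                new Person("张三", "man", 20),
                new Person("李四", "girl", 24),
                new Person("王五", "man", 29),
                new Person("刘六", "girl", 32),
                new Person("伍七", "girl", 18));
        System.out.println("max:   " + max(window));   // fields mixed from two records
        System.out.println("maxBy: " + maxBy(window)); // an actual input record
    }
}
```

Running it prints (张三,man,32) for max, a record that never appeared in the input, and (刘六,girl,32) for maxBy, the same record as the window result in the output above.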
Source code:
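Disclaimer: All articles, images, videos and other content on this site come from user submissions or are reposted and compiled from the Internet. They do not represent the views of this site, which assumes no legal responsibility for them. Copyright belongs to the original authors or publishers. If you find content on this site that is suspected of plagiarism, infringes your rights, or violates laws or regulations, please contact the site administrator online; once verified, it will be removed immediately. This article comes from the Internet; if it infringes your rights, please contact us to have it removed. If you repost it, please cite the source: https://yundeesoft.com/6274.html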