示例
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
User user = new User("assssss", "b", 2);
User user1 = new User("assssss", "b", 1);
User user2 = new User("assssss", "b", 3);
DataStreamSource<User> objectDataStreamSource = executionEnvironment.fromElements(user, user2, user1);
objectDataStreamSource.keyBy(new KeySelector<User, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(User value) throws Exception {
return new Tuple2<>(value.getUsername(), value.getCity());
}
}).countWindow(3).process(new ProcessWindowFunction<User, Tuple3<String, Long, Long>,
Tuple2<String, String>,
GlobalWindow>() {
@Override
public void process(Tuple2<String, String> stringStringTuple2, Context context,
Iterable<User> elements, Collector<Tuple3<String, Long, Long>> out) throws Exception {
Iterator<User> iterator = elements.iterator();
Long min = Long.MAX_VALUE;
Long max = Long.MIN_VALUE;
String username = "";
String city;
while (iterator.hasNext()) {
User user = iterator.next();
username = user.username;
Long a = Long.valueOf(user.age);
if (a < min) {
min = a;
}
if ((a > max)) {
max = a;
}
}
out.collect(new Tuple3<String, Long, Long>(username, min, max));
}
}).print();
executionEnvironment.execute("a");
|