admin 管理员组文章数量: 1087649
Apache Flink 培训系列
无状态的 Transformation
官网没有给出实战的代码,笔者为了大家可以练习,将会提供完整代码。
map()
在上面的实验 1 中,我们通过 GeoUtils.isInNYC 过滤出位于纽约市的出租车事件流。同样在 GeoUtils 类,还提供了另外一个静态方法 GeoUtils.mapToGridCell(float lon, float lat),该方法将位置(经度、纬度)映射到一个网格单元,该网格单元指的是大约 100x100 米的区域。
现在,通过向每个事件添加 startCell 和 endCell 字段来丰富我们的出租车乘车对象流。我们可以创建扩展了 TaxiRide 的 EnrichedRide 类,并添加以下字段:
public static class EnrichedRide extends TaxiRide {public int startCell;public int endCell;public EnrichedRide() {}public EnrichedRide(TaxiRide ride) {this.rideId = ride.rideId;this.isStart = ride.isStart;this.startTime = ride.startTime;this.endTime = ride.endTime;this.startLon = ride.startLon;this.startLat = ride.startLat;this.endLon = ride.endLon;this.endLat = ride.endLat;this.passengerCnt = ride.passengerCnt;this.taxiId = ride.taxiId;this.driverId = ride.driverId;// 添加 startCell 和 endCellthis.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);}public String toString() {return super.toString() + "," +Integer.toString(this.startCell) + "," +Integer.toString(this.endCell);}
}
然后我们创建 Enrichment 类,使用 map() 函数,将 TaxiRide 对象转换为 EnrichedRide,代码如下:
public class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {@Overridepublic EnrichedRide map(TaxiRide taxiRide) throws Exception {return new EnrichedRide(taxiRide);}
}
最后我们可以创建 EnrichedRideCleansingSolution 类,包含 main 方法,用来过滤事件流并转换为指定格式的事件流,完整代码如下:
public class EnrichedRideCleansingSolution extends ExerciseBase {public static void main(String[] args) throws Exception {ParameterTool params = ParameterTool.fromArgs(args);final String input = params.get("input", pathToRideData);final int maxEventDelay = 60; // events are out of order by max 60 secondsfinal int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second// set up streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(ExerciseBase.parallelism);// start the data generatorDataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));DataStream<EnrichedRide> enrichedNYCRides = rides.filter(new RideCleansingSolution.NYCFilter()).map(new Enrichment());// print the filtered streamprintOrTest(enrichedNYCRides);// run the cleansing pipelineenv.execute("Taxi Ride Cleansing");}
}
运行代码,结果类似为:
...
2> 103,START,2013-01-0100:00:50,1970-01-0100:00:00,-73.98857,40.764046,-73.99101,40.760773,2,2013000103,2013000103,47043,47792
4> 48,START,2013-01-0100:00:00,1970-01-0100:00:00,-73.95538,40.779728,-73.96776,40.760326,1,2013000048,2013000048,44067,47808
4> 5,START,2013-01-0100:00:00,1970-01-0100:00:00,-74.00053,40.737343,-73.97723,40.783607,4,2013000005,2013000005,52535,43301
1> 24,START,2013-01-0100:00:00,1970-01-0100:00:00,-73.99858,40.755333,-73.994576,40.760796,1,2013000024,2013000024,48786,47789
4> 40,START,2013-01-0100:00:00,1970-01-0100:00:00,-73.98613,40.722607,-73.95306,40.771873,3,2013000040,2013000040,55295,45569
1> 39,START,2013-01-0100:00:00,1970-01-0100:00:00,-74.00465,40.721268,-73.95696,40.766376,5,2013000039,2013000039,55532,46566
3> 38,START,2013-01-0100:00:00,1970-01-0100:00:00,-74.00822,40.71672,-73.97234,40.753498,1,2013000038,2013000038,56529,49305
...
flatmap()
MapFunction 仅在执行一对一转换时才适用,对于进入流中的每个元素,map() 将发出一个转换后的元素。否则,我们要使用 flatmap()。
创建类 EnrichedRideFlatMapSolution ,使用 flatmap 操作:
public class EnrichedRideFlatMapSolution extends ExerciseBase {public static void main(String[] args) throws Exception {ParameterTool params = ParameterTool.fromArgs(args);final String input = params.get("input", pathToRideData);final int maxEventDelay = 60; // events are out of order by max 60 secondsfinal int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second// set up streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(ExerciseBase.parallelism);// start the data generatorDataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));DataStream<EnrichedRide> enrichedNYCRides = rides.flatMap(new NYCEnrichment());// print the filtered streamenrichedNYCRides.print();// run the cleansing pipelineenv.execute("Taxi Ride flatmap operation");}
}
其中 NYCEnrichment 类如下:
public class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {@Overridepublic void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {FilterFunction<TaxiRide> valid = new RideCleansingSolution.NYCFilter();if (valid.filter(taxiRide)) {out.collect(new EnrichedRide(taxiRide));}}
}
运行 main 方法时,使用 Collector 时,flatmap() 方法可以发射任意数量的流元素,包括不包含任何元素。
Keyed Streams
keyBy()
能够根据流中的某个属性对流进行分区通常非常有用,以便将具有该属性的相同值的所有事件组合在一起进行计算。例如,我们想找到从每个网格单元开始的行驶时间最长的出租车。如果我们使用 SQL 查询,则会使用 startCell 执行某种 GROUP BY,而在 Flink 中,使用 keyBy(KeySelector) 完成。
rides.flatMap(newNYCEnrichment()).keyBy("startCell")
每个 keyBy 都会导致网络 shuffle,从而对流进行重分区 repartition。通常,这个操作非常消耗资源,因为它涉及网络通信以及序列化和反序列化。
在上面的示例中,key 为 startCell。这种类型的 key 具有一个缺点,即编译器无法推断用于 key 的字段的类型,因此 Flink 会将 key 的值作为 Tuple 传递。建议最好使用类型匹配的 KeySelector,例如:
rides.flatMap(new NYCEnrichment()).keyBy(new KeySelector<EnrichedRide, int>() {@Overridepublic int getKey(EnrichedRide ride) throws Exception {return ride.startCell;}})
可以使用 lambda 更简洁地表示:
rides.flatMap(new NYCEnrichment()).keyBy(ride -> ride.startCell)
对 Keyed Stream 进行聚合
这段代码创建了一个新的元组流,其中包含 startCell 和 duration(以分钟为单位):
DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {@Overridepublic void flatMap(EnrichedRide ride,Collector<Tuple2<Integer, Minutes>> out) throws Exception {// 出租车出行的结束事件if (!ride.isStart) {// 计算出租车出行的时间间隔Interval rideInterval = new Interval(ride.startTime, ride.endTime);Minutes duration = rideInterval.toDuration().toStandardMinutes();out.collect(new Tuple2<>(ride.startCell, duration));}}});
现在,我们可以生成一个流,其中仅包含每个 startCell 最长的乘车时间。
前面我们看到了一个 EnrichedRide POJO 的示例,我们通过使用其名称指定了要使用的字段。在下面介绍的场景下,我们使用 Tuple2 对象,并在元组中提供从0开始的索引。
minutesByStartCell.keyBy(0) // startCell.maxBy(1) // duration.print();
为了方便读者的理解,这里笔者写一个完整的示例,然后运行并查看结果。
public class EnrichedRideKeyedStreamsSolution extends ExerciseBase {public static void main(String[] args) throws Exception {ParameterTool params = ParameterTool.fromArgs(args);final String input = params.get("input", pathToRideData);final int maxEventDelay = 60; // events are out of order by max 60 secondsfinal int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second// set up streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(ExerciseBase.parallelism);// start the data generatorDataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));DataStream<EnrichedRide> enrichedNYCRides = rides.filter(new RideCleansingSolution.NYCFilter()).map(new Enrichment());DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {@Overridepublic void flatMap(EnrichedRide ride, Collector<Tuple2<Integer, Minutes>> out) throws Exception {if (!ride.isStart) {Interval rideInterval = new Interval(ride.startTime, ride.endTime);Minutes duration = rideInterval.toDuration().toStandardMinutes();out.collect(new Tuple2<>(ride.startCell, duration));}}});minutesByStartCell.keyBy(0) // startCell.maxBy(1) // duration.print();// run the keyed streams pipelineenv.execute("Taxi Ride Keyed Streams");}
}
运行输出结果:
3> (50053,PT1M)
1> (54791,PT0M)
1> (46307,PT1M)
1> (55782,PT1M)
3> (48301,PT0M)
3> (43325,PT1M)
3> (54541,PT0M)
3> (71191,PT0M)
1> (55044,PT2M)
3> (51551,PT3M)
...
状态
上面的 keyBy 操作是我们看到的第一个有状态流的示例。虽然状态处理是透明的,但是 Flink 必须跟踪每个不同 key 的最大持续时间。
在 Flink 程序中涉及到状态时,需要考虑到状态所占用空间到大小。如果 key 的空间是无界的,那么 Flink 状态的空间也应是无界的。
在实际场景中,在处理流数据时,通常更有意义的做法是考虑有界窗口上的聚合,而不是整个流。
reduce() 和其他聚合算子
上面使用的 maxBy() 只是 Flink 的 KeyedStream 上可用的众多聚合操作中的一个算子,还有一个更通用的 reduce() 函数,可用于实现用户自定义聚合。
有状态 Transformation
为什么 Flink 参与状态管理
大家开发的应用程序当然可以自己使用和管理状态,而不需要让 Flink 参与。但是 Flink 为它管理的状态提供了一些出色的特性,解决用户的痛点:
本地化 Flink
状态保存在处理该状态的计算机本地,并且可以以内存速度访问。
持久化
Flink 状态是自动设置检查点并恢复 Flink 状态。
垂直可扩展
Flink 状态可以保留在嵌入式 RocksDB 实例中,该实例可以通过添加更多本地磁盘来扩展。
水平可伸缩
随着群集的增长和收缩,Flink 状态将重新分配。
可查询
可以通过 REST API 查询 Flink 状态。
在本课程中,读者将学习如何使用 Flink API 来管理 Keyed 状态。
Rich Functions
前面我们已经看到了 Flink 的几个功能接口,包括 FilterFunction、MapFunction 和 FlatMapFunction。
对于每个接口,Flink 还提供了一个 Rich 变体,例如 RichFilterFunction、RichMapFunction 和 RichFlatMapFunction等。笔者这里解释一下,有的读者称之为富函数,以 Rich 开头。
它们具有一些其他方法,包括:
open(Configuration c) 在初始化时,open() 被调用一次,用于加载一些静态数据或打开与外部服务的连接。
close() 通常用于清理和释放资源。
getRuntimeContext() getRuntimeContext() 方法提供了对函数 RuntimeContext 的访问。RuntimeContext 可用于查询函数的并行度、子任务索引和执行函数的任务的名称等信息。此外,它还包括访问分区状态的方法。
Keyed State 示例
在这个示例中,可以获取由 Tuple2 事件组成的传感器读数流,Tuple2 中的 String 类型是每个传感器 ID,Double 类型是传感器读到的数据,实验中使用 Socket 接收用户传输的数据。
public class SensorStreamKeyedStateSolution {public static void main(String[] args) throws Exception {// set up streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(ExerciseBase.parallelism);// receive each sensor from a socket// nc -l 1000DataStream<String> text = env.socketTextStream("localhost", 8000, "\n", 10);// processing each sensor dataDataStream<Tuple2<String, Double>> input = text.map(new MapFunction<String, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> map(String sensordata) throws Exception {// each sensor format:// S000001,8.0// S000002,3.0// S000001,6.0// S000001,3.0// S000002,5.0// S000002,7.0String[] record = sensordata.split(",");if (record.length == 2) {return new Tuple2<>(record[0], Double.valueOf(record[1]));} else {System.err.println("异常数据: " + sensordata);// 返回异常数据return new Tuple2<>("E000000", 0.0);}}});DataStream<Tuple2<String, Double>> smoothed = input.keyBy(0).map(new Smoother());// print result// 4> (S000001,0.0)// 4> (S000002,0.0)// 4> (S000001,0.0)// 4> (S000001,4.5)// 4> (S000002,0.0)// 4> (S000002,6.0)smoothed.print();// submit the jobenv.execute("SensorStreamKeyedStateSolution Job");}
}
我们的目标是对来自每个传感器的数据进行平滑处理,我们将使用上面代码中称为 Smoother 的 RichMapFunction 进行处理,代码实现如下:
public class Smoother extends RichMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {private ValueState<MovingAverage> averageState;@Overridepublic void open (Configuration conf) {ValueStateDescriptor<MovingAverage> descriptor =new ValueStateDescriptor<>("moving average", MovingAverage.class);averageState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple2<String, Double> map (Tuple2<String, Double> item) throws Exception {// access the state for this keyMovingAverage average = averageState.value();// create a new MovingAverage (with window size 2) if none exists for this key// 窗口大小为 2if (average == null) average = new MovingAverage(2);// add this event to the moving averageaverage.add(item.f1);averageState.update(average);// return the smoothed resultreturn new Tuple2(item.f0, average.getAverage());}
}
Flink 支持几种不同类型的 keyed 状态。在此示例中,笔者使用最简单的一种,即ValueState,这意味着 Flink 将为每个 key 存储一个对象(在本例中,使用的是一个类型为 MovingAverage 的对象)。出于性能的考虑,Flink 还支持 ListState 和 MapState 。
为了完成本次实验,Smoother 记录每个传感器的最新传感器读数,使用 Flink 的 keyed state 接口来完成,Flink 将为所管理的每个状态项维护一个 key/value 存储。
MovingAverage 代码如下,实现的功能其实是滑动平均数:
class MovingAverage {int size;double sum;double average;LinkedList<Double> list;public MovingAverage(int size) {this.list = new LinkedList<>();this.size = size;}public double add(double value) {sum += value;// Adds the specified element as the tail (last element) of this list.list.offer(value);if (list.size() <= size) {return sum / list.size();}sum -= list.poll();return average = sum / size;}public double getAverage() {return average;}
}
其实上面的实验运行后,查看结果还是容易理解的,笔者这里再补充说明一下。
Smoother 中的 map 方法负责使用 MovingAverage 对象平滑处理每个事件。每个事件都会调用一次 map,该事件与一个特定的 key(即传感器 ID)相关联,并且在 ValueState 对象上的 averageState 方法对该传感器的 key 进行操作。换句话说,调用 averageState.value() 返回相应传感器当前 MovingAverage 对象,因此,当调用 average.add(item.f1) 时,我们将此事件添加到相同 key 的先前事件中(即同一传感器),因为之前的状态已经使用 ValueState 保存。
清除 State
其实上面的示例存在一个潜在的问题:如果 key 空间是无界的,将会发生什么呢?
Flink 为每个使用的不同 key 在某处存储一个 MovingAverage 实例。如果传感器事件数据有限,自然问题不大,但是如果传感器 key 集合无限制增加的情况下,存储空间毕竟也是有限的,所以需要清除不再需要的状态。做法比较简单,就是通过在状态对象上调用 clear() 来完成,如下所示:
averageState.clear()
另外,笔者将在事件驱动的应用程序课程中,掌握在 ProcessFunction 中使用 Timer 定时器来清除状态的操作。
Flink 1.6 版本之后还增加了 TTL 功能。不过这种方法的适用性有限,但是在某些情况下可以依靠它来清除不需要的状态。
相互连接的流
在线上的生产环境中,有时并不是简单地对单一的输入流做预处理转换操作,而是使用动态的数据流通过阈值、规则和其他参数对常规的数据流进行转换,在 Flink 中支持连接流,比如单个运算符具有两个输入流,如下所示:
连接流还可以用于实现流 join,后面有对应的实验课程。
示例
在此示例中,控制流用于从 streamOfWords 中过滤掉指定的单词。被 ControlFunction 调用的 RichCoFlatMapFunction 将应用于连接流以完成此操作。
先看一下 RichCoFlatMapFunction 抽象类:
package org.apache.flink.streaming.api.functions.co;
import org.apache.flink.annotation.Public;
import org.apache.flink.apimon.functions.AbstractRichFunction;
import org.apache.flink.apimon.functions.RichFunction;
/*** A RichCoFlatMapFunction represents a FlatMap transformation with two different input* types. In addition to that the user can use the features provided by the* {@link RichFunction} interface.** @param <IN1>* Type of the first input.* @param <IN2>* Type of the second input.* @param <OUT>* Output type.*/
@Public
public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implementsCoFlatMapFunction<IN1, IN2, OUT> {private static final long serialVersionUID = 1L;
}
RichCoFlatMapFunction 是 FlatMapFunction 的一种,可以应用于一对连接的流,并且可以访问 Rich 函数接口。
RichCoFlatMapFunction 实现了 CoFlatMapFunction 接口,实现两个连接流上的 FlatMap 转换操作。
/*** @param <IN1> Type of the first input.* @param <IN2> Type of the second input.* @param <OUT> Output type.*/
@Public
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {/*** This method is called for each element in the first of the connected streams.*/void flatMap1(IN1 value, Collector<OUT> out) throws Exception;/*** This method is called for each element in the second of the connected streams.*/void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}
接下来,我们根据示例来演示 Flink 连接流操作,为了便于读者查看,笔者直接在代码上添加注释。
import org.apache.flink.apimon.state.ValueState;
import org.apache.flink.apimon.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
public class ConnectedStreamFlatMapSolution {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// control 会被 flatMap1 处理// 从 streamOfWords 流中过滤出不在 control 流中的单词DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);// streamOfWords 会被 flatMap2 处理// 因为 data 和 artisans 单词不在 control 流中,所以其状态在 flatMap1 中为 null,不为 TRUE,因此 streamOfWords 在调用 flatMap2 时满足 blocked.value() == null, 则会被输出DataStream<String> streamOfWords = env.fromElements("data", "DROP", "artisans", "IGNORE").keyBy(x -> x);// control 流连接 streamOfWords 流,两个流都是以单词做 keyBy,即 key 值为单词control.connect(streamOfWords).flatMap(new ControlFunction()).print();env.execute();}public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {// key 状态使用 Boolean 值保存,blocked 用于判断每个单词是否在 control 流中private ValueState<Boolean> blocked;@Overridepublic void open(Configuration config) {blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));}// control.connect(streamOfWords) 表明 control 流中的元素会被 flatMap1 处理,streamOfWords 流中的元素会被 flatMap2 处理@Overridepublic void flatMap1(String control_value, Collector<String> out) throws Exception {blocked.update(Boolean.TRUE);}// 对于不在 control 流中的元素,其状态不为 TRUE,即 blocked.value() == null,从而被 flatMap2 处理时,会被 out 输出@Overridepublic void flatMap2(String data_value, Collector<String> out) throws Exception {if (blocked.value() == null) {out.collect(data_value);}}}
}
准确输出输出结果为:
4> artisans
1> data
笔者这里为什么会说“准确”二字呢?输出结果果真会一直是这个吗?
读者可以多运行几遍代码,是不是会发现输出结果并非如此,还有其他输出:
1> data
4> artisans
1> DROP
4> IGNORE
或
1> data
1> DROP
4> artisans
或
4> artisans
4> IGNORE
1> data
Why? What happened?
可能细心的读者已经发现,Flink 运行时是无法控制 flatMap1 和 flatMap2 回调的调用顺序的,这两个输入流相互竞争,也就是说 control 和 streamOfWords 两个流的每个元素分别调用 flatMap1 和 flatMap2 的先后顺序不是确定的,就会产生上面的不同输出结果。
笔者拿其中的两个输出结果进行说明:
如果 streamOfWords 流的元素全部都优先调用 flatMap2 其实 flatMap1 根本就没有执行,ValueState 状态为 null,没有任何单词的状态。streamOfWords 流中的每个单词在调用 flatMap2 时,所有单词的 blocked.value() == null,因此输出 streamOfWords 流中的所有单词,即:
1> data
4> artisans
1> DROP
4> IGNORE
如果 flatMap1 只优先处理过 control 流中的 IGNORE 单词 其实有了上面的基础,这里的情况就很容易了解了。既然 control 流中的 IGNORE 单词优先处理,则其状态为 True,那么 streamOfWords 流中的其他单词调用 flatMap2 时状态值为 null,直接输出,即:
1> data
1> DROP
4> artisans
最后大家需要注意,Flink 运行时所连接的两个流必须以兼容的方式进行 keyed,即要么两个流都未被 keyed,要么两个流都被 keyed,并且如果两个都被 keyed,则 key 值必须相同。在本示例中,两个流都属于 DataStream 类型。另外,示例中的 RichCoFlatmap 在 keyed state 中存储一个 Boolean 值,并且该 Boolean 值由两个流共享。
提示:在时序或排序很重要的情况下,就非常有必要在管理的 Flink 状态中缓冲事件,直到应用程序准备好处理它们为止,这样可以避免出现不可预料的未知结果。
总结
笔者在本篇文章中,讲解了无状态和有状态数据流转换,包括 map、flatmap 和 RichMapFunction 等实战操作,以及使用 Keyed Streams 对传感器进行 keyBy 操作,最后还使用 connect 对两个流进行连接操作。
为了增加实战效果,笔者将传感器的案例丰富化:要求对于传感器的流式数据,每隔5秒钟计算平均温度,实验中考虑到使用 EventTime、Watermar、keyBy 以及 timeWindow。
全部实战代码见 Github 项目,运行的 main 方法位于 AverageSensorReadings 对象中。
本文标签: Apache Flink 培训系列
版权声明:本文标题:Apache Flink 培训系列 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/p/1700323512a396800.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论