admin 管理员组

文章数量: 1087649

Apache Flink 培训系列

无状态的 Transformation

官网没有给出实战的代码,笔者为了大家可以练习,将会提供完整代码。

map()

在上面的实验 1 中,我们通过 GeoUtils.isInNYC 过滤出位于纽约市的出租车事件流。同样在 GeoUtils 类,还提供了另外一个静态方法 GeoUtils.mapToGridCell(float lon, float lat),该方法将位置(经度、纬度)映射到一个网格单元,该网格单元指的是大约 100x100 米的区域。

现在,通过向每个事件添加 startCell 和 endCell 字段来丰富我们的出租车乘车对象流。我们可以创建扩展了 TaxiRide 的 EnrichedRide 类,并添加以下字段:

public static class EnrichedRide extends TaxiRide {public int startCell;public int endCell;public EnrichedRide() {}public EnrichedRide(TaxiRide ride) {this.rideId = ride.rideId;this.isStart = ride.isStart;this.startTime = ride.startTime;this.endTime = ride.endTime;this.startLon = ride.startLon;this.startLat = ride.startLat;this.endLon = ride.endLon;this.endLat = ride.endLat;this.passengerCnt = ride.passengerCnt;this.taxiId = ride.taxiId;this.driverId = ride.driverId;// 添加 startCell 和 endCellthis.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);}public String toString() {return super.toString() + "," +Integer.toString(this.startCell) + "," +Integer.toString(this.endCell);}
}

然后我们创建 Enrichment 类,使用 map() 函数,将 TaxiRide 对象转换为 EnrichedRide,代码如下:

public class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {@Overridepublic EnrichedRide map(TaxiRide taxiRide) throws Exception {return new EnrichedRide(taxiRide);}
}

最后我们可以创建 EnrichedRideCleansingSolution 类,包含 main 方法,用来过滤事件流并转换为指定格式的事件流,完整代码如下:

public class EnrichedRideCleansingSolution extends ExerciseBase {public static void main(String[] args) throws Exception {ParameterTool params = ParameterTool.fromArgs(args);final String input = params.get("input", pathToRideData);final int maxEventDelay = 60;       // events are out of order by max 60 secondsfinal int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second// set up streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(ExerciseBase.parallelism);// start the data generatorDataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));DataStream<EnrichedRide> enrichedNYCRides = rides.filter(new RideCleansingSolution.NYCFilter()).map(new Enrichment());// print the filtered streamprintOrTest(enrichedNYCRides);// run the cleansing pipelineenv.execute("Taxi Ride Cleansing");}
}

运行代码,结果类似为: 

...
2> 103,START,2013-01-0100:00:50,1970-01-0100:00:00,-73.98857,40.764046,-73.99101,40.760773,2,2013000103,2013000103,47043,47792
4> 48,START,2013-01-0100:00:00,1970-01-0100:00:00,-73.95538,40.779728,-73.96776,40.760326,1,2013000048,2013000048,44067,47808
4> 5,START,2013-01-0100:00:00,1970-01-0100:00:00,-74.00053,40.737343,-73.97723,40.783607,4,2013000005,2013000005,52535,43301
1> 24,START,2013-01-0100:00:00,1970-01-0100:00:00,-73.99858,40.755333,-73.994576,40.760796,1,2013000024,2013000024,48786,47789
4> 40,START,2013-01-0100:00:00,1970-01-0100:00:00,-73.98613,40.722607,-73.95306,40.771873,3,2013000040,2013000040,55295,45569
1> 39,START,2013-01-0100:00:00,1970-01-0100:00:00,-74.00465,40.721268,-73.95696,40.766376,5,2013000039,2013000039,55532,46566
3> 38,START,2013-01-0100:00:00,1970-01-0100:00:00,-74.00822,40.71672,-73.97234,40.753498,1,2013000038,2013000038,56529,49305
...

flatmap()

MapFunction 仅在执行一对一转换时才适用,对于进入流中的每个元素,map() 将发出一个转换后的元素。否则,我们要使用 flatmap()。

创建类 EnrichedRideFlatMapSolution ,使用 flatmap 操作:

public class EnrichedRideFlatMapSolution extends ExerciseBase {public static void main(String[] args) throws Exception {ParameterTool params = ParameterTool.fromArgs(args);final String input = params.get("input", pathToRideData);final int maxEventDelay = 60;       // events are out of order by max 60 secondsfinal int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second// set up streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(ExerciseBase.parallelism);// start the data generatorDataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));DataStream<EnrichedRide> enrichedNYCRides = rides.flatMap(new NYCEnrichment());// print the filtered streamenrichedNYCRides.print();// run the cleansing pipelineenv.execute("Taxi Ride flatmap operation");}
}

其中 NYCEnrichment 类如下:

public class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {@Overridepublic void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {FilterFunction<TaxiRide> valid = new RideCleansingSolution.NYCFilter();if (valid.filter(taxiRide)) {out.collect(new EnrichedRide(taxiRide));}}
}

运行 main 方法时,使用 Collector 时,flatmap() 方法可以发射任意数量的流元素,包括不包含任何元素。

Keyed Streams

keyBy()

能够根据流中的某个属性对流进行分区通常非常有用,以便将具有该属性的相同值的所有事件组合在一起进行计算。例如,我们想找到从每个网格单元开始的行驶时间最长的出租车。如果我们使用 SQL 查询,则会使用 startCell 执行某种 GROUP BY,而在 Flink 中,使用 keyBy(KeySelector) 完成。

rides.flatMap(newNYCEnrichment()).keyBy("startCell")

每个 keyBy 都会导致网络 shuffle,从而对流进行重分区 repartition。通常,这个操作非常消耗资源,因为它涉及网络通信以及序列化和反序列化。 

在上面的示例中,key 为 startCell。这种类型的 key 具有一个缺点,即编译器无法推断用于 key 的字段的类型,因此 Flink 会将 key 的值作为 Tuple 传递。建议最好使用类型匹配的 KeySelector,例如:

rides.flatMap(new NYCEnrichment()).keyBy(new KeySelector<EnrichedRide, int>() {@Overridepublic int getKey(EnrichedRide ride) throws Exception {return ride.startCell;}})

可以使用 lambda 更简洁地表示:

rides.flatMap(new NYCEnrichment()).keyBy(ride -> ride.startCell)

对 Keyed Stream 进行聚合

这段代码创建了一个新的元组流,其中包含 startCell 和 duration(以分钟为单位):

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {@Overridepublic void flatMap(EnrichedRide ride,Collector<Tuple2<Integer, Minutes>> out) throws Exception {// 出租车出行的结束事件if (!ride.isStart) {// 计算出租车出行的时间间隔Interval rideInterval = new Interval(ride.startTime, ride.endTime);Minutes duration = rideInterval.toDuration().toStandardMinutes();out.collect(new Tuple2<>(ride.startCell, duration));}}});

现在,我们可以生成一个流,其中仅包含每个 startCell 最长的乘车时间。

前面我们看到了一个 EnrichedRide POJO 的示例,我们通过使用其名称指定了要使用的字段。在下面介绍的场景下,我们使用 Tuple2 对象,并在元组中提供从0开始的索引。

minutesByStartCell.keyBy(0) // startCell.maxBy(1) // duration.print();

为了方便读者的理解,这里笔者写一个完整的示例,然后运行并查看结果。

public class EnrichedRideKeyedStreamsSolution extends ExerciseBase {public static void main(String[] args) throws Exception {ParameterTool params = ParameterTool.fromArgs(args);final String input = params.get("input", pathToRideData);final int maxEventDelay = 60;       // events are out of order by max 60 secondsfinal int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second// set up streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(ExerciseBase.parallelism);// start the data generatorDataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));DataStream<EnrichedRide> enrichedNYCRides = rides.filter(new RideCleansingSolution.NYCFilter()).map(new Enrichment());DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {@Overridepublic void flatMap(EnrichedRide ride, Collector<Tuple2<Integer, Minutes>> out) throws Exception {if (!ride.isStart) {Interval rideInterval = new Interval(ride.startTime, ride.endTime);Minutes duration = rideInterval.toDuration().toStandardMinutes();out.collect(new Tuple2<>(ride.startCell, duration));}}});minutesByStartCell.keyBy(0) // startCell.maxBy(1) // duration.print();// run the keyed streams pipelineenv.execute("Taxi Ride Keyed Streams");}
}

运行输出结果:

3> (50053,PT1M)
1> (54791,PT0M)
1> (46307,PT1M)
1> (55782,PT1M)
3> (48301,PT0M)
3> (43325,PT1M)
3> (54541,PT0M)
3> (71191,PT0M)
1> (55044,PT2M)
3> (51551,PT3M)
...

状态

上面的 keyBy 操作是我们看到的第一个有状态流的示例。虽然状态处理是透明的,但是 Flink 必须跟踪每个不同 key 的最大持续时间。

在 Flink 程序中涉及到状态时,需要考虑到状态所占用空间到大小。如果 key 的空间是无界的,那么 Flink 状态的空间也应是无界的。

在实际场景中,在处理流数据时,通常更有意义的做法是考虑有界窗口上的聚合,而不是整个流。

reduce() 和其他聚合算子

上面使用的 maxBy() 只是 Flink 的 KeyedStream 上可用的众多聚合操作中的一个算子,还有一个更通用的 reduce() 函数,可用于实现用户自定义聚合。

有状态 Transformation

为什么 Flink 参与状态管理

大家开发的应用程序当然可以自己使用和管理状态,而不需要让 Flink 参与。但是 Flink 为它管理的状态提供了一些出色的特性,解决用户的痛点:

  • 本地化 Flink

    状态保存在处理该状态的计算机本地,并且可以以内存速度访问。

  • 持久化

    Flink 状态是自动设置检查点并恢复 Flink 状态。

  • 垂直可扩展

    Flink 状态可以保留在嵌入式 RocksDB 实例中,该实例可以通过添加更多本地磁盘来扩展。

  • 水平可伸缩

    随着群集的增长和收缩,Flink 状态将重新分配。

  • 可查询

    可以通过 REST API 查询 Flink 状态。

在本课程中,读者将学习如何使用 Flink API 来管理 Keyed 状态。

Rich Functions

前面我们已经看到了 Flink 的几个功能接口,包括 FilterFunction、MapFunction 和 FlatMapFunction。

对于每个接口,Flink 还提供了一个 Rich 变体,例如 RichFilterFunction、RichMapFunction 和 RichFlatMapFunction等。笔者这里解释一下,有的读者称之为富函数,以 Rich 开头。

它们具有一些其他方法,包括:

  • open(Configuration c) 在初始化时,open() 被调用一次,用于加载一些静态数据或打开与外部服务的连接。

  • close() 通常用于清理和释放资源。

  • getRuntimeContext() getRuntimeContext() 方法提供了对函数 RuntimeContext 的访问。RuntimeContext 可用于查询函数的并行度、子任务索引和执行函数的任务的名称等信息。此外,它还包括访问分区状态的方法。

Keyed State 示例

在这个示例中,可以获取由 Tuple2 事件组成的传感器读数流,Tuple2 中的 String 类型是每个传感器 ID,Double 类型是传感器读到的数据,实验中使用 Socket 接收用户传输的数据。

public class SensorStreamKeyedStateSolution {public static void main(String[] args) throws Exception {// set up streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(ExerciseBase.parallelism);// receive each sensor from a socket// nc -l 1000DataStream<String> text = env.socketTextStream("localhost", 8000, "\n", 10);// processing each sensor dataDataStream<Tuple2<String, Double>> input = text.map(new MapFunction<String, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> map(String sensordata) throws Exception {// each sensor format:// S000001,8.0// S000002,3.0// S000001,6.0// S000001,3.0// S000002,5.0// S000002,7.0String[] record = sensordata.split(",");if (record.length == 2) {return new Tuple2<>(record[0], Double.valueOf(record[1]));} else {System.err.println("异常数据: " + sensordata);// 返回异常数据return new Tuple2<>("E000000", 0.0);}}});DataStream<Tuple2<String, Double>> smoothed = input.keyBy(0).map(new Smoother());// print result// 4> (S000001,0.0)// 4> (S000002,0.0)// 4> (S000001,0.0)// 4> (S000001,4.5)// 4> (S000002,0.0)// 4> (S000002,6.0)smoothed.print();// submit the jobenv.execute("SensorStreamKeyedStateSolution Job");}
}

我们的目标是对来自每个传感器的数据进行平滑处理,我们将使用上面代码中称为 Smoother 的 RichMapFunction 进行处理,代码实现如下:

public class Smoother extends RichMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {private ValueState<MovingAverage> averageState;@Overridepublic void open (Configuration conf) {ValueStateDescriptor<MovingAverage> descriptor =new ValueStateDescriptor<>("moving average", MovingAverage.class);averageState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple2<String, Double> map (Tuple2<String, Double> item) throws Exception {// access the state for this keyMovingAverage average = averageState.value();// create a new MovingAverage (with window size 2) if none exists for this key// 窗口大小为 2if (average == null) average = new MovingAverage(2);// add this event to the moving averageaverage.add(item.f1);averageState.update(average);// return the smoothed resultreturn new Tuple2(item.f0, average.getAverage());}
}

Flink 支持几种不同类型的 keyed 状态。在此示例中,笔者使用最简单的一种,即ValueState,这意味着 Flink 将为每个 key 存储一个对象(在本例中,使用的是一个类型为 MovingAverage 的对象)。出于性能的考虑,Flink 还支持 ListState 和 MapState 。

为了完成本次实验,Smoother 记录每个传感器的最新传感器读数,使用 Flink 的 keyed state 接口来完成,Flink 将为所管理的每个状态项维护一个 key/value 存储。

MovingAverage 代码如下,实现的功能其实是滑动平均数:

class MovingAverage {int size;double sum;double average;LinkedList<Double> list;public MovingAverage(int size) {this.list = new LinkedList<>();this.size = size;}public double add(double value) {sum += value;// Adds the specified element as the tail (last element) of this list.list.offer(value);if (list.size() <= size) {return sum / list.size();}sum -= list.poll();return average = sum / size;}public double getAverage() {return average;}
}

其实上面的实验运行后,查看结果还是容易理解的,笔者这里再补充说明一下。

Smoother 中的 map 方法负责使用 MovingAverage 对象平滑处理每个事件。每个事件都会调用一次 map,该事件与一个特定的 key(即传感器 ID)相关联,并且在 ValueState 对象上的 averageState 方法对该传感器的 key 进行操作。换句话说,调用 averageState.value() 返回相应传感器当前 MovingAverage 对象,因此,当调用 average.add(item.f1) 时,我们将此事件添加到相同 key 的先前事件中(即同一传感器),因为之前的状态已经使用 ValueState 保存。

清除 State

其实上面的示例存在一个潜在的问题:如果 key 空间是无界的,将会发生什么呢?

Flink 为每个使用的不同 key 在某处存储一个 MovingAverage 实例。如果传感器事件数据有限,自然问题不大,但是如果传感器 key 集合无限制增加的情况下,存储空间毕竟也是有限的,所以需要清除不再需要的状态。做法比较简单,就是通过在状态对象上调用 clear() 来完成,如下所示:

averageState.clear()

另外,笔者将在事件驱动的应用程序课程中,掌握在 ProcessFunction 中使用 Timer 定时器来清除状态的操作。

Flink 1.6 版本之后还增加了 TTL 功能。不过这种方法的适用性有限,但是在某些情况下可以依靠它来清除不需要的状态。

相互连接的流

在线上的生产环境中,有时并不是简单地对单一的输入流做预处理转换操作,而是使用动态的数据流通过阈值、规则和其他参数对常规的数据流进行转换,在 Flink 中支持连接流,比如单个运算符具有两个输入流,如下所示: 

连接流还可以用于实现流 join,后面有对应的实验课程。

示例

在此示例中,控制流用于从 streamOfWords 中过滤掉指定的单词。被 ControlFunction 调用的 RichCoFlatMapFunction 将应用于连接流以完成此操作。

先看一下 RichCoFlatMapFunction 抽象类:

package org.apache.flink.streaming.api.functions.co;
import org.apache.flink.annotation.Public;
import org.apache.flink.apimon.functions.AbstractRichFunction;
import org.apache.flink.apimon.functions.RichFunction;
/*** A RichCoFlatMapFunction represents a FlatMap transformation with two different input* types. In addition to that the user can use the features provided by the* {@link RichFunction} interface.** @param <IN1>*            Type of the first input.* @param <IN2>*            Type of the second input.* @param <OUT>*            Output type.*/
@Public
public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implementsCoFlatMapFunction<IN1, IN2, OUT> {private static final long serialVersionUID = 1L;
}

RichCoFlatMapFunction 是 FlatMapFunction 的一种,可以应用于一对连接的流,并且可以访问 Rich 函数接口。

RichCoFlatMapFunction 实现了 CoFlatMapFunction 接口,实现两个连接流上的 FlatMap 转换操作。

/*** @param <IN1> Type of the first input.* @param <IN2> Type of the second input.* @param <OUT> Output type.*/
@Public
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {/*** This method is called for each element in the first of the connected streams.*/void flatMap1(IN1 value, Collector<OUT> out) throws Exception;/*** This method is called for each element in the second of the connected streams.*/void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}

接下来,我们根据示例来演示 Flink 连接流操作,为了便于读者查看,笔者直接在代码上添加注释。

import org.apache.flink.apimon.state.ValueState;
import org.apache.flink.apimon.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
public class ConnectedStreamFlatMapSolution {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// control 会被 flatMap1 处理// 从 streamOfWords 流中过滤出不在 control 流中的单词DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);// streamOfWords 会被 flatMap2 处理// 因为 data 和 artisans 单词不在 control 流中,所以其状态在 flatMap1 中为 null,不为 TRUE,因此 streamOfWords 在调用 flatMap2 时满足 blocked.value() == null, 则会被输出DataStream<String> streamOfWords = env.fromElements("data", "DROP", "artisans", "IGNORE").keyBy(x -> x);// control 流连接 streamOfWords 流,两个流都是以单词做 keyBy,即 key 值为单词control.connect(streamOfWords).flatMap(new ControlFunction()).print();env.execute();}public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {// key 状态使用 Boolean 值保存,blocked 用于判断每个单词是否在 control 流中private ValueState<Boolean> blocked;@Overridepublic void open(Configuration config) {blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));}// control.connect(streamOfWords) 表明 control 流中的元素会被 flatMap1 处理,streamOfWords 流中的元素会被 flatMap2 处理@Overridepublic void flatMap1(String control_value, Collector<String> out) throws Exception {blocked.update(Boolean.TRUE);}// 对于不在 control 流中的元素,其状态不为 TRUE,即 blocked.value() == null,从而被 flatMap2 处理时,会被 out 输出@Overridepublic void flatMap2(String data_value, Collector<String> out) throws Exception {if (blocked.value() == null) {out.collect(data_value);}}}
}

准确输出输出结果为:

4> artisans
1> data

笔者这里为什么会说“准确”二字呢?输出结果果真会一直是这个吗?

读者可以多运行几遍代码,是不是会发现输出结果并非如此,还有其他输出:

1> data
4> artisans
1> DROP
4> IGNORE

1> data
1> DROP
4> artisans

4> artisans
4> IGNORE
1> data

Why? What happened?

可能细心的读者已经发现,Flink 运行时是无法控制 flatMap1 和 flatMap2 回调的调用顺序的,这两个输入流相互竞争,也就是说 control 和 streamOfWords 两个流的每个元素分别调用 flatMap1 和 flatMap2 的先后顺序不是确定的,就会产生上面的不同输出结果。

笔者拿其中的两个输出结果进行说明:

  • 如果 streamOfWords 流的元素全部都优先调用 flatMap2 其实 flatMap1 根本就没有执行,ValueState 状态为 null,没有任何单词的状态。streamOfWords 流中的每个单词在调用 flatMap2 时,所有单词的 blocked.value() == null,因此输出 streamOfWords 流中的所有单词,即:

1> data
4> artisans
1> DROP
4> IGNORE
  • 如果 flatMap1 只优先处理过 control 流中的 IGNORE 单词 其实有了上面的基础,这里的情况就很容易了解了。既然 control 流中的 IGNORE 单词优先处理,则其状态为 True,那么 streamOfWords 流中的其他单词调用 flatMap2 时状态值为 null,直接输出,即:

1> data
1> DROP
4> artisans

最后大家需要注意,Flink 运行时所连接的两个流必须以兼容的方式进行 keyed,即要么两个流都未被 keyed,要么两个流都被 keyed,并且如果两个都被 keyed,则 key 值必须相同。在本示例中,两个流都属于 DataStream 类型。另外,示例中的 RichCoFlatmap 在 keyed state 中存储一个 Boolean 值,并且该 Boolean 值由两个流共享。

提示:在时序或排序很重要的情况下,就非常有必要在管理的 Flink 状态中缓冲事件,直到应用程序准备好处理它们为止,这样可以避免出现不可预料的未知结果。

总结

笔者在本篇文章中,讲解了无状态和有状态数据流转换,包括 map、flatmap 和 RichMapFunction 等实战操作,以及使用 Keyed Streams 对传感器进行 keyBy 操作,最后还使用 connect 对两个流进行连接操作。

为了增加实战效果,笔者将传感器的案例丰富化:要求对于传感器的流式数据,每隔5秒钟计算平均温度,实验中考虑到使用 EventTime、Watermar、keyBy 以及 timeWindow。

全部实战代码见 Github 项目,运行的 main 方法位于 AverageSensorReadings 对象中。

本文标签: Apache Flink 培训系列