Flink学习--如何计算实时热门商品

首先我们把“实时热门商品”翻译成程序员易理解的语言:“每隔5分钟输出最近一小时内点击量最多的前N个商品”。将这个需求分解一下,将做如下几件事:

  • 构建数据源
  • 抽取业务数据时间戳,告诉Flink基于业务时间做窗口
  • 过滤出点击行为数据
  • 按1小时的窗口,每隔5分钟,做滑动窗口聚合
  • 按每个窗口聚合,输出每个窗口中点击量前N的商品

构建数据源

本文数据为淘宝用户行为数据集(来自阿里云天池公开数据集,特别感谢)。本数据集包含了淘宝上某一天随机一百万用户的所有行为(包括点击、购买、加购、收藏)。数据集的组织形式和MovieLens-20M类似,即数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述如下:

列名称 说明
用户ID 整数类型,加密后的用户ID
商品ID 整数类型,加密后的商品ID
商品类目ID 整数类型,加密后的商品所属类目ID
行为类型 字符串,枚举类型,包括(‘pv’, ‘buy’, ‘cart’, ‘fav’)
时间戳 行为发生的时间戳,单位秒

下载数据集,放入项目resource目录

1
2
cd flink-learning-project/src/main/resources
curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv

创建数据集实体类,UserBehavior.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.xyueji.flink.core.model;

/**
* @author xiongzhigang
* @date 2020-06-05 18:19
* @description
*/
public class UserBehavior {
private long userId;
private long itemId;
private int categoryId;
private String behavior;
private long timestamp;

@Override
public String toString() {
return "UserBehavior{" +
"userId=" + userId +
", itemId=" + itemId +
", categoryId=" + categoryId +
", behavior='" + behavior + '\'' +
", timestamp=" + timestamp +
'}';
}

public long getUserId() {
return userId;
}

public void setUserId(long userId) {
this.userId = userId;
}

public long getItemId() {
return itemId;
}

public void setItemId(long itemId) {
this.itemId = itemId;
}

public int getCategoryId() {
return categoryId;
}

public void setCategoryId(int categoryId) {
this.categoryId = categoryId;
}

public String getBehavior() {
return behavior;
}

public void setBehavior(String behavior) {
this.behavior = behavior;
}

public long getTimestamp() {
return timestamp;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
}

编写Flink主程序,com.xyueji.flink.project.topnhot.HotItems

1
2
3
4
5
public class HotItems {
public static void main(String[] args) {

}
}

读取数据集,构建Flink source:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// UserBehavior.csv本地路径
URL url = HotItems.class.getClassLoader().getResource("UserBehavior.csv");
Path localFile = Path.fromLocalFile(new File(url.toURI()));

PojoTypeInfo pojoTypeInfo = (PojoTypeInfo) TypeExtractor.createTypeInfo(UserBehavior.class);
// 字段顺序
String[] fieldNames = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
// csv InputFormat
PojoCsvInputFormat csvInputFormat = new PojoCsvInputFormat<>(localFile, pojoTypeInfo, fieldNames);
// 创建Flink执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 模拟真实数据流,这里并行度设为1
env.setParallelism(1);
// 添加source
DataStreamSource dataStreamSource = env.createInput(csvInputFormat, pojoTypeInfo);

EventTime 与 Watermark

抽取业务数据时间戳,设置EventTime,以业务时间做窗口。这里说明一下:

  • ProcessingTime:事件被处理的时间。也就是由机器的系统时间来决定。
  • EventTime:事件发生的时间。一般就是数据本身携带的时间。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 这里要设置EventTime, 默认ProcessingTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream timeData = dataStreamSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
    @Override
    public long extractAscendingTimestamp(UserBehavior userBehavior) {
    // 数据集时间单位为秒,转换为毫秒
    return userBehavior.getTimestamp() * 1000;
    }
    });

过滤

再来回顾一下,”每隔5分钟输出最近1小时点击量前N的商品“,我们先把点击这个行为类型的数据过滤出来。

1
2
3
4
5
6
DataStream pvData = timeData.filter(new FilterFunction<UserBehavior>() {
@Override
public boolean filter(UserBehavior userBehavior) throws Exception {
return userBehavior.getBehavior().equals("pv");
}
});

滑动窗口

按1小时做窗口,每隔5分钟,做滑动窗口聚合。

1
2
3
4
DataStream windowDate = pvData
.keyBy("itemId") //以itemId分组
.timeWindow(Time.minutes(60), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction());

这里的CountAgg实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(UserBehavior userBehavior, Long acc) {
return acc + 1;
}

@Override
public Long getResult(Long acc) {
return acc;
}

@Override
public Long merge(Long acc1, Long acc2) {
return acc1 + acc2;
}
}

WindowResultFunction实现了WindowFunction接口,将每个key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的WindowResultFunction将主键商品ID,窗口,点击量封装成了ItemViewCount进行输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

@Override
public void apply(
Tuple key, // 窗口的主键,即 itemId
TimeWindow window, // 窗口
Iterable<Long> aggregateResult, // 聚合函数的结果,即 count 值
Collector<ItemViewCount> collector // 输出类型为 ItemViewCount
) throws Exception {
Long itemId = (Long) ((Tuple1) key).f0;
Long count = aggregateResult.iterator().next();
collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
}
}

/**
* 商品点击量(窗口操作的输出类型)
*/
public static class ItemViewCount {
public long itemId; // 商品ID
public long windowEnd; // 窗口结束时间戳
public long viewCount; // 商品的点击量

public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
ItemViewCount result = new ItemViewCount();
result.itemId = itemId;
result.windowEnd = windowEnd;
result.viewCount = viewCount;
return result;
}
}

TopN 计算最热门商品

进行窗口聚合后,需要将窗口进行分组,这里根据ItemViewCount中的windowEnd进行keyBy分组,然后使用ProcessFunction计算每个窗口的topN的商品。

1
DataStream topItems = windowDate.keyBy("windowEnd").process(new TopNHotItems(5));

ProcessFunction 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我们将利用 timer 来判断何时收齐了某个 window 下所有商品的点击量数据。由于 Watermark 的进度是全局的,

在 processElement 方法中,每当收到一条数据(ItemViewCount),我们就注册一个 windowEnd+1 的定时器(Flink 框架会自动忽略同一时间的重复注册)。windowEnd+1 的定时器被触发时,意味着收到了windowEnd+1的 Watermark,即收齐了该windowEnd下的所有商品窗口统计值。我们在 onTimer() 中处理将收集的所有商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输出。

这里我们还使用了 ListState 来存储收到的每条 ItemViewCount 消息,保证在发生故障时,状态数据的不丢失和一致性。ListState 是 Flink 提供的类似 Java List 接口的 State API,它集成了框架的 checkpoint 机制,自动做到了 exactly-once 的语义保证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串
*/
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
private final int topSize;
public TopNHotItems(int topSize) {
this.topSize = topSize;
}
// 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
private ListState<ItemViewCount> itemState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 状态的注册
ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
"itemState-state",
ItemViewCount.class);
itemState = getRuntimeContext().getListState(itemsStateDesc);
}
@Override
public void processElement(
ItemViewCount input,
Context context,
Collector<String> collector) throws Exception {
// 每条数据都保存到状态中
itemState.add(input);
// 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
context.timerService().registerEventTimeTimer(input.windowEnd + 1);
}
@Override
public void onTimer(
long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 获取收到的所有商品点击量
List<ItemViewCount> allItems = new ArrayList<>();
for (ItemViewCount item : itemState.get()) {
allItems.add(item);
}
// 提前清除状态中的数据,释放空间
itemState.clear();
// 按照点击量从大到小排序
allItems.sort(new Comparator<ItemViewCount>() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return (int) (o2.viewCount - o1.viewCount);
}
});
// 将排名信息格式化成 String, 便于打印
StringBuilder result = new StringBuilder();
result.append("====================================\n");
result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n");
for (int i = 0; i < topSize; i++) {
ItemViewCount currentItem = allItems.get(i);
// No1: 商品ID=12224 浏览量=2413
result.append("No").append(i).append(":")
.append(" 商品ID=").append(currentItem.itemId)
.append(" 浏览量=").append(currentItem.viewCount)
.append("\n");
}
result.append("====================================\n\n");
// 控制输出频率,模拟实时滚动结果
Thread.sleep(1000);
out.collect(result.toString());
}
}

打印

1
topItems.print();

执行

1
env.execute("Hot Items Job");

执行结果

616ed78c549665c8dfabe75ef53884b7

具体代码见:https://github.com/xyueji/flink-learning

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. 构建数据源
  2. 2. EventTime 与 Watermark
  3. 3. 过滤
  4. 4. 滑动窗口
  5. 5. TopN 计算最热门商品
  6. 6. 打印
  7. 7. 执行
  8. 8. 执行结果
,