Flink学习--基本概念与环境搭建

基本概念

数据流编程模型

抽象级别

Flink给流/批处理应用提供了不同级别的抽象,主要分为以下4种:
67b15a33539c6c55e414fc99120a5037

  1. Stateful Stream Processing:最底层的抽象是有状态的数据流。它经Process Function嵌入DataStream API。允许用户非常灵活地处理来自一个或多个流的数据。并且用户能够注册event time和processing time(稍后详解),允许项目能够实现复杂的计算。
  2. DataStream/DataSet API:在实际过程中,很多应用并不需要上述提到的最底层抽象。于是提供了DataStream API(有界/无界数据流)和DataSet API(有界数据流)。这些API提供了共有的构建模块去处理数据,像各种各样用户指定的转换、连接、聚合、窗口和状态等等。
  3. Table API:是围绕着表定义的模拟语言,可以动态改变表。
  4. SQL:可以让程序完全用SQL来描述。

程序&数据流

构建Flink程序的基本模块是流和转换,概念上来说一个stream是数据记录的流,一个转换是以一个或多个作为输入流而产生一个或多个输出流的一个操作。
当Flink程序运行的时候,Flink程序映射成数据流,包含流和转换操作。每个流开始于一个或多个sources结束于一个或多个sinks,这些数据流就像非循环指向图(DAGs)。
115b53d69ec17c0cdd668563b51deba9

并行数据流

在Flink程序运行期间,一个流有一个或多个流分区,每个操作有一个或多个操作子任务。这些操作子任务彼此依赖,且运行在不同的线程、可能原型在不同的机器或container。
一个流在数据转换中间有两种操作形式,一个是一对一(one to one),另一个是重新分配(redistributing)。

窗口

窗口可以是时间驱动(如:每30秒),也可以是数据驱动(如:每100个元素)。

时间

在流处理程序中,定义了以下三种时间概念:

  • Event Time:是事件被创建的时间,它通常在事件中被描述为时间戳,比较常见的是数据自带的时间属性。
  • Ingestion time:是时间进入Flink数据流的时间。
  • Processing Time:是每个操作所在的物理机上的时间。

有状态算子

许多在数据流中的算子在某时刻看作一个独立的事件,一些算子通过多种事件记录信息。这些算子被称作有状态的。

容错检查点

Flink通过流的重放和检查点结合实现容错机制。一个检查点联系着一个特殊的指向,这个指向每个与每个算子状态相关联的输入流。

批处理流

当流是有边界的情况下,Flink运行的流处理程序就是一个特殊的批处理程序。这种情况下有些注意的情况:

  • 容错在批处理程序中不用检查点,发生异常时流全部回放。
  • 在DataSet API中的有状态算子,将不用key/value索引,而采用简单的in-memory/out-of-core数据结构。
  • DataSet API用特殊的同步迭代。

分布式运行环境

详细解释见:https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/concepts/runtime.html

环境搭建

作者采用docker-compose搭建Flink环境,详细可参考这篇文章docker-compose搭建大数据环境

WordCount程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.xyueji.flink.examples;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* @author xiongzhigang
* @date 2020-05-28 14:50
* @description
*/
public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("SocketTextStreamWordCount USAGE: <hostname> <port>");
}

String hostname = args[0];
Integer port = Integer.parseInt(args[1]);

// 设置stream execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 获取数据
DataStreamSource<String> stream = env.socketTextStream(hostname, port);

// 计数
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
.keyBy(0)
.sum(1);

sum.print();

env.execute("SocketTextStreamWordCount");

}

/**
* 分词,构建元组
*
*/
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) {
String[] words = line.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
collector.collect(new Tuple2<>(word, 1));
}
}
}
}
}

在本地使用nc开启9090端口:

1
nc -l 9090

打包程序上传到docker容器中,运行如下命令:

1
flink run -c com.xyueji.flink.examples.SocketTextStreamWordCount flink-learning-examples-1.0-SNAPSHOT.jar <本机ip> 9090

本机输入字符串:
4537004427b6a880c577e8fd912ebf18
查看flink运行结果,使用docker logs查看:

1
docker logs taskmanager

f023cf1473a7315c68ed72d4a170560f
5f7c36004bfd2a991e5c5aa2309fbc8a

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. 基本概念
    1. 1.1. 数据流编程模型
      1. 1.1.1. 抽象级别
      2. 1.1.2. 程序&数据流
      3. 1.1.3. 并行数据流
      4. 1.1.4. 窗口
      5. 1.1.5. 时间
      6. 1.1.6. 有状态算子
      7. 1.1.7. 容错检查点
      8. 1.1.8. 批处理流
    2. 1.2. 分布式运行环境
  2. 2. 环境搭建
  3. 3. WordCount程序
,