统计HDFS元数据

  1. 编写offline_fsimage.sh脚本,解析fsimage文件获取HDFS元数据并上传至HDFS。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/bin/bash

# 加载基本配置信息
# source ./config-env.sh

# 集群启用了Kerberos,先使用hdfs的Kerberos账号
# kinit -kt hdfs.keytab hdfs/admin

# 将HDFS的FsImage导出
mkdir -p ./tmp_meta
rm -rf ./tmp_meta/*
hdfs dfsadmin -fetchImage ./tmp_meta

if [ $? -ne 0 ];then
echo "获取FsImage失败..."
exit
fi

# 使用hdfs提供的oiv解析FsImage数据文件,将FsImage转换为csv格式数据
hdfs oiv -i ./tmp_meta/* -o ./tmp_meta/fsimage.csv -p Delimited

# 将生成的csv文件头去掉,并上传至HDFS的/tmp目录
sed -i -e "1d" ./tmp_meta/fsimage.csv

# 切换为Hive的用户进行put
# kinit -kt hive.keytab hive/admin

TMP_DIR=/tmp/meta/fsimage
hdfs dfs -rmr $TMP_DIR
hdfs dfs -mkdir -p $TMP_DIR
echo "创建HDFS临时目录[${TMP_DIR}]"
hdfs dfs -copyFromLocal ./tmp_meta/fsimage.csv ${TMP_DIR}

if [ $? -ne 0 ];then
echo "上传fsimage.csv文件失败..."
exit
fi

echo "上传[./tmp_meta/fsimage.csv文件至[${TMP_DIR}]成功"

元数据平台-数据血缘

架构

image-20210903154320404

首先,我们先了解一下元数据平台架构,主流程是:SQL采集 –》 SQL解析 –》应用层。

  1. SQL采集:针对各种SQL查询引擎,编写相应的钩子函数进行SQL收集,收集内容有执行时间、执行耗时、执行用户、执行引擎、jobId和执行SQL等等,最后把SQL信息入Kafka。Hive是实现ExecuteWithHookContext接口,Presto是实现EventListener接口,Spark是实现SparkListner接口。
  2. SQL解析:Flink实时消费Kafka数据,进行SQL解析。解析SQL的过程为:定义词法规则和语法规则文件 –》 使用Antlr实现SQL词法和语法解析 –》生成AST语法树 –》遍历AST语法树,考虑到Presto和Spark的SQL语法类似,因此直接参考Hive底层源码实现SQL解析。解析完成后,把血缘信息和元数据信息分别入JanusGraph和ElasticSearch。

精准去重-字典编码+Bitmap

Bitmap的基本思想是以一个bit位来表示一个元素对应的value,即使用bit数组下标来表示元素值,以大大缩小存储空间。BitMap一般用来快速查找、去重、删除等操作,但是它只能用于数字类型。那么如果要使用字符串类型的该怎么办呢?

这就需要先把字符串字典编码,生成字符串到数字的映射。本文参考kylin的全局字典编码配合RoaringBitmap以实现精准去重。

AppendTrie树

Spark解决数据倾斜问题

1、使用Hive ETL预处理数据方案适用场景:
如果导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

方案实现思路:

此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

方案实现原理:

这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

Spark-Shuffle原理

前言

Spark Shuffle是大众讨论的比较多的话题了。它是Spark任务执行过程中最为重要的过程之一。那么什么是Shuffle呢?
Shuffle一般被翻译成数据混洗,是类MapReduce分布式计算框架独有的机制,也是这类分布式计算框架最重要的执行机制。接下来会按照两个层面来谈谈Shuffle机制。分别为:

  • 逻辑层面
  • 物理层面

逻辑层面主要是从RDD的血缘出发,从DAG的角度来讲解Shuffle,另外也会说明Spark容错机制。物理层面是从执行角度来剖析Shuffle是如何发生的

1. RDD血缘与Spark容错

MapReduce-Shuffle过程

从map()的输出到reduce()的输入,中间的过程被称为shuffle过程。

map side

  1. 在写入磁盘之前,会先写入环形缓冲区(circular memory buffer),默认100M(mapreduce.task.io.sort.mb可修改),当缓冲区内容达到
    80M(mapreduce.map.sort.spill.percent可修改),缓冲区内容会被溢写到磁盘,形成一个spill file文件。

  2. 分区:在写入磁盘之前,会先进分区(partition),而partition的数量是由reducer的数量决定的。

    job.setNumReduceTasks(2);

Flink学习--如何计算实时热门商品

首先我们把“实时热门商品”翻译成程序员易理解的语言:“每隔5分钟输出最近一小时内点击量最多的前N个商品”。将这个需求分解一下,将做如下几件事:

  • 构建数据源
  • 抽取业务数据时间戳,告诉Flink基于业务时间做窗口
  • 过滤出点击行为数据
  • 按1小时的窗口,每隔5分钟,做滑动窗口聚合
  • 按每个窗口聚合,输出每个窗口中点击量前N的商品
,