精准去重-字典编码+Bitmap

Bitmap的基本思想是用一个bit位来标记一个元素是否存在,即使用bit数组的下标来表示元素值,从而大大缩小存储空间。Bitmap一般用来快速查找、去重、删除等操作,但是它只能直接用于(非负)整数类型。那么字符串类型的数据该怎么办呢?

这就需要先把字符串字典编码,生成字符串到数字的映射。本文参考kylin的全局字典编码配合RoaringBitmap以实现精准去重。

AppendTrie树

Trie树

Trie树又名前缀树,是一种有序树:一个节点的所有子孙都有相同的前缀,也就是这个节点对应的字符串;根节点对应空字符串,而每个字符串对应的编码值由对应节点在树中的位置来决定。

image-20210902161324316

AppendTrie树

要让 Trie 树模型支持 Append(追加新值时不改变已有编码),需要做到以下两点:

  • 按照节点 ( 而不是位置 ) 保存 ID,从而保持已有 ID 不变。
  • 记录当前模型最大 ID,便于给新增节点分配 ID 。

但是这种模型会带来一个问题:假如基数特别大,Trie 树在内存中就会无限扩张。为了控制内存占用,可以为单棵子树设定阈值,超过阈值即分裂,从而控制单棵子树的内存消耗。分裂后,每棵树负责一个值范围(类似于 HBase 中的 region 分区);查询时只需要查询一棵子树即可。

image-20210902161548529

注:本文直接使用Kylin构建全局字典类AppendTrieDictionaryBuilder。

构建字典任务

BuildDictJob

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package com.xyueji.traffic.dict;

import com.xyueji.traffic.core.bean.Constants;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.kylin.dict.AppendTrieDictionary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * @author xiongzhigang
 * @date 2021-07-28 11:07
 * @description Exact distinct over fixed columns of text input: the selected column values are
 * de-duplicated by MapReduce and every distinct value is appended to a Kylin AppendTrie global
 * dictionary via {@link GlobalDictionaryBuilder}.
 */
public class BuildDictJob {
    private static final Logger log = LoggerFactory.getLogger(BuildDictJob.class);

    /**
     * Configures and runs the dictionary-building job.
     *
     * @param args {@code <InputPath> <OutputPath>}
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.out.println("<Usage>: <InputPath> <OutputPath>");
            return;
        }

        Configuration conf = new Configuration();
        conf.set("dict.zookeeper.quorum", "hadoop1:2181");
        // NOTE(review): "bath" is probably meant to be "path"; BuildDictReducer reads the same key,
        // so rename both sides together if at all.
        conf.set("dict.znode.lock.bath", "/dict");
        conf.set("dict.datasource", "traffic");
        conf.set("dict.table", Constants.ROAD_SECT_SPEED_TABLE());

        // build the job
        Job job = Job.getInstance(conf);
        job.setJobName("BuildDictJob");
        job.setJarByClass(BuildDictJob.class);

        // mapper / reducer; the combiner drops duplicate keys map-side before the shuffle
        job.setMapperClass(BuildDictMapper.class);
        job.setReducerClass(BuildDictReducer.class);
        job.setCombinerClass(BuildDictCombine.class);

        // map output k-v
        job.setMapOutputKeyClass(SelfDefineSortableKey.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setPartitionerClass(BuildDictPartitioner.class);

        // final output k-v: the reducer never writes records, the dictionary is the result
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // input / output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);

        System.exit(res ? 0 : 1);
    }

    /**
     * Emits the value to be dictionary-encoded for each input line.
     * <p>
     * If {@code dict.column.index.quorum} is empty, the whole line is emitted. Otherwise it is a
     * comma-separated list of column indexes. The line is split on {@code dict.column.split.char}
     * (default {@code ","}; note that {@link String#split} treats it as a regular expression). The
     * selected columns are then re-joined with the same separator. Lines with too few columns are
     * skipped.
     */
    public static class BuildDictMapper extends Mapper<LongWritable, Text, SelfDefineSortableKey, NullWritable> {
        private String columnIndexes;
        private String splitChar;
        private int maxIndex;
        private boolean isLegal;
        // column indexes parsed once in setup() instead of once per record
        private int[] columns = new int[0];

        private Text outputKey = new Text();
        private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            columnIndexes = conf.get("dict.column.index.quorum");
            splitChar = conf.get("dict.column.split.char");

            if (StringUtils.isEmpty(splitChar)) {
                splitChar = ",";
            }

            isLegal = StringUtils.isNotEmpty(columnIndexes) && isLegal();
            if (isLegal) {
                columns = parseIndexes(columnIndexes);
                // e.g. "," passes the numeric check but selects no column at all
                isLegal = columns.length > 0;
                maxIndex = getMaxIndex(columns);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (StringUtils.isEmpty(columnIndexes)) {
                writeValue(value.toString(), context);
                return;
            }
            if (!isLegal) {
                return;
            }

            String[] fields = value.toString().split(splitChar);
            if (maxIndex >= fields.length) {
                return;
            }

            // join with the separator between columns, so multi-character separators are handled too
            StringBuilder val = new StringBuilder();
            for (int i = 0; i < columns.length; i++) {
                if (i > 0) {
                    val.append(splitChar);
                }
                val.append(fields[columns[i]]);
            }
            writeValue(val.toString(), context);
        }

        /** Encodes {@code value} as UTF-8 and emits it as the map output key. */
        private void writeValue(String value, Context context) throws IOException, InterruptedException {
            // Text.set copies the bytes into its own buffer, so no intermediate buffer is needed
            outputKey.set(value.getBytes(StandardCharsets.UTF_8));
            sortableKey.init(outputKey);

            context.write(sortableKey, NullWritable.get());
        }

        /**
         * Checks that the configured column index list contains only digits and commas.
         *
         * @return true when no index list is configured or it is numeric
         */
        private boolean isLegal() {
            if (StringUtils.isNotEmpty(columnIndexes)) {
                String indexes = columnIndexes.replaceAll(",", "");
                return StringUtils.isNumeric(indexes);
            }

            return true;
        }

        /** Parses the comma-separated index list, skipping empty tokens such as the one in "1,,2". */
        private static int[] parseIndexes(String indexList) {
            String[] tokens = indexList.split(",");
            int[] parsed = new int[tokens.length];
            int count = 0;
            for (String token : tokens) {
                if (!token.isEmpty()) {
                    parsed[count++] = Integer.parseInt(token);
                }
            }

            int[] result = new int[count];
            System.arraycopy(parsed, 0, result, 0, count);
            return result;
        }

        /** Returns the largest index (0 for an empty array). */
        private int getMaxIndex(int[] indexes) {
            int index = 0;
            for (int i : indexes) {
                index = Math.max(i, index);
            }

            return index;
        }
    }

    /** Map-side de-duplication: emits each distinct key once. */
    public static class BuildDictCombine extends Reducer<SelfDefineSortableKey, NullWritable, SelfDefineSortableKey, NullWritable> {
        @Override
        protected void reduce(SelfDefineSortableKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /** Appends every distinct key to the global dictionary and builds it in cleanup(). */
    public static class BuildDictReducer extends Reducer<SelfDefineSortableKey, NullWritable, NullWritable, NullWritable> {
        private GlobalDictionaryBuilder builder;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String datasource = conf.get("dict.datasource");
            String table = conf.get("dict.table");
            String column = conf.get("dict.column");
            String zkHosts = conf.get("dict.zookeeper.quorum");
            String zkBase = conf.get("dict.znode.lock.bath");

            this.builder = new GlobalDictionaryBuilder();
            this.builder.init(datasource, table, column, zkHosts, zkBase);
        }

        @Override
        protected void reduce(SelfDefineSortableKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            Text keyText = key.getText();
            // getBytes() is the backing array and may be longer than the content: honour getLength()
            String value = toString(keyText.getBytes(), 0, keyText.getLength());
            this.builder.addValue(value);
        }

        /** Decodes {@code len} bytes from {@code off} as UTF-8; null for a null array. */
        private static String toString(final byte[] b, int off, int len) {
            if (b == null) {
                return null;
            }

            if (len == 0) {
                return "";
            }

            return new String(b, off, len, StandardCharsets.UTF_8);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            AppendTrieDictionary dict = this.builder.build();
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (PrintStream ps = new PrintStream(baos, true, "UTF-8")) {
                dict.dump(ps);
            }
            String dictInfo = new String(baos.toByteArray(), StandardCharsets.UTF_8);

            // max id is logged as an unsigned 32-bit value
            log.info("dict info:{},min:{},max:{}", dictInfo, dict.getMinId(), dict.getMaxId() & 0xFFFFFFFFL);
        }
    }

    /** Routes a key to a reducer by its first byte. */
    public static class BuildDictPartitioner extends Partitioner<SelfDefineSortableKey, NullWritable> {

        @Override
        public int getPartition(SelfDefineSortableKey sKey, NullWritable nullWritable, int numReduceTasks) {
            Text text = sKey.getText();
            // an empty key has no first byte (its backing array may be empty); send it to partition 0
            if (text.getLength() == 0) {
                return 0;
            }
            return (readUnsigned(text.getBytes(), 0, 1) & Integer.MAX_VALUE) % numReduceTasks;
        }

        /** Reads {@code size} bytes from {@code offset} as a big-endian unsigned integer. */
        public static int readUnsigned(byte[] bytes, int offset, int size) {
            int res = 0;
            for (int i = offset, n = offset + size; i < n; i++) {
                res <<= 8;
                res |= (int) (bytes[i]) & 0xFF;
            }

            return res;
        }
    }

}

SelfDefineSortableKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.xyueji.traffic.dict;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Modified from the SelfDefineSortableKey in https://github.com/apache/kylin
 * <p>
 * A {@link Text} key tagged with a type byte. Keys are ordered by their text. Comparing keys of
 * different type ids is an error. {@link #equals}/{@link #hashCode} agree with {@link #compareTo},
 * so the key also works with hash-based partitioners and collections.
 */
public class SelfDefineSortableKey implements WritableComparable<SelfDefineSortableKey> {

    /** Kind of value carried by the key; the ordinal is what gets serialized as the type id. */
    public enum TypeFlag {
        NONE_NUMERIC_TYPE,
        INTEGER_FAMILY_TYPE,
        DOUBLE_FAMILY_TYPE
    }

    // non-numeric (0000 0000), int (0000 0001), other numeric (0000 0010)
    private byte typeId = (byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal();

    private Text rawKey;

    private Object keyInObj;

    public SelfDefineSortableKey() {
    }

    public SelfDefineSortableKey(Text key) {
        init(key, (byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal());
    }

    /** Points this key at {@code key} (no copy is made) with the given type id. */
    public void init(Text key, byte typeId) {
        this.typeId = typeId;
        this.rawKey = key;
        this.keyInObj = key;
    }

    /** Points this key at {@code key} (no copy is made) as a non-numeric key. */
    public void init(Text key) {
        init(key, (byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal());
    }

    /**
     * Orders keys by their text.
     *
     * @throws IllegalStateException if the two keys have different type ids
     */
    @Override
    public int compareTo(SelfDefineSortableKey o) {
        if (this.typeId != o.typeId) {
            throw new IllegalStateException("Error. Incompatible types");
        }

        return ((Text) this.keyInObj).compareTo(((Text) o.keyInObj));
    }

    /** Two keys are equal when both the type id and the text content are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SelfDefineSortableKey)) {
            return false;
        }
        SelfDefineSortableKey other = (SelfDefineSortableKey) obj;
        if (typeId != other.typeId) {
            return false;
        }
        return rawKey == null ? other.rawKey == null : rawKey.equals(other.rawKey);
    }

    /** Hash derived from the type id and the text content, consistent with {@link #equals}. */
    @Override
    public int hashCode() {
        return 31 * typeId + (rawKey == null ? 0 : rawKey.hashCode());
    }

    /** Serializes the type id followed by the text. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeByte(typeId);
        rawKey.write(dataOutput);
    }

    /** Reads the type id and the text written by {@link #write}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.typeId = dataInput.readByte();
        Text inputKey = new Text();
        inputKey.readFields(dataInput);
        init(inputKey, typeId);
    }

    public short getTypeId() {
        return typeId;
    }

    public Text getText() {
        return rawKey;
    }

    public void setTypeId(byte typeId) {
        this.typeId = typeId;
    }
}

GlobalDictionaryBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package com.xyueji.traffic.dict;

import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.dict.AppendTrieDictionary;
import org.apache.kylin.dict.global.AppendTrieDictionaryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @author xiongzhigang
 * @date 2021-07-28 15:18
 * @description Wraps Kylin's AppendTrieDictionaryBuilder. Writers of the same
 * datasource/table/column dictionary share one HDFS directory, so every builder holds a ZooKeeper
 * lock on that dictionary from {@link #init} until {@link #build}.
 */
public class GlobalDictionaryBuilder {
    private static final Logger log = LoggerFactory.getLogger(GlobalDictionaryBuilder.class);

    private AppendTrieDictionaryBuilder builder;
    private DistributedLock lock;
    private String lockPath;
    private long counter;
    private static final String EMPTY_FILL = "all";
    private static final String CONNECT_CHAR = "_";

    /**
     * Acquires the distributed lock for the dictionary (blocking) and opens the AppendTrie builder.
     * Concurrent writers of the same HDFS directory are serialized through a ZooKeeper lock.
     *
     * @param datasource datasource name; "all" when empty
     * @param table      table name; "all" when empty
     * @param column     column name; "all" when empty
     * @param zkHosts    ZooKeeper connect string
     * @param zkBase     base znode for the locks
     * @throws RuntimeException if the lock cannot be acquired or the builder cannot be created
     */
    public void init(String datasource, String table, String column, String zkHosts, String zkBase) {
        datasource = StringUtils.isEmpty(datasource) ? EMPTY_FILL : datasource;
        table = StringUtils.isEmpty(table) ? EMPTY_FILL : table;
        column = StringUtils.isEmpty(column) ? EMPTY_FILL : column;

        this.lockPath = datasource + CONNECT_CHAR + table + CONNECT_CHAR + column;
        String zkLockPath = getLockPath(this.lockPath);

        // acquire the lock; the blocking call still returns false when the thread is interrupted
        this.lock = new ZookeeperDistributedLock.Factory(zkHosts, zkBase).lockForCurrentThread();
        if (!lock.lock(zkLockPath, Long.MAX_VALUE)) {
            throw new RuntimeException("Failed to create global dictionary on " + lockPath + " because the lock could not be acquired");
        }

        try {
            // build in kylin job server
            String hdfsDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();

            String baseDir = hdfsDir + "/resources/GlobalDict/dict/"
                    + datasource + "/" + table + "/" + column;
            int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
            this.builder = new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, true);
        } catch (Exception e) {
            // release the same (prefixed) path that was locked above
            unlockQuietly(e);
            throw new RuntimeException(String.format("Failed to create global dictionary on %s", lockPath), e);
        }
    }

    /**
     * Appends one value to the dictionary. Every 1,000,000 calls it re-checks that the lock is still
     * held by this client.
     *
     * @return false if {@code value} is null (nothing added), true otherwise
     * @throws RuntimeException if the lock was lost or the underlying builder fails
     */
    public boolean addValue(String value) {
        counter++;
        if (counter % 1_000_000 == 0) {
            if (lock.lock(getLockPath(lockPath))) {
                log.info("processed {} values for {}", counter, lockPath);
            } else {
                throw new RuntimeException("Failed to create global dictionary on " + lockPath + " This client doesn't keep the lock");
            }
        }

        if (value == null) {
            return false;
        }

        try {
            this.builder.addValue(value);
        } catch (Throwable e) {
            unlockQuietly(e);
            throw new RuntimeException(String.format("Failed to create global dictionary on %s ", lockPath), e);
        }

        return true;
    }

    /**
     * Builds the dictionary and releases the lock.
     *
     * @throws RuntimeException if this client no longer holds the lock
     */
    public AppendTrieDictionary build() throws IOException {
        String zkLockPath = getLockPath(lockPath);
        try {
            if (lock.lock(zkLockPath)) {
                return this.builder.build(0);
            } else {
                throw new RuntimeException("Failed to create global dictionary on " + lockPath + " This client doesn't keep the lock");
            }
        } finally {
            // unlocking a lock we do not own throws and would hide the original error
            if (lock.isLockedByMe(zkLockPath)) {
                lock.unlock(zkLockPath);
            }
        }
    }

    /** Releases the dictionary lock; an unlock failure is attached to {@code cause} rather than replacing it. */
    private void unlockQuietly(Throwable cause) {
        try {
            lock.unlock(getLockPath(lockPath));
        } catch (RuntimeException unlockError) {
            cause.addSuppressed(unlockError);
        }
    }

    private String getLockPath(String pathName) {
        return "/dict/" + pathName;
    }

}
分布式锁

DistributedLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.xyueji.traffic.dict;

/**
 * Modified from the DistributedLock in https://github.com/apache/kylin
 * <p>
 * A distributed lock. Every instance is owned by a client, on whose behalf locks are acquired and/or released.
 * <p>
 * Implementations may resolve <code>lockPath</code> against an implementation-specific base path
 * (ZookeeperDistributedLock prefixes its configured zkPathBase).
 */
public interface DistributedLock {

    /**
     * Returns the client that owns this instance.
     */
    String getClient();

    /**
     * Acquire the lock at given path, non-blocking.
     *
     * @return true if this client holds the lock after the call (including when it already held it).
     */
    boolean lock(String lockPath);

    /**
     * Acquire the lock at given path, blocking until it is acquired or the timeout elapses.
     * The ZooKeeper implementation interprets {@code timeout} as milliseconds, treats 0 as "no limit",
     * and returns false if the waiting thread is interrupted.
     *
     * @return If the lock is acquired or not.
     */
    boolean lock(String lockPath, long timeout);

    /**
     * Returns whether any client currently holds the lock at the given path.
     */
    boolean isLocked(String lockPath);

    /**
     * Returns whether this instance's client currently holds the lock at the given path.
     */
    boolean isLockedByMe(String lockPath);

    /**
     * Returns the owner of a lock path; returns null if the path is not locked by any one.
     */
    String peekLock(String lockPath);

    /**
     * Unlock the lock at given path.
     *
     * @throws IllegalStateException if the client is not holding the lock.
     */
    void unlock(String lockPath) throws IllegalStateException;

    /**
     * Purge all locks under given path. For clean up.
     */
    void purgeLocks(String lockPathRoot);
}

ZookeeperDistributedLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
package com.xyueji.traffic.dict;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.management.ManagementFactory;
import java.nio.charset.Charset;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

/**
 * @author xiongzhigang
 * @date 2021-07-28 15:32
 * @description ZooKeeper distributed lock. A lock is an EPHEMERAL znode under zkPathBase whose data
 * is the owning client's id.
 * Modified from the ZookeeperDistributedLock in https://github.com/apache/kylin
 */
public class ZookeeperDistributedLock implements DistributedLock {
    private static final Logger log = LoggerFactory.getLogger(ZookeeperDistributedLock.class);
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    /** Upper bound (ms) of the random part of the pause between two lock attempts. */
    private static final long MAX_RETRY_JITTER_MS = 10 * 1000L;

    private final CuratorFramework curator;
    private final String zkPathBase;
    private final String client;
    private final byte[] clientBytes;

    private ZookeeperDistributedLock(CuratorFramework curator, String zkPathBase, String client) {
        if (client == null) {
            throw new NullPointerException("client must not be null");
        }

        if (zkPathBase == null) {
            throw new NullPointerException("zkPathBase must not be null");
        }

        this.curator = curator;
        this.zkPathBase = zkPathBase;
        this.client = client;
        this.clientBytes = client.getBytes(UTF_8);
    }

    /**
     * Creates locks backed by one started CuratorFramework per ZooKeeper connect string. The clients
     * are cached for the JVM lifetime and closed by a shutdown hook.
     */
    public static class Factory {
        private static final ConcurrentHashMap<String, CuratorFramework> CACHE = new ConcurrentHashMap<>();
        private final String zkPathBase;
        private final CuratorFramework curator;

        static {
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (CuratorFramework curator : CACHE.values()) {
                        try {
                            curator.close();
                        } catch (Exception e) {
                            log.error("closing curator failed", e);
                        }
                    }
                }
            }));
        }

        private static CuratorFramework getZkClient(String zkHosts) {
            CuratorFramework zkClient = CACHE.get(zkHosts);
            if (zkClient == null) {
                synchronized (ZookeeperDistributedLock.class) {
                    zkClient = CACHE.get(zkHosts);
                    if (zkClient == null) {
                        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        zkClient = CuratorFrameworkFactory.builder()
                                .connectString(zkHosts)
                                .sessionTimeoutMs(120000)
                                .connectionTimeoutMs(15000)
                                .retryPolicy(retryPolicy)
                                .build();

                        zkClient.start();
                        CACHE.put(zkHosts, zkClient);
                        if (CACHE.size() > 1) {
                            log.warn("More than one singleton exist");
                        }
                    }
                }
            }

            return zkClient;
        }

        /**
         * @param zkHosts ZooKeeper connect string
         * @param zkBase  base znode; locks live under {@code zkBase + "/global_lock"}
         */
        public Factory(String zkHosts, String zkBase) {
            this.curator = getZkClient(zkHosts);
            this.zkPathBase = fixFlash(zkBase + "/" + "global_lock");
        }

        public DistributedLock lockForClient(String client) {
            return new ZookeeperDistributedLock(this.curator, this.zkPathBase, client);
        }

        /** Returns a lock whose client id is "threadId-pid@host" of the calling thread. */
        public DistributedLock lockForCurrentThread() {
            return lockForClient(threadProcessAndHost());
        }

        private static String threadProcessAndHost() {
            return Thread.currentThread().getId() + "-" + processAndHost();
        }

        private static String processAndHost() {
            return ManagementFactory.getRuntimeMXBean().getName();
        }
    }

    @Override
    public String getClient() {
        return this.client;
    }

    @Override
    public boolean lock(String lockPath) {
        lockPath = normalize(lockPath);

        log.debug("{} trying to lock {}", client, lockPath);
        try {
            curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(lockPath, clientBytes);
        } catch (KeeperException.NodeExistsException e) {
            // the node exists (possibly created by this client earlier); ownership is checked below
            log.debug("{} see {} is already locked", client, lockPath);
        } catch (Exception e) {
            throw new RuntimeException("Error while " + client + " trying to lock " + lockPath, e);
        }

        String lockOwner = peekLock(lockPath);
        if (client.equals(lockOwner)) {
            log.info("{} acquired lock at {}", client, lockPath);
            return true;
        }
        log.debug("{} failed to acquire lock at {}, which is held by {}", client, lockPath, lockOwner);
        return false;
    }

    /**
     * Retries {@link #lock(String)} with a random 1-11 s pause until it succeeds or {@code timeout}
     * ms have elapsed (0 means no limit). Returns false on timeout or interruption; in the latter
     * case the thread's interrupt flag is restored.
     */
    @Override
    public boolean lock(String lockPath, long timeout) {
        lockPath = normalize(lockPath);

        if (lock(lockPath)) {
            return true;
        }

        if (timeout == 0) {
            timeout = Long.MAX_VALUE;
        }

        log.debug("{} will wait for lock path {}", client, lockPath);
        long waitStart = System.currentTimeMillis();
        Random random = ThreadLocalRandom.current();

        while (System.currentTimeMillis() - waitStart <= timeout) {
            try {
                Thread.sleep((long) (1000 + MAX_RETRY_JITTER_MS * random.nextDouble()));
            } catch (InterruptedException e) {
                // keep the interruption visible to the caller / framework
                Thread.currentThread().interrupt();
                return false;
            }

            if (lock(lockPath)) {
                log.debug("{} waited {} ms for lock path {}", client, System.currentTimeMillis() - waitStart, lockPath);
                return true;
            }
        }

        // timeout
        return false;
    }

    @Override
    public boolean isLocked(String lockPath) {
        return peekLock(lockPath) != null;
    }

    @Override
    public boolean isLockedByMe(String lockPath) {
        return client.equals(peekLock(lockPath));
    }

    /** Returns the client id stored in the lock znode, or null when the znode does not exist. */
    @Override
    public String peekLock(String lockPath) {
        lockPath = normalize(lockPath);

        try {
            byte[] bytes = curator.getData().forPath(lockPath);
            return new String(bytes, UTF_8);
        } catch (KeeperException.NoNodeException ex) {
            return null;
        } catch (Exception ex) {
            throw new RuntimeException("Error while peeking at " + lockPath, ex);
        }
    }

    @Override
    public void unlock(String lockPath) throws IllegalStateException {
        lockPath = normalize(lockPath);

        log.debug("{} trying to unlock {}", client, lockPath);

        String owner = peekLock(lockPath);
        if (owner == null) {
            throw new IllegalStateException(client + " cannot unlock path " + lockPath + " which is not locked currently");
        }
        if (!client.equals(owner)) {
            throw new IllegalStateException(client + " cannot unlock path " + lockPath + " which is locked by " + owner);
        }

        try {
            curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);

            log.info("{} released lock at {}", client, lockPath);

        } catch (Exception ex) {
            throw new RuntimeException("Error while " + client + " trying to unlock " + lockPath, ex);
        }
    }

    @Override
    public void purgeLocks(String lockPathRoot) {
        lockPathRoot = normalize(lockPathRoot);

        try {
            curator.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPathRoot);

            log.info("{} purged all locks under {}", client, lockPathRoot);

        } catch (Exception ex) {
            throw new RuntimeException("Error while " + client + " trying to purge " + lockPathRoot, ex);
        }
    }

    /** Prefixes {@code lockPath} with zkPathBase (unless already prefixed) and cleans up slashes. */
    private String normalize(String lockPath) {
        if (!lockPath.startsWith(zkPathBase)) {
            lockPath = zkPathBase + (lockPath.startsWith("/") ? "" : "/") + lockPath;
        }

        return fixFlash(lockPath);
    }

    /**
     * Ensures a leading "/", collapses repeated "/" and removes a trailing "/". Collapsing is done
     * first so that a path such as "a//" does not keep a trailing slash.
     */
    private static String fixFlash(String path) {
        if (!path.startsWith("/")) {
            path = "/" + path;
        }

        while (path.contains("//")) {
            path = path.replace("//", "/");
        }

        if (path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }

        return path;
    }
}

精准去重-举例

此处举个使用精准去重的例子,我们使用flink消费kafka数据入ClickHouse的过程中按天去重。

使用字典

自定义MapFunction,把字符串通过字典转为编码id。

1
2
3
4
5
6
7
8
9
10
11
/** Pairs each raw record with its id from the "traffic" global dictionary for `topic`. */
class BitIndexBuilderMap(topic: String) extends RichMapFunction[String, (String, Int)] {
  // assigned in open(), once per parallel instance
  var dict: AppendTrieDictionary[String] = _

  override def open(parameters: Configuration): Unit =
    dict = HdfsDao.flinkReadDict("traffic", topic)

  override def map(in: String): (String, Int) = {
    val id = dict.getIdFromValue(in)
    in -> id
  }
}

构建Bitmap

自定义ProcessFunction,此处我们使用性能较好的RoaringBitmap构建Bitmap,并且在第二天0时清空Bitmap。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.xyueji.traffic.task.function

import com.xyueji.traffic.task.dao.HdfsDao
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.kylin.dict.AppendTrieDictionary
import org.roaringbitmap.buffer.MutableRoaringBitmap

/**
 * Emits a record's raw string the first time its dictionary id is seen and drops repeats.
 * The set of seen ids is cleared at the next local midnight (UTC+8).
 */
class BitDistinctFunction(topic: String = "") extends KeyedProcessFunction[Int, (String, Int), String] {
  // NOTE(review): not used by this function. Kept for compatibility, but made lazy and transient so the
  // dictionary is no longer read from HDFS on the client and serialized into the job graph.
  @transient lazy val dict: AppendTrieDictionary[String] = HdfsDao.flinkReadDict("traffic", topic)

  // NOTE(review): a plain field, not Flink state. There is one bitmap per parallel instance, shared by
  // all keys routed to it, and it is not checkpointed, so it starts empty after a restart.
  var bitmap: MutableRoaringBitmap = _

  override def open(parameters: Configuration): Unit = {
    bitmap = new MutableRoaringBitmap()
  }

  override def processElement(i: (String, Int), context: KeyedProcessFunction[Int, (String, Int), String]#Context, out: Collector[String]): Unit = {
    if (!bitmap.contains(i._2)) {
      bitmap.add(i._2)
      out.collect(i._1)
    } else {
      println("repeat data is : " + i._1)
    }

    // The reset instant is derived from wall-clock time, so it must be a processing-time timer.
    // An event-time timer only fires once a watermark passes it, and the Kafka pipeline in
    // KafkaToClickHouse assigns no watermarks, so the bitmap was never cleared.
    val timers = context.timerService()
    timers.registerProcessingTimeTimer(tomorrowZeroTimestampMs(timers.currentProcessingTime(), 8) + 1)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Int, (String, Int), String]#OnTimerContext, out: Collector[String]): Unit = {
    bitmap.clear()
  }

  /**
   * Returns the epoch-millis timestamp of the next 00:00:00 in the given UTC offset.
   *
   * @param now      current epoch millis
   * @param timeZone offset from UTC in whole hours (e.g. 8 for UTC+8)
   */
  def tomorrowZeroTimestampMs(now: Long, timeZone: Int): Long = now - (now + timeZone * 3600000L) % 86400000L + 86400000L
}

自定义Sink

自定义Sink入ClickHouse:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.xyueji.traffic.task.dao

import com.xyueji.traffic.core.bean.Constants
import com.xyueji.traffic.task.sink.CkSinkBuilderSinkFunction
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink}
import org.apache.flink.streaming.api.functions.sink.SinkFunction


/**
 * @author xiongzhigang
 * @date 2021-07-15 15:10
 * @description ClickHouse access
 */
object ClickHouseDao {
  /**
   * Builds a JDBC sink that writes each `Array[String]` row into ClickHouse using `sql`.
   *
   * @param sql             parameterised INSERT statement
   * @param batchSize       rows buffered before a flush (default 1000, unchanged)
   * @param batchIntervalMs periodic flush interval in ms; 0 disables it. Without a periodic flush, a
   *                        partial batch is written only on a checkpoint or when the sink closes, so
   *                        a low-traffic stream can wait in the buffer indefinitely.
   */
  def insert(sql: String, batchSize: Int = 1000, batchIntervalMs: Long = 1000L): SinkFunction[Array[String]] = {
    JdbcSink.sink(sql,
      new CkSinkBuilderSinkFunction,
      new JdbcExecutionOptions.Builder()
        .withBatchSize(batchSize)
        .withBatchIntervalMs(batchIntervalMs)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName(Constants.CLICKHOUSE_DRIVER)
        .withUrl(Constants.CLICKHOUSE_URL)
        .withUsername(Constants.CLICKHOUSE_USER)
        .withPassword(Constants.CLICKHOUSE_PASSWORD)
        .build()
    )
  }
}

任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.xyueji.traffic.task.service

import java.util.Properties

import com.xyueji.traffic.core.bean.Constants
import com.xyueji.traffic.core.util.{EnvUtil, PropertiesUtil}
import com.xyueji.traffic.task.dao.ClickHouseDao
import com.xyueji.traffic.task.function.{BitDistinctFunction, BitIndexBuilderMap}
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * @author xiongzhigang
 * @date 2021-07-23 15:18
 * @description Consumes a Kafka topic from the earliest offset, de-duplicates records exactly and
 * writes them to ClickHouse.
 */
class KafkaToClickHouse{
  def toCkFromKafka(sql: String, topic: String): JobExecutionResult = {
    val env = EnvUtil.getFlinkStreamEnv()
    val properties = new Properties()
    properties.setProperty(Constants.KAFKA_BOOTSTRAP_SERVERS, PropertiesUtil.getValue(Constants.KAFKA_BOOTSTRAP_SERVERS))
    properties.setProperty(Constants.KAFKA_GROUP_ID, PropertiesUtil.getValue(Constants.KAFKA_GROUP_ID))

    val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest()

    // exact de-duplication with dictionary encoding + bitmap
    val consumeStream = env
      .addSource(consumer)
      .map(new BitIndexBuilderMap(topic))
      .keyBy(_._2)
      .process(new BitDistinctFunction(topic))
      // limit -1 keeps trailing empty columns: the default split drops them and the row would be
      // shorter than the number of parameters bound by the INSERT statement
      .map(line => line.split(",", -1))

    // alternative: exact de-duplication with RocksDB state
    // val consumeStream = env
    // .addSource(consumer)
    // .keyBy(_.mkString)
    // .process(new RocksDBDistinctFunction)
    // .map(line => line.split(","))

    consumeStream.addSink(ClickHouseDao.insert(sql))
    env.execute()
  }
}

object KafkaToClickHouse {
  /** Launches the Kafka -> ClickHouse job for the road real-time topic into `road_sect_speed`. */
  def main(args: Array[String]): Unit = {
    val insertSql =
"""
| insert into road_sect_speed (
| logtime,
| roadsect_id,
| gotime,
| gocount,
| golen,
| period
|) values (
| ?,
| ?,
| ?,
| ?,
| ?,
| ?
| )
""".stripMargin

    val job = new KafkaToClickHouse()
    val realTimeTopic = Constants.ROAD_REAL_TIME_TABLE
    job.toCkFromKafka(insertSql, realTimeTopic)
  }
}

其他精准去重

  • 基于Redis自增id,实时生成编码,进行去重。
  • 使用Flink自带的RocksDB进行去重,此方式不适合大数据量。

此处我们提供RocksDB去重例子,RocksDBDistinctFunction:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.xyueji.traffic.task.function

import org.apache.flink.api.common.state.StateTtlConfig.{StateVisibility, UpdateType}
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

/**
 * @author xiongzhigang
 * @date 2021-07-30 14:54
 * @description Exact de-duplication with keyed state (TTL one day, cleaned by the RocksDB compaction
 * filter): emits a record the first time its key is seen and drops repeats.
 */
class RocksDBDistinctFunction extends KeyedProcessFunction[String, String, String]{
  // java.lang.Boolean rather than scala.Boolean: with a primitive Boolean an absent entry unboxes to
  // false, so `value() == null` was always false and every record was treated as a repeat.
  private var existState: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    val stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1))
      .setStateVisibility(StateVisibility.NeverReturnExpired)
      .setUpdateType(UpdateType.OnCreateAndWrite)
      .cleanupInRocksdbCompactFilter(10000)
      .build()

    // classOf[java.lang.Boolean]: `Boolean.getClass` is the class of the scala.Boolean companion object
    val valueStateDesc = new ValueStateDescriptor[java.lang.Boolean]("dict-state", classOf[java.lang.Boolean])
    valueStateDesc.enableTimeToLive(stateTtlConfig)

    existState = this.getRuntimeContext.getState(valueStateDesc)
  }

  override def processElement(i: String, context: KeyedProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = {
    if (existState.value() == null) {
      existState.update(java.lang.Boolean.TRUE)
      out.collect(i)
    } else {
      println("repeat data is: " + i)
    }
  }
}

参考:

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

文章目录
  1. 1. AppendTrie树
  2. 2. 构建字典任务
    1. 2.0.1. 分布式锁
  • 3. 精准去重-举例
  • ,