Metadata Platform - Data Lineage

Architecture

(Figure: metadata platform architecture)

First, let's get an overview of the metadata platform architecture. The main pipeline is: SQL collection -> SQL parsing -> application layer.

  1. SQL collection: for each SQL query engine, implement a hook that collects the SQL, including the execution time, duration, user, execution engine, jobId and the SQL text itself, and finally publishes the record to Kafka (a sketch of this record follows right after this list). Hive implements the ExecuteWithHookContext interface, Presto the EventListener interface, and Spark the SparkListener interface.
  2. SQL parsing: Flink consumes the Kafka data in real time and parses the SQL. The parsing process is: define the lexer and grammar rule files -> use Antlr for lexical and syntactic analysis -> generate the AST -> walk the AST. Since Presto's and Spark's SQL syntax is similar to Hive's, the parser is implemented by following Hive's underlying source code directly. After parsing, the lineage information and the metadata are written to JanusGraph and ElasticSearch respectively.
  3. Application layer: once the data is in JanusGraph and ElasticSearch, lineage queries and metadata queries become possible, and hot-spot analysis can be done from the in-degree and out-degree of the graph vertices.
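
To make the hand-off between collection and parsing concrete, the sketch below shows the message that the hooks publish to Kafka: it is simply the SqlEntity bean (listed later in this post) serialized to JSON with Jackson. The values are illustrative only; the real hook fills them in from the query plan and session.

import com.xyueji.traffic.hive.core.SqlEntity;
import org.codehaus.jackson.map.ObjectMapper;

public class SqlEntityMessageSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative values; in production SqlCollectionHook builds this from HookContext.
        SqlEntity entity = SqlEntity.newBuilder()
                .logtime("2021-08-17 10:52:00")
                .sql("insert overwrite table temp select blockid from ods.road_real_time")
                .user("bigdata")
                .timestamp(1629168720000L)
                .duration(3200L)
                .jobids("job_1629160000000_0001")
                .engine("mr")
                .database("ods")
                .datasource("hive")
                .build();
        // KafkaSqlCollectionStrategy sends exactly this JSON string as the Kafka message value.
        System.out.println(new ObjectMapper().writeValueAsString(entity));
    }
}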

SQL Collection

This post uses Hive SQL collection as the example and implements a custom Hive hook class to collect the SQL. Depending on the hive-site.xml configuration, the hook writes the SQL record either to Kafka or to a JDBC storage engine.

SqlCollectionHook

package com.xyueji.traffic.hive;

import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import com.xyueji.traffic.hive.core.SqlEntity;
import com.xyueji.traffic.hive.strategy.JdbcSqlCollectionStrategy;
import com.xyueji.traffic.hive.strategy.KafkaSqlCollectionStrategy;
import com.xyueji.traffic.hive.strategy.SqlCollectionStrategy;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.session.SessionState;

import java.util.Date;
import java.util.List;
import java.util.Set;

/**
* @author xiongzhigang
* @date 2021-08-16 10:22
* @description
*/
public class SqlCollectionHook implements ExecuteWithHookContext {
private static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";

private static final String LINEAGE_SQL_STRATEGY = "lineage.sql.strategy";
private static final String LINEAGE_SQL_STRATEGY_JDBC = "jdbc";
private static final String LINEAGE_SQL_STRATEGY_KAFKA = "kafka";

@Override
public void run(HookContext hookContext) {
assert (hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK);
QueryPlan plan = hookContext.getQueryPlan();
SessionState ss = SessionState.get();

try {

HiveConf conf = ss.getConf();
boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);

// Get the configured post-execution hook class names
Set<String> postExecHooks = Sets.newHashSet(Splitter.on(",").trimResults()
.omitEmptyStrings()
.split(Strings.nullToEmpty(HiveConf.getVar(conf, HiveConf.ConfVars.POSTEXECHOOKS))));
if (!testMode && postExecHooks.contains(this.getClass().getName())) {
// Don't emit user/timestamp info in test mode,
// so that the test golden output file is fixed.

long queryTime = plan.getQueryStartTime().longValue();
if (queryTime == 0) {
queryTime = System.currentTimeMillis();
}

String logtime = DateFormatUtils.format(new Date(queryTime), DATE_TIME_FORMAT);
String sql = plan.getQueryStr();
String user = hookContext.getUgi().getUserName();
long timestamp = queryTime;
long duration = System.currentTimeMillis() - queryTime;

StringBuilder jobIdStr = new StringBuilder();
List<TaskRunner> tasks = hookContext.getCompleteTaskList();
if (tasks != null && !tasks.isEmpty()) {
for (TaskRunner task : tasks) {
String jobId = task.getTask().getJobID();
if (jobId != null) {
jobIdStr.append(jobId).append(",");
}
}
if (jobIdStr.length() > 0) {
jobIdStr.deleteCharAt(jobIdStr.length() - 1);
}
}
String jobIds = jobIdStr.toString();
String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
String database = ss.getCurrentDatabase();
String datasource = "hive";

SqlEntity sqlEntity = SqlEntity.newBuilder()
.logtime(logtime)
.sql(sql)
.user(user)
.timestamp(timestamp)
.duration(duration)
.jobids(jobIds)
.engine(engine)
.database(database)
.datasource(datasource)
.build();

SqlCollectionStrategy strategy = getStrategy(conf);
if (strategy != null) {
strategy.invoke(conf, sqlEntity);
}
}

} catch (Throwable t) {
// Don't fail the query just because of any lineage issue.
log("Failed to save lineage sql\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(t));
}
}

/**
* Logger an error to console if available.
*/
private void log(String error) {
SessionState.LogHelper console = SessionState.getConsole();
if (console != null) {
console.printError(error);
}
}

private SqlCollectionStrategy getStrategy(HiveConf conf) {
switch (conf.get(LINEAGE_SQL_STRATEGY)) {
case LINEAGE_SQL_STRATEGY_JDBC:
return new JdbcSqlCollectionStrategy();
case LINEAGE_SQL_STRATEGY_KAFKA:
return new KafkaSqlCollectionStrategy();
default:
return null;
}
}
}

SqlCollectionStrategy

package com.xyueji.traffic.hive.strategy;

import com.xyueji.traffic.hive.core.SqlEntity;
import org.apache.hadoop.hive.conf.HiveConf;

/**
* @author xiongzhigang
* @date 2021-08-17 10:27
* @description
*/
public interface SqlCollectionStrategy {
/**
* Execute the collection strategy
*
* @param conf HiveConf
* @param sqlEntity sql
* @return bool
*/
boolean invoke(HiveConf conf, SqlEntity sqlEntity);
}

KafkaSqlCollectionStrategy

package com.xyueji.traffic.hive.strategy;

import com.xyueji.traffic.hive.core.SqlEntity;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
* @author xiongzhigang
* @date 2021-08-17 10:52
* @description
*/
public class KafkaSqlCollectionStrategy implements SqlCollectionStrategy {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSqlCollectionStrategy.class);

private static final String LINEAGE_BOOTSTRAP_SERVERS = "lineage.bootstrap.servers";
private static final String LINEAGE_TOPIC = "lineage.topic";
private static final ObjectMapper MAPPER = new ObjectMapper();

@Override
public boolean invoke(HiveConf conf, SqlEntity sqlEntity) {
Properties properties = new Properties();
properties.put("bootstrap.servers", conf.get(LINEAGE_BOOTSTRAP_SERVERS));
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
String topic = conf.get(LINEAGE_TOPIC);
producer.send(new ProducerRecord<>(topic, MAPPER.writeValueAsString(sqlEntity)));
return true;
} catch (Exception e) {
LOG.error("Error send linage sql to kafka, " + e.getMessage());
}

return false;
}
}

JdbcSqlCollectionStrategy

package com.xyueji.traffic.hive.strategy;

import com.xyueji.traffic.hive.core.SqlEntity;
import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.Properties;

/**
* @author xiongzhigang
* @date 2021-08-17 10:34
* @description
*/
public class JdbcSqlCollectionStrategy implements SqlCollectionStrategy {
private static final Logger LOG = LoggerFactory.getLogger(JdbcSqlCollectionStrategy.class);

private static final String LINEAGE_DB_DRIVER = "lineage.db.driver";
private static final String LINEAGE_DB_URL = "lineage.db.url";
private static final String LINEAGE_DB_USER = "lineage.db.user";
private static final String LINEAGE_DB_PASSWORD = "lineage.db.password";
private static final String LINEAGE_DB_TABLE = "lineage.db.table";

@Override
public boolean invoke(HiveConf conf, SqlEntity sqlEntity) {
Connection conn = null;
PreparedStatement pstmt = null;

try {
conn = getConnection(conf);
pstmt = conn.prepareStatement(getInsertLineageSql(conf));

pstmt.setString(1, sqlEntity.getLogtime());
pstmt.setString(2, sqlEntity.getSql());
pstmt.setString(3, sqlEntity.getUser());
pstmt.setLong(4, sqlEntity.getTimestamp());
pstmt.setLong(5, sqlEntity.getDuration());
pstmt.setString(6, sqlEntity.getJobids());
pstmt.setString(7, sqlEntity.getEngine());
pstmt.setString(8, sqlEntity.getDatabase());
pstmt.setString(9, sqlEntity.getDatasource());

return pstmt.executeUpdate() > 0;
} catch (Exception e) {
LOG.error("Error inserting into lineage sql table, " + e.getMessage());
} finally {
closeResources(conn, pstmt, null);
}
return false;
}

private Connection getConnection(HiveConf conf) throws Exception {
String jdbcDriver = conf.get(LINEAGE_DB_DRIVER);
Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
Properties prop = new Properties();
prop.setProperty("user", conf.get(LINEAGE_DB_USER));
prop.setProperty("password", conf.get(LINEAGE_DB_PASSWORD));
Connection conn = driver.connect(conf.get(LINEAGE_DB_URL), prop);
conn.setAutoCommit(true);
return conn;
}

private String getInsertLineageSql(HiveConf conf) {
String table = conf.get(LINEAGE_DB_TABLE);

return "insert into " + table
+ "(`logtime`, `sql`, `user`, `timestamp`, `duration`, `jobids`, `engine`, `database`, `datasource`) "
+ "values(?, ?, ?, ?, ?, ?, ?, ?, ?)";
}


private void closeResources(Connection conn, Statement stmt, ResultSet rs) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
LOG.error("Error closing ResultSet: " + e.getMessage());
}
}

if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
System.err.println("Error closing Statement: " + e.getMessage());
}
}

if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
System.err.println("Error closing Connection: " + e.getMessage());
}
}
}
}

hive-site.xml

Add the custom hook class to the configuration. There are two SQL storage strategies, Kafka and JDBC; configure one of them.

<configuration>
<property>
<name>hive.exec.post.hooks</name>
<value>com.xyueji.traffic.hive.SqlCollectionHook</value>
<description>Comma-separated list of post-execution hooks to be invoked for each statement. A post-execution hook is specified as the name of a Java class which implements the org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface.</description>
</property>
<!-- Kafka storage strategy -->
<property>
<name>lineage.sql.strategy</name>
<value>kafka</value>
</property>
<property>
<name>lineage.bootstrap.servers</name>
<value>localhost:9092</value>
</property>
<property>
<name>lineage.topic</name>
<value>lineage_sql</value>
</property>
<!-- JDBC storage strategy -->
<!--
<property>
<name>lineage.sql.strategy</name>
<value>jdbc</value>
</property>
<property>
<name>lineage.db.driver</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>lineage.db.url</name>
<value>jdbc:mysql://xxx:3306/traffic?useUnicode=true&amp;characterEncoding=utf-8</value>
</property>
<property>
<name>lineage.db.user</name>
<value>xxx</value>
</property>
<property>
<name>lineage.db.password</name>
<value>xxx</value>
</property>
<property>
<name>lineage.db.table</name>
<value>lineage_sql</value>
</property>
-->
</configuration>

Restart the Hive metastore so that the Hive hook takes effect.

SQL Parsing

The SQL parsing tool is modeled on the Hive source code: a statement is first parsed and semantically analyzed (ParseUtils and the SemanticAnalyzer family of classes), and the execution plan (QueryPlan) is then built from the resulting Schema. Column-level lineage can be implemented by following the LineageLogger class, and table-level lineage is derived from the column-level lineage.
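
Before the full listing, here is a minimal usage sketch of the tool (assuming the SQLParse2 class listed below and a reachable Hive metastore, as configured in its init() method): run() takes a SQL string and returns the lineage edges that are later written to the graph.

import com.xyueji.traffic.hive.SQLParse2;

import java.util.List;

public class SqlParseSketch {
    public static void main(String[] args) {
        String sql = "insert overwrite table temp "
                + "select blockid from ods.road_real_time group by blockid";
        // run() parses and analyzes the statement and returns one Edge per source -> target dependency.
        List<SQLParse2.Edge> edges = new SQLParse2().run(sql);
        for (SQLParse2.Edge edge : edges) {
            System.out.println(edge.getOperation() + ": sources=" + edge.getSources().size()
                    + ", targets=" + edge.getTargets().size() + ", expr=" + edge.getExpr());
        }
    }
}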

SQLParse2

package com.xyueji.traffic.hive;

import com.google.common.collect.Lists;
import com.google.gson.stream.JsonWriter;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.SetUtils;
import org.apache.commons.io.output.StringBuilderWriter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.joor.Reflect;

import java.io.IOException;
import java.io.Serializable;
import java.util.*;

/**
* @author xiongzhigang
* @date 2021-08-11 11:58
* @description
*/
public class SQLParse2 implements Serializable {
private final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

public final static class Edge implements Serializable {
private Set<Vertex> sources;
private Set<Vertex> targets;
private String expr;
private HiveOperation operation;

Edge(Set<Vertex> sources, Set<Vertex> targets, String expr, HiveOperation operation) {
this.sources = sources;
this.targets = targets;
this.expr = expr;
this.operation = operation;
}

public Set<Vertex> getSources() {
return sources;
}

public void setSources(Set<Vertex> sources) {
this.sources = sources;
}

public Set<Vertex> getTargets() {
return targets;
}

public void setTargets(Set<Vertex> targets) {
this.targets = targets;
}

public String getExpr() {
return expr;
}

public void setExpr(String expr) {
this.expr = expr;
}

public HiveOperation getOperation() {
return operation;
}

public void setOperation(HiveOperation operation) {
this.operation = operation;
}
}

public final static class Vertex implements Serializable {
public enum Type {
COLUMN, TABLE
}

public enum Status {
OFFLINE, ONLINE
}

private Vertex.Type type;
private String createAt;
private String db;
private String owner;
private String table;
private String tableType;
private String col;
private String colType;
private Vertex.Status status;
private String md5;

Vertex(Type type, String createAt, String db, String owner, String table, String tableType, Status status, String md5) {
this.type = type;
this.createAt = createAt;
this.db = db;
this.owner = owner;
this.table = table;
this.tableType = tableType;
this.status = status;
this.md5 = md5;
}

Vertex(Type type, String createAt, String db, String owner, String table, String tableType, String col, String colType, Status status, String md5) {
this(type, createAt, db, owner, table, tableType, status, md5);
this.col = col;
this.colType = colType;
}

@Override
public int hashCode() {
int result = 0;

result = result * 31 + type.hashCode();
result = result * 31 + db.hashCode();
result = result * 31 + table.hashCode();
result = result * 31 + (col == null ? 0 : col.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof SQLParse2.Vertex)) {
return false;
}
SQLParse2.Vertex vertex = (SQLParse2.Vertex) obj;

if (type != null && !type.equals(vertex.type)) {
return false;
}

if (db != null && !db.equals(vertex.db)) {
return false;
}

if (table != null && !table.equals(vertex.table)) {
return false;
}

if (col != null && !col.equals(vertex.col)) {
return false;
}

return true;
}

public Type getType() {
return type;
}

public void setType(Type type) {
this.type = type;
}

public String getCreateAt() {
return createAt;
}

public void setCreateAt(String createAt) {
this.createAt = createAt;
}

public String getDb() {
return db;
}

public void setDb(String db) {
this.db = db;
}

public String getOwner() {
return owner;
}

public void setOwner(String owner) {
this.owner = owner;
}

public String getTable() {
return table;
}

public void setTable(String table) {
this.table = table;
}

public String getTableType() {
return tableType;
}

public void setTableType(String tableType) {
this.tableType = tableType;
}

public String getCol() {
return col;
}

public void setCol(String col) {
this.col = col;
}

public String getColType() {
return colType;
}

public void setColType(String colType) {
this.colType = colType;
}

public Status getStatus() {
return status;
}

public void setStatus(Status status) {
this.status = status;
}

public String getMd5() {
return md5;
}

public void setMd5(String md5) {
this.md5 = md5;
}
}

private static HiveConf conf = new HiveConf();

private void init() {
conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://hadoop3:9083");
conf.setVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR, "/tmp/hive");
conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, "/tmp/user/hive/warehouse");
conf.setVar(HiveConf.ConfVars.LOCALSCRATCHDIR, "/tmp");
conf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "org.apache.hadoop.hive.ql.hooks.PostExecutePrinter");

SessionState.start(conf);
}

public static void main(String[] args) {
String sql = "insert overwrite table temp select coalesce(blockid, EXPONENT) blockid_exponent from ods.road_real_time group by blockid, EXPONENT limit 10";
SQLParse2 sqlParse2 = new SQLParse2();
sqlParse2.run(sql);
}

public List<Edge> run(String query) {
init();
List<Edge> edges = new ArrayList<>();
try {
QueryState queryState = new QueryState(conf);
Context context = new Context(conf);

context.setHDFSCleanup(true);

ASTNode astNode = ParseUtils.parse(query, context);

BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, astNode);

sem.analyze(astNode, context);

Schema schema = Reflect.on(Driver.class).call("getSchema", sem, conf).get();
QueryPlan plan = new QueryPlan(query, sem, 0L, null, queryState.getHiveOperation(), schema);

Index index = SessionState.get() != null ?
SessionState.get().getLineageState().getIndex() : new Index();

edges = getEdges(plan, index);
Set<Vertex> vertices = getVertices(edges);

StringBuilderWriter out = new StringBuilderWriter(1024);
JsonWriter writer = new JsonWriter(out);
writer.beginObject();
writeEdges(writer, edges);
writeVertices(writer, vertices);
writer.endObject();
writer.close();
System.out.println(out.toString());

// Task-level (job) lineage could also be built here, from the SQL and job information collected
// via hive (ExecuteWithHookContext), spark (SparkListener) and presto (EventListener); see LineageLogger
// TODO build task-level lineage

// TODO update the graph when DDL statements are executed

} catch (Exception e) {
log("SQL parse failed, msg is : "
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
}

return edges;
}

/**
* Logger an error to console if available.
*/
private void log(String error) {
SessionState.LogHelper console = SessionState.getConsole();
if (console != null) {
console.printError(error);
}
}

/**
* Based on the final select operator, find out all the target columns.
* For each target column, find out its sources based on the dependency index.
*/
private List<Edge> getEdges(QueryPlan plan, Index index) {
LinkedHashMap<String, ObjectPair<SelectOperator,
Table>> finalSelOps = index.getFinalSelectOps();
Map<String, Vertex> vertexCache = new LinkedHashMap<String, Vertex>();
List<Edge> edges = new ArrayList<Edge>();
for (ObjectPair<SelectOperator,
org.apache.hadoop.hive.ql.metadata.Table> pair : finalSelOps.values()) {
List<FieldSchema> fieldSchemas = new ArrayList<>();
SelectOperator finalSelOp = pair.getFirst();
org.apache.hadoop.hive.ql.metadata.Table t = pair.getSecond();
List<String> colNames = null;
if (t != null) {
fieldSchemas = t.getCols();
} else {
// Based on the plan outputs, find out the target table name and column names.
for (WriteEntity output : plan.getOutputs()) {
Entity.Type entityType = output.getType();
if (entityType == Entity.Type.TABLE
|| entityType == Entity.Type.PARTITION) {
t = output.getTable();
fieldSchemas = t.getCols();
List<FieldSchema> cols = t.getCols();
if (cols != null && !cols.isEmpty()) {
colNames = Utilities.getColumnNamesFromFieldSchema(cols);
}
break;
}
}
}
Map<ColumnInfo, LineageInfo.Dependency> colMap = index.getDependencies(finalSelOp);
List<LineageInfo.Dependency> dependencies = colMap != null ? Lists.newArrayList(colMap.values()) : null;
int fields = fieldSchemas.size();
if (t != null && colMap != null && fields < colMap.size()) {
// Dynamic partition keys should be added to field schemas.
List<FieldSchema> partitionKeys = t.getPartitionKeys();
int dynamicKeyCount = colMap.size() - fields;
int keyOffset = partitionKeys.size() - dynamicKeyCount;
if (keyOffset >= 0) {
fields += dynamicKeyCount;
for (int i = 0; i < dynamicKeyCount; i++) {
FieldSchema field = partitionKeys.get(keyOffset + i);
fieldSchemas.add(field);
if (colNames != null) {
colNames.add(field.getName());
}
}
}
}
if (dependencies == null || dependencies.size() != fields) {
log("Result schema has " + fields
+ " fields, but we don't get as many dependencies");
} else {
// Go through each target column, generate the lineage edges.
Set<Vertex> targets = new LinkedHashSet<Vertex>();
for (int i = 0; i < fields; i++) {
Vertex target = getOrCreateVertex(vertexCache,
t.getTTable(), fieldSchemas.get(i),
Vertex.Type.COLUMN);

targets.add(target);

LineageInfo.Dependency dep = dependencies.get(i);
addEdge(vertexCache, edges, dep.getBaseCols(), target,
dep.getExpr(), plan.getOperation());

Vertex tTarget = getOrCreateVertex(vertexCache,
t.getTTable(), null,
Vertex.Type.TABLE);
targets.add(tTarget);
for (LineageInfo.BaseColumnInfo baseCol : dep.getBaseCols()) {
baseCol.setColumn(null);
}
addEdge(vertexCache, edges, dep.getBaseCols(), tTarget,
plan.getQueryString(), plan.getOperation());

}
Set<LineageInfo.Predicate> conds = index.getPredicates(finalSelOp);
if (conds != null && !conds.isEmpty()) {
for (LineageInfo.Predicate cond : conds) {
addEdge(vertexCache, edges, cond.getBaseCols(),
new LinkedHashSet<Vertex>(targets), cond.getExpr(), plan.getOperation());
}
}
}
}
return edges;
}

private void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges,
Set<LineageInfo.BaseColumnInfo> srcCols, Vertex target, String expr, HiveOperation operation) {
Set<Vertex> targets = new LinkedHashSet<Vertex>();
targets.add(target);
addEdge(vertexCache, edges, srcCols, targets, expr, operation);
}

/**
* Find an edge from all edges that has the same source vertices.
* If found, add the more targets to this edge's target vertex list.
* Otherwise, create a new edge and add to edge list.
*/
private void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges,
Set<LineageInfo.BaseColumnInfo> srcCols, Set<Vertex> targets, String expr, HiveOperation operation) {
Set<Vertex> sources = createSourceVertices(vertexCache, srcCols);
Edge edge = findSimilarEdgeBySources(edges, sources, expr, operation);
if (edge == null) {
edges.add(new Edge(sources, targets, expr, operation));
} else {
edge.targets.addAll(targets);
}
}

/**
* Convert a list of columns to a set of vertices.
* Use cached vertices if possible.
*/
private Set<Vertex> createSourceVertices(
Map<String, Vertex> vertexCache, Collection<LineageInfo.BaseColumnInfo> baseCols) {
Set<Vertex> sources = new LinkedHashSet<Vertex>();
if (baseCols != null && !baseCols.isEmpty()) {
for (LineageInfo.BaseColumnInfo col : baseCols) {
org.apache.hadoop.hive.metastore.api.Table table = col.getTabAlias().getTable();
if (table.isTemporary()) {
// Ignore temporary tables
continue;
}
Vertex.Type type = Vertex.Type.TABLE;
FieldSchema fieldSchema = col.getColumn();
if (fieldSchema != null) {
type = Vertex.Type.COLUMN;
}

sources.add(getOrCreateVertex(vertexCache, table, fieldSchema, type));
}
}
return sources;
}

/**
* Find a vertex from a cache, or create one if not.
*/
private Vertex getOrCreateVertex(
Map<String, Vertex> vertices, org.apache.hadoop.hive.metastore.api.Table t, FieldSchema fieldSchema, Vertex.Type type) {
Vertex vertex;
if (type.equals(Vertex.Type.TABLE)) {
String key = getMd5Hash(type.name() + "." + t.getDbName() + "." + t.getTableName());
vertex = vertices.get(key);
if (vertex == null) {
vertex = new Vertex(type, DateFormatUtils.format(new Date(t.getCreateTime() * 1000L), DATE_FORMAT), t.getDbName(),
t.getOwner(), t.getTableName(), t.getTableType(), Vertex.Status.ONLINE, key);
vertices.put(key, vertex);
}
} else {
String key = getMd5Hash(type.name() + "." + t.getDbName() + "." + t.getTableName() + "." + fieldSchema.getName());
vertex = vertices.get(key);
if (vertex == null) {
vertex = new Vertex(type, DateFormatUtils.format(new Date(t.getCreateTime() * 1000L), DATE_FORMAT), t.getDbName(),
t.getOwner(), t.getTableName(), t.getTableType(), fieldSchema.getName(), fieldSchema.getType(), Vertex.Status.ONLINE, key);
vertices.put(key, vertex);
}
}

return vertex;
}

/**
* Find an edge that has the same type, expression, and sources.
*/
private Edge findSimilarEdgeBySources(
List<Edge> edges, Set<Vertex> sources, String expr, HiveOperation operation) {
for (Edge edge : edges) {
if (edge.operation == operation && StringUtils.equals(edge.expr, expr)
&& SetUtils.isEqualSet(edge.sources, sources)) {
return edge;
}
}
return null;
}

/**
* Get all the vertices of all edges. Targets at first,
* then sources. Assign id to each vertex.
*/
public Set<Vertex> getVertices(List<Edge> edges) {
Set<Vertex> vertices = new LinkedHashSet<Vertex>();
for (Edge edge : edges) {
vertices.addAll(edge.targets);
}
for (Edge edge : edges) {
vertices.addAll(edge.sources);
}

return vertices;
}

/**
* Generate query string md5 hash.
*/
private String getMd5Hash(String key) {
return DigestUtils.md5Hex(key);
}

/**
* Write out an JSON array of edges.
*/
private void writeEdges(JsonWriter writer, List<SQLParse2.Edge> edges) throws IOException {
writer.name("edges");
writer.beginArray();
for (SQLParse2.Edge edge : edges) {
writer.beginObject();
writer.name("sources");
writer.beginArray();
for (SQLParse2.Vertex vertex : edge.sources) {
writer.value(vertex.md5);
}
writer.endArray();
writer.name("targets");
writer.beginArray();
for (SQLParse2.Vertex vertex : edge.targets) {
writer.value(vertex.md5);
}
writer.endArray();
if (edge.expr != null) {
writer.name("expr").value(edge.expr);
}
writer.name("operation").value(edge.operation.name());
writer.endObject();
}
writer.endArray();
}

/**
* Write out an JSON array of vertices.
*/
private void writeVertices(JsonWriter writer, Set<SQLParse2.Vertex> vertices) throws IOException {
writer.name("vertices");
writer.beginArray();
for (SQLParse2.Vertex vertex : vertices) {
writer.beginObject();
writer.name("md5").value(vertex.md5);
writer.name("type").value(vertex.type.name());
writer.name("createAt").value(vertex.createAt);
writer.name("db").value(vertex.db);
writer.name("owner").value(vertex.owner);
writer.name("table").value(vertex.table);
writer.name("tableType").value(vertex.tableType);
writer.name("col").value(vertex.col);
writer.name("colType").value(vertex.colType);
writer.name("status").value(vertex.status.name());
writer.endObject();
}
writer.endArray();
}
}

Notes

  • Enabling lineage information: reading the Hive source shows that SessionState only carries lineage when the hive.exec.post.hooks configuration contains at least one of org.apache.hadoop.hive.ql.hooks.PostExecutePrinter, org.apache.hadoop.hive.ql.hooks.LineageLogger or org.apache.atlas.hive.hook.HiveHook. Therefore add any one of these hooks to the configuration, e.g. conf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "org.apache.hadoop.hive.ql.hooks.PostExecutePrinter").

The relevant code can be found in the SemanticAnalyzer class:

(Figure: the relevant check in SemanticAnalyzer)

  • Bypassing the table/view existence check: modify the code in SemanticAnalyzer that checks whether a table/view exists and repackage it, so that the SQL parsing tool uses this patched hive-exec-2.3.7.jar. Otherwise the parsing step will throw errors.
  • Enable non-strict dynamic partition mode: conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict")

Flink Job

With the SQL parsing tool in place, we can write a Flink job that consumes the Kafka data, parses the SQL and writes the result into JanusGraph. This post only covers writing to JanusGraph; writing to ElasticSearch is left to the reader.
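
As a rough pointer for the ElasticSearch part, a metadata document could be indexed with the Elasticsearch high-level REST client along these lines. This is a hedged sketch only, assuming an Elasticsearch 7.x cluster at localhost:9200 and a hypothetical index name lineage_vertex; in a real job the document would be a serialized vertex produced by SQLParse2.

import org.apache.http.HttpHost;
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.Collections;

public class MetadataToEsSketch {
    public static void main(String[] args) throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
        // Any JSON document works here; a real vertex would carry db/table/col/owner fields.
        String doc = new ObjectMapper().writeValueAsString(
                Collections.singletonMap("table", "road_real_time"));
        client.index(new IndexRequest("lineage_vertex").source(doc, XContentType.JSON),
                RequestOptions.DEFAULT);
        client.close();
    }
}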

LineageTask
package com.xyueji.traffic.task.service

import java.time.Duration
import java.util.Properties

import com.xyueji.traffic.core.bean.Constants
import com.xyueji.traffic.core.util.{EnvUtil, PropertiesUtil}
import com.xyueji.traffic.hive.core.SqlEntity
import com.xyueji.traffic.task.function.{LineageBuildMapFunction, LineageProcessWindowFunction}
import com.xyueji.traffic.task.sink.LineageToGraphSinkFunction
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.codehaus.jackson.map.ObjectMapper

/**
* @author xiongzhigang
* @date 2021-08-17 14:07
* @description
*/
class LineageTask {

def buildGraph(topic: String, configurationFile: String, remoteTraversalSourceName: String): JobExecutionResult = {
val env = EnvUtil.getFlinkStreamEnv()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val properties = new Properties()
properties.setProperty(Constants.KAFKA_BOOTSTRAP_SERVERS, PropertiesUtil.getValue(Constants.KAFKA_BOOTSTRAP_SERVERS))
properties.setProperty(Constants.KAFKA_GROUP_ID, PropertiesUtil.getValue(Constants.KAFKA_GROUP_ID))

val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
val consumerStream = env.addSource(consumer)
.map(new RichMapFunction[String, SqlEntity] {
private var MAPPER: ObjectMapper = _

override def open(parameters: Configuration): Unit = {
MAPPER = new ObjectMapper
}

override def map(element: String): SqlEntity = {
MAPPER.readValue[SqlEntity](element, classOf[SqlEntity])
}
})

val watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness[SqlEntity](Duration.ofSeconds(10))
.withTimestampAssigner(new SerializableTimestampAssigner[SqlEntity] {
override def extractTimestamp(sqlEntity: SqlEntity, l: Long): Long = {
println(sqlEntity.getTimestamp)
sqlEntity.getTimestamp
}
})

val watermarkStream = consumerStream.assignTimestampsAndWatermarks(watermarkStrategy)

val outPutTag = new OutputTag[SqlEntity]("late-data") {}
val windowedStream = watermarkStream
.keyBy(_.getDatabase)
.timeWindow(Time.seconds(20))
.allowedLateness(Time.seconds(5))
.sideOutputLateData(outPutTag)
.trigger(EventTimeTrigger.create())
.process(new LineageProcessWindowFunction)

val lineageStream = windowedStream.map(new LineageBuildMapFunction)

lineageStream.addSink(new LineageToGraphSinkFunction(configurationFile, remoteTraversalSourceName))
// Records arriving too late (beyond allowedLateness) are emitted to the window operator's side output,
// so they must be fetched from the windowed stream rather than from watermarkStream.
val sideOutput = windowedStream.getSideOutput[SqlEntity](outPutTag)
val sideOutPutStream = sideOutput.map(new LineageBuildMapFunction)
sideOutPutStream.addSink(new LineageToGraphSinkFunction(configurationFile, remoteTraversalSourceName))

env.execute()
}
}

object LineageTask {
def main(args: Array[String]): Unit = {
new LineageTask().buildGraph("lineage_sql", "remote.yaml", "hive_traversal")
}
}
SqlEntity
package com.xyueji.traffic.hive.core;

import java.io.Serializable;

/**
* @author xiongzhigang
* @date 2021-08-17 10:43
* @description
*/
public class SqlEntity implements Serializable {
private String logtime;
private String sql;
private String user;
private long timestamp;
private long duration;
private String jobids;
private String engine;
private String database;
private String datasource;

public SqlEntity() {
}

public SqlEntity(String logtime, String sql, String user, long timestamp, long duration, String jobids, String engine, String database, String datasource) {
this.logtime = logtime;
this.sql = sql;
this.user = user;
this.timestamp = timestamp;
this.duration = duration;
this.jobids = jobids;
this.engine = engine;
this.database = database;
this.datasource = datasource;
}

public static Builder newBuilder() {
return new Builder();
}

static public class Builder {
private String logtime;
private String sql;
private String user;
private long timestamp;
private long duration;
private String jobids;
private String engine;
private String database;
private String datasource;

private Builder() {

}

public Builder logtime(String logtime) {
this.logtime = logtime;
return this;
}

public Builder sql(String sql) {
this.sql = sql;
return this;
}

public Builder user(String user) {
this.user = user;
return this;
}

public Builder timestamp(long timestamp) {
this.timestamp = timestamp;
return this;
}

public Builder duration(long duration) {
this.duration = duration;
return this;
}

public Builder jobids(String jobids) {
this.jobids = jobids;
return this;
}

public Builder engine(String engine) {
this.engine = engine;
return this;
}

public Builder database(String database) {
this.database = database;
return this;
}

public Builder datasource(String datasource) {
this.datasource = datasource;
return this;
}

public SqlEntity build() {
return new SqlEntity(this.logtime, this.sql, this.user, this.timestamp, this.duration
, this.jobids, this.engine, this.database, this.datasource);
}
}

public String getLogtime() {
return logtime;
}

public void setLogtime(String logtime) {
this.logtime = logtime;
}

public String getSql() {
return sql;
}

public void setSql(String sql) {
this.sql = sql;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public long getTimestamp() {
return timestamp;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

public long getDuration() {
return duration;
}

public void setDuration(long duration) {
this.duration = duration;
}

public String getJobids() {
return jobids;
}

public void setJobids(String jobids) {
this.jobids = jobids;
}

public String getEngine() {
return engine;
}

public void setEngine(String engine) {
this.engine = engine;
}

public String getDatabase() {
return database;
}

public void setDatabase(String database) {
this.database = database;
}

public String getDatasource() {
return datasource;
}

public void setDatasource(String datasource) {
this.datasource = datasource;
}
}
LineageProcessWindowFunction
package com.xyueji.traffic.task.function

import com.xyueji.traffic.hive.core.SqlEntity
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

/**
* @author xiongzhigang
* @date 2021-08-17 15:34
* @description
*/
class LineageProcessWindowFunction() extends ProcessWindowFunction[SqlEntity, SqlEntity, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[SqlEntity], out: Collector[SqlEntity]): Unit = {
val arrBuf = ArrayBuffer[Long]()
val map = new mutable.HashMap[Long, SqlEntity]()

// collect the window elements into a buffer keyed by timestamp
elements.foreach(line => {
arrBuf.append(line.getTimestamp)
map.put(line.getTimestamp, line)
})

val arr = arrBuf.toArray
// sort the records in the window by timestamp
Sorting.quickSort(arr)
val arrayLength = arr.length
for (i <- 0 until arrayLength) {
val timestamp = arr(i)
val sqlEntity = map(timestamp)

out.collect(sqlEntity)
}
}
}
LineageBuildMapFunction
package com.xyueji.traffic.task.function

import java.util

import com.xyueji.traffic.hive.SQLParse2
import com.xyueji.traffic.hive.core.SqlEntity
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

/**
* @author xiongzhigang
* @date 2021-08-17 16:18
* @description
*/
class LineageBuildMapFunction extends RichMapFunction[SqlEntity, java.util.List[SQLParse2.Edge]]{
var sqlParse2: SQLParse2 = _

override def open(parameters: Configuration): Unit = {
sqlParse2 = new SQLParse2
}

override def map(sqlEntity: SqlEntity): util.List[SQLParse2.Edge] = {
sqlParse2.run(sqlEntity.getSql)
}
}
LineageToGraphSinkFunction
package com.xyueji.traffic.task.sink

import java.util

import com.xyueji.traffic.hive.SQLParse2
import org.apache.commons.lang3.StringUtils
import org.apache.flink.configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.util.SerializableObject
import org.apache.tinkerpop.gremlin.driver.Cluster
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph
import org.apache.tinkerpop.gremlin.structure.{Graph, Vertex}

import scala.collection.JavaConversions._
import scala.util.control.Breaks._

/**
* @author xiongzhigang
* @date 2021-08-17 16:18
* @description
*/
class LineageToGraphSinkFunction(configurationFile: String, remoteTraversalSourceName: String, maxNumRetries: Int = 3) extends RichSinkFunction[java.util.List[SQLParse2.Edge]] {
var cluster: Cluster = _
var graph: Graph = _
var g: GraphTraversalSource = _
val lock: SerializableObject = new SerializableObject
var isRunning: Boolean = true
var retries: Int = 0

/**
* Create the connection to the Gremlin Server
*/
def createConnection(): Unit = {
// connect to the Gremlin Server
cluster = Cluster.open(configurationFile)

graph = EmptyGraph.instance
g = graph.traversal.withRemote(DriverRemoteConnection.using(cluster, remoteTraversalSourceName))
}

override def open(parameters: configuration.Configuration): Unit = {
try
this.lock synchronized this.createConnection()

catch {
case var5: Exception =>
throw new Exception("Cannot connect to gremlin server", var5)
}
}

override def invoke(edges: util.List[SQLParse2.Edge]): Unit = {
try {
addElements(edges)
} catch {
case var12: Exception =>
if (this.maxNumRetries == 0) throw new Exception("Failed to insert elements to graph. Connection re-tries are not enabled.", var12)
this.lock synchronized {
var lastException: Exception = null
this.retries = 0

breakable {
while (this.isRunning) {
if (!this.isRunning || this.maxNumRetries >= 0 && this.retries >= this.maxNumRetries) {
if (this.isRunning) throw new Exception("Failed to insert elements to graph. Failed after " + this.retries + " retries.", lastException)
this.isRunning = false
break() //todo: break is not supported
}
try
if (g != null) g.close()
catch {
case var9: Exception =>
println("Could not close g", var9)
}
try
if (cluster != null) cluster.close()
catch {
case var8: Exception =>
println("Could not close cluster", var8)
}
graph = null
this.retries += 1
try {
this.createConnection()
addElements(edges)
} catch {
case var10: Exception =>
lastException = var10
println("Re-connect to gremlin server failed. Retry time(s): " + this.retries, var10)
this.lock.wait(500L)
}
}
}
}
}
}

def addElements(edges: util.List[SQLParse2.Edge]): Unit = {

for (edge <- edges) {
for (source <- edge.getSources) {
val fromV: Vertex = getVertex(source)
for (target <- edge.getTargets) {
val toV: Vertex = getVertex(target)

buildEdge(edge, fromV, toV)
}
}
}
}

def buildEdge(e: SQLParse2.Edge, fromV: Vertex, toV: Vertex): Unit = {
if (StringUtils.isNotEmpty(e.getExpr)) {
g.addE(e.getOperation.getOperationName).from(fromV).to(toV).property("expr", e.getExpr).next()
} else {
g.addE(e.getOperation.getOperationName).from(fromV).to(toV).next()
}
}

def getVertex(v: SQLParse2.Vertex): Vertex = {
val exist = g.V().has("md5", v.getMd5).hasNext
if (!exist) {
v.getType match {
case SQLParse2.Vertex.Type.TABLE => return g.addV(v.getType.name())
.property("md5", v.getMd5)
.property("createAt", v.getCreateAt)
.property("db", v.getDb)
.property("owner", v.getOwner)
.property("table", v.getTable)
.property("tableType", v.getTableType)
.property("status", v.getStatus.name())
.next()

case SQLParse2.Vertex.Type.COLUMN => return g.addV(v.getType.name())
.property("md5", v.getMd5)
.property("createAt", v.getCreateAt)
.property("db", v.getDb)
.property("owner", v.getOwner)
.property("table", v.getTable)
.property("tableType", v.getTableType)
.property("col", v.getCol)
.property("colType", v.getColType)
.property("status", v.getStatus.name())
.next()
}
}

g.V().has("md5", v.getMd5).next()
}

override def close(): Unit = {
this.isRunning = false
this.lock synchronized {
this.lock.notifyAll()

try {
if (g != null) { // this closes the remote, no need to close the empty graph
g.close()
}
if (cluster != null) { // the cluster closes all of its clients
cluster.close()
}
} finally {
g = null
graph = null
cluster = null
}
}
}
}
remote.yaml
hosts: [hadoop2]
port: 8182
username: bigdata
password: 123456
serializer: {
className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0,
config: {
ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry]
}
}

Data Lineage Queries

Once the data is in JanusGraph, lineage can be queried by connecting to the Gremlin Server. This post gives an example Spark job that queries the Gremlin Server and generates a graph JSON file for ECharts.
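
Before the Spark export, here is a small sketch of querying the graph directly (assuming the remote.yaml / hive_traversal setup shown above, the vertex labels and properties written by the sink, and illustrative table/column names). It walks the upstream lineage of one column and then ranks table vertices by out-degree, which is the hot-spot analysis mentioned in the architecture section.

import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.process.traversal.Order;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;

import java.util.List;
import java.util.Map;

public class LineageQuerySketch {
    public static void main(String[] args) throws Exception {
        Cluster cluster = Cluster.open("remote.yaml");
        GraphTraversalSource g = EmptyGraph.instance().traversal()
                .withRemote(DriverRemoteConnection.using(cluster, "hive_traversal"));

        // Upstream lineage: lineage edges point from source to target, so walking in()
        // from a target column yields its source columns/tables, up to three hops here.
        List<Map<Object, Object>> upstream = g.V()
                .hasLabel("COLUMN").has("table", "temp").has("col", "blockid_exponent")
                .repeat(__.in()).emit().times(3)
                .valueMap("db", "table", "col")
                .toList();
        System.out.println("upstream: " + upstream);

        // Hot-spot analysis: tables with the highest out-degree are the most widely used sources.
        List<Map<Object, Object>> hotTables = g.V().hasLabel("TABLE")
                .order().by(__.outE().count(), Order.desc)
                .limit(10)
                .valueMap("db", "table")
                .toList();
        System.out.println("hot tables: " + hotTables);

        g.close();
        cluster.close();
    }
}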

GraphToJsonTask

package spark.com.xyueji.traffic.service

import java.util

import com.xyueji.traffic.core.util.EnvUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import spark.com.xyueji.traffic.bean.{Categories, Edge, Vertex}
import spark.com.xyueji.traffic.dao.GraphDao

import scala.collection.JavaConverters._
import scala.util.Random

/**
* @author xiongzhigang
* @date 2021-08-24 11:49
* @description
*/
class GraphToJsonTask {
val graphDao: GraphDao = new GraphDao("remote.yaml", "hive_traversal")

def save(): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("GraphToJson")
// Use KryoSerializer as the Spark serializer.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Register the custom classes to be serialized.
conf.registerKryoClasses(Array(classOf[Vertex], classOf[Edge], classOf[Categories]))

val spark = EnvUtil.getSparkEnv(conf)

import spark.implicits._

val sc = spark.sparkContext

val vertexes = graphDao.listAllV()
val labels = new util.HashSet[String]
vertexes.foreach(v => {
labels.add(v.get("label").mkString)
})
val bothVertexes = graphDao.listAllBothVertex

graphDao.close()
val vertexesRdd = sc.parallelize(vertexes)
val labelList = labels.asScala.toList
val labelsRdd = sc.parallelize(labelList)
val bothVertexesRdd = sc.parallelize(bothVertexes)

val vertexesDF = vertexesRdd.map(v => {
val id = v.get("id").mkString
val db = v.get("db").mkString
val table = v.get("table").mkString
val owner = v.get("owner").mkString
val tableType = v.get("tableType").mkString
val col = v.get("col").mkString
val colType = v.get("colType").mkString
val status = v.get("status").mkString

val category = v.get("label").mkString
val createAt = v.get("createAt").mkString

new Tuple10[String, String, String, String, String, String, String, String, String, String](id, db, table, owner, tableType, col, colType, status, category, createAt)
}).toDF("id", "db", "table", "owner", "tableType", "col", "colType", "status", "category", "createAt")

vertexesDF.createOrReplaceTempView("t_vertex")

val bothVertexesDF = bothVertexesRdd.map(bothV => {
val label = bothV._3.get("label").mkString
val expr = bothV._3.get("expr").mkString
new Tuple4[String, String, String, String](bothV._1.id().toString, bothV._2.id().toString, label, expr)
}).toDF("source", "target", "label", "expr")
bothVertexesDF.createOrReplaceTempView("t_edge")

val categoryDF = labelsRdd.toDF("category")
categoryDF.createOrReplaceTempView("t_category")

val nodes = spark.sql(
"""
| select v.id, v.db, v.table, v.owner, v.tableType, v.col, v.colType, v.status, v.createAt, c.id - 1 category
| from t_vertex v
| left join
| (
| select row_number() over(order by category) id, category
| from t_category
| ) c on v.category = c.category
|
""".stripMargin)
.collect()
.map(r => {
val id = r.getString(0)
val db = r.getString(1)
val table = r.getString(2)
val owner = r.getString(3)
val tableType = r.getString(4)
val col = r.getString(5)
val colType = r.getString(6)
val status = r.getString(7)
val createAt = r.getString(8)
val category = r.getInt(9)
val name = if (StringUtils.isEmpty(col)) db + "." + table else db + "." + table + "." + col
val w = 1000
val h = 800
val x = Random.nextInt(w)
val y = Random.nextInt(h)

Vertex(id,
name,
db,
table,
owner,
tableType,
col,
colType,
status,
createAt,
category,
x,
y
)
})

val links = spark.sql(
"""
| select source, target, label, expr
| from t_edge
""".stripMargin)
.collect()
.map(r => {
val source = r.getString(0)
val target = r.getString(1)
val label = r.getString(2)
val expr = r.getString(3)

Edge(source,
target,
label,
expr
)
})

val categories = spark.sql(
"""
| select category name
| from t_category
| order by category
""".stripMargin)
.collect()
.map(r => {
val name = r.getString(0)

Categories(name)
})

sc.parallelize(Array((nodes, links, categories)))
.toDF("nodes", "links", "categories")
.createOrReplaceTempView("t")

spark.sql(
"""
| select *
| from t
""".stripMargin)
.coalesce(1)
.write.mode(SaveMode.Overwrite)
.json("/graph")

spark.stop()
}
}

object GraphToJsonTask {
def main(args: Array[String]): Unit = {
new GraphToJsonTask().save()
}
}

GraphDao

package spark.com.xyueji.traffic.dao

import java.util

import org.apache.tinkerpop.gremlin.driver.Cluster
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph
import org.apache.tinkerpop.gremlin.structure.{Edge, Graph, Vertex}

import scala.collection.JavaConverters._

/**
* @author xiongzhigang
* @date 2021-07-07 10:50
* @description
*/
class GraphDao {
private var cluster: Cluster = _
private var graph: Graph = _
private var g: GraphTraversalSource = _

def this(configurationFile: String, remoteTraversalSourceName: String) {
this()
createConnection(configurationFile, remoteTraversalSourceName)
}

/**
* Create the connection to the Gremlin Server
*/
def createConnection(configurationFile: String, remoteTraversalSourceName: String): Unit = {
// connect to the Gremlin Server
cluster = Cluster.open(configurationFile)

graph = EmptyGraph.instance
g = graph.traversal.withRemote(DriverRemoteConnection.using(cluster, remoteTraversalSourceName))
}

def close(): Unit = {
try {
if (g != null) { // this closes the remote, no need to close the empty graph
g.close()
}
if (cluster != null) { // the cluster closes all of its clients
cluster.close()
}
} finally {
g = null
graph = null
cluster = null
}
}

def getGraph: GraphTraversalSource = g

def getV(id: AnyRef): Vertex = {
g.V(id).next()
}

def getV(key: String, value: String): Vertex = {
g.V().has(key, value).next()
}

def getProperties(id: AnyRef): Map[String, String] = {
val props = g.V(id).properties()
val label = getV(id).label()

val res = new util.HashMap[String, String]()
res.put("id", id.toString)
res.put("label", label)
while (props.hasNext) {
val prop = props.next()
res.put(prop.key(), prop.value().toString)
}

res.asScala.toMap
}

def getEdgeProperties(id: AnyRef): Map[String, String] = {
val props = g.E(id).properties()
val label = getE(id).label()

val res = new util.HashMap[String, String]()
res.put("id", id.toString)
res.put("label", label)
while (props.hasNext) {
val prop = props.next()
res.put(prop.key(), prop.value().toString)
}

res.asScala.toMap
}

def listAllVertexIds(): util.List[AnyRef] = {
g.V().id().toList
}

def listAllV(): List[Map[String, String]] = {
val ids = listAllVertexIds()
val res = new util.ArrayList[Map[String, String]]()
ids.asScala.foreach(id => {
val map = getProperties(id)
res.add(map)
})
res.asScala.toList
}

def getE(id: AnyRef): Edge = {
g.E(id).next()
}

def listAllEdgeIds: util.List[AnyRef] = {
g.E().id().toList
}

def getBothVertexByEdge(edge: Edge): (Vertex, Vertex, Map[String, String]) = {
(edge.outVertex(), edge.inVertex(), getEdgeProperties(edge.id()))
}

def listAllBothVertex: List[(Vertex, Vertex, Map[String, String])] = {
val ids = listAllEdgeIds
val res = new util.ArrayList[(Vertex, Vertex, Map[String, String])]()
ids.asScala.foreach(id => {
val edge = getE(id)
res.add(getBothVertexByEdge(edge))
})

res.asScala.toList
}
}

Once the JSON file has been generated, ECharts can use it directly.

ECharts file

Reference: https://github.com/xyueji/traffic/blob/master/graph.html

Result (https://xyueji.github.io/traffic/graph.html):

(Figure: lineage graph rendered with ECharts)
