Big Data Monitoring: Sending Metrics2 Metrics to OpenTSDB

Hadoop Metrics2 is a monitoring framework that collects all kinds of metrics and is used by big-data components such as Hadoop, HBase, and Kylin. It consists of three main parts: MetricsSystemImpl, Source, and Sink, and both Source and Sink can be customized. This article describes how to write a custom Sink that pushes metrics into OpenTSDB.
Let's first look at the code structure of these three parts:

MetricsSystemImpl

// 1. Register sources and sinks
private synchronized void configure(String prefix) {
  config = MetricsConfig.create(prefix);
  configureSinks();
  configureSources();
  configureSystem();
}

public synchronized void start() {
  checkNotNull(prefix, "prefix");
  if (monitoring) {
    LOG.warn(prefix +" metrics system already started!",
             new MetricsException("Illegal start"));
    return;
  }
  for (Callback cb : callbacks) cb.preStart();
  for (Callback cb : namedCallbacks.values()) cb.preStart();
  configure(prefix);
  // 2. Start a timer
  startTimer();
  monitoring = true;
  LOG.info(prefix +" metrics system started");
  for (Callback cb : callbacks) cb.postStart();
  for (Callback cb : namedCallbacks.values()) cb.postStart();
}

private synchronized void startTimer() {
  if (timer != null) {
    LOG.warn(prefix +" metrics system timer already started!");
    return;
  }
  logicalTime = 0;
  long millis = period;
  timer = new Timer("Timer for '"+ prefix +"' metrics system", true);
  // 3. Run the metrics task periodically, once every period
  timer.scheduleAtFixedRate(new TimerTask() {
    public void run() {
      try {
        onTimerEvent();
      } catch (Exception e) {
        LOG.warn("Error invoking metrics timer", e);
      }
    }
  }, millis, millis);
  LOG.info("Scheduled Metric snapshot period at " + (period / 1000)
      + " second(s).");
}

// 4. Collect metrics from all sources
public synchronized MetricsBuffer sampleMetrics() {
  collector.clear();
  MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();

  for (Entry<String, MetricsSourceAdapter> entry : sources.entrySet()) {
    if (sourceFilter == null || sourceFilter.accepts(entry.getKey())) {
      snapshotMetrics(entry.getValue(), bufferBuilder);
    }
  }
  if (publishSelfMetrics) {
    snapshotMetrics(sysSource, bufferBuilder);
  }
  MetricsBuffer buffer = bufferBuilder.get();
  return buffer;
}

// 5. Publish metrics to all sinks
synchronized void onTimerEvent() {
  logicalTime += period;
  if (sinks.size() > 0) {
    publishMetrics(sampleMetrics(), false);
  }
}
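
As a rough sketch of how this machinery is brought up in practice (the "namenode" prefix and process name below are only illustrative, while DefaultMetricsSystem.initialize and JvmMetrics.create are the actual Hadoop APIs), a daemon initializes the metrics system once and registers its sources; the timer above then snapshots them every period:

import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;

public class MetricsBootstrapSketch {
  public static void main(String[] args) {
    // the prefix selects the namenode.sink.* / namenode.source.* keys
    // in hadoop-metrics2.properties
    MetricsSystem ms = DefaultMetricsSystem.initialize("namenode");
    // register a source; sinks are normally instantiated from the properties file
    JvmMetrics.create("NameNode", "sessionId", ms);
  }
}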

Source

Implement the MetricsSource interface to define a custom Source:

public interface MetricsSource {
  /**
   * Get metrics from the source
   * @param collector to contain the resulting metrics snapshot
   * @param all if true, return all metrics even if unchanged.
   */
  void getMetrics(MetricsCollector collector, boolean all);
}

Take JvmMetrics as an example:

public void getMetrics(MetricsCollector collector, boolean all) {
  MetricsRecordBuilder rb = collector.addRecord(JvmMetrics)
      .setContext("jvm").tag(ProcessName, processName)
      .tag(SessionId, sessionId);
  // memory metrics
  getMemoryUsage(rb);
  // GC metrics
  getGcUsage(rb);
  // thread metrics
  getThreadUsage(rb);
  // event counters (LogFatal, LogError, LogWarn, LogInfo)
  getEventCounters(rb);
}
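
A custom source only needs to implement getMetrics and push values into the collector. Below is a minimal sketch of that pattern; the record name "QueueMetrics", the context "myapp" and the gauge are made-up examples, while MetricsCollector, MetricsRecordBuilder and Interns are the real Hadoop classes:

import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.Interns;

public class QueueDepthSource implements MetricsSource {
  private volatile int queueDepth = 0;

  public void setQueueDepth(int depth) {
    this.queueDepth = depth;
  }

  @Override
  public void getMetrics(MetricsCollector collector, boolean all) {
    // one record per snapshot; the timer in MetricsSystemImpl calls this every period
    collector.addRecord("QueueMetrics")
        .setContext("myapp")
        .tag(Interns.info("Hostname", "Local hostname"), "host1")
        .addGauge(Interns.info("QueueDepth", "Current queue depth"), queueDepth);
  }
}

Such a source would then be registered with the metrics system, for example via DefaultMetricsSystem.instance().register("QueueMetrics", "Queue depth source", new QueueDepthSource()).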

Sink

Implement the MetricsSink interface to define a custom Sink:

public interface MetricsSink extends MetricsPlugin {
  /**
   * Put a metrics record in the sink
   * @param record the record to put
   */
  void putMetrics(MetricsRecord record);

  /**
   * Flush any buffered metrics
   */
  void flush();
}

Take FileSink as an example:

public class FileSink implements MetricsSink, Closeable {
  private static final String FILENAME_KEY = "filename";
  private PrintStream writer;

  @Override
  public void init(SubsetConfiguration conf) {
    String filename = conf.getString(FILENAME_KEY);
    try {
      // 1. Initialize the print stream
      writer = filename == null ? System.out
          : new PrintStream(new FileOutputStream(new File(filename)),
                            true, "UTF-8");
    } catch (Exception e) {
      throw new MetricsException("Error creating "+ filename, e);
    }
  }

  // 2. Write one record
  @Override
  public void putMetrics(MetricsRecord record) {
    writer.print(record.timestamp());
    writer.print(" ");
    writer.print(record.context());
    writer.print(".");
    writer.print(record.name());
    String separator = ": ";
    for (MetricsTag tag : record.tags()) {
      writer.print(separator);
      separator = ", ";
      writer.print(tag.name());
      writer.print("=");
      writer.print(tag.value());
    }
    for (AbstractMetric metric : record.metrics()) {
      writer.print(separator);
      separator = ", ";
      writer.print(metric.name());
      writer.print("=");
      writer.print(metric.value());
    }
    writer.println();
  }

  @Override
  public void flush() {
    writer.flush();
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }
}

Custom OpenTSDBHttpSink

We now write a custom sink that ships metrics to OpenTSDB for cluster monitoring. Metrics are written to OpenTSDB over HTTP, so we first need an HttpUtils helper class:

HttpUtils

package org.apache.hadoop.metrics2.sink.opentsdb;

import com.google.gson.JsonElement;
import org.apache.http.HttpEntity;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.Arrays;

/**
 * http utils
 */
public class HttpUtils {
  public static final Logger logger = LoggerFactory.getLogger(HttpUtils.class);

  private HttpUtils() {
    throw new UnsupportedOperationException("Construct HttpUtils");
  }

  public static CloseableHttpClient getInstance() {
    return HttpClientInstance.httpClient;
  }

  private static class HttpClientInstance {
    private static final CloseableHttpClient httpClient = HttpClients.custom()
        .setConnectionManager(cm).setDefaultRequestConfig(requestConfig).build();
  }

  private static String paramJson;
  private static PoolingHttpClientConnectionManager cm;
  private static SSLContext ctx = null;
  private static SSLConnectionSocketFactory socketFactory;
  private static RequestConfig requestConfig;
  private static Registry<ConnectionSocketFactory> socketFactoryRegistry;
  private static X509TrustManager xtm = new X509TrustManager() {
    @Override
    public void checkClientTrusted(X509Certificate[] chain, String authType) {
    }

    @Override
    public void checkServerTrusted(X509Certificate[] chain, String authType) {
    }

    @Override
    public X509Certificate[] getAcceptedIssuers() {
      return null;
    }
  };

  static {
    try {
      ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
      ctx.init(null, new TrustManager[]{xtm}, null);
    } catch (NoSuchAlgorithmException e) {
      logger.error("SSLContext init with NoSuchAlgorithmException", e);
    } catch (KeyManagementException e) {
      logger.error("SSLContext init with KeyManagementException", e);
    }
    socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE);
    // set connect timeout, connection request timeout and socket timeout
    requestConfig = RequestConfig.custom().setCookieSpec(CookieSpecs.IGNORE_COOKIES)
        .setExpectContinueEnabled(Boolean.TRUE)
        .setTargetPreferredAuthSchemes(Arrays.asList(AuthSchemes.NTLM, AuthSchemes.DIGEST))
        .setProxyPreferredAuthSchemes(Arrays.asList(AuthSchemes.BASIC))
        .setConnectTimeout(60 * 1000).setSocketTimeout(60 * 1000)
        .setConnectionRequestTimeout(60 * 1000).setRedirectsEnabled(true)
        .build();
    socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
        .register("http", PlainConnectionSocketFactory.INSTANCE)
        .register("https", socketFactory).build();
    cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
    cm.setDefaultMaxPerRoute(60);
    cm.setMaxTotal(100);
  }

  /**
   * get http request content
   *
   * @param url url
   * @return http get request response content
   */
  public static String get(String url) {
    CloseableHttpClient httpclient = HttpUtils.getInstance();
    HttpGet httpget = new HttpGet(url);
    return getResponseContentString(httpget, httpclient);
  }

  /**
   * get http response content
   *
   * @param httpget    httpget
   * @param httpClient httpClient
   * @return http get request response content
   */
  public static String getResponseContentString(HttpGet httpget, CloseableHttpClient httpClient) {
    String responseContent = null;
    CloseableHttpResponse response = null;
    try {
      response = httpClient.execute(httpget);
      // check that the response status is 200
      if (response.getStatusLine().getStatusCode() == 200) {
        HttpEntity entity = response.getEntity();
        if (entity != null) {
          responseContent = EntityUtils.toString(entity, "UTF-8");
        } else {
          logger.warn("http entity is null");
        }
      } else {
        logger.error("http get response status code is {}, not 200!",
            response.getStatusLine().getStatusCode());
      }
    } catch (IOException ioe) {
      logger.error(ioe.getMessage(), ioe);
    } finally {
      try {
        if (response != null) {
          EntityUtils.consume(response.getEntity());
          response.close();
        }
      } catch (IOException e) {
        logger.error(e.getMessage(), e);
      }
      if (!httpget.isAborted()) {
        httpget.releaseConnection();
        httpget.abort();
      }
    }
    return responseContent;
  }

  /**
   * post http
   *
   * @param url  url
   * @param json request body
   * @return http post request response content
   */
  public static String post(String url, JsonElement json) {
    CloseableHttpClient httpclient = HttpUtils.getInstance();
    HttpPost httpPost = new HttpPost(url);
    httpPost.addHeader("Content-Type", "application/json;charset=UTF-8");
    // use UTF-8 explicitly so non-ASCII content is not garbled
    StringEntity stringEntity = new StringEntity(json.toString(), "UTF-8");
    stringEntity.setContentEncoding("UTF-8");
    paramJson = json.toString();

    httpPost.setEntity(stringEntity);
    return post(httpPost, httpclient);
  }

  /**
   * post http
   *
   * @param httpPost   httpPost
   * @param httpClient httpClient
   * @return http post request response content
   */
  public static String post(HttpPost httpPost, CloseableHttpClient httpClient) {
    String responseContent = null;
    try {
      ResponseHandler<String> responseHandler = new BasicResponseHandler();
      // reuse the pooled client instead of creating a new one for every request
      responseContent = httpClient.execute(httpPost, responseHandler);
    } catch (IOException ioe) {
      logger.error("put opentsdb failed, body is {}", paramJson, ioe);
    } finally {
      if (!httpPost.isAborted()) {
        httpPost.releaseConnection();
        httpPost.abort();
      }
    }
    return responseContent;
  }
}
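
For reference, OpenTSDB's /api/put endpoint accepts a JSON object (or an array of objects) with metric, timestamp, value and tags fields, which is exactly the shape the sink below builds. Here is a quick standalone usage sketch of HttpUtils against a placeholder OpenTSDB address:

import com.google.gson.JsonObject;

public class HttpUtilsDemo {
  public static void main(String[] args) {
    JsonObject point = new JsonObject();
    // metric names produced by the sink have the form <context>.<record>.<metric>
    point.addProperty("metric", "jvm.JvmMetrics.MemHeapUsedM");
    // OpenTSDB accepts second or millisecond timestamps
    point.addProperty("timestamp", System.currentTimeMillis());
    point.addProperty("value", 123.4);
    JsonObject tags = new JsonObject();
    tags.addProperty("Hostname", "hadoop1");
    point.add("tags", tags);
    // placeholder address; /api/put returns 204 with an empty body on success
    System.out.println(HttpUtils.post("http://opentsdb-host:4399/api/put", point));
  }
}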

OpenTSDBHttpSink

Under hadoop-common-project/hadoop-common, create a new class org.apache.hadoop.metrics2.sink.opentsdb.OpenTSDBHttpSink with the following content:

package org.apache.hadoop.metrics2.sink.opentsdb;

import com.google.gson.JsonObject;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.util.MetricsCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Map;

/**
 * @author xiongzhigang
 * @date 2021-08-02 16:44
 * @description a MetricsSink that writes metrics to OpenTSDB over HTTP
 */
public class OpenTSDBHttpSink implements MetricsSink {
  private final Logger LOG = LoggerFactory.getLogger(this.getClass());
  public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
  public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
  private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
  private MetricsCache metricsCache = new MetricsCache();
  private static final String METRIC = "metric";
  private static final String TIMESTAMP = "timestamp";
  private static final String VALUE = "value";
  private static final String TAGS = "tags";
  private static final String SERVERS_PROPERTY = "servers";
  private static final String CONTEXT = "Context";
  private static final String HOSTNAME = "Hostname";
  private static final String CLUSTERID = "clusterId";
  private static final String HTTP_PREFIX = "http";
  private static final String SLASH = "/";
  private static final String PUT = "/api/put";
  private String metricsServers;

  @Override
  public void init(SubsetConfiguration conf) {
    // see if sparse metrics are supported. Default is false
    supportSparseMetrics = conf.getBoolean(SUPPORT_SPARSE_METRICS_PROPERTY,
        SUPPORT_SPARSE_METRICS_DEFAULT);
    metricsServers = parseServer(conf.getString(SERVERS_PROPERTY));
  }

  private String parseServer(String servers) {
    if (StringUtils.isNotEmpty(servers)) {
      if (!servers.startsWith(HTTP_PREFIX)) {
        if (servers.startsWith(SLASH)) {
          servers = servers.substring(1);
        }
        servers = HTTP_PREFIX + ":" + SLASH + SLASH + servers;
      }
    }
    return servers + PUT;
  }

  /**
   * Put metrics into the sink
   *
   * @param record the record to put
   */
  @Override
  public void putMetrics(MetricsRecord record) {
    // The method handles both dense publishing of metrics and
    // sparse (only on change) publishing of metrics
    String recordName = record.name();
    String contextName = record.context();
    StringBuilder sb = new StringBuilder();
    sb.append(contextName);
    sb.append('.');
    sb.append(recordName);
    sb.append('.');
    int sbBaseLen = sb.length();
    MetricsCache.Record cachedMetrics;
    if (!isSupportSparseMetrics()) {
      // for sending dense metrics, update the metrics cache
      // and get the updated data
      cachedMetrics = metricsCache.update(record);
      if (cachedMetrics != null && cachedMetrics.metricsEntrySet() != null) {
        for (Map.Entry<String, AbstractMetric> entry : cachedMetrics
            .metricsEntrySet()) {
          AbstractMetric metric = entry.getValue();
          sb.append(metric.name());
          String name = sb.toString();
          // send metric to opentsdb
          emitMetric(name, metric.value(), record.timestamp(), record.tags());

          // reset the length of the buffer for the next iteration
          sb.setLength(sbBaseLen);
        }
      }
    } else {
      // we support sparse updates
      Collection<AbstractMetric> metrics = (Collection<AbstractMetric>) record
          .metrics();
      if (metrics.size() > 0) {
        // we got metrics, so send the latest
        for (AbstractMetric metric : record.metrics()) {
          sb.append(metric.name());
          String name = sb.toString();
          // send metric to opentsdb
          emitMetric(name, metric.value(), record.timestamp(), record.tags());

          // reset the length of the buffer for the next iteration
          sb.setLength(sbBaseLen);
        }
      }
    }
  }

  protected void emitMetric(String name, Number value, long timestamp, Collection<MetricsTag> tags) {
    JsonObject params = new JsonObject();
    params.addProperty(METRIC, name);
    params.addProperty(VALUE, value);
    params.addProperty(TIMESTAMP, timestamp);

    JsonObject tagsJson = getTags(tags);
    if (tagsJson.entrySet().size() > 0) {
      params.add(TAGS, tagsJson);
    }

    // LOG.info("put metric to openTSDB, data is: {}", params.toString());
    HttpUtils.post(metricsServers, params);
  }

  private JsonObject getTags(Collection<MetricsTag> tags) {
    JsonObject tagsJson = new JsonObject();
    if (tags != null && tags.size() > 0) {
      // OpenTSDB tags should be kept to a minimum, so only store the necessary ones
      for (MetricsTag next : tags) {
        String name = next.name();
        String value = next.value();

        if (StringUtils.isNotEmpty(name) && StringUtils.isNotEmpty(value)) {
          if (HOSTNAME.equals(name) || CONTEXT.equals(name) || CLUSTERID.equals(name)) {
            tagsJson.addProperty(name, value);
          }
        }
      }
    }
    return tagsJson;
  }

  /**
   * @return whether sparse metrics are supported
   */
  protected boolean isSupportSparseMetrics() {
    return supportSparseMetrics;
  }

  @Override
  public void flush() {
  }
}

hadoop-metrics2.properties

Edit the contents of hadoop-metrics2.properties:

# syntax: [prefix].[source|sink].[instance].[options]
# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details

*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
# default sampling period, in seconds
*.period=10

# The namenode-metrics.out will contain metrics from all context
#namenode.sink.file.filename=namenode-metrics.out
# Specifying a special sampling period for namenode:
#namenode.sink.*.period=8

#datanode.sink.file.filename=datanode-metrics.out

#resourcemanager.sink.file.filename=resourcemanager-metrics.out

#nodemanager.sink.file.filename=nodemanager-metrics.out

#mrappmaster.sink.file.filename=mrappmaster-metrics.out

#jobhistoryserver.sink.file.filename=jobhistoryserver-metrics.out

# the following example split metrics of different
# context to different sinks (in this case files)
#nodemanager.sink.file_jvm.class=org.apache.hadoop.metrics2.sink.FileSink
#nodemanager.sink.file_jvm.context=jvm
#nodemanager.sink.file_jvm.filename=nodemanager-jvm-metrics.out
#nodemanager.sink.file_mapred.class=org.apache.hadoop.metrics2.sink.FileSink
#nodemanager.sink.file_mapred.context=mapred
#nodemanager.sink.file_mapred.filename=nodemanager-mapred-metrics.out

#
# Below are for sending metrics to Ganglia
#
# for Ganglia 3.0 support
# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30
#
# for Ganglia 3.1 support
# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31

# *.sink.ganglia.period=10

# default for supportsparse is false
# *.sink.ganglia.supportsparse=true

#*.sink.ganglia.slope=jvm.metrics.gcCount=zero,jvm.metrics.memHeapUsedM=both
#*.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40

# Tag values to use for the ganglia prefix. If not defined no tags are used.
# If '*' all tags are used. If specifying multiple tags separate them with
# commas. Note that the last segment of the property name is the context name.
#
#*.sink.ganglia.tagsForPrefix.jvm=ProcessName
#*.sink.ganglia.tagsForPrefix.dfs=
#*.sink.ganglia.tagsForPrefix.rpc=
#*.sink.ganglia.tagsForPrefix.mapred=

#namenode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649

#datanode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649

#resourcemanager.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649

#nodemanager.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649

#mrappmaster.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649

#jobhistoryserver.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649

######## OpenTSDB Collect Metrics #########
*.sink.opentsdb.class=org.apache.hadoop.metrics2.sink.opentsdb.OpenTSDBHttpSink
namenode.sink.opentsdb.servers=<opentsdb-address>
datanode.sink.opentsdb.servers=<opentsdb-address>
*.period=60
*.source.filter.class=org.apache.hadoop.metrics2.filter.RegexFilter
*.record.filter.class=${*.source.filter.class}
*.metric.filter.class=${*.source.filter.class}

hbase.sink.opentsdb.source.filter.include=^(.*IPC.*)$
hbase.sink.opentsdb.source.filter.exclude=^(.*ClassPath.*)|(MetaHlog\\w+)|(.*\\.jar.*)|(.*SystemProperties.*)|(.*Balancer.*)|(.*AssignmentManager.*)|(.*Control.*)|(.*Stats.*)|(.*FileSystem.*)|(.*Procedure.*)|(.*Coprocessor.*)|(.*Memory.*)$
#hbase.sink.opentsdb.metric.filter.include=^(.*Master.*)|(.*Server.*)|(.*JvmMetrics.*)|(.*WAL.*)|(.*RegionServer.*)|(.*Regions.*)$
hbase.sink.opentsdb.servers=<opentsdb-address>
hbase.sink.opentsdb.period=60

Deploying the Service

  1. Build and package Hadoop; a build guide can be found at https://segmentfault.com/a/1190000019146236
  2. After the build finishes, go to hadoop-dist/target/hadoop-2.9.2 and copy ./share/hadoop/common/hadoop-common-2.9.2.jar to every node in the Hadoop cluster.
  3. Copy the hadoop-metrics2.properties file to every Hadoop node.
  4. Restart the Hadoop cluster; the monitoring metrics will then be written into OpenTSDB every minute (a quick query check is sketched after the install steps below).

Installing OpenTSDB

  5. Build OpenTSDB; the build directory will contain tsdb and tsdb-2.4.0.jar. Note: if running ./build.sh directly fails, copy the files under third_party into build first.

    wget https://github.com/OpenTSDB/opentsdb/releases/download/v2.4.0/opentsdb-2.4.0.tar.gz
    tar -zxf opentsdb-2.4.0.tar.gz
    cd opentsdb-2.4.0
    mkdir build
    cp -r third_party build/
    ./build.sh
  6. Edit src/opentsdb.conf; the main settings are as follows:

    tsd.network.port = 4399
    tsd.network.bind = 0.0.0.0
    tsd.http.staticroot = /home/bigdata/tsdb/staticroot
    tsd.http.cachedir = /home/bigdata/tsdb/cachedir

    # Data table. Row key: <salt (1 byte)><metric><timestamp><tagN><tagV>, where the timestamp is rounded down to the hour and metric, tagN and tagV are stored as UIDs.
    tsd.storage.hbase.data_table = opentsdb:tsdb
    # UID mapping table, with two directions: uid -> string and string -> uid
    tsd.storage.hbase.uid_table = opentsdb:tsdb-uid
    # Tree table, representing metrics in a hierarchical tree structure
    tsd.storage.hbase.tree_table = opentsdb:tsdb-tree
    # Meta table, storing time series indexes and metadata
    tsd.storage.hbase.meta_table = opentsdb:tsdb-meta

    tsd.storage.hbase.zk_quorum = hadoop1
  7. Start the service

    ./build/tsdb tsd --config=src/opentsdb.conf

    # start in the background
    nohup ./build/tsdb tsd --config=src/opentsdb.conf >> logs/opentsdb-out.log 2>&1 &
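
Once the cluster and OpenTSDB are both up, a quick way to confirm that datapoints are arriving is to hit OpenTSDB's /api/query endpoint, for example via the HttpUtils class above. This is only a spot check: the host is a placeholder, 4399 matches tsd.network.port above, and the metric name assumes the JVM source is being collected:

public class OpenTsdbQueryCheck {
  public static void main(String[] args) {
    // query the last hour of a metric emitted by the sink (<context>.<record>.<metric>)
    String url = "http://opentsdb-host:4399/api/query"
        + "?start=1h-ago&m=sum:jvm.JvmMetrics.MemHeapUsedM";
    System.out.println(HttpUtils.get(url));
  }
}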

Installing Grafana

wget https://dl.grafana.com/oss/release/grafana-8.0.6.linux-amd64.tar.gz
tar -zxf grafana-8.0.6.linux-amd64.tar.gz

## configure Grafana, then start it

Configuring Monitoring Metrics

For which metrics to configure, refer to https://intl.cloud.tencent.com/zh/document/product/1026/36879.
