Kubernetes集群监控之pushgateway

[TOC]

参考

简介

Pushgateway 是 Prometheus 生态中一个重要工具,使用它的原因主要是:

  • Prometheus 采用 pull 模式,可能由于不在一个子网或者防火墙原因,导致 Prometheus 无法直接拉取各个 target 数据。
  • 在监控业务数据的时候,需要将不同数据汇总, 由 Prometheus 统一收集。

由于以上原因,不得不使用 pushgateway,但在使用之前,有必要了解一下它的一些弊端:

  • 将多个节点数据汇总到 pushgateway, 如果 pushgateway 挂了,受影响比多个 target 大。
  • Prometheus 拉取状态 up 只针对 pushgateway, 无法做到对每个节点有效。
  • Pushgateway 可以持久化推送给它的所有监控数据。

因此,即使你的监控已经下线,prometheus 还会拉取到旧的监控数据,需要手动清理 pushgateway 不要的数据。

拓扑图如下:

image

安装

基于docker

1
docker pull prom/pushgateway
1
2
3
4
docker run -d \
--name=pg \
-p 9091:9091 \
prom/pushgateway

访问url

1
http://192.168.91.132:9091/

配置prometheus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
global:
scrape_interval: 60s
evaluation_interval: 60s

scrape_configs:
- job_name: prometheus
static_configs:
- targets: ['localhost:9090']
labels:
instance: prometheus

- job_name: linux
static_configs:
- targets: ['192.168.91.132:9100']
labels:
instance: localhost

- job_name: pushgateway
static_configs:
- targets: ['192.168.91.132:9091']
labels:
instance: pushgateway

数据管理

正常情况我们会使用 Client SDK 推送数据到 pushgateway, 但是我们还可以通过 API 来管理, 例如:

shell脚本

向 {job=”some_job”} 添加单条数据:

1
echo "some_metric 3.14" | curl -X POST --data-binary @- http://pushgateway.example.org:9091/metrics/job/some_job

–data-binary 表示发送二进制数据,注意:它是使用POST方式发送的!

添加更多更复杂数据,通常数据会带上 instance, 表示来源位置:

1
2
3
4
5
6
7
cat <<EOF | curl --data-binary @- http://pushgateway.example.org:9091/metrics/job/some_job/instance/some_instance
# TYPE some_metric counter
some_metric{label="val1"} 42
# TYPE another_metric gauge
# HELP another_metric Just an example.
another_metric 2398.283
EOF

注意:必须是指定的格式才行!

删除某个组下的某实例的所有数据:

1
curl -X DELETE http://pushgateway.example.org:9091/metrics/job/some_job/instance/some_instance

删除某个组下的所有数据:

1
curl -X DELETE http://pushgateway.example.org:9091/metrics/job/some_job

可以发现 pushgateway 中的数据我们通常按照 jobinstance 分组分类,所以这两个参数不可缺少。

因为 Prometheus 配置 pushgateway 的时候,也会指定 job 和 instance, 但是它只表示 pushgateway 实例,不能真正表达收集数据的含义。所以在 prometheus 中配置 pushgateway 的时候,需要添加 honor_labels: true 参数, 从而避免收集数据本身的 jobinstance 被覆盖。

注意,为了防止 pushgateway 重启或意外挂掉,导致数据丢失,我们可以通过 -persistence.file-persistence.interval 参数将数据持久化下来。

python脚本(flask)

安装模块

1
2
pip3 install flask
pip3 install prometheus_client

metrics

Prometheus提供4种类型Metrics:Counter, Gauge, SummaryHistogram

Counter

Counter可以增长,并且在程序重启的时候会被重设为0,常被用于任务个数,总处理时间,错误个数等只增不减的指标。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import prometheus_client
from prometheus_client import Counter
from prometheus_client.core import CollectorRegistry
from flask import Response, Flask

app = Flask(__name__)

requests_total = Counter("request_count", "Total request cout of the host")

@app.route("/metrics")
def requests_count():
requests_total.inc()
# requests_total.inc(2)
return Response(prometheus_client.generate_latest(requests_total),
mimetype="text/plain")

@app.route('/')
def index():
requests_total.inc()
return "Hello World"

if __name__ == "__main__":
app.run(host="0.0.0.0")

运行该脚本,访问youhost:5000/metrics

1
2
3
# HELP request_count Total request cout of the host
# TYPE request_count counter
request_count 3.0

Gauge

Gauge与Counter类似,唯一不同的是Gauge数值可以减少,常被用于温度、利用率等指标。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import random
import prometheus_client
from prometheus_client import Gauge
from flask import Response, Flask

app = Flask(__name__)

random_value = Gauge("random_value", "Random value of the request")

@app.route("/metrics")
def r_value():
random_value.set(random.randint(0, 10))
return Response(prometheus_client.generate_latest(random_value),
mimetype="text/plain")


if __name__ == "__main__":
app.run(host="0.0.0.0")

运行该脚本,访问youhost:5000/metrics

1
2
3
# HELP random_value Random value of the request
# TYPE random_value gauge
random_value 3.0

Summary/Histogram

Summary/Histogram概念比较复杂,一般exporter很难用到,暂且不说。

Labels

使用labels来区分metric的特征

1
2
3
4
5
6
7
from prometheus_client import Counter

c = Counter('requests_total', 'HTTP requests total', ['method', 'clientip'])

c.labels('get', '127.0.0.1').inc()
c.labels('post', '192.168.0.1').inc(3)
c.labels(method="get", clientip="192.168.0.1").inc()

REGISTRY

1
2
3
4
5
6
7
from prometheus_client import Counter, Gauge
from prometheus_client.core import CollectorRegistry

REGISTRY = CollectorRegistry(auto_describe=False)

requests_total = Counter("request_count", "Total request cout of the host", registry=REGISTRY)
random_value = Gauge("random_value", "Random value of the request", registry=REGISTRY)

举例

网卡流量

先访问这篇文章《python 获取网卡实时流量》:http://www.py3study.com/Article/details/id/347.html

下面这段python脚本,主要是参考上面文章的基础上修改的

发送本机网卡流量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import prometheus_client
from prometheus_client import Counter
from prometheus_client import Gauge
from prometheus_client.core import CollectorRegistry
import psutil
import time
import requests
import socket

def get_key():
key_info = psutil.net_io_counters(pernic=True).keys()

recv = {}
sent = {}

for key in key_info:
recv.setdefault(key, psutil.net_io_counters(pernic=True).get(key).bytes_recv)
sent.setdefault(key, psutil.net_io_counters(pernic=True).get(key).bytes_sent)

return key_info, recv, sent


def get_rate(func):
import time

key_info, old_recv, old_sent = func()

time.sleep(1)

key_info, now_recv, now_sent = func()

net_in = {}
net_out = {}

for key in key_info:
# float('%.2f' % a)
# net_in.setdefault(key, float('%.2f' %((now_recv.get(key) - old_recv.get(key)) / 1024)))
# net_out.setdefault(key, float('%.2f' %((now_sent.get(key) - old_sent.get(key)) / 1024)))

# 计算流量
net_in.setdefault(key, now_recv.get(key) - old_recv.get(key))
net_out.setdefault(key, now_sent.get(key) - old_sent.get(key))

return key_info, net_in, net_out

# def get_host_ip():
# """
# 查询本机ip地址,针对单网卡
# :return: ip
# """
# try:
# s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# s.connect(('8.8.8.8', 80))
# ip = s.getsockname()[0]
# finally:
# s.close()
# return ip

# 打印多网卡 mac 和 ip 信息
def PrintNetIfAddr():
dic = psutil.net_if_addrs()
net_dic = {}
net_dic['no_ip'] = [] # 无ip的网卡列表
for adapter in dic:
snicList = dic[adapter]
mac = '无 mac 地址'
ipv4 = '无 ipv4 地址'
ipv6 = '无 ipv6 地址'
for snic in snicList:
if snic.family.name in {'AF_LINK', 'AF_PACKET'}:
mac = snic.address
elif snic.family.name == 'AF_INET':
ipv4 = snic.address
elif snic.family.name == 'AF_INET6':
ipv6 = snic.address
# print('%s, %s, %s, %s' % (adapter, mac, ipv4, ipv6))

# 判断网卡名不在net_dic中时,并且网卡不是lo
if adapter not in net_dic and adapter != 'lo':
if not ipv4.startswith("无"): # 判断ip地址不是以无开头
net_dic[adapter] = ipv4 # 增加键值对
else:
net_dic['no_ip'].append(adapter) # 无ip的网卡

# print(net_dic)
return net_dic

key_info, net_in, net_out = get_rate(get_key)

# ip=get_host_ip() # 本机ip
hostname = socket.gethostname() # 主机名

REGISTRY = CollectorRegistry(auto_describe=False)
input = Gauge("network_traffic_input", hostname,['adapter_name','unit','ip','instance'],registry=REGISTRY) # 流入
output = Gauge("network_traffic_output", hostname,['adapter_name','unit','ip','instance'],registry=REGISTRY) # 流出


for key in key_info:
net_addr = PrintNetIfAddr()
# 判断网卡不是lo(回环网卡)以及 不是无ip的网卡
if key != 'lo' and key not in net_addr['no_ip']:
# 流入和流出
input.labels(ip=net_addr[key],adapter_name=key, unit="Byte",instance=hostname).inc(net_in.get(key))
output.labels(ip=net_addr[key],adapter_name=key, unit="Byte",instance=hostname).inc(net_out.get(key))

requests.post("http://192.168.91.132:9091/metrics/job/network_traffic",data=prometheus_client.generate_latest(REGISTRY))
print("发送了一次网卡流量数据")

执行脚本,它会发送1次数据给Push Gateway

取到的流量没有除以1024,所以默认是字节

注意:发送的链接,约定成俗的格式如下:

1
http://Pushgateway地址:9091/metrics/job/监控项目

比如监控etcd,地址就是这样的

1
http://Pushgateway地址:9091/metrics/job/etcd

代码解释

关键代码,就是这几行

1
2
3
4
5
6
REGISTRY = CollectorRegistry(auto_describe=False)
input = Gauge("network_traffic_input", hostname,['adapter_name','unit','ip','instance'],registry=REGISTRY) # 流入
output = Gauge("network_traffic_output", hostname,['adapter_name','unit','ip','instance'],registry=REGISTRY) # 流出

input.labels(ip=net_addr[key],adapter_name=key, unit="Byte",instance=hostname).inc(net_in.get(key))
output.labels(ip=net_addr[key],adapter_name=key, unit="Byte",instance=hostname).inc(net_out.get(key))

1、自定义的指标收集类都必须到CollectorRegistry进行注册, 指标数据通过CollectorRegistry类的方法或者函数,返回给Prometheus.
2、CollectorRegistry必须提供register()和unregister()函数,一个指标收集器可以注册多个CollectorRegistry.
3、客户端库必须是线程安全的

代码第一行,声明了CollectorRegistry

input和output是流入流出的流量。Metrics使用的是Gauge

1
input = Gauge("network_traffic_input", hostname,['adapter_name','unit','ip','instance'],registry=REGISTRY)  # 流入

network_traffic_input表示键值,它必须唯一。因为在grafana图表中,要用这个键值绘制图表。

“” 为空,它其实对应的是描述信息。为了避免数据冗长,一般不写它。

[‘adapter_name’,’unit’,’ip’,’instance’] ,它是一个列表,里面每一个元素都是labels,它是用来区分metric的特征

registry=REGISTRY 把数据注册到REGISTRY中

1
input.labels(ip=net_addr[key],adapter_name=key, unit="Byte",instance=hostname).inc(net_in.get(key))

这里定义了input的labels,括号里面有3个键值对。注意:这3个键值对必须在[‘adapter_name’,’unit’,’ip’] 列表中。

如果labels中要增加键值对,那么上面的列表中,也要增加对应的元素。否则会报错!

inc表示具体值。它对应的是input

刷新Push Gateway页面IP:9091 就可以看到流入流出的数据了

python脚本(非flask)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from prometheus_client import CollectorRegistry, Gauge, pushadd_to_gateway

def push2gateway(datas):

registry = CollectorRegistry()

g = Gauge('node_process_status_info','process monitor',['group','process_name','status','days','icon'],registry=registry)

for group,process,status,runtime,icon in datas:

print group,process,status,runtime,icon

g.labels(group,process,status,str(runtime),icon).set(icon)

pushadd_to_gateway('10.10.148.34:9091', job='pushgateway' ,registry=registry,timeout=200)

以上函数中,

node_process_status_info 表示指标名

list参数中的值表示标签名

循环只是为了给同一个指标的不同标签设置不同的value

g.labels括号中表示按顺序为标签名赋值

g.labels末尾的set表示该标签下的value值

最后推送的时候指标越多,timeout应该设置越高,否则会推送失败,具体时间根据现场网络状况测试一下就知道了

完整的案例,检测URL返回状态码并推送到pushgateway

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#coding:utf-8

import requests

from prometheus_client import CollectorRegistry, Gauge, pushadd_to_gateway

url = '[http://10.10.164.119:8500/v1/a/b/net.c.api.d](http://10.10.164.119:8500/v1/a/b/net.c.api.d)'

headers = {

"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 QIHU 360SE"

}

status_code = requests.get(url,headers=headers).status_code

print status_code

registry = CollectorRegistry()

g = Gauge('node_web_status_code','web monitor',['url'],registry=registry)

g.labels(url).set(status_code)

pushadd_to_gateway('10.10.148.34:9091', job='pushgateway' ,registry=registry,timeout=200)

注意:

使用prometheus_client推送到pushgateway的时候,如果你的指标拥有多个标签,并且在循环里写入了很多次推送,但是在pushgateway中往往只能看到最后一个,这大概是因为pushgateway推送的时候相同的指标名(job名)是以覆盖的方式进行的(具体没有更多研究和验证,我的问题解决便放下了)。所以这个时候可以将pushadd_to_gateway放在指标注册的最后,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def findpush(path,g):
output = os.popen('hadoop fs -du -h %s'%path)
for line in output.readlines():
row = filter(lambda x:x,map(lambda x:x.strip().replace(" ",""),line.split(" ")))
row_info = (conversion(row[0]),conversion(row[1]),row[2])
g.labels(row_info[2],'false').set(row_info[0])
g.labels(row_info[2],'true').set(row_info[1])

def main():
registry = CollectorRegistry()
g = Gauge('hadoop_hdfs_du_filesize_metrics','hdfs filepath filesize from hadoop fs du',['path','backupornot'],registry=registry)

for path in paths:
findpush(path,g)
pushadd_to_gateway('pushgateway的IP地址:9091', job='custom' ,registry=g,timeout=200)

总结

使用Prometheus监控,有2中方式

  1. 暴露http方式的Metrics,注意:需要在Prometheus的配置文件中添加job

  2. 主动发送数据到Pushgateway,注意:只需要添加一个Pushgateway就可以了。它相当于一个API,无论有多少个服务器,发送到统一的地址。

生产环境中,一般使用Pushgateway,简单,也不需要修改Prometheus的配置文件!

--------------------本文结束,感谢您的阅读--------------------