Kubernetes集群日志收集 Filebeat+ELK

前言

我们首先介绍一下传统的日志监控方案。其中,ELK Stack 是我们最熟悉不过的架构。所谓ELK,分别指Elastic公司的Elasticsearch、Logstash、Kibana。在比较旧的ELK架构中,Logstash身兼日志的采集、过滤两职。但由于Logstash基于JVM,性能有一定限制,因此,目前业界更推荐使用Go语言开发FIiebeat代替Logstash的采集功能,Logstash只作为了日志过滤的中间件。

最常见的ELK架构如下:

image

如上图所示,各角色功能如下:

  • 多个Filebeat在各个业务端进行日志采集,然后上传至Logstash
  • 多个Logstash节点并行(负载均衡,不作为集群),对日志记录进行过滤处理,然后上传至Elasticsearch集群
  • 多个Elasticsearch构成集群服务,提供日志的索引和存储能力
  • Kibana负责对Elasticsearch中的日志数据进行检索、分析

当然,在该架构中,根据业务特点,还可以加入某些中间件,如Redis、Kafak等:

image

如上图所示,使用Redis或Kafka集群作为消息缓冲队列,可以降低大量FIlebeat对Logstash的并发访问压力。

日志采集方式

官方文档:https://kubernetes.io/docs/concepts/cluster-administration/logging/

方式1: Node级别日志代理

在每个节点(即宿主机)上可以独立运行一个Node级日志代理,通常的实现方式为DaemonSet。用户应用只需要将日志写到标准输出,Docker 的日志驱动会将每个容器的标准输出收集并写入到主机文件系统,这样Node级日志代理就可以将日志统一收集并上传。另外,可以使用K8S的logrotate或Docker 的log-opt 选项负责日志的轮转。

image

Docker默认的日志驱动(LogDriver)是json-driver,其会将日志以JSON文件的方式存储。所有容器输出到控制台的日志,都会以*-json.log的命名方式保存在/var/lib/docker/containers/目录下。对于Docker日志驱动的具体介绍,请参考官方文档。另外,除了收集Docker容器日志,一般建议同时收集K8S自身的日志以及宿主机的所有系统日志,其位置都在var/log下。

所以,简单来说,本方式就是在每个node上各运行一个日志代理容器,对本节点/var/log/var/lib/docker/containers/两个目录下的日志进行采集,然后汇总到elasticsearch集群,最后通过kibana展示。

方式2:伴生容器(sidecar container)作为日志代理

创建一个伴生容器(也可称作日志容器),与应用程序容器在处于同一个Pod中。同时伴生容器内部运行一个独立的、专门为收集应用日志的代理,常见的有Logstash、Fluentd 、Filebeat等。日志容器通过共享卷可以获得应用容器的日志,然后进行上传。

image

方式3:应用直接上传日志

应用程序容器直接通过网络连接上传日志到后端,这是最简单的方式。

image

对比

image

其中,相对来说,方式1在业界使用更为广泛,并且官方也更为推荐。因此,最终我们采用ELK+Filebeat架构,并基于方式1.

日志采集部署

filebeat

GitHub: https://github.com/elastic/beats

官方文档:https://www.elastic.co/guide/en/beats/filebeat/current/index.html

有任何问题请优先查找官方文档!!!

filebeat处理多行日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
filebeat.inputs:
- type: log
enabled: True
paths:
- /var/log/mysql-slow-*
#这个地方是关键,我们给上边日志加上了tags,方便在logstash里边通过这个tags 过滤并格式化自己想要的内容;
tags: ["mysql_slow_logs"]
#有的时候日志不是一行输出的,如果不用multiline的话,会导致一条日志被分割成多条收集过来,形成不完整日志,这样的日志对于我们来说是没有用的!通过正则匹配语句开头,这样multiline 会在匹配开头之后,一直到下一个这样开通的语句合并成一条语句。
#pattern:多行日志开始的那一行匹配的pattern
#negate:是否需要对pattern条件转置使用,不翻转设为true,反转设置为false
#match:匹配pattern后,与前面(before)还是后面(after)的内容合并为一条日志
#max_lines:合并的最多行数(包含匹配pattern的那一行 默认值是500行
#timeout:到了timeout之后,即使没有匹配一个新的pattern(发生一个新的事件),也把已经匹配的日志事件发送出去
multiline.pattern: '^\d{4}/\d{2}/\d{2}' (2018\05\01 我的匹配是已这样的日期开头的)
multiline.negate: true
multiline.match: after
multiline.Max_lines:20
multiline.timeout: 10s
#在输入中排除符合正则表达式列表的那些行
exclude_lines: ["^DBG"]
#包含输入中符合正则表达式列表的那些行默认包含所有行include_lines执行完毕之后会执行exclude_lines。
include_lines: ["^ERR", "^WARN"]

#向输出的每一条日志添加额外的信息比如“level:debug”方便后续对日志进行分组统计。默认情况下会在输出信息的fields子目录下以指定的新增fields建立子目录例如fields.level。
fields:
level: debug
review: 1
#如果该选项设置为true则新增fields成为顶级目录而不是将其放在fields目录下。自定义的field会覆盖filebeat默认的field。
fields_under_root: false

- input_type: log
paths:
- /var/log/mysql-sql-*
tags: ["mysql_sql_logs"]
multiline.pattern: '^\d{4}/\d{2}/\d{2}'
multiline.negate: true
multiline.match: after
multiline.timeout: 10s
encoding: utf-8
document_type: mysql-proxy
scan_frequency: 20s
harverster_buffer_size: 16384
max_bytes: 10485760
tail_files: true
#tail_files:如果设置为true,Filebeat从文件尾开始监控文件新增内容,把新增的每一行文件作为一个事件依次发送,而不是从文件开始处重新发送所有内容。默认是false;

filebeat测试配置文件是否正确

1
filebeat test config

Yaml File

filebeat-kubernetes.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: filebeat
namespace: elk-test
subjects:
- kind: ServiceAccount
name: filebeat
namespace: elk-test
roleRef:
kind: ClusterRole
name: filebeat
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
name: filebeat
namespace: elk-test
labels:
elastic-app: filebeat
rules:
- apiGroups: [""] # "" indicates the core API group
resources:
- namespaces
- pods
verbs:
- get
- watch
- list
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: filebeat
namespace: elk-test
labels:
elastic-app: filebeat
---
apiVersion: v1
kind: ConfigMap
metadata:
name: filebeat-config
namespace: elk-test
labels:
elastic-app: filebeat
data:
filebeat.yml: |-
filebeat.config:
inputs:
# Mounted `filebeat-inputs` configmap:
path: ${path.config}/inputs.d/*.yml
# Reload inputs configs as they change:
reload.enabled: true
modules:
path: ${path.config}/modules.d/*.yml
# Reload module configs as they change:
reload.enabled: true
# To enable hints based autodiscover, remove `filebeat.config.inputs` configuration and uncomment this:
#filebeat.autodiscover:
# providers:
# - type: kubernetes
# hints.enabled: true
processors:
- add_cloud_metadata:
cloud.id: ${ELASTIC_CLOUD_ID}
cloud.auth: ${ELASTIC_CLOUD_AUTH}
#output.elasticsearch:
# hosts: ['${ELASTICSEARCH_HOST:elasticsearch-service}:${ELASTICSEARCH_PORT:9200}']
#username: ${ELASTICSEARCH_USERNAME}
#password: ${ELASTICSEARCH_PASSWORD}
output.redis:
hosts: ["192.168.0.95:6379"]
db: 3
timeout: 10
key: "k8s"
---
apiVersion: v1
kind: ConfigMap
metadata:
name: filebeat-inputs
namespace: elk-test
labels:
elastic-app: filebeat
data:
kubernetes.yml: |-
- type: docker
containers.ids:
- "*"
processors:
- add_kubernetes_metadata:
in_cluster: true
multiline:
pattern: '(^((0[1-9])|([12][0-9])|(3[01])|[1-9])-(\w+)-(\d\d){1,2})|(^(\d\d){1,2}-(0?[1-9]|1[0-2])-((0[1-9])|([12][0-9])|(3[01])|[1-9]))'
negate: true
match: after
---
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
name: filebeat
namespace: elk-test
labels:
elastic-app: filebeat
spec:
template:
metadata:
labels:
elastic-app: filebeat
spec:
serviceAccountName: filebeat
terminationGracePeriodSeconds: 30
containers:
- name: filebeat
image: docker.elastic.co/beats/filebeat:6.8.1
args: [
"-c", "/etc/filebeat.yml",
"-e",
]
env:
- name: ELASTICSEARCH_HOST
value: elasticsearch-service
- name: ELASTICSEARCH_PORT
value: "9200"
- name: ELASTIC_CLOUD_ID
value:
- name: ELASTIC_CLOUD_AUTH
value:
securityContext:
runAsUser: 0
# If using Red Hat OpenShift uncomment this:
#privileged: true
resources:
limits:
memory: 200Mi
cpu: 200m
requests:
cpu: 100m
memory: 100Mi
volumeMounts:
- name: config
mountPath: /etc/filebeat.yml
readOnly: true
subPath: filebeat.yml
- name: inputs
mountPath: /usr/share/filebeat/inputs.d
readOnly: true
- name: data
mountPath: /usr/share/filebeat/data
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
volumes:
- name: config
configMap:
defaultMode: 0600
name: filebeat-config
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: inputs
configMap:
defaultMode: 0600
name: filebeat-inputs
# data folder stores a registry of read status for all files, so we don't send everything again on a Filebeat pod restart
- name: data
hostPath:
path: /var/lib/filebeat
type: DirectoryOrCreate

我们先重点关注一下DaemonSet的volumeMountsvolumes,以了解ConfigMap的挂载方式:

  • config
    filebeat-config这个Configmap会生成一个filebeat.yml文件,其会被挂载为Filebeat的配置文件/etc/filebeat.yml
  • inputs
    inputs这个Configmap会生成一个filebeat-inputs.yml文件,其会被挂载到路径/usr/share/filebeat/inputs.d下,并被filebeat.yml引用
  • data
    Filebeat自身的数据挂载为hostPath: /var/lib/filebeat
  • varlibdockercontainers
    K8S集群的日志都存储在/var/lib/docker/containers,Filebeat将从该路径进行收集日志

部署

1
kubectl apply -f filebeat-kubernetes.yaml

Redis/Kafka

这里缓存选择Redis,直接采用云服务商提供的了,大家可以也可以自己搭建。

只需要将Redis的数据库以及IP地址写到filebeat以及logstash中即可,具体可以参照filebeat以及logstash的配置文件。

Logstash

GitHub:https://github.com/elastic/logstash

官方文档:https://www.elastic.co/guide/en/logstash/current/index.html

Logstash配置文件详解

/logstash/config/logstash.yml:主要用于控制logstash运行时的状态
/logstash/config/startup.options:logstash 运行相关参数

logstash.yml

参数 用途 默认值
node.name 节点名称 主机名称
path.data /数据存储路径 LOGSTASH_HOME/data/
pipeline.workers 输出通道的工作workers数据量(提升输出效率) cpu核数
pipeline.output.workers 每个输出插件的工作wokers数量 1
pipeline.batch.size 每次input数量 125
path.config 过滤配置文件目录
config.reload.automatic 自动重新加载被修改配置 false or true
config.reload.interval 配置文件检查时间
path.logs 日志输出路径
http.host 绑定主机地址,用户指标收集 “127.0.0.1”
http.port 绑定端口 5000-9700
log.level 日志输出级别,如果config.debug开启,这里一定要是debug日志 info
log.format 日志格式 * plain*
path.plugins 自定义插件目录

startup.options

参数 用途
JAVACMD=/usr/bin/java 本地jdk
LS_HOME=/opt/logstash logstash所在目录
LS_SETTINGS_DIR=”${LS_HOME}/config” 默认logstash配置文件目录
LS_OPTS=”–path.settings ${LS_SETTINGS_DIR}” logstash启动命令参数 指定配置文件目录
LS_JAVA_OPTS=”” 指定jdk目录
LS_PIDFILE=/var/run/logstash.pid logstash.pid所在目录
LS_USER=logstash logstash启动用户
LS_GROUP=logstash logstash启动组
LS_GC_LOG_FILE=/var/log/logstash/gc.log logstash jvm gc日志路径
LS_OPEN_FILES=65534 logstash最多打开监控文件数量

测试配置文件是否正确,可以用下面这个方法测试

1
$/usr/share/logstash/bin/logstash -t -f /etc/logstash/conf.d/nginx.conf

配置参数

input plugin 让logstash可以读取特定的事件源。

事件源可以是从stdin屏幕输入读取,可以从file指定的文件,也可以从es,filebeat,kafka,redis等读取

  • stdin 标准输入

  • file

    从文件读取数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    file{
    path => ['/var/log/nginx/access.log'] #要输入的文件路径
    type => 'nginx_access_log'
    start_position => "beginning"
    }
    # path 可以用/var/log/*.log,/var/log/**/*.log,如果是/var/log则是/var/log/*.log
    # type 通用选项. 用于激活过滤器
    # start_position 选择logstash开始读取文件的位置,begining或者end。
    还有一些常用的例如:discover_interval,exclude,sincedb_path,sincedb_write_interval等可以参考官网
  • syslog

    通过网络将系统日志消息读取为事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    syslog{
    port =>"514"
    type => "syslog"
    }
    # port 指定监听端口(同时建立TCP/UDP的514端口的监听)

    #从syslogs读取需要实现配置rsyslog:
    # cat /etc/rsyslog.conf 加入一行
    *.* @172.17.128.200:514  #指定日志输入到这个端口,然后logstash监听这个端口,如果有新日志输入则读取
    # service rsyslog restart #重启日志服务
  • beats

    从Elastic beats接收事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    beats {
    port => 5044 #要监听的端口
    }
    # 还有host等选项

    # 从beat读取需要先配置beat端,从beat输出到logstash。
    # vim /etc/filebeat/filebeat.yml
    ..........
    output.logstash:
    hosts: ["localhost:5044"]
  • kafka

    将 kafka topic 中的数据读取为事件

    1
    2
    3
    4
    5
    6
    kafka{
    bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
    topics => ["access_log"]
    group_id => "logstash-file"
    codec => "json"
    }
    1
    2
    3
    4
    5
    kafka{
    bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092"
    topics => ["weixin_log","user_log"]
    codec => "json"
    }
    1
    2
    3
    4
    # bootstrap_servers 用于建立群集初始连接的Kafka实例的URL列表。
    # topics 要订阅的主题列表,kafka topics
    # group_id 消费者所属组的标识符,默认为logstash。kafka中一个主题的消息将通过相同的方式分发到Logstash的group_id
    # codec 通用选项,用于输入数据的编解码器。

  还有很多的input插件类型,可以参考官方文档来配置。

filter plugin 过滤器插件,对事件执行中间处理

  • grok

    解析文本并构造 。把非结构化日志数据通过正则解析成结构化和可查询化

    1
    2
    3
    4
    5
    6
        grok {
    match => {"message"=>"^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$"}
    }
    匹配nginx日志
    # 203.202.254.16 - - [22/Jun/2018:16:12:54 +0800] "GET / HTTP/1.1" 200 3700 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7"
    #220.181.18.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
  • 注意这里grok 可以有多个match匹配规则,如果前面的匹配失败可以使用后面的继续匹配。例如

    1
    2
    3
    4
    grok {
    match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
    match => ["message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URI:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"]
    }

    grok 语法:%{SYNTAX:SEMANTIC} 即 %{正则:自定义字段名}

    官方提供了很多正则的grok pattern可以直接使用  :[https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns](https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns)  
    
    grok debug工具: http://grokdebug.herokuapp.com
    
    正则表达式调试工具: https://www.debuggex.com/
    
    需要用到较多的正则知识,参考文档有:https://www.jb51.net/tools/zhengze.html

​ 自定义模式: (?<字段名>the pattern)

​ 例如: 匹配 2018/06/27 14:00:54

​ (?\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d)

​ 得到结果: “datetime”: “2018/06/27 14:00:54”

  • date 日期解析 解析字段中的日期,然后转存到@timestamp

    date插件对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成Logstash timestamp对象,然后转存到@timestamp字段里面。

    为什么要使用这个插件呢?

      1、一方面由于Logstash会给收集到的每条日志自动打上时间戳(即@timestamp),但是这个时间戳记录的是input接收数据的时间,而不是日志生成的时间(因为日志生成时间与input接收的时间肯定不同),这样就可能导致搜索数据时产生混乱。

      2、另一方面,在上面那段rubydebug编码格式的输出中,@timestamp字段虽然已经获取了timestamp字段的时间,但是仍然比北京时间晚了8个小时,这是因为在Elasticsearch内部,对时间类型字段都是统一采用UTC时间,而日志统一采用UTC时间存储,是国际安全、运维界的一个共识。其实这并不影响什么,因为ELK已经给出了解决方案,那就是在Kibana平台上,程序会自动读取浏览器的当前时区,然后在web页面自动将UTC时间转换为当前时区的时间。

    1
    2
    3
    4
    5
    filter{
    date{
    match => ["timestamp","yyyy-MM-dd'T'HH:mm:ss.SSS", 'ISO8601']
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    filter{
    grok{
    match => {"message" => "\ -\ -\ \[%{HTTPDATE:timestamp}\]"}
    }
    date{
    match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    #读
    input {
    file {
    path => "文件全路径"
    type => "任意名字最好有意义"#自定义日志区分类型
    start_position => "beginning" #从文件开始处读写
    }
    }
    #过滤
    filter {
    grok {
    #切割后日期名字叫logdate
    match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
    #logdate 从上面过滤后取到的字段名,yyyy-MM-dd HH:mm:ss.SSS 日期格式条件
    match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS"]
    #match => ["logdate", "yyyyMMdd","yyyy-MM-dd"]
    #赋值给那个key
    target => "@timestamp"
    #删除不需要的字段
    remove_field => ["logdate"]
    }
    #合并错误日志
    multiline {
    pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}"
    negate => true
    what => "previous"
    }
    }
    #输出
    output{
    #输出到ES
    elasticsearch{
    hosts=>["127.0.0.1:9200"]
    #es的index名字,默认就是这个,可以更改
    index => "logstash-%{+YYYY.MM.dd}"
    }
    #输出到控制台
    stdout{codec => rubydebug}
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    [2018-07-04 17:43:35,503]
    grok{
    match => {"message"=>"%{DATA:raw_datetime}"}
    }
    date{
    match => ["raw_datetime","YYYY-MM-dd HH:mm:ss,SSS"]
    remove_field =>["raw_datetime"]
    }

    #将raw_datetime存到@timestamp 然后删除raw_datetime

    #24/Jul/2018:18:15:05 +0800
    date {
    match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z]
    }
  • mutate 对字段做处理 重命名、删除、替换和修改字段。

    • covert

      类型转换。类型包括:integer,float,integer_eu,float_eu,string和boolean

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      filter{
      mutate{
      # covert => ["response","integer","bytes","float"] #数组的类型转换
      convert => {"message"=>"integer"}
      }
      }
      #测试------->
      {
      "host" => "localhost",
      "message" => 123, #没带“”,int类型
      "@timestamp" => 2018-06-26T02:51:08.651Z,
      "@version" => "1"
      }
    • split

      使用分隔符把字符串分割成数组

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      mutate{
      split => {"message"=>","}
      }
      #---------->
      aaa,bbb
      {
      "@timestamp" => 2018-06-26T02:40:19.678Z,
      "@version" => "1",
      "host" => "localhost",
      "message" => [
      [0] "aaa",
      [1] "bbb"
      ]}
      192,128,1,100
      {
      "host" => "localhost",
      "message" => [
      [0] "192",
      [1] "128",
      [2] "1",
      [3] "100"
      ],
      "@timestamp" => 2018-06-26T02:45:17.877Z,
      "@version" => "1"
      }
    • merge

      合并字段 。数组和字符串 ,字符串和字符串

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      filter{
      mutate{
      add_field => {"field1"=>"value1"}
      }
      mutate{
      split => {"message"=>"."} #把message字段按照.分割
      }
      mutate{
      merge => {"message"=>"field1"} #将filed1字段加入到message字段
      }
      }
      #--------------->
      abc
      {
      "message" => [
      [0] "abc,"
      [1] "value1"
      ],
      "@timestamp" => 2018-06-26T03:38:57.114Z,
      "field1" => "value1",
      "@version" => "1",
      "host" => "localhost"
      }

      abc,.123
      {
      "message" => [
      [0] "abc,",
      [1] "123",
      [2] "value1"
      ],
      "@timestamp" => 2018-06-26T03:38:57.114Z,
      "field1" => "value1",
      "@version" => "1",
      "host" => "localhost"
      }
    • rename

      对字段重命名

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      filter{
      mutate{
      rename => {"message"=>"info"}
      }
      }
      #-------->
      123
      {
      "@timestamp" => 2018-06-26T02:56:00.189Z,
      "info" => "123",
      "@version" => "1",
      "host" => "localhost"
      }
    • remove_field

      移除字段

      1
      2
      3
      mutate {
      remove_field => ["message","datetime"]
      }
    • join

      用分隔符连接数组,如果不是数组则不做处理

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      mutate{
      split => {"message"=>":"}
      }
      mutate{
      join => {"message"=>","}
      }
      ------>
      abc:123
      {
      "@timestamp" => 2018-06-26T03:55:41.426Z,
      "message" => "abc,123",
      "host" => "localhost",
      "@version" => "1"
      }
      aa:cc
      {
      "@timestamp" => 2018-06-26T03:55:47.501Z,
      "message" => "aa,cc",
      "host" => "localhost",
      "@version" => "1"
      }
    • gsub

      用正则或者字符串替换字段值。仅对字符串有效

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
          mutate{
      gsub => ["message","/","_"] #用_替换/
      }

      ------>
      a/b/c/
      {
      "@version" => "1",
      "message" => "a_b_c_",
      "host" => "localhost",
      "@timestamp" => 2018-06-26T06:20:10.811Z
      }
    • update

      更新字段。如果字段不存在,则不做处理

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
          mutate{
      add_field => {"field1"=>"value1"}
      }
      mutate{
      update => {"field1"=>"v1"}
      update => {"field2"=>"v2"} #field2不存在 不做处理
      }
      ---------------->
      {
      "@timestamp" => 2018-06-26T06:26:28.870Z,
      "field1" => "v1",
      "host" => "localhost",
      "@version" => "1",
      "message" => "a"
      }
    • replace

      更新字段。如果字段不存在,则创建

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
          mutate{
      add_field => {"field1"=>"value1"}
      }
      mutate{
      replace => {"field1"=>"v1"}
      replace => {"field2"=>"v2"}
      }
      ---------------------->
      {
      "message" => "1",
      "host" => "localhost",
      "@timestamp" => 2018-06-26T06:28:09.915Z,
      "field2" => "v2", #field2不存在,则新建
      "@version" => "1",
      "field1" => "v1"
      }
  • geoip

    根据来自Maxmind GeoLite2数据库的数据添加有关IP地址的地理位置的信息

    1
    2
    3
    4
    geoip {
    source => "clientip"
    database =>"/tmp/GeoLiteCity.dat"
    }
  • ruby

    ruby插件可以执行任意Ruby代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    filter{
    urldecode{
    field => "message"
    }
    ruby {
    init => "@kname = ['url_path','url_arg']"
    code => "
    new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('?'))])
    event.append(new_event)"
    }
    if [url_arg]{
    kv{
    source => "url_arg"
    field_split => "&"
    target => "url_args"
    remove_field => ["url_arg","message"]
    }
    }
    }
    # ruby插件
    # 以?为分隔符,将request字段分成url_path和url_arg
    -------------------->
    www.test.com?test
    {
    "url_arg" => "test",
    "host" => "localhost",
    "url_path" => "www.test.com",
    "message" => "www.test.com?test",
    "@version" => "1",
    "@timestamp" => 2018-06-26T07:31:04.887Z
    }
    www.test.com?title=elk&content=学习elk
    {
    "url_args" => {
    "title" => "elk",
    "content" => "学习elk"
    },
    "host" => "localhost",
    "url_path" => "www.test.com",
    "@version" => "1",
    "@timestamp" => 2018-06-26T07:33:54.507Z
    }
  • urldecode

    用于解码被编码的字段,可以解决URL中 中文乱码的问题

    1
    2
    3
    4
    5
    6
        urldecode{
    field => "message"
    }

    # field :指定urldecode过滤器要转码的字段,默认值是"message"
    # charset(缺省): 指定过滤器使用的编码.默认UTF-8
  • kv

    通过指定分隔符将字符串分割成key/value

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    kv{
    prefix => "url_" #给分割后的key加前缀
    target => "url_ags" #将分割后的key-value放入指定字段
    source => "message" #要分割的字段
    field_split => "&" #指定分隔符
    remove_field => "message"
    }
    -------------------------->
    a=1&b=2&c=3
    {
    "host" => "localhost",
    "url_ags" => {
    "url_c" => "3",
    "url_a" => "1",
    "url_b" => "2"
    },
    "@version" => "1",
    "@timestamp" => 2018-06-26T07:07:24.557Z
  • useragent

    添加有关用户代理(如系列,操作系统,版本和设备)的信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    if [agent] != "-" {
    useragent {
    source => "agent"
    target => "ua"
    remove_field => "agent"
    }
    }
    # if语句,只有在agent字段不为空时才会使用该插件
    #source 为必填设置,目标字段
    #target 将useragent信息配置到ua字段中。如果不指定将存储在根目录中

output plugin 输出插件,将事件发送到特定目标

  • stdout 标准输出。将事件输出到屏幕上

    1
    2
    3
    4
    5
    output{
    stdout{
    codec => "rubydebug"
    }
    }
  • file 将事件写入文件

    1
    2
    3
    4
    file {
    path => "/data/logstash/%{host}/{application}
    codec => line { format => "%{message}"} }
    }
  • kafka 将事件发送到kafka

    1
    2
    3
    4
    kafka{
    bootstrap_servers => "localhost:9092"
    topic_id => "test_topic" #必需的设置。生成消息的主题
    }
  • elasticseach 在es中存储日志

    1
    2
    3
    4
    5
        elasticsearch {
    hosts => "localhost:9200"
    index => "nginx-access-log-%{+YYYY.MM.dd}"
    }
    #index 事件写入的索引。可以按照日志来创建索引,以便于删旧数据和按时间来搜索日志

补充一个codec plugin 编解码器插件

codec 本质上是流过滤器,可以作为input 或output 插件的一部分运行。例如上面output的stdout插件里有用到。

  • multiline codec plugin 多行合并, 处理堆栈日志或者其他带有换行符日志需要用到

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    input {
    stdin {
    codec => multiline {
    pattern => "pattern, a regexp" #正则匹配规则,匹配到的内容按照下面两个参数处理
    negate => "true" or "false" # 默认为false。处理匹配符合正则规则的行。如果为true,处理不匹配符合正则规则的行。
    what => "previous" or "next" #指定上下文。将指定的行是合并到上一行或者下一行。
    }
    }
    }
    codec => multiline {
    pattern => "^\s"
    what => "previous"
    }
    # 以空格开头的行都合并到上一行

    codec => multiline {
    # Grok pattern names are valid! :)
    pattern => "^%{TIMESTAMP_ISO8601} "
    negate => true
    what => "previous"
    }
    # 任何不以这个时间戳格式开头的行都与上一行合并

    codec => multiline {
    pattern => "\\$"
    what => "next"
    }
    # 以反斜杠结尾的行都与下一行合并

logstash配置语法中的条件判断

Logstash中的条件查看和行为与编程语言中的条件相同。

条件语支持if,else if和else语句并且可以嵌套。

条件语法如下:

1
2
3
4
5
6
7
if EXPRESSION {
...
} else if EXPRESSION {
...
} else {
...
}

比较操作

  • 相等: ==, !=, <, >, <=, >=
  • 正则: =~(匹配正则), !~(不匹配正则)
  • 包含: in(包含), not in(不包含)

布尔操作: - and(与), or(或), nand(非与), xor(非或)

一元运算符: 表达式可能很长且很复杂。表达式可以包含其他表达式,您可以使用!来取消表达式,并且可以使用括号(…)对它们进行分组。 - !(取反) - ()(复合表达式), !()(对复合表达式结果取反)

如若action是login则mutate filter删除secret字段:

1
2
3
4
5
filter {
if [action] == "login" {
mutate { remove_field => "secret" }
}
}

若是正则匹配,成功后添加自定义字段:

1
2
3
4
5
6
7
filter {
if [message] =~ /\w+\s+\/\w+(\/learner\/course\/)/ {
mutate {
add_field => { "learner_type" => "course" }
}
}
}

在一个条件里指定多个表达式:

1
2
3
4
5
6
7
8
output {
# Send production errors to pagerduty
if [loglevel] == "ERROR" and [deployment] == "production" {
pagerduty {
...
}
}
}

在in条件,可以比较字段值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
filter {
if [foo] in [foobar] {
mutate { add_tag => "field in field" }
}
if [foo] in "foo" {
mutate { add_tag => "field in string" }
}
if "hello" in [greeting] {
mutate { add_tag => "string in field" }
}
if [foo] in ["hello", "world", "foo"] {
mutate { add_tag => "field in list" }
}
if [missing] in [alsomissing] {
mutate { add_tag => "shouldnotexist" }
}
if !("foo" in ["hello", "world"]) {
mutate { add_tag => "shouldexist" }
}
}

not in示例:

1
2
3
4
5
output {
if "_grokparsefailure" not in [tags] {
elasticsearch { ... }
}
}

@metadata field

在logstash1.5版本开始,有一个特殊的字段,叫做@metadata。@metadata包含的内容不会作为事件的一部分输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
input { stdin { } }

filter {
mutate { add_field => { "show" => "This data will be in the output" } }
mutate { add_field => { "[@metadata][test]" => "Hello" } }
mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
if [@metadata][test] == "Hello" {
stdout { codec => rubydebug }
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
"@timestamp" => 2016-06-30T02:42:51.496Z,
"@version" => "1",
"host" => "windcoder.com",
"show" => "This data will be in the output",
"message" => "asdf"
}

“asdf”变成message字段内容。条件与@metadata内嵌的test字段内容判断成功,但是输出并没有展示@metadata字段和其内容。

不过,如果指定了metadata => true,rubydebug codec允许显示@metadata字段的内容。

1
stdout { codec => rubydebug { metadata => true } }

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
"@timestamp" => 2016-06-30T02:46:48.565Z,
"@metadata字段及其子字段内容。" => {
"test" => "Hello",
"no_show" => "This data will not be in the output"
},
"@version" => "1",
"host" => "windcoder.com",
"show" => "This data will be in the output",
"message" => "asdf"
}

现在就可以见到@metadata字段及其子字段内容。

只有rubydebug codec允许显示@metadata字段的内容。

只要您需要临时字段但不希望它在最终输出中,就可以使用@metadata字段。

最常见的情景是filter的时间字段,需要一临时的时间戳。如:

1
2
3
4
5
6
7
8
9
10
input { stdin { } }

filter {
grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
}

output {
stdout { codec => rubydebug }
}

输出结果

1
2
3
4
5
6
7
8
9
$ bin/logstash -f ../test.conf
Pipeline main started
02/Mar/2014:15:36:43 +0100
{
"@timestamp" => 2014-03-02T14:36:43.000Z,
"@version" => "1",
"host" => "windcoder.com",
"message" => "02/Mar/2014:15:36:43 +0100"
}

处理匹配失败的日志

忽略过滤器

1
2
3
4
5
6
filter {
grok { match => [ "message", "something" ] }
if "_grokparsefailure" in [tags] {
drop{ }
}
}

Grok插件了解

原文链接https://www.elastic.co/cn/blog/do-you-grok-grok

在日志处理的过程中,有一项非常常见的任务就是把原始的单行日志转换成结构化的日志。如果你使用了ELK,那么你可以利用ES对数据进行聚合,使用Kibana来进行数据可视化从日志中来发现一些有价值的信息。

在LogStash中,这项工作是由logstash-filter-grok来完成的,它有超过200个可用的,大家都认为是比较有用的Grok模式,例如IPv6地址、UNIX路径等等。

下面是一个示例日志

1
2016-09-19T18:19:00 [8.8.8.8:prd] DEBUG this is an example log message

使用Grok库,我们可以很容易的就完成日志格式化提取的任务

1
%{TIMESTAMP_ISO8601:timestamp} \[%{IPV4:ip};%{WORD:environment}\] %{LOGLEVEL:log_level} %{GREEDYDATA:message}

提取后的数据格式如下

1
2
3
4
5
6
7
{
"timestamp": "2016-09-19T18:19:00",
"ip": "8.8.8.8",
"environment": "prd",
"log_level": "DEBUG",
"message": "this is an example log message"
}

看起来这是一件非常简单的事情,好吧。。那这篇文章就这样写完了么,当然不是。。

为什么我的Grok使用起来非常的慢

这是一个非常常见的问题。性能这个问题通常都是要被拿出来讨论的,用户通常会发现使用了Grok表达式之后,LogStash处理日志的速度变得很慢。就像前面所说的一样,Grok模式是基于正则表达式的,所以这个插件在性能上已经对正则做了非常多的性能优化的了。接下来的章节,我们会讨论在使用Grok模式中需要注意的点

多做些性能测试

在设计Grok表达式的时候,我们需要一些方法来测试究竟哪种写法性能表现更好。出于这个原因,我些了个很小的jruby脚步用于测试Grok插件处理我所写的Grok模式的性能,你可以在这里获取到这个脚本

留意grok匹配失败的时候对性能的影响

尽管Grok匹配的性能是非常重要的,但是匹配失败的时候对性能的影响也是我们需要留意的。当grok匹配失败的时候,插件会为这个事件打个tag,默认是_grokparsefailure。LogStash允许你把这些处理失败的事件路由到其他地方做后续的处理,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
input { # ... }
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{IPV4:ip};%{WORD:environment}\] %{LOGLEVEL:log_level} %{GREEDYDATA:message}" }
}
}
output {
if "_grokparsefailure" in [tags] {
# write events that didn't match to a file
file { "path" => "/tmp/grok_failures.txt" }
} else {
elasticsearch { }
}
}

这样的话我们就可以对这些处理失败的事件做性能基准测试了。

现在,我们要开始对Apache的日志进行格式化处理了

1
220.181.108.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"

然后我们使用下面的Grok模式去进行格式化提取

1
%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}

然后我们使用三种示例日志去测试这个Grok的性能,和Grok不匹配的日志分别出现在开始,中间和结束的位置

1
2
3
4
5
6
7
8
# beginning mismatch - doesn't start with an IPORHOST
'tash-scale11x/css/fonts/Roboto-Regular.ttf HTTP/1.1" 200 41820 "http://semicomplete.com/presentations/logs'

# middle mismatch - instead of an HTTP verb like GET or PUT there's the number 111
'220.181.108.96 - - [13/Jun/2015:21:14:28 +0000] "111 /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'

# end mismatch - the last element isn't a quoted string, but a number
'220.181.108.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" 1'

下面是性能测试的结果

img

Paste_Image.png

基于上面这个测试结果,我们可以发现,Grok的性能和不匹配的日志所出现的位置有关,最快与最慢的性能差了差不多6倍。这就能解释为什么有用户提出当Grok匹配日志失败的时候CPU会被吃满的原因了,例如这个issues
https://github.com/logstash-plugins/logstash-filter-grok/issues/37.

我们能做些什么呢

快速失败,设置锚点

我们已经知道了处理失败对grok的性能影响是非常大的,所以我们需要解决这个问题。对于正则引擎来说,你需要做的最合适的事情就是减少正则表达式所需要的猜测。这就是为什么贪婪匹配最好少用的原因,那回到这个问题,有没一种更好的方法来调整这个Grok模式呢,让我们重新来看看这行Apache的日志

1
220.181.108.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"

刚才我们使用的Grok模式是这样的

1
%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}

由于用户以为Grok表达式只会从开头匹配到结束,所以导致了在一些普通的场景下也会出现性能问题。但是实际上,Grok只是被告知“在这段文本中寻找匹配的内容”,这就意味着下面这种示例也会被Grok所匹配。。。

1
OMG OMG OMG EXTRA INFORMATION 220.181.108.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)" OH LOOK EVEN MORE STUFF

呃。。这都行,不过解决这个问题还是很简单的,我们加一些锚点就搞定了。锚点可以让你再一个指定的位置处理字符串。加入了开始和结束的锚点之后(^和$),Grok就会从开头处理日志到结束了。这对处理那些不能匹配的日志有非常重要的作用。假如我们没有假如锚点,当正则无法匹配这行日志的时候,它就会开始从子字符串中进行匹配,然后性能就会下降,接下来我们把锚点加上,然后再做一次测试

1
^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$

img

Paste_Image.png

可以看到性能有了很大的提升,在一开始就匹配失败的场景中,性能提升了将近10倍

留意匹配了两次的表达式

你可能会说,“好吧,我的日志都是能匹配通过的,没有上面的问题”,但是事情可能并不是这样的

我们看到过非常多的grok模式在处理同一个网关发出的多种应用日志时候所出现的问题,例如syslog。想象一下这样一个场景,我们使用了“common_header: payload“这种日志格式来记录了三种应用日志

1
2
3
Application 1: '8.8.8.8 process-name[666]: a b 1 2 a lot of text at the end'
Application 2: '8.8.8.8 process-name[667]: a 1 2 3 a lot of text near the end;4'
Application 3: '8.8.8.8 process-name[421]: a completely different format | 1111'

通常我们会在一个Grok里面就把三种日志都处理掉

1
2
3
4
5
6
7
grok {
"match" => { "message => [
'%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{WORD:word_1} %{WORD:word_2} %{NUMBER:number_1} %{NUMBER:number_2} %{DATA:data}',
'%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{WORD:word_1} %{NUMBER:number_1} %{NUMBER:number_2} %{NUMBER:number_3} %{DATA:data};%{NUMBER:number_4}',
'%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{DATA:data} | %{NUMBER:number}'
] }
}

值得留意的是即使你的日志是能正常匹配的,Grok还是会按照顺序许匹配送进来的日志,当碰到第一个匹配成功的日志就break掉这个循环。这就要我们自己去判断一下,怎么放是最合适的了,不然的话会一个一个往下进行尝试,毕竟是多种不同的格式。
一种常用的优化方案是使用分层匹配来对这个Grok进行优化

1
2
3
4
5
6
7
8
9
10
11
12
13
filter {
grok {
"match" => { "message" => '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{GREEDYDATA:message}' },
"overwrite" => "message"
}
grok {
"match" => { "message" => [
'%{WORD:word_1} %{WORD:word_2} %{NUMBER:number_1} %{NUMBER:number_2} %{GREEDYDATA:data}',
'%{WORD:word_1} %{NUMBER:number_1} %{NUMBER:number_2} %{NUMBER:number_3} %{DATA:data};%{NUMBER:number_4}',
'%{DATA:data} | %{NUMBER:number}'
] }
}
)

这是两种匹配方案的性能测试结果

img

Paste_Image.png

看起来真有意思。。使用锚点的话,无论哪种方案性能都是一样的。不用锚点的情况下分层Grok的方案比不分层的又快很多

那我们怎么知道我们所创建的Grok是合适的

我们已经得出了对_grokparsefaiure进行处理的必要性了,那么我们还能做什么呢?
从3.2.0这个Grok插件开始,它有一些参数可以帮助你了解为什么一个事件会被处理那么久了。使用timeout_millistag_on_timeout可以设置Grok匹配的最大处理时长。如果超时了,这个事件会被打上_groktimeout的tag,然后我们就可以把他们送到一个Grok处理失败的ES索引里面去做后续的分析了

另外一个很棒的方法是LogStash5.0带了插件性能统计的功能,我们可以通过API来查看插件处理日志的性能了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ curl localhost:9600/_node/stats/pipeline?pretty | jq ".pipeline.plugins.filters"
[
{
"id": "grok_b61938f3833f9f89360b5fba6472be0ad51c3606-2",
"events": {
"duration_in_millis": 7,
"in": 24,
"out": 24
},
"failures": 24,
"patterns_per_field": {
"message": 1
},
"name": "grok"
},
{
"id": "kv_b61938f3833f9f89360b5fba6472be0ad51c3606-3",
"events": {
"duration_in_millis": 2,
"in": 24,
"out": 24
},
"name": "kv"
}
]

然后我们就可以通过duration_in_millis来判断一个插件的性能了

总结

希望这篇文章能帮你了解为什么Grok的性能会变得慢和如何去提升他的性能。下面是对这篇文字的总结:

  • Grok在匹配失败的时候性能可能并不那么好
  • 多留意_grokparsefailures出现的频率和出现时候的性能
  • 写正则的时候记得打锚点
  • 不使用锚点的时候分层Grok处理的性能会比不分层的性能好,不过打了锚点的话两个都一样
  • 多使用LogStash的性能监控功能,后续还可以拿来分析用

Yaml File

logstash-kubernetes.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
---
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-config
namespace: elk-test
labels:
elastic-app: logstash
data:
logstash.conf: |-
input {
beats {
port => 5044
}

redis {
host => "192.168.0.95"
port => 6379
db => "2"
data_type => "list"
batch_count => 1
key => "ecs"
}

redis {
host => "192.168.0.95"
port => 6379
db => "3"
data_type => "list"
batch_count => 1
key => "k8s"
}
}

filter {
#server01 Logs
if "server01_logs" in [tags]{
grok {
match => { "message" => "^\[%{TIMESTAMP_ISO8601:body_date}\s\w{3}\]\s%{DATA:level}\s%{DATA:bundle}\s%{DATA:device}\s%{GREEDYDATA:body}$"}
}
date {
match => ["body_date","yyyy-MM-dd HH:mm:ss.SSS"]
target => "@timestamp"
}
mutate {
remove_field => ["message"]
}
}

#server02 Logs
if [kubernetes][namespace] == "server02-test" {
#admin-ui
#if [kubernetes][container][name] == "admin-ui" or [kubernetes][container][name] == "monitoring" {
#}
grok {
match => {
"message" => [
"^(?<body_date>%{MONTHDAY}-%{MONTH}-%{YEAR}\s%{TIME})\s*(?<level>\w+)\s*(?<thread>\[\S+\])\s*%{GREEDYDATA:body}$",
"^%{TIMESTAMP_ISO8601:body_date}\s*(?<thread>\[\S+\])\s*(?<level>\w+)\s*%{GREEDYDATA:body}$"
]
}
}
#drop failuer
if "_grokparsefailure" in [tags] {
drop { }
}
date {
match => ["body_date","yyyy-MM-dd'T'HH:mm:ss,SSSZ","dd-MMM-yyyy HH:mm:ss.SSS"]
target => "@timestamp"
}
mutate {
remove_field => ["message","MONTHDAY","MONTHNUM","MONTH","YEAR","TIME","HOUR","MINUTE","SECOND","ISO8601_TIMEZONE"]
}
}
}

output {
#server01
if "server01_logs" in [tags]{
elasticsearch {
#hosts => ["192.168.0.92:31200"]
hosts => ["elasticsearch-service:9200"]
index => "server01_logs-%{+YYYY.MM.dd}"
}
}

#server02
if [kubernetes][namespace] == "server02-test" {
elasticsearch {
hosts => ["elasticsearch-service:9200"]
index => "server02-test-%{+YYYY.MM.dd}"
}
}

}

---
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-yml
namespace: elk-test
labels:
elastic-app: logstash
data:
logstash.yml: |-
path.config: /usr/share/logstash/config/config.d
config.reload.automatic: true
config.reload.interval: 10s
---
apiVersion: v1
kind: Service
metadata:
namespace: elk-test
name: logstash
labels:
elastic-app: logstash
spec:
type: NodePort
ports:
- port: 5044
name: filebeat
- port: 9600
name: logstash
selector:
elastic-app: logstash
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: elk-test
name: logstash
labels:
elastic-app: logstash
spec:
replicas: 2
selector:
matchLabels:
elastic-app: logstash
template:
metadata:
labels:
elastic-app: logstash
spec:
containers:
- name: logstash
image: docker.elastic.co/logstash/logstash:6.8.1
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: "1000m"
memory: "2Gi"
requests:
cpu: "500m"
memory: "1Gi"
env:
- name: REQUESTS_MEMORY
valueFrom:
resourceFieldRef:
resource: requests.memory
divisor: 1Mi
- name: LS_JAVA_OPTS
value: "-Xms$(REQUESTS_MEMORY)m -Xmx$(REQUESTS_MEMORY)m"
ports:
- containerPort: 5044
name: filebeat
protocol: TCP
- containerPort: 9600
name: logstash
protocol: TCP
volumeMounts:
- mountPath: /usr/share/logstash/config/logstash.yml
subPath: logstash.yml
name: logstash-yml
- mountPath: /usr/share/logstash/config/config.d
name: logstash-config
volumes:
- name: logstash-config
configMap:
name: logstash-config
- name: logstash-yml
configMap:
name: logstash-yml

部署

1
kubectl apply -f logstash-kubernetes.yaml

Elasticsearch

官方网站:https://www.elastic.co/guide/en/elasticsearch/reference/current/elasticsearch-intro.html

GitHub:https://github.com/elastic/elasticsearch

elasticsearch-head:https://github.com/mobz/elasticsearch-head

入门elastocsearch

http://www.ruanyifeng.com/blog/2017/08/elasticsearch.html

介绍

为何要搭建 Elasticsearch 集群

凡事都要讲究个为什么。在搭建集群之前,我们首先先问一句,为什么我们需要搭建集群?它有什么优势呢?

高可用性

Elasticsearch 作为一个搜索引擎,我们对它的基本要求就是存储海量数据并且可以在非常短的时间内查询到我们想要的信息。所以第一步我们需要保证的就是 Elasticsearch 的高可用性,什么是高可用性呢?它通常是指,通过设计减少系统不能提供服务的时间。假设系统一直能够提供服务,我们说系统的可用性是 100%。如果系统在某个时刻宕掉了,比如某个网站在某个时间挂掉了,那么就可以它临时是不可用的。所以,为了保证 Elasticsearch 的高可用性,我们就应该尽量减少 Elasticsearch 的不可用时间。

那么怎样提高 Elasticsearch 的高可用性呢?这时集群的作用就体现出来了。假如 Elasticsearch 只放在一台服务器上,即单机运行,假如这台主机突然断网了或者被攻击了,那么整个 Elasticsearch 的服务就不可用了。但如果改成 Elasticsearch 集群的话,有一台主机宕机了,还有其他的主机可以支撑,这样就仍然可以保证服务是可用的。

那可能有的小伙伴就会说了,那假如一台主机宕机了,那么不就无法访问这台主机的数据了吗?那假如我要访问的数据正好存在这台主机上,那不就获取不到了吗?难道其他的主机里面也存了一份一模一样的数据?那这岂不是很浪费吗?

为了解答这个问题,这里就引出了 Elasticsearch 的信息存储机制了。首先解答上面的问题,一台主机宕机了,这台主机里面存的数据依然是可以被访问到的,因为在其他的主机上也有备份,但备份的时候也不是整台主机备份,是分片备份的,那这里就又引出了一个概念——分片。

分片,英文叫做 Shard,顾名思义,分片就是对数据切分成了多个部分。我们知道 Elasticsearch 中一个索引(Index)相当于是一个数据库,如存某网站的用户信息,我们就建一个名为 user 的索引。但索引存储的时候并不是整个存一起的,它是被分片存储的,Elasticsearch 默认会把一个索引分成五个分片,当然这个数字是可以自定义的。分片是数据的容器,数据保存在分片内,分片又被分配到集群内的各个节点里。当你的集群规模扩大或者缩小时, Elasticsearch 会自动的在各节点中迁移分片,使得数据仍然均匀分布在集群里,所以相当于一份数据被分成了多份并保存在不同的主机上。

那这还是没解决问题啊,如果一台主机挂掉了,那么这个分片里面的数据不就无法访问了?别的主机都是存储的其他的分片。其实是可以访问的,因为其他主机存储了这个分片的备份,叫做副本,这里就引出了另外一个概念——副本。

副本,英文叫做 Replica,同样顾名思义,副本就是对原分片的复制,和原分片的内容是一样的,Elasticsearch 默认会生成一份副本,所以相当于是五个原分片和五个分片副本,相当于一份数据存了两份,并分了十个分片,当然副本的数量也是可以自定义的。这时我们只需要将某个分片的副本存在另外一台主机上,这样当某台主机宕机了,我们依然还可以从另外一台主机的副本中找到对应的数据。所以从外部来看,数据结果是没有任何区别的。

一般来说,Elasticsearch 会尽量把一个索引的不同分片存储在不同的主机上,分片的副本也尽可能存在不同的主机上,这样可以提高容错率,从而提高高可用性。

但这时假如你只有一台主机,那不就没办法了吗?分片和副本其实是没意义的,一台主机挂掉了,就全挂掉了。

健康状态

针对一个索引,Elasticsearch 中其实有专门的衡量索引健康状况的标志,分为三个等级:

  • green,绿色。这代表所有的主分片和副本分片都已分配。你的集群是 100% 可用的。
  • yellow,黄色。所有的主分片已经分片了,但至少还有一个副本是缺失的。不会有数据丢失,所以搜索结果依然是完整的。不过,你的高可用性在某种程度上被弱化。如果更多的分片消失,你就会丢数据了。所以可把 yellow 想象成一个需要及时调查的警告。
  • red,红色。至少一个主分片以及它的全部副本都在缺失中。这意味着你在缺少数据:搜索只能返回部分数据,而分配到这个分片上的写入请求会返回一个异常。

如果你只有一台主机的话,其实索引的健康状况也是 yellow,因为一台主机,集群没有其他的主机可以防止副本,所以说,这就是一个不健康的状态,因此集群也是十分有必要的。

存储空间

另外,既然是群集,那么存储空间肯定也是联合起来的,假如一台主机的存储空间是固定的,那么集群它相对于单个主机也有更多的存储空间,可存储的数据量也更大。

所以综上所述,我们需要一个集群!

详细了解 Elasticsearch 集群

接下来我们再来了解下集群的结构是怎样的。

首先我们应该清楚多台主机构成了一个集群,每台主机称作一个节点(Node)。

如图就是一个三节点的集群:

img

在图中,每个 Node 都有三个分片,其中 P 开头的代表 Primary 分片,即主分片,R 开头的代表 Replica 分片,即副本分片。所以图中主分片 1、2,副本分片 0 储存在 1 号节点,副本分片 0、1、2 储存在 2 号节点,主分片 0 和副本分片 1、2 储存在 3 号节点,一共是 3 个主分片和 6 个副本分片。同时我们还注意到 1 号节点还有个 MASTER 的标识,这代表它是一个主节点,它相比其他的节点更加特殊,它有权限控制整个集群,比如资源的分配、节点的修改等等。

这里就引出了一个概念就是节点的类型,我们可以将节点分为这么四个类型:

  • 主节点:即 Master 节点。主节点的主要职责是和集群操作相关的内容,如创建或删除索引,跟踪哪些节点是群集的一部分,并决定哪些分片分配给相关的节点。稳定的主节点对集群的健康是非常重要的。默认情况下任何一个集群中的节点都有可能被选为主节点。索引数据和搜索查询等操作会占用大量的cpu,内存,io资源,为了确保一个集群的稳定,分离主节点和数据节点是一个比较好的选择。虽然主节点也可以协调节点,路由搜索和从客户端新增数据到数据节点,但最好不要使用这些专用的主节点。一个重要的原则是,尽可能做尽量少的工作。
  • 数据节点:即 Data 节点。数据节点主要是存储索引数据的节点,主要对文档进行增删改查操作,聚合操作等。数据节点对 CPU、内存、IO 要求较高,在优化的时候需要监控数据节点的状态,当资源不够的时候,需要在集群中添加新的节点。
  • 负载均衡节点:也称作 Client 节点,也称作客户端节点。当一个节点既不配置为主节点,也不配置为数据节点时,该节点只能处理路由请求,处理搜索,分发索引操作等,从本质上来说该客户节点表现为智能负载平衡器。独立的客户端节点在一个比较大的集群中是非常有用的,他协调主节点和数据节点,客户端节点加入集群可以得到集群的状态,根据集群的状态可以直接路由请求。
  • 预处理节点:也称作 Ingest 节点,在索引数据之前可以先对数据做预处理操作,所有节点其实默认都是支持 Ingest 操作的,也可以专门将某个节点配置为 Ingest 节点。

以上就是节点几种类型,一个节点其实可以对应不同的类型,如一个节点可以同时成为主节点和数据节点和预处理节点,但如果一个节点既不是主节点也不是数据节点,那么它就是负载均衡节点。具体的类型可以通过具体的配置文件来设置。

Elasticsearch配置文件详解

elasticsearch的config文件夹里面有两个配置文 件:elasticsearch.yml和logging.yml,第一个是es的基本配置文件,第二个是日志配置文件,es也是使用log4j来记录日志的,所以logging.yml里的设置按普通log4j配置文件来设置就行了。下面主要讲解下elasticsearch.yml这个文件中可配置的东西。

  • 只是挑些重要的配置选项进行注释,别的可以参考官方文档!!!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
 ################################### Cluster ###################################   

# 代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的.

# es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,
# 因为从外部来看es集群,在逻辑上是个整体,你与任何一个节点的通信和与整个es集群通信是等价的

cluster.name: elasticsearch

配置es的集群名称,默认是elasticsearch,es会自动发现在同一网段下的es,
如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群,cluster.name就成为同一个集群的标识。

node.name: "Franz Kafka"

节点名,默认随机指定一个name列表中名字,该列表在es的jar包中config文件夹里name.txt文件中,
其中有很多作者添加的有趣名字,节点名称同理,可自动生成也可手动配置。

node.master: true

指定该节点是否有资格被选举成为node,默认是true,es是默认集群中的第一台机器为master,
如果这台机挂了就会重新选举master。

node.data: true

指定该节点是否存储索引数据,默认为true。

1. # 配置文件中给出了三种配置高性能集群拓扑结构的模式,如下:

2. # 1. 如果你想让节点从不选举为主节点,只用来存储数据,可作为负载器

3. # node.master: **false**

4. # node.data: **true**

5. # node.ingest: **false**

6.

7. # 2. 如果想让节点成为主节点,且不存储任何数据,并保有空闲资源,可作为协调器

8. # node.master: **true**

9. # node.data: **false**

10. # node.ingest: **false**

11.

12. # 3. 如果想让节点既不称为主节点,又不成为数据节点,那么可将他作为搜索器,从节点中获取数据,生成搜索结果等

13. # node.master: **false**

14. # node.data: **false**

15. # node.ingest: **true** (可不指定默认开启)

16.

17. # 4. 仅作为协调器

18. # node.master: **false**

19. # node.data: **false**

20. # node.ingest: **false**

index.number_of_shards: 5

设置默认索引分片个数,默认为5片。

index.number_of_replicas: 1

设置默认索引副本个数,默认为1个副本。

1. # 配置文件中提到的最佳实践是,如果服务器够多,可以将分片提高,尽量将数据平均分布到大集群中去

2. # 同时,如果增加副本数量可以有效的提高搜索性能

3. # 需要注意的是,"number_of_shards" 是索引创建后一次生成的,后续不可更改设置

4. # "number_of_replicas" 是可以通过API去实时修改设置的


path.conf: /path/to/conf

设置配置文件的存储路径,默认是es根目录下的config文件夹。


path.data: /path/to/data

设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开,例:

path.data: /path/to/data1,/path/to/data2


path.work: /path/to/work

设置临时文件的存储路径,默认是es根目录下的work文件夹。


path.logs: /path/to/logs

设置日志文件的存储路径,默认是es根目录下的logs文件夹


path.plugins: /path/to/plugins

设置插件的存放路径,默认是es根目录下的plugins文件夹


bootstrap.memory_lock:

服务器发生系统swapping的时候ES节点的性能会非常差,也会影响节点的稳定性。
所以要不惜一切代价来避免swapping。swapping会导致Java GC的周期延迟从毫秒级恶化到分钟,
更严重的是会引起节点响应延迟甚至脱离集群。

这个参数的目的是当你无法关闭系统的swap的时候,建议把这个参数设为true。
防止在内存不够用的时候,elasticsearch的内存被交换至交换区,导致性能骤降。


bootstrap.mlockall: true

设置为true来锁住内存。因为当jvm开始swapping时es的效率会降低,所以要保证它不swap,
可以把ES_MIN_MEM和 ES_MAX_MEM两个环境变量设置成同一个值,并且保证机器有足够的内存分配给es。
同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过`ulimit -l unlimited`命令。

network.bind_host: 192.168.0.1

设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0。


network.publish_host: 192.168.0.1

设置其它节点和该节点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址。


network.host: 192.168.0.1

这个参数是用来同时设置bind_host和publish_host上面两个参数。


transport.tcp.port: 9300

设置节点间交互的tcp端口,默认是9300。


transport.tcp.compress: true

设置是否压缩tcp传输时的数据,默认为false,不压缩。


http.port: 9200

设置对外服务的http端口,默认为9200。


http.max_content_length: 100mb

设置内容的最大容量,默认100mb


http.enabled: false

是否使用http协议对外提供服务,默认为true,开启。


1. ###################### 使用head等插件监控集群信息,需要打开以下配置项 ###########

2. # http.cors.enabled: **true**

3. # http.cors.allow-origin: "*"

4. # http.cors.allow-credentials: **true**

1. # 下面的配置控制怎样以及何时启动一整个集群重启的初始化恢复过程

2. # (当使用shard gateway时,是为了尽可能的重用local data(本地数据))


gateway.type: local

gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统,分布式文件系统,Hadoop的HDFS,和amazon的s3服务器。


gateway.recover_after_nodes: 1

集群中N个节点启动后进行数据恢复,默认为1。


gateway.recover_after_time: 5m

设置初始化恢复过程的超时时间,超时时间从上一个配置中配置的N个节点启动后算起,默认是5分钟。


gateway.expected_nodes: 2

设置这个集群中节点的数量,默认为2,一旦这N个节点启动,就会立即进行数据恢复。


1. # 下面这些配置允许在初始化恢复,副本分配,再平衡,或者添加和删除节点时控制节点间的分片分配

2. # 设置一个节点的并行恢复数


cluster.routing.allocation.node_initial_primaries_recoveries: 4

初始化数据恢复时,并发恢复线程的个数,默认为4。


cluster.routing.allocation.node_concurrent_recoveries: 2

添加删除节点或负载均衡时并发恢复线程的个数,默认为4。


indices.recovery.max_size_per_sec: 0

设置数据恢复时限制的带宽,如入100mb,默认为0,即无限制。


indices.recovery.concurrent_streams: 5

设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5。


discovery.zen.minimum_master_nodes: 1

设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。默认为1,对于大的集群来说,可以设置大一点的值(2-4)


discovery.zen.ping.timeout: 3s

设置集群中自动发现其它节点时ping连接超时时间,默认为3秒,对于比较差的网络环境可以高点的值来防止自动发现时出错。


discovery.zen.ping.multicast.enabled: false

设置是否打开多播发现节点,默认是true。


discovery.zen.ping.unicast.hosts: ["host1", "host2:port", "host3[portX-portY]"]

设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点。


###节点ping设置,更多等待时间设置discovery.zen.fd.ping_interval:该属性默认为1s(1秒钟),指定了节点互相ping的时间间隔。


discovery.zen.fd.ping_timeout:该属性默认为30s(30秒钟),指定了节点发送ping信息后等待响应的时间,超过此时间则认为对方节点无响应。


discovery.zen.fd.ping_retries:该属性默认为3,指定了重试次数,超过此次数则认为对方节点已停止工作。


1. # 官方插件 相关设置请查看此处

2. # https://www.elastic.co/guide/en/x-pack/current/xpack-settings.html

下面是一些查询时的慢日志参数设置

index.search.slowlog.level: TRACE

index.search.slowlog.threshold.query.warn: 10s

index.search.slowlog.threshold.query.info: 5s

index.search.slowlog.threshold.query.debug: 2s

index.search.slowlog.threshold.query.trace: 500ms

index.search.slowlog.threshold.fetch.warn: 1s

index.search.slowlog.threshold.fetch.info: 800ms

index.search.slowlog.threshold.fetch.debug:500ms

index.search.slowlog.threshold.fetch.trace: 200ms

jvm配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
##JVM configuration

################################################################
## IMPORTANT: JVM heap size
################################################################
##
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
##
## -Xms4g
## -Xmx4g
##
## See https://www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html
## for more information
##
################################################################

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

-Xms32g
-Xmx32g
#-Xms4g
#-Xmx4g

################################################################
## Expert settings
################################################################
##
## All settings below this section are considered
## expert settings. Don't tamper with them unless
## you understand what you are doing
##
################################################################

## GC configuration
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly

## optimizations

# pre-touch memory pages used by the JVM during initialization
-XX:+AlwaysPreTouch

## basic

# force the server VM (remove on 32-bit client JVMs)
-server

# explicitly set the stack size (reduce to 320k on 32-bit client JVMs)
-Xss1m

# set to headless, just in case
-Djava.awt.headless=true

# ensure UTF-8 encoding by default (e.g. filenames)
-Dfile.encoding=UTF-8

# use our provided JNA always versus the system one
-Djna.nosys=true

# use old-style file permissions on JDK9
-Djdk.io.permissionsUseCanonicalPath=true

# flags to configure Netty
-Dio.netty.noUnsafe=true
-Dio.netty.noKeySetOptimization=true
-Dio.netty.recycler.maxCapacityPerThread=0

# log4j 2
-Dlog4j.shutdownHookEnabled=false
-Dlog4j2.disable.jmx=true
-Dlog4j.skipJansi=true

## heap dumps

# generate a heap dump when an allocation from the Java heap fails
# heap dumps are created in the working directory of the JVM
-XX:+HeapDumpOnOutOfMemoryError

# specify an alternative path for heap dumps
# ensure the directory exists and has sufficient space
#-XX:HeapDumpPath=${heap.dump.path}

## GC logging

#-XX:+PrintGCDetails
#-XX:+PrintGCTimeStamps
#-XX:+PrintGCDateStamps
#-XX:+PrintClassHistogram
#-XX:+PrintTenuringDistribution
#-XX:+PrintGCApplicationStoppedTime

# log GC status to a file with time stamps
# ensure the directory exists
#-Xloggc:${loggc}

# By default, the GC log file will not rotate.
# By uncommenting the lines below, the GC log file
# will be rotated every 128MB at most 32 times.
#-XX:+UseGCLogFileRotation
#-XX:NumberOfGCLogFiles=32
#-XX:GCLogFileSize=128M

# Elasticsearch 5.0.0 will throw an exception on unquoted field names in JSON.
# If documents were already indexed with unquoted fields in a previous version
# of Elasticsearch, some operations may throw errors.
#
# WARNING: This option will be removed in Elasticsearch 6.0.0 and is provided
# only for migration purposes.
#-Delasticsearch.json.allow_unquoted_field_names=true

Elasticsearch最佳优化

角色划分

  • es分为三种角色: master、client、data,三种角色根据elasticsearch.yml配置中node.master、node.data区分,分别为true false、false false、true true
  • master: 该节点不和应用创建连接,主要用于元数据(metadata)的处理,比如索引的新增、删除、分片分配等,master节点不占用io和cpu,内存使用量一般
  • client: 该节点和检索应用创建连接、接受检索请求,但其本身不负责存储数据,可当成负载均衡节点,client节点不占用io、cpu、内存
  • data: 该节点和索引应用创建连接、接受索引请求,该节点真正存储数据,es集群的性能取决于该节点个数(每个节点最优配置情况下),data节点会占用大量的cpu、io、内存
  • 各节点间关系: master节点具备主节点的选举权,主节点控制整个集群元数据。client节点接受检索请求后将请求转发到与查询条件相关的的data节点的分片上,data节点的分片执行查询语句获得查询结果后将结果反馈至client,在client对数据进行聚合、排序等操作将最终结果返回给上层请求

资源规划

  • master节点: 只需部署三个节点,每个节点jvm分配2-10G,根据集群大小决定
  • client节点: 增加client节点可增加检索并发,但检索的速度还是取决于查询所命中的分片个数以及分片中的数据量。如果不清楚检索并发,初始节点数可设置和data节点数一致,每个节点jvm分配2-10
  • data节点: ①单个索引在一个data节点上分片数保持在3个以内;②每1GB堆内存对应集群的分片保持在20个以内;③每个分片不要超过30G。
  • data节点经验:
    • 如果单索引每个节点可支撑90G数据,依此可计算出所需data节点数 。
    • 如果是多索引按照单个data节点jvm内存最大30G来计算,一个节点的分片保持在600个以内,存储保持在18T以内。
    • 主机的cpu、io固定,建议一台主机只部署一个data节点,不同角色节点独立部署,方便扩容
    • 每条数据保持在2k以下索引性能大约3000-5000条/s/data节点,增加data节点数可大幅度增加索引速率,节点数与索引效率的增长关系呈抛物线形状

优秀的插件与工具

  • ik分词器: es默认分词器只支持英文分词,ik分词器支持中文分词

  • head数据查询工具: 类似于mysql的Navicat

  • logstash: 数据处理管。采样各种样式、大小的数据来源,实时解析和转换数据,选择众多输出目标导出数据

  • x-pack性能监控: 获取进程运行时资源与状态信息并存储至es中。可通过kibana查看es、logstash性能指标,试用版包括集群状态、延迟、索引速率、检索速率、内存、cpu、io、磁盘、文件量等还可以看到集群数据负载均衡时的情况。商用版还支持安全、告警等功能

  • kibana可视化工具: es的可视化工具可制作各种图表,可在该工具上执行dsl语句灵活操作es

  • es-sql: 用sql查询elasticsearch的工具,将封装复杂的dsl语句封装成sql

  • beats: 轻量级的数据采集工具,可监控网络流量、日志数据、进程信息(负载、内存、磁盘等),支持docker镜像的file采集

  • repository-hdfs: 该插件支持将es中离线数据转存至hdfs中长期存储

Elasticsearch优化经验

  • 参数调优

    • 开启内存锁,禁止swapping

      执行linux命令(临时生效)

      1
      ulimit -l unlimited

      修改主机配置:/etc/security/limits.conf

      1
      2
      * soft memlock unlimited
      * hard memlock unlimited

      修改es配置:config/elasticsearch.yml

      1
      bootstrap.memory_lock : true
    • 调大文件描述符数量

      执行linux命令(临时生效)

      1
      ulimit -n 65535

      修改linux配置文件:/etc/security/limits.conf

      1
      2
      * soft nofile 65536
      * hard nofile 65536
    • 调大最大映射数

      执行linux命令(临时生效)

      1
      sysctl -w vm.max_map_count=262144

      修改linux配置文件:/etc/sysctl.conf

      1
      vm.max_map_count=262144
  • 索引配置

    • settings:{efresh_interval}:数据写入刷新间隔,默认1s,调整增加该值可以减少写入压力、增加写入速度,如设为60

      1
      2
      3
      4
      5
      {
      "settings": {
      "refresh_interval": "60s"
      }
      }
    • mappings:{dynamic}: 禁止es自动创建字段,仅允许预先设定好的字段存入es,防止索引结构混乱

      1
      2
      3
      4
      5
      6
      7
      {
      "mappings": {
      "mytype": {
      "dynamic": false
      }
      }
      }
    • _all:建议禁用

      1
      2
      3
      4
      5
      {
      "_all": {
      "enable": false
      }
      }
    • keyword字段属性: ingore_above超过多少字符不写入,keyword一般用于精确查询,不能写入太长。

      1
      2
      3
      4
      5
      6
      {
      "name": {
      "type": "keyword",
      "ingore_above": 1000
      }
      }
    • index属性:将 不作为查询字段的index值设为false

      1
      2
      3
      4
      5
      6
      7
      8
      {
      {
      "content": {
      "type": "text",
      "index": "false"
      }
      }
      }
  • JVM内存溢出处理

    1
    2
    3
    防止es节点内存溢出后处于僵死状态且无法恢复,影响整个集群,在进程出现OOM时让进程宕掉,退出ES集群并引发告警,然后重启。

    在config/jvm.options中增加JVM启动参数:
    1
    -XX:+ExitOnOutOfMemoryError

    该参数在jdk 1.8.0_92版本上线

  • 数据生命周期

    es中的开启状态的索引都会占用堆内存来存储倒排索引,过多的索引会导致集群整体内存使用率多大,甚至引起内存溢出。所以需要根据自身业务管理历史数据的生命周期,如近3个月的数据开启用于快速查询;过去3-6月的数据索引关闭以释放内存,需要时再开启;超过6个月的可以生成快照保存至hdfs并删除索引,需要的时候从hdfs选择需要的索引恢复至集群中进行查询

    生产上常常使用logstash+索引模板的方式按照一定时间创建新的索引,例如按天创建索引,索引的命名可能是index-yyyy-mm-dd,每天生产不同的索引,清除历史数据时可直接关闭或删除

    需要注意的是:如何按照logstash默认的时间分割索引会有8个小时的误差,所以需要在logstash中将真实数据中的时间字段作为分割条件,保障按照业务时间分割索引

  • 路由查询

    在将数据写入es时,指定一个字段作为路由字段,es会将该字段进行hash计算写入到对应的分片上;查询时根据查询条件中的路由值,直接查找所在的分片,大幅度提高查询速度。

    需要注意的是:路由字段必须是随机分布,否则会导致分片数据不平均引发的主机存储使用不平均,可以作为路由字段的:如业务流水、省份、系统编码等。

  • 过滤器

    ES中的查询操作分为2种:查询(query)和过滤(filter),查询默认会计算每个返回文档的得分,然后根据得分排序;而过滤(filter)只会筛选出符合的文档,并不计算得分,且它可以缓存文档。单从性能考虑,过滤比查询更快而且更节省io资源。过滤适合在大范围筛选数据,而查询则适合精确匹配数据。开发时应先使用过滤操作过滤数据,然后使用查询匹配数据

  • 查询限制

    限制是为了保证es集群的稳定性。限制的内容包括:查询范围、单次查询数量等,过大的查询范围不仅会导致查询效率低,而且会是es集群资源耗费急剧增加,甚至引起es集群崩溃;单次查询数量限制是为了保证内存不会被查询内存大量占用,就是分页原理,es默认可以查询10000条数据

  • 批量导入

    如果你在做大批量导入,考虑通过设置 index.number_of_replicas: 0关闭副本。把每个索引的 index.refresh_interval 改到 -1关闭刷新。导入完毕后再开启副本和刷新

YAML

elasticsearch cluster(2client+3master+2data)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
---
apiVersion: v1
kind: ServiceAccount
metadata:
labels:
elastic-app: elasticsearch
name: elasticsearch-admin
namespace: elk-test

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: elasticsearch-admin
labels:
elastic-app: elasticsearch
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: elasticsearch-admin
namespace: elk-test
---
kind: Service
apiVersion: v1
metadata:
labels:
elastic-app: elasticsearch
name: elasticsearch-discovery
namespace: elk-test
spec:
ports:
- port: 9300
targetPort: 9300
selector:
elastic-app: elasticsearch
role: master
---
kind: Service
apiVersion: v1
metadata:
labels:
elastic-app: elasticsearch-service
name: elasticsearch-service
namespace: elk-test
spec:
ports:
- port: 9200
targetPort: 9200
selector:
elastic-app: elasticsearch
role: client
type: NodePort
---
kind: Deployment
apiVersion: apps/v1
metadata:
labels:
elastic-app: elasticsearch
role: client
name: elasticsearch-client
namespace: elk-test
spec:
replicas: 2
revisionHistoryLimit: 10
selector:
matchLabels:
elastic-app: elasticsearch
role: client
template:
metadata:
labels:
elastic-app: elasticsearch
role: client
spec:
containers:
- name: elasticsearch-client
image: docker.elastic.co/elasticsearch/elasticsearch:6.8.1
lifecycle:
postStart:
exec:
command: ["/bin/bash", "-c", "sysctl -w vm.max_map_count=262144; ulimit -l unlimited;"]
ports:
- containerPort: 9200
protocol: TCP
- containerPort: 9300
protocol: TCP
env:
- name: "node.name"
value: "${HOSTNAME}"
- name: "cluster.name"
value: "elasticsearch-cluster"
- name: "discovery.zen.ping.unicast.hosts"
value: "elasticsearch-discovery"
- name: "node.master"
value: "false"
- name: "node.data"
value: "false"
- name: "ES_JAVA_OPTS"
value: "-Xms512m -Xmx512m"
- name: "http.cors.enabled"
value: "true"
- name: "http.cors.allow-origin"
value: "*"
securityContext:
privileged: true
resources:
requests:
memory: 0.3Gi
cpu: 0.2
limits:
memory: 1.0Gi
cpu: 0.5
serviceAccountName: elasticsearch-admin
---
kind: Deployment
apiVersion: apps/v1
metadata:
labels:
elastic-app: elasticsearch
role: master
name: elasticsearch-master
namespace: elk-test
spec:
replicas: 3
revisionHistoryLimit: 10
selector:
matchLabels:
elastic-app: elasticsearch
role: master
template:
metadata:
labels:
elastic-app: elasticsearch
role: master
spec:
containers:
- name: elasticsearch-master
image: docker.elastic.co/elasticsearch/elasticsearch:6.8.1
lifecycle:
postStart:
exec:
command: ["/bin/bash", "-c", "sysctl -w vm.max_map_count=262144; ulimit -l unlimited;"]
ports:
- containerPort: 9300
protocol: TCP
env:
- name: "node.name"
value: "${HOSTNAME}"
- name: "cluster.name"
value: "elasticsearch-cluster"
- name: "discovery.zen.ping.unicast.hosts"
value: "elasticsearch-discovery"
- name: "discovery.zen.minimum_master_nodes"
value: "2"
- name: "discovery.zen.ping_timeout"
value: "5s"
- name: "node.master"
value: "true"
- name: "node.data"
value: "false"
- name: "node.ingest"
value: "false"
- name: "ES_JAVA_OPTS"
value: "-Xms512m -Xmx512m"
securityContext:
privileged: true
resources:
requests:
memory: 0.3Gi
cpu: 0.3
limits:
memory: 1.0Gi
cpu: 0.5
serviceAccountName: elasticsearch-admin
---
kind: StatefulSet
apiVersion: apps/v1
metadata:
labels:
elastic-app: elasticsearch
role: data
name: elasticsearch-data
namespace: elk-test
spec:
replicas: 2
selector:
matchLabels:
elastic-app: elasticsearch
role: data
template:
metadata:
labels:
elastic-app: elasticsearch
role: data
spec:
containers:
- name: elasticsearch-data
image: docker.elastic.co/elasticsearch/elasticsearch:6.8.1
lifecycle:
postStart:
exec:
command: ["/bin/bash", "-c", "sysctl -w vm.max_map_count=262144; ulimit -l unlimited;"]
ports:
- containerPort: 9300
protocol: TCP
volumeMounts:
- name: es-data
mountPath: /usr/share/elasticsearch/data
env:
- name: "node.name"
value: "${HOSTNAME}"
- name: "cluster.name"
value: "elasticsearch-cluster"
- name: "discovery.zen.ping.unicast.hosts"
value: "elasticsearch-discovery"
- name: "node.master"
value: "false"
- name: "node.data"
value: "true"
- name: "ES_JAVA_OPTS"
value: "-Xms1024m -Xmx1024m"
securityContext:
privileged: true
resources:
requests:
memory: 1.0Gi
cpu: 1
limits:
memory: 2.0Gi
cpu: 2
serviceAccountName: elasticsearch-admin
serviceName: elasticsearch-data
volumeClaimTemplates:
- metadata:
name: es-data
creationTimestamp: null
annotations:
volume.beta.kubernetes.io/storage-class: nfs-rw
volume.beta.kubernetes.io/storage-provisioner: flexvolume-huawei.com/fuxinfs
enable: true
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi

elasticsearch(3data node statefulset)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: es-cluster
namespace: elasticsearch
spec:
serviceName: elasticsearch
replicas: 3
selector:
matchLabels:
app: elasticsearch
template:
metadata:
labels:
app: elasticsearch
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- elasticsearch
topologyKey: "kubernetes.io/hostname"
imagePullSecrets:
- name: registry-secret
containers:
- name: elasticsearch
image: registry.cn-beijing.aliyuncs.com/rj-bai/elasticsearch:6.5.4
imagePullPolicy: IfNotPresent
livenessProbe:
tcpSocket:
port: 9200
initialDelaySeconds: 60
timeoutSeconds: 5
failureThreshold: 12
readinessProbe:
tcpSocket:
port: 9200
initialDelaySeconds: 60
timeoutSeconds: 5
failureThreshold: 12
resources:
limits:
cpu: "1000m"
memory: "2Gi"
requests:
cpu: "500m"
memory: "1Gi"
ports:
- containerPort: 9200
name: es-http
protocol: TCP
- containerPort: 9300
name: cluster-port
protocol: TCP
volumeMounts:
- mountPath: /usr/share/elasticsearch/data
name: data-elasticsearch
env:
- name: REQUESTS_MEMORY
valueFrom:
resourceFieldRef:
resource: requests.memory
divisor: 1Mi
- name: cluster.name
value: es-cluster
- name: node.name
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: http.cors.enabled
value: "true"
- name: http.cors.allow-origin
value: "*"
- name: discovery.zen.minimum_master_nodes
value: "2"
- name: discovery.zen.fd.ping_timeout
value: "120s"
- name: discovery.zen.fd.ping_retries
value: "6"
- name: discovery.zen.fd.ping_interval
value: "30s"
- name: ES_JAVA_OPTS
value: "-Xms$(REQUESTS_MEMORY)m -Xmx$(REQUESTS_MEMORY)m"
- name: discovery.zen.ping.unicast.hosts
value: "es-cluster-0.elasticsearch,es-cluster-1.elasticsearch,es-cluster-2.elasticsearch"
initContainers:
- name: vm-max-map
image: busybox
imagePullPolicy: IfNotPresent
command: ["sysctl", "-w", "vm.max_map_count=262144"]
securityContext:
privileged: true
- name: fd-ulimit
image: busybox
imagePullPolicy: IfNotPresent
command: ["sh", "-c", "ulimit -n 65535"]
securityContext:
privileged: true
volumeClaimTemplates:
- metadata:
name: data-elasticsearch
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 5Gi

---
kind: Service
apiVersion: v1
metadata:
name: elasticsearch
namespace: elasticsearch
labels:
app: elasticsearch
spec:
type: NodePort
selector:
app: elasticsearch
ports:
- port: 9200
name: es-http
nodePort: 39200

crobjob定期清理历史数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
apiVersion: batch/v1beta1
kind: CronJob
metadata:
name: elasticsearch
labels:
app.kubernetes.io/name: elasticsearch
helm.sh/chart: elasticsearch-0.1.0
app.kubernetes.io/instance: elasticsearch
app.kubernetes.io/managed-by: Tiller
spec:
schedule: "0 1 * * *"
concurrencyPolicy: Forbid
jobTemplate:
spec:
template:
spec:
containers:
- name: hello
image: "192.168.101.88:5000/busybox:1.29.3"
imagePullPolicy: IfNotPresent
command:
- "sh"
- "-c"
- >
history=$(date -D '%s' +"%Y.%m.%d" -d "$(( `date +%s`-60*60*24*6 ))");
echo ${history};
hostname=elasticsearch.ns-monitor;
echo ${hostname};
echo -ne "DELETE /*${history}* HTTP/1.1\r\nHost: ${hostname}\r\n\r\n" | nc -v -i 1 ${hostname} 9200
restartPolicy: OnFailure

脚本定期清理历史数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/bin/bash

set -e
#set -x

LAST_DATE=`date -d "-7 days" "+%Y-%m-%d"`
LAST_TIMESTAMP=`date -d "$LAST_DATE" +%s`

indexs=`curl http://192.168.0.250:31191/_cat/indices|awk '$3~/server01|server02/ {print $3}'`

for index in $indexs;do
index_time=`echo $index|awk -F - '{print $NF}'|tr '.' '-'`
index_timestamp=`date -d "$index_time" +%s`
if [ $index_timestamp -lt $LAST_TIMESTAMP ];then
curl -XDELETE "http://192.168.0.250:31191/$index" &> /dev/null
fi
done

部署

1
kubectl apply -f es-kubernetes.yaml

Kibana

官方文档:https://www.elastic.co/guide/en/kibana/current/

GitHub:https://github.com/elastic/kibana

YAML

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
---
kind: Service
apiVersion: v1
metadata:
labels:
elastic-app: kibana
name: kibana-service
namespace: elk-test
spec:
ports:
- port: 5601
targetPort: 5601
selector:
elastic-app: kibana
type: NodePort
---
kind: Deployment
apiVersion: apps/v1
metadata:
name: kibana
namespace: elk-test
labels:
elastic-app: kibana
spec:
replicas: 1
selector:
matchLabels:
elastic-app: kibana
template:
metadata:
name: kibana
labels:
elastic-app: kibana
spec:
containers:
- name: kibana
image: 'docker.elastic.co/kibana/kibana:6.8.1'
ports:
- name: http
containerPort: 5601
protocol: TCP
env:
- name: ELASTICSEARCH_URL
value: 'http://elasticsearch-service:9200'
resources:
limits:
cpu: 500m
memory: 1Gi
requests:
cpu: 100m
memory: 256Mi
imagePullPolicy: IfNotPresent

部署

1
kubectl apply -f kibana-kubernetes.yaml

参考资料

https://cloud.tencent.com/developer/article/1189282

https://blog.csdn.net/chenleiking/article/details/79453460

https://blog.csdn.net/miss1181248983/article/details/89492918

https://blog.rj-bai.com/post/157.html

https://www.jianshu.com/p/b264b6cf9340

--------------------本文结束,感谢您的阅读--------------------

本文标题:Kubernetes集群日志收集 Filebeat+ELK

文章作者:弓昭

发布时间:2019年07月22日 - 23:06

最后更新:2020年04月08日 - 22:20

原始链接:https://gongzhao1.gitee.io/Kubernetes集群日志收集-Filebeat-ELK/

联系邮箱:gongzhao1@foxmail.com