本文共 2169 字,大约阅读时间需要 7 分钟。
针对公司项目微服务化,随着项目及服务器的不断增多,决定采用ELK(Elasticsearch+Logstash+Kibana)日志分析平台进行微服务日志分析。
1.在微服务服务器上部署Logstash,作为Shipper的角色,对微服务日志文件数据进行数据采集,将采集到的数据输出到Redis消息队列。
2.在另外一台服务器上部署Logstash,作为Indexer的角色,从Redis消息队列中读取数据(可以对数据进行处理),输出到Elasticsearch-Master主节点。
3.Elasticsearch-Master主节点内部与副节点同步数据。(Elasticsearch集群建议3个服务以上奇数)
4.Kibana部署一台服务器内,读取Elasticsearch集群数据,展示Web查询页面,提供数据展示。
在我这个最终方案中,选择了使用Redis作为消息队列进行缓冲,降低Elasticsearch压力,起到削峰作用,主要原因还是由于公司考虑成本问题,日志收集也是只针对我们单个项目组来使用,所以选择了公司现在就已经有的Redis集群进行复用。
最初方案中,在消息队列上选择的是Kafka,毕竟Kafka天生就是做为消息队列的,具体二者的毕竟在这里我就不多说了,百度上一大堆。
这里就不在这里写出来了,提供三个地址仅供参考:
从日志文件读取到redis
#从日志文件读取数据#file{}#type 日志类型#path 日志位置# 可以直接读取文件(a.log)# 可以所有后缀为log的日志(*.log)# 读取文件夹下所有文件(路径)#start_position 文件读取开始位置 (beginning)#sincedb_path 从什么位置读取(设置为/dev/null自动从开始位置读取)input { file { type => "log" path => ["/root/logs/info.log"] start_position => "beginning" sincedb_path => "/dev/null" }}#根据时间戳分隔日志#grok 区分日志中得字段filter { multiline { pattern => "^%{TIMESTAMP_ISO8601} " negate => true what => previous } #定义数据的格式 grok { match => { "message" => "%{DATA:datetime} - %{DATA:logLevel} - %{DATA:serviceName} - %{DATA:ip} - %{DATA:pid} - %{DATA:thread} - %{DATA-msg}"} }}#输出数据到Redis#host Redis主机地址#port Redis端口#db Redis数据库编号#data_type Redis数据类型#key Redis的key#password Redis密码output { redis { host => "ip" port => "6379" db => "6" data_type => "list" password => "password" key => "test_log" }}
从redis读取到es
#从redis内读取数据#host Redis主机ip#port Redis端口#data_type Redis数据类型#batch_count#password Redis密码#key Redis读取Keyinput { redis { host => "ip" port => "6379" db => "6" data_type => "list" password => "password" key => "test_log" }}#数据的输出我们指向了es集群#hosts Elasticsearch主机地址#index Elasticsearch索引名称output { elasticsearch { hosts => "ip:9200" index => "logs-%{+YYYY.MM.dd}" }}
其他剩下的就是Es集群和Kibana了,这两个没什么特别值得注意的地方,上网随便搜,一大堆文章。
以上仅仅代表本人项目使用方案,不一定完美适合所有场景,仅供参考。
转载地址:http://gvlia.baihongyu.com/