filebeat配置参数_容器日志采集利器:filebeat深度剖析与实践

阅读: 评论:0

filebeat配置参数_容器⽇志采集利器:filebeat深度剖析与实践
在云原⽣时代和容器化浪潮中,容器的⽇志采集是⼀个看起来不起眼却⼜⽆法忽视的重要议题。对于容器⽇志采集我们常⽤的⼯具有filebeat和fluentd,两者对⽐各有优劣,相⽐基于ruby的fluentd,考虑到可定制性,我们⼀般默认选择golang技术栈的filbeat作为主⼒的⽇志采集agent。 相⽐较传统的⽇志采集⽅式,容器化下单节点会运⾏更多的服务,负载也会有更短的⽣命周期,⽽这些更容易对⽇志采集agent造
成压⼒,虽然filebeat⾜够轻量级和⾼性能,但如果不了解filebeat的机制,不合理的配置filebeat,实际的⽣产环境使⽤中可能也会给我们带来意想不到的⿇烦和难题。
整体架构
⽇志采集的功能看起来不复杂,主要功能⽆⾮就是到配置的⽇志⽂件,然后读取并处理,发送⾄相应的后端如elasticsearch,kafka等。filebeat官⽹有张⽰意图,如下所⽰:
针对每个⽇志⽂件,filebeat都会启动⼀个harvester协程,即⼀个goroutine,在该goroutine中不停的读取⽇志⽂件,直到⽂件的EOF末尾。⼀个最简单的表⽰采集⽬录的input配置⼤概如下所⽰:
filebeat.inputs:- type: log  # Paths that should be crawled and fetched. Glob based paths.  paths:    - /var/log/*.log复制代码
不同的harvester goroutine采集到的⽇志数据都会发送⾄⼀个全局的队列queue中,queue的实现有两种:基于内存和基于磁盘的队列,⽬前基于磁盘的队列还是处于alpha阶段,filebeat默认启⽤的是基于内存的缓存队列。 每当队列中的数据缓存到⼀定的⼤⼩或者超过了定时的时间(默认1s),会被注册的client从队列中消费,发送⾄配置的后端。⽬前可以设置的client有kafka、elasticsearch、redis等。
虽然这⼀切看着挺简单,但在实际使⽤中,我们还是需要考虑更多的问题,例如:
⽇志⽂件是如何被filbebeat发现⼜是如何被采集的?
filebeat是如何确保⽇志采集发送到远程的存储中,不丢失⼀条数据的?
如果filebeat挂掉,下次采集如何确保从上次的状态开始⽽不会重新采集所有⽇志?
filebeat的内存或者cpu占⽤过多,该如何分析解决?
filebeat如何⽀持docker和kubernetes,如何配置容器化下的⽇志采集?
想让filebeat采集的⽇志发送⾄的后端存储,如果原⽣不⽀持,怎样定制化开发?
这些均需要对filebeat有更深⼊的理解,下⾯让我们跟随filebeat的源码⼀起探究其中的实现机制。
⼀条⽇志是如何被采集的
filebeat源码归属于beats项⽬,⽽beats项⽬的设计初衷是为了采集各类的数据,所以beats抽象出了⼀个libbeat库,基于libbeat我们可以快速的开发实现⼀个采集的⼯具,除了filebeat,还有像metricbeat、packetbeat等官⽅的项⽬也是在beats⼯程中。 如果我们⼤致看⼀下代码就会发现,libbeat已经实现了内存缓存队列memqueue、⼏种output⽇志发送客户端,数据的过滤处理processor等通⽤功能,⽽filebeat只需要实现⽇志⽂件的读取等和⽇志相关的逻辑即可。
从代码的实现⾓度来看,filebeat⼤概可以分以下⼏个模块:
input: 到配置的⽇志⽂件,启动harvester
harvester: 读取⽂件,发送⾄spooler
spooler: 缓存⽇志数据,直到可以发送⾄publisher
publisher: 发送⽇志⾄后端,同时通知registrar
registrar: 记录⽇志⽂件被采集的状态
1. 到⽇志⽂件
对于⽇志⽂件的采集和⽣命周期管理,filebeat抽象出⼀个Crawler的结构体, 在filebeat启动后,crawler会根据配置创建,然后遍历并运⾏每个input:
for _, inputConfig := range c.inputConfigs {err := c.startInput(pipeline, inputConfig, r.GetStates())}复制代码
在每个input运⾏的逻辑⾥,⾸先会根据配置获取匹配的⽇志⽂件,需要注意的是,这⾥的匹配⽅式并⾮正则,⽽是采⽤linux glob的规则,和正则还是有⼀些区别。
matches, err := filepath.Glob(path)复制代码
获取到了所有匹配的⽇志⽂件之后,会经过⼀些复杂的过滤,例如如果配置了exclude_files则会忽略这类⽂件,同时还会查询⽂件的状
态,如果⽂件的最近⼀次修改时间⼤于ignore_older的配置,也会不去采集该⽂件。
2. 读取⽇志⽂件
匹配到最终需要采集的⽇志⽂件之后,filebeat会对每个⽂件启动harvester goroutine,在该goroutine中不停的读取⽇志,并发送给内存缓存队列memqueue。 在(h *Harvester) Run()⽅法中,我们可以看到这么⼀个⽆限循环,省略了⼀些逻辑的代码如下所⽰:
for {message, err := h.reader.Next()if err != nil {switch err {case ErrFileTruncate:logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state
可以看到,reader.Next()⽅法会不停的读取⽇志,如果没有返回异常,则发送⽇志数据到缓存队列中。 返回的异常有⼏种类型,除了读取到EOF外,还会有例如⽂件⼀段时间不活跃等情况发⽣会使harvester goroutine退出,不再采集该⽂件,并关闭⽂件句柄。 filebeat为了防⽌占据过多的采集⽇志⽂件的⽂件句柄,默认的close_inactive参数为5min,如果⽇志⽂件5min内没有被修改,上⾯代码会进⼊ErrInactive的case,之后该harvester goroutine会被关闭。 这种场景下还需要注意的是,如果某个⽂件⽇志采集中被移除了,但是由于
此时被filebeat保持着⽂件句柄,⽂件占据的磁盘空间会被保留直到harvester goroutine结束。sae认证
3. 缓存队列
在memqueue被初始化时,filebeat会根据配置min_event是否⼤于1创建BufferingEventLoop或者DirectEventLoop,⼀般默认都是BufferingEventLoop,即带缓冲的队列。
bbzs
type bufferingEventLoop struct {broker *Brokerbuf        *batchBufferflushList  flushListeventCount intminEvents    intmaxEvents    intflushTimeout time.Dura
BufferingEventLoop是⼀个实现了Broker、带有各种channel的结构,主要⽤于将⽇志发送⾄consumer消费。 BufferingEventLoop的run⽅法中,同样是⼀个⽆限循环,这⾥可以认为是⼀个⽇志事件的调度中⼼。
for {select {case  0 {l.flushBuffer()}}}复制代码
上⽂中harvester goroutine每次读取到⽇志数据之后,最终会被发送⾄bufferingEventLoop中的events chan pushRequest
channel,然后触发上⾯req :=
这⾥获取到了consumer消费者的response channel,然后发送数据给这个channel。真正到这,才会
触发consumer对memqueue的消费。所以,其实memqueue并⾮⼀直不停的在被consumer消费,⽽是在memqueue通知consumer的时候才被消费,我们可以理解为⼀
种脉冲式的发送。
焊锡炉4. 消费队列
实际上,早在filebeat初始化的时候,就已经创建了⼀个eventConsumer并在loop⽆限循环⽅法⾥试图从Broker中获取⽇志数据。
for {if !paused && c.out != nil && consumer != nil && batch == nil {out = c.out.workQueuequeueBatch, err := consumer.Get(c.out.batchSize)...batch = newB
上⾯consumer.Get就是消费者consumer从Broker中获取⽇志数据,然后发送⾄out的channel中被output client发送,我们看⼀下Get
⽅法⾥的核⼼代码:
select {case quests
getRequest的结构如下:
type getRequest struct {sz  int              // request sz events from the brokerresp chan getResponse // channel to send response to}复制代码getResponse的结构:
type getResponse struct {ack *ackChanbuf []publisher.Event}复制代码
getResponse⾥包含了⽇志的数据,⽽getRequest包含了⼀个发送⾄消费者的channel。 在上⽂bufferingEventLoop缓冲队列的handleConsumer⽅法⾥接收到的参数为getRequest,⾥⾯包含了consumer请求的getResponse channel。 如果handleConsumer不发送数据,consumer.Get⽅法会⼀直阻塞在select中,直到flushBuffer,consumer的getResponse channel才会接收到⽇志数据。
5. 发送⽇志
在创建beats时,会创建⼀个clientWorker,clientWorker的run⽅法中,会不停的从consumer发送的channel⾥读取⽇志数据,然后调
物料周转箱⽤client.Publish批量发送⽇志。
func (w *clientWorker) run() {for !w.closed.Load() {for batch := range w.qu {if err := w.client.Publish(batch); err != nil {return}}}}复制代码
libbeats库中包含了kafka、elasticsearch、logstash等⼏种client,它们均实现了client接⼝:
type Client interface {Close() errorPublish(publisher.Batch) errorString() string}复制代码
当然最重要的是实现Publish接⼝,然后将⽇志发送出去。
实际上,filebeat中⽇志数据在各种channel⾥流转的设计还是⽐较复杂和繁琐的,笔者也是研究了好久、画了很长的架构图才理清楚其中
的逻辑。 这⾥抽出了⼀个简化后的图以供参考:
如何保证at least once
filebeat维护了⼀个registry⽂件在本地的磁盘,该registry⽂件维护了所有已经采集的⽇志⽂件的状态。 实际上,每当⽇志数据发送⾄后
端成功后,会返回ack事件。filebeat启动了⼀个独⽴的registry协程负责监听该事件,接收到ack事件后会将⽇志⽂件的State状态更新⾄registry⽂件中,State中的Offset表⽰读取到的⽂件偏移量,所以filebeat会保证Offset记录之前的⽇志数据肯定被后端的⽇志存储接收到。 State结构如下所⽰:
type State struct {Id          string            `json:"-"` // local unique id to make comparison more efficientFinished    bool              `json:"-"` // harvester stateFile 记录在registry⽂件中的数据⼤致如下所⽰:
[{"source":"/tmp/aa.log","offset":48,"timestamp":"2019-07-03T13:54:01.298995+08:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":7048952,"devi
由于⽂件可能会被改名或移动,filebeat会根据inode和设备号来标志每个⽇志⽂件。 如果filebeat异常重启,每次采集harvester启动的时候都会读取registry⽂件,从上次记录的状态继续采集,确保不会从头开始重复发送所有的⽇志⽂件。 当然,如果⽇志发送过程中,还没来得及返回ack,filebeat就挂掉,
registry⽂件肯定不会更新⾄最新的状态,那么下次采集的时候,这部分的⽇志就会重复发送,所以这意味着filebeat只能保证at least once,⽆法保证不重复发送。 还有⼀个⽐较异常的情况是,linux下如果⽼⽂件被移除,新⽂件马上创建,很
有可能它们有相同的inode,⽽由于filebeat根据inode来标志⽂件记录采集的偏移,会导致registry⾥记录的其实是被移除的⽂件State状态,这样新的⽂件采集却从⽼的⽂件Offset开始,从⽽会遗漏⽇志数据。 为了尽量避免inode被复⽤的情况,同时防⽌registry⽂件随着时间增长越来越⼤,建议使⽤clean_inactive和clean_remove配置将长时间未更新或者被删除的⽂件State从registry中移除。
同时我们可以发现在harvester读取⽇志中,会更新registry的状态处理⼀些异常场景。例如,如果⼀个⽇志⽂件被清空,filebeat会在下⼀次Reader.Next⽅法中返回ErrFileTruncate异常,将inode标志⽂件的Offset置为0,结束这次harvester,重新启动新的harvester,虽
然⽂件不变,但是registry中的Offset为0,采集会从头开始。
特别注意的是,如果使⽤容器部署filebeat,需要将registry⽂件挂载到宿主机上,否则容器重启后registry⽂件丢失,会使filebeat从头开始重复采集⽇志⽂件。
filebeat⾃动reload更新
⽬前filebeat⽀持reload input配置,module配置,但reload的机制只有定时更新。 在配置中打开able之后,还可以配置reload.period表⽰⾃动reload配置的时间间隔。 filebeat在启动时,会创建⼀个专门⽤于reload的协程。对于每个正在运⾏的
harvester,filebeat会将其加⼊⼀个全局的Runner列表,每次到了定时的间隔后,会触发⼀次配置⽂件的diff判断,如果是需要停⽌的加⼊stopRunner列表,然后逐个关闭,新的则加⼊startRunner列表,启动新的Runner。
filebeat对kubernetes的⽀持
filebeat官⽅⽂档提供了在kubernetes下基于daemonset的部署⽅式,最主要的⼀个配置如下所⽰:
- type: docker      containers.ids:      - "*"      processors:        - add_kubernetes_metadata:            in_cluster: true复制代码
即设置输⼊input为docker类型。由于所有的容器的标准输出⽇志默认都在节点的/var/lib/docker/containers//*-json.log路径,所以本质上采集的是这类⽇志⽂件。 和传统的部署⽅式有所区别的是,如果服务部署在kubernetes上,我们查看和检索⽇志的维度不能仅仅局限于节点和服务,还需要有podName,containerName等,所以每条⽇志我们都需要打标增加kubernetes的元信息
才发送⾄后端。filebeat会在配置中增加了add_kubernetes_metadata的processor的情况下,启动监听kubernetes的watch服务,监听所有kubernetes pod的变更,然后将归属本节点的pod最新的事件同步⾄本地的缓存中。 节点上⼀旦发⽣容器的销毁创
建,/var/lib/docker/containers/下会有⽬录的变动,filebeat根据路径提取出containerId,再根据containerId从本地的缓存中到pod 信息,从⽽可以获取到podName、label等数据,并加到⽇志的元信息fields中。 filebeat还有⼀个beta版的功能
autodiscover,autodiscover的⽬的是把分散到不同节点上的filebeat配置⽂件集中管理。⽬前也⽀持kubernetes作为provider,本质上还是监听kubernetes事件然后采集docker的标准输出⽂件。 ⼤致架构如下所⽰:
但是在实际⽣产环境使⽤中,仅采集容器的标准输出⽇志还是远远不够,我们往往还需要采集容器挂载出来的⾃定义⽇志⽬录,还需要控制每个服务的⽇志采集⽅式以及更多的定制化功能。
在轻⾈容器云上,我们⾃研了⼀个监听kubernetes事件⾃动⽣成filebeat配置的agent,通过CRD的⽅式,⽀持⾃定义容器内部⽇志⽬录、⽀持⾃定义fields、⽀持多⾏读取等功能。同时可在kubernetes上统⼀管理各种⽇志配置,⽽且⽆需⽤户感知pod的创建销毁和迁移,⾃动完成各种场景下的⽇志配置⽣成和更新。
性能分析与调优
虽然beats系列主打轻量级,虽然⽤golang写的filebeat的内存占⽤确实⽐较基于jvm的logstash等好太多,但是事实告诉我们其实没那么简单。 正常启动filebeat,⼀般确实只会占⽤3、40MB内存,但是在轻⾈容器云上偶发性的我们也会发现某些节点上的filebeat容器内存占⽤超过配置的pod limit限制(⼀般设置为200MB),并且不停的触发的OOM。 究其原因,⼀般容器化环境中,特别是裸机上运⾏的容器个数可能会⽐较多,导致创建⼤量的harvester去采集⽇志。如果没有很好的配置filebeat,会有较⼤概率导致内存急剧上升。 当然,filebeat 内存占据较⼤的部分还是memqueue,所有采集到的⽇志都会先发送⾄memqueue聚集,再通过output发送出去。每条⽇志的数据在filebeat中都被组装为event结构,filebeat默认配置的memqueue缓存的event个数为4096,可通过vents设置。默认最
⼤的⼀条⽇志的event⼤⼩限制为10MB,可通过max_bytes设置。4096 * 10MB = 40GB,可以想象,极端场景下,filebeat⾄少占据硅胶模具制作方法
40GB的内存。特别是配置了multiline多⾏模式的情况下,如果multiline配置有误,单个event误采集为上千条⽇志的数据,很可能导致memqueue占据了⼤量内存,致使内存爆炸。 所以,合理的配置⽇志⽂件的匹配规则,限制单⾏⽇志⼤⼩,根据实际情况配置memqueue 缓存的个数,才能在实际使⽤中规避filebeat的内存占⽤过⼤的问题。
如何对filebeat进⾏扩展开发
汽水取样

本文发布于:2023-06-18 02:14:56,感谢您对本站的认可!

本文链接:https://patent.en369.cn/patent/3/142892.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:采集   配置   容器   发送   数据
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 369专利查询检索平台 豫ICP备2021025688号-20 网站地图