我这边选取filebeat做的日志收集客户端
filebeat是一块轻量级方法,用于转发和汇总日志与文件,让简单的事情不再繁杂。
主机名 | 系统版本 | 安装软件 |
---|---|---|
filebeat | centos7.4 | filebeat-6.4.2 |
file beat安装就很简单了,rpm包也许,官网yum源也可以
sudo rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
vim /etc/yum.repos.d/filebeat.repo
[elastic-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
sudo yum install filebeat
filebeat 每个大版本配置文件有少许变化,这里用的6.4版本的~
sudo vim /etc/filebeat/filebeat.yml
filebeat.prospectors: #prospector(input)段配置
- type: log #YAML格式,每个输入都是"-"开头 可以指定多个,默认log,日志文件读取每一行
paths: #从这里输入,要指定path
-/var/log/*.log #日志文件路径列表,可用通配符,不递归
encoding:plain #编码,默认无,plain(不验证或者改变任何输入),
exclude_lines: ['^DBG'] #删除任何以DBG开头的行
include_lines: ['^ERR','^WARN'] #只匹配以ERR,WARN开头的行
exclude_files: [".gz$"] #排除文件,后接一个正则表达式的列表,默认无
ignore_older: 0 #排除更改时间超过定义的文件,时间字符串可以用2h表示2小时,5m表示5分钟,默认0
scan_frequency: 10s #prospector扫描新文件的时间间隔,默认10秒
max_bytes: 10485760 #单文件最大收集的字节数,单文件超过此字节数后的字节将被丢弃,默认10MB,需要增大,保持与日志输出配置的单文件最大值一致即可
multiline.pattern: ^\d{4}- #多行匹配模式,后接正则表达式,默认无
multiline.negate: ture #多行匹配模式后配置的模式是否取反,默认false
multiline.match: after #定义多行内容被添加到模式匹配行之后还是之前,默认无,可以被设置为after或者before
multiline.max_lines: 500 #单一多行匹配聚合的最大行数,超过定义行数后的行会被丢弃,默认500
multiline.timeout: 5s #多行匹配超时时间,超过超时时间后的当前多行匹配事件将停止并发送,然后开始一个新的多行匹配事件,默认5秒
tail_files: true #可配置为true时,filebeat将从新文件的最后位置开始读取
close_renamed: false #当文件被重命名或被轮询时关闭重命名的文件处理。默认false
close_removed: true #如果文件不存在,立即关闭文件处理。如果后面文件又出现了,会在scan_frequency之后继续从最后一个已知position处开始收集,默认true
enabled: true #每个prospectors的开关,默认true
fields: #自定义字段
log_topic: ca-pre
#Outputs配置段,输出到Kafka,可以有一个缓冲和消息队列不丢失数据
output.kafka:
hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
topic: '%{[fields.log_topic]}' #可以取自定义字段的当作topic
# topic: 'ca-pre' #也可以直接指定
partition.round_robin: #topic分区策略,默认为hash
reachable_only: false #默认情况下,所有分区程序都会尝试将事件发布到所有分区。设置为true,则事件将仅发布到可用分区。
client_id: beats #用于记录,调试,审计
work: 2 #并发数量
timeout: 30s #超时设置
broker_timeout: 10s #等待ack确认超时
bulk_max_size: 2048 #单个Kafka请求中批量处理的最大事件数
required_acks: 1 #ack确认消息
compression: gzip #压缩格式
compression_level: 4 #压缩等级1-9 默认4
max_message_bytes: 1000000 #JSON编码消息的最大允许大小。更大的消息将被删除
keep_alive: 60 #网络活动保持连接 默认0