tail2kafka 的所有配置都是lua文件(lua非常简单,甚至都可能意识不到这是个lua)。这些lua文件放到同一个目录,启动时以这个目录为参数。这些lua文件中,有一个名称固定的 main.lua
用来指定全局配置,其余配置,每个lua文件指定一个数据文件。
可以参考源码 blackboxtest/tail2kafka
中的一些配置。
必配项,string
当多台机器发送数据到kafka时,可能需要标识数据的来源。例如多台web服务器发送access_log到kafka。可以用hostshell指定一行shell命令,获取机器名,例如IP或者hostname。注意:如果使用了自动分区,该机器名必须能够解析出IP。
例如: hostshell = "hostname"
,tail2kafka会执行 hostname
命令,该命令的输出作为机器名。
必配项,string
指定tail2kafka的pid文件,pid文件用于停止或重新加载配置文件
例如: pidfile = "/var/run/tail2kafka.pid"
必配项,string
指定kafka的机器和端口,多个机器和端口用逗号分号。
例如: brokers = "127.0.0.1:9092"
可选项,int
指定kafka的partition,也可以再各个数据文件的配置中指定。
必选项,table
librdkafka全局配置,参考源码 blackboxtest/tail2kafka
和 librdkafka
可选项,table
librdkafka的topic配置
可选项, int, 默认值 polllimit=100
当文件写入相当频繁,可以转成轮训模式,参数用来指定轮训的间隔,单位是毫秒。
可选项,int,默认值 -1,关闭,单位是秒
以nginx为例,当想rotate文件时,先把文件mv走,然后给nginx发送 USR1
信号,重新打开文件。这里有个问题,文件mv走时,tail2kafka感知到了,然后去读新文件,但是在mv走之后,nginx重新打开文件之前,nginx仍然会往老文件写日志。 rotatedelay
指定tail2kafka 感知到文件mv时,延迟几秒钟打开新文件,防止数据丢失。
这个值不能设置太大,如果太大,则新文件中的数据会有大的延迟,建议关闭或者设置在10s以内。
可选项,字符串,默认值 “”,关闭
pingbackurl
是一个http地址,tail2kafka会把关键事件以GET请求的形式发送这个地址。例如配置了 pingbackurl=http://server/pingback/tail2kafka
则发生文件rotate时会请求 http://server/pingback/tail2kafka?event=ROTATE&file=/data/logs/access_2018-03-21-04_10.142.113.65_log&size=891010&md5=4b04460d5b0a8b79a5a7a7b78e55aecf
这里file是rotate后老文件的名称,size是文件大小,md5是文件的md5。可以通过日志了解更多事件信息。
可选项,字符串,默认值 /var/lib/tail2kafka
用于存放fileoff,topic的历史文件等一些运行信息。
可选项,字符串,默认值 /var/log/tail2kafka
部分配置可以同时出现在main.lua和数据源文件lua,后者会覆盖前者,如果后者没有指定,会继承前者。
必填项,string
指定kafka的topic
例如: topic = "cron"
可选项,string,默认和topic值相同
一个文件可以被发往多个topic,当kafka不可用时,需要记录尚未发送数据的文件列表,使用 fileAlias 作为文件列表的文件名。
必填项,string,例如: file = "/var/log/message"
指定要发往kafka的源数据文件,tail2kafka可以检测到3种文件rotate的情况,并在rotate后重新打开文件,从0开始读取。三种rotate情况:
- 文件被清空,例如:
truncate --size 0 /tmp/log
不推荐,这种rotate方式可能丢数据 - 文件被改名,例如:
/tmp/log /tmp/log.1
- 文件被删除,例如:
unlink /tmp/log
不推荐,这种rotate方式可能丢数据 - 文件名自身带时间,建议至少分钟级别,例如:=basic.%Y-%m-%d_%H-%M.log= ,这种格式的文件,需要设置
fileWithTimeFormat=true
可选项,boolean,默认值 fileWithTimeFormat=false
当文件名自身带时间时,设置为true。tail2kafka会跟踪时间变化。
可选项,string,默认值 startpos=log_start
每往kafka发送一条消息,会在fileoff中记录消息在相应文件中的位置。当tail2kafka重启启动,或者reload时会使用这个文件, startpos
指定了获取文件开始位置的策略,有4个可选值。
名称 | 含义 |
---|---|
log_start | 优先使用fileoff中记录的文件位置,如果fileoff没找到,从头开始 |
log_end | 优先使用fileoff中记录的文件位置,如果fileoff没找到,从最后一行开始 |
start | 从头开始,忽略fileoff中的值 |
end | 从最后一行开始 |
可选项,boolean,默认 autocreat = false
默认情况,当file指定的文件不存在时,tail2kafka会启动失败,如果指定 autocreat 为 true,可以自动创建不存在的文件。
可选项,字符串
当 autocreat
为true时,自动创建文件,默认文件的owner和tail2kafka的运行用户相同,通过fileOwner改变,以免写文件的进程无法写入。例如:某些时候 nginx 以 nobody 的身份写入log。
可选项,boolean,默认值 true
实时计算发送内容的md5,用于消费kafka时校验数据的完整性。这个md5不一定准,当tail2kafka发送重启或reload时,如果不是从文件开头读,md5值不准确。计算md5需要耗费cpu,一般情况影响有限。
可选项,int,无默认值
指定kafka的partition,如果没有指定,使用main.lua中的配置。精心指定partition,可以实现均衡,但也很容易出错。 partition=-100
是一个特殊配置,把数据在多个分区间随机分布,且只能在每个lua配置中单独指定。
可选项,boolean,默认 autoparti = false
如果autoparti为true,那么使用hostshell的配置对应的IP得到一个数对kafka的全部partition取模。这会导致partition不均衡,但是配置简单,适合数据源机器特别多的情况。
含义同main.lua
可选项,boolean,默认 rawcopy = false
默认情况,逐行发送新增内容到kafka。一次发送一行。如果不需要逐行处理,可以设置 rawcopy
为 true,一次复制多行数据到kafka,可以提高性能。
注意 默认情况,一次发送一行,不包含换行符。一次发送多行时,只有最后一行没有换行符。处理kafka中的数据时,直接按换行符split就行。
可选项,table,无默认值
tail2kafka内置了split功能,把数据行按照空格分隔成字段,通过filter指定字段的下标,然后拼接成行发送。相当于选择一行的某些字段发送,而不是整行发送。对于特别大的行,而行中的某些字段显然没有用,可以使用filter减少发送的内容。
注意 ""和[]
包围的字符,被当做一个字段处理。下标从1开始,负数下标倒着数。
例如: filter = {4, 5, 6, -3}
,行的内容为 127.0.0.1 - - [16/Dec/2016:10:17:01 +0800] "GET /logm HTTP/1.1" 200 288 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.16.2.3 Basic ECC
,发送的内容为 2016-12-16T10:17:01 GET /logm HTTP/1.1 200 288
这里同时指定了 timeidx = 4
,把时间转成了 iso8601
格式。
可选项,function,无默认值
grep 是 filter 的增强版,是个lua函数。filter只能挑选制定的字段,不能改变字段的内容。grep 的输入是行split后的字段,输出是处理后的字段。
注意 如果返回 nil
,这行数据会被忽略。
如果指定了 withhost = true
,把主机名 (参考: hostshell
)自动放到行首。
例如:
grep = function(fields)
return {'[' .. fields[4] .. '] "' .. fields[5] .. '"', fields[6]}
end
那么发送的行为 zzyong [16/Dec/2016:10:17:01 +0800] "GET /logm HTTP/1.1" 200
这里指定了 withhost
,但是没有指定 timeidx
可选项,function,无默认值
aggregate 是 grep 的增强版,aggregate的输出是一个 key + hash table
,用于做各种统计,例如统计状态码,错误数量等。同一时间字段的数据会被尽量合并,但因为日志不报证时间字段绝对递增,所以同一时间的数据可能分多次发送,尤其时单位是秒时,处理kafka中的数据时需要合并。
注意 必须配置 timeidx
和 withtime
,另外时间字段的精度(秒,分钟等)决定了聚合的粒度。为了能做到机器级别,可以配置 withhost
字段
例如:
aggregate = function(fields)
local tbl = {}
tbl["total"] = 1
tbl["status_" .. fields[6] = 1
return "user", tbl
end
那么发送到kafka类似 2016-12-16T10:17:01 zzyong user total=100 status_200=94 status_304=6
如果配置了 pkey=www
,那么同时会发送 2016-12-16T10:17:01 zzyong www total=190 status_200=174 status_304=16
这个是什么意思呢?它统计了user这个类别(可以是域名,日志文件名,或者某个业务)下的总请求量,http各个状态码的数量。如果配置了pkey,那么同时统计了这台机器的总请求量,各个状态码的数量。
这里时间字段是秒级的,所以统计也是秒级的。但是因为并发访问,可能出现
127.0.0.1 - - [16/Dec/2016:10:17:01 +0800] "GET /logm HTTP/1.1" 200 288 "-" "curl/7.19.7" 127.0.0.1 - - [16/Dec/2016:10:17:02 +0800] "GET /logm HTTP/1.1" 200 288 "-" "curl/7.19.7" 127.0.0.1 - - [16/Dec/2016:10:17:01 +0800] "GET /logm HTTP/1.1" 200 288 "-" "curl/7.19.7"
这里时间字段不是绝对递增的,kafka 会收到两条 2016-12-16T10:17:01
的数据,处理数据时,需要把他们累加起来。
注意 如果返回 nil
,这行数据会被忽略。
可选项 string || int,无默认
配合 aggregate
使用,指定全局的统计类别。
可选项 function 无默认值
输入是一行数据,transform操作这行数据,然后输出操作后的数据。 注意 如果返回 nil
忽略这行,如果返回空字符串,则使用源数据(也可以返回元数据,返回空算一种优化吧)。
transform = function(line)
local s = string.sub(line, 1, 7);
if s == "[error]" then return "";
elseif s == "[warn]" then return "[error]" .. string.sub(line, 8)
else return nil end
end
如果是=[error]= 开头的,原样发送,如果是 [warn]
开头的,用 [error]
替换然后发送,否则忽略。
可选项 int 无默认值
指定时间字段的下标,主要配合 filter grep aggregate
使用。如果指定timeidx,时间从格式 28/Feb/2015:12:30:23 +0800
转成 2015-03-30T16:31:53
。
可选项 boolean 默认 withtime=false
如果 true
,会在发往kafka前添加时间字段。
可选项 boolean 默认 withhost=false
如果 true
,会在发往kafka前添加机器名。
可选项 boolean 默认 autonl=true
如果 true
,会在发往kafka的行尾添加换行