一、常用Sources介绍

此处介绍的 Sources 有:Avro Source、Thrift Source、Exec Source、Spooling Directory Source。

Avro Source

监听 Avro 端口,接收外部 Avro 客户端发来的 Event 是流,当和另一个Agent (Event流上,前面一个) 的 Avro Sink 连接配对时,能够将两个 Agent 连接形成一个Event 链。其必须的属性如下:

属性名默认值描述
channels-
type-组件类型名称必须是avro
bind-监听的主机名或者 IP 地址
port-监听的端口号
threads-能生成的工作线程的最大数
selector.type
selector.*
interceptors-Space-separated list of interceptors
interceptors.*
compression-typenone可选项为“none”或“deflate”,compression-type必须符合匹配AvroSource的compression-type
sslfalse设置为true启用SSL加密,同时必须制定一个“keystore”和一个“keystore-password”
keystore-Java keystore文件的路径,需要启用SSL加密
keystore-password-Java keystore的密码,需要启用SSl加密
keystore-typeJKSJava keystore的类型,可选项为:“JSK” 和 “PKCS12”
exclude-protocolsSSLv3Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
ipFilterfalse设置为true启用ip过滤
ipFilter.rules-通过此配置,定义ip过滤的表达式规则

实例 Agent a1

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4141
a1.sources.r1.ipFilter = true
a1.sources.r1.ipFilter.rules = allow:ip:127.*,allow:name:localhost,deny:ip:*

a1.sources.r1.channels = c1

ipFilter.rules定义格式如下 :

<allow or deny>:<ip or name for computer name>:<pattern> 或

allow/deny:ip/name:pattern

Thrift Source

监听 Thrift 端口,接收外部 Thrift 客户端发来的 Event 是流,当和另一个Agent (Event流上,前面一个) 的 Thrift Sink 连接配对时,能够将两个 Agent 连接形成一个Event 链。其必须的属性如下:

属性名默认值描述
channels-
type-组件类型名称必须是thrift
bind-监听的主机名或者 IP 地址
port-监听的端口号
threads-能生成的工作线程的最大数
selector.type
selector.*
interceptors-Space-separated list of interceptors
interceptors.*

实例,Agent a1

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Exec Source

Exec Source 运行一个给定的 Unix 命令,此命令需要在启动后,进程能不断的产生数据到标准输出( 除非将 logStdErr 设置为 true,否则标准错误输出stderr将会被抛弃 )。如果 Unix 命令进程意外退出了,Exec Source 也会退出,不会再产生数据。这意味着配置如 cat [named pipe]tail -F [file] 命令的时候将会产生期望的结果,而使用 date 命令的时候不会,前面两个命令会产生数据流,但是后面一个任务只会产生一个单一的 Event,然后马上退出。其config属性如下:

属性名默认值描述
channels-
type-组件类型名称必须是exec
command-需要执行的命令
shell-调用来运行命令的shell,如:/bin/sh -c,Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle10000重启之前的等待时间(ms )
resstartfalse设置是否重启命令,如果命令进程死了
logStdErrfalse设置是否命令的标准错误输出会被发送
batchSize20同时读取和发送的最大行数
selector.typereplicatingreplicating 或 multiplexing
selector.*依赖selector.type的值
interceptors-Space-separated list of interceptors
interceptors.*

注意:可以使用Exec Source 模仿Flume 0.9x ( flume-og )中的 Tail Source,只要使用 Unix 命令tail -F /full/path/to/your/file,在此情况下,参数 -F-f要更好,因为它会根据文件轮询。

实例,Agent a1

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

shell配置来执行command,通过一个命令行脚本( 如 Bash 或 PowerShell ),

agent_foo.sources.tailsource-1.type = exec
agent_foo.sources.tailsource-1.shell = /bin/bash -c
agent_foo.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

Spooling Directory Source

SpoolDir Source 支持从磁盘“spooling”文件夹读取数据文件,此源会监控指定的文件夹的新增文件,一旦有新文件出现,SpoolDir Source 会将其解析为 Event发送,这个 Event 解析逻辑时插件化的。在一个文件被全部读入到 Channel 之后, 该文件会被重命名标记完成( 或选择性的删除 )。

不同于 Exec Source,这个源是可靠的,即使 Flume 进程重启或是被杀掉都不会丢失数据。作为可靠性的交换,只有不变的且命名唯一的文件才能被放入源监控的目录,Flume 会检测这些问题条件,如果违反了,Flume 会报错:

  1. 如果一个文件在移动到 SpoolDir Source 监控目录下之后被更改过,Flume 会在日志文件中输出错误信息,并停止 Flume 进程;
  2. 如果一个文件名在一段时间后被重复使用,Flume 会在日志文件中输出错误信息,并停止 Flume 进程。

为了避免上述问题,比较好的方法是:在日志文件被移动到监控目录时,给日志文件用唯一标示符来命名( 例如:时间戳,timestamp );

属性名默认值描述
channels-
type-组件类型名称必须是spooldir
spoolDir-源监控的目录路径
fileSuffix.COMPLETED文件被读入完成后添加的标示符后缀
deletePolicynever是否删除完成读入的文件,可选项:neverimmediate
fileHeaderfalse是否添加一个保存文件绝对路径的 Header
fileHeaderKeyfile当给 event header 添加绝对路径名的时候使用
basenameHeaderfalse是否添加一个保存文件 basename 的Header
basenameHeaderKeybasename当给 event header 添加 basename 的时候使用
ignorePattern^$忽略正则表达式指定的文件
trackerDir.flumespool保存跟进程文件相关元数据的目录,如果此路径不是一个绝对路径,就会解释为一个相对于 spoolDir 的路径。
consumeOrderoldest监控目录下的文件被读取的顺序,可选项为:oldestyoungestrandom
maxBackoff4000 ( ms )当 Channel 满了之后,连续尝试往 Channel 传送数据的最大时间间隔。SpoolDir Source 开始会启动一个很低的 maxBackoff,一旦 Channel 抛出一个 ChannelException 的时候,就会增加此 maxBackoff 值,直到达到指定的最大值。
batchSize100数据被传送到 Channel 的粒度。
inputCharsetUTF-8将输入当做文本文档解析时候使用的字符集,即监控文件的字符集
decodeErrorPolicyFAIL当发现一个无法解析字符集的输入文件时,需要做的处理:FAIL,抛出一个异常并标记解析失败;REPLACE,使用 “replacement character” 字符重复解析错误解析字符,例如 Unicode U+FFFD;IGNORE,删除无法解析的字符串序列。
deserializerLINE指定将文件解析为 Event 的解析器,默认解析每行为一个 Event,指定的限定性类名必须实现接口 EventDeserializer.Builder
deserializer.*Varies per event deserializer.
bufferMaxLines-该选项已经被忽略
bufferMaxLineLength5000(Deprecated) 提交缓存中一行的最大长度,使用 deserializer.maxLineLength 代替。
selector.typereplicatingreplicating 或 multiplexing
selector.*依赖selector.type的值
interceptors-Space-separated list of interceptors
interceptors.*

实例,Agent agent-1

agent-1.channels = ch-1
agent-1.sources = src-1

agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
agent-1.sources.src-1.fileHeader = true

NetCat Source

属性名默认值描述
channels-
type-组件类型名称必须是netcat
bind-监听主机名或ip地址
port-监听端口号
max-line-length512每个 event 内容的最大行数 ( 单位:字节 )</code>
ack-every-eventtrue每接收到一个 event 则回发一个 OK
selector.typereplicatingreplicating 或 multiplexing
selector.*依赖selector.type的值
interceptors-Space-separated list of interceptors
interceptors.*

实例,Agnet a1

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1

二、Event Deserializers

以下是 Flume 附带的 Event 解析器:Line、Avro、BlobDeserializer。

Line

此 deserializer 对应文本输入的每一行生成一个 event。

属性名默认值描述
deserializer.maxLineLength2048单个 event 能包含字符的最大数,如果一行超过了这个长度,将会被截断,该行中截断后剩余的字符会出现在后续的 event 中
deserializer.outputCharsetUTF-8发送到 Channel 的每个 event 的编码字符集

AVRO

详情见AVRO Deserializer

属性名默认值描述
deserializer.schemaTypeHASHHow the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored in every event in the event header “flume.avro.schema.hash”. If LITERAL is specified, the JSON-encoded schema itself is stored in every event in the event header “flume.avro.schema.literal”. Using LITERAL mode is relatively inefficient compared to HASH mode.

BlobDeserializer

详情见Blob Deserializer

属性名默认值描述
deserializer-这个类的全限定性类名: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
deserializer.maxBlobLength100000000一个给定请求读取和缓存的最大字节数

</br>

===

未完待续。。。