一、常用Sink介绍

此处介绍的 Sinks 有:HDFS Sink、Logger Sink、Avro/Thrift Sink、HBase Sink。

Avro Sink

Avro Sink 为 Flume 的层次连接提供了一半的支持( 另一半为 Avro Source ),发送到此 Sink 的 Flume 被转换为 Avro 事件,然后被发送到对应的主机:端口上。其必须的属性如下:

属性名默认值描述
channels-
type-组件类型名称必须是avro
hostname-绑定的主机名或者 IP 地址
port-绑定的端口号
batch-size100一次同时发送的 event 数
connect-timeout20000第一次握手请求时允许的时长。( ms )
request-timeout20000第一次过后,后续请求允许的时长 ( ms )
ireset-connection-intervalnone
compression-typenone可选项为“none”或“deflate”,compression-type必须符合匹配AvroSource的compression-type
compression-level6压缩 event 的压缩级别,0 为不压缩,1 - 9 为压缩,数字越大则压缩率越大。
sslfalse设置为true启用SSL加密,同时可以选择性设置“truststore”,“truststore-password”,“truststore-type”,并且指定是否打开“trust-all-certs”
trust-all-certsfalse如果设置为 true,则远程服务( Avro Source )的 SSL 服务证书将不会进行校验,因而生产环境不能设置为 true
truststore-Java truststore文件的路径,需要启用SSL加密
truststore-password-Java truststore的密码,需要启用SSl加密
truststore-typeJKSJava truststore的类型,可选项为:“JSK” 或其它支持的Java truststore 类型
exclude-protocolsSSLv2Hello SSLv3Space-separated list of SSL/TLS protocols to exclude
maxIoWorkers2 * 当前机器上可用的处理器数I/O 处理线程的最大数,在 NettyAvroRpcClientNioClientSocketChannelFactory 上被加载。

实例 Agent a1

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

Thrift Sink

Thrift Sink 为 Flume 的层次连接提供了一半的支持( 另一半为 Thrift Source ),发送到此 Sink 的 Flume 被转换为 Thrift 事件,然后被发送到对应的主机:端口上。其必须的属性如下:

属性名默认值描述
channels-
type-组件类型名称必须是thrift
hostname-绑定的主机名或者 IP 地址
port-绑定的端口号
batch-size100一次同时发送的 event 数
connect-timeout20000第一次握手请求时允许的时长。( ms )
request-timeout20000第一次过后,后续请求允许的时长 ( ms )
ireset-connection-intervalnone

实例,Agent a1

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

HDFS Sink

HDFS Sink 将 events 写入到 Hadoop 分布式文件系统( HDFS )当中,目前它支持创建文本文件和二进制序列化文件( SequenceFile ),这两种文件类型都支持压缩文件。数据库文件能够周期性地,在运行时间、数据大小或event数量的基础上轮转( 关闭当前文件,并创建一个新文件 )。It also buckets/partitions data by attributes like timestamp or machine where the event originated. The HDFS directory path may contain formatting escape sequences that will replaced by the HDFS sink to generate a directory/file name to store the events. 使用 HDFS Sink 需要安装 hadoop,如此 Flume 便能够通过 hadoop jars 连接 HDFS 集群,注意,Hadoop 的版本必须支持sync()调用。

下面是支持的转义序列:

别名描述
%{host}
%tUnix 时间的秒数
%a本地的星期缩写名称 (Mon, Tue, ...)
%A本地的星期完整名称 (Monday, Tuesday, ...)
%b本地的月份缩写名称 (Jan, Feb, ...)
%B本地的月份完整名称 (January, February, ...)
%c本地的日期和时间 (Thu Mar 3 23:05:25 2005)
%d某月中的某天 (01)
%D日期,与 %m/%d/%y 一样
%H24 小时制的小时,补齐两位 (00..23)
%I12 小时制的小时,补齐两位 (01..12)
%j某年中的某天 (001..366)
%k24 小时制的小时,不补齐两位 ( 0..23)
%m月份 (01..12)
%M分钟数 (00..59)
%p本地上午或下午 (am, pm)
%s1970-01-01 00:00:00 UTC 到现在的秒数
%S秒数 (00..59)
%y年份的后两位数字 (00..99)
%Y年份 (2015)
%z+hhmm numeric timezone (for example, -0400)

正在读取的文件的文件名会以 .tmp 结尾,一旦文件被关闭,则扩展部分会被移除,这使排除目录中部分完成的文件称为可能。其config属性如下:

注意:对于所有与时间相关的转义系列,一个包含 “timestamp” 的header必须在 event 的所有headers中存在( 除非 hdfs.useLocalTimeStamp 被设置为 true )。设置自动添加的一种方式是使用 TimestampInterceptor

属性名默认值描述
channels-
type-组件类型名称必须是hdfs
hdfs.path-HDFS 目录路径 (如 hdfs://namenode/flume/webdata/)
hdfs.filePrefixFlumeData在 HDFS 目录中,Flume 创建的文件的前缀
hdfs.fileSuffix-文件的后缀 (如 .avro - 注意:时间不是自动添加的)
hdfs.inUsePrefix-Flume 正在写入的临时文件的的前缀
hdfs.inUseSuffix.tmpFlume 正在写入的临时文件的后缀

更多属性配置详见:http://flume.apache.org/FlumeUserGuide.html#hdfs-sink

实例,Agent a1

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

上面的配置将时间戳四舍五入到最近的10分钟,如:一个 event 的时间戳是 11:54:34 AM, June 12, 2012,那么映射到 hdfs 路径则为 /flume/events/2012-06-12/1150/00

HBase Sink

具体介绍等,详见:http://flume.apache.org/FlumeUserGuide.html#hbasesink

属性名默认值描述
channels-
type-组件类型名称必须是hbase
table-HBase 中写入数据的表
columnFamily-HBase 中写入数据列簇
zookeeperQuorum-The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent/hbaseThe base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize100每次事务处理写入的 event数
coalesceIncrementsfalse是否添加一个保存文件 basename 的Header
serializerorg.apache.flume.sink.hbase.SimpleHbaseEventSerializerDefault increment column = “iCol”, payload column = “pCol”
serializer.*-Properties to be passed to the serializer.
kerberosPrincipal-Kerberos user principal for accessing secure HBase
kerberosKeytab-Kerberos keytab for accessing secure HBase

实例,Agent agent-1

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1

Logger Sink

Logs Sink 属于 INFO 级别的,通常用作测试或调试目的,其属性:

属性名默认值描述
channels-
type-组件类型名称必须是logger

实例,Agnet a1

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

</br>

===

未完待续。。。