一、常用Sink介绍
此处介绍的 Sinks 有:HDFS Sink、Logger Sink、Avro/Thrift Sink、HBase Sink。
Avro Sink
Avro Sink 为 Flume 的层次连接提供了一半的支持( 另一半为 Avro Source ),发送到此 Sink 的 Flume 被转换为 Avro 事件,然后被发送到对应的主机:端口上。其必须的属性如下:
属性名 | 默认值 | 描述 |
---|---|---|
channels | - | |
type | - | 组件类型名称必须是avro |
hostname | - | 绑定的主机名或者 IP 地址 |
port | - | 绑定的端口号 |
batch-size | 100 | 一次同时发送的 event 数 |
connect-timeout | 20000 | 第一次握手请求时允许的时长。( ms ) |
request-timeout | 20000 | 第一次过后,后续请求允许的时长 ( ms ) |
ireset-connection-interval | none | |
compression-type | none | 可选项为“none”或“deflate”,compression-type必须符合匹配AvroSource的compression-type |
compression-level | 6 | 压缩 event 的压缩级别,0 为不压缩,1 - 9 为压缩,数字越大则压缩率越大。 |
ssl | false | 设置为true 启用SSL加密,同时可以选择性设置“truststore”,“truststore-password”,“truststore-type”,并且指定是否打开“trust-all-certs” |
trust-all-certs | false | 如果设置为 true ,则远程服务( Avro Source )的 SSL 服务证书将不会进行校验,因而生产环境不能设置为 true |
truststore | - | Java truststore文件的路径,需要启用SSL加密 |
truststore-password | - | Java truststore的密码,需要启用SSl加密 |
truststore-type | JKS | Java truststore的类型,可选项为:“JSK” 或其它支持的Java truststore 类型 |
exclude-protocols | SSLv2Hello SSLv3 | Space-separated list of SSL/TLS protocols to exclude |
maxIoWorkers | 2 * 当前机器上可用的处理器数 | I/O 处理线程的最大数,在 NettyAvroRpcClient 和 NioClientSocketChannelFactory 上被加载。 |
实例 Agent a1
:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
Thrift Sink
Thrift Sink 为 Flume 的层次连接提供了一半的支持( 另一半为 Thrift Source ),发送到此 Sink 的 Flume 被转换为 Thrift 事件,然后被发送到对应的主机:端口上。其必须的属性如下:
属性名 | 默认值 | 描述 |
---|---|---|
channels | - | |
type | - | 组件类型名称必须是thrift |
hostname | - | 绑定的主机名或者 IP 地址 |
port | - | 绑定的端口号 |
batch-size | 100 | 一次同时发送的 event 数 |
connect-timeout | 20000 | 第一次握手请求时允许的时长。( ms ) |
request-timeout | 20000 | 第一次过后,后续请求允许的时长 ( ms ) |
ireset-connection-interval | none |
实例,Agent a1
:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
HDFS Sink
HDFS Sink 将 events 写入到 Hadoop 分布式文件系统( HDFS )当中,目前它支持创建文本文件和二进制序列化文件( SequenceFile ),这两种文件类型都支持压缩文件。数据库文件能够周期性地,在运行时间、数据大小或event数量的基础上轮转( 关闭当前文件,并创建一个新文件 )。It also buckets/partitions data by attributes like timestamp or machine where the event originated. The HDFS directory path may contain formatting escape sequences that will replaced by the HDFS sink to generate a directory/file name to store the events. 使用 HDFS Sink 需要安装 hadoop,如此 Flume 便能够通过 hadoop jars 连接 HDFS 集群,注意,Hadoop 的版本必须支持sync()
调用。
下面是支持的转义序列:
别名 | 描述 |
---|---|
%{host} | |
%t | Unix 时间的秒数 |
%a | 本地的星期缩写名称 (Mon, Tue, ...) |
%A | 本地的星期完整名称 (Monday, Tuesday, ...) |
%b | 本地的月份缩写名称 (Jan, Feb, ...) |
%B | 本地的月份完整名称 (January, February, ...) |
%c | 本地的日期和时间 (Thu Mar 3 23:05:25 2005) |
%d | 某月中的某天 (01) |
%D | 日期,与 %m/%d/%y 一样 |
%H | 24 小时制的小时,补齐两位 (00..23) |
%I | 12 小时制的小时,补齐两位 (01..12) |
%j | 某年中的某天 (001..366) |
%k | 24 小时制的小时,不补齐两位 ( 0..23) |
%m | 月份 (01..12) |
%M | 分钟数 (00..59) |
%p | 本地上午或下午 (am, pm) |
%s | 从 1970-01-01 00:00:00 UTC 到现在的秒数 |
%S | 秒数 (00..59) |
%y | 年份的后两位数字 (00..99) |
%Y | 年份 (2015) |
%z | +hhmm numeric timezone (for example, -0400) |
正在读取的文件的文件名会以 .tmp
结尾,一旦文件被关闭,则扩展部分会被移除,这使排除目录中部分完成的文件称为可能。其config属性如下:
注意:对于所有与时间相关的转义系列,一个包含 “timestamp” 的header必须在 event 的所有headers中存在( 除非 hdfs.useLocalTimeStamp
被设置为 true
)。设置自动添加的一种方式是使用 TimestampInterceptor
。
属性名 | 默认值 | 描述 |
---|---|---|
channels | - | |
type | - | 组件类型名称必须是hdfs |
hdfs.path | - | HDFS 目录路径 (如 hdfs://namenode/flume/webdata/ ) |
hdfs.filePrefix | FlumeData | 在 HDFS 目录中,Flume 创建的文件的前缀 |
hdfs.fileSuffix | - | 文件的后缀 (如 .avro - 注意:时间不是自动添加的) |
hdfs.inUsePrefix | - | Flume 正在写入的临时文件的的前缀 |
hdfs.inUseSuffix | .tmp | Flume 正在写入的临时文件的后缀 |
更多属性配置详见:http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
实例,Agent a1
:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
上面的配置将时间戳四舍五入到最近的10分钟,如:一个 event 的时间戳是 11:54:34 AM, June 12, 2012
,那么映射到 hdfs 路径则为 /flume/events/2012-06-12/1150/00
HBase Sink
具体介绍等,详见:http://flume.apache.org/FlumeUserGuide.html#hbasesink
属性名 | 默认值 | 描述 |
---|---|---|
channels | - | |
type | - | 组件类型名称必须是hbase |
table | - | HBase 中写入数据的表 |
columnFamily | - | HBase 中写入数据列簇 |
zookeeperQuorum | - | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml |
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml |
batchSize | 100 | 每次事务处理写入的 event数 |
coalesceIncrements | false | 是否添加一个保存文件 basename 的Header |
serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | Default increment column = “iCol”, payload column = “pCol” |
serializer.* | - | Properties to be passed to the serializer. |
kerberosPrincipal | - | Kerberos user principal for accessing secure HBase |
kerberosKeytab | - | Kerberos keytab for accessing secure HBase |
实例,Agent agent-1
:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
Logger Sink
Logs Sink 属于 INFO 级别的,通常用作测试或调试目的,其属性:
属性名 | 默认值 | 描述 |
---|---|---|
channels | - | |
type | - | 组件类型名称必须是logger |
实例,Agnet a1
:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
</br>
===
未完待续。。。