一、常用Sink介绍
此处介绍的 Sinks 有:HDFS Sink、Logger Sink、Avro/Thrift Sink、HBase Sink。
Avro Sink
Avro Sink 为 Flume 的层次连接提供了一半的支持( 另一半为 Avro Source ),发送到此 Sink 的 Flume 被转换为 Avro 事件,然后被发送到对应的主机:端口上。其必须的属性如下:
| 属性名 | 默认值 | 描述 |
|---|---|---|
| channels | - | |
| type | - | 组件类型名称必须是avro |
| hostname | - | 绑定的主机名或者 IP 地址 |
| port | - | 绑定的端口号 |
| batch-size | 100 | 一次同时发送的 event 数 |
| connect-timeout | 20000 | 第一次握手请求时允许的时长。( ms ) |
| request-timeout | 20000 | 第一次过后,后续请求允许的时长 ( ms ) |
| ireset-connection-interval | none | |
| compression-type | none | 可选项为“none”或“deflate”,compression-type必须符合匹配AvroSource的compression-type |
| compression-level | 6 | 压缩 event 的压缩级别,0 为不压缩,1 - 9 为压缩,数字越大则压缩率越大。 |
| ssl | false | 设置为true启用SSL加密,同时可以选择性设置“truststore”,“truststore-password”,“truststore-type”,并且指定是否打开“trust-all-certs” |
| trust-all-certs | false | 如果设置为 true,则远程服务( Avro Source )的 SSL 服务证书将不会进行校验,因而生产环境不能设置为 true |
| truststore | - | Java truststore文件的路径,需要启用SSL加密 |
| truststore-password | - | Java truststore的密码,需要启用SSl加密 |
| truststore-type | JKS | Java truststore的类型,可选项为:“JSK” 或其它支持的Java truststore 类型 |
| exclude-protocols | SSLv2Hello SSLv3 | Space-separated list of SSL/TLS protocols to exclude |
| maxIoWorkers | 2 * 当前机器上可用的处理器数 | I/O 处理线程的最大数,在 NettyAvroRpcClient 和 NioClientSocketChannelFactory 上被加载。 |
实例 Agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
Thrift Sink
Thrift Sink 为 Flume 的层次连接提供了一半的支持( 另一半为 Thrift Source ),发送到此 Sink 的 Flume 被转换为 Thrift 事件,然后被发送到对应的主机:端口上。其必须的属性如下:
| 属性名 | 默认值 | 描述 |
|---|---|---|
| channels | - | |
| type | - | 组件类型名称必须是thrift |
| hostname | - | 绑定的主机名或者 IP 地址 |
| port | - | 绑定的端口号 |
| batch-size | 100 | 一次同时发送的 event 数 |
| connect-timeout | 20000 | 第一次握手请求时允许的时长。( ms ) |
| request-timeout | 20000 | 第一次过后,后续请求允许的时长 ( ms ) |
| ireset-connection-interval | none |
实例,Agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
HDFS Sink
HDFS Sink 将 events 写入到 Hadoop 分布式文件系统( HDFS )当中,目前它支持创建文本文件和二进制序列化文件( SequenceFile ),这两种文件类型都支持压缩文件。数据库文件能够周期性地,在运行时间、数据大小或event数量的基础上轮转( 关闭当前文件,并创建一个新文件 )。It also buckets/partitions data by attributes like timestamp or machine where the event originated. The HDFS directory path may contain formatting escape sequences that will replaced by the HDFS sink to generate a directory/file name to store the events. 使用 HDFS Sink 需要安装 hadoop,如此 Flume 便能够通过 hadoop jars 连接 HDFS 集群,注意,Hadoop 的版本必须支持sync()调用。
下面是支持的转义序列:
| 别名 | 描述 |
|---|---|
| %{host} | |
| %t | Unix 时间的秒数 |
| %a | 本地的星期缩写名称 (Mon, Tue, ...) |
| %A | 本地的星期完整名称 (Monday, Tuesday, ...) |
| %b | 本地的月份缩写名称 (Jan, Feb, ...) |
| %B | 本地的月份完整名称 (January, February, ...) |
| %c | 本地的日期和时间 (Thu Mar 3 23:05:25 2005) |
| %d | 某月中的某天 (01) |
| %D | 日期,与 %m/%d/%y 一样 |
| %H | 24 小时制的小时,补齐两位 (00..23) |
| %I | 12 小时制的小时,补齐两位 (01..12) |
| %j | 某年中的某天 (001..366) |
| %k | 24 小时制的小时,不补齐两位 ( 0..23) |
| %m | 月份 (01..12) |
| %M | 分钟数 (00..59) |
| %p | 本地上午或下午 (am, pm) |
| %s | 从 1970-01-01 00:00:00 UTC 到现在的秒数 |
| %S | 秒数 (00..59) |
| %y | 年份的后两位数字 (00..99) |
| %Y | 年份 (2015) |
| %z | +hhmm numeric timezone (for example, -0400) |
正在读取的文件的文件名会以 .tmp 结尾,一旦文件被关闭,则扩展部分会被移除,这使排除目录中部分完成的文件称为可能。其config属性如下:
注意:对于所有与时间相关的转义系列,一个包含 “timestamp” 的header必须在 event 的所有headers中存在( 除非 hdfs.useLocalTimeStamp 被设置为 true )。设置自动添加的一种方式是使用 TimestampInterceptor。
| 属性名 | 默认值 | 描述 |
|---|---|---|
| channels | - | |
| type | - | 组件类型名称必须是hdfs |
| hdfs.path | - | HDFS 目录路径 (如 hdfs://namenode/flume/webdata/) |
| hdfs.filePrefix | FlumeData | 在 HDFS 目录中,Flume 创建的文件的前缀 |
| hdfs.fileSuffix | - | 文件的后缀 (如 .avro - 注意:时间不是自动添加的) |
| hdfs.inUsePrefix | - | Flume 正在写入的临时文件的的前缀 |
| hdfs.inUseSuffix | .tmp | Flume 正在写入的临时文件的后缀 |
更多属性配置详见:http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
实例,Agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
上面的配置将时间戳四舍五入到最近的10分钟,如:一个 event 的时间戳是 11:54:34 AM, June 12, 2012,那么映射到 hdfs 路径则为 /flume/events/2012-06-12/1150/00
HBase Sink
具体介绍等,详见:http://flume.apache.org/FlumeUserGuide.html#hbasesink
| 属性名 | 默认值 | 描述 |
|---|---|---|
| channels | - | |
| type | - | 组件类型名称必须是hbase |
| table | - | HBase 中写入数据的表 |
| columnFamily | - | HBase 中写入数据列簇 |
| zookeeperQuorum | - | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml |
| znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml |
| batchSize | 100 | 每次事务处理写入的 event数 |
| coalesceIncrements | false | 是否添加一个保存文件 basename 的Header |
| serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | Default increment column = “iCol”, payload column = “pCol” |
| serializer.* | - | Properties to be passed to the serializer. |
| kerberosPrincipal | - | Kerberos user principal for accessing secure HBase |
| kerberosKeytab | - | Kerberos keytab for accessing secure HBase |
实例,Agent agent-1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
Logger Sink
Logs Sink 属于 INFO 级别的,通常用作测试或调试目的,其属性:
| 属性名 | 默认值 | 描述 |
|---|---|---|
| channels | - | |
| type | - | 组件类型名称必须是logger |
实例,Agnet a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
</br>
===
未完待续。。。