一、Custom Source
实现 Source 接口即可定义Custom Sources,在启动Flume Agent的时候,用户自定义源的类及其依赖必须包括在agent 的 classpath
中。Custom Source 的类型是它的全限定性类名。
属性名 | 默认值 | 描述 |
---|---|---|
channels | - | |
type | - | 组件类型名称必须是FQCN ( 全限定性类名,如:org.example.MyCustomSource ) |
selector.type | replicating 或 multiplexing | |
selector.* | 依赖selector.type的值 | |
interceptors | - | Space-separated list of interceptors |
interceptors.* |
实例:Agent a1
:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MyCustomSource
a1.sources.r1.channels = c1
自定义 Source
-
创建工程:
mvn archetype:create -DgroupId=org.apache.flume -DartifactId=flume-ng-db-source -DpackageName=com.ai.flume -Dversion=1.0
-
编辑
pom.xml
添加依赖:</dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.35</version> </dependency> <dependency> <groupId>com.oracle</groupId> <artifactId>ojdbc14</artifactId> <version>10.2.0.1.0</version> </dependency> </dependencies>
-
创建一个类,继承
AbstractSource
,并实现接口Configurable
和PollableSource
,如:package com.ai.flume.source; public class FlumeNGDBSource extends AbstractSource implements Configurable, PollableSource { private static final Logger logger = LoggerFactory.getLogger(FlumeNGDBSource.class); public void configure(Context context) { // 处理配置文件( 如:conf/example.conf )中的属性 } @Override public synchronized void start() { // Source 启动时的初始化、开启进程等 } public Status process() throws EventDeliveryException { // Source 真正的工作进程 // 获取data,封装到event中,发送到channel } @Override public synchronized void stop() { // Source 结束时的变量释放、进程结束等 } }
-
配置文件创建和使用:
private String sqlType; private String sqlHost; private int sqlPort; private String Db; private String sqlUser; private String sqlPwd; private String sql; ... 省略中间代码 ... public void configure(Context context) { // 在此方法中,通过context.getString("property", defaultValue) // context.getInteger("property", defaultValue)来读取配置文件 sqlType = context.getString("sql-type", "oracle"); sqlHost = context.getString("sql-host", "localhost"); sqlPort = context.getInteger("sql-port", 3306); sqlDb = context.getString("sql-db", "test"); sqlUser = context.getString("sql-user", null); sqlPwd = context.getString("sql-pwd", null); sql = context.getString("sql", "select * from info;"); }
上述文件的配置文件可以如下设置,
mydb.conf
:a1.sources = r1 a1.channels = c1 a1.sources.r1.type = com.ai.flume.FlumeNGDBSource a1.sources.r1.sql-type = mysql a1.sources.r1.sql-host = 127.0.0.1 # 这里的SQL不需要加双引号或单引号 a1.sources.r1.sql = select * from users; # 使用默认端口 3306,默认数据库 test,默认没有用户和密码
-
数据封装为 Event,并发送到 Channel:
return status;private ChannelProcessor channelProcessor = null; // 用于存放需要发送的队列 // 从数据库中取出,然后放到队列中 private LinkedBlockingQueue<String> queue = null; ... 省略中间代码 ... public Status process() throws EventDeliveryException { Status status = Status.READY; // 获取连接Channel的对象 channelProcessor = getChannelProcessor(); try { // 取出一条要发送的数据 String line = queue.take(); // 通过logger以INFO的级别输出到Source的日志文件中 logger.info(line); // 调用EventBuilder.withBody(String body, Charset set), // 将要发送的数据封装为一个 event Event e = EventBuilder.withBody(line, Charset.forName("UTF8")); // 将封装后的 event发送到连接的 Channel channelProcessor.processEvent(e); } catch (Exception e) { // 出现错误,返回Status.BACKOFF,告诉Source 发送失败 // 当自定义 Sink 的时候,这里需要注意,详见后面 status = Status.BACKOFF; logger.error("flume-ng mysql source error!", e); throw new EventDeliveryException(e); }
} 到这基本自定义 Source已经完成了,其它要做的就是通过代码实现数据的生成或抓取,然后放到上述的`queue`当中。
-
通过
Timer
定时从数据库中取数据放到queue
中:scannerTimer = new Timer(“FlumeNG_Scanner_Timer_Thread”, true);private Timer scannerTimer; ... 省略中间代码 ... @Override public synchronized void start() { super.start(); queue = new LinkedBlockingQueue<String>();
scannerTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { // 调用操作数据库方法,执行sql,返回数据 list = ibd.selectAll(sql); if (list.size() <= 0) { try { Thread.sleep(10); } catch (InterruptedException e) { logger.error("[ Error ]:", e); } } String res = ""; for (int i = 0; i < list.size(); i++) { if ("" == res) { res += list.get(i); } else { res += "&data=" + list.get(i); } } // 将结果缓存到queue当中 queue.offer(res); } }, 0, runSpeed); // runSpeed为配置文件当中的查询间隔 }
-
stop
方法:@Override public synchronized void stop() { super.stop(); // 关闭与Channel的连接 channelProcessor.close(); // 结束数据获取线程 if (scannerTimer != null) { try { scannerTimer.cancel(); } catch (Exception e) { } finally { scannerTimer = null; } } queue = null; } 至此,自定义Source已经完成。
-
生成
jar
包:mvn clean package 在上面命令运行成功后,复制 `target/flume-ng-db-source-1.0.jar`到`$FLUME_HOME/lib`目录即可。 cp target/flume-ng-db-source-1.0.jar $FLUME_HOME/lib
二、Custom Sink
实现 Sink 接口即可定义Custom Sink,在启动Flume Agent的时候,用户自定义Sink的类及其依赖必须包括在agent 的 classpath
中。Custom Sink 的类型是它的全限定性类名。
属性名 | 默认值 | 描述 |
---|---|---|
channels | - | |
type | - | 组件类型名称必须是FQCN ( 全限定性类名,如:org.example.MyCustomSink ) |
selector.type | replicating 或 multiplexing | |
selector.* | 依赖selector.type的值 | |
interceptors | - | Space-separated list of interceptors |
interceptors.* |
实例:Agent a1
:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MyCustomSink
a1.sources.r1.channels = c1
自定义 Sink
此 Sink从 Channel 获取 events,然后通过HTTP POST发送到远程的主机端口上。
-
同样的,通过maven创建工程:
mvn archetype:create -DgroupId=org.apache.flume -DartifactId=flume-ng-http-source -DpackageName=com.ai.flume -Dversion=1.0
-
编辑
pom.xml
添加依赖:</dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.5.2</version> </dependency> </dependencies>
-
创建一个类,继承
AbstractSource
,并实现接口Configurable
和PollableSource
,如:package com.ai.flume.source; public class FlumeNGHttpSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory.getLogger(FlumeNGHttpSink.class); public void configure(Context context) { // 处理配置文件( 如:conf/example.conf )中的属性 } public Status process() throws EventDeliveryException { // Source 真正的工作进程 // 获取data,封装到event中,发送到channel } public String post(String url, String param) { // http发送post请求 } }
-
配置文件创建和使用:
private String url; private String host; private String type; ... 省略中间代码 ... public void configure(Context context) { // 在此方法中,通过context.getString("property", defaultValue) // context.getInteger("property", defaultValue)来读取配置文件 url = context.getString("node-url", "http://127.0.0.1:5000/receive"); host = context.getString("node-host", "127.0.0.1"); type = context.getString("node-type", "TuxState"); }
上述文件的配置文件可以如下设置,
mydb.conf
:a1.sources = r1 a1.channels = c1 a1.sources.r1.type = com.ai.flume.FlumeNGHttpSink a1.sources.r1.node-url = http://localhost:3000/receive a1.sources.r1.node-host = 127.0.0.1 a1.sources.r1.node-type = TuxState
-
从 Channel 获取 Event,并取出其中包含的数据:
public Status process() throws EventDeliveryException {txn.commit();// TODO(ai) Auto-generated method stub Status status = Status.READY; Channel channel = getChannel(); Transaction txn = null; try { txn = channel.getTransaction(); txn.begin(); // 开始从Channel获取Event的事务处理 Event e = channel.take(); // 从Channel取一个Event if (null != e) { String line = EventHelper.dumpEvent(e); logger.info(line); // 取出 Event 中包含的内容,然后转换为需要的类型,此处为String byte[] body = e.getBody(); String data = new String(body); //logger.info(data); String str = "localhost=" + host + "&type=" + type + "&data=" + data; //将配置文件的信息和数据组合成post方法的param,调用编写的post方法发送到目的地 logger.info(post(url, str)); } else { status = Status.BACKOFF; }
return status;} catch (Exception e) { // 前面提及过,在处理这的时候需要注意 // 若发送错误的原因是数据处理的问题,则可能会出现死循环 // 出错,而不执行txn.commit(),则出错的Event并不会从 Channel中删除 // 下一次获取的仍然是出错的 Event logger.error("can't process events, drop it!", e); if (txn != null) { txn.commit(); //出现BUG,丢弃当前Event,防止出现死循环 } throw new EventDeliveryException(e); } finally { if (null != txn) { txn.close(); } }
} HTTP的post方法此处不再给出,到这基本自定义 Sink已经完成了。
-
生成
jar
包:mvn clean package 在上面命令运行成功后,复制 `target/flume-ng-http-sink-1.0.jar`到`$FLUME_HOME/lib`目录即可。 cp target/flume-ng-http-sink-1.0.jar $FLUME_HOME/lib
三、Custom Channel
…
</br>
===
未完待续。。。