应用场景
在计费账务流程中,bolt线程用来做相同业务逻辑的任务处理,而对资料信息的更新逻辑需要一个线程定时扫描更新,该线程需要与bolt的线程并列共同在一个worker中,但又要达到个性化控制的要求。
设计原则
-
不能影响原有的spout、bolt业务逻辑
-
该处理线程的业务逻辑要跟bolt的区分开来
-
该线程的生命周期与worker的一致
-
该线程的业务处理更新bolt中的数据源
-
该线程只需要启动一个即可,不能多,多了浪费资源
示例
bolt中定义线程,模拟计费流程中的资料更新任务
//自定义线程
class TestThread extends Thread {
public void run() {
try {
while (true) {
//模拟修改资料信息的变化
step = step + 5;
logger.info("worker name: {} | define thread id: {} | datetime: {} | step: {}", workerName,
Thread.currentThread().getId(), formatter.format(new Date()), step);
Thread.currentThread().sleep(5000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在bolt流程中启动自定义线程
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
workerName = ManagementFactory.getRuntimeMXBean().getName();
this.collector = collector;
//使用锁,保证一个bolt流程中只启动一个自定义线程即可,不能多
if(lock==0){
lock++;
TestThread test = new TestThread();
test.start();
}
}
在bolt流程中测试step值
@Override
public void execute(Tuple input) {
//测试bolt的业务流程中取到的step值是否在不断的更新
logger.info("worker name: {} | bolt thread id: {} | datetime: {} | step: {}", workerName,
Thread.currentThread().getId(), formatter.format(new Date()), step);
}
测试结果
从结果可以看出:
(1)thread id为60的自定义线程更新step值为60后,thread id为57和55的两个bolt线程取到的值都为60;
(2)自定义线程的更新频率为5秒,而bolt的更新频率收spout的影响为2秒,两者业务逻辑完全不同;
(3)停止该topology后worker停止,日志不在打印,说明自定义线程也结束。