应用场景

在计费账务流程中,bolt线程用来做相同业务逻辑的任务处理,而对资料信息的更新逻辑需要一个线程定时扫描更新,该线程需要与bolt的线程并列共同在一个worker中,但又要达到个性化控制的要求。

设计原则

  • 不能影响原有的spout、bolt业务逻辑

  • 该处理线程的业务逻辑要跟bolt的区分开来

  • 该线程的生命周期与worker的一致

  • 该线程的业务处理更新bolt中的数据源

  • 该线程只需要启动一个即可,不能多,多了浪费资源

示例

bolt中定义线程,模拟计费流程中的资料更新任务

//自定义线程
class TestThread extends Thread {
	
	public void run() {
		try {
			while (true) {
				//模拟修改资料信息的变化
				step = step + 5;
				logger.info("worker name: {} | define thread id: {} | datetime: {} | step: {}", workerName, 
						Thread.currentThread().getId(), formatter.format(new Date()), step);
				Thread.currentThread().sleep(5000);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

在bolt流程中启动自定义线程

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
	workerName = ManagementFactory.getRuntimeMXBean().getName();    
	this.collector = collector;
	//使用锁,保证一个bolt流程中只启动一个自定义线程即可,不能多
	if(lock==0){
		lock++;
		TestThread test = new TestThread();
		test.start();
	}
}

在bolt流程中测试step值

@Override
public void execute(Tuple input) {
	//测试bolt的业务流程中取到的step值是否在不断的更新
	logger.info("worker name: {} | bolt thread id: {} | datetime: {} | step: {}", workerName, 
			Thread.currentThread().getId(), formatter.format(new Date()), step);
}

测试结果

1

从结果可以看出:

(1)thread id为60的自定义线程更新step值为60后,thread id为57和55的两个bolt线程取到的值都为60;

(2)自定义线程的更新频率为5秒,而bolt的更新频率收spout的影响为2秒,两者业务逻辑完全不同;

(3)停止该topology后worker停止,日志不在打印,说明自定义线程也结束。