0%

如何模拟消息队列来并行处理任务

在某些场景,没有资源独立部署消息队列集群,但是有异步任务又需要快速处理完成,这时候就需要并行计算来解决了,

参考MQ的生产者消费者工作模式,将任务生产者消费者通过队列的实现解耦;

1个生产者生成一批任务后,阻塞等待多个消费者并行执行完成,然后释放消费者线程和生产者线程;

broker-中间层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* 消息队列中间件
*/
public class Broker {
public ArrayBlockingQueue<JSONObject> queue = new ArrayBlockingQueue<JSONObject>(1000);
public Boolean continueProducing = Boolean.TRUE;

public void put(JSONObject data) throws InterruptedException {
this.queue.put(data);
}

public JSONObject get() throws InterruptedException {
return this.queue.poll(1, TimeUnit.SECONDS);
}
}

IProducer-生产者接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 生产者
*
* @author wanggang
* @date 2019/07/29
*/
public interface IProducer {

/***
* 向队列提交1条消息
* @param message 消息体
* @throws Exception
*/
void produceMessage(JSONObject message);
}

生产者抽象类,
具体生产者继承后可不断produceMessage,
如:定时扫描数据库的待办任务并放入任务队列,或逐行读取1个csv后放入队列;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public abstract class Producer implements IProducer, Runnable {
private static final Logger logger = Logger.getLogger(Producer.class);
private Broker broker;

public void bindBroker(Broker broker) {
this.broker = broker;
}

@Override
public void produceMessage(JSONObject message) {
try {
broker.put(message);
} catch (InterruptedException e) {
logger.error(e);
}
}

/***
* 将broker标记为不再生产新的消息,是用于终止消费者的信号
*/
public void terminateProducer() {
this.broker.continueProducing = Boolean.FALSE;
logger.info("Producer[%s] terminating.");
}
}

IConsumer-消费者接口

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 消费者
*/
public interface IConsumer {

/***
* 队列收到消息时调用本方法
* @param message 消息体
* @throws Exception
*/
void onConsumerMessage(JSONObject message) throws Exception;
}

消费者抽象类,具体消费者通过继承Consumer,必须实现onConsumerMessage方法,可选择实现before和after方法;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 消费者
*/
public abstract class Consumer implements IConsumer, Runnable {
private static final Logger logger = Logger.getLogger(Consumer.class);
private Broker broker;
private final int elapse = 30 * 1000;

public void bindBroker(Broker broker) {
this.broker = broker;
}


@Override
public void run() {
try {
JSONObject message = broker.get();
while (broker.continueProducing) {
if (message == null) {
Thread.sleep(3000);
} else {
long start = System.currentTimeMillis();
try {
doBeforeConsumerMessage(message);
onConsumerMessage(message);
doAfterConsumerMessage(message, null);
} catch (Exception e) {
logger.error(String.format("messagge[%s]", message), e);
doAfterConsumerMessage(message, e);
}
if (System.currentTimeMillis() - start > elapse) {
logger.warn(String.format("处理超时,elapse[%s],msg[%s]", System.currentTimeMillis() - start, message.toString()));
}
}
message = broker.get();
}
logger.info(String.format("Consumer[%s]已完成任务,terminating.", Thread.currentThread().getName()));
} catch (Exception e) {
logger.error(e);
}
}

/***
* 消息前处理
* @param msg 消息体
*/
protected abstract void doBeforeConsumerMessage(JSONObject msg) throws Exception;

/***
* 消息后处理
* @param msg 消息体
* @param error 处理异常信息
* @throws Exception
*/
protected abstract void doAfterConsumerMessage(JSONObject msg, Throwable error) throws Exception;
}

消息队列使用辅助类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* message queue service 消息队列中间件工具类
*/
public class MqsClientHelper {
private static final Logger logger = Logger.getLogger(MqsClientHelper.class);
private final Producer producer;
private final Consumer consumer;
private final String consumerName;
private final int consumerThreads;

public MqsClientHelper(Producer producer, Consumer consumer, String consumerName, int consumerThreads) {
this.producer = producer;
this.consumer = consumer;
this.consumerThreads = consumerThreads;
this.consumerName = consumerName;
}

public void start() {
new Thread(() -> {
try {
//线程池并行处理
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(consumerName + "-%d").build();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(consumerThreads + 1, consumerThreads + 1,
10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
//生产者/消费者绑定队列
final Broker broker = new Broker();
producer.bindBroker(broker);
consumer.bindBroker(broker);

//消费者定义
IntStream.range(0, consumerThreads).forEach(e -> threadPool.execute(consumer));
//生产者:生成message并提交给broker,consumer从broker中获取消息
Future producerStatus = threadPool.submit(producer);

logger.debug(String.format("consumer[%s],等待数据导入消费者处理完成...", consumerThreads));
producerStatus.get();

producer.terminateProducer();
logger.info("Producer[%s]已完成任务,terminating.");

threadPool.shutdown();
while (!threadPool.awaitTermination(10, TimeUnit.MINUTES)) {
logger.info(String.format("threadPool[%s],等待消费者处理完成[10m]...", threadPool));
}
} catch (Exception e) {
logger.error(e);
}
}).start();
}
}