在某些场景下,没有资源独立部署消息队列集群,但又有异步任务需要快速处理完成,这时就需要通过并行计算来解决:
参考MQ的生产者-消费者工作模式,将任务生产者与消费者通过队列实现解耦;
1个生产者生成一批任务后,阻塞等待多个消费者并行执行完成,然后释放消费者线程和生产者线程;
broker-中间层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit;
/**
 * In-memory hand-off point between one producer and several consumers.
 *
 * <p>Messages are buffered in a bounded {@link ArrayBlockingQueue}; the
 * {@link #continueProducing} flag tells consumers whether more messages may
 * still arrive.
 */
public class Broker {
    /** Bounded buffer of pending messages (capacity 1000). */
    public ArrayBlockingQueue<JSONObject> queue = new ArrayBlockingQueue<JSONObject>(1000);

    /**
     * {@code TRUE} while the producer may still enqueue messages.
     *
     * <p>Declared {@code volatile} because it is written by the thread that
     * stops the producer and read by the consumer threads; without it the
     * consumers are not guaranteed to ever observe the change.
     */
    public volatile Boolean continueProducing = Boolean.TRUE;

    /**
     * Enqueues a message, blocking while the queue is full.
     *
     * @param data the message to enqueue
     * @throws InterruptedException if interrupted while waiting for free space
     */
    public void put(JSONObject data) throws InterruptedException {
        this.queue.put(data);
    }

    /**
     * Dequeues the next message, waiting up to one second for one to arrive.
     *
     * @return the next message, or {@code null} if none arrived within one second
     * @throws InterruptedException if interrupted while waiting
     */
    public JSONObject get() throws InterruptedException {
        return this.queue.poll(1, TimeUnit.SECONDS);
    }
}
|
IProducer-生产者接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
/**
 * Contract for anything that can publish a message for later consumption.
 */
public interface IProducer {

    /**
     * Publishes a single message.
     *
     * @param message the message to publish
     */
    void produceMessage(JSONObject message);
}
|
生产者抽象类,
具体生产者继承后可不断produceMessage,
如:定时扫描数据库的待办任务并放入任务队列,或逐行读取1个csv后放入队列;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public abstract class Producer implements IProducer, Runnable { private static final Logger logger = Logger.getLogger(Producer.class); private Broker broker;
public void bindBroker(Broker broker) { this.broker = broker; }
@Override public void produceMessage(JSONObject message) { try { broker.put(message); } catch (InterruptedException e) { logger.error(e); } }
public void terminateProducer() { this.broker.continueProducing = Boolean.FALSE; logger.info("Producer[%s] terminating."); } }
|
IConsumer-消费者接口
1 2 3 4 5 6 7 8 9 10 11 12
|
/**
 * Contract for anything that can process a single message.
 */
public interface IConsumer {

    /**
     * Processes one message.
     *
     * @param message the message to process
     * @throws Exception if processing fails
     */
    void onConsumerMessage(JSONObject message) throws Exception;
}
|
消费者抽象类,具体消费者通过继承Consumer,必须实现onConsumerMessage方法,可选择实现before和after方法;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
public abstract class Consumer implements IConsumer, Runnable { private static final Logger logger = Logger.getLogger(Consumer.class); private Broker broker; private final int elapse = 30 * 1000;
public void bindBroker(Broker broker) { this.broker = broker; }
@Override public void run() { try { JSONObject message = broker.get(); while (broker.continueProducing) { if (message == null) { Thread.sleep(3000); } else { long start = System.currentTimeMillis(); try { doBeforeConsumerMessage(message); onConsumerMessage(message); doAfterConsumerMessage(message, null); } catch (Exception e) { logger.error(String.format("messagge[%s]", message), e); doAfterConsumerMessage(message, e); } if (System.currentTimeMillis() - start > elapse) { logger.warn(String.format("处理超时,elapse[%s],msg[%s]", System.currentTimeMillis() - start, message.toString())); } } message = broker.get(); } logger.info(String.format("Consumer[%s]已完成任务,terminating.", Thread.currentThread().getName())); } catch (Exception e) { logger.error(e); } }
protected abstract void doBeforeConsumerMessage(JSONObject msg) throws Exception;
protected abstract void doAfterConsumerMessage(JSONObject msg, Throwable error) throws Exception; }
|
消息队列使用辅助类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
/**
 * Wires one {@link Producer} and {@code consumerThreads} workers running the
 * same {@link Consumer} instance to a fresh {@link Broker}, and drives them on
 * a dedicated thread pool.
 */
public class MqsClientHelper {
    private static final Logger logger = Logger.getLogger(MqsClientHelper.class);
    private final Producer producer;
    private final Consumer consumer;
    private final String consumerName;
    private final int consumerThreads;

    /**
     * @param producer        the producer that fills the queue
     * @param consumer        the consumer executed by every worker thread
     * @param consumerName    prefix used for the pool's thread names
     * @param consumerThreads number of worker threads running the consumer
     */
    public MqsClientHelper(Producer producer, Consumer consumer, String consumerName, int consumerThreads) {
        this.producer = producer;
        this.consumer = consumer;
        this.consumerThreads = consumerThreads;
        this.consumerName = consumerName;
    }

    /**
     * Starts the pipeline on a new background thread and returns immediately.
     *
     * <p>The background thread waits for the producer to finish, signals the
     * consumers to stop, and then waits for the pool to terminate.
     */
    public void start() {
        new Thread(this::runPipeline).start();
    }

    /** Runs producer and consumers to completion and always releases the pool. */
    private void runPipeline() {
        ThreadPoolExecutor threadPool = null;
        try {
            ThreadFactory namedThreadFactory =
                    new ThreadFactoryBuilder().setNameFormat(consumerName + "-%d").build();
            // One extra thread so the producer runs alongside the consumers.
            threadPool = new ThreadPoolExecutor(consumerThreads + 1, consumerThreads + 1,
                    10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                    namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
            final Broker broker = new Broker();
            producer.bindBroker(broker);
            consumer.bindBroker(broker);

            try {
                for (int i = 0; i < consumerThreads; i++) {
                    threadPool.execute(consumer);
                }
                Future<?> producerStatus = threadPool.submit(producer);
                logger.debug(String.format("consumer[%s],等待数据导入消费者处理完成...", consumerThreads));
                producerStatus.get();
            } finally {
                // Always tell the consumers to stop, even when the producer failed;
                // otherwise they would keep polling forever and the pool would leak.
                producer.terminateProducer();
            }
            logger.info(String.format("Producer[%s]已完成任务,terminating.", producer.getClass().getName()));

            threadPool.shutdown();
            while (!threadPool.awaitTermination(10, TimeUnit.MINUTES)) {
                logger.info(String.format("threadPool[%s],等待消费者处理完成[10m]...", threadPool));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            if (threadPool != null) {
                // Stop waiting: interrupt the producer and workers that are still running.
                threadPool.shutdownNow();
            }
            logger.error(String.format("MqsClientHelper[%s] interrupted.", consumerName), e);
        } catch (Exception e) {
            logger.error(String.format("MqsClientHelper[%s] failed.", consumerName), e);
        } finally {
            if (threadPool != null) {
                // No-op if already shut down; guarantees the pool is released on failure paths.
                threadPool.shutdown();
            }
        }
    }
}
|