【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

上一节我们学习了如何实现生产者给消费者发送消息,但是是通过直连的方式。

那么如何才能达到解耦的效果呢?

答案就是引入 broker,消息的中间人。

【mq】从零开始实现 mq-03-引入 broker 中间人_在线工具

核心启动类

类似我们前面 consumer 的启动实现:

package com.github.houbb.mq.broker.core;  /**  * @author binbin.hou  * @since 1.0.0  */ public class MqBroker extends Thread implements IMqBroker {      // 省略     private ChannelHandler initChannelHandler() {         MqBrokerHandler handler = new MqBrokerHandler();         handler.setInvokeService(invokeService);         handler.setRegisterConsumerService(registerConsumerService);         handler.setRegisterProducerService(registerProducerService);         handler.setMqBrokerPersist(mqBrokerPersist);         handler.setBrokerPushService(brokerPushService);         handler.setRespTimeoutMills(respTimeoutMills);          return handler;     }      @Override     public void run() {         // 启动服务端         log.info("MQ 中间人开始启动服务端 port: {}", port);          EventLoopGroup bossGroup = new NioEventLoopGroup();         EventLoopGroup workerGroup = new NioEventLoopGroup();          try {             final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);             ServerBootstrap serverBootstrap = new ServerBootstrap();             serverBootstrap.group(workerGroup, bossGroup)                     .channel(NioServerSocketChannel.class)                     .childHandler(new ChannelInitializer<Channel>() {                         @Override                         protected void initChannel(Channel ch) throws Exception {                             ch.pipeline()                                     .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))                                     .addLast(initChannelHandler());                         }                     })                     // 这个参数影响的是还没有被accept 取出的连接                     .option(ChannelOption.SO_BACKLOG, 128)                     // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。                     .childOption(ChannelOption.SO_KEEPALIVE, true);              // 绑定端口,开始接收进来的链接             ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();             log.info("MQ 中间人启动完成,监听【" + port + "】端口");              channelFuture.channel().closeFuture().syncUninterruptibly();             log.info("MQ 中间人关闭完成");         } catch (Exception e) {             log.error("MQ 中间人启动异常", e);             throw new MqException(BrokerRespCode.RPC_INIT_FAILED);         } finally {             workerGroup.shutdownGracefully();             bossGroup.shutdownGracefully();         }     }  } 

initChannelHandler 中有不少新面孔,我们后面会详细介绍。

MqBrokerHandler 处理逻辑

package com.github.houbb.mq.broker.handler;  import java.util.List;  /**  * @author binbin.hou  * @since 1.0.0  */ public class MqBrokerHandler extends SimpleChannelInboundHandler {      private static final Log log = LogFactory.getLog(MqBrokerHandler.class);      /**      * 调用管理类      * @since 1.0.0      */     private IInvokeService invokeService;      /**      * 消费者管理      * @since 0.0.3      */     private IBrokerConsumerService registerConsumerService;      /**      * 生产者管理      * @since 0.0.3      */     private IBrokerProducerService registerProducerService;      /**      * 持久化类      * @since 0.0.3      */     private IMqBrokerPersist mqBrokerPersist;      /**      * 推送服务      * @since 0.0.3      */     private IBrokerPushService brokerPushService;      /**      * 获取响应超时时间      * @since 0.0.3      */     private long respTimeoutMills;      //set 方法       @Override     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {         ByteBuf byteBuf = (ByteBuf) msg;         byte[] bytes = new byte[byteBuf.readableBytes()];         byteBuf.readBytes(bytes);          RpcMessageDto rpcMessageDto = null;         try {             rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);         } catch (Exception exception) {             log.error("RpcMessageDto json 格式转换异常 {}", new String(bytes));             return;         }          if (rpcMessageDto.isRequest()) {             MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx);              if(commonResp == null) {                 log.debug("当前消息为 null,忽略处理。");                 return;             }              // 写回响应,和以前类似。                  writeResponse(rpcMessageDto, commonResp, ctx);         } else {             final String traceId = rpcMessageDto.getTraceId();              // 丢弃掉 traceId 为空的信息             if(StringUtil.isBlank(traceId)) {                 log.debug("[Server Response] response traceId 为空,直接丢弃", JSON.toJSON(rpcMessageDto));                 return;             }              // 添加消息             invokeService.addResponse(traceId, rpcMessageDto);         }     }      /**      * 异步处理消息      * @param mqMessage 消息      * @since 0.0.3      */     private void asyncHandleMessage(MqMessage mqMessage) {         List<Channel> channelList = registerConsumerService.getSubscribeList(mqMessage);         if(CollectionUtil.isEmpty(channelList)) {             log.info("监听列表为空,忽略处理");             return;         }          BrokerPushContext brokerPushContext = new BrokerPushContext();         brokerPushContext.setChannelList(channelList);         brokerPushContext.setMqMessage(mqMessage);         brokerPushContext.setMqBrokerPersist(mqBrokerPersist);         brokerPushContext.setInvokeService(invokeService);         brokerPushContext.setRespTimeoutMills(respTimeoutMills);          brokerPushService.asyncPush(brokerPushContext);     } } 

消息分发

broker 接收到消息以后,dispatch 实现如下:

/**  * 消息的分发  *  * @param rpcMessageDto 入参  * @param ctx 上下文  * @return 结果  */ private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) {     try {         final String methodType = rpcMessageDto.getMethodType();         final String json = rpcMessageDto.getJson();         String channelId = ChannelUtil.getChannelId(ctx);         final Channel channel = ctx.channel();         log.debug("channelId: {} 接收到 method: {} 内容:{}", channelId,                 methodType, json);          // 生产者注册         if(MethodType.P_REGISTER.equals(methodType)) {             BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);             return registerProducerService.register(registerReq.getServiceEntry(), channel);         }         // 生产者注销         if(MethodType.P_UN_REGISTER.equals(methodType)) {             BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);             return registerProducerService.unRegister(registerReq.getServiceEntry(), channel);         }         // 生产者消息发送         if(MethodType.P_SEND_MSG.equals(methodType)) {             MqMessage mqMessage = JSON.parseObject(json, MqMessage.class);             MqMessagePersistPut persistPut = new MqMessagePersistPut();             persistPut.setMqMessage(mqMessage);             persistPut.setMessageStatus(MessageStatusConst.WAIT_CONSUMER);             MqCommonResp commonResp = mqBrokerPersist.put(persistPut);             this.asyncHandleMessage(mqMessage);             return commonResp;         }         // 生产者消息发送-ONE WAY         if(MethodType.P_SEND_MSG_ONE_WAY.equals(methodType)) {             MqMessage mqMessage = JSON.parseObject(json, MqMessage.class);             MqMessagePersistPut persistPut = new MqMessagePersistPut();             persistPut.setMqMessage(mqMessage);             persistPut.setMessageStatus(MessageStatusConst.WAIT_CONSUMER);             mqBrokerPersist.put(persistPut);             this.asyncHandleMessage(mqMessage);             return null;         }          // 消费者注册         if(MethodType.C_REGISTER.equals(methodType)) {             BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);             return registerConsumerService.register(registerReq.getServiceEntry(), channel);         }         // 消费者注销         if(MethodType.C_UN_REGISTER.equals(methodType)) {             BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);             return registerConsumerService.unRegister(registerReq.getServiceEntry(), channel);         }         // 消费者监听注册         if(MethodType.C_SUBSCRIBE.equals(methodType)) {             ConsumerSubscribeReq req = JSON.parseObject(json, ConsumerSubscribeReq.class);             return registerConsumerService.subscribe(req, channel);         }         // 消费者监听注销         if(MethodType.C_UN_SUBSCRIBE.equals(methodType)) {             ConsumerUnSubscribeReq req = JSON.parseObject(json, ConsumerUnSubscribeReq.class);             return registerConsumerService.unSubscribe(req, channel);         }          // 消费者主动 pull         if(MethodType.C_MESSAGE_PULL.equals(methodType)) {             MqConsumerPullReq req = JSON.parseObject(json, MqConsumerPullReq.class);             return mqBrokerPersist.pull(req, channel);         }         throw new UnsupportedOperationException("暂不支持的方法类型");     } catch (Exception exception) {         log.error("执行异常", exception);         MqCommonResp resp = new MqCommonResp();         resp.setRespCode(MqCommonRespCode.FAIL.getCode());         resp.setRespMessage(MqCommonRespCode.FAIL.getMsg());         return resp;     } } 

消息推送

this.asyncHandleMessage(mqMessage); 是 broker 接收到消息之后的处理逻辑。

/**  * 异步处理消息  * @param mqMessage 消息  * @since 0.0.3  */ private void asyncHandleMessage(MqMessage mqMessage) {     List<Channel> channelList = registerConsumerService.getSubscribeList(mqMessage);     if(CollectionUtil.isEmpty(channelList)) {         log.info("监听列表为空,忽略处理");         return;     }      BrokerPushContext brokerPushContext = new BrokerPushContext();     brokerPushContext.setChannelList(channelList);     brokerPushContext.setMqMessage(mqMessage);     brokerPushContext.setMqBrokerPersist(mqBrokerPersist);     brokerPushContext.setInvokeService(invokeService);     brokerPushContext.setRespTimeoutMills(respTimeoutMills);     brokerPushService.asyncPush(brokerPushContext); } 

推送的核心实现如下:

package com.github.houbb.mq.broker.support.push;  /**  * @author binbin.hou  * @since 0.0.3  */ public class BrokerPushService implements IBrokerPushService {      private static final Log log = LogFactory.getLog(BrokerPushService.class);      private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();      @Override     public void asyncPush(final BrokerPushContext context) {         EXECUTOR_SERVICE.submit(new Runnable() {             @Override             public void run() {                 log.info("开始异步处理 {}", JSON.toJSON(context));                 final List<Channel> channelList = context.getChannelList();                 final IMqBrokerPersist mqBrokerPersist = context.getMqBrokerPersist();                 final MqMessage mqMessage = context.getMqMessage();                 final String messageId = mqMessage.getTraceId();                 final IInvokeService invokeService = context.getInvokeService();                 final long responseTime = context.getRespTimeoutMills();                  for(Channel channel : channelList) {                     try {                         String channelId = ChannelUtil.getChannelId(channel);                          log.info("开始处理 channelId: {}", channelId);                         //1. 调用                         mqMessage.setMethodType(MethodType.B_MESSAGE_PUSH);                         MqConsumerResultResp resultResp = callServer(channel, mqMessage,                                 MqConsumerResultResp.class, invokeService, responseTime);                          //2. 更新状态                         mqBrokerPersist.updateStatus(messageId, resultResp.getConsumerStatus());                          //3. 后期添加重试策略                          log.info("完成处理 channelId: {}", channelId);                     } catch (Exception exception) {                         log.error("处理异常");                         mqBrokerPersist.updateStatus(messageId, ConsumerStatus.FAILED.getCode());                     }                 }                  log.info("完成异步处理");             }         });     } } 

此处在消息推送之后,需要更新消息的 ACK 状态。

消息生产者处理类

IBrokerProducerService 接口定义如下:

package com.github.houbb.mq.broker.api;  /**  * <p> 生产者注册服务类 </p>  *  * @author houbinbin  * @since 0.0.3  */ public interface IBrokerProducerService {      /**      * 注册当前服务信息      * (1)将该服务通过 {@link ServiceEntry#getGroupName()} 进行分组      * 订阅了这个 serviceId 的所有客户端      * @param serviceEntry 注册当前服务信息      * @param channel channel      * @since 0.0.8      */     MqCommonResp register(final ServiceEntry serviceEntry, Channel channel);      /**      * 注销当前服务信息      * @param serviceEntry 注册当前服务信息      * @param channel 通道      * @since 0.0.8      */     MqCommonResp unRegister(final ServiceEntry serviceEntry, Channel channel);      /**      * 获取服务地址信息      * @param channel channel      * @return 结果      * @since 0.0.3      */     ServiceEntry getServiceEntry(final Channel channel);  } 

实现如下:

本地基于 map 存储请求过来的基本信息。

package com.github.houbb.mq.broker.support.api;  /**  * <p> 生产者注册服务类 </p>  *  * @author houbinbin  * @since 0.0.3  */ public class LocalBrokerProducerService implements IBrokerProducerService {      private static final Log log = LogFactory.getLog(LocalBrokerProducerService.class);      private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>();      @Override     public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) {         final String channelId = ChannelUtil.getChannelId(channel);         BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);         registerMap.put(channelId, entryChannel);           MqCommonResp resp = new MqCommonResp();         resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());         resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());         return resp;     }      @Override     public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) {         final String channelId = ChannelUtil.getChannelId(channel);         registerMap.remove(channelId);          MqCommonResp resp = new MqCommonResp();         resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());         resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());         return resp;     }      @Override     public ServiceEntry getServiceEntry(Channel channel) {         final String channelId = ChannelUtil.getChannelId(channel);         return registerMap.get(channelId);     }  } 

消息消费者处理类

接口定义如下:

package com.github.houbb.mq.broker.api;  /**  * <p> 消费者注册服务类 </p>  *  * @author houbinbin  * @since 0.0.3  */ public interface IBrokerConsumerService {      /**      * 注册当前服务信息      * (1)将该服务通过 {@link ServiceEntry#getGroupName()} 进行分组      * 订阅了这个 serviceId 的所有客户端      * @param serviceEntry 注册当前服务信息      * @param channel channel      * @since 0.0.3      */     MqCommonResp register(final ServiceEntry serviceEntry, Channel channel);      /**      * 注销当前服务信息      * @param serviceEntry 注册当前服务信息      * @param channel channel      * @since 0.0.3      */     MqCommonResp unRegister(final ServiceEntry serviceEntry, Channel channel);      /**      * 监听服务信息      * (1)监听之后,如果有任何相关的机器信息发生变化,则进行推送。      * (2)内置的信息,需要传送 ip 信息到注册中心。      *      * @param serviceEntry 客户端明细信息      * @param clientChannel 客户端 channel 信息      * @since 0.0.3      */     MqCommonResp subscribe(final ConsumerSubscribeReq serviceEntry,                    final Channel clientChannel);      /**      * 取消监听服务信息      * (1)监听之后,如果有任何相关的机器信息发生变化,则进行推送。      * (2)内置的信息,需要传送 ip 信息到注册中心。      *      * @param serviceEntry 客户端明细信息      * @param clientChannel 客户端 channel 信息      * @since 0.0.3      */     MqCommonResp unSubscribe(final ConsumerUnSubscribeReq serviceEntry,                    final Channel clientChannel);      /**      * 获取所有匹配的消费者      * 1. 同一个 groupName 只返回一个,注意负载均衡      * 2. 返回匹配当前消息的消费者通道      *      * @param mqMessage 消息体      * @return 结果      */     List<Channel> getSubscribeList(MqMessage mqMessage);  } 

默认实现:

package com.github.houbb.mq.broker.support.api;  /**  * @author binbin.hou  * @since 1.0.0  */ public class LocalBrokerConsumerService implements IBrokerConsumerService {      private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>();      /**      * 订阅集合      * key: topicName      * value: 对应的订阅列表      */     private final Map<String, Set<ConsumerSubscribeBo>> subscribeMap = new ConcurrentHashMap<>();      @Override     public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) {         final String channelId = ChannelUtil.getChannelId(channel);         BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);         registerMap.put(channelId, entryChannel);          MqCommonResp resp = new MqCommonResp();         resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());         resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());         return resp;     }      @Override     public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) {         final String channelId = ChannelUtil.getChannelId(channel);         registerMap.remove(channelId);          MqCommonResp resp = new MqCommonResp();         resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());         resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());         return resp;     }      @Override     public MqCommonResp subscribe(ConsumerSubscribeReq serviceEntry, Channel clientChannel) {         final String channelId = ChannelUtil.getChannelId(clientChannel);         final String topicName = serviceEntry.getTopicName();          Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);         if(set == null) {             set = new HashSet<>();         }         ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo();         subscribeBo.setChannelId(channelId);         subscribeBo.setGroupName(serviceEntry.getGroupName());         subscribeBo.setTopicName(topicName);         subscribeBo.setTagRegex(serviceEntry.getTagRegex());         set.add(subscribeBo);          subscribeMap.put(topicName, set);          MqCommonResp resp = new MqCommonResp();         resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());         resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());         return resp;     }      @Override     public MqCommonResp unSubscribe(ConsumerUnSubscribeReq serviceEntry, Channel clientChannel) {         final String channelId = ChannelUtil.getChannelId(clientChannel);         final String topicName = serviceEntry.getTopicName();          ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo();         subscribeBo.setChannelId(channelId);         subscribeBo.setGroupName(serviceEntry.getGroupName());         subscribeBo.setTopicName(topicName);         subscribeBo.setTagRegex(serviceEntry.getTagRegex());          // 集合         Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);         if(CollectionUtil.isNotEmpty(set)) {             set.remove(subscribeBo);         }          MqCommonResp resp = new MqCommonResp();         resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());         resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());         return resp;     }      @Override     public List<Channel> getSubscribeList(MqMessage mqMessage) {         final String topicName = mqMessage.getTopic();         Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);         if(CollectionUtil.isEmpty(set)) {             return Collections.emptyList();         }          //2. 获取匹配的 tag 列表         final List<String> tagNameList = mqMessage.getTags();          Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();         for(ConsumerSubscribeBo bo : set) {             String tagRegex = bo.getTagRegex();              if(hasMatch(tagNameList, tagRegex)) {                 //TODO: 这种设置模式,统一添加处理                 String groupName = bo.getGroupName();                 List<ConsumerSubscribeBo> list = groupMap.get(groupName);                 if(list == null) {                     list = new ArrayList<>();                 }                 list.add(bo);                  groupMap.put(groupName, list);             }         }          //3. 按照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 选择         final String shardingKey = mqMessage.getShardingKey();         List<Channel> channelList = new ArrayList<>();          for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()) {             List<ConsumerSubscribeBo> list = entry.getValue();              ConsumerSubscribeBo bo = RandomUtils.random(list, shardingKey);             BrokerServiceEntryChannel entryChannel = registerMap.get(bo.getChannelId());             channelList.add(entryChannel.getChannel());         }          return channelList;     }      private boolean hasMatch(List<String> tagNameList,                              String tagRegex) {         if(CollectionUtil.isEmpty(tagNameList)) {             return false;         }          Pattern pattern = Pattern.compile(tagRegex);          for(String tagName : tagNameList) {             if(RegexUtils.match(pattern, tagName)) {                 return true;             }         }          return false;     }  } 

getSubscribeList 的逻辑可能稍微复杂点,其实就是消息过来,找到匹配的订阅消费者而已。

因为同一个 groupName 的消费者消息只消费一次,所以需要一次分组。

消息持久化

接口如下:

package com.github.houbb.mq.broker.support.persist;  /**  * @author binbin.hou  * @since 0.0.3  */ public interface IMqBrokerPersist {      /**      * 保存消息      * @param mqMessage 消息      * @since 0.0.3      */     MqCommonResp put(final MqMessagePersistPut mqMessage);      /**      * 更新状态      * @param messageId 消息唯一标识      * @param status 状态      * @return 结果      * @since 0.0.3      */     MqCommonResp updateStatus(final String messageId,                               final String status);      /**      * 拉取消息      * @param pull 拉取消息      * @return 结果      */     MqConsumerPullResp pull(final MqConsumerPullReq pull, final Channel channel);  } 

本地默认实现:

package com.github.houbb.mq.broker.support.persist;  /**  * 本地持久化策略  * @author binbin.hou  * @since 1.0.0  */ public class LocalMqBrokerPersist implements IMqBrokerPersist {      private static final Log log = LogFactory.getLog(LocalMqBrokerPersist.class);      /**      * 队列      * ps: 这里只是简化实现,暂时不考虑并发等问题。      */     private final Map<String, List<MqMessagePersistPut>> map = new ConcurrentHashMap<>();      //1. 接收     //2. 持久化     //3. 通知消费     @Override     public synchronized MqCommonResp put(MqMessagePersistPut put) {         log.info("put elem: {}", JSON.toJSON(put));          MqMessage mqMessage = put.getMqMessage();         final String topic = mqMessage.getTopic();          List<MqMessagePersistPut> list = map.get(topic);         if(list == null) {             list = new ArrayList<>();         }         list.add(put);         map.put(topic, list);          MqCommonResp commonResp = new MqCommonResp();         commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());         commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());         return commonResp;     }      @Override     public MqCommonResp updateStatus(String messageId, String status) {         // 这里性能比较差,所以不可以用于生产。仅作为测试验证         for(List<MqMessagePersistPut> list : map.values()) {             for(MqMessagePersistPut put : list) {                 MqMessage mqMessage = put.getMqMessage();                 if(mqMessage.getTraceId().equals(messageId)) {                     put.setMessageStatus(status);                      break;                 }             }         }          MqCommonResp commonResp = new MqCommonResp();         commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());         commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());         return commonResp;     }      @Override     public MqConsumerPullResp pull(MqConsumerPullReq pull, Channel channel) {         //TODO... 待实现         return null;     }  } 

ps: 后续将会基于 springboot+mysql 进行持久化策略实现。

我们将生产者、消费者的启动都进行调整,连接到 broker 中。

二者是类似的,此处以消费者为例。

核心启动类

package com.github.houbb.mq.consumer.core;  /**  * 推送消费策略  *  * @author binbin.hou  * @since 1.0.0  */ public class MqConsumerPush extends Thread implements IMqConsumer  {      // 属性&设置      @Override     public void run() {         // 启动服务端         log.info("MQ 消费者开始启动服务端 groupName: {}, brokerAddress: {}",                 groupName, brokerAddress);          //1. 参数校验         this.paramCheck();          try {             // channel handler             ChannelHandler channelHandler = this.initChannelHandler();              //channel future             this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress, channelHandler);              // register to broker             this.registerToBroker();              // 标识为可用             enableFlag = true;             log.info("MQ 消费者启动完成");         } catch (Exception e) {             log.error("MQ 消费者启动异常", e);             throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);         }     }      //订阅&取消订阅      @Override     public void registerListener(IMqConsumerListener listener) {         this.mqListenerService.register(listener);     }  } 

初始化 handler

private ChannelHandler initChannelHandler() {     final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);      final MqConsumerHandler mqConsumerHandler = new MqConsumerHandler(invokeService, mqListenerService);     // handler 实际上会被多次调用,如果不是 @Shareable,应该每次都重新创建。     ChannelHandler handler = new ChannelInitializer<Channel>() {         @Override         protected void initChannel(Channel ch) throws Exception {             ch.pipeline()                     .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))                     .addLast(mqConsumerHandler);         }     };      return handler; } 

注册到服务端

/**  * 注册到所有的服务端  * @since 0.0.3  */ private void registerToBroker() {     for(RpcChannelFuture channelFuture : this.channelFutureList) {         ServiceEntry serviceEntry = new ServiceEntry();         serviceEntry.setGroupName(groupName);         serviceEntry.setAddress(channelFuture.getAddress());         serviceEntry.setPort(channelFuture.getPort());         serviceEntry.setWeight(channelFuture.getWeight());          BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq();         brokerRegisterReq.setServiceEntry(serviceEntry);         brokerRegisterReq.setMethodType(MethodType.C_REGISTER);         brokerRegisterReq.setTraceId(IdHelper.uuid32());          log.info("[Register] 开始注册到 broker:{}", JSON.toJSON(brokerRegisterReq));         final Channel channel = channelFuture.getChannelFuture().channel();         MqCommonResp resp = callServer(channel, brokerRegisterReq, MqCommonResp.class);         log.info("[Register] 完成注册到 broker:{}", JSON.toJSON(resp));     } } 

订阅与取消订阅

消费者对于关心的消息,实现也比较简单:

public void subscribe(String topicName, String tagRegex) {     ConsumerSubscribeReq req = new ConsumerSubscribeReq();     String messageId = IdHelper.uuid32();     req.setTraceId(messageId);     req.setMethodType(MethodType.C_SUBSCRIBE);     req.setTopicName(topicName);     req.setTagRegex(tagRegex);     req.setGroupName(groupName);      Channel channel = getChannel();      MqCommonResp resp = callServer(channel, req, MqCommonResp.class);     if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {         throw new MqException(ConsumerRespCode.SUBSCRIBE_FAILED);     } } 

取消订阅:

public void unSubscribe(String topicName, String tagRegex) {     ConsumerUnSubscribeReq req = new ConsumerUnSubscribeReq();     String messageId = IdHelper.uuid32();     req.setTraceId(messageId);     req.setMethodType(MethodType.C_UN_SUBSCRIBE);     req.setTopicName(topicName);     req.setTagRegex(tagRegex);     req.setGroupName(groupName);      Channel channel = getChannel();      MqCommonResp resp = callServer(channel, req, MqCommonResp.class);     if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {         throw new MqException(ConsumerRespCode.UN_SUBSCRIBE_FAILED);     } } 

broker 启动

MqBroker broker = new MqBroker(); broker.start(); 

启动日志:

[DEBUG] [2022-04-21 20:36:27.158] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter. [INFO] [2022-04-21 20:36:27.186] [Thread-0] [c.g.h.m.b.c.MqBroker.run] - MQ 中间人开始启动服务端 port: 9999 [INFO] [2022-04-21 20:36:29.060] [Thread-0] [c.g.h.m.b.c.MqBroker.run] - MQ 中间人启动完成,监听【9999】端口 

consumer 启动

final MqConsumerPush mqConsumerPush = new MqConsumerPush(); mqConsumerPush.start();  mqConsumerPush.subscribe("TOPIC", "TAGA"); mqConsumerPush.registerListener(new IMqConsumerListener() {     @Override     public ConsumerStatus consumer(MqMessage mqMessage, IMqConsumerListenerContext context) {         System.out.println("---------- 自定义 " + JSON.toJSONString(mqMessage));         return ConsumerStatus.SUCCESS;     } }); 

启动日志:

... [INFO] [2022-04-21 20:37:40.985] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.registerToBroker] - [Register] 完成注册到 broker:{"respMessage":"成功","respCode":"0000"} 

启动时会注册到 broker。

producer 启动

MqProducer mqProducer = new MqProducer(); mqProducer.start(); String message = "HELLO MQ!"; MqMessage mqMessage = new MqMessage(); mqMessage.setTopic("TOPIC"); mqMessage.setTags(Arrays.asList("TAGA", "TAGB")); mqMessage.setPayload(message);  SendResult sendResult = mqProducer.send(mqMessage);  System.out.println(JSON.toJSON(sendResult)); 

日志:

... [INFO] [2022-04-21 20:39:17.885] [Thread-0] [c.g.h.m.p.c.MqProducer.registerToBroker] - [Register] 完成注册到 broker:{"respMessage":"成功","respCode":"0000"} ... 

此时消费者消费到我们发送的消息。

---------- 自定义 {"methodType":"B_MESSAGE_PUSH","payload":"HELLO MQ!","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"2237bbfe55b842328134e6a100e36364"} 

到这里,我们就实现了基于中间人的生产者与消费者通讯。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

rpc-从零开始实现 rpc https://github.com/houbb/rpc