适用版本:v3.3.6+;

项目中

dependency
producer依赖相关:

 <dependency>
    <groupId>com.ak.iform.message</groupId>
    <artifactId>iform-message-producer-rabbit</artifactId>
    <version>${iform.version}</version>
</dependency>
<dependency>
    <groupId>com.ak.iform.message</groupId>
    <artifactId>iform-message-producer-kafka</artifactId>
    <version>${iform.version}</version>
</dependency>
<dependency>
    <groupId>com.ak.iform.message</groupId>
    <artifactId>iform-message-producer-mq</artifactId>
    <version>${iform.version}</version>
</dependency>

consumer依赖相关:

<dependency>
    <groupId>com.ak.iform.message</groupId>
    <artifactId>iform-message-consumer-rabbit</artifactId>
    <version>${iform.version}</version>
</dependency>
<dependency>
    <groupId>com.ak.iform.message</groupId>
    <artifactId>iform-message-consumer-kafka</artifactId>
    <version>${iform.version}</version>
</dependency>
<dependency>
    <groupId>com.ak.iform.message</groupId>
    <artifactId>iform-message-consumer-mq</artifactId>
    <version>${iform.version}</version>
</dependency>

配置说明
平台目前支持的有rabbitmq、kafka、active、redis和rocket,默认使用redis,通过配置文控制当前使用哪一种消息队列。
在application-common.yml配置文件里,有使用哪一种消息队列的配置

com:
  ak:
    mq:
      # 使用的消息队列,目前支持 redis/kafka/rocketMq/rabbitMq/activeMq
      type: ${IFORM_MQ_TYPE:redis}

在application-*-base.yml配置文件里,有使用的消息队列的配置

rocketmq:
  name-server: ${ROCKETMQ_HOST:127.0.0.1}:${ROCKETMQ_PORT:9876}
  producer:
    group: iform-message-producer

在开发时,可以直接在启动模块的环境配置文件yml内,通过别名进行消息队列的选择及配置,如:

# MQ选择 支持 redis/kafka/rocketMq/rabbitMq/activeMq
IFORM_MQ_TYPE: rocketMq
# rocketmq配置
ROCKETMQ_HOST: 192.168.1.211
ROCKETMQ_PORT: 9876

当前文档以rabbitmq为例。消费者通过message中的MessageType来判断由哪个handler处理消息,该handler需要实现IMessageQueueHandler或者ICommandQueueHandler接口。

conf/iform-message-consumer.xml:

<!-- mq 消息发送实现类注册 -->
<bean id="messageHandlerList" class="java.util.ArrayList">
   <constructor-arg>
      <list>
         <ref bean="innerMessageQueueHandler" />
         <ref bean="mailMessageQueueHandler" />
         <ref bean="smsMessageQueueHandler" />
         <ref bean="wechatMessageQueueHandler" />
      </list>
   </constructor-arg>
</bean>

<util:map id="messageQueueHandlers" map-class="java.util.HashMap">
   <entry key="innerMessageQueueHandler" value-ref="innerMessageQueueHandler"/>
   <entry key="mailMessageQueueHandler" value-ref="mailMessageQueueHandler"/>
   <entry key="smsMessageQueueHandler" value-ref="smsMessageQueueHandler"/>
   <entry key="wechatMessageQueueHandler" value-ref="wechatMessageQueueHandler"/>
</util:map>

<util:map id="commandQueueHandlers" map-class="java.util.HashMap">
   <entry key="removeCacheCommandQueueHandler" value-ref="removeCacheCommandQueueHandler"/>
   <entry key="syncDatasourceCommandQueueHandler" value-ref="syncDatasourceCommandQueueHandler"/>
</util:map>

API说明

IMessageQueueProducer:

    /**
     * 发送消息到队列中
     * @param message 
     */
    public void push(T message);

MessageQueueProductorUtil:

/**
 * 发送多种消息到消息队列
 *
 * @param msgTypes 消息类型集合,{@link com.ak.iform.cloud.mq.core.constants.MessageType}
 * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
 * @param receiverNames    接收人姓名集合,与接收人集合一一对应
 * @param subject  消息标题
 * @param content  消息内容
 * @param contentType  消息类型,{@link com.ak.iform.cloud.mq.core.constants.ContentType}
 */
public static void send(List<String> msgTypes, List<String> receivers, List<String> receiverNames, 
      String subject, String content, String contentType)

    /**
     * 发送指定类型消息到消息队列
     *
     * @param msgType    消息类型,{@link com.ak.iform.cloud.mq.core.constants.MessageType}
     * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
     * @param receiverNames    接收人姓名集合,与接收人集合一一对应
     * @param subject     消息标题
     * @param content     消息内容
     * @param contentType     消息类型,{@link com.ak.iform.cloud.mq.core.constants.ContentType}
     */
    public static void send(String msgType, List<String> receivers, List<String> receiverNames, 
            String subject, String content, String contentType)

    /**
     * 发送指定类型消息到消息队列
     *
     * @param msgType    消息类型,{@link com.ak.iform.cloud.mq.core.constants.MessageType}
     * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
     * @param receiverNames    接收人姓名集合,与接收人集合一一对应
     * @param typeKey     消息模版标识
     * @param vars     消息变量
     */
    public static void sendByTemplate(String msgType, List<String> receivers, List<String> receiverNames, 
            String typeKey, Map<String, Object> vars) 

    /**
     * 发送多种消息到消息队列
     *
     * @param sender     发送人
     * @param senderName     发送人姓名
     * @param msgTypes    消息类型集合,{@link com.ak.iform.cloud.mq.core.constants.MessageType}
     * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
     * @param receiverNames    接收人姓名集合,与接收人集合一一对应
     * @param subject     消息标题
     * @param content     消息内容
     * @param contentType     消息类型,{@link com.ak.iform.cloud.mq.core.constants.ContentType}
     */
    public static void send(String sender, String senderName, 
            List<String> msgTypes, List<String> receivers, List<String> receiverNames, 
            String subject, String content, String contentType) {
        if(BeanUtils.isEmpty(msgTypes))

    /**
     * 发送指定类型消息到消息队列
     *
     * @param sender     发送人
     * @param senderName     发送人姓名
     * @param msgType    消息类型,{@link com.ak.iform.cloud.mq.core.constants.MessageType}
     * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
     * @param receiverNames    接收人姓名集合,与接收人集合一一对应
     * @param subject     消息标题
     * @param content     消息内容
     * @param contentType     消息类型,{@link com.ak.iform.cloud.mq.core.constants.ContentType}
     */
    public static void send(String sender, String senderName, 
            String msgType, List<String> receivers, List<String> receiverNames, 
            String subject, String content, String contentType,
            String typeKey, Map<String, Object> vars)

    /**
     * 发送指定类型消息到消息队列
     *
     * @param sender     发送人
     * @param senderName     发送人姓名
     * @param msgType    消息类型,{@link com.ak.iform.cloud.mq.core.constants.MessageType}
     * @param receivers    接收人集合,用户ID/邮箱/手机号码/微信号
     * @param receiverNames    接收人姓名集合,与接收人集合一一对应
     * @param typeKey     消息模版标识
     * @param vars     消息变量
     */
    public static void sendByTemplate(String sender, String senderName, 
            String msgType, List<String> receivers, List<String> receiverNames, 
            String typeKey, Map<String, Object> vars) {
        if (StringUtil.isBlank(typeKey)) 

IQueueConsumer:

/**
 * 消费队列中的消息
 * @param message 
 */
public void popup(M message);

使用示例

MessageQueueProductorUtil: // producer

MessageQueueProductorUtil.send("-1", "System", MessageType.MAIL.value(), 
                receivers, null, subject, content, 
                ContentType.HTML.name(), null, null);

InnerMessageQueueHandler:// consumer

...
    @Override
    public void send(M message) {
        logger.debug(">>>>>>>>>>>>>>>>>>>>>starting to send {} message>>>>>>>>>>>>>>>>>>", getMessageType());

        if(BeanUtils.isEmpty(message) 
                || BeanUtils.isEmpty(message.getSenderId()) 
                || BeanUtils.isEmpty(message.getReceivers())
                || BeanUtils.isEmpty(message.getTemplate())
                )
        {
            return;
        }

        MessageQueueConsumerUtil.transfer(message);

        if (innerMessageDomain != null) {
            InnerMessageVo vo = new InnerMessageVo();
            vo.setSubject(message.getTemplate().getSubject());
            vo.setContent(message.getTemplate().getContent());
            vo.setIsPublic(InnerMessageVo.IS_PUBLIC_NO);
            vo.setCanreply(InnerMessagePo.IS_REPLY_NO);
            vo.setMessageType(InnerMessagePo.TYPE_SYSTEM);
            vo.setReceiverId(StringUtil.join(message.getReceivers(), StringPool.COMMA));
            vo.setReceiver(StringUtil.join(message.getReceiverNames(), StringPool.COMMA));

            innerMessageDomain.send(vo, message.getSenderId(), message.getSenderName());

            logger.debug("<<<<<<<<<<<<<<<<<<<<<ending to send {} message<<<<<<<<<<<<<<<<<<<<<", getMessageType());
        }
    }

...
作者:caoyl  创建时间:2024-02-29 17:27
最后编辑:王文舟  更新时间:2025-11-04 15:32