add activeMQ support

This commit is contained in:
Looly
2025-07-23 18:38:34 +08:00
parent 27f77f83a7
commit 8bb82a9d92
10 changed files with 347 additions and 1 deletions

View File

@@ -70,6 +70,7 @@
<kafka.version>4.0.0</kafka.version>
<rabbitmq.version>5.25.0</rabbitmq.version>
<rocketmq.version>5.3.3</rocketmq.version>
<activemq.version>6.1.7</activemq.version>
</properties>
<dependencies>
@@ -554,5 +555,11 @@
<version>${rocketmq.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>${activemq.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@@ -39,7 +39,6 @@ public interface Consumer extends Closeable {
*
* @param messageHandler 消息处理器
*/
@SuppressWarnings("InfiniteLoopStatement")
default void listen(final MessageHandler messageHandler) {
ThreadUtil.execAsync(() -> {
for(;;) {

View File

@@ -18,6 +18,7 @@ package cn.hutool.v7.extra.mq;
import cn.hutool.v7.extra.mq.engine.MQEngine;
import java.io.Serial;
import java.io.Serializable;
import java.util.Properties;
@@ -28,6 +29,7 @@ import java.util.Properties;
* @since 6.0.0
*/
public class MQConfig implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**

View File

@@ -18,12 +18,15 @@ package cn.hutool.v7.extra.mq;
import cn.hutool.v7.core.exception.HutoolException;
import java.io.Serial;
/**
* 消息队列异常
*
* @author Looly
*/
public class MQException extends HutoolException {
@Serial
private static final long serialVersionUID = 1L;
/**

View File

@@ -0,0 +1,29 @@
package cn.hutool.v7.extra.mq.engine.activemq;
import cn.hutool.v7.core.lang.Assert;
import cn.hutool.v7.extra.mq.MQConfig;
import cn.hutool.v7.extra.mq.engine.jms.JmsEngine;
import jakarta.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* ActiveMQ引擎
*
* @author Looly
* @since 7.0.0
*/
public class ActiveMQEngine extends JmsEngine {
/**
* 构造
*/
public ActiveMQEngine() {
// SPI方式加载时检查库是否引入
Assert.notNull(org.apache.activemq.ActiveMQConnectionFactory.class);
}
@Override
protected ConnectionFactory createConnectionFactory(final MQConfig config) {
return new ActiveMQConnectionFactory(config.getBrokerUrl());
}
}

View File

@@ -0,0 +1,23 @@
/*
* Copyright (c) 2025 Hutool Team and hutool.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* ActiveMQ消息队列引擎
*
* @author Looly
* @since 7.0.0
*/
package cn.hutool.v7.extra.mq.engine.activemq;

View File

@@ -0,0 +1,85 @@
package cn.hutool.v7.extra.mq.engine.jms;
import cn.hutool.v7.core.io.IoUtil;
import cn.hutool.v7.core.util.ByteUtil;
import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.MQException;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.MessageHandler;
import jakarta.jms.BytesMessage;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.TextMessage;
import java.io.IOException;
/**
* JMS消息消费者
*
* @author Looly
* @since 7.0.0
*/
public class JmsConsumer implements Consumer {
private String consumerGroup;
private final MessageConsumer consumer;
/**
* 构造
*
* @param consumerGroup 消费者组
* @param consumer 消费者
*/
public JmsConsumer(final String consumerGroup, final MessageConsumer consumer) {
this.consumerGroup = consumerGroup;
this.consumer = consumer;
}
/**
* 设置消费者组
*
* @param consumerGroup 消费者组
* @return this
*/
public JmsConsumer setConsumerGroup(final String consumerGroup) {
this.consumerGroup = consumerGroup;
return this;
}
@Override
public void subscribe(final MessageHandler messageHandler) {
try {
this.consumer.setMessageListener(message -> {
messageHandler.handle(new Message() {
@Override
public String topic() {
return consumerGroup;
}
@Override
public byte[] content() {
try {
if (message instanceof TextMessage) {
// TODO 考虑编码
return ByteUtil.toUtf8Bytes(((TextMessage) message).getText());
} else if (message instanceof BytesMessage) {
return new byte[(int) ((BytesMessage) message).getBodyLength()];
} else {
throw new IllegalArgumentException("Unsupported message type: " + message.getClass().getName());
}
} catch (final JMSException e) {
throw new MQException(e);
}
}
});
});
} catch (final JMSException e) {
throw new MQException(e);
}
}
@Override
public void close() throws IOException {
IoUtil.closeQuietly(this.consumer);
}
}

View File

@@ -0,0 +1,123 @@
package cn.hutool.v7.extra.mq.engine.jms;
import cn.hutool.v7.core.io.IoUtil;
import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.MQConfig;
import cn.hutool.v7.extra.mq.MQException;
import cn.hutool.v7.extra.mq.Producer;
import cn.hutool.v7.extra.mq.engine.MQEngine;
import jakarta.jms.*;
import java.io.Closeable;
import java.io.IOException;
/**
* JMS(Java Message Service)引擎
*
* @author Looly
* @since 7.0.0
*/
public abstract class JmsEngine implements MQEngine, Closeable {
private Connection connection;
private Session session;
private boolean isTopic;
private String producerGroup = "hutool.queue";
private String consumerGroup = "hutool.queue";
@Override
public MQEngine init(final MQConfig config) {
try {
this.connection = createConnectionFactory(config).createConnection();
this.session = this.connection.createSession();
} catch (final JMSException e) {
throw new MQException(e);
}
return this;
}
/**
* 创建ConnectionFactory
*
* @param config 配置
* @return ConnectionFactory
*/
protected abstract ConnectionFactory createConnectionFactory(final MQConfig config);
/**
* 设置是否Topic
*
* @param isTopic 是否Topic
* @return this
*/
public JmsEngine setTopic(final boolean isTopic) {
this.isTopic = isTopic;
return this;
}
/**
* 设置生产者组
*
* @param producerGroup 生产者组
* @return this
*/
public JmsEngine setProducerGroup(final String producerGroup) {
this.producerGroup = producerGroup;
return this;
}
/**
* 设置消费者组
*
* @param consumerGroup 消费者组
* @return this
*/
public JmsEngine setConsumerGroup(final String consumerGroup) {
this.consumerGroup = consumerGroup;
return this;
}
@Override
public Producer getProducer() {
final MessageProducer messageProducer;
try {
messageProducer = this.session.createProducer(createDestination(producerGroup));
} catch (final JMSException e) {
throw new MQException(e);
}
return new JmsProducer(this.session, messageProducer);
}
@Override
public Consumer getConsumer() {
final MessageConsumer messageConsumer;
try {
messageConsumer = this.session.createConsumer(createDestination(consumerGroup));
} catch (final JMSException e) {
throw new MQException(e);
}
return new JmsConsumer(this.consumerGroup, messageConsumer);
}
@Override
public void close() throws IOException {
IoUtil.closeQuietly(this.session);
IoUtil.closeQuietly(this.connection);
}
/**
* 创建Destination
*
* @param group 组
* @return Destination
*/
private Destination createDestination(final String group) {
try {
return isTopic ? this.session.createTopic(group) : this.session.createQueue(group);
} catch (final JMSException e) {
throw new MQException(e);
}
}
}

View File

@@ -0,0 +1,52 @@
package cn.hutool.v7.extra.mq.engine.jms;
import cn.hutool.v7.core.io.IoUtil;
import cn.hutool.v7.extra.mq.MQException;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.Producer;
import jakarta.jms.BytesMessage;
import jakarta.jms.JMSException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.IOException;
/**
* JMS消息生产者
*
* @author looly
* @since 7.0.0
*/
public class JmsProducer implements Producer {
private final Session session;
private final MessageProducer producer;
/**
* 构造
*
* @param session JMS会话
* @param producer JMS消息生产者
*/
public JmsProducer(final Session session, final MessageProducer producer) {
this.session = session;
this.producer = producer;
}
@Override
public void send(final Message message) {
final BytesMessage bytesMessage;
try {
bytesMessage = this.session.createBytesMessage();
bytesMessage.writeBytes(message.content());
this.producer.send(bytesMessage);
} catch (final JMSException e) {
throw new MQException(e);
}
}
@Override
public void close() throws IOException {
IoUtil.closeQuietly(this.producer);
}
}

View File

@@ -0,0 +1,23 @@
/*
* Copyright (c) 2025 Hutool Team and hutool.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* JMS(Java Message Service)消息队列引擎
*
* @author Looly
* @since 7.0.0
*/
package cn.hutool.v7.extra.mq.engine.jms;