From 8bb82a9d92b252e691fc53bd51ffd269f4fc6acf Mon Sep 17 00:00:00 2001 From: Looly Date: Wed, 23 Jul 2025 18:38:34 +0800 Subject: [PATCH] add activeMQ support --- hutool-extra/pom.xml | 7 + .../java/cn/hutool/v7/extra/mq/Consumer.java | 1 - .../java/cn/hutool/v7/extra/mq/MQConfig.java | 2 + .../cn/hutool/v7/extra/mq/MQException.java | 3 + .../mq/engine/activemq/ActiveMQEngine.java | 29 +++++ .../mq/engine/activemq/package-info.java | 23 ++++ .../v7/extra/mq/engine/jms/JmsConsumer.java | 85 ++++++++++++ .../v7/extra/mq/engine/jms/JmsEngine.java | 123 ++++++++++++++++++ .../v7/extra/mq/engine/jms/JmsProducer.java | 52 ++++++++ .../v7/extra/mq/engine/jms/package-info.java | 23 ++++ 10 files changed, 347 insertions(+), 1 deletion(-) create mode 100644 hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/activemq/ActiveMQEngine.java create mode 100644 hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/activemq/package-info.java create mode 100644 hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsConsumer.java create mode 100644 hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsEngine.java create mode 100644 hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsProducer.java create mode 100644 hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/package-info.java diff --git a/hutool-extra/pom.xml b/hutool-extra/pom.xml index ef06d6def..0374946fe 100755 --- a/hutool-extra/pom.xml +++ b/hutool-extra/pom.xml @@ -70,6 +70,7 @@ 4.0.0 5.25.0 5.3.3 + 6.1.7 @@ -554,5 +555,11 @@ ${rocketmq.version} provided + + org.apache.activemq + activemq-client + ${activemq.version} + provided + diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/Consumer.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/Consumer.java index 0e424d84b..c9324d112 100644 --- a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/Consumer.java +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/Consumer.java @@ -39,7 +39,6 @@ public interface Consumer extends Closeable { * * @param messageHandler 消息处理器 */ - @SuppressWarnings("InfiniteLoopStatement") default void listen(final MessageHandler messageHandler) { ThreadUtil.execAsync(() -> { for(;;) { diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/MQConfig.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/MQConfig.java index bfc938c40..ed642632c 100644 --- a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/MQConfig.java +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/MQConfig.java @@ -18,6 +18,7 @@ package cn.hutool.v7.extra.mq; import cn.hutool.v7.extra.mq.engine.MQEngine; +import java.io.Serial; import java.io.Serializable; import java.util.Properties; @@ -28,6 +29,7 @@ import java.util.Properties; * @since 6.0.0 */ public class MQConfig implements Serializable { + @Serial private static final long serialVersionUID = 1L; /** diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/MQException.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/MQException.java index 7b6f1537a..099fba3a8 100644 --- a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/MQException.java +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/MQException.java @@ -18,12 +18,15 @@ package cn.hutool.v7.extra.mq; import cn.hutool.v7.core.exception.HutoolException; +import java.io.Serial; + /** * 消息队列异常 * * @author Looly */ public class MQException extends HutoolException { + @Serial private static final long serialVersionUID = 1L; /** diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/activemq/ActiveMQEngine.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/activemq/ActiveMQEngine.java new file mode 100644 index 000000000..cc02e4329 --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/activemq/ActiveMQEngine.java @@ -0,0 +1,29 @@ +package cn.hutool.v7.extra.mq.engine.activemq; + +import cn.hutool.v7.core.lang.Assert; +import cn.hutool.v7.extra.mq.MQConfig; +import cn.hutool.v7.extra.mq.engine.jms.JmsEngine; +import jakarta.jms.ConnectionFactory; +import org.apache.activemq.ActiveMQConnectionFactory; + +/** + * ActiveMQ引擎 + * + * @author Looly + * @since 7.0.0 + */ +public class ActiveMQEngine extends JmsEngine { + + /** + * 构造 + */ + public ActiveMQEngine() { + // SPI方式加载时检查库是否引入 + Assert.notNull(org.apache.activemq.ActiveMQConnectionFactory.class); + } + + @Override + protected ConnectionFactory createConnectionFactory(final MQConfig config) { + return new ActiveMQConnectionFactory(config.getBrokerUrl()); + } +} diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/activemq/package-info.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/activemq/package-info.java new file mode 100644 index 000000000..36ce5826d --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/activemq/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * ActiveMQ消息队列引擎 + * + * @author Looly + * @since 7.0.0 + */ +package cn.hutool.v7.extra.mq.engine.activemq; diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsConsumer.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsConsumer.java new file mode 100644 index 000000000..25f6df1b2 --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsConsumer.java @@ -0,0 +1,85 @@ +package cn.hutool.v7.extra.mq.engine.jms; + +import cn.hutool.v7.core.io.IoUtil; +import cn.hutool.v7.core.util.ByteUtil; +import cn.hutool.v7.extra.mq.Consumer; +import cn.hutool.v7.extra.mq.MQException; +import cn.hutool.v7.extra.mq.Message; +import cn.hutool.v7.extra.mq.MessageHandler; +import jakarta.jms.BytesMessage; +import jakarta.jms.JMSException; +import jakarta.jms.MessageConsumer; +import jakarta.jms.TextMessage; + +import java.io.IOException; + +/** + * JMS消息消费者 + * + * @author Looly + * @since 7.0.0 + */ +public class JmsConsumer implements Consumer { + + private String consumerGroup; + private final MessageConsumer consumer; + + /** + * 构造 + * + * @param consumerGroup 消费者组 + * @param consumer 消费者 + */ + public JmsConsumer(final String consumerGroup, final MessageConsumer consumer) { + this.consumerGroup = consumerGroup; + this.consumer = consumer; + } + + /** + * 设置消费者组 + * + * @param consumerGroup 消费者组 + * @return this + */ + public JmsConsumer setConsumerGroup(final String consumerGroup) { + this.consumerGroup = consumerGroup; + return this; + } + + @Override + public void subscribe(final MessageHandler messageHandler) { + try { + this.consumer.setMessageListener(message -> { + messageHandler.handle(new Message() { + @Override + public String topic() { + return consumerGroup; + } + + @Override + public byte[] content() { + try { + if (message instanceof TextMessage) { + // TODO 考虑编码 + return ByteUtil.toUtf8Bytes(((TextMessage) message).getText()); + } else if (message instanceof BytesMessage) { + return new byte[(int) ((BytesMessage) message).getBodyLength()]; + } else { + throw new IllegalArgumentException("Unsupported message type: " + message.getClass().getName()); + } + } catch (final JMSException e) { + throw new MQException(e); + } + } + }); + }); + } catch (final JMSException e) { + throw new MQException(e); + } + } + + @Override + public void close() throws IOException { + IoUtil.closeQuietly(this.consumer); + } +} diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsEngine.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsEngine.java new file mode 100644 index 000000000..63b23fc9a --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsEngine.java @@ -0,0 +1,123 @@ +package cn.hutool.v7.extra.mq.engine.jms; + +import cn.hutool.v7.core.io.IoUtil; +import cn.hutool.v7.extra.mq.Consumer; +import cn.hutool.v7.extra.mq.MQConfig; +import cn.hutool.v7.extra.mq.MQException; +import cn.hutool.v7.extra.mq.Producer; +import cn.hutool.v7.extra.mq.engine.MQEngine; +import jakarta.jms.*; + +import java.io.Closeable; +import java.io.IOException; + +/** + * JMS(Java Message Service)引擎 + * + * @author Looly + * @since 7.0.0 + */ +public abstract class JmsEngine implements MQEngine, Closeable { + + private Connection connection; + private Session session; + private boolean isTopic; + private String producerGroup = "hutool.queue"; + private String consumerGroup = "hutool.queue"; + + @Override + public MQEngine init(final MQConfig config) { + try { + this.connection = createConnectionFactory(config).createConnection(); + this.session = this.connection.createSession(); + } catch (final JMSException e) { + throw new MQException(e); + } + return this; + } + + /** + * 创建ConnectionFactory + * + * @param config 配置 + * @return ConnectionFactory + */ + protected abstract ConnectionFactory createConnectionFactory(final MQConfig config); + + /** + * 设置是否Topic + * + * @param isTopic 是否Topic + * @return this + */ + public JmsEngine setTopic(final boolean isTopic) { + this.isTopic = isTopic; + return this; + } + + /** + * 设置生产者组 + * + * @param producerGroup 生产者组 + * @return this + */ + public JmsEngine setProducerGroup(final String producerGroup) { + this.producerGroup = producerGroup; + return this; + } + + /** + * 设置消费者组 + * + * @param consumerGroup 消费者组 + * @return this + */ + public JmsEngine setConsumerGroup(final String consumerGroup) { + this.consumerGroup = consumerGroup; + return this; + } + + @Override + public Producer getProducer() { + final MessageProducer messageProducer; + try { + messageProducer = this.session.createProducer(createDestination(producerGroup)); + } catch (final JMSException e) { + throw new MQException(e); + } + + return new JmsProducer(this.session, messageProducer); + } + + @Override + public Consumer getConsumer() { + final MessageConsumer messageConsumer; + try { + messageConsumer = this.session.createConsumer(createDestination(consumerGroup)); + } catch (final JMSException e) { + throw new MQException(e); + } + + return new JmsConsumer(this.consumerGroup, messageConsumer); + } + + @Override + public void close() throws IOException { + IoUtil.closeQuietly(this.session); + IoUtil.closeQuietly(this.connection); + } + + /** + * 创建Destination + * + * @param group 组 + * @return Destination + */ + private Destination createDestination(final String group) { + try { + return isTopic ? this.session.createTopic(group) : this.session.createQueue(group); + } catch (final JMSException e) { + throw new MQException(e); + } + } +} diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsProducer.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsProducer.java new file mode 100644 index 000000000..1da1878ec --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsProducer.java @@ -0,0 +1,52 @@ +package cn.hutool.v7.extra.mq.engine.jms; + +import cn.hutool.v7.core.io.IoUtil; +import cn.hutool.v7.extra.mq.MQException; +import cn.hutool.v7.extra.mq.Message; +import cn.hutool.v7.extra.mq.Producer; +import jakarta.jms.BytesMessage; +import jakarta.jms.JMSException; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; + +import java.io.IOException; + +/** + * JMS消息生产者 + * + * @author looly + * @since 7.0.0 + */ +public class JmsProducer implements Producer { + + private final Session session; + private final MessageProducer producer; + + /** + * 构造 + * + * @param session JMS会话 + * @param producer JMS消息生产者 + */ + public JmsProducer(final Session session, final MessageProducer producer) { + this.session = session; + this.producer = producer; + } + + @Override + public void send(final Message message) { + final BytesMessage bytesMessage; + try { + bytesMessage = this.session.createBytesMessage(); + bytesMessage.writeBytes(message.content()); + this.producer.send(bytesMessage); + } catch (final JMSException e) { + throw new MQException(e); + } + } + + @Override + public void close() throws IOException { + IoUtil.closeQuietly(this.producer); + } +} diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/package-info.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/package-info.java new file mode 100644 index 000000000..c52122f29 --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * JMS(Java Message Service)消息队列引擎 + * + * @author Looly + * @since 7.0.0 + */ +package cn.hutool.v7.extra.mq.engine.jms;