diff --git a/hutool-extra/pom.xml b/hutool-extra/pom.xml
index 5dd55c992..dc292ca7e 100755
--- a/hutool-extra/pom.xml
+++ b/hutool-extra/pom.xml
@@ -570,5 +570,11 @@
5.24.0
provided
+
+ org.apache.rocketmq
+ rocketmq-client
+ 5.3.1
+ provided
+
diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java
index 45408fee2..083b95f9b 100644
--- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java
+++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java
@@ -90,10 +90,20 @@ public class KafkaConsumer implements Consumer {
IoUtil.nullSafeClose(this.consumer);
}
+ /**
+ * 消费者记录包装为消息
+ *
+ * @author looly
+ */
private static class ConsumerRecordMessage implements Message {
private final ConsumerRecord record;
+ /**
+ * 构造
+ *
+ * @param record {@link ConsumerRecord}
+ */
private ConsumerRecordMessage(final ConsumerRecord record) {
this.record = record;
}
diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQConsumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQConsumer.java
new file mode 100644
index 000000000..559c298b9
--- /dev/null
+++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQConsumer.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2025 Hutool Team and hutool.cn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.dromara.hutool.extra.mq.engine.rocketmq;
+
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.dromara.hutool.extra.mq.Consumer;
+import org.dromara.hutool.extra.mq.MQException;
+import org.dromara.hutool.extra.mq.Message;
+import org.dromara.hutool.extra.mq.MessageHandler;
+
+import java.io.IOException;
+
+/**
+ * RocketMQ 消费者
+ *
+ * @author Looly
+ * @since 6.0.0
+ */
+public class RocketMQConsumer implements Consumer {
+
+ private final MQPushConsumer consumer;
+
+ /**
+ * 构造
+ *
+ * @param consumer RocketMQ PushConsumer
+ */
+ public RocketMQConsumer(final MQPushConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ /**
+ * 设置消费的Topic
+ *
+ * @param topic Topic
+ * @return this
+ */
+ public RocketMQConsumer setTopic(final String topic) {
+ try {
+ this.consumer.subscribe(topic, "*");
+ } catch (final MQClientException e) {
+ throw new MQException(e);
+ }
+ return this;
+ }
+
+ @Override
+ public void subscribe(final MessageHandler messageHandler) {
+ this.consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+ for (final MessageExt msg : msgs) {
+ messageHandler.handle(new RocketMQMessage(msg));
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (null != this.consumer) {
+ this.consumer.shutdown();
+ }
+ }
+
+ /**
+ * RocketMQ消息包装
+ *
+ * @author Looly
+ * @since 6.0.0
+ */
+ private static class RocketMQMessage implements Message {
+ private final MessageExt messageExt;
+
+ private RocketMQMessage(final MessageExt messageExt) {
+ this.messageExt = messageExt;
+ }
+
+
+ @Override
+ public String topic() {
+ return messageExt.getTopic();
+ }
+
+ @Override
+ public byte[] content() {
+ return messageExt.getBody();
+ }
+ }
+}
diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java
new file mode 100644
index 000000000..1cb324a07
--- /dev/null
+++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2025 Hutool Team and hutool.cn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.dromara.hutool.extra.mq.engine.rocketmq;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.dromara.hutool.core.lang.Assert;
+import org.dromara.hutool.extra.mq.Consumer;
+import org.dromara.hutool.extra.mq.MQConfig;
+import org.dromara.hutool.extra.mq.MQException;
+import org.dromara.hutool.extra.mq.Producer;
+import org.dromara.hutool.extra.mq.engine.MQEngine;
+
+/**
+ * RocketMQ引擎
+ *
+ * @author Looly
+ * @since 6.0.0
+ */
+public class RocketMQEngine implements MQEngine {
+
+ private MQConfig config;
+
+ /**
+ * 默认构造
+ */
+ public RocketMQEngine() {
+ // SPI方式加载时检查库是否引入
+ Assert.notNull( org.apache.rocketmq.common.message.Message.class);
+ }
+
+ @Override
+ public RocketMQEngine init(final MQConfig config) {
+ this.config = config;
+ return this;
+ }
+
+ @Override
+ public Producer getProducer() {
+ final DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
+ defaultMQProducer.setNamesrvAddr(config.getBrokerUrl());
+ try {
+ defaultMQProducer.start();
+ } catch (final MQClientException e) {
+ throw new MQException(e);
+ }
+ return new RocketMQProducer(defaultMQProducer);
+ }
+
+ @Override
+ public Consumer getConsumer() {
+ final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
+ defaultMQPushConsumer.setNamesrvAddr(config.getBrokerUrl());
+ return new RocketMQConsumer(defaultMQPushConsumer);
+ }
+}
diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQProducer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQProducer.java
new file mode 100644
index 000000000..decd947c8
--- /dev/null
+++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQProducer.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2025 Hutool Team and hutool.cn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.dromara.hutool.extra.mq.engine.rocketmq;
+
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.dromara.hutool.extra.mq.MQException;
+import org.dromara.hutool.extra.mq.Message;
+import org.dromara.hutool.extra.mq.Producer;
+
+import java.io.IOException;
+
+/**
+ * RocketMQ Producer
+ *
+ * @author Looly
+ * @since 6.0.0
+ */
+public class RocketMQProducer implements Producer {
+
+ private final MQProducer producer;
+
+ /**
+ * 构造
+ *
+ * @param producer RocketMQ Producer
+ */
+ public RocketMQProducer(final MQProducer producer) {
+ this.producer = producer;
+ }
+
+ @Override
+ public void send(final Message message) {
+ final org.apache.rocketmq.common.message.Message rocketMessage =
+ new org.apache.rocketmq.common.message.Message(message.topic(), message.content());
+ try {
+ this.producer.send(rocketMessage);
+ } catch (final Exception e) {
+ throw new MQException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (null != this.producer) {
+ this.producer.shutdown();
+ }
+ }
+}
diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/package-info.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/package-info.java
new file mode 100644
index 000000000..004cc5991
--- /dev/null
+++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2025 Hutool Team and hutool.cn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * RocketMQ引擎
+ *
+ * @author Looly
+ */
+package org.dromara.hutool.extra.mq.engine.rocketmq;