Spring – JMS 集成
JMS 是一个标准的Java API,它允许Java应用程序向另一个应用程序发送消息。它具有高度可扩展性,允许我们使用异步消息传递松散地耦合应用程序。使用 JMS,我们可以读取、发送和读取消息。
下面是一些JMS的实现如下:
- 亚马逊 SQS
- Apache ActiveMQ
- JBoss 消息传递
- 兔MQ
JMS 消息
一条 JMS 消息可以分为以下三部分:
- 标头:它包含有关消息的元数据。
- 属性:它可以进一步细分为三个部分——
- Application:发送消息的Java应用程序。
- 提供者:它由 JMS 提供者使用,并且是特定于实现的。
- 标准属性:这些由 JMS API 定义。
- 有效负载:此字段是消息本身。
实现:在这里,我们将构建一个示例问候(基于 Maven)应用程序来演示如何集成和使用 JMS。为简单起见,我们将使用嵌入式服务器而不是创建另一个应用程序。
项目结构如下:
第 1 步:使用 spring initializr 创建一个 spring 项目。如下图所示创建文件夹和文件,如下所示:
第 2 步:将以下依赖项添加到 pom.xml 文件中。
XML
org.apache.activemq
artemis-server
org.apache.activemq
artemis-jms-server
org.springframework.boot
spring-boot-starter-artemis
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-devtools
runtime
true
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
Java
// Java Program to Illustrate Model Layer
package com.anuanu.springjms.model;
// Importing required classes
import java.io.Serializable;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
// Annotation
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
// Class
// Implementing Serializable interface
public class GreetingMessage implements Serializable {
// Class data members
static final long serialVersionUID
= -7462433555964441775L;
private UUID id;
private String message;
}
Java
// Java Program to Illustrate Embedded Server Configuration
package com.anuanu.springjms;
// Importing required classes
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringJmsApplication {
// Main driver method
public static void main(String[] args) throws Exception
{
// Embedded Server Configuration
ActiveMQServer activeMQServer
= ActiveMQServers.newActiveMQServer(
new ConfigurationImpl()
.setPersistenceEnabled(false)
.setJournalDirectory(
"target/data/journal")
.setSecurityEnabled(false)
.addAcceptorConfiguration("invm",
"vm://0"));
activeMQServer.start();
SpringApplication.run(SpringJmsApplication.class,
args);
}
}
Java
// Java Program to Illustrate Task Configuration
package com.anuanu.springjms.config;
// Importing required classes
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
// Annotations
@EnableScheduling
@EnableAsync
@Configuration
// Class
public class TaskConfig {
// Task Configuration
@Bean TaskExecutor taskExecutor()
{
return new SimpleAsyncTaskExecutor();
}
// taskExecutor bean is injected into spring context,
// and spring will use it to execute tasks for us
}
Java
// Java Program to Illustrate Task Configuration
package com.anuanu.springjms.config;
// Importing required classes
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
// Annotations
@Configuration
// Class
public class JmsConfig {
// Class data member
public static final String QUEUE = "greet";
// Annotation
@Bean
// Class
public MessageConverter messageConverter()
{
MappingJackson2MessageConverter converter
= new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
// Enabling spring to take jms messages and flip it
// to a json message. then it can read
// that jms message as a jms text message and
// convert it back to java object
}
Java
// Java Program to Illustrate Sending JMS Messages
package com.anuanu.springjms.sender;
// Importing required classes
import com.anuanu.springjms.config.JmsConfig;
import com.anuanu.springjms.model.GreetingMessage;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
// Annotations
@RequiredArgsConstructor
@Component
// Class
public class MessageSender {
// Class data member
private final JmsTemplate jmsTemplate;
private static Integer ID = 1;
// Annotation
@Scheduled(fixedRate = 2000)
// Method
public void sendMessage()
{
// Display command
System.out.println("Greetings user");
GreetingMessage message
= GreetingMessage.builder()
.id(UUID.randomUUID())
.message("Greetings user " + ID++)
.build();
jmsTemplate.convertAndSend(JmsConfig.QUEUE,
message);
// Display command
System.out.println("Message sent!!!");
}
}
Java
// Java Program to Illustrate Receiving JMS Messages
package com.anuanu.springjms.listener;
// Importing required classes
import com.anuanu.springjms.config.JmsConfig;
import com.anuanu.springjms.model.GreetingMessage;
import javax.jms.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
// Annotation
@Component
// Class
public class MessageListener {
@JmsListener(destination = JmsConfig.QUEUE)
public void
listen(@Payload GreetingMessage greetingMessage,
@Headers MessageHeaders messageHeaders,
Message message)
{
// Display command
System.out.println("Greeting Received!!!");
System.out.println(greetingMessage);
}
}
模型层
第 3 步:创建一个简单的 POJO(Plain old Java类),它将作为我们发送和接收消息的模型。在这里,我们使用 Lombok 来减少样板代码。使用的注解如下:
- @Data:此注解为所有字段生成 getter、setter、toString 方法和 equals & hashCode 方法。
- @Builder:此注解使用构建器模式来自动构建复杂的构建器 API。
- @NoArgsConstructor:这个注解生成一个没有参数的构造函数。
- @AllArgsConstructor:这个注解生成一个带有所有参数的构造函数。
例子:
Java
// Java Program to Illustrate Model Layer
package com.anuanu.springjms.model;
// Importing required classes
import java.io.Serializable;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
// Annotation
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
// Class
// Implementing Serializable interface
public class GreetingMessage implements Serializable {
// Class data members
static final long serialVersionUID
= -7462433555964441775L;
private UUID id;
private String message;
}
所需配置如下:
A. 嵌入式服务器配置
如前所述,我们将创建一个嵌入式服务器来演示 JMS 消息传递。我们将使用 ActiveMQServers 来创建我们的嵌入式服务器。这是 newActiveMQServer 的方法签名:Modifier and Type Methods and Description static ActiveMQServer newActiveMQServer(Configuration config)
Note: Configuration can be changed as per the requirement.
当我们的 Spring Boot 应用程序启动时,嵌入式服务器也会启动。 (因为我们在服务器上调用了 start() 方法)。
例子:
Java
// Java Program to Illustrate Embedded Server Configuration
package com.anuanu.springjms;
// Importing required classes
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringJmsApplication {
// Main driver method
public static void main(String[] args) throws Exception
{
// Embedded Server Configuration
ActiveMQServer activeMQServer
= ActiveMQServers.newActiveMQServer(
new ConfigurationImpl()
.setPersistenceEnabled(false)
.setJournalDirectory(
"target/data/journal")
.setSecurityEnabled(false)
.addAcceptorConfiguration("invm",
"vm://0"));
activeMQServer.start();
SpringApplication.run(SpringJmsApplication.class,
args);
}
}
C. 任务配置
taskConfig 类将帮助我们异步执行任务。这里的任务是以固定的时间间隔发送消息。为了以固定的时间间隔发送消息,我们使用 @EnableScheduling 注解启用了调度程序。使用的注解如下:
- @EnableScheduling:此注解为我们的应用程序启用调度程序。
- @EnableAsync:这个注解使spring能够在后台线程池中运行@Async方法。
- @Configuration:这个注解表示指定的类有@Bean定义方法。
- @Bean:它是一个方法级别的注解,用于显式声明一个bean。
例子:
Java
// Java Program to Illustrate Task Configuration
package com.anuanu.springjms.config;
// Importing required classes
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
// Annotations
@EnableScheduling
@EnableAsync
@Configuration
// Class
public class TaskConfig {
// Task Configuration
@Bean TaskExecutor taskExecutor()
{
return new SimpleAsyncTaskExecutor();
}
// taskExecutor bean is injected into spring context,
// and spring will use it to execute tasks for us
}
D. 消息转换器配置
jmsConfig 类为生产者和消费者分别提供生产和消费消息的通用流。它还提供了一个用于转换Java对象和 JMS 消息的 bean。使用的注解如下:
- @Configuration :这个注解表示指定的类有@Bean 定义方法。
- @Bean :它是一个方法级别的注解,用于显式声明一个 bean。
Note : Message converter uses Jackson 2.x to convert messages to and from JSON. It maps an object to a BytesMessage, or to a TextMessage if the targetType is set to MessageType.TEXT. It converts from a TextMessage or BytesMessage to an object.
例子:
Java
// Java Program to Illustrate Task Configuration
package com.anuanu.springjms.config;
// Importing required classes
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
// Annotations
@Configuration
// Class
public class JmsConfig {
// Class data member
public static final String QUEUE = "greet";
// Annotation
@Bean
// Class
public MessageConverter messageConverter()
{
MappingJackson2MessageConverter converter
= new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
// Enabling spring to take jms messages and flip it
// to a json message. then it can read
// that jms message as a jms text message and
// convert it back to java object
}
发送 JMS 消息
MessageSender 类(如下所述)主要用于创建消息并将消息生成/发送到消费者可以消费消息的公共流。我们使用了 JmsTemplate,它是一个帮助类,使我们更容易通过 JMS 接收和发送消息。我们还用 @Scheduled 注释了这个类,其值为 2000 ms,它告诉调度程序每隔 2 秒发送一次消息。使用的注解如下:
- @RequiredArgsConstructor :此注释生成一个带有必需参数的构造函数,即具有最终字段或具有其他约束的字段的参数。
- @Component:这个注解将我们的类标记为允许spring检测任何自定义bean的组件。
- @Scheduled:这个注解标记了一个要被调度的方法。它必须具有这些 cron()、fixedDelay() 或 fixedRate() 属性中的任何一个。
例子:
Java
// Java Program to Illustrate Sending JMS Messages
package com.anuanu.springjms.sender;
// Importing required classes
import com.anuanu.springjms.config.JmsConfig;
import com.anuanu.springjms.model.GreetingMessage;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
// Annotations
@RequiredArgsConstructor
@Component
// Class
public class MessageSender {
// Class data member
private final JmsTemplate jmsTemplate;
private static Integer ID = 1;
// Annotation
@Scheduled(fixedRate = 2000)
// Method
public void sendMessage()
{
// Display command
System.out.println("Greetings user");
GreetingMessage message
= GreetingMessage.builder()
.id(UUID.randomUUID())
.message("Greetings user " + ID++)
.build();
jmsTemplate.convertAndSend(JmsConfig.QUEUE,
message);
// Display command
System.out.println("Message sent!!!");
}
}
接收 JMS 消息
MessageListener 类(如下所述)充当消费者,即它消费/接收驻留在公共流中且尚未消费的消息。公共流“JmsConfig.QUEUE”的位置被传递给@JmsListener 中的目标方法。使用的注解如下:
- @JmsListener:此注解将方法标记为指定destination() 上JMS 消息侦听器的目标。
- @Component:这个注解将我们的类标记为允许spring检测任何自定义bean的组件。
- @Payload:该注解标志着要提取的消息的有效载荷为注解参数。
- @Headers:此注解提取 Map
中的所有标题。它在这里与实现 Map 并用于消息头的 MessageHeaders 类一起使用。
例子:
Java
// Java Program to Illustrate Receiving JMS Messages
package com.anuanu.springjms.listener;
// Importing required classes
import com.anuanu.springjms.config.JmsConfig;
import com.anuanu.springjms.model.GreetingMessage;
import javax.jms.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
// Annotation
@Component
// Class
public class MessageListener {
@JmsListener(destination = JmsConfig.QUEUE)
public void
listen(@Payload GreetingMessage greetingMessage,
@Headers MessageHeaders messageHeaders,
Message message)
{
// Display command
System.out.println("Greeting Received!!!");
System.out.println(greetingMessage);
}
}
输出:
消息每 2 秒发送和接收一次。为每条新消息传递一个唯一的用户名和 ID。
检查 messageListener 类:
下图向我们展示了在 messageListener 类中接收到的 jms 消息。可以在标题字段中自定义各种属性。请注意我们在标题中添加的一个键值对。
'_type' -> 'com.anuanu.springjms.model.GreetingMessage'