Руководство пользователя Артемис
2. Термины, сокращения, определения
| Термин, определение, сокращение | Описание |
|---|---|
| Брокер | Программа (сервер), которая принимает, хранит и пересылает сообщения между приложениями |
| Producer | Компонент (или приложение), отправляющий сообщения |
| Consumer | Компонент, получающий и обрабатывающий сообщения |
| Очередь (Queue | Хранилище сообщений. Каждое сообщение читается только одним получателем |
| TCP, AMQP | Протоколы обмена данными в сети и обмена сообщениями |
| JMS, Java Message Service | Стандарт промежуточного ПО для рассылки сообщений, который позволяет приложения на платформе JavaEE создавать, посылать, получать, и читать сообщения |
3. Установка ConnectionFactory для подключения
3.1 Подключение по TCP
Для стандартного подключение по TCP, используется класс ActiveMQConnectionFactory. Для работы с ним необходимо подключить зависимость artemis-jakarta-client:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jakarta-client</artifactId>
<version>${artemis.version}</version>
</dependency>3.2 Подключение по AMQP
Для подключения по AMQP предпочтительным для использования является JmsConnectionFactory из библиотеки qpid-jms-client:
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>1.12.1</version>
</dependency>3.3 Примеры
3.3.1 TCP
@Bean(value = "activeMQConnectionFactory")
public ActiveMQConnectionFactory activeMQConnectionFactory() throws JMSException
{
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(standartBrokerURL);
connectionFactory.setUser(user);
connectionFactory.setPassword(password);
connectionFactory.setConnectionTTL(2000);
connectionFactory.setClientFailureCheckPeriod(2000);
return connectionFactory;
}3.3.2 AMQP
@Bean
public ConnectionFactory connectionFactory() {
return new JmsConnectionFactory(user, password, amqpBrokerURL);
}4. Отправка сообщения в очередь
4.1 Зависимости
Для отправки сообщения используется JmsTemplate метод send. Он требует для передачи наименование очереди (передать обычный String) и MessageCreator (можно как в примере использовать lambda выражение) Находится он в библиотеке spring-jms. Стартер для подключения библиотеки:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>3.1.2</version>
</dependency>4.2 Примеры использования
4.2.1 Отправка строки
public void sendTestMessageToJms() throws Exception {
TestDto testDto = TestDto.builder()
.serviceName("my Service name")
.build();
String jsonRequest;
try {
jsonRequest = objectMapper.writeValueAsString(testDto);
} catch (JsonProcessingException e) {
throw new Exception("Ошибка обработки входящего сообщения" + testDto +
". Ошибка: " + e + ": " + e.getMessage());
}
// Отправка сообщения в очередь запросов
jmsTemplate.send(sendQueue, session -> {
TextMessage message = session.createTextMessage(jsonRequest);
return message;
});
}4.2.2 Отправка бинарного файла
public void sendBinaryMessageToJms(String filePath) throws Exception {
Path path = Paths.get(filePath);
byte[] bytes = Files.readAllBytes(path);
// Отправка сообщения в очередь запросов
jmsTemplate.send(sendQueue, session -> {
BytesMessage bytesMessage = session.createBytesMessage();
//Тело сообщения
bytesMessage.writeBytes(bytes);
//Заголовки
bytesMessage.setStringProperty("fileName",
path.getFileName().toString());
return bytesMessage;
});
}4.2.3 Отправка сериализуемого сообщения
public void sendSerializableMessageToJms() throws Exception {
TestDto testDto = TestDto.builder()
.serviceName("my Service name")
.build();
// Отправка сообщения в очередь запросов
jmsTemplate.send(sendQueue, session -> {
ObjectMessage objectMessage = session.createObjectMessage(testDto);
return objectMessage;
});
}5. Получение сообщения из очереди
Для того чтобы принять сообщение нужно использовать аннотацию @JmsListener из библиотеки
spring-jms над методом приемки сообщения.
Пример:
@JmsListener(destination = "${receiveQueue}")
в параметре используется destination – указание очереди откуда нужно считать сообщение.
Метод принимает класс Message из библиотеки activemq-client-jakarta, который находится в том же стартере, что и spring-jms, далее обрабатываете сообщение и выполняете с ним работу. Пример использования:
@JmsListener(destination = "${receiveQueue}")
public void handleResponse(Message message) throws JMSException {
String correlationId = message.getJMSCorrelationID();
String responseText;
//Проверка на тип сообщения
if (message instanceof JmsBytesMessage) {
//Байт сообщение в строку
responseText = new String((byte[]) message.getBody(Object.class),
StandardCharsets.UTF_8);
} else {
//Строковое сообщение
responseText = ((TextMessage) message).getText();
}
//Работа с принятым сообщением
try {
TestDto testDto = objectMapper.readValue(responseText, TestDto.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}6. Листинг
6.1 ArtemisConfig.java
package ru.diasoft.micro.config;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@Configuration
@SuppressWarnings("all")
public class ArtemisConfig {
/**
url для подключения по amqp
*/
@Value("${connection-factory.amqpBrokerURL:amqp://localhost:61616}")
private String amqpBrokerURL;
/**
url для подключения по tcp
*/
@Value("${connection-factory.standartBrokerURL:tcp://localhost:61616}")
private String standartBrokerURL;
/**
Пользователь под которым будет производиться подключение к артемису
*/
@Value("${connection-factory.user:artemis}")
private String user;
/**
Пароль под которым будет производиться подключение к артемису
*/
@Value("${connection-factory.password:artemis}")
private String password;
public ArtemisConfig() {
}
/**
Подключение по amqp
*/
@Bean
public ConnectionFactory connectionFactory() {
return new JmsConnectionFactory(user, password, amqpBrokerURL);
}
/**
Подключение по tcp
*/
@Bean(value = "activeMQConnectionFactory")
public ActiveMQConnectionFactory activeMQConnectionFactory() throws JMSException
{
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(standartBrokerURL);
connectionFactory.setUser(user);
connectionFactory.setPassword(password);
connectionFactory.setConnectionTTL(2000);
connectionFactory.setClientFailureCheckPeriod(2000);
return connectionFactory;
}
}6.2 JmsService.java
package ru.diasoft.micro.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.qpid.jms.message.JmsBytesMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import ru.diasoft.micro.dto.TestDto;
import javax.jms.*;
import javax.servlet.http.HttpServletRequest;
import java.io.File;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
@Service
@RequiredArgsConstructor
@Log4j2
@SuppressWarnings("all")
public class JmsService {
/**
* JmsTemplate для отправки сообщения
*/
private final JmsTemplate jmsTemplate;
private final ObjectMapper objectMapper;
//Очередь для отправки сообщения
@Value("${sendQueue}")
private String sendQueue;
//Очередь откуда будут считываться сообщения
@Value("${receiveQueue}")
private String receiveQueue;
/**
* Отправка сообщения
* @throws Exception
*/
public void sendTestMessageToJms() throws Exception {
TestDto testDto = TestDto.builder()
.serviceName("my Service name")
.build();
String jsonRequest;
try {
jsonRequest = objectMapper.writeValueAsString(testDto);
} catch (JsonProcessingException e) {
throw new Exception("Ошибка обработки входящего сообщения" + testDto +
". Ошибка: " + e + ": " + e.getMessage());
}
// Отправка сообщения в очередь запросов
jmsTemplate.send(sendQueue, session -> {
TextMessage message = session.createTextMessage(jsonRequest);
return message;
});
}
/**
* Отправка бинарного файла
*/
public void sendBinaryMessageToJms(String filePath) throws Exception {
Path path = Paths.get(filePath);
byte[] bytes = Files.readAllBytes(path);
// Отправка сообщения в очередь запросов
jmsTemplate.send(sendQueue, session -> {
BytesMessage bytesMessage = session.createBytesMessage();
//Тело сообщения
bytesMessage.writeBytes(bytes);
//Заголовки
bytesMessage.setStringProperty("fileName",
path.getFileName().toString());
return bytesMessage;
});
}
/**
* Отправка сериализуемого сообщения
* @throws Exception
*/
public void sendSerializableMessageToJms() throws Exception {
TestDto testDto = TestDto.builder()
.serviceName("my Service name")
.build();
// Отправка сообщения в очередь запросов
jmsTemplate.send(sendQueue, session -> {
ObjectMessage objectMessage = session.createObjectMessage(testDto);
return objectMessage;
});
}
/**
* Приемка сообщения
* @param message
* @throws JMSException
*/
@JmsListener(destination = "${receiveQueue}")
public void handleResponse(Message message) throws JMSException {
String correlationId = message.getJMSCorrelationID();
String responseText;
//Проверка на тип сообщения
if (message instanceof JmsBytesMessage) {
//Байт сообщение в строку
responseText = new String((byte[]) message.getBody(Object.class), StandardCharsets.UTF_8);
} else {
//Строковое сообщение
responseText = ((TextMessage) message).getText();
}
//Работа с принятым сообщением
try {
TestDto testDto = objectMapper.readValue(responseText, TestDto.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}