Руководство пользователя Артемис

Руководство пользователя Артемис

2. Термины, сокращения, определения

Термин, определение, сокращениеОписание
БрокерПрограмма (сервер), которая принимает, хранит и пересылает сообщения между приложениями
ProducerКомпонент (или приложение), отправляющий сообщения
Consumer  Компонент, получающий и обрабатывающий сообщения
Очередь (QueueХранилище сообщений. Каждое сообщение читается только одним получателем
TCP, AMQP  Протоколы обмена данными в сети и обмена сообщениями
JMS, Java Message ServiceСтандарт промежуточного ПО для рассылки сообщений, который позволяет приложения на платформе JavaEE создавать, посылать, получать, и читать сообщения

3. Установка ConnectionFactory для подключения

3.1 Подключение по TCP

Для стандартного подключение по TCP, используется класс ActiveMQConnectionFactory. Для работы с ним необходимо подключить зависимость artemis-jakarta-client:

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>artemis-jakarta-client</artifactId>
        <version>${artemis.version}</version>
    </dependency>

3.2 Подключение по AMQP

Для подключения по AMQP предпочтительным для использования является JmsConnectionFactory из библиотеки qpid-jms-client:

   <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-jms-client</artifactId>
       <version>1.12.1</version>
   </dependency>

3.3 Примеры

3.3.1 TCP

  @Bean(value = "activeMQConnectionFactory")
  public ActiveMQConnectionFactory activeMQConnectionFactory() throws JMSException
{
      ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory();
      connectionFactory.setBrokerURL(standartBrokerURL);
      connectionFactory.setUser(user);
      connectionFactory.setPassword(password);
      connectionFactory.setConnectionTTL(2000);
      connectionFactory.setClientFailureCheckPeriod(2000);
      return connectionFactory;
}

3.3.2 AMQP

   @Bean
   public ConnectionFactory connectionFactory() {
       return new JmsConnectionFactory(user, password, amqpBrokerURL);
}

4. Отправка сообщения в очередь

4.1 Зависимости

Для отправки сообщения используется JmsTemplate метод send. Он требует для передачи наименование очереди (передать обычный String) и MessageCreator (можно как в примере использовать lambda выражение) Находится он в библиотеке spring-jms. Стартер для подключения библиотеки:

   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-activemq</artifactId>
       <version>3.1.2</version>
   </dependency>

4.2 Примеры использования

4.2.1 Отправка строки

   public void sendTestMessageToJms() throws Exception {
       TestDto testDto = TestDto.builder()
               .serviceName("my Service name")
               .build();
       String jsonRequest;
       try {
           jsonRequest = objectMapper.writeValueAsString(testDto);
       } catch (JsonProcessingException e) {
           throw new Exception("Ошибка обработки входящего сообщения" + testDto +
". Ошибка: " + e + ": " + e.getMessage());
        }
        // Отправка сообщения в очередь запросов
        jmsTemplate.send(sendQueue, session -> {
            TextMessage message = session.createTextMessage(jsonRequest);
            return message;
        });
}

4.2.2 Отправка бинарного файла

public void sendBinaryMessageToJms(String filePath) throws Exception {
        Path path = Paths.get(filePath);
        byte[] bytes = Files.readAllBytes(path);
        // Отправка сообщения в очередь запросов
        jmsTemplate.send(sendQueue, session -> {
            BytesMessage bytesMessage = session.createBytesMessage();
            //Тело сообщения
            bytesMessage.writeBytes(bytes);
            //Заголовки
            bytesMessage.setStringProperty("fileName",
path.getFileName().toString());
            return bytesMessage;
        });
    }

4.2.3 Отправка сериализуемого сообщения

public void sendSerializableMessageToJms() throws Exception {
        TestDto testDto = TestDto.builder()
                .serviceName("my Service name")
                .build();
        // Отправка сообщения в очередь запросов
        jmsTemplate.send(sendQueue, session -> {
            ObjectMessage objectMessage = session.createObjectMessage(testDto);
            return objectMessage;
        });
    }

5. Получение сообщения из очереди

Для того чтобы принять сообщение нужно использовать аннотацию @JmsListener из библиотеки spring-jms над методом приемки сообщения. Пример: @JmsListener(destination = "${receiveQueue}")

в параметре используется destination – указание очереди откуда нужно считать сообщение.

Метод принимает класс Message из библиотеки activemq-client-jakarta, который находится в том же стартере, что и spring-jms, далее обрабатываете сообщение и выполняете с ним работу. Пример использования:

    @JmsListener(destination = "${receiveQueue}")
    public void handleResponse(Message message) throws JMSException {
        String correlationId = message.getJMSCorrelationID();
        String responseText;
        //Проверка на тип сообщения
        if (message instanceof JmsBytesMessage) {
            //Байт сообщение в строку
            responseText = new String((byte[]) message.getBody(Object.class),
StandardCharsets.UTF_8);
        } else {
            //Строковое сообщение
            responseText = ((TextMessage) message).getText();
        }
        //Работа с принятым сообщением
        try {
            TestDto testDto = objectMapper.readValue(responseText, TestDto.class);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
}

6. Листинг

6.1 ArtemisConfig.java

package ru.diasoft.micro.config;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@Configuration
@SuppressWarnings("all")
public class ArtemisConfig {
    /**
      url для подключения по amqp
     */
    @Value("${connection-factory.amqpBrokerURL:amqp://localhost:61616}")
    private String amqpBrokerURL;
    /**
     url для подключения по tcp
     */
    @Value("${connection-factory.standartBrokerURL:tcp://localhost:61616}")
    private String standartBrokerURL;
    /**
     Пользователь под которым будет производиться подключение к артемису
     */
    @Value("${connection-factory.user:artemis}")
    private String user;
    /**
     Пароль под которым будет производиться подключение к артемису
     */
     @Value("${connection-factory.password:artemis}")
     private String password;
     public ArtemisConfig() {
     }

     /**
      Подключение по amqp
      */
      @Bean
      public ConnectionFactory connectionFactory() {
          return new JmsConnectionFactory(user, password, amqpBrokerURL);
      }
      /**
       Подключение по tcp
       */
      @Bean(value = "activeMQConnectionFactory")
      public ActiveMQConnectionFactory activeMQConnectionFactory() throws JMSException
{
          ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory();
          connectionFactory.setBrokerURL(standartBrokerURL);
          connectionFactory.setUser(user);
          connectionFactory.setPassword(password);
          connectionFactory.setConnectionTTL(2000);
          connectionFactory.setClientFailureCheckPeriod(2000);
          return connectionFactory;
     }
}

6.2 JmsService.java

package ru.diasoft.micro.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.qpid.jms.message.JmsBytesMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import ru.diasoft.micro.dto.TestDto;

import javax.jms.*;
import javax.servlet.http.HttpServletRequest;
import java.io.File;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;

@Service
@RequiredArgsConstructor
@Log4j2
@SuppressWarnings("all")
public class JmsService {
    /**
     * JmsTemplate для отправки сообщения
     */
    private final JmsTemplate jmsTemplate;
    private final ObjectMapper objectMapper;

    //Очередь для отправки сообщения
    @Value("${sendQueue}")
    private String sendQueue;

    //Очередь откуда будут считываться сообщения
    @Value("${receiveQueue}")
    private String receiveQueue;

    /**
     * Отправка сообщения
     * @throws Exception
     */
    public void sendTestMessageToJms() throws Exception {
        TestDto testDto = TestDto.builder()
                .serviceName("my Service name")
                .build();
        String jsonRequest;
        try {
            jsonRequest = objectMapper.writeValueAsString(testDto);
        } catch (JsonProcessingException e) {
            throw new Exception("Ошибка обработки входящего сообщения" + testDto +
". Ошибка: " + e + ": " + e.getMessage());
        }
        // Отправка сообщения в очередь запросов
        jmsTemplate.send(sendQueue, session -> {
            TextMessage message = session.createTextMessage(jsonRequest);
            return message;
        });
    }

    /**
     * Отправка бинарного файла
     */
    public void sendBinaryMessageToJms(String filePath) throws Exception {
        Path path = Paths.get(filePath);
        byte[] bytes = Files.readAllBytes(path);

        // Отправка сообщения в очередь запросов
        jmsTemplate.send(sendQueue, session -> {
            BytesMessage bytesMessage = session.createBytesMessage();

            //Тело сообщения
            bytesMessage.writeBytes(bytes);
           //Заголовки
           bytesMessage.setStringProperty("fileName",
path.getFileName().toString());
           return bytesMessage;
       });
    }
    /**
     * Отправка сериализуемого сообщения
     * @throws Exception
     */
    public void sendSerializableMessageToJms() throws Exception {
        TestDto testDto = TestDto.builder()
                .serviceName("my Service name")
                .build();
        // Отправка сообщения в очередь запросов
        jmsTemplate.send(sendQueue, session -> {
            ObjectMessage objectMessage = session.createObjectMessage(testDto);
            return objectMessage;
        });
    }
    /**
     * Приемка сообщения
     * @param message
     * @throws JMSException
     */
    @JmsListener(destination = "${receiveQueue}")
    public void handleResponse(Message message) throws JMSException {
        String correlationId = message.getJMSCorrelationID();
        String responseText;
    //Проверка на тип сообщения
    if (message instanceof JmsBytesMessage) {
        //Байт сообщение в строку
        responseText = new String((byte[]) message.getBody(Object.class),           StandardCharsets.UTF_8);
         } else {
             //Строковое сообщение
             responseText = ((TextMessage) message).getText();
         }
         //Работа с принятым сообщением
         try {
             TestDto testDto = objectMapper.readValue(responseText, TestDto.class);
         } catch (JsonProcessingException e) {
             e.printStackTrace();
         }
   }
}