RabbitMQ DLX com Java Spring

Atualmente os sistemas tradicionais não conseguem suprir toda demanda de requisições de clientes. Muitas vezes é preciso investimento vertical, ou seja, aumentar a capacidade dos servidores a fim de aumentar a capacidade de processamento. Como soulção, podemos usar o O RabbitMQ DLX com Java Spring.

English version: click HERE

Por esse e outros motivos surgiram novas arquiteturas de software. O RabbiMQ é uma dessas soluções.

Sendo caracterizado por um sistema orientado a mensagem, com origem nos sistemas distribuídos, prove uma comunicação assíncrona baseada no protocolo AMQP.

Tutorial RabbitMQ:

Artigo 01: RabbitMQ #1 – O que são sistemas de mensageria?

Artigo 02: RabbitMQ #2 – RabbitMQ no docker-compose

Artigo 03: RabbitMQ DLX com Java Spring

No artigo RabbitMQ #1 – O que são sistemas de mensageria? exploramos alguns conceitos e descrevemos o RabbitMQ.

Caso já queira partir para prática, podes seguir o tutorial abaixo para configurar o RabbiMQ no docker com docker-compose.

Obs: Vale lembrar que para configurar o docker-compose é preciso de permissão (de uma olhada em Permissão de grupo – docker-compose não funciona)

RabbitMQ

Esse conceito de mensageira está fundamentado na matéria de sistemas distribuídos dentro da computação. Caracteriza-se por uma comunicação orientada a mensagem, onde podem ser classificadas como persistentes ou transientes.

Dessa forma, sistemas de mensageria (messaging systems) podem ser clusterizados, possuem escalabilidade, são tolerantes a falhas, com comunicação assíncrona. Além disso, é intuitivo dizer que opera como sistema distribuído. 

Simplificando Redes – O que é um sistema de mensageria?

O que é uma Exchange?

Podemos compara uma exchange, no RabbiMQ, com um correio. Esse componente é responsável por receber, analisar e encaminhar as mensagens. Em outras palavras, uma Exchange é responsável pelo rotemaneto das mensagens.

Os parâmetros analisados, assim como o tipo de exchange influenciam o roteamento de uma determinada mensagem. O tipo mais simplista é o fanout. Essa categoria de exchange funciona em modo broadcast, ou seja, as mensagens que recebo serão encaminhadas para todas as filas conectadas a mim.

Existem outras categorias de exchanges dentro da arquitetura do RabbiMQ, são elas: direct, topic, head, default, e as dead letters exchanges.

O que é uma Dead Letter Exchange?

Dentro de um cenário de produção e consumo de mensagens, é possível dizer que haverão mensagens “perdidas” nas filas. Essas mensagens não foram recuperadas por nenhum consumidor por algum motivo.

Para estas mensagens temos um tratamento específico. Vamos pensar…. é justo dizer que se uma mensagem não foi recuperada ela pode ser direcionada para outro local. Este local pode ser uma exchange.

Dessa forma, as dead letter exchanges, como o próprio nome sugere, são exchanges receptoras de mensagens “esquecidas”. Elas por algum motivo não foram solicitadas, contudo, estão consumindo espaço nas filas e exchanges operantes.

Vamos colocar a mão na massa…

Nosso intuito agora consiste em criar uma aplicação de usuário que defina os parâmetros de uma DLX – Dead Letter eXchange.

Nossa linguagem de programação escolhida é o Java com Spring Boot framework. Contudo, o RabbitMQ é cross-language. Então, você pode escolher outra linguagem de programação de sua preferência, como Python.

Conectando ao RabbiMQ

O primeiro passo em qualquer aplicação consiste na conexão com o servidor/sistema. Neste caso, precisamos especificar alguns parâmetros como: IP, username, senha, e porta.

Obs: Verifique se a porta de sua máquina está aberta para realizar a conexão

No código abaixo criaremos uma factory do tipo ConnectionFactory definir os parâmetros de conexão. Após especificarmos o setHost, setUsername, setPassword e port devemos criar a conexão propriamente dita.

OK, criamos a conexão com o servidor. Agora o póximo passo estana criação do canal TCP a ser utilizado pelo produtor (aplicação que gera as mensagens).

ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.24.0.2");
        factory.setUsername("admin");
        factory.setPassword("pass123");
        factory.setPort(5672);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

Feito isso, podemos definir os parâmetros das exchanges. Agora, através deste canal criado podemos declara (caso ainda não exista) as exchanges e filas.

Declarando Exchanges

Utilizamos então, o método exchangeDeclare da classe channel para declara asDLX e uma Exchange do tipo Topic. Os parâmetros destes métodos são: NOME, X_CATEGORIA.

Muito bem, declarmos nossas exchanges. Contudo, precisamos das filas para conectar os consumidores a elas. Sendo assim, declaramos a fila DLX_QUEUE para se conectar com a DLX_NAME.

//declarar as exchanges (main e dlx)
        channel.exchangeDeclare(DLX_NAME,"topic");
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

//declarar as filas: consumer, dlx
        channel.queueDeclare(DLX_QUEUE,false,false,false,null);

Definindo as propriedades da Exchange

Muito bem, neste próximo trecho de código vamos definir os parâmetros utilizando um dicionário (MAP<String, Object>). Estaremos instânciando um dicionário dentro da variável map. A partir dela criaremos a configuração da DLX.

Os parâmetros são bem intuitivos na verdade. O primeiro deles etsá relacionado ao tempo de vida de uma mensage – TTL – Time To Live. Neste campo, estamos dizendo ao sistema que uma mensagem só poderá permanecer na fila por 10 segundos antes de ser direcionada a uma DLX.

Após definifa a condição precisamos definir o destino. bserve a terceira linha do código abaixo. Neste caso, a DLX DLX_NAME é o destino das mensagens que preencherem o requisito. Em outras palavars, excedam o TTL.

Próximo parâmetro…. agora iremos definir a label a ser utilizada. Uma label é uma identificação, é por ela que definimos para onde a mensagem vai. Logo, a routing-key será: DLX_BIDING_KEY. A routing key está associada a mensagem, enquanto a binding key está associada a fila.

Definimos todos os requisitos da DLX. Agora precisamos adicionar esses parâmetros a nossa configuração do sistema.

Criamos agora a fila CONSUMER_QUEUE, nela estamos enviando o nosso mapeamento map (dicionário) com todas informações da DLX. Abaixo estão descritas as propriedades de uma fila. Importante salientar que estas são do tipo boleano (true e false).

//parametros relacionados a fila: durável, exclusiva e auto-deletável
channel.queueDeclare(queue, durable, exclusive, autoDelete, null);

O útimo parâmetro está reservado para argumentos relacionados a configuração do sistema. Justamente, neste ponto que enviamos o map como argumento do método.

Map<String, Object> map = new HashMap<String,Object>();
        map.put("x-message-ttl",10000);
        map.put("x-dead-letter-exchange",DLX_NAME);
        map.put("x-dead-letter-routing-key",DLX_BINDING_KEY);
        channel.queueDeclare(CONSUMER_QUEUE,false,false,false,map);

Criando o Binding entre a Exchange e Filas

Para que as exchanges e filas se conectem precisamos definir os binding. Todo biding possui um label (rótulo) associado, os biding keys. Juntamente com as routing keys, os bindig keys são utilizamos durante a fase de roteamento das mensagens.

Repare que toda a configuração foi realizada através do canal criado (channel.method). Agora não é diferente, iremos utilizar o queueBind para definir as conexões entre as filas e exchanges.

No primeiro binding defimos a ligação entre a DLX e sua respectiva fila. Na prática, dizemos ao RabbitMQ que toda mensagem que chegar com uma routing key compatível com a binding key da fila DLX será direcionada para DLX pelas exchanges de operação do sistema.

No segundo binding estamos associando a fila CONSUMER_QUEUE à exchange EXCHANGE_NAME com a label BINDING_KEY.

Obs: o operador .# está relacionado ao match entre a routing key e a binding key.

//bindingkey da dlx e consumer
        channel.queueBind(DLX_QUEUE,DLX_NAME,DLX_BINDING_KEY+".#");
        channel.queueBind(CONSUMER_QUEUE,EXCHANGE_NAME,CONSUMER_BINDING_KEY+".#");

        connection.close();

Por fim, fechamos a conexão.

connection.close()

Código completo

Aqui você tem o código completo do passo a passo anterior. Neste tutorial, codamos uma aplicação em Java para definir os parâmetros das exchanges do RabbitMQ. Em posts futuros iremos criar o produtor e consumidor. Assim podemos análisar ocomportamento do sistema.

package DLX;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DlxConfig {
    //DLX
    private static final String DLX_NAME = "dlxExchange";
    private static final String DLX_QUEUE = "dlxQueue";
    private static final String DLX_BINDING_KEY= "dlxrk";

    //EXCHANGE do sistema
    private static final String EXCHANGE_NAME = "mainExchange";

    //CONSUMER
    private static final String CONSUMER_QUEUE = "queueConsumer";
    private static final String CONSUMER_BINDING_KEY = "bkConsumer";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.24.0.2");
        factory.setUsername("admin");
        factory.setPassword("pass123");
        factory.setPort(5672);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //declarar as exchanges (main e dlx)
        channel.exchangeDeclare(DLX_NAME,"topic");
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //declarar as filas: consumer, dlx
        channel.queueDeclare(DLX_QUEUE,false,false,false,null);

        Map<String, Object> map = new HashMap<String,Object>();
        map.put("x-message-ttl",10000);
        map.put("x-dead-letter-exchange",DLX_NAME);
        map.put("x-dead-letter-routing-key",DLX_BINDING_KEY);
        channel.queueDeclare(CONSUMER_QUEUE,false,false,false,map);

        //bindingkey da dlx e consumer
        channel.queueBind(DLX_QUEUE,DLX_NAME,DLX_BINDING_KEY+".#");
        channel.queueBind(CONSUMER_QUEUE,EXCHANGE_NAME,CONSUMER_BINDING_KEY+".#");

        connection.close();

    }
}

Tutorial RabbitMQ:

Artigo 01: RabbitMQ #1 – O que são sistemas de mensageria?

Artigo 02: RabbitMQ #2 – RabbitMQ no docker-compose

Artigo 03: RabbitMQ DLX com Java Spring

Quer saber mais sobre RabbitMQ?

Artigos Relacionados

Se quiser saber sobre outro protocolo de comunicação, o HTTP acesse:

Aula teórica sobre o que é o RabbitMQ.

Como funciona na prática o RabbitMQ? Neste vídeo você poderá ver o comportamento do sistema através de aplicações do usuário em Java Spring boot.

Juliana Mascarenhas

Data Scientist and Master in Computer Modeling by LNCC.
Computer Engineer