RabbitMQ DLX com Java Spring

Atualmente os sistemas tradicionais não conseguem suprir toda demanda de requisições de clientes. Muitas vezes é preciso investimento vertical, ou seja, aumentar a capacidade dos servidores a fim de aumentar a capacidade de processamento. Como soulção, podemos usar o O RabbitMQ DLX com Java Spring.

English version: click HERE

Por esse e outros motivos surgiram novas arquiteturas de software. O RabbiMQ é uma dessas soluções.

Sendo caracterizado por um sistema orientado a mensagem, com origem nos sistemas distribuídos, prove uma comunicação assíncrona baseada no protocolo AMQP.

Tutorial RabbitMQ:

Artigo 01: RabbitMQ #1 – O que são sistemas de mensageria?

Artigo 02: RabbitMQ #2 – RabbitMQ no docker-compose

Artigo 03: RabbitMQ DLX com Java Spring

No artigo RabbitMQ #1 – O que são sistemas de mensageria? exploramos alguns conceitos e descrevemos o RabbitMQ.

Caso já queira partir para prática, podes seguir o tutorial abaixo para configurar o RabbiMQ no docker com docker-compose.

Tutorial RabbiMQ e dockerer-compose

Obs: Vale lembrar que para configurar o docker-compose é preciso de permissão (de uma olhada em Permissão de grupo – docker-compose não funciona)

RabbitMQ

Esse conceito de mensageira está fundamentado na matéria de sistemas distribuídos dentro da computação. Caracteriza-se por uma comunicação orientada a mensagem, onde podem ser classificadas como persistentes ou transientes.

Dessa forma, sistemas de mensageria (messaging systems) podem ser clusterizados, possuem escalabilidade, são tolerantes a falhas, com comunicação assíncrona. Além disso, é intuitivo dizer que opera como sistema distribuído. 

Simplificando Redes – O que é um sistema de mensageria?

O que é uma Exchange?

Podemos compara uma exchange, no RabbiMQ, com um correio. Esse componente é responsável por receber, analisar e encaminhar as mensagens. Em outras palavras, uma Exchange é responsável pelo rotemaneto das mensagens.

Os parâmetros analisados, assim como o tipo de exchange influenciam o roteamento de uma determinada mensagem. O tipo mais simplista é o fanout. Essa categoria de exchange funciona em modo broadcast, ou seja, as mensagens que recebo serão encaminhadas para todas as filas conectadas a mim.

Existem outras categorias de exchanges dentro da arquitetura do RabbiMQ, são elas: direct, topic, head, default, e as dead letters exchanges.

O que é uma Dead Letter Exchange?

Dentro de um cenário de produção e consumo de mensagens, é possível dizer que haverão mensagens “perdidas” nas filas. Essas mensagens não foram recuperadas por nenhum consumidor por algum motivo.

Para estas mensagens temos um tratamento específico. Vamos pensar…. é justo dizer que se uma mensagem não foi recuperada ela pode ser direcionada para outro local. Este local pode ser uma exchange.

Dessa forma, as dead letter exchanges, como o próprio nome sugere, são exchanges receptoras de mensagens “esquecidas”. Elas por algum motivo não foram solicitadas, contudo, estão consumindo espaço nas filas e exchanges operantes.

Vamos colocar a mão na massa…

Nosso intuito agora consiste em criar uma aplicação de usuário que defina os parâmetros de uma DLX – Dead Letter eXchange.

Nossa linguagem de programação escolhida é o Java com Spring Boot framework. Contudo, o RabbitMQ é cross-language. Então, você pode escolher outra linguagem de programação de sua preferência, como Python.

Conectando ao RabbiMQ

O primeiro passo em qualquer aplicação consiste na conexão com o servidor/sistema. Neste caso, precisamos especificar alguns parâmetros como: IP, username, senha, e porta.

Obs: Verifique se a porta de sua máquina está aberta para realizar a conexão

No código abaixo criaremos uma factory do tipo ConnectionFactory definir os parâmetros de conexão. Após especificarmos o setHost, setUsername, setPassword e port devemos criar a conexão propriamente dita.

OK, criamos a conexão com o servidor. Agora o póximo passo estana criação do canal TCP a ser utilizado pelo produtor (aplicação que gera as mensagens).

ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.24.0.2");
        factory.setUsername("admin");
        factory.setPassword("pass123");
        factory.setPort(5672);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
https://gist.github.com/julianazanelatto/a64bb2d0beaaf760a719e2f41cea6abb

Feito isso, podemos definir os parâmetros das exchanges. Agora, através deste canal criado podemos declara (caso ainda não exista) as exchanges e filas.

Livros Indicados:

E-Books de Redes e Segurança

Declarando Exchanges

Utilizamos então, o método exchangeDeclare da classe channel para declara asDLX e uma Exchange do tipo Topic. Os parâmetros destes métodos são: NOME, X_CATEGORIA.

Muito bem, declarmos nossas exchanges. Contudo, precisamos das filas para conectar os consumidores a elas. Sendo assim, declaramos a fila DLX_QUEUE para se conectar com a DLX_NAME.

//declarar as exchanges (main e dlx)
        channel.exchangeDeclare(DLX_NAME,"topic");
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

//declarar as filas: consumer, dlx
        channel.queueDeclare(DLX_QUEUE,false,false,false,null);
https://gist.github.com/julianazanelatto/d9940521eda28253a66eb4148610fc2d.js

Definindo as propriedades da Exchange

Muito bem, neste próximo trecho de código vamos definir os parâmetros utilizando um dicionário (MAP<String, Object>). Estaremos instânciando um dicionário dentro da variável map. A partir dela criaremos a configuração da DLX.

Os parâmetros são bem intuitivos na verdade. O primeiro deles etsá relacionado ao tempo de vida de uma mensage – TTL – Time To Live. Neste campo, estamos dizendo ao sistema que uma mensagem só poderá permanecer na fila por 10 segundos antes de ser direcionada a uma DLX.

Após definifa a condição precisamos definir o destino. bserve a terceira linha do código abaixo. Neste caso, a DLX DLX_NAME é o destino das mensagens que preencherem o requisito. Em outras palavars, excedam o TTL.

Próximo parâmetro…. agora iremos definir a label a ser utilizada. Uma label é uma identificação, é por ela que definimos para onde a mensagem vai. Logo, a routing-key será: DLX_BIDING_KEY. A routing key está associada a mensagem, enquanto a binding key está associada a fila.

Definimos todos os requisitos da DLX. Agora precisamos adicionar esses parâmetros a nossa configuração do sistema.

Criamos agora a fila CONSUMER_QUEUE, nela estamos enviando o nosso mapeamento map (dicionário) com todas informações da DLX. Abaixo estão descritas as propriedades de uma fila. Importante salientar que estas são do tipo boleano (true e false).

//parametros relacionados a fila: durável, exclusiva e auto-deletável
channel.queueDeclare(queue, durable, exclusive, autoDelete, null);

O útimo parâmetro está reservado para argumentos relacionados a configuração do sistema. Justamente, neste ponto que enviamos o map como argumento do método.

Map<String, Object> map = new HashMap<String,Object>();
        map.put("x-message-ttl",10000);
        map.put("x-dead-letter-exchange",DLX_NAME);
        map.put("x-dead-letter-routing-key",DLX_BINDING_KEY);
        channel.queueDeclare(CONSUMER_QUEUE,false,false,false,map);

Criando o Binding entre a Exchange e Filas

Para que as exchanges e filas se conectem precisamos definir os binding. Todo biding possui um label (rótulo) associado, os biding keys. Juntamente com as routing keys, os bindig keys são utilizamos durante a fase de roteamento das mensagens.

Repare que toda a configuração foi realizada através do canal criado (channel.method). Agora não é diferente, iremos utilizar o queueBind para definir as conexões entre as filas e exchanges.

No primeiro binding defimos a ligação entre a DLX e sua respectiva fila. Na prática, dizemos ao RabbitMQ que toda mensagem que chegar com uma routing key compatível com a binding key da fila DLX será direcionada para DLX pelas exchanges de operação do sistema.

No segundo binding estamos associando a fila CONSUMER_QUEUE à exchange EXCHANGE_NAME com a label BINDING_KEY.

Obs: o operador .# está relacionado ao match entre a routing key e a binding key.

//bindingkey da dlx e consumer
        channel.queueBind(DLX_QUEUE,DLX_NAME,DLX_BINDING_KEY+".#");
        channel.queueBind(CONSUMER_QUEUE,EXCHANGE_NAME,CONSUMER_BINDING_KEY+".#");

        connection.close();
https://gist.github.com/julianazanelatto/94c774bb30dcf40332ee416c85328bfe.js

Por fim, fechamos a conexão.

connection.close()

Código completo

Aqui você tem o código completo do passo a passo anterior. Neste tutorial, codamos uma aplicação em Java para definir os parâmetros das exchanges do RabbitMQ. Em posts futuros iremos criar o produtor e consumidor. Assim podemos análisar ocomportamento do sistema.

package DLX;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DlxConfig {
    //DLX
    private static final String DLX_NAME = "dlxExchange";
    private static final String DLX_QUEUE = "dlxQueue";
    private static final String DLX_BINDING_KEY= "dlxrk";

    //EXCHANGE do sistema
    private static final String EXCHANGE_NAME = "mainExchange";

    //CONSUMER
    private static final String CONSUMER_QUEUE = "queueConsumer";
    private static final String CONSUMER_BINDING_KEY = "bkConsumer";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.24.0.2");
        factory.setUsername("admin");
        factory.setPassword("pass123");
        factory.setPort(5672);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //declarar as exchanges (main e dlx)
        channel.exchangeDeclare(DLX_NAME,"topic");
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //declarar as filas: consumer, dlx
        channel.queueDeclare(DLX_QUEUE,false,false,false,null);

        Map<String, Object> map = new HashMap<String,Object>();
        map.put("x-message-ttl",10000);
        map.put("x-dead-letter-exchange",DLX_NAME);
        map.put("x-dead-letter-routing-key",DLX_BINDING_KEY);
        channel.queueDeclare(CONSUMER_QUEUE,false,false,false,map);

        //bindingkey da dlx e consumer
        channel.queueBind(DLX_QUEUE,DLX_NAME,DLX_BINDING_KEY+".#");
        channel.queueBind(CONSUMER_QUEUE,EXCHANGE_NAME,CONSUMER_BINDING_KEY+".#");

        connection.close();

    }
}
https://gist.github.com/julianazanelatto/3aa2201bbdbbd597e60251ca3832f0fe.js

Tutorial RabbitMQ:

Artigo 01: RabbitMQ #1 – O que são sistemas de mensageria?

Artigo 02: RabbitMQ #2 – RabbitMQ no docker-compose

Artigo 03: RabbitMQ DLX com Java Spring

Quer saber mais sobre RabbitMQ?

Artigos Relacionados

Se quiser saber sobre outro protocolo de comunicação, o HTTP acesse:

Aula teórica sobre o que é o RabbitMQ.

Como funciona na prática o RabbitMQ? Neste vídeo você poderá ver o comportamento do sistema através de aplicações do usuário em Java Spring boot.

Juliana Mascarenhas

Data Scientist and Master in Computer Modeling by LNCC.
Computer Engineer

Instalar Ubuntu VirtualBox

Vamos ensinar como instalar o Ubuntu no VirtualBox. Para isso, vamos realizar uma instalação simples…
Ler mais