RabbitMQ

D A S H B O A R D
D E V E L O P
S E C U R I T Y
 RabbitMQ 란?
 사용 이유
 컴포넌트
 GUI 실습
Spring Boot 실습
 심화
주요내용
Reference

 RabbitMQ 란?

RabbitMQ는 메시지 지향 미들웨어에 대한 개방형 표준인 Advanced Message Queuing Protocol (AMQP)을 구현한 오픈소스 메시징 브로커 중 하나이다.
간단하게 이야기하면, 우편물을 받아 수취인에게 전달해주는 우체국과 비슷하다고 생각하면 된다.
메시지 지향 미들웨어란
메시지 지향 미들웨어는 분산 시스템에서 서로 다른 어플리케이션들 간의 통신을 지원하는 소프트웨어 구조를 말한다.
이 구조에서는 데이터를 메시지 단위로 주고받으며, 메시지를 전달할 때는 중간 단계에서 메시지 큐를 통해 관리한다.
이렇게 하면 어플리케이션 간의 결합도를 낮출 수 있어 유연한 아키텍처를 구성할 수 있다.

 사용 이유

RabbitMQ를 떠나, Kafka, ActiveMQ, AWS SQS 등 여러 메시지 시스템을 사용하는 이유를 간단하게 표현하자면, ‘대규모 Enterprise 환경에서 Real-Time 어플리케이션 구현을 위함’ 이라고 표현할 수 있을 것이다.
이를 위해서는 Message Orientied Middleware(MOM)의 이해가 필요합니다. 아래 글에서 MOM, MQ, AMQP에 대한 정보를 얻을 수 있습니다.
MOM은 설계도, Message System는 구현체라고 보면 쉬울 것
또한, 이러한 메시징 시스템은 장점도 가지지만, 단점도 함께 가지게 된다.

장점

서비스간의 결합도가 낮아지므로 비즈니스 로직에만 집중 가능
메시지 처리 방식은 Message Broker가 담당하고 각 서비스는 메시지만 보내면 됨
각 서비스가 비동기 방식으로 메시지를 보내기만 하면, Message Broker에서 순서 보장(Queue)과 메시지 전송을 보장

단점

Message broker 운영을 위한 자원 더 소모
시스템이 복잡하여 관리가 어려움
호출 구간이 늘어나므로 신뢰성이 떨어지고 네트워크 비용이 추가로 발생

 컴포넌트

메시지 지향 미들웨어에 대한 개방형 표준인 Advanced Message Queuing Protocol (AMQP)을 구현한 것이기 때문에 AMQP의 컴포넌트들을 모두 가지고 있다.
컴포넌트
설명
Producer
메시지를 보내는일만 하는 컴포넌트
Binding
전달 받은 메시지를 원하는 Queue 로 전달하기 위해 Bindings 이라는 규칙을 정의
Exchange
Queue 또는 다른 Exchange로 분배하는 라우팅 기능
Queue
메모리나 디스크에 메시지를 저장하고, 그것을 Consumer에게 전달하는 역할
Routing Key
메시지 헤더에 포함되어 어떤 Queue로 메시지를 Routing 할지 체크할때 사용하는 키
Standard Exchange Type
대부분 MQ(Message Queue)에서 가능한 여러가지 상황에 대해 AMQP에서 정의한 표준 라우팅 알고리즘
Virtual Host
하나의 브로커에서 여러 메시징 도메인을 가능하게 하는 논리적 컨테이너이며 이들 간의 분리와 메시징 트래픽을 컨트롤을 관리 유지
Connection
클라이언트가 RabbitMQ 브로커와 상호 작용하기 위해 취해야 하는 첫 번째 단계는 연결을 설정 즉, 발행자와 소비자, Broker 사이의 물리적인 연결
Channels
발행자와 소비자, Broker 사이의 논리적인 연결, 하나의 Connectoin 내에 다수의 Channel 설정 가능 단일 연결 위에 여러 논리 흐름을 다중화
Consumer
Message 를 수신하는 주체
자세한 내용은 아래 글의 AMQP 컴포넌트 참고

 GUI 실습

사용환경 - Mac OS

RabbitMQ 설치

# rabbitmq mac 설치 - homebrew 이용 $ brew install rabbitmq # rabbitmq 시작 $ brew services start rabbitmq
Shell
복사
위 명령어로 Rabbitmq를 시작하고 http://localhost:15672 를 통해 RabbitMQ 콘솔에 접근 가능
기본 계정
Username : guest
Password : guest

계정 생성 및 권한 부여

# 계정 생성 $ /opt/homebrew/sbin/rabbitmqctl add_user admin admin # 권한 부여 (tag 설정) $ /opt/homebrew/sbin/rabbitmqctl set_user_tags admin administrator # 계정 보기 $ /usr/local/sbin/rabbitmqctl list_users
Shell
복사
/opt/homebrew/sbin/rabbitmqctl 를 찾을 수 없다면?
$ find / -name rabbitmqctl 을 통해 rabbitmqctl 위치를 찾은 후 진행

Virtual Host 생성 및 유저 할당

# vhost 생성 $ /opt/homebrew/sbin/rabbitmqctl add_vhost vhost-01 # 유저 vhost 권한 부여) rabbitmqctl list_permissions [-p <vhost>] <user> <conf> <write> <read> $ /opt/homebrew/sbin/rabbitmqctl set_permissions -p vhost-01 admin ".*" ".*" ".*"
Shell
복사
vhost 생성
하나의 유저는 여러개의 Virtual Host를 가질 수 있다.
권한 부여 방식 : 정규표현식으로 권한을 한정
.* : 모든 권한 부여
^$ : 아무 권한도 주지 않음
vhost 클릭 시 권한 설정 가능

Exchange 생성

Virtual Host : 가상 호스트 설정
Name : Exchange의 이름
Type : Exchange Type으로 direct, fanout, headers, topic 이 존재
Durability : 브로커를 다시 실행해도 exchange를 유지할지에 관한 옵션
durable : 재시작해도 유지
transient : 재시작 시 삭제
autoDelete : 마지막 Queue 연결이 해제되면 삭제
internal : 생성된 exchange에 메시지를 직접적으로 보낼 수 없고 해당 exchange가 binding된 다른 exchange를 통해 전달하도록 설정하는 옵션

Queue 생성

Virtual Host : 가상 호스트 설정
Type : quorum, classic, Stream, Default of Virtual Host
quorum : 내구성(durability)를 강화시키고 큐를 복제해 안정적인 메시지 전달 구현
classic : 큐를 복제하지 않기 때문에, 데이터의 안정적인 전달 보다는 많은 데이터 전달이 필요할 때 사용
Stream : 보통 많은 애플리케이션이 동일한 메시지를 읽어야 하는 경우 사용 - kafka와 비슷
Name : Queue의 이름
Durability : 브로커를 다시 실행해도 Queue를 유지할지에 관한 옵션
durable : 재시작해도 유지
transient : 재시작 시 삭제
Exclusive : 현재 연결에서만 액세스할 수 있으며 해당 연결이 닫히면 삭제
Arguments : Message 의 TTL, Max Length 같은 추가 기능을 명시
Quorum Queue
다중 노드 환경에서 메시지 손실 없이 안정적인 메시지 전달을 보장하는 큐입니다.
기존의 Mirrored Queues는 하나의 노드에 저장된 메시지를 다른 노드로 복제하는 방식이었기 때문에, 하나의 노드에서 장애가 발생하면 해당 노드에서 처리되는 모든 메시지가 유실될 수 있었습니다.
하지만 Quorum Queues는 여러 노드에 메시지를 복제해서 저장하기 때문에, 하나의 노드에 장애가 발생해도 다른 노드에 저장된 메시지를 사용할 수 있어 안정적인 메시지 전달을 보장합니다.
Quorum Queues는 다른 큐와 동일하게 사용 가능하며, 큐 생성 시 quorum 옵션을 사용하여 Quorum Queues로 생성할 수 있습니다.
Quorum Queues의 레플리카 기능과 달리 모든 노드에 대한 쓰기 권한이 있어 다중 노드 환경에서 안정적인 메시지 전달을 보장합니다.
Quorum Queues는 odd-numbered(홀수) cluster 노드 구성을 권장합니다.
짝수 노드 구성에서는 "split-brain"과 같은 문제가 발생할 수 있습니다.
Stream Queue
Streams : RabbitMQ 3.9에 새로 추가된 데이터 구조
기존 큐와 달리 메시지를 삭제하지 않는 Append-Only 모델
데이터가 유지되고 복제됨
Usecase
여러 어플리케이션이 동시에 같은 메시지를 받아야 할때
대용량 백로그 저장
Timestamp 기준으로 Replay 및 시간여행 가능
기존 큐 대비 엄청 빠름
주요 기능
최소 1회 전송 보장
서버측 오프셋 트래킹 지원. 컨슈머가 원하는 부분부터 재시작 가능
무한 확장 가능. 크기/기간 기준 저장정책을 통해 삭제 지원
초고속의 전용 바이너리 프로토콜 및 AMQP 0.91 & 1.0 지원
클라이언트-서버 TLS 지원

Spring Boot 실습

가정
RabbitMQ 설치가 되어 있음
RabbitMQ의 Vhost를 가지고 있는 username과 password가 존재
한 프로젝트 안에 Producer와 Consumer를 둔 뒤 메시지를 주고 받는 모습 확인

Build.gradle

dependencies { implementation 'org.springframework.boot:spring-boot-starter-amqp' implementation 'org.springframework.boot:spring-boot-starter-web' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.amqp:spring-rabbit-test' }
Groovy
복사

application.yml

spring: application: name: rabbitmq config: import: classpath:/secretkey.properties rabbitmq: host: localhost port: 5672 username: ${RABBITMQ_USERNAME} password: ${RABBITMQ_PASSWORD} virtual-host: vhost-01 rabbitmq: exchange: vhost-01.direct queue: q.direct_test bindingKey: rabbitmq routingKey: rabbitmq
YAML
복사

Config

// Direct Exchange 생성 @Bean public DirectExchange createDirectExchange() { return new DirectExchange(EXCHANGE_NAME_DIRECT, true, false, null); } // Queue 생성 // 큐 이름, durability, exclusive, audoDelete, args // 큐 이름 : amq로 시작하는 큐 이름은 예약어이기 때문에 에러발생 // durability : 브로커를 재시작해도 유지됨 // exclusive : 단 하나의 연결에서만 사용되며, 연결이 끊기면 삭제 // audoDelete : 최소 한 명의 소비자가 있는 큪눈 마지막 소비자가 구독을 취소하면 삭제 @Bean public Queue createQueue() { return new Queue(QUEUE_NAME, true,false,false,null); } // Binding @Bean public Binding binding(DirectExchange exchange ) { return BindingBuilder.bind(createQueue()).to(exchange).with(BINDING_KEY); } // JSON 형식의 메시지를 주고 받을 경우 설정 // 의존성 주입을 이용할 경우 RabbitTemplate 와 ConnectionFatcory는 자동 생성 @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(messageConverter); return rabbitTemplate; } // JSON 컨버터 @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
Java
복사

ProducerService

@Service @RequiredArgsConstructor public class ProducerService { private final RabbitTemplate rabbitTemplate; @Value("${rabbitmq.exchange}") private String exchange; @Value("${rabbitmq.routingKey}") private String routingKey; /* exchage와 매핑된 큐에 라우팅 키를 확인 후 매핑이 옳바르다면 큐에 저장 */ public void addQueue(Message message){ rabbitTemplate.convertAndSend(exchange, routingKey, message); } }
Java
복사

ConsumerService

@Slf4j @Service public class ConsumerService { private int cnt = 1; /* RabbitListener를 통해 큐에 저장된 메시지를 받아옴 */ @RabbitListener(queues = "${rabbitmq.queue}") public void getMessage(Message message){ log.info("[ "+ cnt++ +" ]" +" Consumer get Message from Queue: " + message); } }
Java
복사

ProducerController

@Slf4j @RestController @RequiredArgsConstructor public class ProducerController { private final ProducerService rabbitMqService; private int cnt = 1; private Message message = new Message("TEST", "TEST"); @RequestMapping("/send") public Message send() { log.info("[ "+ cnt++ +" ]" +" Producer Put Message to Queue : " + message); rabbitMqService.addQueue(message); return message; } }
Java
복사

 심화

Exchange

// 아래 모든 생성자는 Builder를 통해 생성 가능 // Direct Exchange 생성 @Bean public DirectExchange createDirectExchange() { return new DirectExchange(EXCHANGE_NAME_DIRECT, true, false, null); } // Topic Exchange 생성 @Bean public TopicExchange createDirectExchange() { return new TopicExchange(EXCHANGE_NAME_TOPIC, true, false, null); } // Headers Exchange 생성 @Bean public HeadersExchange createDirectExchange() { return new HeadersExchange(EXCHANGE_NAME_HEADERS, true, false, null); } // Fanout Exchange 생성 @Bean public FanoutExchange createDirectExchange() { return new FanoutExchange(EXCHANGE_NAME_FANOUT, true, false, null); } // rabbitmq_delayed_message_exchange(지연큐를 위한 플로그인) 플러그인과 같은 플러그인을 사용할 경우 사용 @Bean public CustomExchange exchange() { Map<String,Object> headers = new HashMap<>(); headers.put("x-delayed-type","direct"); // 인자 -> exchange명, 타입, durable, autoDelete, args return new CustomExchange(RabbitUtil.TOPIC_EXCHANGE_NAME,"x-delayed-message",true,false,headers); }
Java
복사

Queue

// 아래 모든 생성자는 Builder를 통해 생성 가능 // Queue 생성 @Bean public Queue createQueue() { return new Queue(QUEUE_NAME, true); } // Queue 타입 지정해 생성 @Bean public Queue createQueueByType() { Map<String, Object> args = new HashMap<>(); args.put("x-queue-type", "quorum"); return new Queue(QUEUE_NAME, false, false, false, args); }
Java
복사
Arguments
Description
x-message-ttl
- 메시지가 큐에 대기하는 시간을 제한 - 밀리초 단위로 TTL(Time-To-Live)을 설정하여 메시지가 일정 시간 후에 큐에서 제거
x-expires
- 큐 자체의 수명을 설정 - 밀리초 단위로 지정된 시간이 경과하면 큐가 자동으로 삭제
x-max-length
- 큐에 저장될 수 있는 최대 메시지 개수를 제한하는데 사용 - 큐에 지정된 개수 이상의 메시지가 들어오면 가장 오래된 메시지가 삭제
x-max-length-bytes
- 큐에 저장될 수 있는 최대 바이트 수를 제한하는데 사용 - 큐의 용량을 바이트 단위로 제한하며, 이 값을 초과하는 경우 가장 오래된 메시지가 삭제
x-dead-letter-exchange x-dead-letter-routing-key
- 대기열에서 만료된 메시지나 거부된 메시지를 전달할 교환 및 라우팅 키를 지정하는 데 사용 - 이를 통해 "Dead Letter Exchange" 패턴을 구현
x-overflow
- 큐가 가득 찼을 때 메시지 처리 방식을 지정할 수 있음 - "drop-head"로 설정하면 가장 오래된 메시지가 제거되고 새로운 메시지가 큐에 추가
x-queue-mode
큐 모드를 "lazy"로 설정하여, 메모리 대신 디스크에 데이터를 유지
x-single-active-consumer
- 큐에 단일 소비자만 허용하도록 설정할 수 있음 - 이 소비자가 작업을 완료하기 전까지 다른 소비자가 큐에서 메시지를 가져갈 수 없음
x-queue-type
- 큐의 종류를 지정할 수 있음 - 기본적으로는 "classic"으로 설정되며, "quorum"으로 설정하면 큐가 quorum 모드로 동작하게 됨 - Quorum 모드는 데이터의 안정성과 가용성을 개선하기 위해 설계된 모드
x-max-priority
- 큐에 저장되는 메시지에 우선순위를 부여하고 싶을 때 사용 - 높은 우선순위의 메시지가 먼저 처리
x-keep-history
큐가 삭제되었을 때 큐의 메타데이터와 메시지 히스토리를 보존할지 여부를 설정

RabbitListener

// 기본 사용 @RabbitListener(queues = "${rabbitmq.queue}") public void getMessage(Message message){ log.info("[ "+ cnt++ +" ]" +" Consumer get Message from Queue: " + message); } // 바인딩을 리스너에서 바로 지정하는 방법 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "myQueue", durable = "true"), exchange = @Exchange(value = "auto.exch", type = "fanout", delayed = "false"), key = "orderRoutingKey") )
Java
복사

Connection & Channel

// 새로운 커넥션 생성 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); // 새로운 채널 생성 Channel ch = connection.createChannel(); // 채널을 이용할 Exchange와 Queue 정의 //exchange -> direct channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //create Queue String queueName = channel.queueDeclare().getQueue(); //binding for(String severity:args) channel.queueBind(queueName, EXCHANGE_NAME, severity);
Java
복사

재해복구

메시지 손실에 대한 문제
Consumer가 메시지를 처리하다가 에러가 발생할 경우 → Message. Acknoledgement를 이용해 Consumer가 메시지 처리 후 Ack을 보내도록 하여 만약 Ack을 보내지 않는다면, 다시 큐에 넣는다.
SimpleMessageListenerContainer 를 빈에 등록
@RabbitListener 가 붙은 매서드는 삭제
AcknowledgeMode.MANUAL 설정
@Bean public SimpleMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener) { SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); listenerContainer.setConnectionFactory(connectionFactory); listenerContainer.setQueueNames(QUEUE_NAME); listenerContainer.setMessageListener(messageListener); listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // Acknowledge 매뉴얼 설정 listenerContainer.setPrefetchCount(20); return listenerContainer; }
Java
복사
MessageListener 생성
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 을 통해 메시지 테그로 처리 완료된 메시지에 대한 ACK을 보냄
@Slf4j @Component public class MessageListener implements ChannelAwareMessageListener { private static int cnt = 1; @Override public void onMessage(Message message, Channel channel) throws Exception { log.info("[ "+ cnt++ +" ]" +" Consumer get Message from Queue: " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
Java
복사
channel.basicAck 을 주석처리 했을 경우
channel.basicAck 을 주석처리 해제 했을 경우
간단하게 Consumer의 쓰레드를 올려주는 방법도 존재
SimpleMessageListenerContainer 클래스에 listenerContainer.setConcurrentConsumers(10); 로 쓰레드 늘려주기!
Durability → 디스크에 메시지를 저장해 서버를 재시작하면 해당 메시지가 계속 유지되도록 함
애초에 Queue와 Exchange를 생성할 때 Durabilitytrue로 설정
Fair dispatch → 메시지가 도착했다고 바로 분배하지 말고, 최대 n개씩만 분배하도록 요청할 수 있다. 그래서 이미 소비자에게 n개가 부여되었다면 놀고 있는 다른 소비자에게 분배되도록
SimpleMessageListenerContainer 를 빈에 등록
listenerContainer.setPrefetchCount(20); 을 통해 prefetch 설정
@Bean public SimpleMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener) { SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); listenerContainer.setConnectionFactory(connectionFactory); listenerContainer.setQueueNames(QUEUE_NAME); listenerContainer.setMessageListener(messageListener); listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // Acknowledge 매뉴얼 설정 listenerContainer.setPrefetchCount(20); return listenerContainer; }
Java
복사