래빗MQ(RabbitMQ)
RabbitMQ는 AMQP(Advanced Message Queing Protocol)을 구현한 메시지 브로커이다. AMQP라는 표준MQ 프로토콜로 만들어 져있고 Cluster구성이 쉽고 ManageUI가 제공되며 무엇보다 성능이 뛰어나다고 알려져 현재 많이 사용되고 있다. 또한 ManagementUI, Autocluster, MQTT Convert, STOMP 등의 plugin도 제공되어 확장성이 뛰어나며 Spring에서도 AMQP연동 라이브러리가 제공되어 편리하게 연동하여 사용가능하다.
특징
- ISO 표준(ISO/IEC 19464) AMQP 구현
- 비동기처리를 위한 메시지큐 브로커
- erlang과 java 언어로 만들어짐 (clustering 등을 위한 고속 데이터 전송하는 부분에 erlang이 사용된 것으로 추정)
- 분산처리를 고려한 MQ ( Cluster, Federation )
- 고가용성 보장 (High Availability)
- Publish/Subscribe 방식 지원
- 다양한 plugin 지원
AMQP
여러 MQ들이 존재해왔지만 그들만의 방식으로 구성되어 이기종간에 통신이 어려운 문제로 표준화된 AMQP가 나왔다.
RabbitMQ를 연구하면서 AMQP라는 프로토콜을 접하게 되었는데 예전에 연구했던 MQTT와 Publish/Subscribe를 하며 Reliability처리하는 사상 등에서 많이 닮았다고 느껴졌다.
(필자가 2011년 모바일 Push 프로젝트를 하며 MQTT를 연구한바 있는데, 최근 IoT로 인해 MQTT가 다시 주목받기 시작하고 있어서 RabbitMQ 다음으로는 MQTT에 대해서 설명올려볼 계획이다.)
RabbitMQ기능 (AMQP) 파악하기
주요 용어
- Producer
-
메시지를 보내는 Application
- Publish
-
Producer가 메시지를 보냄
- Queue
-
메시지를 저장하는 버퍼
- Queue는 Exchange에 Binding된다.
- Consumer
-
메시지 받는 User Application
- 동일 업무를 처리하는 Consumer는 보통 하나의 Queue를 바라본다. (중복방지)
- 동일 업무를 처리하는 Consumer가 여러개 인 경우 같은 Queue를 바라보게 하면 자동으로 메시지를 분배하여 전달함
- Subscribe
-
Consumer가 메세지를 수신하기 위해 Queue를 실시간으로 리스닝하도록 만듬.
- Exchange
-
Producer가 전달한 메시지를 Queue에 전달하는 역할
- 메시지가 Queue에 직접 전달되지 않고
exchange type
이라는 속성에 정의된데로 동작
- Exchange type
-
특정 Queue에 보낼지, 여러 Queue에 보낼지, 아니면 그냥 제거될지 등을 선택
type | 설명 | 특징 |
---|---|---|
fanout | 알려진 모든 Queue에 메시지 전달 함 | Broadcast |
direct | 지정된 routingKey를 가진 Queue에만 메시지 전달 함 | unicast |
topic | 지정된 패턴 바인딩 형태에 일치하는 Queue에만 메시지 전달. #(여러단어), *(한단어)를 통한 문자열 패턴 매칭 | multicast |
header | 헤더에 포함된 key=value의 일치조건에 따라서 메시지 전달 | multicast |
- Bindings
-
Exchange와 Queue를 연결해주는 것
- Routing
-
Exchange가 Queue에 메시지를 전달하는 과정
- RoutingKey
-
Exchange와 Queue가 Binding될 때 Exchange가 Queue에 메시지를 전달할지를 결정하는 기준
-
Publish RoutingKey가 Binding시 설정된 RoutingKey값과 일치하거나 (exchange type=
direct
경우) RoutingKey값이 Binding시 설정된 패턴에 매칭될 때 (exchange type=topic
경우)
기본 흐름
Exchange를 별도로 명시하지 않은 요청의 기본 흐름
- Producer가 메시지를 생성하여 전송
- Queue가 이 메시지를 순차적으로 쌓음
- Consumer가 Queue에 대한 Binding을 가지고 있다가 메시지를 Queue에서 수신
상세 흐름
- Producer가 메시지를 생성하여 전송
- Exchange가 어떤 Queue에 전달할지를 Routing
- Queue들이 메시지를 순차적으로 쌓음
- Consumer가 Queue에 대한 Binding을 가지고 있다가 메시지를 Queue에서 수신
메시지 분배 (Round-robin dispatching)
RabbitMQ는 Consumer가 병렬처리를 쉽게 할 수 있도록 같은 Queue를 바라보고 있는 Consumer에게 메시지를 균등 분배한다.
즉 첫번째 메시지는 Consumer1에게 두번째 메시지는 Consumer2에게 분배(중복 처리하지 않도록 1명에게만 줌)
MQ의 존재 이유를 보여주는 매우 중요한 개념이다. 이로 인해 메시지를 받아 처리하는 프로그램들은 수평 확장이 가능하다. 물론 Producer도 수평 확장이 가능하다. (같은 Queue에 메시지를 던져주기만 하면 됨)
Fair dispatch(공평한 분배)
여러 consumer에게 round robin할 때 번갈아가면서 메시지를 전달하지만 완전히 공평하진 않음 (매홀수는 데이터크기가 크고, 매짝수는 데이터크기가 작은 등)
때문에 busy한 서버에게 메시지를 계속 전달하지 않도록 prefetchCount라는 개념사용. prefetchCount가 1일때는 아직 ack를 받지 못한 메시지가 1개라도 있으면 다시 그 consumer에게 메시지 할당하지 않음. 즉 prefetchCount는 동시에 보내는 메시지 양임
메시지 수신 통보 (Acknowledgment)
많은 프로토콜들이 메시지 전달 보장을 위해 Acknowlegment(ACK)라는 개념을 사용하여 메시지에 대한 응답을 보내주도록 되어있다.
MQTT의 경우는 QoS(Quality of Service) level이라고해서 0=안보냄, 1=전달완료확인, 2=최종목적지까지 처리완료 확인의 개념이 있는데
RabbitMQ의 경우는 Acknowledgment(Consumer전달확인)와 Confirm(Publish전달확인)을 이용한 level 1
만 지원하는 듯 하다.(추정)
ACK가 중요한 이유는 Queue는 Consumer에게 데이터를 전달하고나면 Queue에서 메시지를 삭제하므로, Consumer가 전달받았지만 처리도중 오류가 발생하여 재처리해야하는 경우를 위해 보관 유예기간을 두는 용도로 이용된다. 즉 ACK가 온 메시지만 삭제처리하도록 하는 것이다.
※ 주의 basicQos라는 설정은 prefetchCount의 숫자 설정임(!! QoS level 0,1,2의 개념 아님!!)
여러 Client(Consumer) 중 일부가 죽었을시 대응방법
앞서 설명했듯이 Consumer가 ACK를 보내지 않으면 Queue에 쌓인 메시지는 지워지지 않고 남아있다.
메시지를 받은 Consumer가 느려서 아직 Processing 중일 수 있어서 다른 Consumer에게 Round Robin하여 재전송 하자니 중복처리 우려가 있고,
그대로 Unacknowledged
상태로 마냥 처리 안된채로 남겨 둘 수도 없고 곤란하다.
이 경우에 어떻게 동작하는지 확인해볼 필요가 있다.
테스트 예제
같은 메시지에 대하 바인딩되어있는 Consumer가 2개가 있다. 하나는 ACK를 재대로 보내는 Consumer고 하나는 오류 상황이 발생하여 ACK를 보내지 못하는 상황이다. prefetchCount는 1로 하나라도 Unacknowledge
상태라면 재전송을 하지 않도록 설정한다.
위에서 보는 것 처럼 Acknowledge 메시지가 돌아오지 않으면 Queue는 메시지를 삭제하지 않고 보관하고 있다가, Consumer가 이를 처리하지 못한다고 판단했을 때 (Disconnected 되는 등의 상황) 메시지를 다음 순번의 Consumer(Worker)에게 라운드 로빈한다.
RabbitMQ가 재기동 됐을때 대응책
메시지를 Queue에 넣은 뒤 Consumer에게 전달하기 전에 RabittMQ 서버가 죽는다면
기본적으로 해당 메시지는 날라가버리게 된다.
이런 상황을 방지 하기 위해 durable
이라는 개념을 가지고 있다.
Message durability
메시지는 Queue에 보관할 때 file에도 같이 쓰도록 만드는 방법이다.
아래와 같은 방법으로 설정해야 동작한다.
- queue생성시 durable속성을 true로 주고 만든다.
- message publish할때 MessageProperties.PERSISTENT_TEXT_PLAIN을 설정함
1,2번 모두 만족해야 메시지가 Queue에 남아있을 때 restart해도 날라가지 않는다.
※ 메시지의 persistent는 완변히 보장되진 않음. 메번 메시지마다 fsync 로 동기화히지 않기 때문에
짧은시간이나마 아직 Disk에 쓰여지지 않았을 경우가 있다.
좀더 강력한 방법을 보장하기 위해서는 publisher confirms
를 사용
Publish/Subscribe
한 메시지를 여러 Consumer가 동시에 받아 사용해야 하는 경우가 있다. 이러한 Publish/Subscribe 구조를 RabbitMQ로 구성하여 사용 할 수 있다.
기본 exchanges
: RabbitMQ설치시 기본적으로 세팅된 exchanges
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
이름없는 exchange
Producer
가 메시지를 보낼 때 exchange를 지정해서 보낼 수 있는데,
이름이없다면 routingKey
라는 특정 key값으로 Queue에 라우팅 된다.
즉 exchange
명이 있거나 routingKey
명이 있어야 한다.
channel.basicPublish("", "hello", null, message.getBytes());
channel.basicPublish( "logs", "", null, message.getBytes());
임시 Queue
이전에 사용한 Queue를 기억하고 계속적으로 사용하기 위해서는 같은 이름의 Queue를 Worker가 바라보도록 하는 것이 중요하다. 그러나 fanout을 받아 처리하는 Queue인 경우 Consumer가 임시 이름이라도 지정해줘야하는데 다음 2가지가 고려 되어야 한다.
-
RabbitMQ 연결시마다 서버에서 발급한 random Queue 이름의 사용
-
연결이 종료 되었을 때 Queue의 자동삭제
String queueName = channel.queueDeclare().getQueue();
이런식의 amq.gen-JzTY20BRgKO-HjmUJj0wLg
Random Queue가 사용된다.
바인딩 (Bindings)
생성한 exchange를 queue에 매핑을 해줘야 exchange가 메시지를 받으면 Queue에 메시지를 보낼 수 있는데
이를 Bindings
이라고 함
channel.queueBind(queueName, "logs", "");
Routing
모든 메시지를 받는 것이 아니라 선별정으로 특정 종류의 메시지를 받도록 subset을 설정하여 멀티케스트 함
Direct exchange
exchange type 중 direct
을 이용해서 exchange를 만들면 부분적으로 메시지를 라우팅 하도록 처리 할 수 있다.
// 특정 EXCHANGE_NAME 으로 "direct' exchange를 생성
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Producer
메시지를 publish할 때 routingKey
값을 주어 선별적인 Queue에만 메시지를 전달 한다.
// 특정 EXCHANGE_NAME에 특정 "routingKey"로 binding된 queue에게만 메시지 전송
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
Consumer
메시지를 받는 Consumer, Worker에서 queue bind시 routingKey
값을 주어 선별적으로 수신 한다.
// 특정 EXCHANGE_NAME의 특정 routingKey만 전달 받도록 binding 함
// 여러 routingKey와 bind 될 수 있다.
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
Multiple binding
Exchange와 Queue간에 n:m binding이 가능하다.
1개의 Queue는 여러 routingKey와 binging 할 수 있다. 여러개의 Queue는 같은 routingKey와 binding 할 수 있다.
패턴매칭 라우팅(Topics)
RabbitMQ는 Topic이라는 개념을 통해 패턴 매칭식 멀티케스트 라우팅을 지원한다.
topic
exchange를 생성하여 특정 문자열 패턴에 일치하는 Queue에만 데이터를 보내주도록 구성할 수 있다.
Topic exchange
exchange type 중 topic
을 이용해서 exchange를 만들면 특정 패턴에 일치하는 routingKey로 메시지가 전송된 경우 메시지를 수신 할 수 있다.
해당 패턴은 다음의 특수 문자를 사용하여 구성한다.
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Topic Pattern
*
- 여러 Word를 나타냄 (1 Word 는
.
로 구분됨 ) #
- 여러 Word를 나타냄 (1 Word 는
.
로 구분됨 )
※ Topic이 #
으로만 구성되면 fanout
과 동일하다.
※ Topic에 *
나 #
문자가 하나도 없다면 direct
와 동일하다.
예제
quick으로 시작되는 .
으로 구분되는 4개의 Word로 구성된 문자열
quick.*.*.*
quick.test.good.job => 매칭
front.quick.test.good.job => 비매칭
#.end
front.mid.end => 매칭
front.end => 매칭
front.mid.end.last => 비매칭
Producer
메시지를 publish할 때 routingKey
값을 주어 선별적인 Queue에만 메시지를 전달 한다.
// 특정 EXCHANGE_NAME에 특정 "routingKey"로 binding된 queue에게만 메시지 전송 (메시지 전송시는 direct와 별반 다를바 없음)
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
Consumer
메시지를 받는 Consumer, Worker에서 queue bind시 routingKey
의 패턴 값을 주어 선별적으로 수신 한다.
// 특정 EXCHANGE_NAME의 특정 패턴을 가진 문자열로 routingKey만 전달 받도록 binding 함
// 여러 routingKey와 bind 될 수 있다.
String patternedRoutingKey1 = "kern.*";
String patternedRoutingKey1 = "*.critical";
channel.queueBind(queueName, EXCHANGE_NAME, patternedRoutingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, patternedRoutingKey2);
원격 프로시져 호출 (RPC)
RabbitMQ는 Request-Response로 Client와 Server를 이어주기 위해 RPC라는 개념으로 기능을 제공한다.
RPC라는 거창한 이름을 사용하였지만 실제로는 Client의 request를 Server에 전달하고 , Server가 처리한 결과를 알맞은 Client 요청에 대한 응답으로 전달 할 수 있는 방법을 말한다.
Message Properties 설명
- DeliveryMode
- persistent인지 transient인지 표시 (휘발성인지 비휘발성인지 구분자)
- ContentType
- 내용물의 mime-type
- ReplyTo
- Callback Queue의 이름
- CorrelationID
- 요청을 구분할 수 있는 유일값
처리 흐름
-
Client가
CorrelationID
,ReplyTo
주어서 RabbitMQ의 특정 Request보관용 Queue에 데이터를 Push한다. -
Request용 Queue에 데이터를 Server에서 Consume하여 요청을 처리한다.
-
요청처리 후 Request에서 받은
CorrelationID
와ReplyTo
를 추출하여, 요청ID를 속성으로 갖는 Response를ReplyTo
Queue에 Push한다. -
Client는
ReplyTo
Queue를 subscribe하고 있다가 Response가 오면CorrelationID
를 보고 어떤 요청에 대한 응답인지를 구분하여 처리한다.
샘플 설명
Server ( 원격 요청 처리 )
private static final String RPC_QUEUE_NAME = "rpc_queue";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Request를 받아오기 위한 RPC_QUEUE_NAME의 Queue를 사용 (익명의 exchange 사용)
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
// 다중 worker사용시 메시지 고른 분배를 하도록 1개씩만 서버에서 보낸 Response를 처리하도록 설정
channel.basicQos(1);
// 내부적으로 BlockQueue를 사용하여 순차적으로 메시지를 처리 할 수 있게 해주는 QueueingConsumer사용
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
//서버는 계속적으로 메시지 수신한다.
while (true) {
// Consumer통해서 메시지 발생시마다 Delivery라는 객체를 반환한다.
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
// 응답용 Properties를 만든다. client의 응답아이디와 쌍을 맞추기 위해 넘겨받은 CorrelationId를 설정해준다.
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
// delivery에서 client가 요청한 내용물을 꺼낸다.
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);
// Client 응답에 대한 처리를 완료 후 ReplyTo(Client가 메시지 보낼때 임시 생성했던 Response용 임시 Queue)에 메시지를 전달한다.
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
// Ack를 보낸다. 메시지를 다 처리하고 Request Queue에서 데이터를 지우도록 함
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
Client ( 원격 프로시져 호출 )
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
// Response를 받을 임시 Queue 생성
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
// 응답받을 Queue를 Subscribe 함
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
// 자신이 여러 Request를 보낼 수 있으므로 이를 구분 할 수 있는 구분값 생성
String corrId = java.util.UUID.randomUUID().toString();
// 서버가 응답시 그대로 포함되서 Callback될 Request구분자(coelationID)
// 서버가 메시지처리 후 응답할 Response Queue(replyTo)를 BasicProperties에 세팅한다.
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
// 메시지 전송(RPC)
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
// 서버사정에 의해 메시지가 중복 발송되거나 순번이 꼬일 수 있다.
// 이를 위해 내가 보낸 요청과 같은 Response인지 CorrelationId로 확인해야 한다.
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
RPC의 이점
RPC 구조를 응용하면 아래와 같은 상황에 이점을 얻을 수 있다.
서버처리 이점
서버 처리속도가 느려서 성능을 증가 시키려고 할 때, RPC 서버를 하나 더 두고 같은 Request Queue를 바라보게 하면 됨 ( Round Robin 하므로 )
Client 이점
하나의 메시지를 개별 Round Trip으로 처리를 위해 queueDeclare같은 동기처리 요청이 필요없다. (임시 Queue를 생성하여 Client마다 다른 Queue를 사용하므로)
RPC 구성시 고려할 점
- 돌아가는 서버가 없을 때 Client 처리
- 요청 Timeout시 Client 처리
- 서버 Exception이나 오동작시 Client에게 이를 어떻게 전달할지
- Invalid한 데이터가 서버로 전달 되었을 때의 처리
※참고자료 RabbitMQ공식사이트