728x90
RabbitMQ는 AMQP 프로토콜을 구현한 메시지 브로커다.
메시지가 producer부터 consumer까지 전달되는 흐름을 간단하게 설명하자면
일반적으로 producer 는 exchange 에 메시지를 보낸다.
exchange는 라우터의 역할을 하면서 자신과 binding된 queue에 메세지를 보낸다.
queue와 exchange를 바인딩 하는건 당연히 메시지 보내기 전에 해둬야 한다.
이 때 exchange는 fanout 이냐 direct냐 아님 topic 이냐 같은 exchange 생성 당시 속성에 따라서 큐에 어떻게 메시지를 보낼지 결정한다.
아래 그림을 보면 한번에 이해가 될텐데, direct 나 topic은 큐를 binding 할때 아님 메시지를 보낼때
바인딩된 모든 큐가 아니라 바인딩 된 큐중에서도 자기가 보내고 싶은 queue 를 지정할 수 있지만
fanout은 그런거 없고 그냥 바인딩된 모든 큐에 전달 받은 메시지를 다 뿌려버리는 것이다.
Consumer는 그냥 queue를 구독하고 있으면 된다. 그럼 얘가 메시지를 queue에 가서 빼온다.
그래도 이해가 안간다? 그럴때는 코드로 보면 이해가 쉽다. 위에 한번 더 보고 밑에 코드를 보자.
pseudocode로 설명하면 다음과 같다.
먼저 송신 (producer) 부분이다
< AMQP의 일반적인 흐름 - 송신>
amqp.connect('amqp://localhost').then(async (connection) => {
// 채널 생성
const ch = await connection.createChannel();
//커넥션과 채널 만들기
//익스체인지 정의 및 생성
const exchangeName = 'fanout_exchange_0001';
ch.assertExchange(exchangeName, type을 정해야 하는데 여기선 'fanout')
//Exchange를 생성함 타입을 정의해서 만들어 줌
//type은 direct,fanout,topic등 필요에 따라 정의한다
const message = 'Hello World!';
// exchange를 통해 메시지를 전송
ch.publish(exchangeName, '', Buffer.from(message));
//두번째 파라미터는 라우팅 키, fanout 이기때문에 그냥 비워둠. 아무곳에 다 보내니까
...
connection.close();
}
이제 메시지를 수신하는 (cosumer) 부분
< AMQP의 일반적인 흐름 - 메시지 수신>
const exchangeName = 'fanout_exchange_0001';
async function receiveMessages() {
// AMQP 서버에 연결
const connection = await amqp.connect('amqp://localhost');
// 채널 생성
const channel = await connection.createChannel();
// 큐 생성
const queue1 = 'queueOneName';
await channel.assertQueue(queue1 , { durable: false });
channel.bindQueue(queue1 , exchangeName , ''); //routing key 같은거 여긴 없지
// Consumer 등록
channel.consume(queue1 , (message) => {
if (message !== null) {
console.log('Received message:', message.content.toString());
// 메시지 처리 완료 후 ACK 전송 (ACK 받으면 큐에서 메시지가 빠짐)
// ACK 안빠지면 다시 보냄
channel.ack(message);
}
});
}
'개발 > Etc' 카테고리의 다른 글
git pull 땡길때마다 귀찮은 Git 인증 매번 안 하기 (0) | 2024.01.17 |
---|---|
Early Return 에 대해서 (0) | 2023.03.06 |
깃허브 Public API Respository 공유 (0) | 2022.06.22 |