진짜 쉬운 RabbitMQ (AMQP)의 일반적인 흐름 (producer to consumer)

728x90

RabbitMQ는 AMQP 프로토콜을 구현한 메시지 브로커다.

 

메시지가 producer부터 consumer까지 전달되는 흐름을 간단하게 설명하자면

일반적으로 producer 는 exchange 에 메시지를 보낸다.

 

exchange는 라우터의 역할을 하면서 자신과 binding된 queue에 메세지를 보낸다.

 

queue와 exchange를 바인딩 하는건 당연히 메시지 보내기 전에 해둬야 한다.

 

이 때 exchange는 fanout 이냐 direct냐 아님 topic 이냐 같은 exchange 생성 당시 속성에 따라서 큐에 어떻게 메시지를 보낼지 결정한다.

 

아래 그림을 보면 한번에 이해가 될텐데, direct 나 topic은 큐를 binding 할때 아님 메시지를 보낼때

 

바인딩된 모든 큐가 아니라 바인딩 된 큐중에서도 자기가 보내고 싶은 queue 를 지정할 수 있지만

 

fanout은 그런거 없고 그냥 바인딩된 모든 큐에 전달 받은 메시지를 다 뿌려버리는 것이다.

Consumer는 그냥 queue를 구독하고 있으면 된다. 그럼 얘가 메시지를 queue에 가서 빼온다.

 

그래도 이해가 안간다?  그럴때는 코드로 보면 이해가 쉽다. 위에 한번 더 보고 밑에 코드를 보자.

pseudocode로 설명하면 다음과 같다.

 

먼저 송신 (producer) 부분이다

< AMQP의 일반적인 흐름 - 송신>

amqp.connect('amqp://localhost').then(async (connection) => {
  // 채널 생성
  const ch = await connection.createChannel();

  //커넥션과 채널 만들기
  //익스체인지 정의 및 생성
  const exchangeName = 'fanout_exchange_0001'; 
  ch.assertExchange(exchangeName, type을 정해야 하는데 여기선 'fanout') 

  //Exchange를 생성함 타입을 정의해서 만들어 줌
  //type은 direct,fanout,topic등 필요에 따라 정의한다

  const message = 'Hello World!';

  // exchange를 통해 메시지를 전송
  ch.publish(exchangeName, '', Buffer.from(message));
  //두번째 파라미터는 라우팅 키, fanout 이기때문에 그냥 비워둠. 아무곳에 다 보내니까
  ...
  connection.close();
}

 

이제 메시지를 수신하는 (cosumer) 부분

< AMQP의 일반적인 흐름 - 메시지 수신>

const exchangeName = 'fanout_exchange_0001'; 

async function receiveMessages() {
  // AMQP 서버에 연결
  const connection = await amqp.connect('amqp://localhost');
	
  // 채널 생성
  const channel = await connection.createChannel();

  // 큐 생성
  const queue1 = 'queueOneName';
  await channel.assertQueue(queue1 , { durable: false });

  channel.bindQueue(queue1 , exchangeName , ''); //routing key 같은거 여긴 없지

  // Consumer 등록
  channel.consume(queue1 , (message) => {
    if (message !== null) {
      console.log('Received message:', message.content.toString());
      // 메시지 처리 완료 후 ACK 전송 (ACK 받으면 큐에서 메시지가 빠짐)
      // ACK 안빠지면 다시 보냄
      channel.ack(message);
    }
  });
}