포슀트

RabbitMQ - RabbitMQ에 λŒ€ν•˜μ—¬

πŸ“ RabbitMQ - RabbitMQ 에 κ΄€ν•˜μ—¬

Redis λ₯Ό κ³΅λΆ€ν•˜λ©΄μ„œ Redis 의 Mesaage Queuing 을 κ³΅λΆ€ν•˜λ‹€κ°€ Kafka 와 RabbitMQ 에 λŒ€ν•΄ μ•Œκ²Œ λ˜μ—ˆλ‹€. λ‚΄κ°€ μΌν•˜κ³  μžˆλŠ” νšŒμ‚¬μ—μ„œ Kafka λ₯Ό μ‚¬μš©ν•˜κΈ°μ—λŠ” μ‹œμŠ€ν…œ λŒ€λΉ„ Learning Curve 및 데이터 μ‚¬μš©λŸ‰μ΄ 적기도 ν•˜κ³  RabbitMQ κ°€ μ‘°κΈˆλ” μ ν•©ν•΄λ³΄μ—¬μ„œ 정리해보렀고 ν•œλ‹€.




🚩 RabbitMQ λž€?

AMQPλ₯Ό κ΅¬ν˜„ν•˜μ—¬ μ„œλ²„κ°„ 메세지(데이터)λ₯Ό μ „λ‹¬ν•΄μ£ΌλŠ” μ˜€ν”ˆ μ†ŒμŠ€ λ©”μ‹œμ§€ 브둜컀 μ†Œν”„νŠΈμ›¨μ–΄ ( λ©”μ‹œμ§€ 지ν–₯ 미듀웨어 )


πŸ“ AMQP ( Advanced Message Queuing Protocol) λž€?

λ©”μ‹œμ§€ 지ν–₯ 미듀웨어λ₯Ό μœ„ν•œ κ°œλ°©ν˜• ν‘œμ€€ μ‘μš© 계측 ν”„λ‘œν† μ½œ


🚩 RabbitMQ ꡬ성

πŸ“ 1. Message

  • μ„œλ²„κ°„ μ²˜λ¦¬ν•˜κ³ μž ν•˜λŠ” 메세지(데이터)

πŸ“ 2. Producer

  • Message λ₯Ό μ†‘μ‹ ν•˜λŠ” 주체
  • Message λ₯Ό Consumer μ—κ²Œ μ „λ‹¬ν•˜κΈ° μœ„ν•΄ Message λ₯Ό Exchange 에 λ°œν–‰ν•œλ‹€.
  • Queue 에 직접 μ ‘κ·Όν•˜μ§€ μ•Šκ³ , 항상 Exchange λ₯Ό 톡해 μ ‘κ·Όν•œλ‹€.

πŸ“ 3. Exchange

  • Producer μ—κ²Œ 전달받은 메세지(데이터)λ₯Ό Queue μ—κ²Œ μ „λ‹¬ν•΄μ£ΌλŠ” Router μ—­ν• 
  • Exchange λŠ” Message λ₯Ό μ–΄λ–€ Queue 에 좔가할지, 버렀야할지 λ“± κ·œμΉ™μ— μ˜ν•΄ κ²°μ •ν•œλ‹€.
  • Exchange 의 κ·œμΉ™ : Fanout, Direct, Topic, Headers

πŸ“ 4. Bindings

  • Exchange 와 Queue 의 관계
  • Binding 과정이 μžˆμ–΄μ•Ό Exchange μ—μ„œ Queue 둜 메세지(데이터)κ°€ μ΄λ™ν•œλ‹€.
  • ν†΅μƒμ μœΌλ‘œ Exchange κ°€ μ–΄λŠ Queue 에 Binding 할지 μ •μ˜ν•΄μ•Όν•œλ‹€.

πŸ“ 5. Queues

  • Consumer μ—κ²Œ 메세지(데이터)λ₯Ό μ „λ‹¬ν•˜λŠ” μ—­ν• 
  • In-Memory or Disk 에 Message λ₯Ό 보관
  • Queue λŠ” μ΄λ¦„μœΌλ‘œ κ΅¬λΆ„λœλ‹€. β€» 같은 이름과 같은 μ„€μ •μœΌλ‘œ Queue λ₯Ό μƒμ„±ν•˜λ©΄ μ—λŸ¬ 없이 κΈ°μ‘΄ Queue 에 μ—°κ²°λ˜μ§€λ§Œ, 같은 이름과 λ‹€λ₯Έ μ„€μ •μœΌλ‘œ Queue λ₯Ό μƒμ„±ν•˜λ €κ³  μ‹œλ„ν•˜λ©΄ μ—λŸ¬κ°€ λ°œμƒν•˜λ‹ˆ μ£Όμ˜ν•΄μ•Ό ν•œλ‹€.

πŸ“ 6. Consumer

  • Message λ₯Ό μˆ˜μ‹ ν•˜λŠ” 주체
  • Queue 에 직접 μ ‘κ·Όν•˜μ—¬ 메세지(데이터)λ₯Ό κ°€μ Έμ˜¨λ‹€.


🚩 Exchange μ’…λ₯˜

μ’…λ₯˜μ„€λͺ…νŠΉμ§•
DirectRouting Key κ°€ μ •ν™•νžˆ μΌμΉ˜ν•˜λŠ” Queue μ—κ²Œ Message 전솑Unicast / Multicast
TopicRouting Key 의 νŒ¨ν„΄μ΄ μΌμΉ˜ν•˜λŠ” Queue μ—κ²Œ Message 전솑Multicast
Headers( Key:Value ) 둜 이루어진 Header κ°’ 을
κΈ°μ€€μœΌλ‘œ μΌμΉ˜ν•˜λŠ” Queue μ—κ²Œ Message 전솑
Multicast
Fanoutν•΄λ‹Ή Exchange 에 등둝 된 λͺ¨λ“  Queue μ—κ²Œ Message 전솑Broadcast

πŸ“ 1. Direct ( Unicast / 1:1 or 1:N )

"Exchange Type = Direct Example 01"

"Exchange Type = Direct Example 02"
  • RabbitMQ μ—μ„œ μ‚¬μš©λ˜λŠ” Exchange 의 Default Option 이닀.
  • RabbitMq μ—μ„œ μƒμ„±λ˜λŠ” Queue κ°€ μžλ™μœΌλ‘œ Binding 되고, Queue 의 이름이 routing key 둜 μ§€μ •λœλ‹€.
  • ν•˜λ‚˜μ˜ Queue 에 μ—¬λŸ¬κ°œμ˜ routing key λ₯Ό 지정할 수 μžˆλ‹€. ( Ex. Exchange Type = Direct Example 01 )
  • μ—¬λŸ¬κ°œμ˜ Queue 에 λ™μΌν•œ routing key λ₯Ό 지정할 수 μžˆλ‹€. ( Ex. Exchange Type = Direct Example 02 )

πŸ“ 2. Topic ( Multicast / 1:N )

"Exchange Type = Topic Example"
  • routing key 의 νŒ¨ν„΄μ΄ μΌμΉ˜ν•˜λŠ” Queue 에 메세지(데이터)λ₯Ό μ „λ‹¬ν•œλ‹€.
  • Direct 보닀 μ’€ 더 μœ μ—°ν•˜κ²Œ μ •μ˜ν•΄μ„œ 메세지(데이터)λ₯Ό 전솑할 수 μžˆλ‹€.
  • κ·œμΉ™
    • * : ν•˜λ‚˜μ˜ 단어λ₯Ό μ˜λ―Έν•œλ‹€.
    • # : 0개 μ΄μƒμ˜ 단어λ₯Ό μ˜λ―Έν•œλ‹€.
  • μ˜ˆμ‹œ
    • routing Key : quick.orange.male.rabbit
      • 메세지(데이터)λ₯Ό λ°›λŠ” Queue κ°€ μ—†λ‹€.
    • routing Key : quick.orange.fox
      • Q1만 메세지(데이터)λ₯Ό λ°›λŠ”λ‹€.
    • routing Key : lazy.orange.male.rabbit
      • Q2만 메세지(데이터)λ₯Ό λ°›λŠ”λ‹€.

πŸ“ 3. Headers ( Multicast / 1:N )

"Exchange Type = Headers Example"
  • key-value 둜 μ •μ˜λœ 헀더에 μ˜ν•΄ 메세지λ₯Ό Queue 에 μ „λ‹¬ν•˜λŠ” 방법.
  • Producer κ°€ μ •μ˜ν•˜λŠ” Header 의 key-value 와 Consumer κ°€ μ •μ˜ν•œ argument 의 key-valueκ°€ μΌμΉ˜ν•˜λ©΄ Binding λœλ‹€.
  • x-match : all

    • Producer κ°€ μ •μ˜ν•˜λŠ” Header 의 key-value 와 Consumer κ°€ μ •μ˜ν•œ argument 의 key-value κ°€ μ •ν™•νžˆ μΌμΉ˜ν•΄μ•Ό Binding λœλ‹€.
  • x-match : any

    • Producer κ°€ μ •μ˜ν•˜λŠ” Header 의 key-value 와 Consumer κ°€ μ •μ˜ν•œ argument 의 key-value 쀑 ν•˜λ‚˜λΌλ„ μΌμΉ˜ν•˜λ©΄ Binding λœλ‹€.

πŸ“ 4. Fanout ( Broadcast / 1:ALL )

"Exchange Type = Fanout Example"
  • 사진과 같이 Exchange μ—μ„œ 메세지(데이터)λ₯Ό κ±°λ₯΄μ§€ μ•Šκ³  Binding λ˜μ–΄ μžˆλŠ” λͺ¨λ“  Queue 에 메세지(데이터)λ₯Ό μ „μ†‘ν•œλ‹€.


🚩 RabbitMQ & Spring Boot 연동

πŸ”¨ 1. Pom.xml

1
2
3
4
5
<!--        RabbitMQ Dependency-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

πŸ”¨ 2. application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
  rabbitmq:
    host: localhost // RabbitMQ host ip
    port: 5672 // RabbitMQ port
    username: guest // RabbitMQ μ›Ή 관리 μ½˜μ†” 아이디
    password: guest // RabbitMQ μ›Ή 관리 μ½˜μ†” λΉ„λ°€λ²ˆν˜Έ
    queue:
      name: sample-queue // μ‚¬μš©ν•  queue 이름
    exchange:
      name: sample-exchange // μ‚¬μš©ν•  exchange 이름
    routing:
      key: key

πŸ”¨ 3. RabbitMqConfig Class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@RequiredArgsConstructor
@Configuration
public class RabbitMQConfig {
  @Va

  @Value("${spring.rabbitmq.queue.name}")
  private String queueName;

  @Value("${spring.rabbitmq.exchange.name}")
  private String exchangeName;

  @Value("${spring.rabbitmq.routing.key}")
  private String routingKey;

  @Value("${spring.rabbitmq.host}")
  private String host;

  @Value("${spring.rabbitmq.port}")
  private int port;

  @Value("${spring.rabbitmq.username}")
  private String userName;

  @Value("${spring.rabbitmq.password}")
  private String password;

  // org.springframework.amqp.core.Queue
  @Bean
  public Queue queue() {
    return new Queue(queueName);
  }

  /**
   * μ§€μ •λœ Exchange μ΄λ¦„μœΌλ‘œ Direct Exchange Bean 을 생성
   */
  @Bean
  public DirectExchange directExchange() {
    return new DirectExchange(exchangeName);
  }

  /**
   * 주어진 Queue 와 Exchange 을 Binding ν•˜κ³  Routing Key 을 μ΄μš©ν•˜μ—¬ Binding Bean 생성
   * Exchange 에 Queue 을 등둝
   **/
  @Bean
  public Binding binding(Queue queue, DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingKey);
  }

  /**
   * RabbitMQ 연동을 μœ„ν•œ ConnectionFactory λΉˆμ„ μƒμ„±ν•˜μ—¬ λ°˜ν™˜
   **/
  @Bean
  public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(this.host);
    connectionFactory.setPort(this.port);
    connectionFactory.setUsername(this.userName);
    connectionFactory.setPassword(this.password);
    return connectionFactory;
  }

  /**
   * RabbitTemplate
   * ConnectionFactory 둜 μ—°κ²° ν›„ μ‹€μ œ μž‘μ—…μ„ μœ„ν•œ Template
   */
  @Bean
  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    return rabbitTemplate;
  }

  /**
   * 직렬화(메세지λ₯Ό JSON 으둜 λ³€ν™˜ν•˜λŠ” Message Converter)
   */
  @Bean
  public MessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
  }
}

πŸ”¨ 4. RabbitMqService Class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@RequiredArgsConstructor
@Service
public class RabbitMqService {

  @Value("${spring.rabbitmq.queue.name}")
  private String queueName;

  @Value("${spring.rabbitmq.exchange.name}")
  private String exchangeName;

  @Value("${spring.rabbitmq.routing.key}")
  private String routingKey;

  private final RabbitTemplate rabbitTemplate;

  /**
   * 1. Queue 둜 메세지λ₯Ό λ°œν–‰
   * 2. Producer μ—­ν•  -> Direct Exchange μ „λž΅
   **/
  public void sendMessage(MessageDto messageDto) {
    log.info("messagge send: {}", messageDto.toString());
    this.rabbitTemplate.convertAndSend(exchangeName, routingKey, messageDto);
  }

  /**
   * 1. Queue μ—μ„œ 메세지λ₯Ό ꡬ독
   **/
  @RabbitListener(queues = "${rabbitmq.queue.name}")
  public void receiveMessage(MessageDto messageDto) {
    log.info("Received Message : {}", messageDto.toString());
  }
}

πŸ”¨ 5. RabbitMqService Class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RequiredArgsConstructor
@RestController
public class RabbitMqController {
  private final RabbitMqService rabbitMqService;

  @PostMapping("/send/message")
  public ResponseEntity<String> sendMessage(
    @RequestBody MessageDto messageDto
  ) {
    this.rabbitMqService.sendMessage(messageDto);
    return ResponseEntity.ok("Message sent to RabbitMQ");
  }
}
이 κΈ°μ‚¬λŠ” μ €μž‘κΆŒμžμ˜ CC BY 4.0 λΌμ΄μ„ΌμŠ€λ₯Ό λ”°λ¦…λ‹ˆλ‹€.