SAGA 패턴이란?
- Saga 패턴은 분산 시스템에서 트랜잭션을 관리하기 위한 디자인 패턴입니다. 특히 마이크로서비스 아키텍처와 같은 분산 환경에서 데이터의 일관성을 유지하고, 복잡한 트랜잭션을 처리하는 데 유용합니다.
- 주요 개념
- 트랜잭션의 분할: 전통적인 트랜잭션이 여러 서비스에 걸쳐서 실행될 때, 이를 작은 단위의 트랜잭션으로 나누어 처리합니다. 각 단위 트랜잭션을 "사가(Saga)"라고 합니다.
- 사후 조치(Compensating Action): 만약 하나의 사가가 실패할 경우, 이미 성공한 사가들을 취소하거나 되돌리기 위해 "보상 조치"를 실행합니다. 이 보상 조치는 실패한 사가의 영향을 상쇄하도록 설계됩니다.
- 단계적 수행: 사가는 여러 단계로 나누어 실행되며, 각 단계는 독립적으로 성공하거나 실패할 수 있습니다. 이를 통해 긴 트랜잭션을 관리하고, 부분적으로 성공할 수 있도록 합니다.
- 상태 관리: 각 사가는 상태를 저장하고, 트랜잭션의 진행 상황을 추적합니다. 이 상태를 바탕으로 성공적인 실행과 실패한 사가의 보상 조치를 관리합니다.
OrderApplication
- build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
- appication.properties
spring.application.name=order
message.exchange=market
message.queue.product=market.product
message.queue.payment=market.payment
message.err.exchange=market.err
message.queue.err.order=market.err.order
message.queue.err.product=market.err.product
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
- DeliveryMessage.java
더보기
package com.market.order;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.UUID;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DeliveryMessage {
private UUID orderId;
private UUID paymentId;
private String userId;
private Integer productId;
private Integer productQuantity;
private Integer payAmount;
private String errorType;
}
- Order.java
더보기
package com.market.order;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.util.UUID;
@Builder
@Getter @Setter
@ToString
public class Order {
private UUID orderId;
private String userId;
private String orderStatus;
private String errorType;
public void cancelOrder(String receiveErrorType) {
orderStatus = "CANCELLED";
errorType = receiveErrorType;
}
}
- OrderApplicationQueueConfig.java
더보기
package com.market.order;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class OrderApplicationQueueConfig {
// 객체를 메세지컨버터를 통해 전달할 것이므로 직렬화/역직렬화 필요해 빈 등록
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Value("${message.exchange}")
private String exchange;
@Value("${message.queue.product}")
private String queueProduct;
@Value("${message.queue.payment}")
private String queuePayment;
@Value("${message.err.exchange}")
private String exchangeErr;
@Value("${message.queue.err.order}")
private String queueErrOrder;
@Value("${message.queue.err.product}")
private String queueErrProduct;
@Bean
public TopicExchange exchange() {return new TopicExchange(exchange);}
@Bean public Queue queueProduct() {return new Queue(queueProduct);}
@Bean public Queue queuePayment() {return new Queue(queuePayment);}
@Bean public Binding bindingProduct() {return BindingBuilder.bind(queueProduct()).to(exchange()).with(queueProduct);}
@Bean public Binding bindingPayment() {return BindingBuilder.bind(queuePayment()).to(exchange()).with(queuePayment);}
@Bean public TopicExchange exchangeErr() {return new TopicExchange(exchangeErr);}
@Bean public Queue queueErrOrder() {return new Queue(queueErrOrder);}
@Bean public Queue queueErrProduct() {return new Queue(queueErrProduct);}
@Bean public Binding bindingErrOrder(){return BindingBuilder.bind(queueErrOrder()).to(exchangeErr()).with(exchangeErr);}
@Bean public Binding bindingErrProduct(){return BindingBuilder.bind(queueErrProduct()).to(exchangeErr()).with(exchangeErr);}
}
- OrderEndpoint.java
더보기
package com.market.order;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.UUID;
@RestController
@RequiredArgsConstructor
@Slf4j
public class OrderEndpoint {
// 에러를 전달 받을 RabbitListener
@RabbitListener(queues = "${message.queue.err.order}")
public void errOrder(DeliveryMessage deliveryMessage) {
log.error("ERROR RECEIVED !!!");
orderService.rollbackOrder(deliveryMessage);
}
private final OrderService orderService;
// 주문 조회
@GetMapping("/order/{orderId}")
public ResponseEntity<Order> getOrder(@PathVariable("orderId") UUID orderId) {
Order order = orderService.getOrder(orderId);
return ResponseEntity.ok(order);
}
// 주문 생성
@PostMapping("/order")
public ResponseEntity<Order> order(@RequestBody OrderRequestDto orderRequestDto) {
Order order = orderService.createOrder(orderRequestDto);
return ResponseEntity.ok(order);
}
@Getter @Setter
public static class OrderRequestDto {
private String userId;
private Integer productId;
private Integer productQuantity;
private Integer payAmount;
public Order toOrder() {
return Order.builder()
.orderId(UUID.randomUUID())
.userId(userId)
.orderStatus("RECEIPT")
.build();
}
public DeliveryMessage toDeliveryMessage(UUID orderId) {
return DeliveryMessage.builder()
.userId(userId)
.orderId(orderId)
.productId(productId)
.productQuantity(productQuantity)
.payAmount(payAmount)
.build();
}
}
}
- OrderService.java
더보기
package com.market.order;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderService {
private Map<UUID, Order> orderStore = new HashMap<>();
private final RabbitTemplate rabbitTemplate;
@Value("${message.queue.product}")
private String productQueue;
// 주문 생성 후 exchange를 통해 productQueue로 전달
public Order createOrder(OrderEndpoint.OrderRequestDto orderRequestDto) {
Order order = orderRequestDto.toOrder();
orderStore.put(order.getOrderId(), order);
DeliveryMessage deliveryMessage = orderRequestDto.toDeliveryMessage(order.getOrderId());
rabbitTemplate.convertAndSend(productQueue, deliveryMessage);
return order;
}
// 주문 조회
public Order getOrder(UUID orderId) {
return orderStore.get(orderId);
}
// 주문 errqueue에서 에러를 받으면 주문 취소하는 메서드
public void rollbackOrder(DeliveryMessage deliveryMessage) {
Order order = orderStore.get(deliveryMessage.getOrderId());
order.cancelOrder(deliveryMessage.getErrorType());
}
}
ProductApplication
- build.gradle
dependencies {
// Jackson 의존성
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
- application.properties
spring.application.name=product
message.exchange=market
message.queue.product=market.product
message.queue.payment=market.payment
message.err.exchange=market.err
message.queue.err.order=market.err.order
message.queue.err.product=market.err.product
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
- DeliveryMessage.java : OrderApplication에서 복사하여 사용
- ProductApplicationQueueConfig.java
더보기
package com.market.product;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ProductApplicationQueueConfig {
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
- ProductEndpoint.java
더보기
package com.market.product;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class ProductEndpoint {
private final ProductService productService;
// order 주문 정보를 담은 메세지 productQueue에서 가져오기
@RabbitListener(queues = "${message.queue.product}")
public void receiveMessage(DeliveryMessage deliveryMessage) {
log.info("Product received: {}", deliveryMessage.toString());
productService.reduceProductAmount(deliveryMessage);
}
// Payment에서 실패한 정보를 담은 메세지 errProductQueue에서 가져오기
@RabbitListener(queues = "${message.queue.err.product}")
public void receiveErrorMessage(DeliveryMessage deliveryMessage) {
log.info("ERROR RECEIVED !!!");
productService.rollbackProduct(deliveryMessage);
}
}
- ProductService.java
더보기
package com.market.product;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
@Slf4j
@Service
@RequiredArgsConstructor
public class ProductService {
@Value("${message.queue.payment}")
private String paymentQueue;
@Value("${message.queue.err.order}")
private String errOrderQueue;
private final RabbitTemplate rabbitTemplate;
public void reduceProductAmount(DeliveryMessage deliveryMessage) {
Integer productId = deliveryMessage.getProductId();
Integer productQuantity = deliveryMessage.getProductQuantity();
// product에서 에러가 있을 경우 payment 가지 않고 롤백
if (productId != 1 || productQuantity > 1) {
this.rollbackProduct(deliveryMessage);
return;
}
rabbitTemplate.convertAndSend(paymentQueue, deliveryMessage);
}
// product에서 에러가 있을 경우 에러메세지 담아서 errOrderQueue로 전달
public void rollbackProduct(DeliveryMessage deliveryMessage) {
log.error("PRODUCT ROLLBACK");
if (!StringUtils.hasText(deliveryMessage.getErrorType())) {
deliveryMessage.setErrorType("PRODUCT ERROR");
}
rabbitTemplate.convertAndSend(errOrderQueue, deliveryMessage);
}
}
PaymentApplication
- build.gradle, application.properties, DeliveryMessage.java : 위 ProductApplication과 동일
- Payment.java
더보기
package com.market.payment;
import lombok.*;
import java.util.UUID;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Payment {
private UUID paymentId;
private String userId;
private Integer payAmount;
private String payStatus;
}
- PaymentApplicationQueueConfig.java : Product 와 동일
- PaymentEndpoint.java
더보기
package com.market.payment;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class PaymentEndpoint {
private final PaymentService paymentService;
// product 에서 큐에 전달한 메세지 받는 리스너
@RabbitListener(queues = "${message.queue.payment}")
public void receivePayment(DeliveryMessage deliveryMessage) {
log.info("Received payment {}", deliveryMessage.toString());
paymentService.createPayment(deliveryMessage);
}
}
- PaymentService.java
더보기
package com.market.payment;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
@Slf4j
@RequiredArgsConstructor
public class PaymentService {
@Value("${message.queue.err.product}")
private String ProductErrQueue;
private final RabbitTemplate rabbitTemplate;
public void createPayment(DeliveryMessage deliveryMessage) {
Payment payment = Payment.builder()
.paymentId(UUID.randomUUID())
.userId(deliveryMessage.getUserId())
.payAmount(deliveryMessage.getPayAmount())
.payStatus("SUCCESS")
.build();
if (payment.getPayAmount() >= 10000) {
log.error("Payment amount exceeds limit: {}", payment.getPayAmount());
deliveryMessage.setErrorType("PAYMENT_LIMIT_EXCEEDS");
rollbackPayment(deliveryMessage);
}
}
public void rollbackPayment(DeliveryMessage deliveryMessage) {
log.info("PAYMENT ROLLBACK!!!");
rabbitTemplate.convertAndSend(ProductErrQueue, deliveryMessage);
}
}
'대규모 시스템' 카테고리의 다른 글
RabbitMQ 라우팅 키로 구분하여 하나의 큐로 메세지 보내기 (0) | 2024.09.05 |
---|---|
RabbitMQ - Listener 구분하기 (0) | 2024.08.14 |
Kafka 기본 설정 (0) | 2024.08.14 |
Kafka란? (0) | 2024.08.14 |
RabbitMQ 기본 설정 (0) | 2024.08.14 |