SAGA Pattern

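What is the SAGA pattern?

  • The Saga pattern is a design pattern for managing transactions in distributed systems. It is especially useful in distributed environments such as microservice architectures, where it helps keep data consistent across services and handle transactions that span several of them.
  • Key concepts
    • Splitting the transaction: when a traditional transaction would span several services, it is broken into smaller local transactions, one per service. The whole sequence of these local transactions is called a "saga".
    • Compensating action: if one local transaction fails, compensating actions are run to cancel or undo the local transactions that already succeeded. Each compensating action is designed to offset the effect of the step it undoes.
    • Step-by-step execution: a saga runs as a series of steps, each of which can succeed or fail independently. This keeps long-running transactions manageable and allows partial success.
    • State management: each saga stores its state and tracks how far the transaction has progressed. That state drives both the forward steps and the compensating actions after a failure.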
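
The concepts above can be sketched in plain Java, without Spring or RabbitMQ. This is only a minimal illustration under assumptions: `SagaSketch`, `Step`, `run`, and `step` are hypothetical names that do not appear in the projects below. It shows local transactions running in order and, on failure, compensating the completed ones in reverse order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class SagaSketch {

    /** One local transaction plus the action that undoes it (hypothetical type). */
    public interface Step {
        String name();
        boolean execute();   // returns false when the local transaction fails
        void compensate();   // undoes a previously successful execute()
    }

    /** Runs the steps in order; on failure, compensates completed steps in reverse order. */
    public static boolean run(List<Step> steps, List<String> log) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            if (step.execute()) {
                log.add("done:" + step.name());
                completed.push(step);            // most recent step ends up on top
            } else {
                log.add("failed:" + step.name());
                while (!completed.isEmpty()) {   // undo newest first
                    Step done = completed.pop();
                    done.compensate();
                    log.add("compensated:" + done.name());
                }
                return false;
            }
        }
        return true;
    }

    /** Test helper: a step that simply succeeds or fails, with a no-op compensation. */
    public static Step step(String name, boolean succeeds) {
        return new Step() {
            public String name() { return name; }
            public boolean execute() { return succeeds; }
            public void compensate() { /* a real step would undo its local change here */ }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        boolean ok = run(Arrays.asList(
                step("order", true), step("product", true), step("payment", false)), log);
        // prints: false [done:order, done:product, failed:payment, compensated:product, compensated:order]
        System.out.println(ok + " " + log);
    }
}
```

The sketch coordinates the steps from one place, which is closer to an orchestrated saga. The projects below instead pass messages from service to service through RabbitMQ queues, so each service decides locally whether to continue or to send an error back.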

 

OrderApplication

  • build.gradle
dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-amqp'
	implementation 'org.springframework.boot:spring-boot-starter-web'
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.amqp:spring-rabbit-test'
	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

 

  • application.properties
spring.application.name=order

message.exchange=market
message.queue.product=market.product
message.queue.payment=market.payment

message.err.exchange=market.err
message.queue.err.order=market.err.order
message.queue.err.product=market.err.product

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

 

  • DeliveryMessage.java
package com.market.order;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.UUID;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DeliveryMessage {

    private UUID orderId;
    private UUID paymentId;

    private String userId;

    private Integer productId;
    private Integer productQuantity;

    private Integer payAmount;

    private String errorType;
}

 

 

  • Order.java
package com.market.order;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.util.UUID;

@Builder
@Getter @Setter
@ToString
public class Order {
    private UUID orderId;
    private String userId;
    private String orderStatus;
    private String errorType;

    public void cancelOrder(String receiveErrorType) {
        orderStatus = "CANCELLED";
        errorType = receiveErrorType;
    }

}

 

 

  • OrderApplicationQueueConfig.java
package com.market.order;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OrderApplicationQueueConfig {

    // Objects are sent as messages, so register a JSON converter bean for serialization/deserialization
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Value("${message.exchange}")
    private String exchange;

    @Value("${message.queue.product}")
    private String queueProduct;

    @Value("${message.queue.payment}")
    private String queuePayment;

    @Value("${message.err.exchange}")
    private String exchangeErr;

    @Value("${message.queue.err.order}")
    private String queueErrOrder;

    @Value("${message.queue.err.product}")
    private String queueErrProduct;

    @Bean
    public TopicExchange exchange() {return new TopicExchange(exchange);}

    @Bean public Queue queueProduct() {return new Queue(queueProduct);}
    @Bean public Queue queuePayment() {return new Queue(queuePayment);}

    @Bean public Binding bindingProduct() {return BindingBuilder.bind(queueProduct()).to(exchange()).with(queueProduct);}
    @Bean public Binding bindingPayment() {return BindingBuilder.bind(queuePayment()).to(exchange()).with(queuePayment);}


    @Bean public TopicExchange exchangeErr() {return new TopicExchange(exchangeErr);}

    @Bean public Queue queueErrOrder() {return new Queue(queueErrOrder);}
    @Bean public Queue queueErrProduct() {return new Queue(queueErrProduct);}

    // Bind each error queue under its own name so the two can be routed separately
    @Bean public Binding bindingErrOrder(){return BindingBuilder.bind(queueErrOrder()).to(exchangeErr()).with(queueErrOrder);}
    @Bean public Binding bindingErrProduct(){return BindingBuilder.bind(queueErrProduct()).to(exchangeErr()).with(queueErrProduct);}
}

 

 

  • OrderEndpoint.java
package com.market.order;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.UUID;

@RestController
@RequiredArgsConstructor
@Slf4j
public class OrderEndpoint {
	
    // RabbitListener that receives error messages for orders
    @RabbitListener(queues = "${message.queue.err.order}")
    public void errOrder(DeliveryMessage deliveryMessage) {
        log.error("ERROR RECEIVED !!!");
        orderService.rollbackOrder(deliveryMessage);
    }

    private final OrderService orderService;
    
    // Look up an order
    @GetMapping("/order/{orderId}")
    public ResponseEntity<Order> getOrder(@PathVariable("orderId") UUID orderId) {
        Order order = orderService.getOrder(orderId);
        return ResponseEntity.ok(order);
    }
	
    // Create an order
    @PostMapping("/order")
    public ResponseEntity<Order> order(@RequestBody OrderRequestDto orderRequestDto) {

        Order order = orderService.createOrder(orderRequestDto);
        return ResponseEntity.ok(order);
    }

    @Getter @Setter
    public static class OrderRequestDto {
        private String userId;
        private Integer productId;
        private Integer productQuantity;
        private Integer payAmount;

        public Order toOrder() {
            return Order.builder()
                    .orderId(UUID.randomUUID())
                    .userId(userId)
                    .orderStatus("RECEIPT")
                    .build();
        }

        public DeliveryMessage toDeliveryMessage(UUID orderId) {
            return DeliveryMessage.builder()
                    .userId(userId)
                    .orderId(orderId)
                    .productId(productId)
                    .productQuantity(productQuantity)
                    .payAmount(payAmount)
                    .build();
        }
    }
}

 

  • OrderService.java
package com.market.order;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

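import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderService {

    // In-memory store; it is accessed from both web and listener threads, so use a concurrent map
    private final Map<UUID, Order> orderStore = new ConcurrentHashMap<>();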
    private final RabbitTemplate rabbitTemplate;

    @Value("${message.queue.product}")
    private String productQueue;

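    // Create the order, then publish it to the product queue.
    // With only a routing key given, RabbitTemplate publishes to its configured default exchange.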
    public Order createOrder(OrderEndpoint.OrderRequestDto orderRequestDto) {
        Order order = orderRequestDto.toOrder();
        orderStore.put(order.getOrderId(), order);

        DeliveryMessage deliveryMessage = orderRequestDto.toDeliveryMessage(order.getOrderId());
        rabbitTemplate.convertAndSend(productQueue, deliveryMessage);

        return order;
    }

    // Look up an order
    public Order getOrder(UUID orderId) {
        return orderStore.get(orderId);
    }

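    // Cancel the order when an error message arrives on the order error queue
    public void rollbackOrder(DeliveryMessage deliveryMessage) {
        UUID orderId = deliveryMessage.getOrderId();
        Order order = (orderId == null) ? null : orderStore.get(orderId);
        if (order == null) {
            // Unknown order (for example, the in-memory store was reset): nothing to cancel
            log.warn("Order not found for rollback: {}", orderId);
            return;
        }
        order.cancelOrder(deliveryMessage.getErrorType());
    }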


}

 

 

 

ProductApplication

  • build.gradle
dependencies {
	// Jackson dependencies
	implementation 'com.fasterxml.jackson.core:jackson-databind'
	implementation 'com.fasterxml.jackson.core:jackson-core'
	implementation 'com.fasterxml.jackson.core:jackson-annotations'

	implementation 'org.springframework.boot:spring-boot-starter-amqp'
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.amqp:spring-rabbit-test'
	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

 

  • application.properties
spring.application.name=product

message.exchange=market
message.queue.product=market.product
message.queue.payment=market.payment

message.err.exchange=market.err
message.queue.err.order=market.err.order
message.queue.err.product=market.err.product

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

 

  • DeliveryMessage.java: copied from OrderApplication
  • ProductApplicationQueueConfig.java
package com.market.product;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProductApplicationQueueConfig {

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }


}

 

 

  • ProductEndpoint.java
package com.market.product;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class ProductEndpoint {

    private final ProductService productService;

    // Receive order messages from the product queue
    @RabbitListener(queues = "${message.queue.product}")
    public void receiveMessage(DeliveryMessage deliveryMessage) {
        log.info("Product received: {}", deliveryMessage.toString());

        productService.reduceProductAmount(deliveryMessage);
    }
	
    // Receive payment-failure messages from the product error queue
    @RabbitListener(queues = "${message.queue.err.product}")
    public void receiveErrorMessage(DeliveryMessage deliveryMessage) {
        log.info("ERROR RECEIVED !!!");
        productService.rollbackProduct(deliveryMessage);
    }
}

 

 

  • ProductService.java
package com.market.product;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Slf4j
@Service
@RequiredArgsConstructor
public class ProductService {

    @Value("${message.queue.payment}")
    private String paymentQueue;

    @Value("${message.queue.err.order}")
    private String errOrderQueue;

    private final RabbitTemplate rabbitTemplate;

    public void reduceProductAmount(DeliveryMessage deliveryMessage) {
        Integer productId = deliveryMessage.getProductId();
        Integer productQuantity = deliveryMessage.getProductQuantity();
		
        // If product validation fails, roll back instead of forwarding to payment
        if (productId != 1 || productQuantity > 1) {
            this.rollbackProduct(deliveryMessage);
            return;
        }

        rabbitTemplate.convertAndSend(paymentQueue, deliveryMessage);
    }
	
    // On a product error, set the error type (if not already set) and forward the message to errOrderQueue
    public void rollbackProduct(DeliveryMessage deliveryMessage) {
        log.error("PRODUCT ROLLBACK");
        if (!StringUtils.hasText(deliveryMessage.getErrorType())) {
            deliveryMessage.setErrorType("PRODUCT ERROR");
        }
        rabbitTemplate.convertAndSend(errOrderQueue, deliveryMessage);
    }
}

 

 

 

PaymentApplication

  • build.gradle, application.properties, DeliveryMessage.java: same as ProductApplication above
  • Payment.java
package com.market.payment;

import lombok.*;

import java.util.UUID;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Payment {
    private UUID paymentId;
    private String userId;
    private Integer payAmount;
    private String payStatus;
}

 

 

  • PaymentApplicationQueueConfig.java: same as Product
  • PaymentEndpoint.java
package com.market.payment;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class PaymentEndpoint {

    private final PaymentService paymentService;
	
    // Listener for messages that product put on the payment queue
    @RabbitListener(queues = "${message.queue.payment}")
    public void receivePayment(DeliveryMessage deliveryMessage) {
        log.info("Received payment {}", deliveryMessage.toString());

        paymentService.createPayment(deliveryMessage);
    }
}

 

 

  • PaymentService.java
package com.market.payment;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
@Slf4j
@RequiredArgsConstructor
public class PaymentService {

    @Value("${message.queue.err.product}")
    private String productErrQueue;

    private final RabbitTemplate rabbitTemplate;

    public void createPayment(DeliveryMessage deliveryMessage) {
        Payment payment = Payment.builder()
                .paymentId(UUID.randomUUID())
                .userId(deliveryMessage.getUserId())
                .payAmount(deliveryMessage.getPayAmount())
                .payStatus("SUCCESS")
                .build();

        // An amount of 10000 or more is treated as a failure and starts the rollback chain
        if (payment.getPayAmount() >= 10000) {
            log.error("Payment amount exceeds limit: {}", payment.getPayAmount());
            deliveryMessage.setErrorType("PAYMENT_LIMIT_EXCEEDS");
            rollbackPayment(deliveryMessage);
        }
    }

    // Send the failed message to the product error queue so product and order can roll back
    public void rollbackPayment(DeliveryMessage deliveryMessage) {
        log.info("PAYMENT ROLLBACK!!!");
        rabbitTemplate.convertAndSend(productErrQueue, deliveryMessage);
    }

}
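
To see how the three applications fit together, the message flow can be simulated in plain Java with an in-memory stand-in for the broker. This is a rough sketch under assumptions, not the actual Spring code: `SagaFlowSketch` and `Message` are hypothetical names, the order status and error type are merged into one string for brevity, and only the queue names and validation rules are taken from the listings above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class SagaFlowSketch {

    /** Simplified stand-in for DeliveryMessage; only the fields the flow needs. */
    static class Message {
        final int orderId;
        final int productId;
        final int quantity;
        final int payAmount;
        String errorType;

        Message(int orderId, int productId, int quantity, int payAmount) {
            this.orderId = orderId;
            this.productId = productId;
            this.quantity = quantity;
            this.payAmount = payAmount;
        }
    }

    // Pending deliveries; a stand-in for the RabbitMQ broker
    private final Deque<Runnable> broker = new ArrayDeque<>();
    private final Map<Integer, String> orderStatus = new HashMap<>();

    private void send(String queue, Message message) {
        broker.add(() -> handle(queue, message));
    }

    /** Plays the role of the @RabbitListener methods in the three services. */
    private void handle(String queue, Message m) {
        switch (queue) {
            case "market.product":      // ProductService.reduceProductAmount
                if (m.productId != 1 || m.quantity > 1) {
                    rollbackProduct(m);
                } else {
                    send("market.payment", m);
                }
                break;
            case "market.payment":      // PaymentService.createPayment
                if (m.payAmount >= 10000) {
                    m.errorType = "PAYMENT_LIMIT_EXCEEDS";
                    send("market.err.product", m);
                }
                break;
            case "market.err.product":  // ProductEndpoint.receiveErrorMessage
                rollbackProduct(m);
                break;
            case "market.err.order":    // OrderService.rollbackOrder
                orderStatus.put(m.orderId, "CANCELLED:" + m.errorType);
                break;
            default:
                throw new IllegalArgumentException("Unknown queue: " + queue);
        }
    }

    private void rollbackProduct(Message m) {
        if (m.errorType == null) {
            m.errorType = "PRODUCT ERROR";
        }
        send("market.err.order", m);
    }

    /** Creates an order, delivers all resulting messages, and returns the final status. */
    public String placeOrder(int orderId, int productId, int quantity, int payAmount) {
        orderStatus.put(orderId, "RECEIPT");
        send("market.product", new Message(orderId, productId, quantity, payAmount));
        Runnable delivery;
        while ((delivery = broker.poll()) != null) {
            delivery.run();
        }
        return orderStatus.get(orderId);
    }

    public static void main(String[] args) {
        SagaFlowSketch flow = new SagaFlowSketch();
        System.out.println(flow.placeOrder(1, 1, 1, 5000));   // RECEIPT
        System.out.println(flow.placeOrder(2, 1, 1, 10000));  // CANCELLED:PAYMENT_LIMIT_EXCEEDS
        System.out.println(flow.placeOrder(3, 2, 1, 5000));   // CANCELLED:PRODUCT ERROR
    }
}
```

A payment failure travels back through market.err.product and then market.err.order, so the product step gets its chance to compensate before the order is cancelled. A product failure skips payment entirely and goes straight to market.err.order.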

 
