Programming/Redis

DB 대용량 데이터 처리, 동시성 이슈 해결 방법 (Redis)

잇(IT) 2023. 12. 19. 16:02
Synchronized 사용
Synchronized의 문제점
DB를 활용한 데이터 정합성 맞추기
Pessimistic Lock
Optimistic Lock
Named Lock
Lettuce
Redisson

 

간단한 로직을 통해 재고 감소 코드를 아래와 같이 작성한다고 가정한다.

 

- StockService.java

@Service
public class StockService {

    private final StockRepositroy stockRepositroy;

    public StockService(StockRepositroy stockRepositroy) {
        this.stockRepositroy = stockRepositroy;
    }

	@Transactional
    public  void decrease(Long id, Long quantity) {
        
        // Stock 조회
        // 재고를 감소시킨 뒤
        // 갱신된 값을 저장

        Stock stock = stockRepositroy.findById(id).orElseThrow();
        stock.decrease(quantity);

        stockRepositroy.saveAndFlush(stock);
    }
}

 

위 코드는 재고 id와 quantity 수량을 파라미터로 전달 받아, DB의 Stock 엔티티의 quantity 수량을 변경 시킨다.

 

- StockServiceTest.java

@SpringBootTest
class StockServiceTest {

    @Autowired
	private StockService stockService;

    @Autowired
    private StockRepositroy stockRepositroy;

    @BeforeEach
    public void before() {
        stockRepositroy.saveAndFlush(new Stock(1L, 100L));
    }

    @AfterEach
    public void after() {
        stockRepositroy.deleteAll();
    }

    @Test
    public void 동시에_100개의_요청() throws InterruptedException {
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    stockService.decrease(1L, 1L);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();

        Stock stock = stockRepositroy.findById(1L).orElseThrow();
        // 100 - ( 1 * 100) =0
        assertEquals(0, stock.getQuantity());
    }
}

 

Multi Thread를 이용하여 동시에 여러 요청을 보내는 테스트 코드를 작성한다.

위 테스트 코드는 레이스 컨디션이 발생하여 둘 이상의 Thread가 공유 데이터에 액세스 할 수 있고 동시에 변경을 하려고 하기 때문에 오류가 발생한다.

 

위 표와 같이 각 Thread가 실행되면서 하나의 데이터를 공유하게 되고, 재고 감소 로직은 2번 실행되었지만 위와 같이 결과는 한 번 실행된 결과를 얻게 된다.

 

- Synchronized를 이용한 해결

 

- StockService.java

@Service
public class StockService {

    private final StockRepositroy stockRepositroy;

    public StockService(StockRepositroy stockRepositroy) {
        this.stockRepositroy = stockRepositroy;
    }

	@Transactional
    public synchronized void decrease(Long id, Long quantity) {

        // Stock 조회
        // 재고를 감소시킨 뒤
        // 갱신된 값을 저장

        Stock stock = stockRepositroy.findById(id).orElseThrow();
        stock.decrease(quantity);

        stockRepositroy.saveAndFlush(stock);
    }
}

 

synchronized를 사용하면 해당 메소드는 한개의 Thread만 접근이 가능해진다. 메서드 명 앞에 synchronized를 붙이면 된다.

하지만 위의 경우 Spring의 트랜잭션에 의해서 여전히 오류가 발생한다. 스프링은 @Transactional을 이용하게 되면, 해당 클래스를 매핑한 새로운 Transaction 클래스를 새로 만들어서 실행한다.

public class TransactionalStockService {

	private StockService stockService;
    
    public TransactionStockService(StockService stockService) {
    	this.stockService = stockService;
        }
       
    public void decrease(Long id, Long quantity) {
    	startTransaction();
        
        stockService.decrease(id, quantity);
        
        endTransaction();
        }

 

위와 같이 Transaction을 매핑한 클래스가 생성되고, startTransaction();을 통해 트랜잭션이 시작하고, stockService.decrease() 메서드를 호출하고, endTransaction(); 메소드 실행이 종료가 되면 트랜잭션을 종료하게 된다.

트랜잭션은 종료 시점에 데이터베이스에 업데이트를 하게 된다.

하지만 멀티 쓰레드를 사용하게 되면, decrease() 메서드가 완료되고, 데이터베이스가 업데이트 되기 전에 다른 Thread가 decrease() 메서드를 호출 할 수 있다.

결국 업데이트 되기전에 decrease() 메서드가 여러번 호출되면 이전 문제가 동일하게 발생된다.

@Service
public class StockService {

    private final StockRepositroy stockRepositroy;

    public StockService(StockRepositroy stockRepositroy) {
        this.stockRepositroy = stockRepositroy;
    }

    public synchronized void decrease(Long id, Long quantity) {

        // Stock 조회
        // 재고를 감소시킨 뒤
        // 갱신된 값을 저장

        Stock stock = stockRepositroy.findById(id).orElseThrow();
        stock.decrease(quantity);

        stockRepositroy.saveAndFlush(stock);
    }
}

 

위 문제는 @Transactional 어노테이션을 빼면 우선 해결된다.

 

- Synchronized의 문제점

 

Synchronized는 하나의 프로세스 안에서만 보장이 된다.

 

위와 같이 2대 이상일 경우 데이터 접근을 여러대에서 동시에 할 수 있게 된다.

 

위 표와 같이 20:00에 server - 1에서 재고 감소 로직을 실행하고, 20:05분에 종료한다고 가정했을 때, 20:00~20:05 사이에 server - 2에서 갱신되지 않은 데이터를 가져다가 새로운 값을 갱신할 수 있다.

Synchronized는 하나의 프로세스에서만 동작하기 때문에 결국 2대 이상의 서버를 사용하게 될 경우 여러 스레드에서 동시에 데이터에 접근하여 레이스 컨디션이 발생할 수 있다.

때문에 한대 이상의 서버를 사용할 경우 Synchronized는 잘 사용하지 않는다.

 

- DB를 활용한 데이터 정합성 맞추기

 

1. Pessimistic Lock

   1.1. 실제로 데이터에 lock을 걸어서 정합성을 맞추는 방법이다. exclusive lock을 걸게되며 다른 트랜잭션에서는 lock이 해제되기전에 데이터를 가져갈 수 없게 된다.

   1.2. 데드락이 걸릴 수 있기 때문에 주의하여 사용해야 한다.

 

서버가 여러대 있을 때 특정 서버가 락을 걸고 데이터를 가져가게 되면 다른 서버는 락을 가져간 서버가 락을 해제하기 전까지는 데이터를 가져갈 수 없게 된다.

데이터에는 락을 가진 스레드만 접근이 가능해지기 때문에 문제를 해결할 수 있게 된다.

 

2. Optimistic Lock

   2.1. 실제로 Lock을 이용하지 않고 버전을 이용함으로써 정합성을 맞추는 방법이다. 먼저 데이터를 읽은 후에 update를 수행할 때 현재 내가 읽은 버전이 맞는지 확인하며 업데이트 한다. 내가 읽은 버전에서 수정사항이 생겼을 경우에는 application에서 다시 읽은 후에 작업을 수행해야 한다.

 

2대의 서버가 DB로부터 특정 버전의 role을 가져온다.

1. Server 1이 where version=1의 update 쿼리를 날리게 되면 해당 버전을 +1 하여 저장하게 되고, 해당 role의 버전은 +1이 되어 DB에 저장된다. 

Server 2가 동일하게 where version=1 update 쿼리를 날리게 되면 DB는 현재 Server 1에 의해 version=2가 되었기 때문에 update 쿼리가 동작하지 않게 된다.

Server 2에서 실패했기 때문에 다시 DB를 읽은 후에 작업을 수행하는 로직을 작성해 줘야 한다.

 

3. Named Lock

   3.1. 이름을 가진 metadata locking이다. 이름을 가진 lock을 획득한 후 해제할때까지 다른 세션은 이 lock을 획득할 수 없도록 한다.

   3.2. 주의할 점으로는 transaction이 종료될 때 lock이 자동으로 해제되지 않는다. 별도의 명령어로 해제를 수행해주거나 선점시간이 끝나야 해제된다.

 

Pessimistic Lock과 유사해 보이지만 Named Lock은 row나 table 단위가 아니라 메타데이터의 Locking을 하는 방법이다.

 

- Pessimistic Lock

 

실제로 데이터에 락을 걸어서 정합성을 맞추는 방법이다.

exclusive lock을 걸게 되면 다른 트랜잭션에서는 락이 해제되기 전에 락을 걸고 데이터를 가져갈 수 없게 된다.

 

- StockRepository.interface

public interface StockRepositroy extends JpaRepository<Stock, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("select s from Stock s where s.id = :id")
    Stock findByIdWithPessimisticLock(Long id);

 

Spring Data JPA에서는 @Lock 어노테이션을 통해 Pessimistic Lock을 구현할 수 있다.

 

- PessimisticLockStockService.java

@Service
public class PessimisticLockStockService {

    private final StockRepositroy stockRepositroy;

    public PessimisticLockStockService(StockRepositroy stockRepositroy) {
        this.stockRepositroy = stockRepositroy;
    }

    @Transactional
    public void decrease(Long id, Long quantity) {
        Stock stock = stockRepositroy.findByIdWithPessimisticLock(id);

        stock.decrease(quantity);

        stockRepositroy.save(stock);
    }
}

 

PessimisticLock을 사용한 메서드를 통해 데이터를 가져온 뒤 decrease() 메서드를 실행하고, 저장하는 코드를 작성한다.

 

- StockServiceTest.java

@SpringBootTest
class StockServiceTest {

    @Autowired
    private PessimisticLockStockService stockService;

    @Autowired
    private StockRepositroy stockRepositroy;

    @BeforeEach
    public void before() {
        stockRepositroy.saveAndFlush(new Stock(1L, 100L));
    }

    @AfterEach
    public void after() {
        stockRepositroy.deleteAll();
    }

    @Test
    public void 재고감소() {
        stockService.decrease(1L, 1L);

        //100 - 1 == 99
        Stock stock = stockRepositroy.findById(1L).orElseThrow();

        assertEquals(99, stock.getQuantity());
    }

    @Test
    public void 동시에_100개의_요청() throws InterruptedException {
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    stockService.decrease(1L, 1L);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();

        Stock stock = stockRepositroy.findById(1L).orElseThrow();
        // 100 - ( 1 * 100) =0
        assertEquals(0, stock.getQuantity());
    }
}

 

테스트 코드를 Pessimistic을 사용한 서비스를 주입하여 동시에 100개를 요청하는 코드를 테스트 하게 되면 정상적으로 동작하는 것을 확인 할 수 있다. 

 

Pessimistic Lock의 장점은 충돌이 빈번하게 일어나면 Optimistic Lock보다 성능이 좋을 수 있다. 또 Lock을 통해 업데이트를 제어하기 때문에 데이터 정합성이 보장된다.

단점으로는 별도의 Lock을 잡기 때문에 성능이 안좋아 질 수 있다.

 

- Optimistic Lock

 

Lock을 직접 사용하는 것이 아닌 version을 통해 데이터의 정합성을 맞추는 방법이다.

 

엔티티에 version 컬럼을 추가해줘야 한다. 추가로 해당 컬럼에 @Version 어노테이션을 붙여줘야 한다.

 

- StockRespository.interface

public interface StockRepositroy extends JpaRepository<Stock, Long> {

    @Lock(LockModeType.OPTIMISTIC)
    @Query("select s from Stock s where s.id = :id")
    Stock findByIdWithOptimisticLock(Long id);
}

 

Pessimistic Lock과 동일하게 @Lock 어노테이션을 이용하여 쿼리를 작성한다.

 

- OptimisticLockStockService.java

@Service
public class OptimisticLockStockService {

    private final StockRepositroy stockRepositroy;

    public OptimisticLockStockService(StockRepositroy stockRepositroy) {
        this.stockRepositroy = stockRepositroy;
    }

    public void decrease(Long id, Long quantity) {
        Stock stock = stockRepositroy.findByIdWithOptimisticLock(id);

        stock.decrease(quantity);

        stockRepositroy.save(stock);
    }
}

 

마찬가지로 Service에 재고 감소 로직을 작성한다.

 

- OptimisticLockStockFacade.java

@Component
public class OptimisticLockStockFacade {

    private final OptimisticLockStockService optimisticLockStockService;

    public OptimisticLockStockFacade(OptimisticLockStockService optimisticLockStockService) {
        this.optimisticLockStockService = optimisticLockStockService;
    }

    public void decrease(Long id, Long quantity) throws InterruptedException {
        while (true) {
            try {
                optimisticLockStockService.decrease(id, quantity);

                break;
            } catch (Exception e) {
                Thread.sleep(50);
            }
        }
    }
}

 

Facade 클래스는 Service클래스를 감싸고, Service의 decrease() 메서드를 실행하고, 예외가 발생하면 Thread.sleep(50)을 호출하여 50밀리세컨드를 대기 했다가 다시 시도한다.

Optimistic Lock은 실패했을 때 재시도를 해야하기 때문에  위와 같이 Facade 패턴을 이용하여 OptimisticLock Service를 감싸는 클래스를 생성해준다.

 

- OptimisticLockStockFacadeTest.java

@SpringBootTest
class OptimisticLockStockFacaeTest {

    @Autowired
//    private StockService stockService;
    private OptimisticLockStockFacade optimisticLockStockFacae;

    @Autowired
    private StockRepositroy stockRepositroy;

    @BeforeEach
    public void before() {
        stockRepositroy.saveAndFlush(new Stock(1L, 100L));
    }

    @AfterEach
    public void after() {
        stockRepositroy.deleteAll();
    }

    @Test
    public void 동시에_100개의_요청() throws InterruptedException {
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    optimisticLockStockFacae.decrease(1L, 1L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();

        Stock stock = stockRepositroy.findById(1L).orElseThrow();
        // 100 - ( 1 * 100) =0
        assertEquals(0, stock.getQuantity());
    }
}

 

마찬가지로 OptimisticLockStockFacade를 주입하고, 동시에 100개의 요청을 날리는 테스트의 결과도 성공적인 것을 확인 할 수 있다.

 

Optimistic Lock의 장점은 lock을 별도로 잡지 않기 때문에 lock을 잡는 Pessimistic Lock보다 성능상 이점이 있다.

단, 업데이트가 실패했을 때 재시도 로직을 개발자가 직접 작성해 주어야 하는 번거로움이 있다. 충돌이 빈번하게 일어나지 않을 때 사용하기 좋다.

 

- Named Lock

 

Name Lock 이름을 가진 메타데이터 Lock이다. 이름을 가진 Lock을 획득한 후 해제할 때 까지 다른 세션은 이 Lock을 획득 할 수 없다. 단, 트랜잭션이 종료될 때 Lock이 해제되지 않기 때문에 별도의 명령어로 해제를 수행해주거나 선점 시간이 끝나야 Lock이 해제된다.

Mysql에서 get-lock 명령어를 통해 named-lock을 획득할 수 있고, release-lock을 통해 named-lock을 해제할 수 있다.

데이터 소스를 분리해서 사용하는 것이 좋다. 같은 데이터 소스를 사용하면 커넥션 풀이 부족해지는 현상으로 인해서 다른 서비스에도 영향을 줄 수 있기 때문이다.

 

- LockRepository.interface

public interface LockRepository extends JpaRepository<Stock, Long> {

    @Query(value = "select get_lock(:key, 3000)", nativeQuery = true)
    void getLock(String key);

    @Query(value = "select release_lock(:key)", nativeQuery = true)
    void releaseLock(String key);
}

 

편의를 위해 위의 코드의 경우 Stock 엔티티를 그대로 사용하였는데, 별도의 데이터 소스를 생성하는 것이 좋다.

 

Named Lock의 경우 실제 실행될 로직 전후로 Lock 획득, 해제를 해줘야 하기 때문에 Facade 클래스를 생성해준다.

 

- NamedLockStockFacade.java

@Component
public class NamedLockStockFacade {

    private final LockRepository lockRepository;

    private final StockService stockService;

    public NamedLockStockFacade(LockRepository lockRepository, StockService stockService) {
        this.lockRepository = lockRepository;
        this.stockService = stockService;
    }

    @Transactional
    public void decrease(Long id, Long quantity) {
        try {
            lockRepository.getLock(id.toString());
            stockService.decrease(id, quantity);
        } finally {
            lockRepository.releaseLock(id.toString());
        }
    }
}

 

위 Facade 클래스는 decrease() 메서드가 실행되면 getLock()을 통해 Lock을 3초동안 획득하고, 해제하는 코드에 해당한다.

 

- StockService.java

@Service
public class StockService {

    private final StockRepositroy stockRepositroy;

    public StockService(StockRepositroy stockRepositroy) {
        this.stockRepositroy = stockRepositroy;
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public synchronized void decrease(Long id, Long quantity) {
        // Stock 조회
        // 재고를 감소시킨 뒤
        // 갱신된 값을 저장

        Stock stock = stockRepositroy.findById(id).orElseThrow();
        stock.decrease(quantity);

        stockRepositroy.saveAndFlush(stock);
    }
}

 

StockService는 부모의 Transaction과 별도로 실행되어야 되기 때문에 Propagation을 REQUIRES_NEW로 변경해준다.

REQUIRES_NEW는 실행 중인 트랜잭션을 일시 중지시키고 새로운 트랜잭션을 시작한다. 해당 어노테이션이 적용된 메서드가 호출 될 때마다 독립적인 트랜잭션이 시작된다.

만약 이미 다른 트랜잭션이 진행 중이라면 그 트랜잭션은 일시 중지되고, @Transactional 어노테이션이 적용된 메서드 내부에서만 새로운 트랜잭션이 실행된다.

 

- NamedLockStockFacadeTest.java

@SpringBootTest
class NamedLockStockFacadeTest {
    
    @Autowired
//    private StockService stockService;
    private NamedLockStockFacade namedLockStockFacade;

    @Autowired
    private StockRepositroy stockRepositroy;

    @BeforeEach
    public void before() {
        stockRepositroy.saveAndFlush(new Stock(1L, 100L));
    }

    @AfterEach
    public void after() {
        stockRepositroy.deleteAll();
    }

    @Test
    public void 동시에_100개의_요청() throws InterruptedException {
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    namedLockStockFacade.decrease(1L, 1L);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();

        Stock stock = stockRepositroy.findById(1L).orElseThrow();
        // 100 - ( 1 * 100) =0
        assertEquals(0, stock.getQuantity());
    }
}

 

NamedLockService를 주입하여 테스트 코드를 동작하면 정상적으로 동작하는 것을 알 수 있다.


Lettuce

 

1. setNx 명령어를 활용하여 분산락을 구현할 수 있다.

2. spin lock 방식으로 retry로직을 개발자가 작성해주어야 한다.

 

spin lock이란 락을 획득하려는 스레드가 락을 사용할 수 있는지 반복적을 확인하면서 락 획득을 시도하는 방식이다.

 

Named Lock 방식과 유사하다. 다른 점은 redis를 사용하며, Session 관리에 신경을 안 써도 된다.

 

 

- RedisLockRepository.java

@Component
public class RedisLockRepository {

    private RedisTemplate<String, String> redisTemplate;

    public RedisLockRepository(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public Boolean lock(Long key) {
        return redisTemplate
                .opsForValue()
                .setIfAbsent(generateKey(key), "lock", Duration.ofMillis(3_000));
    }

    public Boolean unlock(Long key) {
        return redisTemplate.delete(generateKey(key));
    }

    private String generateKey(Long key) {
        return key.toString();
    }
}

 

redis 명령어를 실행할 수 있어야 되기 때문에 RedisTemplate을 변수로 추가해준다.

lock() unlock() 메서드를 통해 락을 획득 해제하게 된다.

 

opsForValue().setIfAbsent() 메서드와 delete() 메서드의 반환값은 Boolean이고, 두 메서드는 redis의 setnx(lock 획득), del(lock 해제) 명령어에 해당한다.

 

redis를 활용하는 방식도 로직 실행 전후로 락 획득 해제를 수행해 줘야 되기 때문에 facade 클래스를 생성해준다.

- LettuceLockStockFacade.java

@Component
public class LettuceLockStockFacade {

    private final RedisLockRepository redisLockRepository;
    private final StockService stockService;

    public LettuceLockStockFacade(RedisLockRepository redisLockRepository, StockService stockService) {
        this.redisLockRepository = redisLockRepository;
        this.stockService = stockService;
    }

    public void decrease(Long id, Long quantity) throws InterruptedException {
        while (!redisLockRepository.lock(id)) {
            Thread.sleep(100);
        }
        try {
            stockService.decrease(id, quantity);
        } finally {
            redisLockRepository.unlock(id);
        }
    }
}

 

lock()메서드를 통해 이미 존재하는 lock일 경우 실패하고 Thread.sleep()을 통해 1초의 텀을 두고, 획득을 재시도한다. 위 코드를 작성해야 redis의 부하를 줄여줄 수 있다.

lock이 없다면 try에 의해 로직을 실행시키고, lock을 해제하게 된다.

 

- LettuceLockStockFacadeTest.java

@SpringBootTest
class LettuceLockStockFacadeTest {

    @Autowired
    //    private StockService stockService;
    private LettuceLockStockFacade lettuceLockStockFacade;

    @Autowired
    private StockRepositroy stockRepositroy;

    @BeforeEach
    public void before() {
        stockRepositroy.saveAndFlush(new Stock(1L, 100L));
    }

    @AfterEach
    public void after() {
        stockRepositroy.deleteAll();
    }

    @Test
    public void 동시에_100개의_요청() throws InterruptedException {
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    lettuceLockStockFacade.decrease(1L, 1L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();

        Stock stock = stockRepositroy.findById(1L).orElseThrow();
        // 100 - ( 1 * 100) =0
        assertEquals(0, stock.getQuantity());
    }
}

 

lettuce 방식은 구현이 간단하다는 장점이 있지만, spin lock 방식이므로 redis에 부하를 줄 수 있다.

redis의 부하를 줄이기 위해 Thread.sleep을 통해 락 획득 재시도 간에 텀을 둬야 한다.

Redisson

 

1. pub-sub 기반으로 lock 구현을 제공한다.

 

pub-sub 기반은 채널을 하나 만들고 락을 점유 중인 스레드가 락 획득하려고 대기중인 스레드에게 해제를 알려주면 안내를 받은 스레드가 락 획득을 시도하는 방식이다.

위 방식은 lettuce와 다르게 대부분의 경우에는 별도의 retry 로직을 작성하지 않아도 된다.

 

* 동작 방식

subscribe를 통해 채널을 구독한다.

subscribe ch1

 

publish 명령어를 통해 메시지를 전달한다.

publish ch1 hello

 

hello라는 메시지를 받는다.

 

Redisson은 자신이 점유하고 있는 락을 해제할 때 채널에 메시지를 보내줌으로써 락을 획득해야하는 스레드들에게 락을 획득하라고 전달해 줄 수 있다. 락을 획득해야 하는 스레드들은 메시지를 받으면 락 획득을 시도하게 된다.

Lock 획득 시도를 계속하는 lettuce 방식과 달리 redisson은 한 번 혹은 몇번만 시도하기 때문에 redis의 부하를 줄여주게 된다.

Redisson은 Lock 관련 클래스들을 라이브러리에서 제공을 해주므로 별도의 레포지토리를 작성하지 않아도 된다. 하지만 로직 실행 전후로 lock 획득 해제는 해줘야 하므로 facade 클래스를 하나 생성해 줘야 한다.

 

- RedissonLockStockFacade.java

@Component
public class RedissonLockStockFacade {

    private RedissonClient redissonClient;

    private StockService stockService;

    public RedissonLockStockFacade(RedissonClient redissonClient, StockService stockService) {
        this.redissonClient = redissonClient;
        this.stockService = stockService;
    }

    public void decrease(Long id, Long quantity) {
        RLock lock = redissonClient.getLock(id.toString());

        try{
            boolean available = lock.tryLock(10, 1, TimeUnit.SECONDS);

            if (!available) {
                System.out.println("lock 획득 실패");
                return;
            }
            stockService.decrease(id, quantity);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
}

 

lock 획득에 사용할 RedissonClient를 필드로 추가해준다.

redissonClient.getLock() 메서드를 통해 lock 객체를 가져온다. 후에 tryLock() 메서드를 통해 몇 초 동안 lock 획득을 시도할 것인지, 몇 초 동안 점유할 것인지를 설정한 후 lock을 획득해준다.

lock 획득을 실패하면 return해주고, 성공하면 unlock() 메서드를 통해 lock을 해제해준다.

 

- RedissonLockStockFacadeTest.java

@SpringBootTest
class RedissonLockStockFacadeTest {
    @Autowired
    private RedissonLockStockFacade redissonLockStockFacade;

    @Autowired
    private StockRepositroy stockRepositroy;

    @BeforeEach
    public void before() {
        stockRepositroy.saveAndFlush(new Stock(1L, 100L));
    }

    @AfterEach
    public void after() {
        stockRepositroy.deleteAll();
    }

    @Test
    public void 동시에_100개의_요청() throws InterruptedException {
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    redissonLockStockFacade.decrease(1L, 1L);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();

        Stock stock = stockRepositroy.findById(1L).orElseThrow();
        // 100 - ( 1 * 100) =0
        assertEquals(0, stock.getQuantity());
    }
}

 

테스트 케이스가 정상적으로 동작하는 것을 마찬가지로 확인할 수 있다.

 

* 재시도가 필요하지 않은 lock은 lettuce를 활용 / 재시도가 필요한 경우에는 redisson을 활용한다.

728x90