본문 바로가기
팀 프로젝트/플러스 프로젝트

조회 성능 향상시키기(5) Semaphore로 비동기 쿼리 동시 실행 제어하기

by pon9 2025. 3. 26.

개요

현재 로직에는 잠재적인 문제가 존재한다

비동기 방식으로 캐시 갱신 쿼리를 실행할 경우, 최소 1초에 한 번 실행될 수 있다.
하지만 일시적인 DB 부하로 인해 쿼리 실행 시간이 1초 이상 걸리는 상황이 발생한다면, 다음과 같은 문제가 생길 수 있다

 

1. 동일 작업이 중복 실행되어 jvm의 스레드 자원이 낭비된다.

2. 캐시 갱신 타이밍이 겹치면서 race condition이 발생할 수 있다.

3. 불필요한 느린 쿼리들이 중복 호출될 수 있다(현재 기준 동일 네트워크 상에서 평균 600ms로 빠른 편이 아니다)

 

-> 결과적으로 어플리케이션, DB, Redis 모두에 영향을 끼칠 수 있다.

 

 

Semaphore

세마포어를 활용해 비동기 메서드 실행을 하나의 스레드로 제한하는 방식으로 문제를 해결할 수 있다고 판단했다.

 

세마포어란 임계영역에 동시에 접근할 수 있는 스레드 수를 제한하는 동기화 도구로,

java에선 tryAcquire()로 락 점유 여부를 비차단 방식으로 확인할 수 있고, 성공한 스레드만 임계 영역으로 보내며 나머지 스레드는 즉시 반환하도록 구현할 수 있다.

 

이 구조는 jvm단에서 실행 흐름을 제어하기 때문에, db나 redis에 접근하기 전 애초에 불필요한 연산을 차단할 수 있어서 시스템 리소스 보호에 효과적일 것이다.

또한 세마포어에 걸려 return 된 비동기 작업은 실제로 실행되는 로직도 없고 외부에서 참조되지 않는다면 gc가 바로 수거하므로, 어플리케이션 전체에 주는 부담도 거의 없을 것이다.

 

 

테스트코드로 검증

@Component
@RequiredArgsConstructor
public class PopularUpdateAsync {

    private static final String MARKET_CACHE_KEY = "popular:market:items";
    private static final int POPULAR_LIMIT = 200;
    private final Semaphore marketSemaphore = new Semaphore(1);
    private final AtomicInteger executionCounter = new AtomicInteger(0);

    @Async
    @Transactional(readOnly = true)
    public void updateMarketPopulars() {
        if (!marketSemaphore.tryAcquire()) {
            return;
        }
        try {
            executionCounter.incrementAndGet();
            Pageable pageable = PageRequest.of(0, POPULAR_LIMIT);
            Page<MarketPopularResponseDto> result = marketRepository.findPopularMarketItems(getStartDate(), pageable);
            Thread.sleep(1000);
            redisTemplate.opsForValue().set(
                    MARKET_CACHE_KEY,
                    serialize(result.getContent())
            );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            marketSemaphore.release();
        }
    }

    public int getExecutionCount() {
        return executionCounter.get();
    }

    @Async
    public void updateMarketPopularsError() throws InterruptedException {
        executionCounter.incrementAndGet();
        Pageable pageable = PageRequest.of(0, POPULAR_LIMIT);
        Page<MarketPopularResponseDto> result = marketRepository.findPopularMarketItems(getStartDate(), pageable);
        Thread.sleep(1000);
        redisTemplate.opsForValue().set(
                MARKET_CACHE_KEY,
                serialize(result.getContent())
        );
    }
}

Thread.sleep(1000) 으로 db에 병목이 걸린 상황을 가정하였다.

Semaphore(1)로 접근할 수 있는 스레드 수를 1개로 차단한다.

@Slf4j
@SpringBootTest
class PopularUpdateAsyncConcurrencyTest {

    @Autowired
    private PopularUpdateAsync popularUpdateAsync;

    @Test
    void 인기_마켓_동시_갱신_실행_테스트_락_없이() throws InterruptedException {
        int threadCount = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        log.info("=== 테스트 시작 ===");

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    log.info("스레드 {}: 호출 시작", Thread.currentThread().getName());
                    popularUpdateAsync.updateMarketPopularsError();
                    log.info("스레드 {}: 호출 완료", Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        int executions = popularUpdateAsync.getExecutionCount();
        log.info(">>> 실행된 쿼리 횟수 = " + executions);

        Assertions.assertEquals(5, executions, "이게왜안됨!");
    }

    @Test
    void 인기_마켓_동시_갱신_실행_테스트_세마포어() throws InterruptedException {
        int threadCount = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    log.info("스레드 {}: 호출 시작", Thread.currentThread().getName());
                    popularUpdateAsync.updateMarketPopulars();
                    log.info("스레드 {}: 호출 완료", Thread.currentThread().getName());
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();

        int executions = popularUpdateAsync.getExecutionCount();
        log.info(">>> 실행된 쿼리 횟수 = " + executions);

        Assertions.assertEquals(1, executions, "이게왜안됨!");
    }
}

동시에 5개(사실 쿼리 병목이 1초라면 동시에 2개만 실행되겠지만 테스트코드니까)의 스레드를 보내어 테스트해보면,

세마포어 적용 전

세마포어 적용 전에는 모든 비동기 스레드가 동시에 실행되어 1초 후 일괄 완료되었지만,

세마포어 적용 후

적용 후에는 단 하나의 스레드만 1초짜리 작업을 수행하며 나머지는 실행되지 않았다.

 

 

회고

비동기적으로 실행되는 작업의 중복 실행을 방지해서, 어플리케이션의 잠재적인 위험을 줄일 수 있었다.

비동기 실행과 동시성 제어가 상호 보완적으로 작동하는 구조를 직접 구현해보니 재밌고 뿌듯하다

 

운영체제 이거 공부해서 어디다가 써먹지? 생각했는데, 역시 아는 만큼 보인다는 말이 맞는 거 같다. 세마포어를 몰랐다면 내가 어떻게 처리했을까?

책 열심히 읽어야겠다..