1. 학습 목표
- synchronized 를 사용해 수강신청 로직 작성
- 작업 큐를 생성해 Task 관리
2. 학습 내용
Controller Layer
우선 memberEmail과 subjectName 을 받아오는 DTO를 정의했고 이를 전달하는 Controller 작성
package study.threadsynchronized.registration;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import lombok.RequiredArgsConstructor;
@RestController
@RequiredArgsConstructor
public class RegistrationController {
/*
work flow
1. 학생 수강 신청
2. 학생 ID와 수강 신청할 과목 RequestBody로 받음
3. registrationService.register() 호출
* */
private final RegistrationService registrationService;
@PostMapping("/registration")
public RegistrationService.RegistrationResult classRegistration(@RequestBody RegistrationDTO registrationDTO) {
return registrationService.register(registrationDTO);
}
}
작업 Queue 생성
package study.threadsynchronized.structure;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
public class TaskQueue {
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final AtomicBoolean isProcessing = new AtomicBoolean(false);
public void addTask(Runnable task) {
taskQueue.add(task);
processTasks();
}
private void processTasks() {
if (isProcessing.compareAndSet(false, true)) {
executor.submit(() -> {
try {
while (!taskQueue.isEmpty()) {
Runnable task = taskQueue.take();
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
isProcessing.set(false);
if (!taskQueue.isEmpty()) {
processTasks(); // 작업이 남아있다면 다시 처리 시작
}
}
});
}
}
public void shutdown() {
executor.shutdown();
while (!executor.isTerminated()) {
try {
Runnable task = taskQueue.poll();
if (task != null) {
task.run();
} else {
Thread.sleep(100); // 작업이 없으면 잠시 대기
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
addTask는 Task 를 작업큐에 담아주고 processTasks 는 작업큐에 담겨있는 Task 를 순차적으로 실행시킨다.
그럼 이 작업큐를 직접 사용해보자.
Service Layer
다음과 같은
package study.threadsynchronized.registration;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import org.hibernate.Hibernate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import lombok.RequiredArgsConstructor;
import study.threadsynchronized.member.Member;
import study.threadsynchronized.member.MemberRepository;
import study.threadsynchronized.structure.TaskQueue;
import study.threadsynchronized.subject.Subject;
import study.threadsynchronized.subject.SubjectName;
import study.threadsynchronized.subject.SubjectRepository;
@Service
@RequiredArgsConstructor
public class RegistrationServiceSample implements RegistrationService{
private final RegistrationRepository registrationRepository;
private final SubjectRepository subjectRepository;
private final MemberRepository memberRepository;
// TaskQueue: 수강신청 요청을 순차적으로 처리하기 위한 큐
private final TaskQueue taskQueue = new TaskQueue();
// Key: 과목명, Value: 현재 수강신청 인원 (Integer)
private final HashMap<String, Integer> subjectCurrentCount = new HashMap<>();
// 수강신청 요청을 TaskQueue에 추가
@Transactional
public RegistrationResult register(RegistrationDTO registrationDTO) {
CompletableFuture<RegistrationResult> future = new CompletableFuture<>();
SubjectName subjectName = registrationDTO.getSubjectName();
// 과목 정보 조회
Subject subject = subjectRepository.findBySubjectName(subjectName);
Member member = memberRepository.findByEmail(registrationDTO.getMemberEmail());
Hibernate.initialize(subject.getRegistration());
Hibernate.initialize(member.getRegistration());
taskQueue.addTask(() -> {
try {
RegistrationResult result = registerSubject(subject, member);
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future.join();
}
// registerSubject 메서드에 동기화 블록 추가
private RegistrationResult registerSubject(Subject subject, Member member) {
if (subject == null) {
return new RegistrationResult(false, "과목을 찾을 수 없습니다.");
}
Integer currentCount;
// 과목의 현재 수강신청 인원 카운터 가져오기 (없으면 새로 생성)
currentCount = subjectCurrentCount.computeIfAbsent(
String.valueOf(subject.getSubjectName()), k -> (subject.getRegistration().size()));
// 수강신청 인원 증가 및 최대 인원 체크
if (++currentCount > subject.getMaximumStudent()) {
--currentCount; // 초과 시 다시 감소
return new RegistrationResult(false, "수강 신청 인원이 초과되었습니다.");
}
try {
// 수강신청 정보 저장
Registration registration = Registration.of(member, subject);
registrationRepository.save(registration);
return new RegistrationResult(true, "수강 신청이 완료되었습니다.");
} catch (Exception e) {
--currentCount; // 저장 실패 시 인원 감소
throw e;
}
}
}
여기서 문제가 발생한다.
Locust를 사용해 테스트를 해본 결과! 수강 신청 제한 인원인 30명을 훌쩍 뛰어넘은 500명이 DB에 담겨버리는 불상사가 발생한것!!
여기서 내가 시도한 해결방법은 총 2가지가 있다.
package study.threadsynchronized.registration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.hibernate.Hibernate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import lombok.RequiredArgsConstructor;
import study.threadsynchronized.member.Member;
import study.threadsynchronized.member.MemberRepository;
import study.threadsynchronized.structure.TaskQueue;
import study.threadsynchronized.subject.Subject;
import study.threadsynchronized.subject.SubjectName;
import study.threadsynchronized.subject.SubjectRepository;
@Service
@RequiredArgsConstructor
public class RegistrationService {
private final RegistrationRepository registrationRepository;
private final SubjectRepository subjectRepository;
private final MemberRepository memberRepository;
// TaskQueue: 수강신청 요청을 순차적으로 처리하기 위한 큐
private final TaskQueue taskQueue = new TaskQueue();
// ConcurrentHashMap: 과목별 현재 수강신청 인원을 추적하는 스레드 안전 맵
// Key: 과목명, Value: 현재 수강신청 인원 (AtomicInteger)
private final ConcurrentHashMap<String, AtomicInteger> subjectCurrentCount = new ConcurrentHashMap<>();
// 수강신청 요청을 TaskQueue에 추가
@Transactional
public RegistrationResult register(RegistrationDTO registrationDTO) {
CompletableFuture<RegistrationResult> future = new CompletableFuture<>();
SubjectName subjectName = registrationDTO.getSubjectName();
// 과목 정보 조회
Subject subject = subjectRepository.findBySubjectName(subjectName);
Member member = memberRepository.findByEmail(registrationDTO.getMemberEmail());
Hibernate.initialize(subject.getRegistration());
Hibernate.initialize(member.getRegistration());
taskQueue.addTask(() -> {
try {
RegistrationResult result = registerSubject(subject, member);
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future.join();
}
// registerSubject 메서드에 동기화 블록 추가
private RegistrationResult registerSubject(Subject subject, Member member) {
if (subject == null) {
return new RegistrationResult(false, "과목을 찾을 수 없습니다.");
}
// synchronized 블록에 들어가면 trancactional의 session이 종료되어서 lazy loading이 안됨
// legeno 사건
/* 해결방법
* 1. Hibernate.initialize(subject.getRegistration());로 명시적으로 session이 열려있을때 가져오기
* 하지만 이 방법은 CompletableFuture 내에서 실행되는 코드가 원래의 트랜잭션 컨텍스트와 별도로 비동기적으로 실행되기 때문에 사용할 수 없음
* 2. Fetch Join 사용
* 이 방법이 사실 제일 쉽고 간편하다. 근데 이 방법은 지금 entity 연관관계가 OneToMany라서 적합하지 않을것같음.
* 3. CompletableFuture 바깥에서 값을 조회하고 CompletableFuture 내부에서 사용
* 이 방법으로 한번 가보자
* 일단 성공!!!!!!!!!!!!!
* */
AtomicInteger currentCount;
synchronized (this) {
// 과목의 현재 수강신청 인원 카운터 가져오기 (없으면 새로 생성)
currentCount = subjectCurrentCount.computeIfAbsent(
String.valueOf(subject.getSubjectName()), k -> new AtomicInteger(subject.getRegistration().size()));
// 수강신청 인원 증가 및 최대 인원 체크
if (currentCount.incrementAndGet() > subject.getMaximumStudent()) {
currentCount.decrementAndGet(); // 초과 시 다시 감소
return new RegistrationResult(false, "수강 신청 인원이 초과되었습니다.");
}
}
try {
// 수강신청 정보 저장
Registration registration = Registration.of(member, subject);
registrationRepository.save(registration);
return new RegistrationResult(true, "수강 신청이 완료되었습니다.");
} catch (Exception e) {
synchronized (this) {
currentCount.decrementAndGet(); // 저장 실패 시 인원 감소
}
throw e;
}
}
}
ConcurrentHashMap,AtomicInteger 같이 멀티 쓰레드에서 안전한 자료형을 사용함으로써 수강 인원을 Count하는 로직을 수정했고, 멀티 쓰레드 환경에서 문제를 발생시킬 수 있는 주요 로직에 synchronized 처리를 해줌으로써 멀티 쓰레드 환경에서 안전한 코드를 작성했다.
그럼 한줄한줄 코드를 분석해보자.
우선 기본적으로 Task를 생성하고 이를 작업큐에 담아주는 코드
// 수강신청 요청을 TaskQueue에 추가
@Transactional
public RegistrationResult register(RegistrationDTO registrationDTO) {
CompletableFuture<RegistrationResult> future = new CompletableFuture<>();
SubjectName subjectName = registrationDTO.getSubjectName();
// 과목 정보 조회
Subject subject = subjectRepository.findBySubjectName(subjectName);
Member member = memberRepository.findByEmail(registrationDTO.getMemberEmail());
// CompletableFuture 내부에서 실행되는 코드가 원래의 트랜잭션 컨텍스트와 별도로 비동기적으로 실행되기 때문에 사용할 수 없음
// 즉 Task 내부가 아닌 외부에서 값을 조회하고 CompletableFuture 내부에서 사용해야함
Hibernate.initialize(subject.getRegistration());
Hibernate.initialize(member.getRegistration());
taskQueue.addTask(() -> {
try {
RegistrationResult result = registerSubject(subject, member);
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future.join();
}
주석으로 달아놓은 별도의 내용 말고는 크게 조심할게 없다.
task 를 작업큐에 넣고 이 작업이 완료되면 결과를 Controller로 반환하는 로직
그렇다면 실질적인 작업이 이뤄지는 registerSubject 에 대해 알아보자.
// registerSubject 메서드에 동기화 블록 추가
private RegistrationResult registerSubject(Subject subject, Member member) {
if (subject == null) {
return new RegistrationResult(false, "과목을 찾을 수 없습니다.");
}
// synchronized 블록에 들어가면 trancactional의 session이 종료되어서 lazy loading이 안됨
// legeno 사건
/* 해결방법
* 1. Hibernate.initialize(subject.getRegistration());로 명시적으로 session이 열려있을때 가져오기
* 하지만 이 방법은 CompletableFuture 내에서 실행되는 코드가 원래의 트랜잭션 컨텍스트와 별도로 비동기적으로 실행되기 때문에 사용할 수 없음
* 2. Fetch Join 사용
* 이 방법이 사실 제일 쉽고 간편하다. 근데 이 방법은 지금 entity 연관관계가 OneToMany라서 적합하지 않을것같음.
* 3. CompletableFuture 바깥에서 값을 조회하고 CompletableFuture 내부에서 사용
* 이 방법으로 한번 가보자
* 일단 성공!!!!!!!!!!!!!
* */
AtomicInteger currentCount;
synchronized (this) {
// 과목의 현재 수강신청 인원 카운터 가져오기 (없으면 새로 생성)
currentCount = subjectCurrentCount.computeIfAbsent(
String.valueOf(subject.getSubjectName()), k -> new AtomicInteger(subject.getRegistration().size()));
// 수강신청 인원 증가 및 최대 인원 체크
if (currentCount.incrementAndGet() > subject.getMaximumStudent()) {
currentCount.decrementAndGet(); // 초과 시 다시 감소
return new RegistrationResult(false, "수강 신청 인원이 초과되었습니다.");
}
}
try {
// 수강신청 정보 저장
Registration registration = Registration.of(member, subject);
registrationRepository.save(registration);
return new RegistrationResult(true, "수강 신청이 완료되었습니다.");
} catch (Exception e) {
synchronized (this) {
currentCount.decrementAndGet(); // 저장 실패 시 인원 감소
}
throw e;
}
}
이 부분에서 고작 단건 수강신청도 진행이 되지 않아 조금 고생했다.
주석에 달려있들 총 3가지 방법을 고안해냈는데 그중 마지막 방법이 가장 합당하다고 생각되어 채택했다.
여기서 주의깊게 봐야할 부분은
// ConcurrentHashMap: 과목별 현재 수강신청 인원을 추적하는 스레드 안전 맵
// Key: 과목명, Value: 현재 수강신청 인원 (AtomicInteger)
private final ConcurrentHashMap<String, AtomicInteger> subjectCurrentCount = new ConcurrentHashMap<>();
위에서 설명했듯 ConcurrentHashMap 과 AtomicInteger 를 이용해 데이터의 원자성을 보장한다는 점이다.
추가로 synchronized 를 이용해 수강신청으로 인해 데이터가 변동될때 동시성으로 인해 발생할 수 있는 문제를 차단하는 로직을 작성했음!
테스트
단건 수강신청
@Autowired
private RegistrationService registrationService;
@Test
public void singleRegistrationTest() {
// given
RegistrationDTO singleRegistration = RegistrationDTO.create("email0@example.com", SubjectName.COMPUTERSCIENCE);
RegistrationResult expected = RegistrationResult.success();
// when
RegistrationResult register = registrationService.register(singleRegistration);
// then
assertThat(register).isEqualTo(expected);
}
수강신청에 성공했다는 결과를 생성해놓고 비교해본 결과 성공으로 나옴
초기에 LazyLoadingException이 자꾸 떠서 힘들었다.
1000건 수강신청
from locust import HttpUser, task, between
from itertools import cycle
email_list = [
"email0@example.com"
]
email_iterator = cycle(email_list)
class MyUser(HttpUser):
wait_time = between(1, 2)
host = "<http://localhost:8080>" # 여기에 호스트 URL을 설정합니다.
@task
def post_request(self):
headers = {'Content-Type': 'application/json'}
email = next(email_iterator)
payload = {
'memberEmail': email,
'subjectName': 'COMPUTERSCIENCE'
}
self.client.post('/registration', headers=headers, json=payload)
병렬적으로 요청을 수행하기 위해 다음과 같이 locust 코드를 작성했다.
1000건의 요청을 서버로 전송해보자
1000건도 정확히 사전에 설정해둔 만큼만 수강 신청이 완료되고 있다.
이 과정에서 hikari maximum-pool-size가 너무 작아 동시에 너무 많은 요청을 처리하지 못해 pool-size를 늘려주었다.
hikari:
maximum-pool-size: 20
다음에는 synchronized 말고 Reentrantlock 을 사용해 좀더 디테일하게 조건을 설정해봐야겠다.
코드 전문
'코딩딩 > Java' 카테고리의 다른 글
ReentrantLock (0) | 2024.09.04 |
---|---|
Synchronized와 Reentrantlock (1) | 2024.08.31 |
모던 자바 인 액션 Chapter 3 (0) | 2024.08.26 |
모던 자바 인 액션 Chapter 2 (1) | 2024.08.16 |
모던 자바 인 액션 Chapter 1 (0) | 2024.08.15 |