Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

블로킹 큐 구현체 완성 및 테스트 작성 완료 #36

Merged
merged 30 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b0222f8
✨ Feat : 새로운 기능
haxr369 Mar 26, 2024
fd93056
✅ Test : 테스트 코드 추가
haxr369 Mar 27, 2024
3c8ff75
✨ Feat : 새로운 기능
haxr369 Mar 27, 2024
0b829d6
Merge branch 'main' into main
haxr369 Mar 28, 2024
1305987
Merge remote-tracking branch 'origin/main'
haxr369 Mar 28, 2024
c734c16
✨ Feat : 새로운 기능
haxr369 Mar 28, 2024
f78aeac
✨ Feat : 새로운 기능
haxr369 Mar 30, 2024
b96f482
🤖 Refactor : 코드 리팩토링
haxr369 Mar 30, 2024
f528555
✨ Feat : 새로운 기능
haxr369 Mar 30, 2024
8c63b49
Merge remote-tracking branch 'concurrency/develop'
haxr369 Mar 30, 2024
83c6f93
✨ Feat : 새로운 기능
haxr369 Apr 2, 2024
0b33222
✅ Test : 테스트 코드 추가
haxr369 Apr 2, 2024
06cbe1f
🤖 Refactor : 코드 리팩토링
haxr369 Apr 2, 2024
a752e80
🐛 Fix : 오류 수정
haxr369 Apr 3, 2024
a6870ac
🤖 Refactor : 코드 리팩토링
haxr369 Apr 3, 2024
fd67711
📝 Docs : 문서 수정
haxr369 Apr 4, 2024
f98cdbd
✅ Test : 테스트 코드 추가
haxr369 Apr 5, 2024
8eba8ee
🤖 Refactor : 코드 리팩토링
haxr369 Apr 6, 2024
c763bb4
🤖 Refactor : 코드 리팩토링
haxr369 Apr 7, 2024
ada5ae2
🤖 Refactor : 코드 리팩토링
haxr369 Apr 7, 2024
b286a26
✅ Test : 테스트 코드 추가
haxr369 Apr 7, 2024
eb1c711
refacto/스레드 수 정돈
haxr369 Apr 7, 2024
66d4ccd
Merge branch 'integrated_develop'
haxr369 Apr 7, 2024
86554d6
기존 작업과 병합
haxr369 Apr 7, 2024
d49a951
Merge remote-tracking branch 'concurrency/develop'
haxr369 Apr 7, 2024
55a797d
sol 커밋 정리
haxr369 Apr 7, 2024
84da0f4
queueCounter는 counter와 다른 인터페이스를 공유하기 때문에 counter 패키지에서 삭제
haxr369 Apr 7, 2024
e783deb
🤖 Refactor : 코드 리팩토링
haxr369 Apr 7, 2024
75b0f90
존재하지 않는 구현제 제거
haxr369 Apr 7, 2024
e962db2
필요 없는 출력 제거
haxr369 Apr 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
HELP.md
.gradle
build/
.gradle/*
build/*
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
Expand All @@ -18,7 +18,7 @@ bin/
!**/src/test/**/bin/

### IntelliJ IDEA ###
.idea
.idea/*
*.iws
*.iml
*.ipr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import com.thread.concurrency.counter.batch.ConcurrentBatchingCounter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
Expand All @@ -16,6 +21,7 @@
import java.util.function.Consumer;

@SpringBootApplication
@EnableAsync
public class SpringThreadConcurrencyApplication {

public static void main(String[] args) {
Expand Down Expand Up @@ -77,5 +83,14 @@ private static List<Integer> range(int numberOfThreads, int expected) {
}
return result;
}

@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // Spring에서 사용하는 스레드를 제어한느 설정
executor.setCorePoolSize(50); // thread-pool에 살아있는 thread의 최소 개수
executor.setMaxPoolSize(50); // thread-pool에서 사용할 수 있는 최대 개수
executor.setQueueCapacity(500); //thread-pool에 최대 queue 크기
executor.setThreadNamePrefix("AsyncApp-");
executor.initialize();
return executor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.thread.concurrency.async.controller;

import com.thread.concurrency.async.service.AsyncService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Controller;

import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Controller
public class AsyncController {
private final AsyncService asyncService;

public AsyncController(AsyncService asyncService) {
this.asyncService = asyncService;
}

@Async
public CompletableFuture<String> calculateRunTime(int cnt, int waitTime) throws InterruptedException {
LocalTime lt1 = LocalTime.now();
List<CompletableFuture<String>> hellos = new ArrayList<>();
for(int i=0; i<cnt; i++){
hellos.add(asyncService.voidParamStringReturn(waitTime,i+"번째 메세지"));
}
// 모든 비동기 호출이 완료될 때까지 대기하고 결과를 리스트에 넣기
List<String> results = hellos.stream().map(CompletableFuture::join)
.toList();
LocalTime lt2 = LocalTime.now();
long dif = Duration.between(lt1, lt2).toMillis();
return CompletableFuture.completedFuture(dif+"가 걸렸습니다.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.thread.concurrency.async.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;


@Service
public class AsyncService {
@Async
public CompletableFuture<String> voidParamStringReturn (long waitTime, String message) throws InterruptedException{
System.out.println("비동기적으로 실행 - "+
Thread.currentThread().getName());
Thread.sleep(waitTime);
return CompletableFuture.completedFuture(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.thread.concurrency.counter.queueCounter;

import java.util.concurrent.TimeUnit;

public interface Consumer {
void consumeEvent(long timeout, TimeUnit unit) throws InterruptedException;
Long show();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.thread.concurrency.counter.queueCounter;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CounterConsumer implements Consumer {
private final BlockingQueue<Long> queue;
private final AtomicLong count = new AtomicLong(0);

public CounterConsumer(BlockingQueue<Long> queue) {
this.queue = queue;
}

public void consumeEvent(long timeout, TimeUnit unit) throws InterruptedException {
while (true) {
Long value = queue.poll(timeout, unit);
if(value == null){
break;
}
count.addAndGet(value);
}
}
public Long show(){
while(true){
if(queue.isEmpty()){
return count.get();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.thread.concurrency.counter.queueCounter;

import java.util.concurrent.BlockingQueue;

public class CounterProducer implements Producer{
private final BlockingQueue<Long> queue;

public CounterProducer(BlockingQueue<Long> queue) {
this.queue = queue;
}

public void add(long value) throws InterruptedException {
queue.put(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.thread.concurrency.counter.queueCounter;

public interface Producer {
void add(long value) throws InterruptedException;
}
54 changes: 54 additions & 0 deletions src/test/java/com/thread/concurrency/CounterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.thread.concurrency;

import com.thread.concurrency.counter.*;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

@SpringBootTest
public class CounterTest {
private final int valueToAdd = 1;
private final int nAddsPerThread = 50000000;
private final int nThreads = 9;

public static Stream<Counter> counterProvider() {
return Stream.of(new AtomicCounter(), new CompletableFutureCounter(), new SynchronizedCounter());
}

@ParameterizedTest
@MethodSource("counterProvider")
@DisplayName("스레드 안전한 카운터로 동시에 여러 더하기 수행하기.")
public void 여러_더하기_수행_Executor(Counter counter) throws InterruptedException {
LocalTime lt1 = LocalTime.now();
int initalCount = counter.show();

ExecutorService service = Executors.newFixedThreadPool(nThreads);
CountDownLatch latch = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
service.submit(() -> {
for(int j=0; j<nAddsPerThread; j++){
counter.add(valueToAdd);
}
latch.countDown();
});
}
latch.await();
int finalCount = counter.show();
LocalTime lt2 = LocalTime.now();
long dif = Duration.between(lt1, lt2).getNano();
System.out.println("여러_더하기_수행_Executor 테스트가 걸린 시간 : " + ((float)dif / 1000000) + "ms");
Runtime.getRuntime().gc();
long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
System.out.println("메모리 사용량 "+(double)usedMemory/1048576 + " MB");
Assertions.assertEquals(initalCount + nThreads*nAddsPerThread*valueToAdd, finalCount);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.thread.concurrency.async;

import com.thread.concurrency.async.controller.AsyncController;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@SpringBootTest
public class AsyncControllerTest {
private static final Logger logger = LoggerFactory.getLogger(AsyncServiceTest.class);

@Autowired
private AsyncController asyncController;

@Test
public void invokeMultiAsyncMethod() throws InterruptedException {
List<CompletableFuture<String>> hellos = new ArrayList<>();
for(int i=0; i<10; i++){
hellos.add(asyncController.calculateRunTime(10, 1000));
}
// 모든 비동기 호출이 완료될 때까지 대기하고 결과를 리스트에 넣기
List<String> results = hellos.stream().map(CompletableFuture::join)
.toList();
results.forEach(logger::info);
}
}
52 changes: 52 additions & 0 deletions src/test/java/com/thread/concurrency/async/AsyncServiceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.thread.concurrency.async;

import com.thread.concurrency.async.service.AsyncService;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@SpringBootTest
public class AsyncServiceTest {
private static final Logger logger = LoggerFactory.getLogger(AsyncServiceTest.class);
@Autowired
private AsyncService asyncService;

@Test
@DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출")
public void testGetString() throws ExecutionException, InterruptedException {
CompletableFuture<String> helloWorld = asyncService.voidParamStringReturn(1000, "기본 메세지");
Assertions.assertEquals("기본 메세지",helloWorld.get());
}

@Test
@DisplayName("입력은 void 출력은 String인 비동기 함수 다중 호출")
public void testGetMultiString() throws InterruptedException {
List<CompletableFuture<String>> hellos = new ArrayList<>();
for(int i=0; i<100; i++){
hellos.add(asyncService.voidParamStringReturn(1000,i+"번째 메세지"));
}
// 모든 비동기 호출이 완료될 때까지 대기하고 결과를 리스트에 넣기
List<String> results = hellos.stream().map(CompletableFuture::join)
.toList();
results.forEach(logger::info);
}

@Test
@DisplayName("입력은 void 출력은 String인 비동기 함수 단일 호출 타임아웃 발생.")
public void testGetStringTimeOutIsThisAsync() throws InterruptedException {
// voidParamStringReturn가 비동기 메서드인지 의문이 생김.
CompletableFuture<String> completableFuture = asyncService.voidParamStringReturn(4000, "타임아웃 발생 안 함!");
long timeOutValue = 1;
TimeUnit timeUnit = TimeUnit.SECONDS;
// 1초가 지난 후 타임아웃 발생
Assertions.assertThrows(ExecutionException.class, () -> completableFuture.orTimeout(timeOutValue,timeUnit).get());
}
}
Loading