
Asynchronous Programming Retrospective: Presentation Slides

by 날고싶은커피향 2018. 11. 6.

1. Asynchronous Programming With Spring 4.X, RDBMS in MSA
2. • Dooray! Mail-API: development and operation
   • www.pikicast.com: development and operation
   • Cloud orchestration system: development and operation
   • Caller-Ring: development and operation
3. “Non-Blocking” or “Blocking” vs “Asynchronous” or “Synchronous”
   Depending on whether the call returns…
   Depending on the state of the thread…
4. “The C10K Problem” http://www.kegel.com/c10k.html It’s time for web servers to handle ten thousand clients simultaneously. The web is a big place now
5. The C10K Problem
   • Scale-out
   • Load balancer
   • HAProxy, Nginx
   • AWS, Azure, Google Cloud and OpenStack
   • NoSQL
   • Redis, Hazelcast (IMDG)
   • Zero-copy, limits on file descriptors
6. “So what about the programming paradigm?”
7. The C10K Problem
   • Event Loop
     – Nginx, Node.js, Play, Netty, Vert.x
   • Event-Driven Programming
   • Reactor
   • Java ExecutorService
   [Diagram: Event Queue, Event Loop, Thread Pool]
8. The C10K Problem
   • Callback functions
   • Functional programming vs imperative programming
   • Lambda in Java
     – Functional programming
     – Modern Java
   [Diagram: Event Queue, Event Loop, Thread Pool; Register Callback, Trigger Callback, Complete Callback]
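The register / trigger / complete callback flow in the diagram can be sketched with nothing but the JDK. This is only an illustration under assumptions: `MiniEventLoop`, `submit`, and `demo` are hypothetical names, a single-thread executor stands in for the event loop, and a small fixed pool stands in for the worker thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch: one loop thread dispatches work and runs callbacks,
// while blocking work is pushed to a separate worker pool.
final class MiniEventLoop {
    private final ExecutorService loop = Executors.newSingleThreadExecutor();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    // Register a task together with the callback to trigger when it completes.
    <T> void submit(Supplier<T> blockingTask, Consumer<T> callback) {
        loop.execute(() ->                      // the loop thread picks up the event
            workers.execute(() -> {             // blocking work leaves the loop thread
                T result = blockingTask.get();
                loop.execute(() -> callback.accept(result)); // callback runs on the loop
            }));
    }

    void shutdown() {
        workers.shutdown();
        loop.shutdown();
    }

    // Submits three tasks and returns the callback results in completion order.
    static List<Integer> demo() {
        MiniEventLoop eventLoop = new MiniEventLoop();
        List<Integer> results = new ArrayList<>(); // mutated only on the loop thread
        CountDownLatch done = new CountDownLatch(3);
        try {
            for (int i = 1; i <= 3; i++) {
                int n = i;
                eventLoop.submit(() -> n * n, r -> {
                    results.add(r);
                    done.countDown();
                });
            }
            done.await(); // wait only so the demo can return the collected values
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // some ordering of 1, 4, 9
    }
}
```

Because every callback runs on the single loop thread, the shared list needs no locking; the worker pool only ever touches the task itself.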
9. The C10K Problem
   • Object-oriented programming
   VS
   • Functional programming
   [Diagram: Event Queue, Event Loop, Thread Pool; threads Th #1, Th #2, Th #3 compared with Th #1, Th #2]
10. “How ‘efficiently’ and ‘effectively’ can we share threads in short slices?”
11. Hard to use, isn't it?
12. Asynchronous Programming with Spring 4.3
   • DeferredResult
   • @Async
   • AsyncRestTemplate & ListenableFuture
   • CompletableFuture
13. @Async & DeferredResult example
   Sample: AsyncController.java
   What are the drawbacks?
14. Common Architecture using JAVA
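15. “But isn't JDBC blocking I/O?” “Then why do asynchronous programming at all?” “Even so, it lets us implement a lot of features.”
16. “An API that deletes the mail in the trash folder”
   - Delete the records from the database
   - Delete the mail files (MIME)
   - Notify other systems that the mail was deleted: search server, history server
17. But… what if the trash folder holds 10,000 mails?
   - DB work: 300 ms
   - File deletion: 10 ms * 10,000 = 100 sec
   - Integration with other systems: 30 ms * 10,000 * 3 = 900 sec
   Total: about 1,000 sec, over 16 minutes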
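The arithmetic on this slide can be checked with a few lines of Java. The per-call timings and the factor of 3 are taken from the slide as-is; the class and method names are made up for the illustration.

```java
import java.time.Duration;

// Worked version of the slide's estimate for purging the trash folder sequentially.
final class TrashPurgeEstimate {
    static Duration sequentialTotal(int mails) {
        Duration db = Duration.ofMillis(300);                        // DB work
        Duration files = Duration.ofMillis(10).multipliedBy(mails);  // 10 ms per mail file
        Duration remote = Duration.ofMillis(30)                      // 30 ms per remote call,
                .multipliedBy(mails)
                .multipliedBy(3);                                    // times the slide's factor of 3
        return db.plus(files).plus(remote);
    }

    public static void main(String[] args) {
        Duration total = sequentialTotal(10_000);
        System.out.println(total.getSeconds() + " sec"); // 1000 sec
        System.out.println(total.toMinutes() + " min");  // 16 min (rounded down)
    }
}
```

Almost all of the time is spent waiting on file and remote calls, not on the database, which is why the later slides focus on keeping those calls off the request thread and out of the transaction.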
18. @Async
   • When using @Async, use a dedicated ThreadPool.
   • Be careful when combining @Async and @Transactional.
     – You need to understand how @Transactional works before using it.
     – Load
19. @Transactional
   • With AOP proxies, the method must be declared public
     – @EnableTransactionManagement(proxyTargetClass = true)
     – <tx:annotation-driven/>
     – Not a concern with AspectJ.
   • When a method calls another method of the same bean that needs its own transaction
     – Run it as a declarative transaction.
     – Self-autowiring in Spring 4.3+
     – A mistake that is often overlooked with @Async.
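Why a call to another method of the same bean skips @Transactional and @Async can be shown with a plain JDK dynamic proxy. This is not Spring code, only a sketch of the same proxy principle; `MailService`, `removeAll`, and `removeBatch` are hypothetical names, and the recorded method names stand in for whatever the real interceptor would do (begin a transaction, hand off to an executor).

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the self-invocation pitfall with proxy-based AOP.
final class SelfInvocationDemo {
    interface MailService {
        void removeAll();
        void removeBatch();
    }

    static final class MailServiceImpl implements MailService {
        @Override
        public void removeAll() {
            removeBatch(); // plain this-call: it never passes through the proxy
        }

        @Override
        public void removeBatch() {
            // the work that was supposed to be intercepted
        }
    }

    // Returns the names of the methods the proxy actually intercepted.
    static List<String> interceptedCalls() {
        List<String> intercepted = new ArrayList<>();
        MailService target = new MailServiceImpl();
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName()); // stand-in for the interceptor's work
            return method.invoke(target, args);
        };
        MailService proxied = (MailService) Proxy.newProxyInstance(
                MailService.class.getClassLoader(),
                new Class<?>[] {MailService.class},
                handler);
        proxied.removeAll();
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls()); // [removeAll]
    }
}
```

Only `removeAll` is intercepted. Injecting the bean's own proxy into itself (the self-autowiring mentioned above) and calling `removeBatch` through that reference is one way to make the inner call go through the interceptor as well.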
20. “DB Connection objects are expensive!” The same is true in asynchronous programming.
21. [Diagram: Th#1 → DB, Storage, Search, History]
    @Transactional(readOnly = false)
    public boolean removeMailsByMailFolderId(Long mailFolderId) {
        List<Long> mailIds = mailRepository.findIds(mailFolderId);
        mailRepository.deleteByMailFolderId(mailFolderId);
        // One thread calls every external system for every mail.
        mailIds.forEach(mailId -> {
            storageComponent.deleteByMailId(mailId);
            searchComponent.deleteByMailId(mailId);
            historyComponent.deleteByMailId(mailId);
        });
        return true;
    }
22. [Diagram: Th#1 → DB, Storage, Search, History]
    Execution time = transaction time: a long transaction that can cause an outage.
    @Transactional(readOnly = false)
    public boolean removeMailsByMailFolderId(Long mailFolderId) {
        // Begin Transaction
        List<Long> mailIds = mailRepository.findIds(mailFolderId);
        mailRepository.deleteByMailFolderId(mailFolderId);
        mailIds.forEach(mailId -> {
            storageComponent.deleteByMailId(mailId);
            searchComponent.deleteByMailId(mailId);
            historyComponent.deleteByMailId(mailId);
        });
        return true;
        // End Transaction
    }
23. Ways to save DB connections
   • @Async
   • LazyConnectionDataSourceProxy
   • Facade Pattern
   • TransactionSynchronizationManager, TransactionSynchronizationAdapter
24. Example using @Async
25. [Diagram: Th#1 → DB; Th#2, Th#3, Th#4 → DB, Storage, Search, History]
    @Transactional(readOnly = false)
    public void removeMailsByMailFolderId(Long mailFolderId) {
        List<Long> mailIds = mailRepository.findIds(mailFolderId);
        // Lists.partition (Guava) splits the ids into batches of 10.
        // selfService is presumably the self-injected proxy from slide 19, so @Async applies.
        Lists.partition(mailIds, 10)
             .forEach(subMailIds -> selfService.removeByAsync(subMailIds));
    }

    @Async
    @Transactional(readOnly = false)
    public void removeByAsync(List<Long> subMailIds) {
        mailRepository.deleteByMailIdIn(subMailIds);
        subMailIds.forEach(mailId -> {
            storageComponent.deleteByMailId(mailId);
            searchComponent.deleteByMailId(mailId);
            historyComponent.deleteByMailId(mailId);
        });
    }
26. [Diagram: Th#1 → DB; Th#4 → Storage, Search, History]
    @Transactional(readOnly = false)
    public void removeMailsByMailFolderId(Long mailFolderId) {
        List<Long> mailIds = mailRepository.findIds(mailFolderId);
        mailRepository.deleteByMailIdIn(mailIds);
        selfService.removeByAsync(mailIds);
    }

    // No @Transactional here: the async thread never takes a DB connection.
    @Async
    public void removeByAsync(List<Long> mailIds) {
        mailIds.forEach(mailId -> {
            storageComponent.deleteByMailId(mailId);
            searchComponent.deleteByMailId(mailId);
            historyComponent.deleteByMailId(mailId);
        });
    }
27. LazyConnectionDataSourceProxy example
28. Before / After: what is the difference?
    @Bean(value = "dataSource", destroyMethod = "close")
    public DataSource dataSource() {
        BasicDataSource ds = new BasicDataSource();
        //… more configuration
        return ds;
    }

    // Wraps the pool so a physical connection is fetched only when a statement is first created.
    @Bean(value = "lazyDataSource")
    public LazyConnectionDataSourceProxy lazyDataSource() {
        return new LazyConnectionDataSourceProxy(dataSource());
    }

    @Transactional(readOnly = false)
    public boolean removeMailsByMailFolderId(Long mailFolderId) {
        List<Long> mailIds = mailRepository.findIds(mailFolderId);
        mailRepository.deleteByMailFolderId(mailFolderId);
        mailIds.forEach(mailId -> {
            storageComponent.deleteByMailId(mailId);
            searchComponent.deleteByMailId(mailId);
            historyComponent.deleteByMailId(mailId);
        });
        return true;
    }
29. Example using the Façade Pattern
30. Façade layer and service layer
    // Façade layer: no transaction, so no DB connection is held during the external calls.
    public class MailFacade {
        public void removeMailsByMailFolderId(Long mailFolderId) {
            List<Long> removedMailIds = mailService.removeMailEntities(mailFolderId);
            removedMailIds.forEach(mailId -> {
                storageComponent.deleteByMailId(mailId);
                searchComponent.deleteByMailId(mailId);
                historyComponent.deleteByMailId(mailId);
            });
        }
    }

    // Service layer: the transaction covers only the DB work.
    public class MailServiceImpl implements MailService {
        @Transactional(readOnly = false)
        public List<Long> removeMailEntities(Long mailFolderId) {
            List<Long> mailIds = mailRepository.findIds(mailFolderId);
            mailRepository.deleteByMailIdIn(mailIds);
            return mailIds;
        }
    }
31. Example using TransactionSynchronizationManager and TransactionSynchronizationAdapter
32. Do these calls really need to be bound to the DB transaction?
    @Async
    @Transactional(readOnly = false)
    public void removeByAsync(List<Long> subMailIds) {
        mailRepository.deleteByMailIdIn(subMailIds);
        subMailIds.forEach(mailId -> {
            storageComponent.deleteByMailId(mailId);
            searchComponent.deleteByMailId(mailId);
            historyComponent.deleteByMailId(mailId);
        });
    }
33.
    @Transactional(readOnly = false)
    public void removeByAsync(List<Long> subMailIds) {
        mailRepository.deleteByMailIdIn(subMailIds);
        // Defer the external calls until the DB transaction has committed.
        TransactionSynchronizationManager.registerSynchronization(
            new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    subMailIds.forEach(mailId -> {
                        storageComponent.deleteByMailId(mailId);
                        searchComponent.deleteByMailId(mailId);
                        historyComponent.deleteByMailId(mailId);
                    });
                }
            });
    }
    afterCommit() – invoked after transaction commit
    afterCompletion() – invoked after transaction commit / rollback
    beforeCommit() – invoked before transaction commit
    beforeCompletion() – invoked before transaction commit / rollback
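34. “Shall we talk about sizing the WAS threads and the DB pool?” What happens when asynchronously programmed APIs are called at peak time? Is the DB pool all right? “Separate the DataSources.”
35. DataSource Configuration
   • Separate the DataSource for asynchronous work from the one for API handling.
     – At peak time, asynchronous processes can afford to finish late.
     – Do not propagate failures.
   • AbstractRoutingDataSource class
     – A kind of proxy DataSource
     – setTargetDataSources method: registers a DataSource per key
     – determineCurrentLookupKey method: returns the key that selects the DataSource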
36.
    @Slf4j
    public class ExecutionRoutingDataSource extends AbstractRoutingDataSource {
        @Override
        protected Object determineCurrentLookupKey() {
            return ExecutionContextHolder.getExecutionType();
        }
    }
37. The DataSource is chosen dynamically according to the lookup key, a pre-registered enum value: ExecutionType.API or ExecutionType.BATCH.
    public class ExecutionRoutingDataSource extends AbstractRoutingDataSource {
        @Override
        protected Object determineCurrentLookupKey() {
            return ExecutionContextHolder.getExecutionType();
        }
    }

    @Bean("routingDataSource")
    public ExecutionRoutingDataSource routingDataSource() {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(ExecutionType.API, apiDataSource());
        targetDataSources.put(ExecutionType.BATCH, batchDataSource());

        ExecutionRoutingDataSource routingDataSource = new ExecutionRoutingDataSource();
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(apiDataSource());
        return routingDataSource;
    }
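The slides do not show `ExecutionContextHolder` itself. A minimal sketch, assuming it is a thread-bound holder built on `ThreadLocal`, might look like the following; `setExecutionType`, `clear`, and `callAs` are hypothetical names that do not appear in the deck.

```java
import java.util.function.Supplier;

// Assumed shape of the lookup-key holder used by determineCurrentLookupKey().
final class ExecutionContextSketch {
    enum ExecutionType { API, BATCH }

    static final class ExecutionContextHolder {
        // Each thread sees its own value; API is the default, as in the routing bean.
        private static final ThreadLocal<ExecutionType> CURRENT =
                ThreadLocal.withInitial(() -> ExecutionType.API);

        static ExecutionType getExecutionType() {
            return CURRENT.get();
        }

        static void setExecutionType(ExecutionType type) {
            CURRENT.set(type);
        }

        static void clear() {
            CURRENT.remove(); // the next get() falls back to the default
        }
    }

    // Runs the action with the given type and always resets the thread afterwards,
    // so a pooled thread does not leak BATCH into the next request it serves.
    static <T> T callAs(ExecutionType type, Supplier<T> action) {
        ExecutionContextHolder.setExecutionType(type);
        try {
            return action.get();
        } finally {
            ExecutionContextHolder.clear();
        }
    }

    public static void main(String[] args) {
        System.out.println(callAs(ExecutionType.BATCH,
                ExecutionContextHolder::getExecutionType));        // BATCH
        System.out.println(ExecutionContextHolder.getExecutionType()); // API
    }
}
```

With a setup like this, the key has to be set before the transaction obtains its connection, because the routing DataSource consults the key at the moment a connection is requested.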
38.
    @TransactionalExecutionType(executionType = ExecutionType.BATCH)
    @Transactional(readOnly = true, isolation = Isolation.READ_COMMITTED)
    public List<CompletableFuture<List<Long>>> partition(Long taskQueueId) {
        // Business logic and query to DB
    }

    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface TransactionalExecutionType {
        ExecutionType executionType() default ExecutionType.API;
    }
39. AsyncRestTemplate
   • ListenableFuture
     – Callback Hell
   • Factory types
     – Netty4ClientHttpRequestFactory
     – SimpleClientHttpRequestFactory
     – OkHttp3ClientHttpRequestFactory
     – Pros and Cons
   • Backpressure, Throttling
   • Circuit Breaker Pattern
40. AsyncRestTemplate is used the same way as RestTemplate. Spring's PSA (Portable Service Abstraction)
   Sample: AsyncRestTemplateTest.java getWords(), asyncGetWords(), asyncCombinedProcess()
41. AsyncRestTemplate
   • How do we avoid Callback Hell?
     – Lambda expressions
     – Default methods
     – Write a utility class that wraps the result in a CompletableFuture.
     – Continue..
42.
    public class CompletableFutureBuilder {
        public static <T> CompletableFuture<T> build(ListenableFuture<T> listenableFuture) {
            CompletableFuture<T> completableFuture = new CompletableFuture<T>() {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    // Propagate cancellation to the wrapped ListenableFuture first.
                    boolean result = listenableFuture.cancel(mayInterruptIfRunning);
                    super.cancel(mayInterruptIfRunning);
                    return result;
                }
            };
            // Bridge the callback API onto the CompletableFuture.
            listenableFuture.addCallback(new ListenableFutureCallback<T>() {
                @Override
                public void onFailure(Throwable ex) {
                    completableFuture.completeExceptionally(ex);
                }

                @Override
                public void onSuccess(T result) {
                    completableFuture.complete(result);
                }
            });
            return completableFuture;
        }
    }
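45. CompletableFuture
   • A class for asynchronous programming
     – Results can be handled without blocking (vs the Future class).
     – Results can also be handled by blocking.
     – Multiple tasks can be chained.
     – Multiple tasks can be combined.
     – Exceptions can be handled.
   • Implements CompletionStage<T>
     – Represents a model that contains a task.
     – A task is the basic element of the chain pattern.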
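The contrast with `Future` can be sketched as follows. The class and method names are illustrative only; `join()` appears in `demo()` purely so that the example can return a value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical comparison: Future forces the caller to block for the result,
// CompletableFuture lets the caller attach the next step instead.
final class BlockingVsNonBlocking {
    static int blockingStyle(ExecutorService pool) {
        Future<Integer> future = pool.submit(() -> 6 * 7);
        try {
            return future.get(); // the calling thread parks here until the task is done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static CompletableFuture<String> nonBlockingStyle(ExecutorService pool) {
        // Nothing blocks: the follow-up step runs once the value becomes available.
        return CompletableFuture.supplyAsync(() -> 6 * 7, pool)
                .thenApply(n -> "answer=" + n);
    }

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            int blocking = blockingStyle(pool);
            String chained = nonBlockingStyle(pool).join(); // join only to collect the demo value
            return blocking + "/" + chained;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 42/answer=42
    }
}
```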
46. CompletableFuture
   • Check the suffix
     – thenApply() vs thenApplyAsync()
   • Check the parameters
     – thenApplyAsync(Function<? super T, ? extends U> fn)
     – thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
   • Methods that start asynchronous work
     – runAsync(), supplyAsync()
   • Methods that connect tasks
     – thenApplyAsync(), thenComposeAsync(), anyOf(), allOf()
   • Method for exception handling
     – exceptionally()
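A short sketch that strings these methods together, under the assumption of a made-up lookup task: `ChainSketch`, `lookup`, and the "mail-" prefix are illustrative names, not part of the deck's sample code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical chain: start, transform, compose, and recover from a failure.
final class ChainSketch {
    static String lookup(ExecutorService pool, String rawId) {
        return CompletableFuture
                .supplyAsync(() -> Long.parseLong(rawId), pool)   // start asynchronous work
                .thenApplyAsync(id -> id * 2, pool)               // Async suffix + explicit Executor
                .thenComposeAsync(                                // chain a second asynchronous step
                        id -> CompletableFuture.supplyAsync(() -> "mail-" + id, pool), pool)
                .exceptionally(ex -> "fallback")                  // any earlier failure ends up here
                .join();
    }

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return lookup(pool, "21") + "," + lookup(pool, "oops");
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // mail-42,fallback
    }
}
```

Passing the Executor explicitly keeps the chain off the common ForkJoinPool, which is what the no-Executor Async overloads use by default.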
47. CompletableFuture
    Method       | Async method      | Arguments                               | Returns
    thenAccept   | thenAcceptAsync   | Result of the previous stage            | Nothing
    thenRun      | thenRunAsync      | Nothing                                 | Nothing
    thenApply    | thenApplyAsync    | Result of the previous stage            | Result of current stage
    thenCompose  | thenComposeAsync  | Result of the previous stage            | Future result of current stage
    thenCombine  | thenCombineAsync  | Results of the two previous stages      | Result of current stage
    whenComplete | whenCompleteAsync | Result or exception of previous stage   | Nothing
48. CompletableFuture
   • Building a CompletableFuture Chain
     – Example. CompletableFutureTest.java chainMethods();
     – Example. CompletableFutureTest.java allOfMethods();
     – Example. CompletableFutureTest.java allOfAcceptMethods();
   • Exception handling
     – Example. CompletableFutureTest.java handleMethods();
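CompletableFutureTest.java is not reproduced in the slides, so the following is only a guess at the kind of thing allOfMethods() and handleMethods() demonstrate: fan out several tasks, wait for all of them with `allOf()`, and fall back with `handle()` if any failed. All names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical allOf()/handle() sketch, not the deck's actual test class.
final class AllOfSketch {
    static List<Integer> squares(List<Integer> inputs, Executor pool) {
        List<CompletableFuture<Integer>> futures = inputs.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> n * n, pool))
                .collect(Collectors.toList());

        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0])) // completes when all are done
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)                // already complete, no waiting
                        .collect(Collectors.toList()))
                .handle((result, ex) ->                              // sees the result or the failure
                        ex == null ? result : Collections.<Integer>emptyList())
                .join();
    }

    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return squares(Arrays.asList(1, 2, 3), pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 4, 9]
    }
}
```

The result keeps the input order because the values are read back from the list of futures, not in the order the tasks happened to finish.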
49. CompletableFuture
   • When a thread pool is shared
     – Naturally, ordering may not be guaranteed.
     – Fair processing can be difficult with method chaining.
     – Possible starvation
     – executor.setRejectedExecutionHandler();
     – AbortPolicy
     – DiscardPolicy
     – DiscardOldestPolicy
     – CallerRunsPolicy
   [Diagram: Event Queue, Event Loop, Thread Pool; Register Callback, Trigger Callback, Complete Callback]
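What a rejection policy does when the shared pool is saturated can be shown with a deliberately tiny `ThreadPoolExecutor`. The sizes (one worker, a queue of one) are chosen only to force a rejection, and the class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: with CallerRunsPolicy a rejected task runs on the submitting thread.
final class CallerRunsSketch {
    static boolean thirdTaskRanOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),                 // room for one waiting task
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> await(release));              // occupies the only worker
            pool.execute(() -> { });                         // fills the queue
            pool.execute(() ->                               // rejected, so the caller runs it
                    ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return Thread.currentThread().getName().equals(ranOn.get());
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(thirdTaskRanOnCaller()); // true
    }
}
```

CallerRunsPolicy slows the submitter down instead of dropping work, which acts as a crude form of backpressure; AbortPolicy throws, and the two Discard policies silently drop a task.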
50. Transaction Saving Programming
   - Avoid MySQL lock wait
   - Avoid MySQL deadlock
   - Query tuning on JPA…
51. - Debugging is hard.
   - Writing test cases is hard.
   - It is always a fight with the RDB.
52. Q & A

