본문 바로가기
IT 둘러보기

카카오의데이터파이프라인 윤도영

by 날고싶은커피향 2023. 10. 17.

  • 1. Introduction to Data Pipeline at Kakao
  • 2. Table of Contents 1. What is a Data Pipeline? 2. Architecture 3. Future Work
  • 3. What is a Data Pipeline?
  • 4. + A기사의 카테고리는 IT다 Ryan이 아이폰 7 발화 기사(A)를 보았다. +A기사의 대표 키워드는 아 이폰 7, 환불, 교환이다. +A기사는 클릭한는 사용자중 남: 여 비율이 6:4다. Definition
  • 5. 타겟팅(OLAP, Data Warehouse)개인화(OLTP, Data Hub) Ryan이 최근 본 기사는? A Ryan이 최근에 많이 본 기사들이 속하는 카테고리는? IT Ryan이 최근에 많이 본 기사들에서 많이 발견 되는 키워드들은? 아이폰 7, 환불, 교환, 발화, V20 A를 본 사용자들의 연령대 비율은? 30대 90% IT 카테고리에 기사들 중 가장 인기 있는 기사는? A기사. Ryan이 최근 본 기사와 비슷한 기사 는? B Ryan과 같은 기사를 본 다른 사용자들이 많이 보는 기사는? C Ryan의 친구들이 많이 보는 기사? D A를 본 사람들이 같이 많이 보는 기사? B, C 아이폰 7에 관심 있는 사용자들은? N명, Ryan. A기사를 본 사람 중 애플 홈페이지를 클릭한 사용자들은? N명, Ryan. Usecase 일반화
  • 6. - 안정적인 실시간 event처리 - 일별 평균 40~50억 건의 실시간 event처리. - 10초 안에 Data pipeline의 output을 생성. - 데이터 분석을 통해 생성된 부가 정보들(meta)을 안정적으로 저장, 관리 하고, 실시간 event와 join. - Event별로 join해놔야 하는 meta들이 2~3개(Avg). - 일간 업데이트가 필요한 부가 정보 데이터 종류만 해도 갯수가 70여개. - 여러 data source를 수용할 수 있을 정도로 일반화 되고, scalable한 저장소 필요. - 단순 실시간으로 저장되는 event를 모두 한달만 저장한다고 해도, 50억 x 30 = 1500억개. - 매일 매일 업데이트 되는 부가 정보 데이터들은 하루에 500억개. - 6개월치를 저장한다고 치면? - 여러개의 서비스에서 발생하는 사용자 data가 하나의 storage에 저장되어야 함으로 storage의 효율성과 성능이 중요함. - 최종적으로 저장된 data에 여러 서비스에 unified, scalable한 공통된 API를 제공해야 함. - 쓸만 하다 싶으면 갑자기 연동을 원하는 서비스 수가 급증. Technical Challenges
  • 7. - Open source의 이해를 기반으로 한 데이터 흐름 아키텍쳐. - 오픈소스들이 아무리 좋다고 내가 필요한, 내 서비스에 사용자들이 필요한 데이터들이 갑자기 나타나는건 아님. - 필요한 데이터를 정의하고, 이를 서비스에 적용하는 iteration의 반복이 실제 일. (개발 << 노가다). - 개별 서비스의 할일 줄여주기. - 데이터 타입 및 포맷 정의 - 데이터 수집 Endpoint제공 및 가이드, 운영. - Data의 수집, 저장, API에 대한 명확한 스펙. - 어떤 데이터가 아웃풋으로 나오고, 이를 어떻게 가져다 어디에 사용할 수 있는지 모두의 이해와 협조가 필요함. - 이해와 협조를 위해서는 명확한 스펙이 필수. - 서비스의 퀄리티를 지속적으로 높이기 - 데이터 비지니스는 로직 실험 설계 -> 서비스 적용 -> AB테스트 -> 성과측정 -> 로직 실험 설계의 끊임없는 Iteration. - Iteration 플랫폼이 있어도 실무진의 인식변화에는 많은 노력과 시간이 듬 - Data >> Opinion Non-Technical Challenges
  • 8. - 뭐 기술은 어려울건 없네 - 카프카가 좋다는데 카프카나 써볼까? - 실제로 platform 자체는 구성에 큰 어려움 없음(여러 오픈소스들을 요령껏 사용, 안되면 만들면 됨) - 제일 어려운 부분은 - 서비스에서 필요한 데이터는 무엇일까? - 서비스에서 더 편리하게 사용할 수 있는 Input/Outpu format은 무엇일까? - Event들에 어떤 data들을 join해놓으면 어떤 view를 완성 할 수 있는지 끊임 없는 삽질. - 삽질을 계속 하려면 자동화! - 서비스에 영업. - 회사 분위기가 사내 솔루션이 호의적인 회사는 아님. 무려 듣보잡 오픈 소스들이네? - 제공할 수 있는 Data Product/API가 명확하지 않은게 문제. - 데이터는 왜 모으냐? - 너네팀 데이터도 아닌데 왜 보려하느냐? - 우리한테 뭐가 이득이냐? - 무조건 raw data를 내놓으시오. - 서비스에서 operation을 설명 하고 가공된 결과를 가져가면 서로 편한데, 무조건 원본 데이터를 요구. - 서비스 별로 kafka설치하고, spark 설치하고, 이 모두가 낭비라는걸 관철. Personal Note.
  • 10. Architecture
  • 11. 수집 가공 저장 및 API제공 1. 데이터 수집 a. 기술적으로는 난이도 보다는 복잡함이 존재. b. 공통된 format을 정의하고, 공통으로 사용할 endpoint를 만드는 일이 필수. c. 수집이 안되면 가공 아무리 잘해도 안됨. 2. 가공 a. 데이터 마이닝, 머신러닝, 데이터 사이언티스트 등등 Fancy한 분야. b. 가공만 아무리 잘해봐야, 쓸모가 없음. 3. 저장 및 API제공 a. 가공된 데이터를 서비스에서 가져다 사용할 수 있는 product형태로 저장하고, API를 제공. b. 중요도에 비해, adhoc하게 진행 되는 경우가 많음. Physical 구성 요소: 수집 + 가공 + 저장 + API제공
  • 12. 수집 가공 저장 및 API제공 1. 수집 Layer 문제 정의 Client Javascript IOS app Android app Web app REST endpoint Kafka log_event_sync() log_event_async() S2Graph log_event_sync() log_event_async() - 각각의 service에서 Data Pipeline에 REST API를 통해 data를 ingest. - 두 가지로 input의 성격을 분류. - Synchronous: ingest요청의 응답이 실제로 Storage에 persist된 이후에 돌아가야함. - 보통 관계에 대한 데이터일 경우들이 많음 - Asynchronous: ingest요청에 응답을 바로 주고, asynchronous하게 event가 persist됨. - 보통 activity 데이터일 경우들이 많음 - Apache Kafka가 좋은건 인정. - Kafka 사용 != Data hub. Kafka 사용 != data pipeline.
  • 13. 수집 가공 저장 및 API제공 1. 수집 Layer Solution: S2Event Client Javascript IOS app Android app Web app REST Endpoint Kafka log_event_sync() log_event_async() S2Graph log_event_sync() log_event_async() S2Event: REST Endpoint - Kafka broker가 down되었을 때도 event들을 local file에 buffering 해주는 Fallback기능. - Multiple Kafka cluster에 publish하는 Fanout 기능. - Admin에 특정 rule을 만족하는 data를 어떤 kafka cluster들로 publish할지 설정 기능. - Event 데이터들의 Metrics 제공. - 어느 서비스의 어떤 데이터(click인지, 좋아요 인지)들이 몇개 in/out되었다. - Synchronous Input - S2Graph에 CRUD - Asynchronous Input - Kafka에 Publish
  • 14. 수집 가공 저장 및 API제공 1. 수집 Layer Solution: S2Event Client Javascript IOS app Android app Web app REST Endpoint Kafka log_event_sync() log_event_async() S2Graph log_event_sync() log_event_async() - 데이터는 모두 Edge(누가, 무엇에, 어떤 행동을 했다, extra data) format. timestamp operation from to label props now insert Ryan 기사 A 기사를 읽었다 {“Tags”: [“아이폰7”, “발화"]} Edge Format: https://steamshon.gitbooks.io/s2graph-book/content/manage_edges.html
  • 15. 2. 가공 Layer 문제 정의 - Event들에 필요한 operation들을 적용하여 새로운 Event를 생성. - 처리 방식에 따라 Real-time, Batch 두 가지로 분류. - Batch - 많은 수의 가공된 대용량 데이터를 안정적으로 벌크 업데이트. - 일별로 70여 종류, 대략 500억 건의 데이터를 업데이트 - 때에 따라서는 직접 similarity matrix등의 데이터를 만들어 관리. - 데이터가 변경 되어도 서비스에서 바라보는 API는 변경이 있으면 안되고, 아무리 많은 종류의, 대용량의 데이터를 업데이트 해도, 이때 Read throughput, latency에 손해가 없어야 함. - Real-time - 대량의 stream에 다양한 ETL을 수행. - Input event에 대해 수행해야 할 작업들이 평균 2~3개 정도. - event에 수행해야할 topology들을 10초내에 모두 수행. 수집 가공 저장 및 API제공 Computing Engine Spark Streaming MLlib S2Lambda Bulk Loader Batch
  • 16. 2. 가공 Layer Solution: S2Lambda(Real-time) 수집 가공 저장 및 API제공 Computing Engine Spark Streaming MLlib S2Lambda Bulk Loader Batch S2Lambda: ETL environment. - AWS Lambda와 비슷한 환경을 spark streaming위에 제공. - Event들에 행해야 할 event handler(scala function)을 등록하고. - Source -> Parser -> Flow -> Sink의 topology를 등록된 event handler들을 사용해 생성. - Spark Streaming Job를 통해 등록된 topology들을 event마다 실행. - Marathon을 통한 Streaming Job HA관리.
  • 17. 2. 가공 Layer Solution: S2Lambda, Why? - 처음에는 매번 Spark Streaming job을 만들었음. - Event handle logic과 runtime의 tightly coupling. - Not Composable! - 비효율적인 resource사용. - Streaming Job마다 executor를 상수로 할당. - Fragmentation! - ETL process의 test case작성 및 validation하기 어려움. - 귀찮아서 안하게 됨. - Data의 ingest, transform, join, sink등 일련의 flow를 관리 하기가 어려움. - IntelliJ열어 Spark Streaming Job 코드 읽기 시작하면 답답함. - Event각각에 해야할 function들과, 이를 실행해 주는 engine을 구분. - Event processing framework를 변경하면 모든 코드를 다시 작성 해야 함. - 비싼 실시간 Join. - 외부 데이터 Join같은 경우 긴 시간 cache해도 무방함(Local 캐쉬 >> Remote캐쉬). - 하나의 JVM내에서 local cache(RocksDB) instance들을 공유. 수집 가공 저장 및 API제공 Computing Engine Spark Streaming MLlib S2Lambda Bulk Loader Batch
  • 18. 2. 가공 Layer Solution: S2Lambda 수집 가공 저장 및 API제공 Computing Engine Spark Streaming MLlib S2Lambda Bulk Loader Batch //Flow example. parse external api route and //Get data through HTTP Get then merge with input event. new EtlFlow { var baseUrl = "" def init(options: String) = { baseUrl = (Json.parse(options) "external.api.route").as[String] } override def apply: Apply = { case (event @ StringInput(s), ctx) => val tokens = s.split("t") val articleId = tokens(3) val uri = baseUrl + articleId ctx.httpClient.url(uri).get().map { response => Seq(InputWithResult(event, response.json, response.status)) } } }
  • 19. 2. 가공 Layer Solution: S2Lambda 수집 가공 저장 및 API제공 Computing Engine Spark Streaming MLlib S2Lambda Bulk Loader Batch //Sink Example. parse kafka topic from option parameter and publish to kafka. new EtlFlow { var topics = Array.empty[String] def init(options: String) = { topics = (Json.parse(options) "kafka.topics").as[String].split(",").map(_.trim) } def apply: Apply = { case (input, ctx) => ctx.kafkaProducerOpt.foreach { producer => topics.foreach { topic => producer.send(new ProducerRecord[String, String](topic, null, input.toString) } } Future.successful(Nil) } }
  • 20. S2Lambda Admin Console - Write event handler as scala code. - Check if the code compiles. - Check if the code runs correctly with an example input.
  • 21. 2. 가공 Layer Solution: S2Lambda, 이득 수집 가공 저장 및 API제공 Computing Engine Spark Streaming MLlib S2Lambda Bulk Loader Batch - 재배포 없는 코드 작성 및 적용. - 코드 변경 없이 Event Handler자체와, 전달 해 주는 Parameter를 변경함으로 추가/변경에 쉽게 대응. - Event Handler들끼리 Composable! - Event Handler의 Validation하는 데이터 + 코드들이 DB에 쌓임. - 다른 사람들이 만들어 놓은 코드들과 저장된 sample input들을 가지고 validation해볼 수 있음. - 로직 이해에는 test case를 보는게 최고. - 전체 데이터의 흐름이 DB화되어 정리됨. = 개발/적용 속도 향상 + 개발적 재미(JVM, classpath, compiler)
  • 22. 2. 가공 Layer Solution: S2Loader 수집 가공 저장 및 API제공 Computing Engine Spark Streaming MLlib S2Lambda Bulk Loader Batch - Production HBase클러스터가 아닌, 별도의 분석 클러스터에서 Spark Job을 통해 정렬된 HFile형태로 데이터 생성. - HFileOutputFormat2 - 생성된 HFile을 Production HBase 클러스터로 Distcp 후, HBase의 completebulkload 툴을 사용하여 HBase에 업로드. Put을 통해 로드하는 방식에 비해 Region서버에 부담이 없음. - Memstore부담 - WAL부담 - 잦은 compaction - Region서버 GC Bulk Load시 read throughput, latency는 영향 받지 않음. http://blog.cloudera.com/blog/2013/09/how-to-use-hba se-bulk-loading-and-why/
  • 23. 3. 저장 및 API제공 문제 정의 수집 가공 저장 및 API제공 - OLTP - 특정 시작점을 지정하고, 해당 시작점에 연결된 local graph를 traverse하는 쿼리들. - 개인화 용도로 사용. Data Hub로 분류. - ex) Ryan이 본 기사와 비슷한 기사. - ex) A기사와 비슷한 기사들. - ex) Ryan의 친구들이 많이 본 기사. - OLAP - 시작점이 지정되지 않고 전체 graph에 대해 scan, aggregation을 하는 쿼리들. - 타게팅 용도로 사용. Data Warehouse로 분류. - ex) A기사를 본 사용자들의 연령 대 비율. - ex) 아이폰 7기사를 보고 삼성대리점 사이트를 클릭해본 사용자들 수와 리스트. Data Hub (S2Graph) Data Warehouse (Druid, Hive)
  • 24. 3. 저장 및 API제공 Solution: S2Graph(Data Hub) 수집 가공 저장 및 API제공 Data Hub (S2Graph) Data Warehouse (Druid, Hive) - Data Hub: Apache S2Graph(incubating), Scalable distributed OLTP Graph Database on HBase. - OLTP Query를 처리 하는 저장소. 개인, 특정 아이템등 시작점이 주어진 경우에 사용. - Asynchronous Input type뿐만이 아니라 Synchronous한 요청에도 사용됨. - Vertex/Edge에 대한 CRUD를 제공하고, 연결된 Edge들에 대해 BFS search traverse를 제공함. - 서비스에서 별도의 cache layer없이 바로 사용 가능한 성능 확보에 초점. - API는 모두 Asynchronous.
  • 25. 3. 저장 및 API제공 Solution: S2Graph(Data Hub) 수집 가공 저장 및 API제공 Data Hub (S2Graph)
  • 26. 3. 저장 및 API제공 Solution: Druid(Warehouse) 수집 가공 저장 및 API제공 Data Warehouse (Druid, Hive) - Data Warehouse: Druid, Hive - Realtime Slice & Dice with Druid. - 조건이 fix되지 않은 데이터들을 interactive하게 조건을 바꿔가며 숫자를 확인(explore) - Fix된 조건은 Hive + Jenkins로 관리. - Dashboard, Report등에 사용. - Long term 데이터에 대해 그때그때 생기는 특수한 분석요건에 대응. - 의미 있는 데이터들은 다시 S2Graph(Data Hub)로 Bulk Load하여 서비스에서는 쿼리를 사용하여 활용.
  • 27. 3. 저장 및 API제공 Solution: Druid(Warehouse) 수집 가공 저장 및 API제공 Data Warehouse (Druid, Hive)
  • 28. 3. 저장 및 API제공: Summary 수집 가공 저장 및 API제공 Data Hub (S2Graph) Data Warehouse (Druid, Hive) - OLTP - S2Graph Query REST API를 통해 Service에서는 필요한 데이터를 Query. - 대부분 Service server에서 바로 통신. - ~70K QPS, ~100ms latency. - Druid에 저장된 데이터들의 일부분도 Druid to S2Graph ETL을 통해 S2Graph에 저장. - 카테고리 별로 가장 인기 있는 키워드 TopK. - 지역별 가장 인기 있는 장소 TopK. - 특정 지역의 시간별 검색 분포 비율. - OLAP - Druid data를 visualize해주는 Pivot이라는 UI툴을 사용하여 기획자 및 개발자들에게 interactive하게 slice & dice해볼 수 있는 기능 제공. - 필요한 데이터의 조건이 어느 정도 정형화되면 서비스의 admin에서 druid REST Query API를 통해 통신. - 1~10 QPS, ~ 10s latency.
  • 29. - 모든 서비스가 일반화된 abstract layer(Property Graph Model)를 이해하고, 일반화된 API(Graph Query) 에 익숙해 지면, service요청 마다 API서버를 구축 할 필요가 없음. - 아직 까지는 getEdges, getVertices 두개의 API만으로 모든 서비스의 요구조건을 처리. - 모두가 Vertex, Edge이고, 모두가 getEdges, getVertices. - 사내 부서 마다 하둡, Kafka cluster, spark, HBase등을 따로 운영 하면, 운영 cost가 너무 큼. - 대부분의 오픈 소스가 scalable한 architecture를 가지므로, 이 오픈 소스들을 사용해 multi tenancy 를 어떻게 제공할지 고민을 하는 게 더 효율 적임. - 여러 팀에 혼재 되어 있는 분석된 결과를 서비스에 적용하기 쉽게하여 더 많은 실험(삽질)을 할 수 있게 비용을 낮춰줌. - A/B테스트 및 Data에 기반한 의사 결정에도 기여. = 개발 편의성 + 빠른 데이터 처리 + 운영비용 절감 + 중복 코드 작업 제거 + 데이터 기반 의사결정 Data Pipeline을 구축 해서 얻는 이득들.
  • 30. Future Work
  • 31. - Apache S2Graph(incubating) - 말이 ASF incubating이지, 회사 일 하느라 open source와 사내 source의 부채가 엄청남. - 첫 번째 릴리즈 v0.1.0 release vote까지 pass. - 다음 릴리즈 - Apache Tinkerpop initial구현. - OLAP layer도 추가. - 다른 Storage 추가(Redis, RocksDB, Postgresql, Mysql, Cassandra, …) - S2Lambda open source(Apache License V2) - 오픈, 피드백, 개선. - 내부에서 개발 하고 적용한지 아직 두 달도 안되어서…(사내 정책)정리가 필요함. - 다양한 stream 처리 framework를 선택 할 수 있게 지원. - 이번 발표는 기술 스택 소개에 초점을 두었지만, 다음에는 실제로 위의 Pipeline을 통해 어떤 데이터를 어떻게 저장하고, 이를 서비스의 어떤 곳에 사용하여 어떤 효과를 거두었는지에 대한 공유. 할 일들.
반응형