Be-Developer

데이터 중심 아키텍처 : 11.스트림 처리

11. 스트림 처리

  • 스트림 : 시간 흐름에 따라 점진적으로 생상된 데이터.

    이벤트 스트림 전송

  • db에서 폴링방식은 비용이 크므로, 이벤트가 발생할때 트리거하는것이 좋다.

    메시징 시스템

    시스템 선택 조건

    1. 메세지가 쌓이면
    1. 큐에 버퍼링한다
    2. 배압,흐름제어를 한다 (생산자가 보내지 못하도록)
      1. 버퍼링될때 메모리가 가득 차면 디스크를 사용하는가?
      2. 노드가 죽으면
    3. 메세지 유실을 허용한다.
    4. 디스크 기록 혹은 복제본 생성을 한다. 처리량은 줄어들것.

생상자에서 소비자로 메세지를 직접 전달하기

  • 생산자와 소비자가 온라인이라고 가정한다. 소비자가 오프라인이면 생산자가 재전송, 생산자가 오프라인이면 메세지버퍼를 잃어버릴수있음.
  • UDP 멀티캐스트 : 금융산업에서 사용됨
  • ZeroMQ, nanomsg,,webhook

    메세지 브로커

  • 소비자는 비동기로 동작하며, 브로커는 클라이언트의(생산자/소비자) 상태에 유연하다.

    메시지 브로커와 데이터베이스 비교

  • MQ는 오랜기간 데이터 저장용도로 적합하지않다.
  • DB는 index로 쿼리하고, MQ는 특정 패턴에 맞는 토픽을 구독한다.
  • DB는 쿼리시점의 데이터 스냅샷을 기준으로 리턴한다. MQ는 데이터가 변했을때 이벤트를 (새로운 메세지를) 받을 수 있다.

    복수 소비자

  • 로드밸런싱 : 메세지는 소비자중 하나에게 전달된다.
  • 팬아웃 : 모든 소비자에게 전달된다.

    확인응답과 재전송

  • 소비자는 메세지 처리 확인 응답을 해야한다.
  • 다중 소비자에서 특정 메세지가 실패하여 재전송되는경우, 메세지들 사이의 순서가 바뀔 수 있다.
  • 따라서 메세지간 인과성이 없어야한다.

    파티셔닝된 로그

    로그기반 메세지 브로커 (Kafka)

  • 일반적으로(AMQP/JMS) 메세지가 처리되면 데이터는 삭제되는데, 이를 보완한것이 로그기반.
  • 디스크에 메세지가 쓰여야하므로, 처리량을 높이기위해 파티셔닝한다.

    로그방식과 전통적인 메시징 방식의 비교

  • 컨슈머는 토픽의 파티션을 할당받아 단일 스레드로 메시지를 읽는다.
    • 활동할 수 있는 컨슈머의 수 = 토픽의 파티션 수
    • 단일 스레드이므로, 특정 메시지가 느리게 처기되면 지연된다.
- JMS/AMQP 로그기반
메세지 처리 순서 중요 X O
메세지 처리 비용 높음 낮음
메세지 양 - 많음

소비자 오프셋

  • 파티션 하나는 컨슈머 하나가 붙으므로, 오프셋이 명확하다.
  • 특정 컨슈머가 죽으면 다른 컨슈머에서 오프셋기록부터 읽으면 된다.

    디스크 공간 사용

  • ring buffer : 버퍼가 가득차면 오래된 메세지순으로 덮어쓴다.

    소비자가 생산자를 따라갈 수 없을때

  • 소비자의 offset과 로그헤드 차(lag)를 모니터링해서 삭제가 일어나기전에 생산을 중단시켜야한다.

    오래된 메세지 재생

  • 로그 기반 메시징과 일괄처리는 변환처리를 반복해도 입력 데이터에 영향을 주지않고, 파생데이터를 만든다.
  • 오류 복구가 쉽기때문에 조직내에서 데이터플로우를 통합하는데 좋은 도구이다.

    컨슈머 로직에서 중복처리를 고려한 구현을 해야한다.

데이터베이스와 스트림

시스템 동기화 유지하기

  • 여러 솔루션을(DB,MQ,Index,Cache,Data warehouse) 복합적으로 사용하다보면, 같은 데이터에 대해 동기화가 되어야한다.
  • 이때 dualWrite로 변경이 발생할때 여러솔루션에 업데이트치는 방법이 있는데, 동시쓰기 이슈가 생길 수 있다. 이는 원자적 커밋문제로 해결해야한다. 단일리더를 두어 쓰기순서 결정하게 하는것도 방법.

    변경 데이터 캡처 (CDC Change Data Capture)

  • DB에 기록하는 변경 이벤트를 받아 다른 시스템으로 데이터 복제할 수 있는 형태로 추출하는 것, DB에 쓰여진 순서대로 받을 수 있어 동시이슈 대안이 된다.

    변경 데이터 캡처의 구현

  • 로그기반 메세지 브로커는 메시지 순서를 유지하기때문에 (오프셋으로 위치 보장) DB변경 이벤트를 전송하기에 적합하다.
  • DB트리거는 고장나기쉽고 성능 오버헤드가 많아 복제로그를 파싱하는 방식으로 더 사용한다.

    초기 스냅샷

  • 모든 로그 히스토리를 저장할 수 없으므로, 스냅샷이 일부 과거 변경사항을 대신해야한다.
  • DB 스냅샷은 변경로그의 위치나 오프셋에 대응되어야한다. 일부 CDC도구는 기능이 내장됨.

    로그 컴팩션

  • 한 키의 로그 레코드는 한개씩만 유지한다.
  • 키의 삭제 :
    • 로그 구조화 저장 엔진: 특별한 null값 (tombstone) 으로 갱신하는것
    • 로그 컴팩션 : 값 제거.
  • 카프카는 로그 컴팩션 기능을 제공한다. 이 장의 후반부 참고.

    변경 스트림용 API 지원

  • 요즘엔 CDC를 지원하기보다, DB 기본 인터페이스로 제공된다.
  • VoltDB : 튜플을 삽입할 수 있지만 질의는 할수없는 관계형 데이터 모델의 출력스트림(튜플들의 로그로 구성됨)
  • Kafka Connect : 카프카를 데이터 시스템용 변경 데이터 캠처도구로 활용할수있게 함.

    이벤트 소싱

  • DDD에서 개발한 기법, 애플리케이션 상태변화를 모두 변경 이벤트 로그로 저장한다.
  • 이벤트는 애플리케이션 수준에서 발행하는 변경 불가한 로그.
  • CDC에서는 변경가능한 데이터를 DB저수준에서 추출한 로그.

    이벤트 로그에서 현재상태 파생하기

  • 이벤트 소싱 컨슈머는 결정적이어야한다. (재수행해도 같은 결과)
  • 뒤에발생한 이벤트가 앞선 이벤트를 덮어쓰지않는다. 로그 컴팩션이 불가.

    명령과 이벤트

  • 요청(명령) -> 무결성 검증 -> 승인 -> 불변 이벤트
  • 무결성 검증은 동기로 검증되어야한다. 첫 명령요청에서 동기로 검증을 하던지, 혹은 가예약 -> 유효예약확정 이벤트로 분할하여 비동기 처리로 유효성 검사를 할 수 있다.

    상태와 스트림 그리고 불변성

  • DB의 상태는 로그들의 결과
  • 로그컴팩션은 로그와 DB상태 사이의 차이를 메우는 한가지 방법이다.

    불변이벤트의 장점

  • DB상태의 오류 복원에 활용될 수 있다.
  • 활동을 분석할 수 있다.

    동일한 이벤트로그로 여러가지 뷰 만들기

  • CQRS,(명령과 질의책임의 분리)로 데이터를 쓸때 인덱스등을 고려하지않고 쌓는다면 더 유연하게 후가공(뷰생성)을 할수있다.

    동시성 제어

  • 로그의 소비가 비동기로 이루어지므로, 애플리케이션에서 쓴 상태가 컨슈머에서 읽어지지않을수도있다.
    1. 이벤트 로그 생성과 읽기뷰 갱신을 동기로 수행, 분산 트랜잭션이 필요하다.
    2. 이벤트 로그에 내용을 담는 방법. 원자적으로 만들기 쉽다.
    3. 이벤트 로그와 애플리케이션 상태를 같은 방식으로 파티셔닝하여 동시성이슈 방지.

      불변성의 한계

  • 로그가 너무많아서 혹은 개인정보이슈로 삭제해야할때, 로그가 처음부터 기록되지않았던것처럼 전체삭제해야하는데 쉽지않다.

스트림 처리

  • n개의 입력스트림으로 n개의 출력스트림을 생산하는것. operator, job 을 다룬다.

    스트림 처리의 사용

    복잡한 이벤트 처리(CEP)

  • 스트림에서 특정 이벤트 패턴을 찾는 규칙을 규정할수있다.
  • 처리엔진에 쿼리를 제출하고, 엔진은 매치를 발견하면 이벤트 패턴의 세부사항을 포함하는 복잡한 이벤트를 발행한다.
  • CEP가 DB와 다른점은, 쿼리가 처리엔진에서 유지되어 이벤트를 스트림으로 계속 발행한다는점.

    스트림 분석

  • ex) 이벤트 빈도, 이동평균, window간의 통계값 비교 및 알람
  • Apache storm, spark streaming, flink,,

    구체화 뷰 유지하기

  • 최종 상태는 일종의 구체화 뷰이다. 이를 위해선 모든 로그가 필요한데, 카프카스트림은 로그컴팩션으로 구현할 수 있다.

    스트림상에서 검색하기

  • 쿼리를 먼저 저장하고, CEP와 같이 데이터를 찾는다.

    이미 처리가 완료된 로그기록에서 검색은 인덱스가 필요할테고 어려운건가?

    메시지 전달과 RPC

  • 액터 모델에서 쓰이는 서비스간 통신 메커니즘으로 사용될 수 있다

    시간에 관한 추론

  • ex) window가 5분인 분석 스트림에서, 시간은 장비의 시스템 시계를 사용하는데, 이벤트가 실제로 발생한 시간보다 처리시간이 많이 늦어지면 문제가 생길수있다.

    이벤트 시간대 처리시간

  • 처리시간을 기준으로 윈도우를 만들면, 서버 배포 후 정상화 되었을때, 멈춘시간동안의 처리율은 0, 복구된 후의 처리율은 튀는것처럼 보일수있다.

    준비여부 인식

  • 이벤트 시간을 기준으로 window를 정의하면, window 종료시점에 로그가 다 들어온건지 아닌건지 알수없어 낙오되는 로그가 생길수있다. (1분 window에서 이벤트 :00 -> :01 -> :02 -> :01 순서로 받았다면 두번째 로그에서 window를 종료했을때 네번째 이벤트는 낙오될수있다.)
  • 낙오된 이벤트는 무시하거나 기존 출력데이터를 수정할수있다.

    시계를 활용할것인가?

    2와 3의 타임스탬프 차를 구하면 생산자와 서버시계간의 차이를 알수있으므로 (네트워크 지연은 무시할만하다고 가정.) 실제 이벤트가 발생한 타임스탬프를 추정할수있다.

    1. 이벤트가 발생한 시간, 생산자의 시계
    2. 이벤트를 서버로 보낸 시간, 생산자의 시계
    3. 서버에서 이벤트를 받은 시간, 서버 시계

      윈도우 유형

  • tumbling window : 타임스탬프의 고정길이. (ex) 00:00~59:59
  • hopping window : 결과를 매끄럽게 만들기위해 윈도우간 중첩이 있다. tumbling window로 1분 계산후, 인접 윈도우를 모아 hopping window를 구현하기도 한다.
  • sliding window : 이벤트간의 시간 간격
  • session window : 기간없이 사용자 세션별 윈도.

    스트림 조인

    stream-stream join (윈도우조인)

  • 조인을 위한 적절한 윈도우 선택이 필요하다.
  • 스트림처리자가 인덱스 상태를 유지해야한다. 지난 윈도우의 이벤트를 인덱싱하고, 현재 윈도우의 인덱스 존재여부로 매칭 이벤트를 발행한다.

    stream-table join (스트림 강화)

  • 강화 : table의 데이터를 stream 이벤트에 추가하는것
  • table의 변경로그를 이벤트로 구독하여 스트림 처리자에 로컬 복사본을 최신상태로 유지한다.

    table-table join (구체화 뷰 유지)

  • sns에서 글 발행과 유저별 타임라인 구현시
    • 글발행 스트림 - 팔로우 이벤트 스트림 조인 하여 캐시한것
    • 글 목록과 팔로워 목록 조인결과를 구체화 뷰

      조인의 시간 의존성

  • 조인해야할 데이터가 변경되는 이벤트가 동시에 발생했을때? 조인할 id를 이력별로 관리한다.

    내결함성

  • 마이크로 일괄처리 : 스트림을 작은 블록으로 나누고 각 블록을 소형 일괄처리와 같이 다루는 방법. spark streaming에서 사용한다.
  • 체크포인트 : 주기적으로 상태의 체크포인트를 생성하고, 장애발생시 해당 체크포인트부터 재시작, 이전 출력은 버린다. apache flink
  • 원자적 커밋 재검토
  • 멱등성 : 스트림소비자가 결과 저장시 스트림 offset을 함께 저장하면 반복 갱신을 막을수있다.
  • 실패후 상태 재구축 : stream-table join시 스트림 처리자는 state가 있는데, flink는 스냅샷을 HDFS와같은 지속성있는 저장소에 기록한다.