11. 스트림 처리
- 스트림 : 시간 흐름에 따라 점진적으로 생상된 데이터.
이벤트 스트림 전송
- db에서 폴링방식은 비용이 크므로, 이벤트가 발생할때 트리거하는것이 좋다.
메시징 시스템
시스템 선택 조건
- 메세지가 쌓이면
- 큐에 버퍼링한다
- 배압,흐름제어를 한다 (생산자가 보내지 못하도록)
- 버퍼링될때 메모리가 가득 차면 디스크를 사용하는가?
- 노드가 죽으면
- 메세지 유실을 허용한다.
- 디스크 기록 혹은 복제본 생성을 한다. 처리량은 줄어들것.
생상자에서 소비자로 메세지를 직접 전달하기
- 생산자와 소비자가 온라인이라고 가정한다. 소비자가 오프라인이면 생산자가 재전송, 생산자가 오프라인이면 메세지버퍼를 잃어버릴수있음.
- UDP 멀티캐스트 : 금융산업에서 사용됨
- ZeroMQ, nanomsg,,webhook
메세지 브로커
- 소비자는 비동기로 동작하며, 브로커는 클라이언트의(생산자/소비자) 상태에 유연하다.
메시지 브로커와 데이터베이스 비교
- MQ는 오랜기간 데이터 저장용도로 적합하지않다.
- DB는 index로 쿼리하고, MQ는 특정 패턴에 맞는 토픽을 구독한다.
- DB는 쿼리시점의 데이터 스냅샷을 기준으로 리턴한다. MQ는 데이터가 변했을때 이벤트를 (새로운 메세지를) 받을 수 있다.
복수 소비자
- 로드밸런싱 : 메세지는 소비자중 하나에게 전달된다.
- 팬아웃 : 모든 소비자에게 전달된다.
확인응답과 재전송
- 소비자는 메세지 처리 확인 응답을 해야한다.
- 다중 소비자에서 특정 메세지가 실패하여 재전송되는경우, 메세지들 사이의 순서가 바뀔 수 있다.
- 따라서 메세지간 인과성이 없어야한다.
파티셔닝된 로그
로그기반 메세지 브로커 (Kafka)
- 일반적으로(AMQP/JMS) 메세지가 처리되면 데이터는 삭제되는데, 이를 보완한것이 로그기반.
- 디스크에 메세지가 쓰여야하므로, 처리량을 높이기위해 파티셔닝한다.
로그방식과 전통적인 메시징 방식의 비교
- 컨슈머는 토픽의 파티션을 할당받아 단일 스레드로 메시지를 읽는다.
- 활동할 수 있는 컨슈머의 수 = 토픽의 파티션 수
- 단일 스레드이므로, 특정 메시지가 느리게 처기되면 지연된다.
- | JMS/AMQP | 로그기반 |
---|---|---|
메세지 처리 순서 중요 | X | O |
메세지 처리 비용 | 높음 | 낮음 |
메세지 양 | - | 많음 |
소비자 오프셋
- 파티션 하나는 컨슈머 하나가 붙으므로, 오프셋이 명확하다.
- 특정 컨슈머가 죽으면 다른 컨슈머에서 오프셋기록부터 읽으면 된다.
디스크 공간 사용
- ring buffer : 버퍼가 가득차면 오래된 메세지순으로 덮어쓴다.
소비자가 생산자를 따라갈 수 없을때
- 소비자의 offset과 로그헤드 차(lag)를 모니터링해서 삭제가 일어나기전에 생산을 중단시켜야한다.
오래된 메세지 재생
- 로그 기반 메시징과 일괄처리는 변환처리를 반복해도 입력 데이터에 영향을 주지않고, 파생데이터를 만든다.
- 오류 복구가 쉽기때문에 조직내에서 데이터플로우를 통합하는데 좋은 도구이다.
컨슈머 로직에서 중복처리를 고려한 구현을 해야한다.
데이터베이스와 스트림
시스템 동기화 유지하기
- 여러 솔루션을(DB,MQ,Index,Cache,Data warehouse) 복합적으로 사용하다보면, 같은 데이터에 대해 동기화가 되어야한다.
- 이때 dualWrite로 변경이 발생할때 여러솔루션에 업데이트치는 방법이 있는데, 동시쓰기 이슈가 생길 수 있다. 이는 원자적 커밋문제로 해결해야한다. 단일리더를 두어 쓰기순서 결정하게 하는것도 방법.
변경 데이터 캡처 (CDC Change Data Capture)
- DB에 기록하는 변경 이벤트를 받아 다른 시스템으로 데이터 복제할 수 있는 형태로 추출하는 것, DB에 쓰여진 순서대로 받을 수 있어 동시이슈 대안이 된다.
변경 데이터 캡처의 구현
- 로그기반 메세지 브로커는 메시지 순서를 유지하기때문에 (오프셋으로 위치 보장) DB변경 이벤트를 전송하기에 적합하다.
- DB트리거는 고장나기쉽고 성능 오버헤드가 많아 복제로그를 파싱하는 방식으로 더 사용한다.
초기 스냅샷
- 모든 로그 히스토리를 저장할 수 없으므로, 스냅샷이 일부 과거 변경사항을 대신해야한다.
- DB 스냅샷은 변경로그의 위치나 오프셋에 대응되어야한다. 일부 CDC도구는 기능이 내장됨.
로그 컴팩션
- 한 키의 로그 레코드는 한개씩만 유지한다.
- 키의 삭제 :
- 로그 구조화 저장 엔진: 특별한 null값 (tombstone) 으로 갱신하는것
- 로그 컴팩션 : 값 제거.
- 카프카는 로그 컴팩션 기능을 제공한다. 이 장의 후반부 참고.
변경 스트림용 API 지원
- 요즘엔 CDC를 지원하기보다, DB 기본 인터페이스로 제공된다.
- VoltDB : 튜플을 삽입할 수 있지만 질의는 할수없는 관계형 데이터 모델의 출력스트림(튜플들의 로그로 구성됨)
- Kafka Connect : 카프카를 데이터 시스템용 변경 데이터 캠처도구로 활용할수있게 함.
이벤트 소싱
- DDD에서 개발한 기법, 애플리케이션 상태변화를 모두 변경 이벤트 로그로 저장한다.
- 이벤트는 애플리케이션 수준에서 발행하는 변경 불가한 로그.
- CDC에서는 변경가능한 데이터를 DB저수준에서 추출한 로그.
이벤트 로그에서 현재상태 파생하기
- 이벤트 소싱 컨슈머는 결정적이어야한다. (재수행해도 같은 결과)
- 뒤에발생한 이벤트가 앞선 이벤트를 덮어쓰지않는다. 로그 컴팩션이 불가.
명령과 이벤트
- 요청(명령) -> 무결성 검증 -> 승인 -> 불변 이벤트
- 무결성 검증은 동기로 검증되어야한다. 첫 명령요청에서 동기로 검증을 하던지, 혹은 가예약 -> 유효예약확정 이벤트로 분할하여 비동기 처리로 유효성 검사를 할 수 있다.
상태와 스트림 그리고 불변성
- DB의 상태는 로그들의 결과
- 로그컴팩션은 로그와 DB상태 사이의 차이를 메우는 한가지 방법이다.
불변이벤트의 장점
- DB상태의 오류 복원에 활용될 수 있다.
- 활동을 분석할 수 있다.
동일한 이벤트로그로 여러가지 뷰 만들기
- CQRS,(명령과 질의책임의 분리)로 데이터를 쓸때 인덱스등을 고려하지않고 쌓는다면 더 유연하게 후가공(뷰생성)을 할수있다.
동시성 제어
- 로그의 소비가 비동기로 이루어지므로, 애플리케이션에서 쓴 상태가 컨슈머에서 읽어지지않을수도있다.
- 이벤트 로그 생성과 읽기뷰 갱신을 동기로 수행, 분산 트랜잭션이 필요하다.
- 이벤트 로그에 내용을 담는 방법. 원자적으로 만들기 쉽다.
- 이벤트 로그와 애플리케이션 상태를 같은 방식으로 파티셔닝하여 동시성이슈 방지.
불변성의 한계
- 로그가 너무많아서 혹은 개인정보이슈로 삭제해야할때, 로그가 처음부터 기록되지않았던것처럼 전체삭제해야하는데 쉽지않다.
스트림 처리
- n개의 입력스트림으로 n개의 출력스트림을 생산하는것. operator, job 을 다룬다.
스트림 처리의 사용
복잡한 이벤트 처리(CEP)
- 스트림에서 특정 이벤트 패턴을 찾는 규칙을 규정할수있다.
- 처리엔진에 쿼리를 제출하고, 엔진은 매치를 발견하면 이벤트 패턴의 세부사항을 포함하는 복잡한 이벤트를 발행한다.
- CEP가 DB와 다른점은, 쿼리가 처리엔진에서 유지되어 이벤트를 스트림으로 계속 발행한다는점.
스트림 분석
- ex) 이벤트 빈도, 이동평균, window간의 통계값 비교 및 알람
- Apache storm, spark streaming, flink,,
구체화 뷰 유지하기
- 최종 상태는 일종의 구체화 뷰이다. 이를 위해선 모든 로그가 필요한데, 카프카스트림은 로그컴팩션으로 구현할 수 있다.
스트림상에서 검색하기
- 쿼리를 먼저 저장하고, CEP와 같이 데이터를 찾는다.
이미 처리가 완료된 로그기록에서 검색은 인덱스가 필요할테고 어려운건가?
메시지 전달과 RPC
- 액터 모델에서 쓰이는 서비스간 통신 메커니즘으로 사용될 수 있다
시간에 관한 추론
- ex) window가 5분인 분석 스트림에서, 시간은 장비의 시스템 시계를 사용하는데, 이벤트가 실제로 발생한 시간보다 처리시간이 많이 늦어지면 문제가 생길수있다.
이벤트 시간대 처리시간
- 처리시간을 기준으로 윈도우를 만들면, 서버 배포 후 정상화 되었을때, 멈춘시간동안의 처리율은 0, 복구된 후의 처리율은 튀는것처럼 보일수있다.
준비여부 인식
- 이벤트 시간을 기준으로 window를 정의하면, window 종료시점에 로그가 다 들어온건지 아닌건지 알수없어 낙오되는 로그가 생길수있다. (1분 window에서 이벤트 :00 -> :01 -> :02 -> :01 순서로 받았다면 두번째 로그에서 window를 종료했을때 네번째 이벤트는 낙오될수있다.)
- 낙오된 이벤트는 무시하거나 기존 출력데이터를 수정할수있다.
시계를 활용할것인가?
2와 3의 타임스탬프 차를 구하면 생산자와 서버시계간의 차이를 알수있으므로 (네트워크 지연은 무시할만하다고 가정.) 실제 이벤트가 발생한 타임스탬프를 추정할수있다.
- 이벤트가 발생한 시간, 생산자의 시계
- 이벤트를 서버로 보낸 시간, 생산자의 시계
- 서버에서 이벤트를 받은 시간, 서버 시계
윈도우 유형
- tumbling window : 타임스탬프의 고정길이. (ex) 00:00~59:59
- hopping window : 결과를 매끄럽게 만들기위해 윈도우간 중첩이 있다. tumbling window로 1분 계산후, 인접 윈도우를 모아 hopping window를 구현하기도 한다.
- sliding window : 이벤트간의 시간 간격
- session window : 기간없이 사용자 세션별 윈도.
스트림 조인
stream-stream join (윈도우조인)
- 조인을 위한 적절한 윈도우 선택이 필요하다.
- 스트림처리자가 인덱스 상태를 유지해야한다. 지난 윈도우의 이벤트를 인덱싱하고, 현재 윈도우의 인덱스 존재여부로 매칭 이벤트를 발행한다.
stream-table join (스트림 강화)
- 강화 : table의 데이터를 stream 이벤트에 추가하는것
- table의 변경로그를 이벤트로 구독하여 스트림 처리자에 로컬 복사본을 최신상태로 유지한다.
table-table join (구체화 뷰 유지)
- sns에서 글 발행과 유저별 타임라인 구현시
- 글발행 스트림 - 팔로우 이벤트 스트림 조인 하여 캐시한것
- 글 목록과 팔로워 목록 조인결과를 구체화 뷰
조인의 시간 의존성
- 조인해야할 데이터가 변경되는 이벤트가 동시에 발생했을때? 조인할 id를 이력별로 관리한다.
내결함성
- 마이크로 일괄처리 : 스트림을 작은 블록으로 나누고 각 블록을 소형 일괄처리와 같이 다루는 방법. spark streaming에서 사용한다.
- 체크포인트 : 주기적으로 상태의 체크포인트를 생성하고, 장애발생시 해당 체크포인트부터 재시작, 이전 출력은 버린다. apache flink
- 원자적 커밋 재검토
- 멱등성 : 스트림소비자가 결과 저장시 스트림 offset을 함께 저장하면 반복 갱신을 막을수있다.
- 실패후 상태 재구축 : stream-table join시 스트림 처리자는 state가 있는데, flink는 스냅샷을 HDFS와같은 지속성있는 저장소에 기록한다.