Kafka2026년 5월 27일24분 읽기

Kafka 깊은 이야기: MyKafka를 직접 만들며 배운 것들

왜 메시지 큐가 필요한가부터 Topic/Partition/Offset, length-prefix framing, sparse mmap index, segment rolling, batch가 61배 빠른 이유, zero-copy, Outbox 패턴까지. Kotlin/Netty로 직접 짠 MyKafka MVP 코드를 reference로 풀어 정리했다.

#Kafka#MyKafka#Netty#Distributed#Architecture#Storage#Backend

Kafka: 깊은 이야기

직접 짜본 결과 알게 된 것들. "Kafka가 빠르다"고 들어도 왜 빠른지는 안 와닿는다. partition, segment, sparse index, length-prefix framing을 직접 짜보면 "아, 이래서 이런 trade-off가 있구나"가 와닿는다.


들어가며: 인터랙티브 시각화 4개

이 글은 길어서 중간에 막힐 수 있다. Kafka 내부 동작이 잘 안 잡힐 때 같은 디렉터리에 올려둔 네 개의 시각화로 단계별 애니메이션을 보면 빠르게 잡힌다.

본문 §3과 §6에서 이 시각화로 직접 돌아온다.


목차

  • §1. 왜 메시지 큐가 필요한가
  • §2. Kafka의 핵심 개념 (5분 요약)
  • §3. MyKafka가 구현한 것 (자세히)
  • §4. 의도적으로 안 넣은 것
  • §5. 티켓팅에 어떻게 적용할 것인가
  • §6. Kafka가 빠른 이유 (정리)
  • §7. 학습 진화 단계 (Kafka 측면)
  • §8. 자주 나오는 오해
  • §9. 토론 prompts 일부
  • §10. 코드 reading map

§1. 왜 메시지 큐가 필요한가

동기 처리의 한계

이상적인 (아주 단순한) 예매 흐름은 이렇다.

text
Client → Booking API → [좌석 락 + DB INSERT + 결제] → 응답

문제는 Booking API가 DB INSERT 끝날 때까지 응답을 못 한다는 점이다.

  • 트래픽 폭주 시 DB connection pool 고갈 → API thread 줄줄이 대기 → tomcat thread pool 고갈 → 클라이언트 5xx
  • DB 한쪽이 느려지면 API 전체가 느려짐

비동기 처리

text
Client → Booking API → [좌석 락(Redis) + Kafka publish] → 응답
                          ↓
                       Worker → DB INSERT (천천히)
  • API는 "예약 받았어요"까지만 책임진다. 밀리초 단위 응답
  • DB INSERT는 Worker가 자기 속도대로
  • Kafka는 그 사이의 버퍼

메시지 큐가 주는 4가지

  1. 버퍼링 (back-pressure) — 생산 폭주를 소비 속도와 분리
  2. 재시도/내구성 — Worker가 죽어도 메시지는 큐에 남아 다음 Worker가 처리
  3. fan-out — 한 이벤트를 여러 소비자가 각자 다른 처리 (검색 인덱싱, 통계, 알림 등)
  4. 시간 분리 (decoupling) — 생산자와 소비자가 같은 시점에 살아있을 필요 없음

MSA 스케일에서: 왜 대기업이 Kafka를 쓰는가

위는 단일 서비스(예매 API) 관점이다. 서비스가 수십~수백 개로 늘어나는 MSA에서는 문제가 질적으로 달라진다.

문제

  1. 강결합 (동기 호출 지옥) — A→B→C→D 직접 호출 체인. D가 느려지면 A까지 줄줄이 느려지고, 한 서비스 장애가 **연쇄 장애(cascading failure)**로 전파된다. 새 소비자 붙이려면 호출하는 쪽 코드를 다 수정해야 함
  2. 트래픽 스파이크 — 동기 처리는 가장 느린 지점(보통 DB)이 전체 처리량을 결정. 몰리면 커넥션/스레드 풀 고갈 → 5xx

Kafka의 해결

문제Kafka가 주는 것
폭주에 시스템이 죽음비동기 버퍼 — API는 ms 응답, 워커가 자기 속도로 소비 (back-pressure)
서비스 강결합decoupling — 발행만 하면 됨. 소비자가 죽어도 이벤트는 토픽에 남음
새 기능마다 생산자 수정fan-out — 새 consumer group 추가에 기존 코드 0 수정
컨슈머 장애 시 유실내구성 + replay — consume=삭제가 아니라 offset 전진. 되감아 재처리 가능
순서 vs 확장partition — key로 순서 보장 + 병렬로 선형 확장

실전 쓰임새: MSA에서 Kafka는 서비스 간 이벤트가 모이고 흐르는 중심축 역할을 한다.

  • 서비스 간 이벤트 통신 (event-driven architecture)
  • CDC — DB 변경을 이벤트로 흘려 캐시·검색·데이터레이크 동기화
  • 로그·메트릭 수집 파이프라인
  • 스트림 처리 (실시간 집계·이상탐지)
  • 이벤트 소싱 — 로그가 곧 단일 진실, 전체 히스토리 재생 가능

서비스가 많아질수록 동기 직접 호출은 결합·장애전파·스파이크에 취약해진다. Kafka는 그 사이에 재생 가능한 영속 로그 버퍼를 끼워 넣어 느슨하게 결합되고 탄력적이며 확장 가능한 시스템을 만든다. 이벤트 드리븐 관점은 이벤트 드리븐 아키텍처 글을 참조.


§2. Kafka의 핵심 개념 (5분 요약)

Topic / Partition / Offset

text
topic: reservation-events
├── partition-0: [m0, m1, m2, m3, ...]   ← append-only log
├── partition-1: [m0, m1, m2, ...]
└── partition-2: [m0, m1, m2, m3, m4, ...]
  • Topic = 논리적 카테고리. "예약 이벤트들"
  • Partition = 토픽을 N개로 나눈 것. 병렬성의 단위. 같은 partition 안에서는 순서 보장
  • Offset = partition 안에서의 위치. 0, 1, 2, ... 단조증가

Producer

메시지를 발행한다. partition 선택 정책은 두 가지다.

  • key 있으면: hash(key) % partition_count → 같은 key는 항상 같은 partition (순서 보장)
  • key 없으면: round-robin

티켓팅 예시: key = seatId로 하면 같은 좌석 이벤트는 같은 partition에 모여서 처리 순서가 보장된다.

Consumer / Consumer Group

  • 여러 consumer를 같은 group으로 묶으면 partition을 나눠 처리한다. 같은 메시지를 여러 번 처리 안 함
  • group이 다르면 같은 메시지를 둘 다 처리
text
topic: reservation-events (partitions: 3)

group: "db-writer"        group: "search-indexer"
├── consumer-A → p0       ├── consumer-X → p0
├── consumer-B → p1       └── consumer-Y → p1, p2
└── consumer-C → p2

같은 메시지를 두 그룹이 각자 받는다 (DB write 따로, 검색 인덱싱 따로).

Offset Commit

"여기까지 처리했어요"를 저장한다. consumer 재시작 시 거기서부터 이어 읽는다.

  • Kafka는 __consumer_offsets 토픽에 자체 저장 (메타 토픽)
  • commit 정책: at-least-once (보통), at-most-once, exactly-once

ISR (In-Sync Replica)

  • partition마다 leader 1개 + follower N개 복제
  • ISR = leader를 잘 따라가고 있는 follower 집합
  • acks=all = 모든 ISR이 받아야 producer에게 ack
  • leader 죽으면 ISR 중에서 새 leader 선출 → 무손실 페일오버

→ MyKafka는 의도적으로 단일 노드라 이 부분 미구현. 학습 거리다.


§3. MyKafka가 구현한 것 (자세히)

구조 전체를 한 번에 보고 싶다면 Kafka Architecture 시각화부터 보면 좋다. 메시지 한 건의 생애 흐름은 Lifecycle 정적 흐름도인터랙티브 5덱에서 단계별로 조작할 수 있다.

3.0 패키지 구조 한 줄씩

text
mykafka/
├── Main.kt                          # 부팅 진입점, CLI 인자 파싱
├── server/
│   ├── BrokerServer.kt              # Netty TCP 서버 (boss/worker EventLoopGroup)
│   └── RequestRouter.kt             # Frame → ApiKey별 핸들러 분기
├── protocol/
│   ├── Frame.kt                     # (ApiKey + payload) 한 단위
│   ├── ApiKey.kt                    # enum: 5개 ApiKey
│   └── FrameCodec.kt                # length-prefix encode/decode (Netty handler)
├── log/
│   ├── Record.kt                    # 메모리상 record (offset, ts, key, value)
│   ├── RecordCodec.kt               # Record ↔ ByteArray + CRC32
│   ├── OffsetIndex.kt               # sparse index, mmap 기반
│   ├── Segment.kt                   # 단일 .log + .index = 한 segment
│   └── Log.kt                       # 한 파티션 (다수 segment 관리)
└── topic/
    ├── LogManager.kt                # 모든 토픽×파티션 관리 + partitioner
    └── OffsetStore.kt               # consumer offset 영속화 (자체 토픽 사용)

전체 ~470줄. 핵심은 log/topic/이다. server/protocol은 wire 처리.

3.1 Wire Protocol: Length-prefix Framing

모든 메시지는 길이 접두로 framed된다.

text
[ 4 bytes totalLength ][ 1 byte apiKey ][ payload (totalLength-1 bytes) ]
  • totalLength: apiKey(1) + payload 길이. length 필드 4바이트는 제외
  • apiKey: 1바이트 enum 식별자

왜 length-prefix? TCP는 바이트 스트림이다. 메시지 경계가 없다.

  • HTTP는 newline+헤더로 해결 → 파싱 복잡, 텍스트 오버헤드 큼
  • length-prefix는 바이트 단순 비교 → 빠르고 정확. 실제 Kafka도 동일 구조

FrameDecoder의 정확한 동작 (FrameCodec.kt)

Netty ByteToMessageDecoder를 상속한다. 한 번의 decode() 호출에서:

  1. readableBytes() < 4 → 길이 헤더도 못 읽음. 반환(대기)
  2. totalLength = input.readInt() → 그만큼 안 모이면 resetReaderIndex() 후 반환(대기)
  3. apiKey = input.readByte() → 1바이트
  4. payload = input.readRetainedSlice(totalLength - 1) → 슬라이스 + retain
  5. out.add(Frame(apiKey, payload))

ByteToMessageDecoder의 핵심 이점은 부분 도착 자동 처리다. 1MB짜리 frame이 TCP MSS 단위로 쪼개져 와도 알아서 누적해주고, 다 모이면 decode를 호출한다.

→ 직접 짜면 partial-read 버그(읽기 시작 후 부족하면 어디까지 읽었는지 잃어버림) 잡기 어렵다.

3.2 ApiKey 5개: 각각의 wire format

protocol/ApiKey.kt:

kotlin
enum class ApiKey(val code: Byte) {
    PRODUCE(0),
    FETCH(1),
    CREATE_TOPIC(2),
    COMMIT_OFFSET(3),
    FETCH_OFFSET(4);
}

실제 Kafka는 수십 개 (FetchSession, JoinGroup, Heartbeat, OffsetFetch v2…). MVP는 본질 5개만.

CREATE_TOPIC

text
req : [topicLen:2][topic][partitionCount:4]
resp: [status:1]   0=OK, 1=ALREADY_EXISTS, 2=INVALID

간단하다. broker는 디렉토리 만들고 Log 인스턴스 N개 생성.

PRODUCE (batch)

text
req : [topicLen:2][topic][partition:4][recordCount:4]
      for each record: [keyLen:4][key][valLen:4][value]
        - partition == -1  → broker가 partitioner로 결정 (첫 record의 key 사용)
        - keyLen == -1     → null key
resp: [errCode:1][partition:4][baseOffset:8][count:4]
        - 개별 offset = baseOffset, baseOffset+1, …, baseOffset+count-1

핵심 invariant: 한 batch의 N record는 **모두 같은 (topic, partition)**이다. 실제 Kafka도 동일하게 클라이언트가 partition별로 미리 grouping해서 보낸다.

FETCH (pull)

text
req : [topicLen:2][topic][partition:4][offset:8][maxBytes:4]
resp: [errCode:1][recordCount:4][record1 bytes][record2 bytes]…

record는 self-delimiting (RecordCodec 포맷 그대로). client는 N번 decode.

진행 보장: record 1건이 maxBytes보다 커도 1건은 무조건 포함. 안 그러면 컨슈머 영원히 막힘. Kafka 동일 invariant.

COMMIT_OFFSET / FETCH_OFFSET

text
COMMIT_OFFSET:
  req : [groupLen:2][group][topicLen:2][topic][partition:4][offset:8]
  resp: [errCode:1]

FETCH_OFFSET:
  req : [groupLen:2][group][topicLen:2][topic][partition:4]
  resp: [errCode:1][offset:8]    offset=-1 → 미커밋

둘 다 내부 토픽 __consumer_offsets에 저장. broker 재시작에도 살아남습니다 (§3.8 참조).

3.3 Record 포맷: 한 메시지의 정확한 바이트

log/RecordCodec.kt:

text
[offset:8][timestamp:8][keyLen:4][key:keyLen][valueLen:4][value:valueLen][crc:4]
  • offset: broker가 부여. 클라이언트가 못 정함
  • keyLen == -1 → null key (key 부분 생략)
  • crc: offset 시작부터 value 끝까지의 CRC32. 디스크 corruption 검증

CRC 계산 (encode)

kotlin
val crc = CRC32()
crc.update(buf.array(), 0, buf.position())   // 헤더+payload 전체
buf.putInt(crc.value.toInt())

CRC 검증 (decode)

decode 시 같은 범위로 다시 CRC 계산 → 저장된 값과 비교. 불일치면 error("CRC mismatch") → 호출자가 corruption으로 판단.

실 Kafka와 차이: MyKafka는 record별 CRC32다. 실 Kafka(메시지 포맷 v2, 0.11+)는 batch별 CRC32C(Castagnoli, CPU HW 가속)이고, CRC 범위에서 baseOffset·leaderEpoch제외한다 → broker가 offset을 부여해도 CRC 재계산이 불필요 → zero-copy(§6) 성립의 전제다.

CRC는 "데이터가 저장 시점과 달라졌나"(disk bit rot 등 무결성)만 검출한다. 극히 드물게 깨진 데이터가 같은 CRC를 내는 충돌(~1/2³²)은 통과되어 silent corruption이 되지만, single-bit·짧은 burst 오류는 100% 검출. 악의적 변조는 막지 못함(그건 해시·서명의 영역).

CRC 위치와 검증 흐름은 Kafka Summary 시각화에서 시각적으로 확인할 수 있다.

부분 읽기 처리

decode는 buf 안에 record가 다 안 모였으면 null 반환 + buf position을 시작 위치로 복원. 호출자는 "truncated tail"로 판단해 거기서 멈춥니다.

3.4 OffsetIndex: Sparse + Mmap

log/OffsetIndex.kt. 한 segment에 하나씩 딸려 있다.

엔트리 포맷

text
[relativeOffset:4][filePosition:4]    ← 8바이트 entry
  • relativeOffset = recordOffset - segment.baseOffset. 4바이트면 segment 안에 최대 2³¹ record 표현 가능 — 충분
  • filePosition = .log 파일 내 byte 위치

Sparse 정책

모든 record를 인덱싱하지 않는다. Segment.append() 안에서:

kotlin
if (lastIndexedPos < 0 || pos - lastIndexedPos >= indexIntervalBytes) {
    index.append((offset - baseOffset).toInt(), pos.toInt())
    lastIndexedPos = pos
}
  • 첫 record는 무조건 인덱싱 (탐색 시작점 보장)
  • 그 이후는 indexIntervalBytes(기본 4KB) 이상 벌어졌을 때만

Lookup 알고리즘 (이진 탐색)

kotlin
fun lookup(targetRelativeOffset: Int): IntArray {
    var lo = 0; var hi = entryCount - 1; var bestIdx = -1
    while (lo <= hi) {
        val mid = (lo + hi) ushr 1
        val midOff = mmap.getInt(mid * ENTRY_SIZE)
        if (midOff <= targetRelativeOffset) {
            bestIdx = mid; lo = mid + 1
        } else hi = mid - 1
    }
    // ... bestIdx 에서 (rel, pos) 반환
}

target 이하 가장 큰 엔트리 찾기. 거기서부터 짧은 sequential scan으로 정확한 offset 도달.

Mmap 사용 이유

kotlin
mmap = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize)
  • 인덱스는 작고 자주 읽힘 → OS 페이지 캐시에 위임
  • read/write syscall 없이 메모리처럼 접근
  • mmap.getInt(offset) → 그냥 메모리 fetch

Pre-allocate

파일 사이즈를 시작 시 maxEntries * 8만큼 미리 truncate. 추후 append할 때 파일 크기 변경 syscall 없음.

자기 완결성

인덱스가 깨져도 .log만 멀쩡하면 시작 시 완전 재구축된다. 검증에서 모든 .index를 0으로 깨도 정상 동작한다.

kotlin
fun rebuildIndex(truncateTail: Boolean): Long {
    index.reset()
    scan { rec, startPos -> if (조건) index.append(...) }
    // 마지막 segment면 부분 쓰기된 trailing 자르기
}

3.5 Segment: 단일 (.log + .index)

log/Segment.kt. 파일명 규약:

text
00000000000000000000.log    ← %020d.format(baseOffset)
00000000000000000000.index
00000000000000001000.log    ← 1000 offset부터 새 segment
00000000000000001000.index

파일명 = baseOffset → 디렉토리 자체가 인덱스. 임의 offset이 어느 파일에 있는지 파일명만으로 이진 탐색이 가능하다.

append (단건)

kotlin
fun append(offset: Long, bytes: ByteArray) {
    val pos = sizeBytes
    channel.position(pos)
    channel.write(ByteBuffer.wrap(bytes))
    sizeBytes += bytes.size
    // sparse 인덱스 갱신 (§3.4)
}

호출자(=Log)가 락을 잡았다고 가정한다. FileChannel.write() 한 번 = 1 syscall.

appendBatch: 한 번의 write로 N record

kotlin
fun appendBatch(baseOffsetOfBatch: Long, recordBytes: List<ByteArray>) {
    val totalLen = recordBytes.sumOf { it.size }
    val combined = ByteBuffer.allocate(totalLen)
    for (b in recordBytes) combined.put(b)
    combined.flip()
    channel.write(combined)    // ← 1 syscall로 모두 기록
    // 인덱스 엔트리는 record 단위로 갱신
}

핵심: N개 record를 메모리에서 합쳐 한 번에 write. syscall 수 = 1.

readFrom: pull-based 핵심

kotlin
fun readFrom(startOffset: Long, maxBytes: Int): Pair<List<Record>, Int> {
    val rel = (startOffset - baseOffset).toInt()
    val (_, startPos) = index.lookup(rel).let { it[0] to it[1] }
    // startPos 부터 sizeBytes 끝까지 한 번에 읽어서 메모리 buf
    // buf 에서 RecordCodec.decode 반복
    // startOffset 이전은 skip, maxBytes 누적 초과하면 break
    // 단, 첫 record는 항상 포함 (진행 보장)
}

scan: recovery용 전체 순회

kotlin
fun scan(onRecord: (Record, Long) -> Unit): Long {
    // .log 처음부터 끝까지 record 단위로 decode
    // CRC 깨진 record가 나오면 그 직전 byte 위치 반환
    return lastGoodPos
}

rebuildIndex: 시작 시 복구

  1. index reset
  2. .log 전체 scan → sparse 규칙대로 index 재구성
  3. 활성 segment면 마지막 정상 위치까지만 truncate (부분 쓰기된 꼬리 자르기)
  4. nextOffset = 마지막 record offset + 1

이게 "꼬리 자르기 복구"다. 전원 나가 부분 쓰기된 trailing record는 신뢰하지 않고 잘라낸다 (실 Kafka도 동일).

3.6 Log: 한 파티션 = 다수 segment 관리

log/Log.kt. 파티션 하나당 인스턴스 하나.

kotlin
class Log(
    private val dir: Path,
    private val maxSegmentBytes: Long = 1MB,      // 기본 1MB (학습용)
    private val indexIntervalBytes: Int = 4KB,
) {
    private val lock = ReentrantLock()             // 파티션당 한 락
    private val segments: MutableList<Segment> = mutableListOf()
    @Volatile private var nextOffset: Long = 0
}

시작 시

  1. loadSegments() — 디렉토리에서 *.log 파일들 발견 → baseOffset 기준 정렬
  2. recoverAll() — 각 segment rebuildIndex(truncateTail = isActive)
  3. nextOffset = 활성 segment의 nextOffset

append (단건)

kotlin
fun append(key: ByteArray?, value: ByteArray, timestamp: Long): Long =
    lock.withLock {
        maybeRoll()                                   // 활성 segment 크기 검사
        val offset = nextOffset
        val bytes = RecordCodec.encode(offset, timestamp, key, value)
        activeSegment().append(offset, bytes)
        nextOffset = offset + 1
        offset
    }

Broker가 offset 부여한다. 클라이언트가 못 정함 — 두 producer가 같은 partition에 쓸 때 충돌 방지.

maybeRoll: segment 분할 트리거

kotlin
private fun maybeRoll() {
    val active = activeSegment()
    if (active.sizeBytes < maxSegmentBytes) return
    val newBase = nextOffset
    segments.add(newSegment(newBase))                 // 새 파일 생성
}

왜 분할?

  • Retention 삭제 O(1) — segment 통째 unlink
  • 활성 segment만 쓰기, 나머지는 read-only → mmap 최적화 유리

find / read: 이진 탐색으로 segment 결정

kotlin
private fun segmentIndexFor(offset: Long): Int? {
    // segments는 baseOffset 오름차순 정렬 → 이진 탐색
    // target 이하의 가장 큰 baseOffset 가진 segment 반환
}
 
fun read(startOffset: Long, maxBytes: Int): List<Record> {
    var idx = segmentIndexFor(startOffset)
    while (idx <= segments.lastIndex) {
        val (recs, used) = segments[idx].readFrom(off, budget)
        if (recs.isEmpty()) break
        out.addAll(recs); bytesUsed += used; off = recs.last().offset + 1
        idx++                                          // ← cross-segment 자동
    }
}

Cross-segment FETCH: 시작 offset이 segment-A에 있고 다음 record가 segment-B에 있어도 자동으로 넘어간다. 컨슈머는 segment 경계를 의식할 필요가 없다.

3.7 LogManager: Topic × Partition × Partitioner

topic/LogManager.kt. 한 브로커가 책임지는 모든 토픽 관리.

디렉토리 = 단일 진실 출처

text
<rootDir>/
  <topicName>/
    0/   ← partition 0
    1/   ← partition 1

별도 메타 파일이 없다. 디렉토리 트리 자체가 토픽 목록 + 파티션 수 + segment 목록이다. 실 Kafka도 거의 동일하다 (<topic>-<partition>/ 단일 디렉토리지만 의미는 같음).

Partitioner: key 있으면 sticky hash

kotlin
fun pickPartition(topic: String, key: ByteArray?): Int? {
    val t = topics[topic] ?: return null
    val n = t.partitions.size
    return if (key == null) t.nextRoundRobin(n)
           else Math.floorMod(key.contentHashCode(), n)
}
  • key 있음: floorMod(hash(key), N) → 같은 key는 항상 같은 partition → 순서 보장
  • key 없음: AtomicInteger round-robin → 균등 분산

파티션마다 별도 Log → 락 충돌 0

Log 안에서만 락. 파티션 A 쓰기와 파티션 B 쓰기는 서로 안 기다림. "파티션 = 병렬성의 단위" 정체성을 코드로 표현한 거예요.

3.8 OffsetStore: "모든 게 로그"의 자기 dogfooding

topic/OffsetStore.kt. consumer group offset 영속화.

어디에 저장?

새 저장소 만들지 않는다. broker가 이미 가진 메커니즘(Log)을 그대로 사용:

  • 내부 토픽 이름: __consumer_offsets (1 파티션)
  • record key = encode(group, topic, partition)
  • record value = offset (8B big-endian)

시작 시 cache 재구축

kotlin
init {
    logManager.createTopic("__consumer_offsets", 1)
    val partition = logManager.getPartition("__consumer_offsets", 0)!!
    for (rec in partition.readAll()) {
        val k = decodeKey(rec.key!!)
        cache[k] = ByteBuffer.wrap(rec.value).long
    }
    log.info("loaded $count commits (${cache.size} unique keys)")
}

시작 시 내부 토픽을 처음부터 끝까지 scan → ConcurrentHashMap cache 재구축.

COMMIT

kotlin
fun commit(group, topic, partition, offset) {
    val keyBytes = encodeKey(group, topic, partition)
    val valueBytes = ByteBuffer.allocate(8).putLong(offset).array()
    logManager.getPartition("__consumer_offsets", 0)!!
        .append(keyBytes, valueBytes)             // 그냥 또 다른 append
    cache[Key(group, topic, partition)] = offset
}

append + cache 업데이트. 끝.

왜 이게 우아한가

  • 단일 메커니즘 재활용 — 새 코드는 key/value 인코딩 + cache뿐. Log.append/readAll/recovery 그대로
  • Durability 동일 — 사용자 메시지와 같은 append-only + recovery 모델
  • Log compaction의 자연스러운 의미 — 같은 (g,t,p) key가 반복 commit되면 옛 record는 garbage. compaction 도입 시 최신값만 살아남음 (미구현이지만 의미는 이미 있음)
  • 검증: 5번 commit한 group이 unique key 2개 → 옛 record 3개는 "역사적 정보". 재시작 후 loaded 5 commits (2 unique keys) 정확히 출력

3.9 검증된 동작

시나리오결과
PRODUCE 라운드트립, monotonic offset
재시작 후 nextOffset 정확 복구
부분 쓰기 → 꼬리 자르기 복구
Segment 롤링 (파일명 = baseOffset)
Multi-segment recovery
Sparse mmap index, 임의 offset 빠르게 find
모든 .index 0으로 깨도 .log에서 재구축
같은 key sticky partition
30 keys → 3 partitions 10/10/10 분산
null key 엄격 round-robin
토픽/파티션 디렉토리 자동 복구
Batch PRODUCE all-or-none
1000건 single vs 1×1000 batch → 61x speedup
Cross-segment FETCH
maxBytes=1 진행 보장 (1건은 무조건)
Consumer loop (25 round, 50건 수신, self-offset)
__consumer_offsets 자동 생성
5 commits → unique key 2개, 최신 우선
재시작 → 컨슈머 group이 정확히 마지막 commit에서 이어 시작

61x speedup의 정확한 숫자

  • 1000회 단건 PRODUCE: 84ms (RTT 1000번, 락 1000번, syscall 1000번)
  • 1회 1000-record batch: 1.4ms (RTT 1번, 락 1번, syscall 1번)
  • 61배 throughput 차이

Kafka가 수십만 msgs/sec를 내는 핵심 이유가 여기 있다. 데이터 양은 같다.

3.10 직접 짜본 결과 알게 된 통찰

코드 짜기 전에는 "그렇구나"였지만 직접 짜보고서 와닿은 것들이다.

  1. "모든 게 로그"는 정말 강력하다. OffsetStore가 별도 저장소 없이 자기 자신을 dogfooding하는 게 추상적인 슬로건이 아니라 실제 코드 절약이다. 새 코드는 key/value 인코딩 + cache뿐
  2. mmap이 마법이 아닙니다. 인덱스가 작아서 효과가 있는 거지, 큰 파일 mmap 하면 페이지 fault 폭발. 작은 데이터에 자주 접근하는 케이스에만 적합
  3. CRC가 늦게 잡힙니다. Record decode 시 CRC 검증을 하지만, 부분 쓰기는 decode 자체가 null 반환으로 잡힘. CRC는 disk corruption(드물지만 치명적) 용. 두 보호막이 다른 위협을 막음
  4. 락은 작게. Log.append 안에만 락. partition 간에는 락 X. partition 수를 늘리면 throughput이 정확히 비례
  5. 클라이언트가 offset 들고 다니는 비대칭의 깔끔함. broker가 컨슈머 상태를 안 갖는다 = 컨슈머 수에 비례한 broker 상태 폭발 없음. 컨슈머 N억 명도 broker는 그대로
  6. 파일명을 baseOffset으로 쓴다. 메타 파일 없이 디렉토리만으로 segment 위치를 결정한다. find data -name "*.log"가 곧 toc 역할을 한다.

§4. 의도적으로 안 넣은 것

영역MVP에서 생략다음 단계 학습 포인트
Replication단일 노드Leader/Follower + ISR + acks=all
Controller메타 단순 관리KRaft (Raft 기반 controller, Zookeeper 제거)
Log compactionsparse만 구현같은 key 옛 record 삭제 — __consumer_offsets 정리에 즉시 활용
Exactly-onceat-least-onceproducer idempotence + transactions
Zero-copybyte copysendfile() / Netty FileRegion
Group coordination단일 group stringrebalance, member tracking, sticky assignment
Durability tuningOS flush 의존channel.force() 정책 (per-request / per-N / time-based)

각 항목 모두 학습 거리다. 부재가 한계로 드러나면 그때 구현한다.


§5. 티켓팅에 어떻게 적용할 것인가

5.1 Worker 패턴

text
Booking API ──PRODUCE──> Kafka(reservation-events)
                            │
                            └──FETCH──> Worker ──INSERT──> RDB

발행 시점

좌석 락(Redis) + 결제 검증 통과 직후. DB INSERT 전이다.

kotlin
// command service의 의사 코드
@Transactional  // Redis 락만 잡고 짧게 끝
fun reserve(req: ReserveRequest): ReservationToken {
    val seat = redis.acquireSeatLock(req.seatId)  // 짧음
    val token = ReservationToken(uuid())
    kafka.publish("reservation-events", req.seatId, ReservationCreated(token, req.userId, req.seatId))
    return token  // 응답 즉시
}

소비 시점

Worker는 자기 속도대로 consume → DB INSERT.

kotlin
// worker service 의사 코드
consumer.subscribe("reservation-events")
while (true) {
    val records = consumer.poll(timeoutMs = 100)
    for (r in records) {
        db.insertReservation(r.value)
        consumer.commitOffset(r.offset)
    }
}

5.2 왜 key=seatId 인가

  • 같은 좌석에 대한 이벤트는 항상 같은 partition으로 → 처리 순서 보장
  • 예: ReservationCreated(seat=A-1) 다음 ReservationCancelled(seat=A-1). 순서 바뀌면 DB 상태 깨짐
  • 다른 좌석 이벤트는 자유롭게 다른 partition → 병렬성

5.3 Outbox 패턴 (학습 거리)

문제: command service가

text
1. DB에 reservation 행 INSERT
2. Kafka에 ReservationCreated publish

1만 성공하고 2 실패하면? 또는 반대?

outbox 테이블 도입:

text
1. (한 트랜잭션에) DB에 reservation 행 INSERT + outbox 행 INSERT
2. 별도 outbox-relay가 outbox 폴링 → Kafka publish → outbox 행 삭제

DB 트랜잭션 하나로 원자성 보장. Kafka 발행은 최소 한 번(at-least-once)이 보장된다.

→ 정확히는 우리 다이어그램은 Booking API가 DB INSERT를 안 한다. Worker가 한다. 그래서 outbox는 다른 상황(예: 예매 취소 시 환불 알림 발행)에서 학습 거리다.


§6. Kafka가 빠른 이유 (정리)

흔히 "Kafka는 빠르다"고만 말한다. 왜 빠른가:

  1. 순차 디스크 쓰기 (append-only log) — 랜덤 access 대비 HDD/SSD 모두 압도적으로 빠름. 메모리에 가까운 속도
  2. Page cache 활용 — OS 페이지 캐시가 hot data를 메모리에 유지. JVM 힙 부담 X
  3. Zero-copy (sendfile()) — consumer fetch 시 kernel-space → socket 직접 전송. user-space 복사 생략
  4. Batch 처리 — producer가 메시지를 모아 한 번에 전송. 네트워크 RTT 분할상환
  5. Compression (선택) — batch를 압축해서 전송. gzip/snappy/lz4/zstd
  6. Sparse index — 인덱스 자체가 작아 메모리에 다 올라감. 인덱스 lookup도 빠름
  7. Partition별 병렬화 — partition 수만큼 producer/consumer 병렬

Zero-copy 자세히 (전통 경로 vs sendfile)

"왜 빠른가"의 핵심이라 따로 정리한다.

전통적 경로 (read() + send(), 복사 4번):

text
디스크 ─DMA─▶ page cache ─CPU─▶ 앱(user) 버퍼 ─CPU─▶ socket 버퍼 ─DMA─▶ NIC
            ①              ②                ③               ④
  • ② page cache→앱 버퍼, ③ 앱 버퍼→socket 버퍼가 CPU 복사(비쌈). user-space로 올렸다 다시 내림 = 순수 낭비 + JVM heap 거쳐 GC 부담. 모드 전환도 4번

Zero-copy (sendfile() = Java FileChannel.transferTo(), 복사 2번):

text
디스크 ─DMA─▶ page cache ─DMA(scatter-gather)─▶ NIC
            ①                      ②
  • user-space를 안 거침. socket 버퍼엔 위치+길이 디스크립터만 넘기고 NIC가 page cache에서 직접 DMA로 가져감
  • "zero" = CPU 복사 0번 (남은 DMA 2번은 하드웨어라 CPU 안 씀). 복사 4→2, syscall 2→1, JVM heap 안 건드림(GC 부담 0) — 이 셋이 합쳐져 빨라집니다

왜 Kafka가 이게 되나 — broker가 데이터를 변형 없이 저장 포맷 그대로 흘려보내기 때문이다. CRC가 offset을 안 덮어서(§3.3) offset 부여해도 재계산 불필요, 압축도 그대로 전달 → user-space로 끌어올릴 이유가 없다.

한계 — TLS 암호화를 켜면 user-space 암호화 때문에 복사가 부활 → zero-copy 깨짐. 재압축·포맷 변환도 동일. 보안 vs 성능 트레이드오프.

→ MyKafka는 fetch 응답을 재직렬화(byte copy)하므로 미구현이다.

우리 MyKafka가 검증한 것

  • (1) (4) (6) 구현 완료. 1×1000 batch가 1000×single 대비 61배 빠름 측정
  • (3) 미구현. byte copy로 함. → "Zero-copy 도입 시 얼마나 더 빨라지나"가 다음 측정 거리
  • (2) JVM 위라 OS page cache 자동

§7. 학습 진화 단계 (Kafka 측면)

text
[현재] MyKafka MVP (단일 노드, client SDK 부재)
   ↓ client SDK 구현
[1] Producer / Consumer 라이브러리 → ticket-command가 publish
   ↓ Worker 도입
[2] Worker service 신규 → consume → DB INSERT (비동기 영속화)
   ↓ 부하 측정으로 효과 검증
[3] partition 수 늘리며 처리량 변화 측정
   ↓ 신뢰성 보강
[4] Replication 구현 (Leader/Follower + ISR + acks=all)
   ↓ 운영성
[5] Log compaction (consumer_offsets 정리)
   ↓ 정확성
[6] Producer idempotence + Transactions (exactly-once)
   ↓ 성능
[7] Zero-copy (`FileRegion` Netty)

각 단계마다 무엇을 측정해서 효과를 보여줄 것인지 미리 정해두는 게 학습 핵심이다.


§8. 자주 나오는 오해

오해진실
"Kafka는 메시지 큐다"정확히는 distributed commit log. 큐처럼 쓰지만 "consume = 삭제"가 아니라 "consume = offset 전진". 같은 메시지를 N번 다시 읽을 수 있음
"Kafka는 빠르니까 어디나 쓰자"1KB 미만 짧은 메시지에는 오히려 RabbitMQ가 latency 낮을 수 있음. Kafka는 throughput 최적화
"exactly-once 보장하니까 안전"exactly-once는 Kafka 내부 + producer side 한정. consumer 측 DB INSERT까지는 보장 X. 우리가 처리(idempotent consumer) 해야 함
"partition 늘리면 무조건 빠름"partition 수 = consumer 병렬도 상한. consumer 수 < partition 수면 idle consumer 생김. 그리고 partition은 한번 늘리면 줄이기 어려움
"offset commit은 자동"auto-commit은 데이터 손실 위험. 처리 완료 후 manual commit이 일반적

§9. 토론 prompts 일부

  • 우리 티켓팅에 partition을 몇 개로 잡을까? 그 근거는?
  • key=seatId 외에 다른 키 후보는? (userId? eventId?)
  • Worker가 죽어서 처리 못 한 메시지는 어떻게 되나? (offset 안 커밋되면)
  • 같은 메시지를 두 번 처리해도 안전한 코드(idempotent)는 어떻게 짜나?
  • replica가 lag 100MB라면 leader가 죽었을 때 어떻게 해야 하나? (acks 정책)

§10. 코드 reading map

MyKafka 코드 읽을 때 권장 순서. 한 호흡에 한 단계.

text
[1] protocol/Frame.kt + FrameCodec.kt           "메시지 경계 어떻게 잡나?"
    └─ length-prefix framing의 5줄짜리 본질

[2] log/Record.kt + RecordCodec.kt              "한 메시지의 정확한 바이트는?"
    └─ CRC 위치와 검증 흐름

[3] log/OffsetIndex.kt                          "왜 sparse + mmap인가?"
    └─ lookup 이진 탐색 + sequential scan의 결합

[4] log/Segment.kt — append/readFrom/scan       "한 segment의 라이프사이클"
    └─ rebuildIndex(truncateTail)가 꼬리 자르기

[5] log/Log.kt — maybeRoll/find/read            "여러 segment 어떻게 관리?"
    └─ segmentIndexFor 이진 탐색 + cross-segment

[6] topic/LogManager.kt — pickPartition          "partition 어떻게 정해?"
    └─ floorMod(hash, n) vs round-robin

[7] topic/OffsetStore.kt                         "오프셋도 그냥 또 다른 로그"
    └─ 자기 dogfooding의 우아함

[8] server/RequestRouter.kt                      "ApiKey 분기 끝"
    └─ handleProduce / handleFetch 흐름 따라가기

[1][2]가 wire/포맷. [3][5]가 디스크 추상. [6]~[8]이 broker. 각 층이 다음 층의 invariant를 깔끔히 가정해서 위에 쌓는 구조다.

토론하면 좋은 코드 포인트 4개

Segment.appendBatch: 왜 syscall 수가 throughput을 지배하나?

N record를 메모리에서 합쳐 한 번에 write.

: syscall 한 번엔 고정 비용이 붙는다 — user→kernel 모드 전환, CPU 캐시·TLB 오염, 커널 진입 경로. 데이터 양과 무관한 이 고정 오버헤드 × 호출 횟수가 병목이다. 1000건을 1000번 write하면 모드전환 1000번 + 락 1000번 (+네트워크면 RTT 1000번). batch로 합치면 메모리 복사(싸다)는 N번이지만 syscall·락·RTT는 1번. 디스크도 큰 덩어리 순차쓰기가 페이지 캐시에 유리. 측정: 84ms→1.4ms(61배)로, 데이터 양은 동일하고 줄어든 건 오직 호출 횟수다. 이게 **분할상환(amortization)**의 효과다.

Log.read: 컨슈머가 segment 경계를 의식 안 해도 되는 게 왜 중요한가?

cross-segment 자동 처리. offset이 segment-A에서 끝나고 B로 넘어가도 알아서 이어 읽음.

: 내부 표현과 외부 계약의 분리(캡슐화). 컨슈머는 "offset부터 maxBytes"만 알 뿐 segment가 1개인지 1000개인지, 경계가 어딘지 모른다. 만약 경계를 알아야 했다면 — ① segment 크기·롤링 정책이 클라이언트 API로 새어나가 서버가 정책을 못 바꿈, ② 클라이언트가 경계마다 재요청 로직 필요(복잡+버그), ③ retention으로 segment가 삭제되면 클라이언트 상태가 깨짐. 자동 처리 덕에 서버는 segment 크기 조절·compaction·retention을 자유롭게 하고, 클라이언트는 단순하게 유지된다.

OffsetIndex.lookup: sparse + sequential scan trade-off의 의미는?

이진 탐색으로 "그 이하 가장 가까운" 엔트리를 찾고, 거기서부터 짧게 scan.

: 공간 vs 시간 trade-off, 그리고 "메모리에 다 올라가는 것"의 승리. 모든 record를 인덱싱(dense)하면 lookup은 정확하지만 인덱스가 데이터만큼 커져 메모리에 못 올립니다. sparse(4KB 간격)는 인덱스가 수백 배 작아 통째로 mmap/page cache에 상주 → 이진 탐색이 디스크 안 타고 메모리에서 끝납니다. 대신 정확한 위치가 아닌 "근처"를 주지만, 거기서부터의 sequential scan은 indexInterval(≤4KB) 이내로 bounded이고 순차 읽기라 빠릅니다. 즉 "거의 다 와서 마지막 몇 걸음만 걷기" — 작은 scan 비용을 내주고 인덱스를 메모리에 통째로 올리는 이득을 사는 거예요.

Log.appendlock.withLock: partition 늘리면 throughput이 비례하는 이유는?

파티션 하나당 Log 인스턴스 하나, 그 안에 ReentrantLock 하나.

: 락 경계가 곧 병렬성의 경계. 락이 partition 단위로 독립이라 partition A 쓰기와 B 쓰기는 서로 다른 락 → 안 기다린다(no contention). 동시에 쓸 수 있는 writer 수 = partition 수. 경합이 없으면 CPU 코어·디스크 대역폭이 허용하는 한 선형 확장한다.

만약 broker 전역 락 하나였다면 partition을 아무리 늘려도 모든 쓰기가 직렬화되어 throughput이 고정(Amdahl의 직렬 구간). "partition = 병렬성의 단위"라는 정체성이 코드의 락 경계로 그대로 드러난 거예요.

단, '비례'의 전제: ① 자원(코어·디스크 IO)에 여유가 있어야 하고 — 물리 상한에 닿으면 더는 안 늘어남, ② key가 고르게 분산돼야 한다 — 한 partition에 쓰기가 몰리면 그 락에서 경합이 생겨 hot partition이 병목. 그래서 partition 수와 key 설계는 같이 고민해야 한다.


마치며

직접 짜본 결과 가장 크게 와닿은 건 두 가지였다.

첫째, "모든 게 로그"는 슬로건이 아니라 실제 코드 절약이다. OffsetStore__consumer_offsets 토픽을 자기 인프라(Log)로 dogfooding하는 게 새 저장소·복구 로직·durability 구현을 통째로 절약해준다. 이미 있는 추상을 한 번 더 쓰면 그만큼 새로 짤 코드가 줄어든다.

둘째, "왜 Kafka가 빠른가"의 60%는 batch에서 온다. 측정해보면 1000건 single이 84ms, 1×1000 batch가 1.4ms로 61배 차이가 났다. 데이터 양은 동일하다. 줄어든 건 오직 syscall·락·RTT 횟수. Zero-copy·page cache·순차 디스크가 모두 의미 있지만, 일단 batch부터 안 쓰면 다른 최적화의 효과를 측정조차 못 한다.

복습용으로 시각화 네 개를 다시 링크해둔다.

다음 글에서는 CQRS 깊은 이야기와 함께, Kafka가 CQRS의 Stage 3에서 어떻게 query side를 자유롭게 만드는지를 풀어본다.

#Kafka#MyKafka#Netty#Distributed#Architecture#Storage#Backend

황호민

Backend Engineer · Java/Kotlin · Spring Boot · Next.js