Kafka 깊은 이야기: MyKafka를 직접 만들며 배운 것들
왜 메시지 큐가 필요한가부터 Topic/Partition/Offset, length-prefix framing, sparse mmap index, segment rolling, batch가 61배 빠른 이유, zero-copy, Outbox 패턴까지. Kotlin/Netty로 직접 짠 MyKafka MVP 코드를 reference로 풀어 정리했다.
Kafka: 깊은 이야기
직접 짜본 결과 알게 된 것들. "Kafka가 빠르다"고 들어도 왜 빠른지는 안 와닿는다. partition, segment, sparse index, length-prefix framing을 직접 짜보면 "아, 이래서 이런 trade-off가 있구나"가 와닿는다.
들어가며: 인터랙티브 시각화 4개
이 글은 길어서 중간에 막힐 수 있다. Kafka 내부 동작이 잘 안 잡힐 때 같은 디렉터리에 올려둔 네 개의 시각화로 단계별 애니메이션을 보면 빠르게 잡힌다.
- Kafka Architecture 시각화 — Producer / Broker / Consumer / Topic / Partition 전체 구조
- Kafka Lifecycle (정적 흐름도) — 메시지 한 건의 생애 (publish → log → fetch → commit)
- Kafka Lifecycle Steps (인터랙티브 5덱) — 레코드 / 세그먼트 / 오프셋 / zero-copy / MSA 적용을 단계별로 조작
- Kafka Summary (CRC·차이 상세) — 핵심 데이터 포맷과 실 Kafka 대비 MyKafka의 차이
본문 §3과 §6에서 이 시각화로 직접 돌아온다.
목차
- §1. 왜 메시지 큐가 필요한가
- §2. Kafka의 핵심 개념 (5분 요약)
- §3. MyKafka가 구현한 것 (자세히)
- §4. 의도적으로 안 넣은 것
- §5. 티켓팅에 어떻게 적용할 것인가
- §6. Kafka가 빠른 이유 (정리)
- §7. 학습 진화 단계 (Kafka 측면)
- §8. 자주 나오는 오해
- §9. 토론 prompts 일부
- §10. 코드 reading map
§1. 왜 메시지 큐가 필요한가
동기 처리의 한계
이상적인 (아주 단순한) 예매 흐름은 이렇다.
Client → Booking API → [좌석 락 + DB INSERT + 결제] → 응답
문제는 Booking API가 DB INSERT 끝날 때까지 응답을 못 한다는 점이다.
- 트래픽 폭주 시 DB connection pool 고갈 → API thread 줄줄이 대기 → tomcat thread pool 고갈 → 클라이언트 5xx
- DB 한쪽이 느려지면 API 전체가 느려짐
비동기 처리
Client → Booking API → [좌석 락(Redis) + Kafka publish] → 응답
↓
Worker → DB INSERT (천천히)
- API는 "예약 받았어요"까지만 책임진다. 밀리초 단위 응답
- DB INSERT는 Worker가 자기 속도대로
- Kafka는 그 사이의 버퍼
메시지 큐가 주는 4가지
- 버퍼링 (back-pressure) — 생산 폭주를 소비 속도와 분리
- 재시도/내구성 — Worker가 죽어도 메시지는 큐에 남아 다음 Worker가 처리
- fan-out — 한 이벤트를 여러 소비자가 각자 다른 처리 (검색 인덱싱, 통계, 알림 등)
- 시간 분리 (decoupling) — 생산자와 소비자가 같은 시점에 살아있을 필요 없음
MSA 스케일에서: 왜 대기업이 Kafka를 쓰는가
위는 단일 서비스(예매 API) 관점이다. 서비스가 수십~수백 개로 늘어나는 MSA에서는 문제가 질적으로 달라진다.
문제
- 강결합 (동기 호출 지옥) — A→B→C→D 직접 호출 체인. D가 느려지면 A까지 줄줄이 느려지고, 한 서비스 장애가 **연쇄 장애(cascading failure)**로 전파된다. 새 소비자 붙이려면 호출하는 쪽 코드를 다 수정해야 함
- 트래픽 스파이크 — 동기 처리는 가장 느린 지점(보통 DB)이 전체 처리량을 결정. 몰리면 커넥션/스레드 풀 고갈 → 5xx
Kafka의 해결
| 문제 | Kafka가 주는 것 |
|---|---|
| 폭주에 시스템이 죽음 | 비동기 버퍼 — API는 ms 응답, 워커가 자기 속도로 소비 (back-pressure) |
| 서비스 강결합 | decoupling — 발행만 하면 됨. 소비자가 죽어도 이벤트는 토픽에 남음 |
| 새 기능마다 생산자 수정 | fan-out — 새 consumer group 추가에 기존 코드 0 수정 |
| 컨슈머 장애 시 유실 | 내구성 + replay — consume=삭제가 아니라 offset 전진. 되감아 재처리 가능 |
| 순서 vs 확장 | partition — key로 순서 보장 + 병렬로 선형 확장 |
실전 쓰임새: MSA에서 Kafka는 서비스 간 이벤트가 모이고 흐르는 중심축 역할을 한다.
- 서비스 간 이벤트 통신 (event-driven architecture)
- CDC — DB 변경을 이벤트로 흘려 캐시·검색·데이터레이크 동기화
- 로그·메트릭 수집 파이프라인
- 스트림 처리 (실시간 집계·이상탐지)
- 이벤트 소싱 — 로그가 곧 단일 진실, 전체 히스토리 재생 가능
서비스가 많아질수록 동기 직접 호출은 결합·장애전파·스파이크에 취약해진다. Kafka는 그 사이에 재생 가능한 영속 로그 버퍼를 끼워 넣어 느슨하게 결합되고 탄력적이며 확장 가능한 시스템을 만든다. 이벤트 드리븐 관점은 이벤트 드리븐 아키텍처 글을 참조.
§2. Kafka의 핵심 개념 (5분 요약)
Topic / Partition / Offset
topic: reservation-events
├── partition-0: [m0, m1, m2, m3, ...] ← append-only log
├── partition-1: [m0, m1, m2, ...]
└── partition-2: [m0, m1, m2, m3, m4, ...]
- Topic = 논리적 카테고리. "예약 이벤트들"
- Partition = 토픽을 N개로 나눈 것. 병렬성의 단위. 같은 partition 안에서는 순서 보장
- Offset = partition 안에서의 위치. 0, 1, 2, ... 단조증가
Producer
메시지를 발행한다. partition 선택 정책은 두 가지다.
- key 있으면:
hash(key) % partition_count→ 같은 key는 항상 같은 partition (순서 보장) - key 없으면: round-robin
티켓팅 예시: key = seatId로 하면 같은 좌석 이벤트는 같은 partition에 모여서 처리 순서가 보장된다.
Consumer / Consumer Group
- 여러 consumer를 같은 group으로 묶으면 partition을 나눠 처리한다. 같은 메시지를 여러 번 처리 안 함
- group이 다르면 같은 메시지를 둘 다 처리
topic: reservation-events (partitions: 3)
group: "db-writer" group: "search-indexer"
├── consumer-A → p0 ├── consumer-X → p0
├── consumer-B → p1 └── consumer-Y → p1, p2
└── consumer-C → p2
같은 메시지를 두 그룹이 각자 받는다 (DB write 따로, 검색 인덱싱 따로).
Offset Commit
"여기까지 처리했어요"를 저장한다. consumer 재시작 시 거기서부터 이어 읽는다.
- Kafka는
__consumer_offsets토픽에 자체 저장 (메타 토픽) - commit 정책: at-least-once (보통), at-most-once, exactly-once
ISR (In-Sync Replica)
- partition마다 leader 1개 + follower N개 복제
- ISR = leader를 잘 따라가고 있는 follower 집합
acks=all= 모든 ISR이 받아야 producer에게 ack- leader 죽으면 ISR 중에서 새 leader 선출 → 무손실 페일오버
→ MyKafka는 의도적으로 단일 노드라 이 부분 미구현. 학습 거리다.
§3. MyKafka가 구현한 것 (자세히)
구조 전체를 한 번에 보고 싶다면 Kafka Architecture 시각화부터 보면 좋다. 메시지 한 건의 생애 흐름은 Lifecycle 정적 흐름도와 인터랙티브 5덱에서 단계별로 조작할 수 있다.
3.0 패키지 구조 한 줄씩
mykafka/
├── Main.kt # 부팅 진입점, CLI 인자 파싱
├── server/
│ ├── BrokerServer.kt # Netty TCP 서버 (boss/worker EventLoopGroup)
│ └── RequestRouter.kt # Frame → ApiKey별 핸들러 분기
├── protocol/
│ ├── Frame.kt # (ApiKey + payload) 한 단위
│ ├── ApiKey.kt # enum: 5개 ApiKey
│ └── FrameCodec.kt # length-prefix encode/decode (Netty handler)
├── log/
│ ├── Record.kt # 메모리상 record (offset, ts, key, value)
│ ├── RecordCodec.kt # Record ↔ ByteArray + CRC32
│ ├── OffsetIndex.kt # sparse index, mmap 기반
│ ├── Segment.kt # 단일 .log + .index = 한 segment
│ └── Log.kt # 한 파티션 (다수 segment 관리)
└── topic/
├── LogManager.kt # 모든 토픽×파티션 관리 + partitioner
└── OffsetStore.kt # consumer offset 영속화 (자체 토픽 사용)
전체 ~470줄. 핵심은 log/와 topic/이다. server/protocol은 wire 처리.
3.1 Wire Protocol: Length-prefix Framing
모든 메시지는 길이 접두로 framed된다.
[ 4 bytes totalLength ][ 1 byte apiKey ][ payload (totalLength-1 bytes) ]
- totalLength:
apiKey(1) + payload길이. length 필드 4바이트는 제외 - apiKey: 1바이트 enum 식별자
왜 length-prefix? TCP는 바이트 스트림이다. 메시지 경계가 없다.
- HTTP는 newline+헤더로 해결 → 파싱 복잡, 텍스트 오버헤드 큼
- length-prefix는 바이트 단순 비교 → 빠르고 정확. 실제 Kafka도 동일 구조
FrameDecoder의 정확한 동작 (FrameCodec.kt)
Netty ByteToMessageDecoder를 상속한다. 한 번의 decode() 호출에서:
readableBytes() < 4→ 길이 헤더도 못 읽음. 반환(대기)totalLength = input.readInt()→ 그만큼 안 모이면resetReaderIndex()후 반환(대기)apiKey = input.readByte()→ 1바이트payload = input.readRetainedSlice(totalLength - 1)→ 슬라이스 + retainout.add(Frame(apiKey, payload))
ByteToMessageDecoder의 핵심 이점은 부분 도착 자동 처리다. 1MB짜리 frame이 TCP MSS 단위로 쪼개져 와도 알아서 누적해주고, 다 모이면 decode를 호출한다.
→ 직접 짜면 partial-read 버그(읽기 시작 후 부족하면 어디까지 읽었는지 잃어버림) 잡기 어렵다.
3.2 ApiKey 5개: 각각의 wire format
protocol/ApiKey.kt:
enum class ApiKey(val code: Byte) {
PRODUCE(0),
FETCH(1),
CREATE_TOPIC(2),
COMMIT_OFFSET(3),
FETCH_OFFSET(4);
}실제 Kafka는 수십 개 (FetchSession, JoinGroup, Heartbeat, OffsetFetch v2…). MVP는 본질 5개만.
CREATE_TOPIC
req : [topicLen:2][topic][partitionCount:4]
resp: [status:1] 0=OK, 1=ALREADY_EXISTS, 2=INVALID
간단하다. broker는 디렉토리 만들고 Log 인스턴스 N개 생성.
PRODUCE (batch)
req : [topicLen:2][topic][partition:4][recordCount:4]
for each record: [keyLen:4][key][valLen:4][value]
- partition == -1 → broker가 partitioner로 결정 (첫 record의 key 사용)
- keyLen == -1 → null key
resp: [errCode:1][partition:4][baseOffset:8][count:4]
- 개별 offset = baseOffset, baseOffset+1, …, baseOffset+count-1
핵심 invariant: 한 batch의 N record는 **모두 같은 (topic, partition)**이다. 실제 Kafka도 동일하게 클라이언트가 partition별로 미리 grouping해서 보낸다.
FETCH (pull)
req : [topicLen:2][topic][partition:4][offset:8][maxBytes:4]
resp: [errCode:1][recordCount:4][record1 bytes][record2 bytes]…
record는 self-delimiting (RecordCodec 포맷 그대로). client는 N번 decode.
진행 보장: record 1건이 maxBytes보다 커도 1건은 무조건 포함. 안 그러면 컨슈머 영원히 막힘. Kafka 동일 invariant.
COMMIT_OFFSET / FETCH_OFFSET
COMMIT_OFFSET:
req : [groupLen:2][group][topicLen:2][topic][partition:4][offset:8]
resp: [errCode:1]
FETCH_OFFSET:
req : [groupLen:2][group][topicLen:2][topic][partition:4]
resp: [errCode:1][offset:8] offset=-1 → 미커밋
둘 다 내부 토픽 __consumer_offsets에 저장. broker 재시작에도 살아남습니다 (§3.8 참조).
3.3 Record 포맷: 한 메시지의 정확한 바이트
log/RecordCodec.kt:
[offset:8][timestamp:8][keyLen:4][key:keyLen][valueLen:4][value:valueLen][crc:4]
offset: broker가 부여. 클라이언트가 못 정함keyLen == -1→ null key (key부분 생략)crc:offset시작부터value끝까지의 CRC32. 디스크 corruption 검증
CRC 계산 (encode)
val crc = CRC32()
crc.update(buf.array(), 0, buf.position()) // 헤더+payload 전체
buf.putInt(crc.value.toInt())CRC 검증 (decode)
decode 시 같은 범위로 다시 CRC 계산 → 저장된 값과 비교. 불일치면 error("CRC mismatch") → 호출자가 corruption으로 판단.
실 Kafka와 차이: MyKafka는 record별 CRC32다. 실 Kafka(메시지 포맷 v2, 0.11+)는 batch별 CRC32C(Castagnoli, CPU HW 가속)이고, CRC 범위에서
baseOffset·leaderEpoch를 제외한다 → broker가 offset을 부여해도 CRC 재계산이 불필요 → zero-copy(§6) 성립의 전제다.CRC는 "데이터가 저장 시점과 달라졌나"(disk bit rot 등 무결성)만 검출한다. 극히 드물게 깨진 데이터가 같은 CRC를 내는 충돌(~1/2³²)은 통과되어 silent corruption이 되지만, single-bit·짧은 burst 오류는 100% 검출. 악의적 변조는 막지 못함(그건 해시·서명의 영역).
CRC 위치와 검증 흐름은 Kafka Summary 시각화에서 시각적으로 확인할 수 있다.
부분 읽기 처리
decode는 buf 안에 record가 다 안 모였으면 null 반환 + buf position을 시작 위치로 복원. 호출자는 "truncated tail"로 판단해 거기서 멈춥니다.
3.4 OffsetIndex: Sparse + Mmap
log/OffsetIndex.kt. 한 segment에 하나씩 딸려 있다.
엔트리 포맷
[relativeOffset:4][filePosition:4] ← 8바이트 entry
- relativeOffset =
recordOffset - segment.baseOffset. 4바이트면 segment 안에 최대 2³¹ record 표현 가능 — 충분 - filePosition =
.log파일 내 byte 위치
Sparse 정책
모든 record를 인덱싱하지 않는다. Segment.append() 안에서:
if (lastIndexedPos < 0 || pos - lastIndexedPos >= indexIntervalBytes) {
index.append((offset - baseOffset).toInt(), pos.toInt())
lastIndexedPos = pos
}- 첫 record는 무조건 인덱싱 (탐색 시작점 보장)
- 그 이후는
indexIntervalBytes(기본 4KB) 이상 벌어졌을 때만
Lookup 알고리즘 (이진 탐색)
fun lookup(targetRelativeOffset: Int): IntArray {
var lo = 0; var hi = entryCount - 1; var bestIdx = -1
while (lo <= hi) {
val mid = (lo + hi) ushr 1
val midOff = mmap.getInt(mid * ENTRY_SIZE)
if (midOff <= targetRelativeOffset) {
bestIdx = mid; lo = mid + 1
} else hi = mid - 1
}
// ... bestIdx 에서 (rel, pos) 반환
}target 이하 가장 큰 엔트리 찾기. 거기서부터 짧은 sequential scan으로 정확한 offset 도달.
Mmap 사용 이유
mmap = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize)- 인덱스는 작고 자주 읽힘 → OS 페이지 캐시에 위임
- read/write syscall 없이 메모리처럼 접근
mmap.getInt(offset)→ 그냥 메모리 fetch
Pre-allocate
파일 사이즈를 시작 시 maxEntries * 8만큼 미리 truncate. 추후 append할 때 파일 크기 변경 syscall 없음.
자기 완결성
인덱스가 깨져도 .log만 멀쩡하면 시작 시 완전 재구축된다. 검증에서 모든 .index를 0으로 깨도 정상 동작한다.
fun rebuildIndex(truncateTail: Boolean): Long {
index.reset()
scan { rec, startPos -> if (조건) index.append(...) }
// 마지막 segment면 부분 쓰기된 trailing 자르기
}3.5 Segment: 단일 (.log + .index)
log/Segment.kt. 파일명 규약:
00000000000000000000.log ← %020d.format(baseOffset)
00000000000000000000.index
00000000000000001000.log ← 1000 offset부터 새 segment
00000000000000001000.index
파일명 = baseOffset → 디렉토리 자체가 인덱스. 임의 offset이 어느 파일에 있는지 파일명만으로 이진 탐색이 가능하다.
append (단건)
fun append(offset: Long, bytes: ByteArray) {
val pos = sizeBytes
channel.position(pos)
channel.write(ByteBuffer.wrap(bytes))
sizeBytes += bytes.size
// sparse 인덱스 갱신 (§3.4)
}호출자(=Log)가 락을 잡았다고 가정한다. FileChannel.write() 한 번 = 1 syscall.
appendBatch: 한 번의 write로 N record
fun appendBatch(baseOffsetOfBatch: Long, recordBytes: List<ByteArray>) {
val totalLen = recordBytes.sumOf { it.size }
val combined = ByteBuffer.allocate(totalLen)
for (b in recordBytes) combined.put(b)
combined.flip()
channel.write(combined) // ← 1 syscall로 모두 기록
// 인덱스 엔트리는 record 단위로 갱신
}핵심: N개 record를 메모리에서 합쳐 한 번에 write. syscall 수 = 1.
readFrom: pull-based 핵심
fun readFrom(startOffset: Long, maxBytes: Int): Pair<List<Record>, Int> {
val rel = (startOffset - baseOffset).toInt()
val (_, startPos) = index.lookup(rel).let { it[0] to it[1] }
// startPos 부터 sizeBytes 끝까지 한 번에 읽어서 메모리 buf
// buf 에서 RecordCodec.decode 반복
// startOffset 이전은 skip, maxBytes 누적 초과하면 break
// 단, 첫 record는 항상 포함 (진행 보장)
}scan: recovery용 전체 순회
fun scan(onRecord: (Record, Long) -> Unit): Long {
// .log 처음부터 끝까지 record 단위로 decode
// CRC 깨진 record가 나오면 그 직전 byte 위치 반환
return lastGoodPos
}rebuildIndex: 시작 시 복구
- index reset
.log전체 scan → sparse 규칙대로 index 재구성- 활성 segment면 마지막 정상 위치까지만 truncate (부분 쓰기된 꼬리 자르기)
nextOffset= 마지막 record offset + 1
이게 "꼬리 자르기 복구"다. 전원 나가 부분 쓰기된 trailing record는 신뢰하지 않고 잘라낸다 (실 Kafka도 동일).
3.6 Log: 한 파티션 = 다수 segment 관리
log/Log.kt. 파티션 하나당 인스턴스 하나.
class Log(
private val dir: Path,
private val maxSegmentBytes: Long = 1MB, // 기본 1MB (학습용)
private val indexIntervalBytes: Int = 4KB,
) {
private val lock = ReentrantLock() // 파티션당 한 락
private val segments: MutableList<Segment> = mutableListOf()
@Volatile private var nextOffset: Long = 0
}시작 시
loadSegments()— 디렉토리에서*.log파일들 발견 → baseOffset 기준 정렬recoverAll()— 각 segmentrebuildIndex(truncateTail = isActive)nextOffset= 활성 segment의 nextOffset
append (단건)
fun append(key: ByteArray?, value: ByteArray, timestamp: Long): Long =
lock.withLock {
maybeRoll() // 활성 segment 크기 검사
val offset = nextOffset
val bytes = RecordCodec.encode(offset, timestamp, key, value)
activeSegment().append(offset, bytes)
nextOffset = offset + 1
offset
}Broker가 offset 부여한다. 클라이언트가 못 정함 — 두 producer가 같은 partition에 쓸 때 충돌 방지.
maybeRoll: segment 분할 트리거
private fun maybeRoll() {
val active = activeSegment()
if (active.sizeBytes < maxSegmentBytes) return
val newBase = nextOffset
segments.add(newSegment(newBase)) // 새 파일 생성
}왜 분할?
- Retention 삭제 O(1) — segment 통째
unlink - 활성 segment만 쓰기, 나머지는 read-only → mmap 최적화 유리
find / read: 이진 탐색으로 segment 결정
private fun segmentIndexFor(offset: Long): Int? {
// segments는 baseOffset 오름차순 정렬 → 이진 탐색
// target 이하의 가장 큰 baseOffset 가진 segment 반환
}
fun read(startOffset: Long, maxBytes: Int): List<Record> {
var idx = segmentIndexFor(startOffset)
while (idx <= segments.lastIndex) {
val (recs, used) = segments[idx].readFrom(off, budget)
if (recs.isEmpty()) break
out.addAll(recs); bytesUsed += used; off = recs.last().offset + 1
idx++ // ← cross-segment 자동
}
}Cross-segment FETCH: 시작 offset이 segment-A에 있고 다음 record가 segment-B에 있어도 자동으로 넘어간다. 컨슈머는 segment 경계를 의식할 필요가 없다.
3.7 LogManager: Topic × Partition × Partitioner
topic/LogManager.kt. 한 브로커가 책임지는 모든 토픽 관리.
디렉토리 = 단일 진실 출처
<rootDir>/
<topicName>/
0/ ← partition 0
1/ ← partition 1
별도 메타 파일이 없다. 디렉토리 트리 자체가 토픽 목록 + 파티션 수 + segment 목록이다. 실 Kafka도 거의 동일하다 (<topic>-<partition>/ 단일 디렉토리지만 의미는 같음).
Partitioner: key 있으면 sticky hash
fun pickPartition(topic: String, key: ByteArray?): Int? {
val t = topics[topic] ?: return null
val n = t.partitions.size
return if (key == null) t.nextRoundRobin(n)
else Math.floorMod(key.contentHashCode(), n)
}- key 있음:
floorMod(hash(key), N)→ 같은 key는 항상 같은 partition → 순서 보장 - key 없음:
AtomicIntegerround-robin → 균등 분산
파티션마다 별도 Log → 락 충돌 0
Log 안에서만 락. 파티션 A 쓰기와 파티션 B 쓰기는 서로 안 기다림. "파티션 = 병렬성의 단위" 정체성을 코드로 표현한 거예요.
3.8 OffsetStore: "모든 게 로그"의 자기 dogfooding
topic/OffsetStore.kt. consumer group offset 영속화.
어디에 저장?
새 저장소 만들지 않는다. broker가 이미 가진 메커니즘(Log)을 그대로 사용:
- 내부 토픽 이름:
__consumer_offsets(1 파티션) - record key =
encode(group, topic, partition) - record value =
offset (8B big-endian)
시작 시 cache 재구축
init {
logManager.createTopic("__consumer_offsets", 1)
val partition = logManager.getPartition("__consumer_offsets", 0)!!
for (rec in partition.readAll()) {
val k = decodeKey(rec.key!!)
cache[k] = ByteBuffer.wrap(rec.value).long
}
log.info("loaded $count commits (${cache.size} unique keys)")
}시작 시 내부 토픽을 처음부터 끝까지 scan → ConcurrentHashMap cache 재구축.
COMMIT
fun commit(group, topic, partition, offset) {
val keyBytes = encodeKey(group, topic, partition)
val valueBytes = ByteBuffer.allocate(8).putLong(offset).array()
logManager.getPartition("__consumer_offsets", 0)!!
.append(keyBytes, valueBytes) // 그냥 또 다른 append
cache[Key(group, topic, partition)] = offset
}append + cache 업데이트. 끝.
왜 이게 우아한가
- 단일 메커니즘 재활용 — 새 코드는 key/value 인코딩 + cache뿐. Log.append/readAll/recovery 그대로
- Durability 동일 — 사용자 메시지와 같은 append-only + recovery 모델
- Log compaction의 자연스러운 의미 — 같은 (g,t,p) key가 반복 commit되면 옛 record는 garbage. compaction 도입 시 최신값만 살아남음 (미구현이지만 의미는 이미 있음)
- 검증: 5번 commit한 group이 unique key 2개 → 옛 record 3개는 "역사적 정보". 재시작 후
loaded 5 commits (2 unique keys)정확히 출력
3.9 검증된 동작
| 시나리오 | 결과 |
|---|---|
| PRODUCE 라운드트립, monotonic offset | ✓ |
| 재시작 후 nextOffset 정확 복구 | ✓ |
| 부분 쓰기 → 꼬리 자르기 복구 | ✓ |
| Segment 롤링 (파일명 = baseOffset) | ✓ |
| Multi-segment recovery | ✓ |
| Sparse mmap index, 임의 offset 빠르게 find | ✓ |
| 모든 .index 0으로 깨도 .log에서 재구축 | ✓ |
| 같은 key sticky partition | ✓ |
| 30 keys → 3 partitions 10/10/10 분산 | ✓ |
| null key 엄격 round-robin | ✓ |
| 토픽/파티션 디렉토리 자동 복구 | ✓ |
| Batch PRODUCE all-or-none | ✓ |
| 1000건 single vs 1×1000 batch → 61x speedup | ✓ |
| Cross-segment FETCH | ✓ |
maxBytes=1 진행 보장 (1건은 무조건) | ✓ |
| Consumer loop (25 round, 50건 수신, self-offset) | ✓ |
__consumer_offsets 자동 생성 | ✓ |
| 5 commits → unique key 2개, 최신 우선 | ✓ |
| 재시작 → 컨슈머 group이 정확히 마지막 commit에서 이어 시작 | ✓ |
61x speedup의 정확한 숫자
- 1000회 단건 PRODUCE: 84ms (RTT 1000번, 락 1000번, syscall 1000번)
- 1회 1000-record batch: 1.4ms (RTT 1번, 락 1번, syscall 1번)
- → 61배 throughput 차이
Kafka가 수십만 msgs/sec를 내는 핵심 이유가 여기 있다. 데이터 양은 같다.
3.10 직접 짜본 결과 알게 된 통찰
코드 짜기 전에는 "그렇구나"였지만 직접 짜보고서 와닿은 것들이다.
- "모든 게 로그"는 정말 강력하다.
OffsetStore가 별도 저장소 없이 자기 자신을 dogfooding하는 게 추상적인 슬로건이 아니라 실제 코드 절약이다. 새 코드는 key/value 인코딩 + cache뿐 - mmap이 마법이 아닙니다. 인덱스가 작아서 효과가 있는 거지, 큰 파일 mmap 하면 페이지 fault 폭발. 작은 데이터에 자주 접근하는 케이스에만 적합
- CRC가 늦게 잡힙니다. Record decode 시 CRC 검증을 하지만, 부분 쓰기는 decode 자체가
null반환으로 잡힘. CRC는 disk corruption(드물지만 치명적) 용. 두 보호막이 다른 위협을 막음 - 락은 작게.
Log.append안에만 락. partition 간에는 락 X. partition 수를 늘리면 throughput이 정확히 비례 - 클라이언트가 offset 들고 다니는 비대칭의 깔끔함. broker가 컨슈머 상태를 안 갖는다 = 컨슈머 수에 비례한 broker 상태 폭발 없음. 컨슈머 N억 명도 broker는 그대로
- 파일명을 baseOffset으로 쓴다. 메타 파일 없이 디렉토리만으로 segment 위치를 결정한다.
find data -name "*.log"가 곧 toc 역할을 한다.
§4. 의도적으로 안 넣은 것
| 영역 | MVP에서 생략 | 다음 단계 학습 포인트 |
|---|---|---|
| Replication | 단일 노드 | Leader/Follower + ISR + acks=all |
| Controller | 메타 단순 관리 | KRaft (Raft 기반 controller, Zookeeper 제거) |
| Log compaction | sparse만 구현 | 같은 key 옛 record 삭제 — __consumer_offsets 정리에 즉시 활용 |
| Exactly-once | at-least-once | producer idempotence + transactions |
| Zero-copy | byte copy | sendfile() / Netty FileRegion |
| Group coordination | 단일 group string | rebalance, member tracking, sticky assignment |
| Durability tuning | OS flush 의존 | channel.force() 정책 (per-request / per-N / time-based) |
각 항목 모두 학습 거리다. 부재가 한계로 드러나면 그때 구현한다.
§5. 티켓팅에 어떻게 적용할 것인가
5.1 Worker 패턴
Booking API ──PRODUCE──> Kafka(reservation-events)
│
└──FETCH──> Worker ──INSERT──> RDB
발행 시점
좌석 락(Redis) + 결제 검증 통과 직후. DB INSERT 전이다.
// command service의 의사 코드
@Transactional // Redis 락만 잡고 짧게 끝
fun reserve(req: ReserveRequest): ReservationToken {
val seat = redis.acquireSeatLock(req.seatId) // 짧음
val token = ReservationToken(uuid())
kafka.publish("reservation-events", req.seatId, ReservationCreated(token, req.userId, req.seatId))
return token // 응답 즉시
}소비 시점
Worker는 자기 속도대로 consume → DB INSERT.
// worker service 의사 코드
consumer.subscribe("reservation-events")
while (true) {
val records = consumer.poll(timeoutMs = 100)
for (r in records) {
db.insertReservation(r.value)
consumer.commitOffset(r.offset)
}
}5.2 왜 key=seatId 인가
- 같은 좌석에 대한 이벤트는 항상 같은 partition으로 → 처리 순서 보장
- 예:
ReservationCreated(seat=A-1)다음ReservationCancelled(seat=A-1). 순서 바뀌면 DB 상태 깨짐 - 다른 좌석 이벤트는 자유롭게 다른 partition → 병렬성
5.3 Outbox 패턴 (학습 거리)
문제: command service가
1. DB에 reservation 행 INSERT
2. Kafka에 ReservationCreated publish
1만 성공하고 2 실패하면? 또는 반대?
→ outbox 테이블 도입:
1. (한 트랜잭션에) DB에 reservation 행 INSERT + outbox 행 INSERT
2. 별도 outbox-relay가 outbox 폴링 → Kafka publish → outbox 행 삭제
DB 트랜잭션 하나로 원자성 보장. Kafka 발행은 최소 한 번(at-least-once)이 보장된다.
→ 정확히는 우리 다이어그램은 Booking API가 DB INSERT를 안 한다. Worker가 한다. 그래서 outbox는 다른 상황(예: 예매 취소 시 환불 알림 발행)에서 학습 거리다.
§6. Kafka가 빠른 이유 (정리)
흔히 "Kafka는 빠르다"고만 말한다. 왜 빠른가:
- 순차 디스크 쓰기 (append-only log) — 랜덤 access 대비 HDD/SSD 모두 압도적으로 빠름. 메모리에 가까운 속도
- Page cache 활용 — OS 페이지 캐시가 hot data를 메모리에 유지. JVM 힙 부담 X
- Zero-copy (
sendfile()) — consumer fetch 시 kernel-space → socket 직접 전송. user-space 복사 생략 - Batch 처리 — producer가 메시지를 모아 한 번에 전송. 네트워크 RTT 분할상환
- Compression (선택) — batch를 압축해서 전송. gzip/snappy/lz4/zstd
- Sparse index — 인덱스 자체가 작아 메모리에 다 올라감. 인덱스 lookup도 빠름
- Partition별 병렬화 — partition 수만큼 producer/consumer 병렬
Zero-copy 자세히 (전통 경로 vs sendfile)
"왜 빠른가"의 핵심이라 따로 정리한다.
전통적 경로 (read() + send(), 복사 4번):
디스크 ─DMA─▶ page cache ─CPU─▶ 앱(user) 버퍼 ─CPU─▶ socket 버퍼 ─DMA─▶ NIC
① ② ③ ④
- ② page cache→앱 버퍼, ③ 앱 버퍼→socket 버퍼가 CPU 복사(비쌈). user-space로 올렸다 다시 내림 = 순수 낭비 + JVM heap 거쳐 GC 부담. 모드 전환도 4번
Zero-copy (sendfile() = Java FileChannel.transferTo(), 복사 2번):
디스크 ─DMA─▶ page cache ─DMA(scatter-gather)─▶ NIC
① ②
- user-space를 안 거침. socket 버퍼엔 위치+길이 디스크립터만 넘기고 NIC가 page cache에서 직접 DMA로 가져감
- "zero" = CPU 복사 0번 (남은 DMA 2번은 하드웨어라 CPU 안 씀). 복사 4→2, syscall 2→1, JVM heap 안 건드림(GC 부담 0) — 이 셋이 합쳐져 빨라집니다
왜 Kafka가 이게 되나 — broker가 데이터를 변형 없이 저장 포맷 그대로 흘려보내기 때문이다. CRC가 offset을 안 덮어서(§3.3) offset 부여해도 재계산 불필요, 압축도 그대로 전달 → user-space로 끌어올릴 이유가 없다.
한계 — TLS 암호화를 켜면 user-space 암호화 때문에 복사가 부활 → zero-copy 깨짐. 재압축·포맷 변환도 동일. 보안 vs 성능 트레이드오프.
→ MyKafka는 fetch 응답을 재직렬화(byte copy)하므로 미구현이다.
우리 MyKafka가 검증한 것
- (1) (4) (6) 구현 완료. 1×1000 batch가 1000×single 대비 61배 빠름 측정
- (3) 미구현. byte copy로 함. → "Zero-copy 도입 시 얼마나 더 빨라지나"가 다음 측정 거리
- (2) JVM 위라 OS page cache 자동
§7. 학습 진화 단계 (Kafka 측면)
[현재] MyKafka MVP (단일 노드, client SDK 부재)
↓ client SDK 구현
[1] Producer / Consumer 라이브러리 → ticket-command가 publish
↓ Worker 도입
[2] Worker service 신규 → consume → DB INSERT (비동기 영속화)
↓ 부하 측정으로 효과 검증
[3] partition 수 늘리며 처리량 변화 측정
↓ 신뢰성 보강
[4] Replication 구현 (Leader/Follower + ISR + acks=all)
↓ 운영성
[5] Log compaction (consumer_offsets 정리)
↓ 정확성
[6] Producer idempotence + Transactions (exactly-once)
↓ 성능
[7] Zero-copy (`FileRegion` Netty)
각 단계마다 무엇을 측정해서 효과를 보여줄 것인지 미리 정해두는 게 학습 핵심이다.
§8. 자주 나오는 오해
| 오해 | 진실 |
|---|---|
| "Kafka는 메시지 큐다" | 정확히는 distributed commit log. 큐처럼 쓰지만 "consume = 삭제"가 아니라 "consume = offset 전진". 같은 메시지를 N번 다시 읽을 수 있음 |
| "Kafka는 빠르니까 어디나 쓰자" | 1KB 미만 짧은 메시지에는 오히려 RabbitMQ가 latency 낮을 수 있음. Kafka는 throughput 최적화 |
| "exactly-once 보장하니까 안전" | exactly-once는 Kafka 내부 + producer side 한정. consumer 측 DB INSERT까지는 보장 X. 우리가 처리(idempotent consumer) 해야 함 |
| "partition 늘리면 무조건 빠름" | partition 수 = consumer 병렬도 상한. consumer 수 < partition 수면 idle consumer 생김. 그리고 partition은 한번 늘리면 줄이기 어려움 |
| "offset commit은 자동" | auto-commit은 데이터 손실 위험. 처리 완료 후 manual commit이 일반적 |
§9. 토론 prompts 일부
- 우리 티켓팅에 partition을 몇 개로 잡을까? 그 근거는?
- key=seatId 외에 다른 키 후보는? (userId? eventId?)
- Worker가 죽어서 처리 못 한 메시지는 어떻게 되나? (offset 안 커밋되면)
- 같은 메시지를 두 번 처리해도 안전한 코드(idempotent)는 어떻게 짜나?
- replica가 lag 100MB라면 leader가 죽었을 때 어떻게 해야 하나? (acks 정책)
§10. 코드 reading map
MyKafka 코드 읽을 때 권장 순서. 한 호흡에 한 단계.
[1] protocol/Frame.kt + FrameCodec.kt "메시지 경계 어떻게 잡나?"
└─ length-prefix framing의 5줄짜리 본질
[2] log/Record.kt + RecordCodec.kt "한 메시지의 정확한 바이트는?"
└─ CRC 위치와 검증 흐름
[3] log/OffsetIndex.kt "왜 sparse + mmap인가?"
└─ lookup 이진 탐색 + sequential scan의 결합
[4] log/Segment.kt — append/readFrom/scan "한 segment의 라이프사이클"
└─ rebuildIndex(truncateTail)가 꼬리 자르기
[5] log/Log.kt — maybeRoll/find/read "여러 segment 어떻게 관리?"
└─ segmentIndexFor 이진 탐색 + cross-segment
[6] topic/LogManager.kt — pickPartition "partition 어떻게 정해?"
└─ floorMod(hash, n) vs round-robin
[7] topic/OffsetStore.kt "오프셋도 그냥 또 다른 로그"
└─ 자기 dogfooding의 우아함
[8] server/RequestRouter.kt "ApiKey 분기 끝"
└─ handleProduce / handleFetch 흐름 따라가기
[1][2]가 wire/포맷. [3][5]가 디스크 추상. [6]~[8]이 broker. 각 층이 다음 층의 invariant를 깔끔히 가정해서 위에 쌓는 구조다.
토론하면 좋은 코드 포인트 4개
Segment.appendBatch: 왜 syscall 수가 throughput을 지배하나?
N record를 메모리에서 합쳐 한 번에 write.
답: syscall 한 번엔 고정 비용이 붙는다 — user→kernel 모드 전환, CPU 캐시·TLB 오염, 커널 진입 경로. 데이터 양과 무관한 이 고정 오버헤드 × 호출 횟수가 병목이다. 1000건을 1000번 write하면 모드전환 1000번 + 락 1000번 (+네트워크면 RTT 1000번). batch로 합치면 메모리 복사(싸다)는 N번이지만 syscall·락·RTT는 1번. 디스크도 큰 덩어리 순차쓰기가 페이지 캐시에 유리. 측정: 84ms→1.4ms(61배)로, 데이터 양은 동일하고 줄어든 건 오직 호출 횟수다. 이게 **분할상환(amortization)**의 효과다.
Log.read: 컨슈머가 segment 경계를 의식 안 해도 되는 게 왜 중요한가?
cross-segment 자동 처리. offset이 segment-A에서 끝나고 B로 넘어가도 알아서 이어 읽음.
답: 내부 표현과 외부 계약의 분리(캡슐화). 컨슈머는 "offset부터 maxBytes"만 알 뿐 segment가 1개인지 1000개인지, 경계가 어딘지 모른다. 만약 경계를 알아야 했다면 — ① segment 크기·롤링 정책이 클라이언트 API로 새어나가 서버가 정책을 못 바꿈, ② 클라이언트가 경계마다 재요청 로직 필요(복잡+버그), ③ retention으로 segment가 삭제되면 클라이언트 상태가 깨짐. 자동 처리 덕에 서버는 segment 크기 조절·compaction·retention을 자유롭게 하고, 클라이언트는 단순하게 유지된다.
OffsetIndex.lookup: sparse + sequential scan trade-off의 의미는?
이진 탐색으로 "그 이하 가장 가까운" 엔트리를 찾고, 거기서부터 짧게 scan.
답: 공간 vs 시간 trade-off, 그리고 "메모리에 다 올라가는 것"의 승리. 모든 record를 인덱싱(dense)하면 lookup은 정확하지만 인덱스가 데이터만큼 커져 메모리에 못 올립니다. sparse(4KB 간격)는 인덱스가 수백 배 작아 통째로 mmap/page cache에 상주 → 이진 탐색이 디스크 안 타고 메모리에서 끝납니다. 대신 정확한 위치가 아닌 "근처"를 주지만, 거기서부터의 sequential scan은 indexInterval(≤4KB) 이내로 bounded이고 순차 읽기라 빠릅니다. 즉 "거의 다 와서 마지막 몇 걸음만 걷기" — 작은 scan 비용을 내주고 인덱스를 메모리에 통째로 올리는 이득을 사는 거예요.
Log.append의 lock.withLock: partition 늘리면 throughput이 비례하는 이유는?
파티션 하나당 Log 인스턴스 하나, 그 안에 ReentrantLock 하나.
답: 락 경계가 곧 병렬성의 경계. 락이 partition 단위로 독립이라 partition A 쓰기와 B 쓰기는 서로 다른 락 → 안 기다린다(no contention). 동시에 쓸 수 있는 writer 수 = partition 수. 경합이 없으면 CPU 코어·디스크 대역폭이 허용하는 한 선형 확장한다.
만약 broker 전역 락 하나였다면 partition을 아무리 늘려도 모든 쓰기가 직렬화되어 throughput이 고정(Amdahl의 직렬 구간). "partition = 병렬성의 단위"라는 정체성이 코드의 락 경계로 그대로 드러난 거예요.
단, '비례'의 전제: ① 자원(코어·디스크 IO)에 여유가 있어야 하고 — 물리 상한에 닿으면 더는 안 늘어남, ② key가 고르게 분산돼야 한다 — 한 partition에 쓰기가 몰리면 그 락에서 경합이 생겨 hot partition이 병목. 그래서 partition 수와 key 설계는 같이 고민해야 한다.
마치며
직접 짜본 결과 가장 크게 와닿은 건 두 가지였다.
첫째, "모든 게 로그"는 슬로건이 아니라 실제 코드 절약이다. OffsetStore가 __consumer_offsets 토픽을 자기 인프라(Log)로 dogfooding하는 게 새 저장소·복구 로직·durability 구현을 통째로 절약해준다. 이미 있는 추상을 한 번 더 쓰면 그만큼 새로 짤 코드가 줄어든다.
둘째, "왜 Kafka가 빠른가"의 60%는 batch에서 온다. 측정해보면 1000건 single이 84ms, 1×1000 batch가 1.4ms로 61배 차이가 났다. 데이터 양은 동일하다. 줄어든 건 오직 syscall·락·RTT 횟수. Zero-copy·page cache·순차 디스크가 모두 의미 있지만, 일단 batch부터 안 쓰면 다른 최적화의 효과를 측정조차 못 한다.
복습용으로 시각화 네 개를 다시 링크해둔다.
다음 글에서는 CQRS 깊은 이야기와 함께, Kafka가 CQRS의 Stage 3에서 어떻게 query side를 자유롭게 만드는지를 풀어본다.