-->

[Kafka] Kafka cli 명령어

 

Kafka Command 목록

$ cd confluent-7.1.2/bin/
$ ls
connect-distributed          kafka-features                      kafka-rest-stop                  ksql-run-class
connect-mirror-maker         kafka-get-offsets                   kafka-rest-stop-service          ksql-server-start
connect-standalone           kafka-json-schema-console-consumer  kafka-run-class                  ksql-server-stop
kafka-acls                   kafka-json-schema-console-producer  kafka-server-start               ksql-stop
kafka-avro-console-consumer  kafka-leader-election               kafka-server-stop                ksql-test-runner
kafka-avro-console-producer  kafka-log-dirs                      kafka-storage                    schema-registry-run-class
kafka-broker-api-versions    kafka-metadata-shell                kafka-streams-application-reset  schema-registry-start
kafka-cluster                kafka-mirror-maker                  kafka-topics                     schema-registry-stop
kafka-configs                kafka-preferred-replica-election    kafka-transactions               schema-registry-stop-service
kafka-console-consumer       kafka-producer-perf-test            kafka-verifiable-consumer        trogdor
kafka-console-producer       kafka-protobuf-console-consumer     kafka-verifiable-producer        windows
kafka-consumer-groups        kafka-protobuf-console-producer     ksql                             zookeeper-security-migration
kafka-consumer-perf-test     kafka-reassign-partitions           ksql-datagen                     zookeeper-server-start
kafka-delegation-tokens      kafka-replica-verification          ksql-migrations                  zookeeper-server-stop
kafka-delete-records         kafka-rest-run-class                ksql-print-metrics               zookeeper-shell
kafka-dump-log               kafka-rest-start                    ksql-restore-command-topic

kafka-topics

# topic 생성
$ kafka-topics --bootstrap-server localhost:9092 --create --topic test_topic_01
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. 
To avoid issues it is best to use either, but not both. # topic name에 마침표나 언더스코어를 사용하면 Warning이 발생한다.
Created topic test_topic_01.

# 생성한 topic 확인
$ kafka-topics --bootstrap-server localhost:9092 --list
test_topic_01
welcome-topic # 저번에 만들어 둔 토픽

# topic 삭제
$ kafka-topics --bootstrap-server localhost:9092 --delete --topic test_topic_01

# 3개의 Partition을 갖는 topic 생성 (--partitions 옵션을 주지 않으면 $KAFKA_HOME/etc/kafka/server.properties 의 num.partitions=1 설정에 따라 partition이 1개만 생성된다)
$ kafka-topics --bootstrap-server localhost:9092 --create --topic test-topic-02 --partitions 3
Created topic test-topic-02.

# 생성한 topic 확인
$ kafka-topics --bootstrap-server localhost:9092 --describe --topic test-topic-02
Topic: test-topic-02    TopicId: pL4El2JiSWWueVQ-pOOCsw PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test-topic-02    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-02    Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-02    Partition: 2    Leader: 0       Replicas: 0     Isr: 0

# replication 을 만들어 본다.
$ kafka-topics --bootstrap-server localhost:9092 --create --topic test-topic-03 --partitions 3 --replication-factor 2
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2024-01-30 12:06:52,910] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
 (kafka.admin.TopicCommand$) # 현재 브로커가 1개이기 때문에 factor를 2개 이상 설정할 수 없다.

# 생성한 topic logs 파일 확인
$ cd ~/data/kafka-logs 
$ ls -l
total 32
-rw-r--r-- 1 root root    4 Jan 30 12:05 cleaner-offset-checkpoint
-rw-r--r-- 1 root root    4 Jan 30 12:12 log-start-offset-checkpoint
-rw-r--r-- 1 root root   88 Jan 30 11:53 meta.properties
-rw-r--r-- 1 root root   58 Jan 30 12:12 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root   58 Jan 30 12:12 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 Jan 30 12:06 test-topic-02-0
drwxr-xr-x 2 root root 4096 Jan 30 12:06 test-topic-02-1
drwxr-xr-x 2 root root 4096 Jan 30 12:06 test-topic-02-2 # 3개의 partitions 설정해서 3개의 폴더가 생성

$ cd test-topic-02-2
$ ls -l
total 8
-rw-r--r-- 1 root root 10485760 Jan 30 12:06 00000000000000000000.index # index 파일은 기본 10MB(segment.index.bytes)로 미리 할당됨. segment 자체의 기본 크기는 1GB(segment.bytes)
-rw-r--r-- 1 root root        0 Jan 30 12:06 00000000000000000000.log # 여기가 실제 저장되는 로그
-rw-r--r-- 1 root root 10485756 Jan 30 12:06 00000000000000000000.timeindex
-rw-r--r-- 1 root root        8 Jan 30 12:06 leader-epoch-checkpoint
-rw-r--r-- 1 root root       43 Jan 30 12:06 partition.metadata

kafka-console-producer

# 신규 토픽 생성 (있으면 지우고 해도 상관없음)
$ kafka-topics --bootstrap-server localhost:9092 --create --topic test-topic
Created topic test-topic.

# test-topic 토픽에 값 보내기 (실제로는 Serialize, Partitioning 절차를 거쳐 전달됨)
$ kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic
>aaa
>bbb
>123 # 숫자도 문자열로 보내짐
>한글도 되나요 # 한글도 보내짐
>

# key message 구조로 보내고자 한다면 속성 추가
$ kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic \
--property key.separator=: --property parse.key=true
>userId01:aaa
>userId02:bbb
>userId01:ccc
>userId02:ddd

kafka-console-consumer

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic
# 아무것도 안나옴 (offset : latest)

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
aaa
bbb
123
한글도 되나요
# 옵션 없이 실행해도 실제로는 읽고 있지만 latest offset부터 읽기 때문에 기존 메세지는 출력되지 않는다. 처음부터 읽으려면 --from-beginning 옵션을 줘야 earliest부터 읽는다.

# key message 구조로 읽고자 한다면
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic \
--property print.key=true --property print.value=true --from-beginning
userId01        aaa
userId02        bbb
userId01        ccc
userId02        ddd

여러 개의 Partition을 갖는 토픽

$ kafka-topics --bootstrap-server localhost:9092 --create --topic multipart-topic --partitions 3
Created topic multipart-topic.

# 확인
$ kafka-topics --bootstrap-server localhost:9092 --describe --topic multipart-topic
Topic: multipart-topic  TopicId: xZz_vjDISb65ojmEiJkF5g PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: multipart-topic  Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: multipart-topic  Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: multipart-topic  Partition: 2    Leader: 0       Replicas: 0     Isr: 0

# 프로듀스
$ kafka-console-producer --bootstrap-server localhost:9092 --topic multipart-topic
>aaa
>bbb
>ccc
>ddd

# 컨슘
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic multipart-topic --from-beginning
ccc
bbb
aaa
ddd # 이런식으로 순서가 보장되어 있지 않다.

# partition을 프린트 하도록 하면 같은 파티션 단위로 묶어서 읽어옴
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic multipart-topic --from-beginning --property print.partition=true
Partition:2     ccc
Partition:0     bbb
Partition:0     ddd
Partition:1     aaa

# key를 가진 메세지 프로듀스
$ kafka-console-producer --bootstrap-server localhost:9092 --topic multipart-topic \
> --property key.separator=: --property parse.key=true
>1: aaa
>2: bbb
>3: ccc
>4: ddd
>5: eee
>6: fff
>1: kkk # 재할당
>2: bbbaaa

# key를 가진 메세지 컨슘
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic multipart-topic --from-beginning --property print.key=true --property print.value=true --property print.partition=true
Partition:0     1        aaa
Partition:2     2        bbb
Partition:2     3        ccc
Partition:1     4        ddd
Partition:0     5        eee
Partition:1     6        fff
Partition:0     1        kkk # 동일한 key값을 가지면 동일한 partition으로 간다!
Partition:2     2        bbbaaa

+ Recent posts