[Kafka] Kafka cli 명령어
Kafka Command 목록
$ cd confluent-7.1.2/bin/
$ ls
connect-distributed kafka-features kafka-rest-stop ksql-run-class
connect-mirror-maker kafka-get-offsets kafka-rest-stop-service ksql-server-start
connect-standalone kafka-json-schema-console-consumer kafka-run-class ksql-server-stop
kafka-acls kafka-json-schema-console-producer kafka-server-start ksql-stop
kafka-avro-console-consumer kafka-leader-election kafka-server-stop ksql-test-runner
kafka-avro-console-producer kafka-log-dirs kafka-storage schema-registry-run-class
kafka-broker-api-versions kafka-metadata-shell kafka-streams-application-reset schema-registry-start
kafka-cluster kafka-mirror-maker kafka-topics schema-registry-stop
kafka-configs kafka-preferred-replica-election kafka-transactions schema-registry-stop-service
kafka-console-consumer kafka-producer-perf-test kafka-verifiable-consumer trogdor
kafka-console-producer kafka-protobuf-console-consumer kafka-verifiable-producer windows
kafka-consumer-groups kafka-protobuf-console-producer ksql zookeeper-security-migration
kafka-consumer-perf-test kafka-reassign-partitions ksql-datagen zookeeper-server-start
kafka-delegation-tokens kafka-replica-verification ksql-migrations zookeeper-server-stop
kafka-delete-records kafka-rest-run-class ksql-print-metrics zookeeper-shell
kafka-dump-log kafka-rest-start ksql-restore-command-topic
kafka-topics
# topic 생성
$ kafka-topics --bootstrap-server localhost:9092 --create --topic test_topic_01
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide.
To avoid issues it is best to use either, but not both. # topic name에 마침표나 언더스코어를 사용하면 Warning이 발생한다.
Created topic test_topic_01.
# 생성한 topic 확인
$ kafka-topics --bootstrap-server localhost:9092 --list
test_topic_01
welcome-topic # 저번에 만들어 둔 토픽
# topic 삭제
$ kafka-topics --bootstrap-server localhost:9092 --delete --topic test_topic_01
# 3개의 Partitions를 갖는 topic 생성 (설정을 주지 않으면 default partition은 1개 $KAFKA_HOME/etc/kafka/server.properties 안의 num.partitions=1)
$ kafka-topics --bootstrap-server localhost:9092 --create --topic test-topic-02 --partitions 3
Created topic test-topic-02.
# 생성한 topic 확인
$ kafka-topics --bootstrap-server localhost:9092 --describe --topic test-topic-02
Topic: test-topic-02 TopicId: pL4El2JiSWWueVQ-pOOCsw PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test_topic_02 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test_topic_02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test_topic_02 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
# replication 을 만들어 본다.
$ kafka-topics --bootstrap-server localhost:9092 --create --topic test-topic-03 --partitions 3 --replication-factor 2
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2024-01-30 12:06:52,910] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
(kafka.admin.TopicCommand$) # 현재 브로커가 1개이기 때문에 factor를 2개 이상 설정할 수 없다.
# 생성한 topic logs 파일 확인
$ cd ~/data/kafka-logs
$ ls -l
total 32
-rw-r--r-- 1 root root 4 Jan 30 12:05 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 4 Jan 30 12:12 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 Jan 30 11:53 meta.properties
-rw-r--r-- 1 root root 58 Jan 30 12:12 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 58 Jan 30 12:12 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 Jan 30 12:06 test-topic-02-0
drwxr-xr-x 2 root root 4096 Jan 30 12:06 test-topic-02-1
drwxr-xr-x 2 root root 4096 Jan 30 12:06 test-topic-02-2 # 3개의 partitions 설정해서 3개의 폴더가 생성
$ cd test-topic-02-2
$ ls -l
total 8
-rw-r--r-- 1 root root 10485760 Jan 30 12:06 00000000000000000000.index # 기본 segment 10mb
-rw-r--r-- 1 root root 0 Jan 30 12:06 00000000000000000000.log # 여기가 실제 저장되는 로그
-rw-r--r-- 1 root root 10485756 Jan 30 12:06 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 30 12:06 leader-epoch-checkpoint
-rw-r--r-- 1 root root 43 Jan 30 12:06 partition.metadata
kafka-console-producer
# 신규 토픽 생성 (있으면 지우고 해도 상관없음)
$ kafka-topics --bootstrap-server localhost:9092 --create --topic test-topic
Created topic test-topic.
# test-topic 토픽에 값 보내기 (실제로는 Selialize, Partitioning 절차를 거쳐 전달됨)
$ kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic
>aaa
>bbb
>123 # 숫자도 문자열로 보내짐
>한글도 되나요 # 한글도 보내짐
>
# key message 구조로 보내고자 한다면 속성 추가
$ kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic \
--property key.separator=: --property parse.key=true
>userId01:aaa
>userId02:bbb
>userId01:ccc
>userId02:ddd
kafka-console-consumer
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic
# 아무것도 안나옴 (offset : latest)
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
aaa
bbb
123
한글도 되나요
# 실제로는 읽고 있으나 처음부터 읽게 하려면 --from-beginning 옵션을 줘야 earliest가 됨.
# key meesage 구조로 읽고자 한다면
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic \
--property print.key=true --property print.value=true --from-beginning
userId01 aaa
userId02 bbb
userId01 ccc
userId02 ddd
여러 개의 Partition을 갖는 토픽
$ kafka-topics --bootstrap-server localhost:9092 --create --topic multipart-topic --partitions 3
Created topic multipart-topic.
# 확인
$ kafka-topics --bootstrap-server localhost:9092 --describe --topic multipart-topic
Topic: multipart-topic TopicId: xZz_vjDISb65ojmEiJkF5g PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: multipart-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: multipart-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: multipart-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
# 프로듀스
$ kafka-console-producer --bootstrap-server localhost:9092 --topic multipart-topic
>aaa
>bbb
>ccc
>ddd
# 컨슘
kafka-console-consumer --bootstrap-server localhost:9092 --topic multipart-topic --from-beginning
ccc
bbb
aaa
ddd # 이런식으로 순서가 보장되어 있지 않다.
# partition을 프린트 하도록 하면 같은 파티션 단위로 묶어서 읽어옴
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic multipart-topic --from-beginning --property print.partition=true
Partition:2 ccc
Partition:0 bbb
Partition:0 ddd
Partition:1 aaa
# key를 가진 메세지 프로듀스
$ kafka-console-producer --bootstrap-server localhost:9092 --topic multipart-topic \
> --property key.separator=: --property parse.key=true
>1: aaa
>2: bbb
>3: ccc
>4: ddd
>5: eee
>6: fff
>1: kkk # 재할당
>2: bbbaaa
# key를 가진 메세지 컨슘
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic multipart-topic --from-beginning --property print.key=true --property print.value=true --property print.partition=true
Partition:0 1 aaa
Partition:2 2 bbb
Partition:2 3 ccc
Partition:1 4 ddd
Partition:0 5 eee
Partition:1 6 fff
Partition:0 1 kkk # 동일한 key값을 가지면 동일한 partition으로 간다!
Partition:2 2 bbbaaa
'Server & Infra' 카테고리의 다른 글
Jenkins야 특정 모듈만 빌드해줘 (0) | 2024.10.16 |
---|---|
[Kafka] Kafka 구성요소 (2) | 2024.02.14 |
[Kafka] 설치 및 환경구축 (0) | 2024.01.30 |
[Linux / Windows] 특정포트를 사용하는 프로세스 강제종료 (0) | 2022.09.26 |
[Linux] WSL2를 이용한 윈도우에 리눅스 설치 (0) | 2022.08.24 |