본문 바로가기
Back-End/kafka

[Kafka] 빠른 시작 예제

by hongdor 2023. 2. 15.
728x90

참고 : 공식홈페이지

Apache Kafka Quick Start

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

1. kafka 다운로드 Download

- kafka압축 풀기

tar -xzf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1

 

2. kafka 환경변수 설정 및 실행

  • Java8 이상 설치돼 있어야 함
  • Zookeeper 와 KRaft 중 선택 가능한데, Zookeeper는 앞으로 점차 없앨 계획이라고 하니 KRaft로 설치

(1) Cluster UUID 생성

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

(2) 로그 폴더 포맷

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

(3) 카프카 서버 시작

bin/kafka-server-start.sh config/kraft/server.properties

 

3. topic 생성

 

(1) 터미널을 새로 열고 topic을 생성하는 명령어를 입력합니다

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

(2) topic 정보 보기

bin/kafka-topics.sh 

(3) topic정보 자세히 보기

bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

 

4. topic에 event 쓰기

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 

 

5. topic event 읽기

터미널을 새로 열고 아래 명령어입력

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

 

6. 카프카 커넥트를 이용해서 데이터 가져오기 내보내기

- kafka connect : 카프카와 다른시스템을 연결해주는 툴. 아파치 카프카의 오픈소스 프로젝트 중 하나로, 카프카와 외부 시스템(DB 등) 간의 파이프라인 구성을 쉽게 해주는 프레임워크

 

file to kafka 연결 예제

 

(1) connect-file-3.3.1.jar를 worker의 plugin.path 설정파일에 추가해준다

  echo "plugin.path=libs/connect-file-3.3.1.jar"

 

(2) 테스트할 읽을 파일 생성

(Linux) echo -e "foo\\nbar" > test.txt
(Windows) echo foo> test.txt echo bar>> test.txt

 

(3) 2개의 connector를 standalone 모드로 실행 3개의 설정 파일을 파라미터로 준다.

- 첫번째 파일은 항상 Kafka Connect process를 위한 설정이다.kafka 브로커와 data format 관련 소스코드들이다. 

   (config/connect-standalone.properties)

- 나머지 구성파일들은 각각 생성할 커넥터들이다. 각각 커넥터 이름과, 인스턴스화할 class 등을 포함한다.
   (config/connect-file-source.properties, config/connect-file-sink.properties)

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

위 샘플 설정 파일들은 미리 우리가 시작한 로컬 클러스터 kafka를 사용한다. 그리고 두개의 connector를 만든다.

source connector 는 파일을 한줄씩 읽고 kafka topic을 생성한다 (test.txt 파일 → kafka)

sink connector는 kafka topic을 읽고 file에 한줄씩 작성한다 (kafka → test.sink.txt)

 

(4) topic event 확인

우리가 테스트한 데이터는 kafka topic connect-test에 저장되어 있다.

콘솔 consumer를 실행해서 확인 가능하다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

 

(5) 추가적으로 file에 줄을 추가하며 테스트를 할 수 있다.

echo Another line>> test.txt

 

 

7. kafka stream으로 event 처리하기

- kafka stream 설명 및 java 코드 예시

한번 데이터가 kafka에 event로 저장되면, java/scala에서 kafka streams 라이브러리로 데이터를 처리할 수 있다. Kafka Streams는 클라이언트 측에 표준 Java 및 Scala 애플리케이션을 작성하고 배포하는 단순성과 Kafka의 서버 측 클러스터 기술의 이점을 결합하여 이러한 애플리케이션을 확장성, 탄력성, 내결함성 및 분산성이 뛰어난 애플리케이션으로 만든다 라이브러리는 정확하게 한 번 처리, 상태 저장 작업 및 집계, 윈도우 설정, 조인, 이벤트 시간 기반 처리 등을 지원한다.

맛보기로, 단어 카운팅 알고리즘 예시이다

KStream<string, string=""> textLines = builder.stream("quickstart-events");

KTable<string, long=""> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

 

8. kafka 환경 끝내기

  1. producer와 consumer를 Ctrl+C로 끝낸다
  2. Kafka broker를 Ctrl+C로 끝낸다
728x90

'Back-End > kafka' 카테고리의 다른 글

[Kafka streams] 컨셉  (0) 2023.02.26
[Kafka streams] Tutorial  (0) 2023.02.23
[Kafka streams] 데모 App 실행  (0) 2023.02.23
[Kafka streams] Kafka streams 소개  (0) 2023.02.23
[Kafka] 카프카 소개  (0) 2023.02.14

댓글