참고 : 공식홈페이지
1. kafka 다운로드 Download
- kafka압축 풀기
tar -xzf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1
2. kafka 환경변수 설정 및 실행
- Java8 이상 설치돼 있어야 함
- Zookeeper 와 KRaft 중 선택 가능한데, Zookeeper는 앞으로 점차 없앨 계획이라고 하니 KRaft로 설치
(1) Cluster UUID 생성
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
(2) 로그 폴더 포맷
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
(3) 카프카 서버 시작
bin/kafka-server-start.sh config/kraft/server.properties
3. topic 생성
(1) 터미널을 새로 열고 topic을 생성하는 명령어를 입력합니다
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
(2) topic 정보 보기
bin/kafka-topics.sh
(3) topic정보 자세히 보기
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
4. topic에 event 쓰기
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
5. topic event 읽기
터미널을 새로 열고 아래 명령어입력
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
6. 카프카 커넥트를 이용해서 데이터 가져오기 내보내기
- kafka connect : 카프카와 다른시스템을 연결해주는 툴. 아파치 카프카의 오픈소스 프로젝트 중 하나로, 카프카와 외부 시스템(DB 등) 간의 파이프라인 구성을 쉽게 해주는 프레임워크
file to kafka 연결 예제
(1) connect-file-3.3.1.jar를 worker의 plugin.path 설정파일에 추가해준다
echo "plugin.path=libs/connect-file-3.3.1.jar"
(2) 테스트할 읽을 파일 생성
(Linux) echo -e "foo\\nbar" > test.txt
(Windows) echo foo> test.txt echo bar>> test.txt
(3) 2개의 connector를 standalone 모드로 실행 3개의 설정 파일을 파라미터로 준다.
- 첫번째 파일은 항상 Kafka Connect process를 위한 설정이다.kafka 브로커와 data format 관련 소스코드들이다.
(config/connect-standalone.properties)
- 나머지 구성파일들은 각각 생성할 커넥터들이다. 각각 커넥터 이름과, 인스턴스화할 class 등을 포함한다.
(config/connect-file-source.properties, config/connect-file-sink.properties)
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
위 샘플 설정 파일들은 미리 우리가 시작한 로컬 클러스터 kafka를 사용한다. 그리고 두개의 connector를 만든다.
source connector 는 파일을 한줄씩 읽고 kafka topic을 생성한다 (test.txt 파일 → kafka)
sink connector는 kafka topic을 읽고 file에 한줄씩 작성한다 (kafka → test.sink.txt)
(4) topic event 확인
우리가 테스트한 데이터는 kafka topic connect-test에 저장되어 있다.
콘솔 consumer를 실행해서 확인 가능하다.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
(5) 추가적으로 file에 줄을 추가하며 테스트를 할 수 있다.
echo Another line>> test.txt
7. kafka stream으로 event 처리하기
- kafka stream 설명 및 java 코드 예시
한번 데이터가 kafka에 event로 저장되면, java/scala에서 kafka streams 라이브러리로 데이터를 처리할 수 있다. Kafka Streams는 클라이언트 측에 표준 Java 및 Scala 애플리케이션을 작성하고 배포하는 단순성과 Kafka의 서버 측 클러스터 기술의 이점을 결합하여 이러한 애플리케이션을 확장성, 탄력성, 내결함성 및 분산성이 뛰어난 애플리케이션으로 만든다 라이브러리는 정확하게 한 번 처리, 상태 저장 작업 및 집계, 윈도우 설정, 조인, 이벤트 시간 기반 처리 등을 지원한다.
맛보기로, 단어 카운팅 알고리즘 예시이다
KStream<string, string=""> textLines = builder.stream("quickstart-events");
KTable<string, long=""> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
8. kafka 환경 끝내기
- producer와 consumer를 Ctrl+C로 끝낸다
- Kafka broker를 Ctrl+C로 끝낸다
'Back-End > kafka' 카테고리의 다른 글
[Kafka streams] 컨셉 (0) | 2023.02.26 |
---|---|
[Kafka streams] Tutorial (0) | 2023.02.23 |
[Kafka streams] 데모 App 실행 (0) | 2023.02.23 |
[Kafka streams] Kafka streams 소개 (0) | 2023.02.23 |
[Kafka] 카프카 소개 (0) | 2023.02.14 |
댓글