본문 바로가기
Back-End/kafka

[Kafka streams] 데모 App 실행

by hongdor 2023. 2. 23.
728x90

참고 : 공식홈페이지 Apache Kafka - Run demo app

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

단어 카운팅 예시 코드

 

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(
      "streams-plaintext-input",
      Consumed.with(stringSerde, stringSerde)
    );

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()))

 

* 코드에서 함수 관련 검색 정보

Table은 이전 sream의 snapshot 모음 같은 개념. key-value 로 되어있음.

  • flatMapValues - 원본 레코드의 키를 유지하면서 value만 바꾼 새로운 record들을 생성한다
    참고 : KStream (kafka 2.0.0 API) (apache.org)
    ex) ( ??? , “all streams” ) → ( ??? , “all”), ( ???, “streams”)
  • GroupBy - 새로운 키로 그룹핑한다.
    참고 : KStream (kafka 2.1.0 API) (apache.org) 
    위에 따르면 KStream을 집계해서 KTable로 만들기 위한 중간단계 KGroupedStream를 반환한다.
    예제 코드에서는 value를 기준 key로 그룹핑한다.
  • count - 해당 키값으로 몇개의 요소가 있는지 체크한다.
    참고 : KGroupedStream (kafka 2.0.0 API) (apache.org)
    key와 해당 key를 가진 record 카운트를 value로 하는 정보가저장되어있는 (key, value(count개수)) KTable 을 반환한다.
    KTable은 메모리에 존재하면서 count 개수가 업데이트 된다.

 

1. kafka Download

Apache Downloads

밑줄 친 부분 클릭

압축을 풀고 해당 Dir로 이동

hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo$ tar -xzf kafka_2.13-3.4.0.tgz hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo$ cd kafka_2.13-3.4.0

 

 

2. start kafka server

(1) UUID 생성

hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)”

 

** Java not found 에러가 날 경우 아래 커맨드로 jdk 설치

sudo apt update sudo apt install default-jdk

 

(2)log dir 설정

hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

 

 

(3) kafka server start

hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-server-start.sh config/kraft/server.properties

 

 

3. topic을 만들고, kafka producer를 시작한다 (새로운 터미널)

(1) input topic 생성

hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 1 \ --topic streams-plaintext-input

 

(2) output topic 생성    

 hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$
 bin/kafka-topics.sh --create \\     
    --bootstrap-server localhost:9092 \\
    --replication-factor 1 \\
    --partitions 1 \\
    --topic streams-wordcount-output \\
    --config cleanup.policy=compact    


(3) 생성된 topic 확인

hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

 

 

4. Wordcount Apllication 실행

(1) Wordcount Apllication 실행

hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

app은 streams-plaintext-input topic을 읽고 알고리즘을 거친후 streams-wordcount-ouput에 쓴다.

 

(2) Input data producer

hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

terminal에 output을 보기 위해서는 console consumer가 필요하다

 

(3) console producer 시작하기(새로운 터미널 필요)

hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

 

 

5. 데이터를 처리해보기

  1. input data producer에 문장을 쓰면 console consumer에 단어 개수가 출력된다

입력
출력

728x90

'Back-End > kafka' 카테고리의 다른 글

[Kafka streams] 컨셉  (0) 2023.02.26
[Kafka streams] Tutorial  (0) 2023.02.23
[Kafka streams] Kafka streams 소개  (0) 2023.02.23
[Kafka] 빠른 시작 예제  (0) 2023.02.15
[Kafka] 카프카 소개  (0) 2023.02.14

댓글