728x90
참고 : 공식홈페이지 Apache Kafka - Run demo app
단어 카운팅 예시 코드
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(
"streams-plaintext-input",
Consumed.with(stringSerde, stringSerde)
);
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\\\W+")))
// Group the text words as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count();
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()))
* 코드에서 함수 관련 검색 정보
Table은 이전 sream의 snapshot 모음 같은 개념. key-value 로 되어있음.
- flatMapValues - 원본 레코드의 키를 유지하면서 value만 바꾼 새로운 record들을 생성한다
참고 : KStream (kafka 2.0.0 API) (apache.org)
ex) ( ??? , “all streams” ) → ( ??? , “all”), ( ???, “streams”) - GroupBy - 새로운 키로 그룹핑한다.
참고 : KStream (kafka 2.1.0 API) (apache.org)
위에 따르면 KStream을 집계해서 KTable로 만들기 위한 중간단계 KGroupedStream를 반환한다.
예제 코드에서는 value를 기준 key로 그룹핑한다. - count - 해당 키값으로 몇개의 요소가 있는지 체크한다.
참고 : KGroupedStream (kafka 2.0.0 API) (apache.org)
key와 해당 key를 가진 record 카운트를 value로 하는 정보가저장되어있는 (key, value(count개수)) KTable 을 반환한다.
KTable은 메모리에 존재하면서 count 개수가 업데이트 된다.
1. kafka Download
압축을 풀고 해당 Dir로 이동
hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo$ tar -xzf kafka_2.13-3.4.0.tgz hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo$ cd kafka_2.13-3.4.0
2. start kafka server
(1) UUID 생성
hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)”
** Java not found 에러가 날 경우 아래 커맨드로 jdk 설치
sudo apt update sudo apt install default-jdk
(2)log dir 설정
hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
(3) kafka server start
hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-server-start.sh config/kraft/server.properties
3. topic을 만들고, kafka producer를 시작한다 (새로운 터미널)
(1) input topic 생성
hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 1 \ --topic streams-plaintext-input
(2) output topic 생성
hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$
bin/kafka-topics.sh --create \\
--bootstrap-server localhost:9092 \\
--replication-factor 1 \\
--partitions 1 \\
--topic streams-wordcount-output \\
--config cleanup.policy=compact
(3) 생성된 topic 확인
hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
4. Wordcount Apllication 실행
(1) Wordcount Apllication 실행
hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
app은 streams-plaintext-input topic을 읽고 알고리즘을 거친후 streams-wordcount-ouput에 쓴다.
(2) Input data producer
hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
terminal에 output을 보기 위해서는 console consumer가 필요하다
(3) console producer 시작하기(새로운 터미널 필요)
hong@DESKTOP-UU8A5G1:/mnt/c/Users/Hong/Desktop/kafka_demo/kafka_2.13-3.4.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
5. 데이터를 처리해보기
- input data producer에 문장을 쓰면 console consumer에 단어 개수가 출력된다
728x90
'Back-End > kafka' 카테고리의 다른 글
[Kafka streams] 컨셉 (0) | 2023.02.26 |
---|---|
[Kafka streams] Tutorial (0) | 2023.02.23 |
[Kafka streams] Kafka streams 소개 (0) | 2023.02.23 |
[Kafka] 빠른 시작 예제 (0) | 2023.02.15 |
[Kafka] 카프카 소개 (0) | 2023.02.14 |
댓글