본문 바로가기
Back-End/kafka

[Kafka streams] Tutorial

by hongdor 2023. 2. 23.
728x90

참고 : 공식홈페이지 Apache Kafka - tutorial

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

1. Maven 설치

(1) Maven 다운로드

Maven – Download Apache Maven 다운로드 링크

 

해당 압축파일을 풀고 해당 폴더를 C:\Program Files 에 위치시켜준다

 

(2) 환경변수 설정

내 PC 우클릭 → 속성 → 고급시스템 설정 → 고급 탭 → 환경변수

 

시스템 변수에

변수 이름 : MAVEN_HOME

변수 값 : C:\Program Files\apache-maven-3.9.0

를 추가해준다

 

Path 변수 편집 누른 후

%MAVEN_HOME%\bin

를 추가해준다

 

cmd를 관리자 권한으로 실행시키고 mvn -version 커맨드를 입력해서 정상적으로 출력되면 된 것이다.

 

2. Project 생성

cmd 에서 프로젝트 생성을 원하는 경로에서 아래 커맨드 입력

mvn archetype:generate -DarchetypeGroupId=org.apache.kafka -DarchetypeArtifactId=streams-quickstart-java -DarchetypeVersion=3.4.0 -DgroupId=streams.examples -DartifactId=streams.examples -Dversion=0.1 -Dpackage=myapps

pom.xml에는 이미지 streams 의존성이 포함되어 있다. Java8 버전이 타겟팅 되어있다.

이미 완성되어있는 src/main/java/myapps 경로에 있는 .java 파일들을 모두 삭제한다

 

 

3. Pipe 작성

  • input을 받은대로 다시 저장하는 application

 

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {
    public static void main(String[] args) throws Exception {

        // java.util.Properties map을 사용해 streams config 설정
        // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG 에서 kafka 클러스터 연결 정보 등과 같은 중요 항목들을 설정해야 한다
        // StreamsConfig.APPLICATION_ID_CONFIG 에서 식별자를 설정해주어야 한다
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // default serialization and deserialization libraries 설정
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // StreamsBuilder로 logic(topology) 작성
        // streams-plaintext-input topic 으로부터 source stream을 만들고 (key: String, value: String)
        // streams-pipe-output topic 에 쓴다.
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.to("streams-pipe-output");

        // topology을 출력해보기
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        /*
        Topologies:
        Sub-topology: 0
            Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
              --> KSTREAM-SINK-0000000001
            Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)
              <-- KSTREAM-SOURCE-0000000000

        설명
        1. source node : KSTREAM-SOURCE-0000000000 , sink node : KSTREAM-SINK-0000000001
        2. source node는 streams-plaintext-input topic 로부터 읽고 sink로 내려보낸다(downstream)
        3. sink node는 streams-pipe-output 에 쓴다
        --> : downstream (children) 이다.
        <-- : upstream (parents) 이다.
         */

        final KafkaStreams streams = new KafkaStreams(topology, props);

        // start()로 시작할 수 있고 close() 할때까지 멈추지 않는다.
        // 사용자 control-c로 close하는 예시를 하단에 추가
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);

    }
}

 

 

 

4. LineSplit

  • 이전 Pipe 코드를 복사 후 수정한다.
  • line을 단어로 나누어 output topic에 저장 ( 1 record → N records)

 

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplit {
    public static void main(String[] args) throws Exception {
        //Pipe를 복사

        Properties props = new Properties();
        // ID 변경
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        // stateless 하게 이전 record 값과 상관없이 한 줄이 오면 단어로 쪼개어 words stream을 만든다
        source.flatMapValues(value -> Arrays.asList(value.split("\\\\W+")))
                .to("streams-linesplit-output");

        final Topology topology = builder.build();
        System.out.println(topology.describe());
        /*
        Topologies:
           Sub-topology: 0
            Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
              --> KSTREAM-FLATMAPVALUES-0000000001
            Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
              --> KSTREAM-SINK-0000000002
              <-- KSTREAM-SOURCE-0000000000
            Sink: KSTREAM-SINK-0000000002 (topic: streams-linesplit-output)
              <-- KSTREAM-FLATMAPVALUES-0000000001
        설명
        1. Source에서 input topic 으로 부터 KSTREAM-FLATMAPVALUES-0000000001 stream이 생성됨
        2. Processor에서
            KSTREAM-FLATMAPVALUES-0000000001 (stores: []) --> KSTREAM-SINK-0000000002 : 프로세서 노드는 소스 노드를 부모로 취급한다.
            KSTREAM-FLATMAPVALUES-0000000001 (stores: []) <-- KSTREAM-SOURCE-0000000000 : 프로세서 노드는 싱크 노드를 자식으로 취급한다
            이 노드의 결과물로 1개 또는 2개 이상의 새로운 레코드가 생성된다.
        3. Sink에서 kafka streams-linesplit-output topic에 씌워진다.
        4. processor 노드는 stateless 하다.(stores: [])에서 보듯 아무 저장소와도 연관이 없다.
        */

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);

    }
}

 

 

 

5. WordCount

  • 이전 LineSplit 복사 후 수정
  • 단어 개수를 카운팅하여 저장
  • 1 record (line) → N records (words) → N records (각 word 별 누적 counting 개수)

 

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {
    public static void main(String[] args) throws Exception {
        //LineSplit를 복사

        Properties props = new Properties();
        // ID 변경
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\\\W+"))) //대소문자 구별하지 않고 count를 위해 toLowerCase
                .groupBy((key, value) -> value) // 단어를 key로 설정해여 grouping
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) //key별로 개수를 집계. counts-store에 개수 상태를 저장한다.
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); // type을 String, Long으로 변경하여 streams-wordcount-output topic에 쓴다.

        final Topology topology = builder.build();
        System.out.println(topology.describe());
        /*
        Topologies:
           Sub-topology: 0
            Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
              --> KSTREAM-FLATMAPVALUES-0000000001
            Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
              --> KSTREAM-KEY-SELECT-0000000002
              <-- KSTREAM-SOURCE-0000000000
            Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
              --> counts-store-repartition-filter
              <-- KSTREAM-FLATMAPVALUES-0000000001
            Processor: counts-store-repartition-filter (stores: [])
              --> counts-store-repartition-sink
              <-- KSTREAM-KEY-SELECT-0000000002
            Sink: counts-store-repartition-sink (topic: counts-store-repartition)
              <-- counts-store-repartition-filter

          Sub-topology: 1
            Source: counts-store-repartition-source (topics: [counts-store-repartition])
              --> KSTREAM-AGGREGATE-0000000003
            Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store])
              --> KTABLE-TOSTREAM-0000000007
              <-- counts-store-repartition-source
            Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
              --> KSTREAM-SINK-0000000008
              <-- KSTREAM-AGGREGATE-0000000003
            Sink: KSTREAM-SINK-0000000008 (topic: streams-wordcount-output)
              <-- KTABLE-TOSTREAM-0000000007
        설명
        1. 첫번째 topology의 결과는 두번째 topolgoy 로 연결된다.
        2. 두 topology 사이 연결점 The repartition topic node는 두번재 topology에서 stream을 grouping할 집계 key(위 코드에서는 단어)에 따라 나누는데 사용된다
        3. 첫번재 topology의 Processor counts-store-repartition-filter는 key가 빈값인 record를 필터링 하기 위한 것이다
        4. 두번째 topology에서 Processor KSTREAM-AGGREGATE-0000000003는 counts-store라는 store와 연결되어 있다.
           각각의 record를 받자마자 counts-store에서 조회하고 count 값에 1을 더해 다시 저장한다.
           그 후 다음 Processor KTABLE-TOSTREAM-0000000007 로 넘어가고 Stream으로 변환하여 KSTREAM-SINK-0000000008를 통해 kafka에 다시 기록한다.
        */

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);

    }
}
728x90

'Back-End > kafka' 카테고리의 다른 글

[Kafka streams] 아키텍쳐  (0) 2023.02.27
[Kafka streams] 컨셉  (0) 2023.02.26
[Kafka streams] 데모 App 실행  (0) 2023.02.23
[Kafka streams] Kafka streams 소개  (0) 2023.02.23
[Kafka] 빠른 시작 예제  (0) 2023.02.15

댓글