728x90
참고 : 공식홈페이지 Apache Kafka - tutorial
1. Maven 설치
(1) Maven 다운로드
Maven – Download Apache Maven 다운로드 링크
해당 압축파일을 풀고 해당 폴더를 C:\Program Files 에 위치시켜준다
(2) 환경변수 설정
내 PC 우클릭 → 속성 → 고급시스템 설정 → 고급 탭 → 환경변수
시스템 변수에
변수 이름 : MAVEN_HOME
변수 값 : C:\Program Files\apache-maven-3.9.0
를 추가해준다
Path 변수 편집 누른 후
%MAVEN_HOME%\bin
를 추가해준다
cmd를 관리자 권한으로 실행시키고 mvn -version 커맨드를 입력해서 정상적으로 출력되면 된 것이다.
2. Project 생성
cmd 에서 프로젝트 생성을 원하는 경로에서 아래 커맨드 입력
mvn archetype:generate -DarchetypeGroupId=org.apache.kafka -DarchetypeArtifactId=streams-quickstart-java -DarchetypeVersion=3.4.0 -DgroupId=streams.examples -DartifactId=streams.examples -Dversion=0.1 -Dpackage=myapps
pom.xml에는 이미지 streams 의존성이 포함되어 있다. Java8 버전이 타겟팅 되어있다.
이미 완성되어있는 src/main/java/myapps 경로에 있는 .java 파일들을 모두 삭제한다
3. Pipe 작성
- input을 받은대로 다시 저장하는 application
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class Pipe {
public static void main(String[] args) throws Exception {
// java.util.Properties map을 사용해 streams config 설정
// StreamsConfig.BOOTSTRAP_SERVERS_CONFIG 에서 kafka 클러스터 연결 정보 등과 같은 중요 항목들을 설정해야 한다
// StreamsConfig.APPLICATION_ID_CONFIG 에서 식별자를 설정해주어야 한다
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// default serialization and deserialization libraries 설정
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// StreamsBuilder로 logic(topology) 작성
// streams-plaintext-input topic 으로부터 source stream을 만들고 (key: String, value: String)
// streams-pipe-output topic 에 쓴다.
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.to("streams-pipe-output");
// topology을 출력해보기
final Topology topology = builder.build();
System.out.println(topology.describe());
/*
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
--> KSTREAM-SINK-0000000001
Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)
<-- KSTREAM-SOURCE-0000000000
설명
1. source node : KSTREAM-SOURCE-0000000000 , sink node : KSTREAM-SINK-0000000001
2. source node는 streams-plaintext-input topic 로부터 읽고 sink로 내려보낸다(downstream)
3. sink node는 streams-pipe-output 에 쓴다
--> : downstream (children) 이다.
<-- : upstream (parents) 이다.
*/
final KafkaStreams streams = new KafkaStreams(topology, props);
// start()로 시작할 수 있고 close() 할때까지 멈추지 않는다.
// 사용자 control-c로 close하는 예시를 하단에 추가
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
4. LineSplit
- 이전 Pipe 코드를 복사 후 수정한다.
- line을 단어로 나누어 output topic에 저장 ( 1 record → N records)
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class LineSplit {
public static void main(String[] args) throws Exception {
//Pipe를 복사
Properties props = new Properties();
// ID 변경
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
// stateless 하게 이전 record 값과 상관없이 한 줄이 오면 단어로 쪼개어 words stream을 만든다
source.flatMapValues(value -> Arrays.asList(value.split("\\\\W+")))
.to("streams-linesplit-output");
final Topology topology = builder.build();
System.out.println(topology.describe());
/*
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
--> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
--> KSTREAM-SINK-0000000002
<-- KSTREAM-SOURCE-0000000000
Sink: KSTREAM-SINK-0000000002 (topic: streams-linesplit-output)
<-- KSTREAM-FLATMAPVALUES-0000000001
설명
1. Source에서 input topic 으로 부터 KSTREAM-FLATMAPVALUES-0000000001 stream이 생성됨
2. Processor에서
KSTREAM-FLATMAPVALUES-0000000001 (stores: []) --> KSTREAM-SINK-0000000002 : 프로세서 노드는 소스 노드를 부모로 취급한다.
KSTREAM-FLATMAPVALUES-0000000001 (stores: []) <-- KSTREAM-SOURCE-0000000000 : 프로세서 노드는 싱크 노드를 자식으로 취급한다
이 노드의 결과물로 1개 또는 2개 이상의 새로운 레코드가 생성된다.
3. Sink에서 kafka streams-linesplit-output topic에 씌워진다.
4. processor 노드는 stateless 하다.(stores: [])에서 보듯 아무 저장소와도 연관이 없다.
*/
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
5. WordCount
- 이전 LineSplit 복사 후 수정
- 단어 개수를 카운팅하여 저장
- 1 record (line) → N records (words) → N records (각 word 별 누적 counting 개수)
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class WordCount {
public static void main(String[] args) throws Exception {
//LineSplit를 복사
Properties props = new Properties();
// ID 변경
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\\\W+"))) //대소문자 구별하지 않고 count를 위해 toLowerCase
.groupBy((key, value) -> value) // 단어를 key로 설정해여 grouping
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) //key별로 개수를 집계. counts-store에 개수 상태를 저장한다.
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); // type을 String, Long으로 변경하여 streams-wordcount-output topic에 쓴다.
final Topology topology = builder.build();
System.out.println(topology.describe());
/*
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
--> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
--> KSTREAM-KEY-SELECT-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
--> counts-store-repartition-filter
<-- KSTREAM-FLATMAPVALUES-0000000001
Processor: counts-store-repartition-filter (stores: [])
--> counts-store-repartition-sink
<-- KSTREAM-KEY-SELECT-0000000002
Sink: counts-store-repartition-sink (topic: counts-store-repartition)
<-- counts-store-repartition-filter
Sub-topology: 1
Source: counts-store-repartition-source (topics: [counts-store-repartition])
--> KSTREAM-AGGREGATE-0000000003
Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store])
--> KTABLE-TOSTREAM-0000000007
<-- counts-store-repartition-source
Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
--> KSTREAM-SINK-0000000008
<-- KSTREAM-AGGREGATE-0000000003
Sink: KSTREAM-SINK-0000000008 (topic: streams-wordcount-output)
<-- KTABLE-TOSTREAM-0000000007
설명
1. 첫번째 topology의 결과는 두번째 topolgoy 로 연결된다.
2. 두 topology 사이 연결점 The repartition topic node는 두번재 topology에서 stream을 grouping할 집계 key(위 코드에서는 단어)에 따라 나누는데 사용된다
3. 첫번재 topology의 Processor counts-store-repartition-filter는 key가 빈값인 record를 필터링 하기 위한 것이다
4. 두번째 topology에서 Processor KSTREAM-AGGREGATE-0000000003는 counts-store라는 store와 연결되어 있다.
각각의 record를 받자마자 counts-store에서 조회하고 count 값에 1을 더해 다시 저장한다.
그 후 다음 Processor KTABLE-TOSTREAM-0000000007 로 넘어가고 Stream으로 변환하여 KSTREAM-SINK-0000000008를 통해 kafka에 다시 기록한다.
*/
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
728x90
'Back-End > kafka' 카테고리의 다른 글
[Kafka streams] 아키텍쳐 (0) | 2023.02.27 |
---|---|
[Kafka streams] 컨셉 (0) | 2023.02.26 |
[Kafka streams] 데모 App 실행 (0) | 2023.02.23 |
[Kafka streams] Kafka streams 소개 (0) | 2023.02.23 |
[Kafka] 빠른 시작 예제 (0) | 2023.02.15 |
댓글