패스트캠퍼스 환급챌린지 1일차 : 백엔드 개발자를 위한 Kafka 실습 0 to 1 : 입문부터 EDA까지 강의 후기
본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성하였습니다.
https://fastcampus.info/4n8ztzq
(~6/20) 50일의 기적 AI 환급반💫 | 패스트캠퍼스
초간단 미션! 하루 20분 공부하고 수강료 전액 환급에 AI 스킬 장착까지!
fastcampus.co.kr
2025/07/01 spring-cloud-stream에서 Produce, Consume 해보기 강의를 들어보겠습니다.
실습용으로 주어진 docker-compose.yml 파일입니다.
services:
zookeeper:
image: 'bitnami/zookeeper:3.7.2'
container_name: zookeeper
ports:
- 2181:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
volumes:
- ./.data/zookeeper/data:/bitnami/zookeeper/data
- ./.data/zookeeper/datalog:/bitnami/zookeeper/datalog
- ./.data/zookeeper/logs:/bitnami/zookeeper/logs
kafka1:
image: 'bitnami/kafka:3.6.0'
container_name: kafka1
hostname: kafka1
ports:
- 19092
- "9092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:19092,EXTERNAL://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka1:19092,EXTERNAL://localhost:9092
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
# volumes:
# - ./.data/kafka1:/bitnami/kafka/data
kafka2:
image: 'bitnami/kafka:3.6.0'
container_name: kafka2
ports:
- 19092
- "9093:9093"
environment:
- KAFKA_BROKER_ID=2
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:19092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka2:19092,EXTERNAL://localhost:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
# volumes:
# - ./.data/kafka2:/bitnami/kafka/data
kafka3:
image: 'bitnami/kafka:3.6.0'
container_name: kafka3
ports:
- 19092
- "9094:9094"
environment:
- KAFKA_BROKER_ID=3
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:19092,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka3:19092,EXTERNAL://localhost:9094
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
# volumes:
# - ./.data/kafka3:/bitnami/kafka/data
kafka-ui:
image: 'provectuslabs/kafka-ui:v0.7.1'
container_name: kafka-ui
ports:
- "8081:8080"
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:19092,kafka2:19092,kafka3:19092
depends_on:
- zookeeper
- kafka1
- kafka2
- kafka3
cmak:
image: 'hlebalbau/kafka-manager:3.0.0.5'
container_name: cmak
ports:
- "9000:9000"
environment:
- ZK_HOSTS=zookeeper:2181
depends_on:
- zookeeper
- kafka1
- kafka2
- kafka3
redpanda-console:
image: 'docker.redpanda.com/redpandadata/console:v2.3.7'
container_name: redpanda-console
ports:
- "8989:8080"
environment:
- KAFKA_BROKERS=kafka1:19092,kafka2:19092,kafka3:19092
depends_on:
- zookeeper
- kafka1
- kafka2
- kafka3
mysql:
image: 'mysql:8.0.35'
container_name: mysql
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=1234
- MYSQL_DATABASE=campus
- MYSQL_USER=myuser
- MYSQL_PASSWORD=mypassword
volumes:
- ./.data/mysql:/var/lib/mysql
kafka volumes를 주석 처리한 이유는 java.nio.file.AccessDeniedException: /bitnami/kafka/data/my-topic2-1 -> /bitnami/kafka/data/my-topic2- 에러메시지가 떴기 때문입니다.
kafka가 계속 죽어서 kafka1을 로그 찍어보니 위와 같은 에러메시지가 떴고,
Kafka가 데이터를 저장하고 있는 디렉토리(/bitnami/kafka/data)에 접근 권한이 없어서 kafka 컨테이너가 실행도 못해본 채 죽어버립니다.
정확한 원인 파악 전에 일단 주석으로 돌리고 실행하려고 합니다.
이제 consumer와, producer 실행이 잘 되는 지 확인하기 위해 Spring boot 애플리케이션과 연동하여 print를 하나 찍어줍니다.
application.yml
spring:
cloud:
function:
definition: myProducer;myConsumer;
stream:
function:
bindings:
myProducer-out-0: producer-test
myConsumer-in-0: consumer-test
kafka:
binder:
brokers: localhost:9092, localhost:9093, localhost:9094
auto-create-topics: false
required-acks: 0
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
bindings:
consumer-test:
consumer:
start-offset: latest
bindings:
producer-test:
destination: my-json-topic
content-type: application/json
consumer-test:
destination: my-json-topic
group: test-consumer-group
consumer:
concurrency: 1
MyConsumer.java
package com.fastcampus.kafkahandson.consumer;
import com.fastcampus.kafkahandson.model.MyMessage;
import java.util.function.Consumer;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer implements Consumer<Message<MyMessage>> {
MyConsumer() {
System.out.println("MyConsumer init!");
}
@Override
public void accept(Message<MyMessage> message) {
System.out.println("Message arrived! - " + message.getPayload());
}
}
MyProducer.java
package com.fastcampus.kafkahandson.producer;
import com.fastcampus.kafkahandson.model.MyMessage;
import java.util.function.Supplier;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
@Component
public class MyProducer implements Supplier<Flux<Message<MyMessage>>> {
MyProducer(){
System.out.println("MyProducer init!");
}
private final Sinks.Many<Message<MyMessage>> sinks = Sinks.many().unicast().onBackpressureBuffer();
public void sendMessage(MyMessage myMessage) {
Message<MyMessage> message = MessageBuilder
.withPayload(myMessage)
.setHeader(KafkaHeaders.KEY, String.valueOf(myMessage.getContent()))
.build();
sinks.emitNext(message, EmitFailureHandler.FAIL_FAST);
}
@Override
public Flux<Message<MyMessage>> get() {
return sinks.asFlux();
}
}
print가 잘 찍힌 것을 확인하고 다음 단계로 넘어가겠습니다.
kafka ui에 들어가 토픽을 확인해봅니다.
파티션이 3개여야 하는데, 1개로 들어가 있습니다ㅠ
삭제하고 다시 만들려고 해도 my-json-topic이 제대로 삭제가 되지않아 my-json-topic2로 만들어버렸습니다.
왜 삭제해도 다시 살아날까요?;;;
토픽 생성은 kafka ui에서 생성하실 수 있습니다.
파티션 3개가 제대로 생성되었습니다.
이제 my-json-topic2 토픽에 메시지를 넣어보겠습니다.
메시지가 잘 들어간 것을 확인해줍니다. 2번 파티션에 들어갔습니다.
제가 key를 content로 설정하는 바람에 문장형 key가 되었습니다;;ㅎ
원래는 age나 id같이 정확성을 띄는 숫자나 확실히 구분되는 문자를 key로 지정하는 것이 바람직해보입니다.
이 테스트를 통해 알 수 있는 것은 partition 별로 담당 key가 배정되어 있다는 것입니다. key가 같으면 항상 동일한 partition에 등록이 됩니다.
같은 key는 같은 partition에 들어가므로 순서보장에 유리합니다.🤭
실습은 강의 진도에 맞게 여기까지 해보겠습니다!