후기

패스트캠퍼스 환급챌린지 1일차 : 백엔드 개발자를 위한 Kafka 실습 0 to 1 : 입문부터 EDA까지 강의 후기

dev_ajrqkq 2025. 7. 1. 21:07

본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성하였습니다.

https://fastcampus.info/4n8ztzq

 

(~6/20) 50일의 기적 AI 환급반💫 | 패스트캠퍼스

초간단 미션! 하루 20분 공부하고 수강료 전액 환급에 AI 스킬 장착까지!

fastcampus.co.kr

 


 

 

2025/07/01 spring-cloud-stream에서 Produce, Consume 해보기 강의를 들어보겠습니다.

 

 

 

실습용으로 주어진 docker-compose.yml 파일입니다.

services:
  zookeeper:
    image: 'bitnami/zookeeper:3.7.2'
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - ./.data/zookeeper/data:/bitnami/zookeeper/data
      - ./.data/zookeeper/datalog:/bitnami/zookeeper/datalog
      - ./.data/zookeeper/logs:/bitnami/zookeeper/logs
  kafka1:
    image: 'bitnami/kafka:3.6.0'
    container_name: kafka1
    hostname: kafka1
    ports:
      - 19092
      - "9092:9092"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:19092,EXTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka1:19092,EXTERNAL://localhost:9092
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
#    volumes:
#      - ./.data/kafka1:/bitnami/kafka/data
  kafka2:
    image: 'bitnami/kafka:3.6.0'
    container_name: kafka2
    ports:
      - 19092
      - "9093:9093"
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:19092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka2:19092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
#    volumes:
#      - ./.data/kafka2:/bitnami/kafka/data
  kafka3:
    image: 'bitnami/kafka:3.6.0'
    container_name: kafka3
    ports:
      - 19092
      - "9094:9094"
    environment:
      - KAFKA_BROKER_ID=3
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:19092,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka3:19092,EXTERNAL://localhost:9094
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
#    volumes:
#      - ./.data/kafka3:/bitnami/kafka/data
  kafka-ui:
    image: 'provectuslabs/kafka-ui:v0.7.1'
    container_name: kafka-ui
    ports:
      - "8081:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:19092,kafka2:19092,kafka3:19092
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3
  cmak:
    image: 'hlebalbau/kafka-manager:3.0.0.5'
    container_name: cmak
    ports:
      - "9000:9000"
    environment:
      - ZK_HOSTS=zookeeper:2181
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3
  redpanda-console:
    image: 'docker.redpanda.com/redpandadata/console:v2.3.7'
    container_name: redpanda-console
    ports:
      - "8989:8080"
    environment:
      - KAFKA_BROKERS=kafka1:19092,kafka2:19092,kafka3:19092
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3
  mysql:
    image: 'mysql:8.0.35'
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=1234
      - MYSQL_DATABASE=campus
      - MYSQL_USER=myuser
      - MYSQL_PASSWORD=mypassword
    volumes:
      - ./.data/mysql:/var/lib/mysql

 

kafka volumes를 주석 처리한 이유는 java.nio.file.AccessDeniedException: /bitnami/kafka/data/my-topic2-1 -> /bitnami/kafka/data/my-topic2- 에러메시지가 떴기 때문입니다.

 

kafka가 계속 죽어서 kafka1을 로그 찍어보니 위와 같은 에러메시지가 떴고, 

Kafka가 데이터를 저장하고 있는 디렉토리(/bitnami/kafka/data)에 접근 권한이 없어서 kafka 컨테이너가 실행도 못해본 채 죽어버립니다.

 

정확한 원인 파악 전에 일단 주석으로 돌리고 실행하려고 합니다.

 

이제 consumer와, producer 실행이 잘 되는 지 확인하기 위해 Spring boot 애플리케이션과 연동하여 print를 하나 찍어줍니다.

 

application.yml

spring:
  cloud:
    function:
      definition: myProducer;myConsumer;
    stream:
      function:
        bindings:
          myProducer-out-0: producer-test
          myConsumer-in-0: consumer-test
      kafka:
        binder:
          brokers: localhost:9092, localhost:9093, localhost:9094
          auto-create-topics: false
          required-acks: 0
          configuration:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
        bindings:
          consumer-test:
            consumer:
              start-offset: latest
      bindings:
        producer-test:
          destination: my-json-topic
          content-type: application/json
        consumer-test:
          destination: my-json-topic
          group: test-consumer-group
          consumer:
            concurrency: 1

 

MyConsumer.java

package com.fastcampus.kafkahandson.consumer;

import com.fastcampus.kafkahandson.model.MyMessage;
import java.util.function.Consumer;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer implements Consumer<Message<MyMessage>> {

    MyConsumer() {
        System.out.println("MyConsumer init!");
    }

    @Override
    public void accept(Message<MyMessage> message) {
        System.out.println("Message arrived! - " + message.getPayload());
    }
}

 

MyProducer.java

package com.fastcampus.kafkahandson.producer;

import com.fastcampus.kafkahandson.model.MyMessage;
import java.util.function.Supplier;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;

@Component
public class MyProducer implements Supplier<Flux<Message<MyMessage>>> {

    MyProducer(){
        System.out.println("MyProducer init!");
    }

    private final Sinks.Many<Message<MyMessage>> sinks = Sinks.many().unicast().onBackpressureBuffer();

    public void sendMessage(MyMessage myMessage) {
        Message<MyMessage> message = MessageBuilder
                .withPayload(myMessage)
                .setHeader(KafkaHeaders.KEY, String.valueOf(myMessage.getContent()))
                .build();
        sinks.emitNext(message, EmitFailureHandler.FAIL_FAST);
    }

    @Override
    public Flux<Message<MyMessage>> get() {
        return sinks.asFlux();
    }
}

 

print가 잘 찍힌 것을 확인하고 다음 단계로 넘어가겠습니다.


kafka ui에 들어가 토픽을 확인해봅니다.

 

파티션이 3개여야 하는데, 1개로 들어가 있습니다ㅠ

삭제하고 다시 만들려고 해도 my-json-topic이 제대로 삭제가 되지않아 my-json-topic2로 만들어버렸습니다. 

왜 삭제해도 다시 살아날까요?;;;

 

토픽 생성은 kafka ui에서 생성하실 수 있습니다.

 

파티션 3개가 제대로 생성되었습니다.

 

이제 my-json-topic2 토픽에 메시지를 넣어보겠습니다.

메시지가 잘 들어간 것을 확인해줍니다. 2번 파티션에 들어갔습니다.

 

제가 key를 content로 설정하는 바람에 문장형 key가 되었습니다;;ㅎ

원래는 age나 id같이 정확성을 띄는 숫자나 확실히 구분되는 문자를 key로 지정하는 것이 바람직해보입니다.

 

이 테스트를 통해 알 수 있는 것은 partition 별로 담당 key가 배정되어 있다는 것입니다. key가 같으면 항상 동일한 partition에 등록이 됩니다.

같은 key는 같은 partition에 들어가므로 순서보장에 유리합니다.🤭

 

실습은 강의 진도에 맞게 여기까지 해보겠습니다!