[TIL] Kafka란?

2024. 8. 15. 20:30·TIL

 

Kafka란?

 

분산 스트리밍 플랫폼으로 주로 실시간 데이터 피드의 빅 데이터 처리를 목적으로 사용된다.

대용량 데이터 스트림을 저장하고 실시간으로 분석하거나 처리한다.

 

기본 구성 요소

  1. 메시지(Message) - Kafka를 통해 전달되는 데이터 단위
  2. 프로듀서(Producer) - 메시지를 생성하고 Kafka에 보내는 역할, 특정 토픽에 메시지를 보낸다.
  3. 토픽(Topic) - 메시지를 저장하는 장소. 메시지는 토픽에 저장되었다가 소비자에게 전달된다.
  4. 파티션(Partition) - 토픽을 물리적으로 나눈 단위
  5. 키(Key) - 메시지를 특정 파티션에 할당하는 데 사용되는 값
  6. 컨슈머(Consumer) - 토픽에서 메시지를 가져와 처리하는 역할
  7. 브로커(Broker) - Kafka 클러스터의 각 서버를 의미, 메시지를 저장하고 전송하는 역할
  8. 주키퍼(Zookeeper) - Kafka 클러스터를 관리하고 조정하는 데 사용되는 분산 코디네이션 서비스

 

Kafka 실습

 

docker-compose.yml 파일 생성

services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    platform: linux/amd64
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: wurstmeister/kafka:latest
    platform: linux/amd64
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    platform: linux/amd64
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_READONLY: "false"

 

docker compose 실행

docker compose up -d

 

http://localhost:8080/ 로 접속하면 UI for Apache Kafka에 접속할 수 있다.

 

Producer 서비스

 

build.gradle

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation 'org.springframework.kafka:spring-kafka'
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.kafka:spring-kafka-test'
	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

 

application.propertise

spring.application.name=producer
server.port=8090

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

 

ProducerApplicationKafkaConfig.java

@Configuration
public class ProducerApplicationKafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

producerFactory() : Kafka 프로듀서 인스턴스를 생성

kafkaTemplate() : 메시지를 간단하게 전송할 수 있는 KafkaTemplate 생성

 

ProducerController.java

@RestController
@RequiredArgsConstructor
public class ProducerController {

    private final ProducerService producerService;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("topic") String topic,
                              @RequestParam("key") String key,
                              @RequestParam("message") String message) {
        producerService.sendMessage(topic, key, message);
        return "Message send to kafka topic";
    }
}

Key는 Kafka에서 메시지를 특정 파티션에 할당하고, 순서를 보장하는 중요한 역할을 한다.

Key를 적절히 설정하면 애플리케이션이 데이터 처리의 일관성을 유지할 수 있고,

필요한 경우 메시지의 순서를 보장할 수 있다.

 

 

ProducerService.java

@Service
@RequiredArgsConstructor
public class ProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String message) {
        for(int i = 0; i < 10; i++){
            kafkaTemplate.send(topic,key,message+" " +i);
        }
    }
}

 

 

Consumer 서비스

 

build.gradle

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation 'org.springframework.kafka:spring-kafka'
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.kafka:spring-kafka-test'
	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

 

application.properties

spring.application.name=producer
server.port=8091

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

 

ConsumerApplicationKafkaConfig.java

// 이 클래스는 Kafka 컨슈머 설정을 위한 Spring 설정 클래스입니다.
@EnableKafka // Kafka 리스너를 활성화하는 어노테이션입니다.
@Configuration // Spring 설정 클래스로 선언하는 어노테이션입니다.
public class ConsumerApplicationKafkaConfig {

    // Kafka 컨슈머 팩토리를 생성하는 빈을 정의합니다.
    // ConsumerFactory는 Kafka 컨슈머 인스턴스를 생성하는 데 사용됩니다.
    // 각 컨슈머는 이 팩토리를 통해 생성된 설정을 기반으로 작동합니다.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // 컨슈머 팩토리 설정을 위한 맵을 생성합니다.
        Map<String, Object> configProps = new HashMap<>();
        // Kafka 브로커의 주소를 설정합니다.
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 메시지 키의 디시리얼라이저 클래스를 설정합니다.
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 메시지 값의 디시리얼라이저 클래스를 설정합니다.
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 설정된 프로퍼티로 DefaultKafkaConsumerFactory를 생성하여 반환합니다.
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Kafka 리스너 컨테이너 팩토리를 생성하는 빈을 정의합니다.
    // ConcurrentKafkaListenerContainerFactory는 Kafka 메시지를 비동기적으로 수신하는 리스너 컨테이너를 생성하는 데 사용됩니다.
    // 이 팩토리는 @KafkaListener 어노테이션이 붙은 메서드들을 실행할 컨테이너를 제공합니다.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // ConcurrentKafkaListenerContainerFactory를 생성합니다.
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 컨슈머 팩토리를 리스너 컨테이너 팩토리에 설정합니다.
        factory.setConsumerFactory(consumerFactory());
        // 설정된 리스너 컨테이너 팩토리를 반환합니다.
        return factory;
    }
}

 

ConsumerService.java

@Slf4j
@Service
public class ConsumerService {

    // 이 메서드는 Kafka에서 메시지를 소비하는 리스너 메서드입니다.
    // @KafkaListener 어노테이션은 이 메서드를 Kafka 리스너로 설정합니다.
    @KafkaListener(groupId = "group_a", topics = "topic1")
    // Kafka 토픽 "test-topic"에서 메시지를 수신하면 이 메서드가 호출됩니다.
    // groupId는 컨슈머 그룹을 지정하여 동일한 그룹에 속한 다른 컨슈머와 메시지를 분배받습니다.
    public void consumeFromGroupA(String message) {
        log.info("Group A consumed message from topic1: " + message);
    }

    // 동일한 토픽을 다른 그룹 ID로 소비하는 또 다른 리스너 메서드입니다.
    @KafkaListener(groupId = "group_b", topics = "topic1")
    public void consumeFromGroupB(String message) {
        log.info("Group B consumed message from topic1: " + message);
    }

    // 다른 토픽을 다른 그룹 ID로 소비하는 리스너 메서드입니다.
    @KafkaListener(groupId = "group_c", topics = "topic2")
    public void consumeFromTopicC(String message) {
        log.info("Group C consumed message from topic2: " + message);
    }

    // 다른 토픽을 다른 그룹 ID로 소비하는 리스너 메서드입니다.
    @KafkaListener(groupId = "group_c", topics = "topic3")
    public void consumeFromTopicD(String message) {
        log.info("Group C consumed message from topic3: " + message);
    }

    @KafkaListener(groupId = "group_d", topics = "topic4")
    public void consumeFromPartition0(String message) {
        log.info("Group D consumed message from topic4: " + message);
    }
}

http://localhost:8090/send?topic=topic1&key=key-1&message=hihi

전송하는 토픽과 일치하는 리스너가 실행되는 것을 확인한다.

 

Kafka 리스너는 전송된 메시지의 토픽과 일치할 때 메시지를 소비하게 된다.

프로듀서가 특정 토픽으로 메시지를 전송하면,

그 토픽을 구독하고 있는 리스너(컨슈머)가 메시지를 수식하여 처리하게 된다.

'TIL' 카테고리의 다른 글

[TIL] 모니터링  (0) 2024.08.16
[TIL] SAGA Pattern  (0) 2024.08.16
[TIL] RabbitMQ 이해하기  (0) 2024.08.13
[TIL] Docker란? | FeignClient DIP | Gateway FeignClient 순환참조  (0) 2024.08.13
[TIL] Spring Boot 로컬과 서버 환경 분리 | 정적 팩토리 메서드  (0) 2024.08.10
'TIL' 카테고리의 다른 글
  • [TIL] 모니터링
  • [TIL] SAGA Pattern
  • [TIL] RabbitMQ 이해하기
  • [TIL] Docker란? | FeignClient DIP | Gateway FeignClient 순환참조
dev_ajrqkq
dev_ajrqkq
알고리즘 천재가 될 거야
  • dev_ajrqkq
    기록이 자산이다
    dev_ajrqkq
  • 전체
    오늘
    어제
    • 분류 전체보기 (147)
      • Front-end (0)
      • Back-end (11)
        • Spring (1)
        • Java (8)
      • CS (9)
        • 데이터베이스 (5)
        • 네트워크 (4)
      • Algorithm (80)
      • 이것저것 (0)
      • 버그잡기 (1)
      • TIL (37)
      • 후기 (1)
      • 취준 (0)
  • 블로그 메뉴

    • 링크

    • 공지사항

    • 인기 글

    • 태그

      99클럽
      코딩테스트준비
      오블완
      Til
      항해99
      티스토리챌린지
      개발자취업
      TypeScript
    • 최근 댓글

    • 최근 글

    • hELLO· Designed By정상우.v4.10.2
    dev_ajrqkq
    [TIL] Kafka란?
    상단으로

    티스토리툴바