Database

아파치 카프카(Apache Kafka) - 프로듀서, 컨슈머

TedDev 2025. 1. 6. 10:48
728x90

Apache Kafka의 프로듀서(Producer)컨슈머(Consumer)는 Kafka의 핵심 구성 요소로 데이터를 송수신하는 데 사용된다. 이 두 역할은 Kafka 클러스터를 통해 데이터를 효율적으로 주고받을 수 있도록 설계되어있다.

 

1. 프로듀서(Producer)

역할

  • 데이터 생성 및 전송
    • 프로듀서는 데이터를 Kafka 클러스터로 보내는 애플리케이션이다.
    • 데이터를 특정 토픽(Topic)에 게시(Produce)한다.

주요 특징

  1. 파티션 선택
    • 프로듀서는 토픽의 데이터를 파티션(Partition)으로 분배한다.
    • 기본적으로 파티셔너(Partitioner)를 통해 데이터를 특정 파티션에 저장한다.
      • 파티션 키(key)가 제공되면 해당 키의 해시 값을 기준으로 파티션이 결정된다.
      • 파티션 키가 없으면 라운드 로빈 방식으로 파티션이 선택된다.
  2. 데이터 보장
    • Kafka는 데이터 전달 보장을 위해 acks(Acknowledgments) 설정을 지원한다
      • acks=0 : 프로듀서는 데이터 전송 후 확인하지 않음
      • acks=1 : 리더가 데이터를 수신하면 성공으로 간주
      • acks=all : 모든 복제본이 데이터를 수신해야 성공으로 간주
  3. 압축 지원
    • 프로듀서는 데이터를 Kafka로 전송하기 전에 압축할 수 있다. (예: gzip, snappy, lz4, zstd)
    • 이는 네트워크 사용량을 줄이고 처리 속도를 높이는 데 유용하다.
  4. 배치 처리
    • 여러 메시지를 하나의 배치(batch)로 묶어 전송하여 네트워크 비용을 절감

구현 예시 (Java , Spring)

프로듀서 설정

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

프로듀서 서비스

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
        System.out.printf("Produced message: key=%s, value=%s to topic=%s%n", key, message, topic);
    }
}

 

프로듀서 컨트롤러

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaProducerController {

    private final KafkaProducerService producerService;

    public KafkaProducerController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping("/send")
    public String sendMessage(@RequestParam String topic, @RequestParam String key, @RequestParam String message) {
        producerService.sendMessage(topic, key, message);
        return "Message sent successfully!";
    }
}

 

 

2. 컨슈머(Consumer)

역할

  • 데이터 소비 및 처리
    • 컨슈머는 Kafka 클러스터에서 데이터를 읽어오는 애플리케이션이다.
    • 특정 토픽을 구독(Subscribe)하고 데이터를 처리한다.

주요 특징

  1. 컨슈머 그룹(Consumer Group)
    • 컨슈머는 컨슈머 그룹에 속할 수 있으며 같은 그룹 내의 컨슈머는 서로 다른 파티션의 데이터를 처리한다.
    • 파티션 분배
      • 하나의 파티션은 하나의 컨슈머 그룹 내에서 한 컨슈머만 처리
      • 여러 컨슈머 그룹은 동일한 토픽 데이터를 독립적으로 처리 가능
  2. 오프셋 관리
    • Kafka는 각 파티션에서 컨슈머가 읽은 마지막 메시지의 위치(오프셋)를 관리한다.
    • 오프셋 저장 방식
      • 자동 커밋 : 컨슈머가 주기적으로 오프셋을 자동으로 커밋
      • 수동 커밋 : 애플리케이션에서 명시적으로 커밋
  3. 데이터 전달 보장
    • enable.auto.commit 설정으로 메시지 처리 완료 후 오프셋 커밋 여부 결정
    • 데이터 전달 모드
      • At-most-once : 메시지를 한 번만 처리, 중복 처리 불가
      • At-least-once : 메시지를 적어도 한 번 처리, 중복 가능성 있음
      • Exactly-once : 중복 없는 정확한 메시지 처리(특정 설정 필요)
  4. 로드 밸런싱
    • 컨슈머 그룹에 속한 컨슈머 수에 따라 파티션이 자동으로 재분배된다.

구현 예시 (Java, Spring)

컨슈머 설정

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.ConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

컨슈머 서비스

 
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
    public void consume(String message) {
        System.out.printf("Consumed message: %s%n", message);
    }
}
 

 

3. 프로듀서와 컨슈머 간의 협력

  • 데이터 흐름
    1. 프로듀서가 Kafka 클러스터에 데이터를 게시(Produce)한다.
    2. 데이터는 브로커에서 지정된 토픽과 파티션에 저장된다.
    3. 컨슈머는 Kafka 클러스터에서 해당 데이터를 읽어(Consume) 처리한다.
  • 결합도 감소
    • 프로듀서와 컨슈머는 서로 직접적으로 연결되지 않으며 Kafka 클러스터를 통해 독립적으로 작동한다.
    • 이를 통해 시스템의 유연성과 확장성을 확보할 수 있다.

 

4. Kafka의 강점

  • 확장성 : 프로듀서와 컨슈머를 독립적으로 확장 가능
  • 유연성 : 여러 컨슈머 그룹이 같은 데이터를 동시에 처리할 수 있음
  • 순서 보장 : 파티션 내에서는 메시지의 순서가 보장됨
  • 내결함성 : 데이터 복제를 통해 장애에 강함

 

결론

  • 프로듀서는 데이터를 Kafka 클러스터에 게시하는 역할
  • 컨슈머는 클러스터에서 데이터를 읽고 처리하는 역할
  • Kafka의 설계는 높은 처리량, 내결함성, 확장성을 제공하며 데이터 중심 애플리케이션에서 효과적으로 사용된다.

 

 

반응형