Database
아파치 카프카(Apache Kafka) - 프로듀서, 컨슈머
TedDev
2025. 1. 6. 10:48
728x90
Apache Kafka의 프로듀서(Producer)와 컨슈머(Consumer)는 Kafka의 핵심 구성 요소로 데이터를 송수신하는 데 사용된다. 이 두 역할은 Kafka 클러스터를 통해 데이터를 효율적으로 주고받을 수 있도록 설계되어있다.
1. 프로듀서(Producer)
역할
- 데이터 생성 및 전송
- 프로듀서는 데이터를 Kafka 클러스터로 보내는 애플리케이션이다.
- 데이터를 특정 토픽(Topic)에 게시(Produce)한다.
주요 특징
- 파티션 선택
- 프로듀서는 토픽의 데이터를 파티션(Partition)으로 분배한다.
- 기본적으로 파티셔너(Partitioner)를 통해 데이터를 특정 파티션에 저장한다.
- 파티션 키(key)가 제공되면 해당 키의 해시 값을 기준으로 파티션이 결정된다.
- 파티션 키가 없으면 라운드 로빈 방식으로 파티션이 선택된다.
- 데이터 보장
- Kafka는 데이터 전달 보장을 위해 acks(Acknowledgments) 설정을 지원한다
- acks=0 : 프로듀서는 데이터 전송 후 확인하지 않음
- acks=1 : 리더가 데이터를 수신하면 성공으로 간주
- acks=all : 모든 복제본이 데이터를 수신해야 성공으로 간주
- Kafka는 데이터 전달 보장을 위해 acks(Acknowledgments) 설정을 지원한다
- 압축 지원
- 프로듀서는 데이터를 Kafka로 전송하기 전에 압축할 수 있다. (예: gzip, snappy, lz4, zstd)
- 이는 네트워크 사용량을 줄이고 처리 속도를 높이는 데 유용하다.
- 배치 처리
- 여러 메시지를 하나의 배치(batch)로 묶어 전송하여 네트워크 비용을 절감
구현 예시 (Java , Spring)
프로듀서 설정
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
프로듀서 서비스
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
System.out.printf("Produced message: key=%s, value=%s to topic=%s%n", key, message, topic);
}
}
프로듀서 컨트롤러
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaProducerController {
private final KafkaProducerService producerService;
public KafkaProducerController(KafkaProducerService producerService) {
this.producerService = producerService;
}
@PostMapping("/send")
public String sendMessage(@RequestParam String topic, @RequestParam String key, @RequestParam String message) {
producerService.sendMessage(topic, key, message);
return "Message sent successfully!";
}
}
2. 컨슈머(Consumer)
역할
- 데이터 소비 및 처리
- 컨슈머는 Kafka 클러스터에서 데이터를 읽어오는 애플리케이션이다.
- 특정 토픽을 구독(Subscribe)하고 데이터를 처리한다.
주요 특징
- 컨슈머 그룹(Consumer Group)
- 컨슈머는 컨슈머 그룹에 속할 수 있으며 같은 그룹 내의 컨슈머는 서로 다른 파티션의 데이터를 처리한다.
- 파티션 분배
- 하나의 파티션은 하나의 컨슈머 그룹 내에서 한 컨슈머만 처리
- 여러 컨슈머 그룹은 동일한 토픽 데이터를 독립적으로 처리 가능
- 오프셋 관리
- Kafka는 각 파티션에서 컨슈머가 읽은 마지막 메시지의 위치(오프셋)를 관리한다.
- 오프셋 저장 방식
- 자동 커밋 : 컨슈머가 주기적으로 오프셋을 자동으로 커밋
- 수동 커밋 : 애플리케이션에서 명시적으로 커밋
- 데이터 전달 보장
- enable.auto.commit 설정으로 메시지 처리 완료 후 오프셋 커밋 여부 결정
- 데이터 전달 모드
- At-most-once : 메시지를 한 번만 처리, 중복 처리 불가
- At-least-once : 메시지를 적어도 한 번 처리, 중복 가능성 있음
- Exactly-once : 중복 없는 정확한 메시지 처리(특정 설정 필요)
- 로드 밸런싱
- 컨슈머 그룹에 속한 컨슈머 수에 따라 파티션이 자동으로 재분배된다.
구현 예시 (Java, Spring)
컨슈머 설정
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
컨슈머 서비스
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
public void consume(String message) {
System.out.printf("Consumed message: %s%n", message);
}
}
3. 프로듀서와 컨슈머 간의 협력
- 데이터 흐름
- 프로듀서가 Kafka 클러스터에 데이터를 게시(Produce)한다.
- 데이터는 브로커에서 지정된 토픽과 파티션에 저장된다.
- 컨슈머는 Kafka 클러스터에서 해당 데이터를 읽어(Consume) 처리한다.
- 결합도 감소
- 프로듀서와 컨슈머는 서로 직접적으로 연결되지 않으며 Kafka 클러스터를 통해 독립적으로 작동한다.
- 이를 통해 시스템의 유연성과 확장성을 확보할 수 있다.
4. Kafka의 강점
- 확장성 : 프로듀서와 컨슈머를 독립적으로 확장 가능
- 유연성 : 여러 컨슈머 그룹이 같은 데이터를 동시에 처리할 수 있음
- 순서 보장 : 파티션 내에서는 메시지의 순서가 보장됨
- 내결함성 : 데이터 복제를 통해 장애에 강함
결론
- 프로듀서는 데이터를 Kafka 클러스터에 게시하는 역할
- 컨슈머는 클러스터에서 데이터를 읽고 처리하는 역할
- Kafka의 설계는 높은 처리량, 내결함성, 확장성을 제공하며 데이터 중심 애플리케이션에서 효과적으로 사용된다.
반응형