LikeTech Main.

kafka-producer-consumer

Seokkyu Ryu
Seokkyu Ryu

오늘은 kafka 관련해서, 여러 토픽과 여러 컨슈머를 만들어서 작동하도록 하는 과정을 정리해보려고 한다.

여러개의 토픽이 있고, 각각의 토픽에서 메시지를 소비하는 consumer들을 각각 만들 것이다.

User 클래스 사용
user가 생성될 때 쌓이는 토픽(user-create), 삭제될 때 쌓이는 토픽(user-delete), 수정될 때 쌓이는 토픽(user-update)
Team 클래스 사용
team이 생성될 때 쌓이는 토픽(team-create), 삭제될 때 쌓이는 토픽(team-delete), 수정될 때 쌓이는 토픽(team-update)

여러 가지 방법들이 있는 것 같은 데, 나는 KafkaListener Annotation를 이용해서 consumer를 만들 것이다.

현재 기준으로는 까다로운 custom 기능이 필요하지 않기 때문에, 좀 더 하이 레벨로 컨슈머를 구현할 수 있음.
빠르고, 손쉽게, 나은 퍼포먼스를 가진 kafka consumer를 만들 수 있음

각각의 토픽들이 다른 클래스 타입들을 사용하기 때문에 KafkaListener Annotation를 이용해서 만들 때, 다른 KafkaListenerContainerFactory를 만들어줘야 한다.

KafkaListenerContainerFactory를 만들기 위해서는 ConsumerFactory가 필요하고, ConsumerFactory를 만들기 위해서는 consumer configuration을 설정해줘야한다.

public Map<String, Object> consumerConfigs(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // bootstrap server 설정, 나는 127.0.0.1:9092 로 설정
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    return props;
}

public ConsumerFactory<String, User> userConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
        consumerConfigs(userGroupId), new StringDeserializer(), new JsonDeserializer<>(User.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
}

여러 토픽들 중 User 클래스 타입을 관리하는 user-create, user-delete, user-update 토픽들을 소비하는 userConsumer 들은 위의 코드와 같이 만든다.

다른 Team 클래스 타입을 관리하는 team-create, team-delete, team-update 토픽들을 소비하는 teamConsumer 들은 위의 코드에서 클래스 타입만 변환하면 된다.

consumerConfigs 함수는 공통이니 추가로 필요하지 않음.

public ConsumerFactory<String, Team> teamConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(teamGroupId), new StringDeserializer(),
            new JsonDeserializer<>(Team.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Team> teamKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Team> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(teamConsumerFactory());
    return factory;
}

다음과 같이, 위에서 만든 KafkaListenerContainerFactory를 사용해서 KafkaListener를 만들어주면 된다.

@KafkaListener(topics = "${kafka.topic.users.create}", groupId = "${kafka.group.id.users}", containerFactory = "userKafkaListenerContainerFactory")
public void createUser(User user) {

    log.info("consume user and create user {} ", user.toString());
    userService.save(user);

}

위는 user-create 토픽을 소비하는 consumer이다.

만약 user-delete 토픽을 소피하는 컨슈머를 만들고 싶으면 아래와 같이 작성하면 된다.

 @KafkaListener(topics = "${kafka.topic.users.delete}", groupId = "${kafka.group.id.users}", containerFactory = "userKafkaListenerContainerFactory")
public void deleteUser(User user) {

    log.info("consume user and delete user{} ", user.toString());
    userService.delete(user);

}

만약 Team 클래스 타입을 관리하는 토픽들의 consumer 들도 당연히 생성할 수 있다. 다음과 같다.

@KafkaListener(topics = "${kafka.topic.teams.create}", groupId = "${kafka.group.id.teams}", containerFactory = "teamKafkaListenerContainerFactory")
public void createTeam(Team team) {

    log.info("consume team and create team {} ", team.toString());
    teamService.save(team);

}

@KafkaListener(topics = "${kafka.topic.teams.delete}", groupId = "${kafka.group.id.teams}", containerFactory = "teamKafkaListenerContainerFactory")
public void deleteTeam(Team team) {

    log.info("consume team and delete team {} ", team.toString());
    teamService.delete(team);

}

이렇게 하면 여러 클래스 타입을 관리하는 여러 토픽들을 소비할 수 있는 Consumer들을 만들 수 있다.

정리

  • KafkaListener high level API 를 이용하여 여러 Consumer들 만들 수 있다.
  • Consumer Configuration 정의
  • ConsumerFactory 정의
  • KafkaListenerContainerFactory 생성 및 빈 등록
  • 소비했을 때 해야 하는 행위가 정의된 함수에 @KafkaListener 선언
  • topic 이름, group id, containerFactory를 인자로 넘김.

개선

  • 더 중복된 코드를 줄일 수 있는 방법은 없는 걸까?