개발블로그

springboot- kafka create dynamically 본문

Spring

springboot- kafka create dynamically

개발자수니 2019. 6. 19. 10:57

나는 두가지 방법으로 kafa topic listener를 동적으로 생성하고 싶었다. 

  1. 서버가 로딩될 때, DB로부터 kafka topic 목록을 받아서 Listener 생성

  2. topic 추가 API를 통해 kafka topic이 DB에 추가될 때마다 Listener 생성

당연히 두개 다 같은 방식으로 동작하므로 1번을 기반으로 정리를 하려 한다. 

Springboot가 모든 Bean들을 로딩한 뒤, 다음의 코드를 실행시켜야 한다.

 

@Override
public void run(String... args) throws Exception {

    List<Topic> topics = kafkaTopicService.getTopics();

    for (Topic topic: topics) {
        AnnotationConfigApplicationContext childContext = new AnnotationConfigApplicationContext();
        childContext.setParent(context);
        childContext.register(KafkaListenerConfig.class);
        Properties props = new Properties();
        props.setProperty("topic", topic.getPath());
        PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
        childContext.getEnvironment().getPropertySources().addLast(pps);
        childContext.refresh();
    }

}

 

앞서 말했듯, Bean들을 로딩한 이후에 실행시켜야 하기 때문에 CommandLineRunner에서 실행시켰다. 

 

위 코드에서 동적으로 빈으로 등록하고 있는 KafkaListenerConfig.class는 다음과 같다.

@Configuration
@EnableKafka
public class KafkaListenerConfig {
   
    @Bean
    public Listener listener() {
        return new Listener();
    }
}

Listener class의 인스턴스를 생성하여 빈으로 등록하고 있다. 

 

Listener class에는 우리가 동적으로 등록되길 원했던 KafkaListener가 포함되어 있다. 

public class Listener {
    @Autowired
    private WebSocketSubscribedTopicManager subscribedTopicManager;

    @Autowired
    private SimpMessagingTemplate websocketTemplate;

    @KafkaListener(id = "${kafka.group.id:${random.uuid}}", topics = "${topic}")
    public void listen(KafkaResponse kafkaResponse) {
        String websocketTopic = MessageFormat.format("{0}/{1}",kafkaResponse.getTopicPath() , kafkaResponse.getUuid());

        if (!subscribedTopicManager.topicExists(websocketTopic)) {
            return;
        }
        websocketTemplate.convertAndSend(websocketTopic, kafkaResponse);
    }
}

 

 

여기서 주의할 점은 main의 package가 KafkaListenerConfig.class, Listener.class의 package와 달라야 한다는 것이다.

Springboot는 기본적으로 Main package와 같은 package에 있는 Bean들을 로드한다. 즉 같은 package에 있으면 ComponentScan의 대상이 된다.

그러면 Listener가 Bean으로 등록될텐데, 그 때 SPEL로 변수처리된 topic 값에 대한 정보가 없으므로 에러가 발생한다.

 

따라서 ComponentScan의 대상이 되지 않기 위해 다른 package에 두어야 한다. 

Comments