springboot- kafka create dynamically
나는 두가지 방법으로 kafa topic listener를 동적으로 생성하고 싶었다.
-
서버가 로딩될 때, DB로부터 kafka topic 목록을 받아서 Listener 생성
-
topic 추가 API를 통해 kafka topic이 DB에 추가될 때마다 Listener 생성
당연히 두개 다 같은 방식으로 동작하므로 1번을 기반으로 정리를 하려 한다.
Springboot가 모든 Bean들을 로딩한 뒤, 다음의 코드를 실행시켜야 한다.
@Override
public void run(String... args) throws Exception {
List<Topic> topics = kafkaTopicService.getTopics();
for (Topic topic: topics) {
AnnotationConfigApplicationContext childContext = new AnnotationConfigApplicationContext();
childContext.setParent(context);
childContext.register(KafkaListenerConfig.class);
Properties props = new Properties();
props.setProperty("topic", topic.getPath());
PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
childContext.getEnvironment().getPropertySources().addLast(pps);
childContext.refresh();
}
}
앞서 말했듯, Bean들을 로딩한 이후에 실행시켜야 하기 때문에 CommandLineRunner에서 실행시켰다.
위 코드에서 동적으로 빈으로 등록하고 있는 KafkaListenerConfig.class는 다음과 같다.
@Configuration
@EnableKafka
public class KafkaListenerConfig {
@Bean
public Listener listener() {
return new Listener();
}
}
Listener class의 인스턴스를 생성하여 빈으로 등록하고 있다.
Listener class에는 우리가 동적으로 등록되길 원했던 KafkaListener가 포함되어 있다.
public class Listener {
@Autowired
private WebSocketSubscribedTopicManager subscribedTopicManager;
@Autowired
private SimpMessagingTemplate websocketTemplate;
@KafkaListener(id = "${kafka.group.id:${random.uuid}}", topics = "${topic}")
public void listen(KafkaResponse kafkaResponse) {
String websocketTopic = MessageFormat.format("{0}/{1}",kafkaResponse.getTopicPath() , kafkaResponse.getUuid());
if (!subscribedTopicManager.topicExists(websocketTopic)) {
return;
}
websocketTemplate.convertAndSend(websocketTopic, kafkaResponse);
}
}
여기서 주의할 점은 main의 package가 KafkaListenerConfig.class, Listener.class의 package와 달라야 한다는 것이다.
Springboot는 기본적으로 Main package와 같은 package에 있는 Bean들을 로드한다. 즉 같은 package에 있으면 ComponentScan의 대상이 된다.
그러면 Listener가 Bean으로 등록될텐데, 그 때 SPEL로 변수처리된 topic 값에 대한 정보가 없으므로 에러가 발생한다.
따라서 ComponentScan의 대상이 되지 않기 위해 다른 package에 두어야 한다.