일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- springboot-angular-jwt
- jvm 메모리 구조
- filter ordering
- intern
- Constants pool
- spring-boot-maven-plugin
- install mongodb docker
- spring filter ordering
- docker mongodb install
- String Constants Pool
- 기본 Manifest 속성이 없습니다
- mongodb install ec2
- jwt example
- spring jwt
- jvm 모델
- jwt token
- docker mongodb
- jpa pagination
- HHH000104
- springboot maven plugin
- angular jwt
- jvm memory model
- jvm memory structure
- springboot jwt example
- springboot jwt
- string comparison
- JWT
- String Pool
- JPA
- springboot mongodb config
- Today
- Total
개발블로그
springboot- kafka create dynamically 본문
나는 두가지 방법으로 kafa topic listener를 동적으로 생성하고 싶었다.
-
서버가 로딩될 때, DB로부터 kafka topic 목록을 받아서 Listener 생성
-
topic 추가 API를 통해 kafka topic이 DB에 추가될 때마다 Listener 생성
당연히 두개 다 같은 방식으로 동작하므로 1번을 기반으로 정리를 하려 한다.
Springboot가 모든 Bean들을 로딩한 뒤, 다음의 코드를 실행시켜야 한다.
@Override
public void run(String... args) throws Exception {
List<Topic> topics = kafkaTopicService.getTopics();
for (Topic topic: topics) {
AnnotationConfigApplicationContext childContext = new AnnotationConfigApplicationContext();
childContext.setParent(context);
childContext.register(KafkaListenerConfig.class);
Properties props = new Properties();
props.setProperty("topic", topic.getPath());
PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
childContext.getEnvironment().getPropertySources().addLast(pps);
childContext.refresh();
}
}
앞서 말했듯, Bean들을 로딩한 이후에 실행시켜야 하기 때문에 CommandLineRunner에서 실행시켰다.
위 코드에서 동적으로 빈으로 등록하고 있는 KafkaListenerConfig.class는 다음과 같다.
@Configuration
@EnableKafka
public class KafkaListenerConfig {
@Bean
public Listener listener() {
return new Listener();
}
}
Listener class의 인스턴스를 생성하여 빈으로 등록하고 있다.
Listener class에는 우리가 동적으로 등록되길 원했던 KafkaListener가 포함되어 있다.
public class Listener {
@Autowired
private WebSocketSubscribedTopicManager subscribedTopicManager;
@Autowired
private SimpMessagingTemplate websocketTemplate;
@KafkaListener(id = "${kafka.group.id:${random.uuid}}", topics = "${topic}")
public void listen(KafkaResponse kafkaResponse) {
String websocketTopic = MessageFormat.format("{0}/{1}",kafkaResponse.getTopicPath() , kafkaResponse.getUuid());
if (!subscribedTopicManager.topicExists(websocketTopic)) {
return;
}
websocketTemplate.convertAndSend(websocketTopic, kafkaResponse);
}
}
여기서 주의할 점은 main의 package가 KafkaListenerConfig.class, Listener.class의 package와 달라야 한다는 것이다.
Springboot는 기본적으로 Main package와 같은 package에 있는 Bean들을 로드한다. 즉 같은 package에 있으면 ComponentScan의 대상이 된다.
그러면 Listener가 Bean으로 등록될텐데, 그 때 SPEL로 변수처리된 topic 값에 대한 정보가 없으므로 에러가 발생한다.
따라서 ComponentScan의 대상이 되지 않기 위해 다른 package에 두어야 한다.
'Spring' 카테고리의 다른 글
springboot-kafka :: Error deserializing key/value (0) | 2019.06.19 |
---|---|
springboot - intelliJ can not know application.yml (0) | 2019.06.19 |
Spring data mongodb nested array 추가/수정/삭제 (1) | 2019.06.04 |
Spring WebFlux CORS Configuration (0) | 2019.05.26 |
Springboot-Angular-JWT기반 A/A기능 구현(2) (0) | 2019.04.24 |