Programming/Spring
SmartLifecycle
Albothyl
2019. 6. 10. 20:03
- 다음과 같은 상황에서 SmartLifecycle 인터페이스를 구현하여 사용한다.
- require to be started upon ApplicationContext refresh and/or shutdown in a particular order.
ApplicationContext가 refresh 및/또는 shutdown될 때 특정 순서로 시작(또는 종료)되어야 하는 객체에 사용한다.
- The callback-accepting stop(Runnable) method is useful for objects that have an asynchronous shutdown process.
콜백을 인자로 받는 stop(Runnable) 메서드는 비동기 종료 프로세스가 있는 객체에 유용하다.
- require to be started upon ApplicationContext refresh and/or shutdown in a particular order.
직접 구현한 예제
/**
 * Lifecycle container that starts and stops every injected {@code SomeKafkaConsumer}
 * as a single unit, driven by the Spring lifecycle callbacks.
 *
 * <p>{@link #start()} and {@link #stop()} are synchronized and idempotent: calling
 * {@code start()} while already running, or {@code stop()} while not running, is a no-op.
 */
@Service
public class SomeKafkaConsumerContainer implements SmartLifecycle {

    /** Whether this container is started automatically; defaults to {@code true} when the property is absent. */
    @Value("${some.kafka.autostart:true}")
    private boolean autoStartUp;

    /** All consumers managed by this container. */
    @Autowired
    private List<SomeKafkaConsumer> someKafkaConsumerList;

    /** Volatile so that the unsynchronized {@link #isRunning()} sees the latest state. */
    private volatile boolean running = false;

    /**
     * Reports whether the container is currently running.
     *
     * @return {@code true} between a {@link #start()} and the following {@link #stop()}
     */
    @Override
    public boolean isRunning() {
        // DefaultLifecycleProcessor uses this as the condition for calling start/stop.
        return running;
    }

    /**
     * @return the value of the {@code some.kafka.autostart} property ({@code true} by default)
     */
    @Override
    public boolean isAutoStartup() {
        return autoStartUp;
    }

    /** Starts every managed consumer; does nothing when the container is already running. */
    @Override
    public synchronized void start() {
        if (running) {
            return;
        }
        // Flag is raised first (as in the original) so that a partially failed start
        // is still eligible for stop().
        running = true;
        someKafkaConsumerList.forEach(SomeKafkaConsumer::start);
    }

    /** Stops every managed consumer; does nothing when the container is not running. */
    @Override
    public synchronized void stop() {
        if (!running) {
            return;
        }
        running = false;
        someKafkaConsumerList.forEach(SomeKafkaConsumer::stop);
    }

    /**
     * Stops the container and then notifies the caller that shutdown has finished.
     *
     * @param callback invoked once stopping has been attempted, even if {@link #stop()} throws,
     *                 so the caller is never left waiting for a completion signal
     */
    @Override
    public void stop(Runnable callback) {
        try {
            stop();
        } finally {
            callback.run();
        }
    }

    /**
     * @return {@link Integer#MAX_VALUE}
     */
    @Override
    public int getPhase() {
        // A phase can be set for beans that have ordering dependencies on other beans.
        // The consumers must start late (after the other beans are up) and stop first
        // (before the others are torn down), hence the maximum phase.
        return Integer.MAX_VALUE;
    }
}
SmartLifecycle을 구현한 Spring Kafka class
- SmartLifecycle
- KafkaListenerEndpointRegistry
- MessageListenerContainer
- AbstractMessageListenerContainer
- KafkaMessageListenerContainer
- AbstractMessageListenerContainer
- MessageListenerContainer
- KafkaListenerEndpointRegistry
- 이를 활용한 Hook Controller
/**
 * Declares the Kafka listener container factory bean, parameterized with
 * {@code <Integer, String>} (presumably the record key and value types — confirm).
 *
 * NOTE(review): the factory configuration and the return statement are elided ("...")
 * in this snippet.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> someKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
...
}
//-------------------------------------------------------------------
// Listener declaration for topic "someKafkaTopic"; the annotated method is omitted in this snippet.
// containerFactory names the factory "someKafkaListenerContainerFactory".
@KafkaListener(topics="someKafkaTopic", containerFactory="someKafkaListenerContainerFactory")
//-------------------------------------------------------------------
@Controller
@RequestMapping(value = "/some/kafka/consumers")
public class KafkaConsumerHook {
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@ResponseBody
@RequestMapping(value = "start", method = RequestMethod.POST)
public String startKafkaConsuming() {
if (kafkaListenerEndpointRegistry.isRunning()) {
return "Kafka Consumer Already Started";
}
try {
kafkaListenerEndpointRegistry.start();
log.info("#### Kafka Consumer Started");
return "Kafka Consume Started";
} catch (Exception e) {
log.error("#### Kafka Consumer Start Failed !!! : {}", e);
return "Kafka Consumer Start Failed !!!";
}
}
@ResponseBody
@RequestMapping(value = "stop", method = RequestMethod.POST)
public String stopKafkaConsuming() {
if (!kafkaListenerEndpointRegistry.isRunning()) {
return "Kafka Consumer Already Stopped";
}
try {
kafkaListenerEndpointRegistry.stop();
log.info("#### Kafka Consumer Stopped");
return "Kafka Consumer Stopped";
} catch (Exception e) {
log.error("#### Kafka Consumer Stop Failed !!! : {}", e);
return "Kafka Consumer Stop Failed !!!";
}
}
}