-
Notifications
You must be signed in to change notification settings - Fork 630
Description
Scenario
Our Spring Boot application relies on information stored in Kafka to answer REST requests. It does so by retrieving information from a global state store via the InteractiveQueryService.
Problem
Unfortunately, during a graceful shutdown, ongoing requests lead to the following error:
java.lang.IllegalStateException: Error retrieving state store: my-global-store-name-v0
at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getQueryableStore$1(InteractiveQueryService.java:153)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:344)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:217)
at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.getQueryableStore(InteractiveQueryService.java:103)
...
Analysis
Our analysis showed that this error is caused by the fact that WebServerGracefulShutdownLifecycle, which handles the graceful shutdown, has a phase with value Integer.MAX_VALUE - 1024 while StreamsBuilderFactoryManager has a phase with value Integer.MAX_VALUE - 100. This means that StreamsBuilderFactoryManager is shut down before the graceful shutdown is initiated, so that ongoing requests processed as part of the graceful shutdown no longer have access to the global state store.
This is indeed visible in the logs:
2025-01-08 17:04:16.225 DEBUG org.springframework.context.support.DefaultLifecycleProcessor [Stopping beans in phase 2147483547]
...
2025-01-08 17:04:16.572 DEBUG org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl [Closing global storage engine my-global-store-name-v0]
...
2025-01-08 17:04:16.577 DEBUG org.springframework.context.support.DefaultLifecycleProcessor [Bean 'streamsBuilderFactoryManager' completed its stop procedure]
...
2025-01-08 17:04:16.584 DEBUG org.springframework.context.support.DefaultLifecycleProcessor [Stopping beans in phase 2147482623]
2025-01-08 17:04:16.584 INFO org.springframework.boot.web.embedded.tomcat.GracefulShutdown [Commencing graceful shutdown. Waiting for active requests to complete]
2025-01-08 17:04:16.587 INFO org.springframework.boot.web.embedded.tomcat.GracefulShutdown [Graceful shutdown complete]
The Javadoc of StreamsBuilderFactoryManager highlights the fact that choosing a phase close to Integer.MAX_VALUE was a conscious decision:
* This {@link SmartLifecycle} class ensures that the bean created from it is started very
* late through the bootstrap process by setting the phase value closer to
* Integer.MAX_VALUE. This is to guarantee that the {@link StreamsBuilderFactoryBean} on a
* function with multiple bindings is only started after all the binding phases have completed successfully.
Also, the constant AbstractMessageListenerContainer.DEFAULT_PHASE, which is not used by the StreamsBuilderFactoryManager but which has the same value of Integer.MAX_VALUE - 100, makes the following claim (which unfortunately I was unable to verify):
// The default org.springframework.context.SmartLifecycle phase for listener containers 2147483547.
As a final consideration, StreamsBuilderFactoryManager stops StreamsBuilderFactoryBeans in its stop() method (which takes place at phase Integer.MAX_VALUE - 100), but StreamsBuilderFactoryBean itself implements SmartLifecycle and its phase is defined to be Integer.MAX_VALUE - 1000. So we are wondering whether:
- both mechanisms are necessary?
- if so, do they need to have a different lifecycle phase?
Expected behavior
Kafka Streams (and state stores in our specific case) need to be available during the graceful shutdown phase and hence StreamsBuilderFactoryManager (and potentially StreamsBuilderFactoryBean) should have a phase which is lower than WebServerGracefulShutdownLifecycle.
Once the questions above have been clarified and the design is clear, I am happy to provide a fix.
To reproduce
Steps to reproduce the behavior:
- Define a state store
spring.cloud.stream.bindings.some-store-in-0.destination: some-topic
spring.cloud.stream.kafka.streams.some-store-in-0.consumer.materialized.as: some-store-v0
@Bean
public Consumer<GlobalKTable<String, SomeClass>> someStore() {
...
}
- Have a REST controller access the state store via a service
private final InteractiveQueryService queryService;
...
ReadOnlyKeyValueStore<String, LockEvent> store = queryService.getQueryableStore("some-store-v0", QueryableStoreTypes.keyValueStore());
- Initiate a graceful shutdown while under load, so that requests are being processed while shutting down. Log statements similar to the ones above will be generated.
Version of the framework
Spring Boot 3.4.1 (current latest) and spring-cloud-stream 4.2 (current latest)