Error retrieving state store during graceful shutdown #3067

@cedric-schaller

Description

Scenario

Our Spring Boot application relies on information stored in Kafka to answer REST requests. It does so by retrieving information from a global state store via the InteractiveQueryService.

Problem

Unfortunately, during a graceful shutdown, ongoing requests lead to the following error:

java.lang.IllegalStateException: Error retrieving state store: my-global-store-name-v0
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getQueryableStore$1(InteractiveQueryService.java:153)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:344)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:217)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.getQueryableStore(InteractiveQueryService.java:103)
    ...

Analysis

Our analysis showed that this error is caused by the ordering of lifecycle phases. WebServerGracefulShutdownLifecycle, which handles the graceful shutdown, has phase Integer.MAX_VALUE - 1024, while StreamsBuilderFactoryManager has phase Integer.MAX_VALUE - 100. Because Spring stops SmartLifecycle beans in descending phase order, StreamsBuilderFactoryManager is shut down before the graceful shutdown is initiated, so requests that are still being processed during the graceful shutdown no longer have access to the global state store.
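
To make the ordering concrete, here is a minimal plain-Java sketch (no Spring dependency) that sorts the two phases the way a lifecycle processor stops them, highest phase first. The class name StopOrderSketch and the helper methods are hypothetical and exist only for illustration; the phase values are the ones quoted in this analysis.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: models "stop in descending phase order", not Spring's actual code.
class StopOrderSketch {

    // Phase values as quoted in the analysis.
    static final int STREAMS_MANAGER_PHASE = Integer.MAX_VALUE - 100;
    static final int WEB_GRACEFUL_SHUTDOWN_PHASE = Integer.MAX_VALUE - 1024;

    // The two components under discussion, labeled by class name.
    static Map<String, Integer> phasesFromIssue() {
        Map<String, Integer> phases = new LinkedHashMap<>();
        phases.put("WebServerGracefulShutdownLifecycle", WEB_GRACEFUL_SHUTDOWN_PHASE);
        phases.put("StreamsBuilderFactoryManager", STREAMS_MANAGER_PHASE);
        return phases;
    }

    // Returns the names in the order they would be stopped: highest phase first.
    static List<String> stopOrder(Map<String, Integer> phases) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(phases.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : entries) {
            names.add(entry.getKey());
        }
        return names;
    }

    public static void main(String[] args) {
        // Prints: [StreamsBuilderFactoryManager, WebServerGracefulShutdownLifecycle]
        System.out.println(stopOrder(phasesFromIssue()));
    }
}
```

The Kafka Streams manager comes first in the stop order, which matches the behavior described above.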

This is indeed visible in the logs:

2025-01-08 17:04:16.225 DEBUG  org.springframework.context.support.DefaultLifecycleProcessor [Stopping beans in phase 2147483547]
...
2025-01-08 17:04:16.572 DEBUG  org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl [Closing global storage engine my-global-store-name-v0]
...
2025-01-08 17:04:16.577 DEBUG  org.springframework.context.support.DefaultLifecycleProcessor [Bean 'streamsBuilderFactoryManager' completed its stop procedure]
...
2025-01-08 17:04:16.584 DEBUG  org.springframework.context.support.DefaultLifecycleProcessor [Stopping beans in phase 2147482623]
2025-01-08 17:04:16.584 INFO   org.springframework.boot.web.embedded.tomcat.GracefulShutdown [Commencing graceful shutdown. Waiting for active requests to complete]
2025-01-08 17:04:16.587 INFO   org.springframework.boot.web.embedded.tomcat.GracefulShutdown [Graceful shutdown complete]
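
The raw phase numbers in the log can be mapped back to the offsets used in the source code. The following sketch does that arithmetic; PhaseDecodeSketch and describe are hypothetical names, and the helper assumes a non-negative phase.

```java
// Illustrative only: converts a logged phase number into "Integer.MAX_VALUE - N" form.
class PhaseDecodeSketch {

    // Assumes phase >= 0, so the subtraction cannot overflow.
    static String describe(int phase) {
        int offset = Integer.MAX_VALUE - phase;
        return offset == 0 ? "Integer.MAX_VALUE" : "Integer.MAX_VALUE - " + offset;
    }

    public static void main(String[] args) {
        // Phase logged first, in which streamsBuilderFactoryManager completes its stop procedure.
        System.out.println(describe(2147483547)); // prints "Integer.MAX_VALUE - 100"
        // Phase logged second, in which the graceful shutdown commences.
        System.out.println(describe(2147482623)); // prints "Integer.MAX_VALUE - 1024"
    }
}
```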

The Javadoc of StreamsBuilderFactoryManager highlights the fact that choosing a phase close to Integer.MAX_VALUE was a conscious decision:

* This {@link SmartLifecycle} class ensures that the bean created from it is started very
* late through the bootstrap process by setting the phase value closer to
* Integer.MAX_VALUE. This is to guarantee that the {@link StreamsBuilderFactoryBean} on a
* function with multiple bindings is only started after all the binding phases have completed successfully.

Also, the constant AbstractMessageListenerContainer.DEFAULT_PHASE, which is not used by the StreamsBuilderFactoryManager but which has the same value of Integer.MAX_VALUE - 100, makes the following claim (which unfortunately I was unable to verify):

// The default org.springframework.context.SmartLifecycle phase for listener containers 2147483547.

As a final consideration, StreamsBuilderFactoryManager stops StreamsBuilderFactoryBeans in its stop() method (which takes place at phase Integer.MAX_VALUE - 100), but StreamsBuilderFactoryBean itself implements SmartLifecycle and its phase is defined to be Integer.MAX_VALUE - 1000. So we are wondering whether:

  1. both mechanisms are necessary?
  2. if so, do they need to have a different lifecycle phase?

Expected behavior

Kafka Streams (and, in our specific case, its state stores) needs to remain available during the graceful shutdown phase. StreamsBuilderFactoryManager (and potentially StreamsBuilderFactoryBean) should therefore have a phase lower than that of WebServerGracefulShutdownLifecycle, so that it is stopped only after in-flight requests have completed.
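
The expected behavior can be illustrated with a small plain-Java simulation, which is a sketch and not a proposed fix. It models the state store as a flag that is cleared when the Kafka Streams component stops, and checks whether a request draining during the web server's graceful shutdown can still read it. All names are hypothetical, and Integer.MAX_VALUE - 2048 is an arbitrary example of a phase below Integer.MAX_VALUE - 1024, not a suggested value.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a toy model of phased shutdown, not Spring's lifecycle processor.
class GracefulShutdownSketch {

    static final int WEB_GRACEFUL_SHUTDOWN_PHASE = Integer.MAX_VALUE - 1024;

    // A component with a phase and an action to run when it is stopped.
    static final class Component {
        final int phase;
        final Runnable stop;

        Component(int phase, Runnable stop) {
            this.phase = phase;
            this.stop = stop;
        }
    }

    // Returns true if a request draining during graceful shutdown can still read the store.
    static boolean storeReadableWhileDraining(int streamsPhase) {
        boolean[] storeOpen = {true};
        boolean[] readSucceeded = {false};

        List<Component> components = new ArrayList<>();
        // Stopping Kafka Streams closes the state store.
        components.add(new Component(streamsPhase, () -> storeOpen[0] = false));
        // Draining the web server lets an in-flight request query the store.
        components.add(new Component(WEB_GRACEFUL_SHUTDOWN_PHASE,
                () -> readSucceeded[0] = storeOpen[0]));

        // Stop in descending phase order: highest phase first.
        components.sort((a, b) -> Integer.compare(b.phase, a.phase));
        for (Component component : components) {
            component.stop.run();
        }
        return readSucceeded[0];
    }

    public static void main(String[] args) {
        System.out.println(storeReadableWhileDraining(Integer.MAX_VALUE - 100));  // false
        System.out.println(storeReadableWhileDraining(Integer.MAX_VALUE - 1000)); // false
        System.out.println(storeReadableWhileDraining(Integer.MAX_VALUE - 2048)); // true
    }
}
```

In this model, the phase of StreamsBuilderFactoryBean (Integer.MAX_VALUE - 1000) is also still higher than Integer.MAX_VALUE - 1024, so it would be stopped before the graceful shutdown as well.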

Once the questions above have been clarified and the design is clear, I am happy to provide a fix.

To reproduce

Steps to reproduce the behavior:

  1. Define a state store:

     spring.cloud.stream.bindings.someStore-in-0.destination: some-topic
     spring.cloud.stream.kafka.streams.bindings.someStore-in-0.consumer.materializedAs: some-store-v0

     @Bean
     public Consumer<GlobalKTable<String, SomeClass>> someStore() {
       ...
     }

  2. Have a REST controller access the state store via a service:

     private final InteractiveQueryService queryService;
     ...
     ReadOnlyKeyValueStore<String, SomeClass> store = queryService.getQueryableStore("some-store-v0", QueryableStoreTypes.keyValueStore());

  3. Initiate a graceful shutdown while under load, so that requests are still being processed while the application shuts down. Log statements similar to the ones above will be generated.

Version of the framework

Spring Boot 3.4.1 (current latest) and spring-cloud-stream 4.2 (current latest)
