Fix heartbeat timer handling broken when replacing timer with scheduler. #1348
* It was not the scheduler change that was the error, it was a change in workflow
MauriceVanVeen
left a comment
LGTM
Should/can a test be added for this as well?
| shutdownHeartbeatTimer(); // a new one will get started when needed. | ||
| // updates lastMsgReceivedNanoTime so this | ||
| // alarm won't be triggered too soon | ||
| updateLastMessageReceived(); |
Shutting down the timer was the problem. Consider a disconnect. This code triggered:
- The timer shutdown.
- The heartbeat error handler, which starts a new sub and a new timer.
This is fine if the client reconnects within the alarm period, since a new sub will be made and a new timer will get started. But if it takes longer than the alarm period to reconnect, the sub will fail, there will be no new timer, and the loop ends.
This code is now back to where it was before I broke it.
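To make the failure mode concrete, here is a minimal simulation of it. It is only a sketch: `HeartbeatAlarm`, `HeartbeatAlarmDemo`, and the integer "clock" are hypothetical stand-ins invented for illustration, not the client's classes, and the re-subscribe attempt is assumed to always fail because the connection stays down for the whole run.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the heartbeat bookkeeping (not the library's class).
class HeartbeatAlarm {
    private final long alarmPeriod;
    private final AtomicLong lastMsgReceivedTime = new AtomicLong();
    private boolean timerRunning = true;

    HeartbeatAlarm(long alarmPeriod, long now) {
        this.alarmPeriod = alarmPeriod;
        lastMsgReceivedTime.set(now);
    }

    void updateLastMessageReceived(long now) { lastMsgReceivedTime.set(now); }

    void shutdownHeartbeatTimer() { timerRunning = false; }

    // Called on every simulated timer tick; true means the alarm fires.
    boolean tick(long now) {
        return timerRunning && now - lastMsgReceivedTime.get() > alarmPeriod;
    }
}

class HeartbeatAlarmDemo {
    // Counts alarms over `ticks` ticks while the connection stays down.
    static int alarmsWhileDisconnected(boolean shutdownTimerOnError, int ticks) {
        HeartbeatAlarm alarm = new HeartbeatAlarm(10, 0);
        int alarms = 0;
        for (long now = 1; now <= ticks; now++) {
            if (alarm.tick(now)) {
                alarms++;
                // The error handler tries to re-subscribe here; assumed to fail (disconnected).
                if (shutdownTimerOnError) {
                    alarm.shutdownHeartbeatTimer();       // sub failed, so no new timer: loop ends
                } else {
                    alarm.updateLastMessageReceived(now); // keep the timer, delay the next alarm
                }
            }
        }
        return alarms;
    }

    public static void main(String[] args) {
        System.out.println("timer shut down on error: " + alarmsWhileDisconnected(true, 50));
        System.out.println("timestamp reset on error: " + alarmsWhileDisconnected(false, 50));
    }
}
```

With the timer shut down, the alarm fires once and never again, so nothing retries after a long outage. With the timestamp reset instead, the alarm keeps firing once per period until a re-subscribe can succeed.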
| @Override | ||
| protected void startup(NatsJetStreamSubscription sub) { | ||
| expectedExternalConsumerSeq = 1; // consumer always starts with consumer sequence 1 |
Without this, the replacement ordered consumer would try to match the wrong sequence, fail, and then loop. This is an old bug that I found during this process.
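A small sketch of why the reset matters. `OrderedSequenceTracker` and `OrderedSequenceDemo` are hypothetical names made up for this example; only the field name `expectedExternalConsumerSeq` and the idea that a new consumer starts at consumer sequence 1 come from the diff above.

```java
// Hypothetical, simplified tracker; the real manager in the client does more than this.
class OrderedSequenceTracker {
    private long expectedExternalConsumerSeq = 1;

    // Mirrors the fix: a replacement consumer always starts at consumer sequence 1.
    void startup() { expectedExternalConsumerSeq = 1; }

    // Returns false on a sequence mismatch, which would trigger a consumer replacement.
    boolean accept(long consumerSeq) {
        if (consumerSeq != expectedExternalConsumerSeq) {
            return false;
        }
        expectedExternalConsumerSeq++;
        return true;
    }
}

class OrderedSequenceDemo {
    // Delivers three messages, swaps in a replacement consumer, then delivers its first message.
    static boolean firstMessageAcceptedAfterReplacement(boolean resetOnStartup) {
        OrderedSequenceTracker tracker = new OrderedSequenceTracker();
        tracker.accept(1);
        tracker.accept(2);
        tracker.accept(3);
        if (resetOnStartup) {
            tracker.startup();
        }
        return tracker.accept(1); // the replacement consumer's first consumer sequence
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + firstMessageAcceptedAfterReplacement(false));
        System.out.println("with reset:    " + firstMessageAcceptedAfterReplacement(true));
    }
}
```

Without the reset, the first message from the replacement is rejected as a gap, which would trigger yet another replacement, and so on.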
| userMessageHandler.onMessage(msg); | ||
| if (stopped.get() && pmm.noMorePending()) { | ||
| finishAndClose(); | ||
| finished.set(true); |
This matches the code from before I, ahem, "fixed" (broke) it:
https://github.com/nats-io/nats.java/blob/0e4f39c569e7db99cf116651f7d63e41c87d73d6/src/main/java/io/nats/client/impl/NatsMessageConsumer.java
| stopped.set(false); | ||
| finished.set(false); | ||
| super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null)); | ||
| repull(); |
This also matches the earlier code, although I did move the setting of the flags to before the subscribe instead of after it.
| repull(); | ||
| } | ||
| catch (JetStreamApiException | IOException e) { | ||
| setupHbAlarmToTrigger(); |
This also matches the earlier code.
| public void close() throws Exception { | ||
| lenientClose(); | ||
| stopped.set(true); | ||
| shutdownSub(); |
I've clarified this behavior.
- Set stopped to true since we are closing (same as stop() does).
- Shut the sub down.
I do not set finished to true because there could be messages already at the client but not yet seen by the user.
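The difference between the two flags can be sketched as follows. This is an illustration under assumptions, not the actual NatsMessageConsumer: `ConsumerLifecycleSketch`, `subActive`, and the `pendingAtClient` counter are hypothetical, and the counter stands in for whatever the pull message manager tracks.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the stopped/finished distinction (not the library's class).
class ConsumerLifecycleSketch {
    final AtomicBoolean stopped = new AtomicBoolean(false);
    final AtomicBoolean finished = new AtomicBoolean(false);
    boolean subActive = true;
    private int pendingAtClient; // messages already delivered to the client, not yet handled

    ConsumerLifecycleSketch(int pendingAtClient) {
        this.pendingAtClient = pendingAtClient;
    }

    // close(): mark stopped and shut the sub down, but leave finished untouched.
    void close() {
        stopped.set(true);
        subActive = false; // stands in for shutting down the sub and the heartbeat timer
    }

    // Handler path: finished flips only once stopped AND nothing is left to hand over.
    void onMessage() {
        if (pendingAtClient > 0) {
            pendingAtClient--;
        }
        if (stopped.get() && pendingAtClient == 0) {
            finished.set(true);
        }
    }

    public static void main(String[] args) {
        ConsumerLifecycleSketch consumer = new ConsumerLifecycleSketch(2);
        consumer.close();
        System.out.println("after close: finished=" + consumer.finished.get());
        consumer.onMessage();
        consumer.onMessage();
        System.out.println("after drain: finished=" + consumer.finished.get());
    }
}
```

The point of the split is that a caller polling the finished flag will not treat the consumer as done while messages are still queued locally.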
| if (pmm != null) { | ||
| pmm.shutdownHeartbeatTimer(); | ||
| } | ||
| protected void fullClose() { |
This consume is completely done. Stopped, finished, kaput!
- From the fetch consumer, since a fetch is not endless: when the fetch has been satisfied, or when there is a heartbeat error, which means there aren't any more messages coming.
- From NatsMessageConsumer (endless consumes): only when the user has told the consumer to stop() and the latest pull is complete, or there was a heartbeat error.
| } | ||
| protected void lenientClose() { | ||
| protected void shutdownSub() { |
Being a good citizen and shutting down the sub and the heartbeat timer.
- Called from close() when completely shutting down.
- Called during endless consumes to stop the current sub, since a new one is being made.
MauriceVanVeen
left a comment
LGTM
Much better.