Skip to content

Conversation

@scottf
Copy link
Contributor

@scottf scottf commented Jul 3, 2025

Much better.

  • It was not the scheduler change that was the error, it was a change in workflow
  • Added a unit test to simulate the error found.
  • Added a simplified Ordered Consumer example
  • fixed an issue that was pre-existing which may have caused simplified ordered consumers to not reset correctly after disconnnect.

scottf added 2 commits July 3, 2025 13:32
* It was not the scheduler change that was the error, it was a change in workflow
* It was not the scheduler change that was the error, it was a change in workflow
@scottf scottf changed the title Fix what i broke 2 21 3 Fix heartbeat timer handling broken when replacing timer with scheduler. Jul 3, 2025
Copy link
Member

@MauriceVanVeen MauriceVanVeen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM
Should/can a test be added for this as well?

shutdownHeartbeatTimer(); // a new one will get started when needed.
// updates lastMsgReceivedNanoTime so this doesn't so this
// alarm won't be triggered to soon
updateLastMessageReceived();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shutting down the timer was the problem. Consider a disconnect. This code triggered

  • The timer shutdown
  • The heartbeat error handler starts a new sub and a new timer.
    This is fine if the client reconnects within the alarm period as a new sub will me made and a new timer will get started. But if it takes longer than the alarm period to reconnect, the sub will fail, there will be no new timer and the loop ends.
    This code is now back to where it was before I broke it.


@Override
protected void startup(NatsJetStreamSubscription sub) {
expectedExternalConsumerSeq = 1; // consumer always starts with consumer sequence 1
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this, the replacement order consumer would be trying to match the wrong sequence, would fail and then loop. This is an old bug that I found during this process.

userMessageHandler.onMessage(msg);
if (stopped.get() && pmm.noMorePending()) {
finishAndClose();
finished.set(true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopped.set(false);
finished.set(false);
super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null));
repull();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this also matches before, although I did move the setting of the flags before instead of after

repull();
}
catch (JetStreamApiException | IOException e) {
setupHbAlarmToTrigger();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also matches before

public void close() throws Exception {
lenientClose();
stopped.set(true);
shutdownSub();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've clarified this behavior.

  • set stopped to true since we are close (same as stop() does)
  • shut the sub down.
    I do not set finished to true because their could be messages already at the client by not seen by the user.

if (pmm != null) {
pmm.shutdownHeartbeatTimer();
}
protected void fullClose() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This consume is completely done. Stopped, finished, caput!

  1. From fetch consumer since fetch is not endless, when the fetch has been satisfied or there is a heartbeat error which means there aren't any more messages coming.
  2. From NatsMessageConsumer (endless consumes) only when the user has told the consumer to stop() and the latest pull is complete or there was a heartbeat error

}

protected void lenientClose() {
protected void shutdownSub() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being a good citizen and shutting down the sub and the heartbeat timer.

  1. Called when completely shutting down close()
  2. Called during endless consumes to stop the current sub since a new one is being made.

@scottf scottf requested a review from MauriceVanVeen July 4, 2025 01:08
Copy link
Member

@MauriceVanVeen MauriceVanVeen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@scottf scottf merged commit 12625ef into main Jul 4, 2025
5 checks passed
@scottf scottf deleted the fix-what-i-broke-2-21-3 branch July 4, 2025 13:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants