Conversation
| try { | ||
| long time = NatsSystemClock.nanoTime(); | ||
| writer.queueInternalMessage(new ProtocolMessage(OP_PING_BYTES)); | ||
| writer.queueInternalMessage(new ProtocolMessage(PONG_PROTO)); |
There was a problem hiding this comment.
Use the premade PONG_PROTO for faster construction
| this.outgoing.filter((msg) -> | ||
| msg.isProtocol() && | ||
| (msg.getProtocolBab().equals(OP_PING_BYTES) || msg.getProtocolBab().equals(OP_PONG_BYTES))); | ||
| this.outgoing.filter(NatsMessage::isProtocolFilterOnStop); |
There was a problem hiding this comment.
Leverage the new flag for faster filtering instead of doing byte compares on proto bytes
| sendBuffer[sendPosition++] = LF; | ||
|
|
||
| if (!msg.isProtocol()) { // because a protocol message does not have headers | ||
| if (!msg.isProtocol()) { // because a protocol message does not have headers or data |
There was a problem hiding this comment.
This clarification was important.
| // Protocol message is a special version of a NatsPublishableMessage extends NatsMessage | ||
| // ---------------------------------------------------------------------------------------------------- | ||
| class ProtocolMessage extends NatsPublishableMessage { | ||
| final boolean filterOnStop; |
There was a problem hiding this comment.
must construct with this information now
There was a problem hiding this comment.
One concern I wanted to raise is the potential memory impact of adding additional state to each NatsMessage.
In high-throughput systems (we’re handling tens of thousands of messages per second), even a small increase in object size can become noticeable.
There was a problem hiding this comment.
On second thought, I may have overestimated the impact — this change applies to protocol messages rather than the actual PUB data, so the volume is relatively low.
That likely means the memory overhead is negligible.
| super(false); | ||
| protocolBab = new ByteArrayBuilder(protocol); | ||
| sizeInBytes = controlLineLength = protocolBab.length() + 2; // CRLF, protocol doesn't have data | ||
| this.filterOnStop = true; |
There was a problem hiding this comment.
I looked through every use this constructor. 99% were test construction. The only production usage were these, the pre-made messages which speed up new ProtocolMessage creation by sharing the existing protocol buffer which never changes for these fixed size protocols. These are used to construct new ProtocolMessage, which is required since they have some state related to their queueing - you can only share / reuse the data, not the message itself.
private static final ProtocolMessage PING_PROTO = new ProtocolMessage(OP_PING_BYTES);
private static final ProtocolMessage PONG_PROTO = new ProtocolMessage(OP_PONG_BYTES);
…eady registered internally and will be re-subscribed automatically after a recconnect. It's only a rare edge case anyway
Fixes #1320
An Authorization Violation could occur if an UNSUB was sent between a disconnect and reconnect, because the UNSUB got queued and sent before the CONNECT protocol finished. This PR prevents that from happening and also addresses not needing to send unsubs after a disconnect.
Puts UNSUBS in the normal (not internal/reconnect) queue. This by itself is enough to solve the bug, since the normal queue is never sent while not connected.
On a disconnect, PINGS and PONGS were already filtered out of the normal outgoing queue because they are invalid anyway.
If you want to verify, run the test it passes. Then go into NatsConnection and change line 1062 to
and then run the test, it will fail.
Thanks @ajax-surovskyi-y