Skip to content

Commit 4c09802

Browse files
(2.12) Atomic batch: reject unsupported headers
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 74a7d88 commit 4c09802

File tree

4 files changed

+84
-28
lines changed

4 files changed

+84
-28
lines changed

server/errors.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1750,10 +1750,10 @@
17501750
"deprecates": ""
17511751
},
17521752
{
1753-
"constant": "JSAtomicPublishDuplicateErr",
1753+
"constant": "JSAtomicPublishUnsupportedHeaderBatchErr",
17541754
"code": 400,
17551755
"error_code": 10177,
1756-
"description": "atomic publish duplicates not allowed",
1756+
"description": "atomic publish unsupported header used: {header}",
17571757
"comment": "",
17581758
"help": "",
17591759
"url": "",

server/jetstream_batching_test.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func TestJetStreamAtomicBatchPublishDedupeNotAllowed(t *testing.T) {
194194
require_NoError(t, err)
195195
require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck))
196196
require_NotNil(t, pubAck.Error)
197-
require_Error(t, pubAck.Error, NewJSAtomicPublishDuplicateError())
197+
require_Error(t, pubAck.Error, NewJSAtomicPublishUnsupportedHeaderBatchError("Nats-Msg-Id"))
198198
}
199199

200200
for _, storage := range []StorageType{FileStorage, MemoryStorage} {
@@ -399,3 +399,42 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) {
399399
t.Run("StepDown", func(t *testing.T) { test(t, StepDown) })
400400
t.Run("Delete", func(t *testing.T) { test(t, Delete) })
401401
}
402+
403+
func TestJetStreamAtomicBatchPublishDenyHeaders(t *testing.T) {
404+
test := func(t *testing.T, replicas int) {
405+
c := createJetStreamClusterExplicit(t, "R3S", 3)
406+
defer c.shutdown()
407+
408+
nc, js := jsClientConnect(t, c.randomServer())
409+
defer nc.Close()
410+
411+
cfg := &StreamConfig{
412+
Name: "TEST",
413+
Subjects: []string{"foo"},
414+
Storage: FileStorage,
415+
AllowAtomicPublish: true,
416+
Replicas: replicas,
417+
}
418+
_, err := jsStreamCreate(t, nc, cfg)
419+
require_NoError(t, err)
420+
421+
// We might support these headers later on, but for now error.
422+
for key, value := range map[string]string{
423+
"Nats-Msg-Id": "msgId",
424+
"Nats-Expected-Last-Sequence": "0",
425+
"Nats-Expected-Last-Msg-Id": "msgId",
426+
} {
427+
t.Run(key, func(t *testing.T) {
428+
m := nats.NewMsg("foo")
429+
m.Header.Set("Nats-Batch-Id", "uuid")
430+
m.Header.Set("Nats-Batch-Sequence", "1")
431+
m.Header.Set("Nats-Batch-Commit", "1")
432+
m.Header.Set(key, value)
433+
_, err = js.PublishMsg(m)
434+
require_Error(t, err, NewJSAtomicPublishUnsupportedHeaderBatchError(key))
435+
})
436+
}
437+
}
438+
439+
t.Run("R3", func(t *testing.T) { test(t, 3) })
440+
}

server/jetstream_cluster.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8233,6 +8233,21 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
82338233
sm *StoreMsg
82348234
sz int
82358235
)
8236+
8237+
errorOnUnsupported := func(seq uint64, header string) *ApiError {
8238+
apiErr := NewJSAtomicPublishUnsupportedHeaderBatchError(header)
8239+
// TODO(mvv): reset in-memory expected header maps
8240+
mset.clseq -= seq - 1
8241+
mset.clMu.Unlock()
8242+
cleanup()
8243+
batches.mu.Unlock()
8244+
if canRespond {
8245+
buf, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: apiErr})
8246+
outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, buf, nil, 0))
8247+
}
8248+
return apiErr
8249+
}
8250+
82368251
for seq := uint64(1); seq <= batchSeq; seq++ {
82378252
if seq == batchSeq {
82388253
bsubj, bhdr, bmsg = subject, hdr, msg
@@ -8251,19 +8266,15 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
82518266
return apiErr
82528267
}
82538268

8254-
// Duplicates are fully rejected when using batch.
8269+
// Reject unsupported headers.
82558270
if msgId := getMsgId(bhdr); msgId != _EMPTY_ {
8256-
// TODO(mvv): reset in-memory expected header maps
8257-
mset.clseq -= seq - 1
8258-
mset.clMu.Unlock()
8259-
cleanup()
8260-
batches.mu.Unlock()
8261-
apiErr := NewJSAtomicPublishDuplicateError()
8262-
if canRespond {
8263-
buf, _ := json.Marshal(&JSPubAckResponse{PubAck: &PubAck{Stream: name}, Error: apiErr})
8264-
outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, buf, nil, 0))
8265-
}
8266-
return apiErr
8271+
return errorOnUnsupported(seq, JSMsgId)
8272+
}
8273+
if _, ok = getExpectedLastSeq(hdr); ok {
8274+
return errorOnUnsupported(seq, JSExpectedLastSeq)
8275+
}
8276+
if getExpectedLastMsgId(hdr) != _EMPTY_ {
8277+
return errorOnUnsupported(seq, JSExpectedLastMsgId)
82678278
}
82688279

82698280
var apiErr *ApiError

server/jetstream_errors_generated.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@ const (
1111
// JSAtomicPublishDisabledErr atomic publish is disabled
1212
JSAtomicPublishDisabledErr ErrorIdentifier = 10174
1313

14-
// JSAtomicPublishDuplicateErr atomic publish duplicates not allowed
15-
JSAtomicPublishDuplicateErr ErrorIdentifier = 10177
16-
1714
// JSAtomicPublishIncompleteBatchErr atomic publish batch is incomplete
1815
JSAtomicPublishIncompleteBatchErr ErrorIdentifier = 10176
1916

2017
// JSAtomicPublishMissingSeqErr atomic publish sequence is missing
2118
JSAtomicPublishMissingSeqErr ErrorIdentifier = 10175
2219

20+
// JSAtomicPublishUnsupportedHeaderBatchErr atomic publish unsupported header used: {header}
21+
JSAtomicPublishUnsupportedHeaderBatchErr ErrorIdentifier = 10177
22+
2323
// JSBadRequestErr bad request
2424
JSBadRequestErr ErrorIdentifier = 10003
2525

@@ -541,9 +541,9 @@ var (
541541
ApiErrors = map[ErrorIdentifier]*ApiError{
542542
JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"},
543543
JSAtomicPublishDisabledErr: {Code: 400, ErrCode: 10174, Description: "atomic publish is disabled"},
544-
JSAtomicPublishDuplicateErr: {Code: 400, ErrCode: 10177, Description: "atomic publish duplicates not allowed"},
545544
JSAtomicPublishIncompleteBatchErr: {Code: 400, ErrCode: 10176, Description: "atomic publish batch is incomplete"},
546545
JSAtomicPublishMissingSeqErr: {Code: 400, ErrCode: 10175, Description: "atomic publish sequence is missing"},
546+
JSAtomicPublishUnsupportedHeaderBatchErr: {Code: 400, ErrCode: 10177, Description: "atomic publish unsupported header used: {header}"},
547547
JSBadRequestErr: {Code: 400, ErrCode: 10003, Description: "bad request"},
548548
JSClusterIncompleteErr: {Code: 503, ErrCode: 10004, Description: "incomplete results"},
549549
JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "{err}"},
@@ -761,34 +761,40 @@ func NewJSAtomicPublishDisabledError(opts ...ErrorOption) *ApiError {
761761
return ApiErrors[JSAtomicPublishDisabledErr]
762762
}
763763

764-
// NewJSAtomicPublishDuplicateError creates a new JSAtomicPublishDuplicateErr error: "atomic publish duplicates not allowed"
765-
func NewJSAtomicPublishDuplicateError(opts ...ErrorOption) *ApiError {
764+
// NewJSAtomicPublishIncompleteBatchError creates a new JSAtomicPublishIncompleteBatchErr error: "atomic publish batch is incomplete"
765+
func NewJSAtomicPublishIncompleteBatchError(opts ...ErrorOption) *ApiError {
766766
eopts := parseOpts(opts)
767767
if ae, ok := eopts.err.(*ApiError); ok {
768768
return ae
769769
}
770770

771-
return ApiErrors[JSAtomicPublishDuplicateErr]
771+
return ApiErrors[JSAtomicPublishIncompleteBatchErr]
772772
}
773773

774-
// NewJSAtomicPublishIncompleteBatchError creates a new JSAtomicPublishIncompleteBatchErr error: "atomic publish batch is incomplete"
775-
func NewJSAtomicPublishIncompleteBatchError(opts ...ErrorOption) *ApiError {
774+
// NewJSAtomicPublishMissingSeqError creates a new JSAtomicPublishMissingSeqErr error: "atomic publish sequence is missing"
775+
func NewJSAtomicPublishMissingSeqError(opts ...ErrorOption) *ApiError {
776776
eopts := parseOpts(opts)
777777
if ae, ok := eopts.err.(*ApiError); ok {
778778
return ae
779779
}
780780

781-
return ApiErrors[JSAtomicPublishIncompleteBatchErr]
781+
return ApiErrors[JSAtomicPublishMissingSeqErr]
782782
}
783783

784-
// NewJSAtomicPublishMissingSeqError creates a new JSAtomicPublishMissingSeqErr error: "atomic publish sequence is missing"
785-
func NewJSAtomicPublishMissingSeqError(opts ...ErrorOption) *ApiError {
784+
// NewJSAtomicPublishUnsupportedHeaderBatchError creates a new JSAtomicPublishUnsupportedHeaderBatchErr error: "atomic publish unsupported header used: {header}"
785+
func NewJSAtomicPublishUnsupportedHeaderBatchError(header interface{}, opts ...ErrorOption) *ApiError {
786786
eopts := parseOpts(opts)
787787
if ae, ok := eopts.err.(*ApiError); ok {
788788
return ae
789789
}
790790

791-
return ApiErrors[JSAtomicPublishMissingSeqErr]
791+
e := ApiErrors[JSAtomicPublishUnsupportedHeaderBatchErr]
792+
args := e.toReplacerArgs([]interface{}{"{header}", header})
793+
return &ApiError{
794+
Code: e.Code,
795+
ErrCode: e.ErrCode,
796+
Description: strings.NewReplacer(args...).Replace(e.Description),
797+
}
792798
}
793799

794800
// NewJSBadRequestError creates a new JSBadRequestErr error: "bad request"

0 commit comments

Comments
 (0)