Skip to content

Commit

Permalink
Add error details to invalid jetstream json errors
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervonb committed Sep 6, 2024
1 parent c4418f4 commit a9b2549
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 33 deletions.
40 changes: 20 additions & 20 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,7 @@ func (s *Server) jsTemplateCreateRequest(sub *subscription, c *client, _ *Accoun

var cfg StreamTemplateConfig
if err := s.unmarshalRequest(ci, acc, subject, msg, &cfg); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -1259,7 +1259,7 @@ func (s *Server) jsTemplateNamesRequest(sub *subscription, c *client, _ *Account
if !isEmptyRequest(msg) {
var req JSApiStreamTemplatesRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -1442,7 +1442,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,

var cfg StreamConfigRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &cfg); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -1549,7 +1549,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}
var ncfg StreamConfigRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &ncfg); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -1644,7 +1644,7 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account,
if !isEmptyRequest(msg) {
var req JSApiStreamNamesRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -1774,7 +1774,7 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
if !isEmptyRequest(msg) {
var req JSApiStreamListRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -1944,7 +1944,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
if !isEmptyRequest(msg) {
var req JSApiStreamInfoRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -2302,7 +2302,7 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco

var req JSApiStreamRemovePeerRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -2382,7 +2382,7 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac

var req JSApiMetaServerRemoveRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -2485,7 +2485,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _

var req JSApiMetaServerStreamMoveRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -2829,7 +2829,7 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
if !isEmptyRequest(msg) {
var req JSApiLeaderStepdownRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -3035,7 +3035,7 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su
}
var req JSApiMsgDeleteRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -3154,7 +3154,7 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
}
var req JSApiMsgGetRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -3297,7 +3297,7 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
if !isEmptyRequest(msg) {
var req JSApiStreamPurgeRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -3387,7 +3387,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account

var req JSApiStreamRestoreRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -3689,7 +3689,7 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Accoun

var req JSApiStreamSnapshotRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, smsg, s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -3887,7 +3887,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun

var req CreateConsumerRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -4124,7 +4124,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account
if !isEmptyRequest(msg) {
var req JSApiConsumersRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -4246,7 +4246,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
if !isEmptyRequest(msg) {
var req JSApiConsumersRequest
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -4557,7 +4557,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account

if !isEmptyRequest(msg) {
if err := s.unmarshalRequest(ci, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down
12 changes: 9 additions & 3 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ var (
JSConsumerWQRequiresExplicitAckErr: {Code: 400, ErrCode: 10098, Description: "workqueue stream requires explicit ack"},
JSConsumerWithFlowControlNeedsHeartbeats: {Code: 400, ErrCode: 10108, Description: "consumer with flow control also needs heartbeats"},
JSInsufficientResourcesErr: {Code: 503, ErrCode: 10023, Description: "insufficient resources"},
JSInvalidJSONErr: {Code: 400, ErrCode: 10025, Description: "invalid JSON"},
JSInvalidJSONErr: {Code: 400, ErrCode: 10025, Description: "invalid JSON: {err}"},
JSMaximumConsumersLimitErr: {Code: 400, ErrCode: 10026, Description: "maximum consumers limit reached"},
JSMaximumStreamsLimitErr: {Code: 400, ErrCode: 10027, Description: "maximum number of streams reached"},
JSMemoryResourcesExceededErr: {Code: 500, ErrCode: 10028, Description: "insufficient memory resources available"},
Expand Down Expand Up @@ -1438,13 +1438,19 @@ func NewJSInsufficientResourcesError(opts ...ErrorOption) *ApiError {
}

// NewJSInvalidJSONError creates a new JSInvalidJSONErr error: "invalid JSON"
func NewJSInvalidJSONError(opts ...ErrorOption) *ApiError {
func NewJSInvalidJSONError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSInvalidJSONErr]
e := ApiErrors[JSInvalidJSONErr]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSMaximumConsumersLimitError creates a new JSMaximumConsumersLimitErr error: "maximum consumers limit reached"
Expand Down
20 changes: 10 additions & 10 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1912,61 +1912,61 @@ func (as *mqttAccountSessionManager) processJSAPIReplies(_ *subscription, pc *cl
case mqttJSAStreamCreate:
var resp = &JSApiStreamCreateResponse{}
if err := json.Unmarshal(msg, resp); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
}
out(resp)
case mqttJSAStreamUpdate:
var resp = &JSApiStreamUpdateResponse{}
if err := json.Unmarshal(msg, resp); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
}
out(resp)
case mqttJSAStreamLookup:
var resp = &JSApiStreamInfoResponse{}
if err := json.Unmarshal(msg, &resp); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
}
out(resp)
case mqttJSAStreamDel:
var resp = &JSApiStreamDeleteResponse{}
if err := json.Unmarshal(msg, &resp); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
}
out(resp)
case mqttJSAConsumerCreate:
var resp = &JSApiConsumerCreateResponse{}
if err := json.Unmarshal(msg, resp); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
}
out(resp)
case mqttJSAConsumerDel:
var resp = &JSApiConsumerDeleteResponse{}
if err := json.Unmarshal(msg, resp); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
}
out(resp)
case mqttJSAMsgStore, mqttJSASessPersist:
var resp = &JSPubAckResponse{}
if err := json.Unmarshal(msg, resp); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
}
out(resp)
case mqttJSAMsgLoad:
var resp = &JSApiMsgGetResponse{}
if err := json.Unmarshal(msg, &resp); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
}
out(resp)
case mqttJSAStreamNames:
var resp = &JSApiStreamNamesResponse{}
if err := json.Unmarshal(msg, resp); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
}
out(resp)
case mqttJSAMsgDelete:
var resp = &JSApiMsgDeleteResponse{}
if err := json.Unmarshal(msg, resp); err != nil {
resp.Error = NewJSInvalidJSONError()
resp.Error = NewJSInvalidJSONError(err)
}
out(resp)
default:
Expand Down

0 comments on commit a9b2549

Please sign in to comment.