Skip to content

Commit

Permalink
Merge pull request #225 from nspcc-dev/fast-improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Jun 26, 2024
2 parents 9005ba8 + 1370519 commit b482739
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 111 deletions.
12 changes: 10 additions & 2 deletions cmd/neofs-rest-gw/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (
defaultPoolErrorThreshold uint32 = 100
defaultPoolDefaultTimestamp bool = false

// default [handlers.PrmAPI.MaxPayloadBufferSize] value.
defaultMaxObjectPayloadBufferSize = 4 << 20

// Pool config.
cmdNodeDialTimeout = "node-dial-timeout"
cfgNodeDialTimeout = "pool." + cmdNodeDialTimeout
Expand Down Expand Up @@ -585,11 +588,16 @@ func newNeofsAPI(ctx context.Context, logger *zap.Logger, v *viper.Viper) (*hand
}

apiPrm.ServiceShutdownTimeout = defaultShutdownTimeout
apiPrm.MaxObjectSize = int64(ni.MaxObjectSize())
if apiPrm.MaxPayloadBufferSize = ni.MaxObjectSize(); apiPrm.MaxPayloadBufferSize == 0 {
// default to some heuristic value that in practice should not be needed at all
apiPrm.MaxPayloadBufferSize = defaultMaxObjectPayloadBufferSize
logger.Debug("NeoFS max object size setting is zero, using default limit for payload buffer size",
zap.Uint64("value", apiPrm.MaxPayloadBufferSize))
}

apiPrm.DefaultTimestamp = v.GetBool(cfgPoolDefaultTimestamp)

return handlers.NewAPI(&apiPrm), nil
return handlers.NewAPI(&apiPrm)
}

func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
Expand Down
30 changes: 17 additions & 13 deletions handlers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type PrmAPI struct {
Pool *pool.Pool
Key *keys.PrivateKey
DefaultTimestamp bool
MaxObjectSize int64
// Size limit for buffering of object payloads. Must be positive.
MaxPayloadBufferSize uint64

GateMetric *metrics.GateMetrics
PrometheusService *metrics.Service
Expand Down Expand Up @@ -55,21 +56,24 @@ const (
//go:generate go run github.com/deepmap/oapi-codegen/cmd/oapi-codegen --config=server.cfg.yaml ../spec/rest.yaml

// NewAPI creates a new RestAPI using specified logger, connection pool and other parameters.
func NewAPI(prm *PrmAPI) *RestAPI {
func NewAPI(prm *PrmAPI) (*RestAPI, error) {
if prm.MaxPayloadBufferSize == 0 {
return nil, errors.New("zero payload buffer size limit")
}
signer := user.NewAutoIDSignerRFC6979(prm.Key.PrivateKey)

return &RestAPI{
log: prm.Logger,
pool: prm.Pool,
signer: signer,
defaultTimestamp: prm.DefaultTimestamp,
maxObjectSize: prm.MaxObjectSize,
log: prm.Logger,
pool: prm.Pool,
signer: signer,
defaultTimestamp: prm.DefaultTimestamp,
payloadBufferSize: prm.MaxPayloadBufferSize,

prometheusService: prm.PrometheusService,
pprofService: prm.PprofService,
gateMetric: prm.GateMetric,
serviceShutdownTimeout: prm.ServiceShutdownTimeout,
}
}, nil
}

func getPrincipalFromHeader(ctx echo.Context) (string, error) {
Expand Down Expand Up @@ -125,11 +129,11 @@ func (a *RestAPI) logAndGetErrorResponse(msg string, err error, fields ...zap.Fi

// RestAPI is a REST v1 request handler.
type RestAPI struct {
log *zap.Logger
pool *pool.Pool
signer user.Signer
defaultTimestamp bool
maxObjectSize int64
log *zap.Logger
pool *pool.Pool
signer user.Signer
defaultTimestamp bool
payloadBufferSize uint64

gateMetric *metrics.GateMetrics
prometheusService *metrics.Service
Expand Down
15 changes: 15 additions & 0 deletions handlers/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package handlers_test

import (
"testing"

"github.com/nspcc-dev/neofs-rest-gw/handlers"
"github.com/stretchr/testify/require"
)

func TestNewAPI(t *testing.T) {
t.Run("non-positive buffer size limit", func(t *testing.T) {
_, err := handlers.NewAPI(new(handlers.PrmAPI))
require.EqualError(t, err, "zero payload buffer size limit")
})
}
43 changes: 16 additions & 27 deletions handlers/newObjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nspcc-dev/neofs-rest-gw/handlers/apiserver"
"github.com/nspcc-dev/neofs-rest-gw/internal/util"
"github.com/nspcc-dev/neofs-sdk-go/bearer"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand All @@ -22,7 +21,6 @@ import (
func (a *RestAPI) NewUploadContainerObject(ctx echo.Context, containerID apiserver.ContainerId, params apiserver.NewUploadContainerObjectParams) error {
var (
err error
idObj oid.ID
addr oid.Address
btoken *bearer.Token
)
Expand Down Expand Up @@ -98,35 +96,26 @@ func (a *RestAPI) NewUploadContainerObject(ctx echo.Context, containerID apiserv
}
}

var obj object.Object
obj.SetContainerID(idCnr)
a.setOwner(&obj, btoken)
obj.SetAttributes(attributes...)
var hdr object.Object
hdr.SetContainerID(idCnr)
a.setOwner(&hdr, btoken)
hdr.SetAttributes(attributes...)

var prmPutInit client.PrmObjectPutInit
if btoken != nil {
prmPutInit.WithBearerToken(*btoken)
}

writer, err := a.pool.ObjectPutInit(ctx.Request().Context(), obj, a.signer, prmPutInit)
if err != nil {
resp := a.logAndGetErrorResponse("put object init", err)
return ctx.JSON(http.StatusBadRequest, resp)
}

chunk := make([]byte, a.maxObjectSize)
_, err = io.CopyBuffer(writer, ctx.Request().Body, chunk)
idObj, err := a.putObject(ctx, hdr, btoken, func(w io.Writer) error {
var err error
if cln := ctx.Request().ContentLength; cln >= 0 && uint64(cln) < a.payloadBufferSize { // negative means unknown
if cln != 0 { // otherwise io.CopyBuffer panics
_, err = io.CopyBuffer(w, ctx.Request().Body, make([]byte, cln))
}
} else {
_, err = io.CopyBuffer(w, ctx.Request().Body, make([]byte, a.payloadBufferSize))
}
return err
})
if err != nil {
resp := a.logAndGetErrorResponse("write", err)
return ctx.JSON(http.StatusBadRequest, resp)
}

if err = writer.Close(); err != nil {
resp := a.logAndGetErrorResponse("writer close", err)
return ctx.JSON(http.StatusBadRequest, resp)
return err
}

idObj = writer.GetResult().StoredObjectID()
addr.SetObject(idObj)
addr.SetContainer(idCnr)

Expand Down
105 changes: 36 additions & 69 deletions handlers/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,39 +117,19 @@ func (a *RestAPI) PutObject(ctx echo.Context, params apiserver.PutObjectParams)
return ctx.JSON(http.StatusBadRequest, resp)
}

var obj object.Object
obj.SetContainerID(cnrID)
attachOwner(&obj, btoken)
obj.SetAttributes(attributes...)

var prmPutInit client.PrmObjectPutInit
if btoken != nil {
prmPutInit.WithBearerToken(*btoken)
}

writer, err := a.pool.ObjectPutInit(ctx.Request().Context(), obj, a.signer, prmPutInit)
if err != nil {
resp := a.logAndGetErrorResponse("put object init", err)
return ctx.JSON(http.StatusBadRequest, resp)
}

var objID oid.ID

data := bytes.NewReader(payload)
chunk := make([]byte, a.maxObjectSize)
_, err = io.CopyBuffer(writer, data, chunk)
var hdr object.Object
hdr.SetContainerID(cnrID)
attachOwner(&hdr, btoken)
hdr.SetAttributes(attributes...)

objID, err := a.putObject(ctx, hdr, btoken, func(w io.Writer) error {
_, err := w.Write(payload)
return err
})
if err != nil {
resp := a.logAndGetErrorResponse("write", err)
return ctx.JSON(http.StatusBadRequest, resp)
}

if err = writer.Close(); err != nil {
resp := a.logAndGetErrorResponse("writer close", err)
return ctx.JSON(http.StatusBadRequest, resp)
return err
}

objID = writer.GetResult().StoredObjectID()

var resp apiserver.Address
resp.ContainerId = body.ContainerId
resp.ObjectId = objID.String()
Expand Down Expand Up @@ -915,7 +895,6 @@ func (a *RestAPI) UploadContainerObject(ctx echo.Context, containerID apiserver.
header *multipart.FileHeader
file multipart.File
err error
idObj oid.ID
addr oid.Address
btoken *bearer.Token
)
Expand Down Expand Up @@ -956,6 +935,15 @@ func (a *RestAPI) UploadContainerObject(ctx echo.Context, containerID apiserver.
resp := a.logAndGetErrorResponse(fmt.Sprintf("get file %q from HTTP request", fileKey), err)
return ctx.JSON(http.StatusBadRequest, resp)
}
defer func() {
err := file.Close()
a.log.Debug(
"close temporary multipart/form file",
zap.Stringer("address", addr),
zap.String("filename", header.Filename),
zap.Error(err),
)
}()

break
}
Expand All @@ -965,19 +953,6 @@ func (a *RestAPI) UploadContainerObject(ctx echo.Context, containerID apiserver.
return ctx.JSON(http.StatusBadRequest, resp)
}

defer func() {
if file == nil {
return
}
err := file.Close()
a.log.Debug(
"close temporary multipart/form file",
zap.Stringer("address", addr),
zap.String("filename", header.Filename),
zap.Error(err),
)
}()

filtered, err := filterHeaders(a.log, ctx.Request().Header)
if err != nil {
resp := a.logAndGetErrorResponse("could not process headers", err)
Expand Down Expand Up @@ -1022,35 +997,27 @@ func (a *RestAPI) UploadContainerObject(ctx echo.Context, containerID apiserver.
attributes = append(attributes, *timestamp)
}

var obj object.Object
obj.SetContainerID(idCnr)
a.setOwner(&obj, btoken)
obj.SetAttributes(attributes...)
var hdr object.Object
hdr.SetContainerID(idCnr)
a.setOwner(&hdr, btoken)
hdr.SetAttributes(attributes...)

var prmPutInit client.PrmObjectPutInit
if btoken != nil {
prmPutInit.WithBearerToken(*btoken)
}

writer, err := a.pool.ObjectPutInit(ctx.Request().Context(), obj, a.signer, prmPutInit)
if err != nil {
resp := a.logAndGetErrorResponse("put object init", err)
return ctx.JSON(http.StatusBadRequest, resp)
}

chunk := make([]byte, a.maxObjectSize)
_, err = io.CopyBuffer(writer, file, chunk)
idObj, err := a.putObject(ctx, hdr, btoken, func(w io.Writer) error {
var buf []byte
if header.Size > 0 && uint64(header.Size) < a.payloadBufferSize {
buf = make([]byte, header.Size)
} else {
// Size field is not documented, so we cannot be sure what exactly non-positive
// values mean. Thus, it's better to keep default behavior for them.
buf = make([]byte, a.payloadBufferSize)
}
_, err = io.CopyBuffer(w, file, buf)
return err
})
if err != nil {
resp := a.logAndGetErrorResponse("write", err)
return ctx.JSON(http.StatusBadRequest, resp)
}

if err = writer.Close(); err != nil {
resp := a.logAndGetErrorResponse("writer close", err)
return ctx.JSON(http.StatusBadRequest, resp)
return err
}

idObj = writer.GetResult().StoredObjectID()
addr.SetObject(idObj)
addr.SetContainer(idCnr)

Expand Down
30 changes: 30 additions & 0 deletions handlers/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net/http"
"strconv"
"strings"
"time"

"github.com/labstack/echo/v4"
"github.com/nspcc-dev/neofs-rest-gw/handlers/apiserver"
"github.com/nspcc-dev/neofs-sdk-go/bearer"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container/acl"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/session"
"go.uber.org/zap"
Expand Down Expand Up @@ -400,3 +404,29 @@ func addExpirationHeaders(headers map[string]string, params apiserver.NewUploadC
headers[ExpirationRFC3339Attr] = *params.XNeofsExpirationRFC3339
}
}

// shares code of NeoFS object recording performed by various RestAPI methods.
func (a *RestAPI) putObject(ctx echo.Context, hdr object.Object, bt *bearer.Token, wp func(io.Writer) error) (oid.ID, error) {
var opts client.PrmObjectPutInit
if bt != nil {
opts.WithBearerToken(*bt)
}
writer, err := a.pool.ObjectPutInit(ctx.Request().Context(), hdr, a.signer, opts)
if err != nil {
resp := a.logAndGetErrorResponse("put object init", err)
return oid.ID{}, ctx.JSON(http.StatusBadRequest, resp)
}

err = wp(writer)
if err != nil {
resp := a.logAndGetErrorResponse("write", err)
return oid.ID{}, ctx.JSON(http.StatusBadRequest, resp)
}

if err = writer.Close(); err != nil {
resp := a.logAndGetErrorResponse("writer close", err)
return oid.ID{}, ctx.JSON(http.StatusBadRequest, resp)
}

return writer.GetResult().StoredObjectID(), nil
}

0 comments on commit b482739

Please sign in to comment.