Skip to content

Commit

Permalink
read payload files with directio
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <[email protected]>
  • Loading branch information
matthyx committed Sep 23, 2024
1 parent c453c91 commit e9cef2c
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/kinbiko/jsonassert v1.1.1
github.com/kubescape/go-logger v0.0.22
github.com/kubescape/k8s-interface v0.0.162
github.com/ncw/directio v1.0.5
github.com/olvrng/ujson v1.1.0
github.com/puzpuzpuz/xsync/v2 v2.4.1
github.com/spf13/afero v1.11.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4=
github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t6ROk=
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/olvrng/ujson v1.1.0 h1:8xVUzVlqwdMVWh5d1UHBtLQ1D50nxoPuPEq9Wozs8oA=
Expand Down
47 changes: 47 additions & 0 deletions pkg/registry/file/directio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package file

import (
"errors"
"io"

"github.com/ncw/directio"
)

// DirectIOReader is a reader that reads data from the underlying reader using direct I/O.
type DirectIOReader struct {
buf []byte
bufSize int
off int
rd io.Reader
}

var _ io.ByteReader = (*DirectIOReader)(nil)

var _ io.Reader = (*DirectIOReader)(nil)

func NewDirectIOReader(rd io.Reader) *DirectIOReader {
return &DirectIOReader{
buf: directio.AlignedBlock(directio.BlockSize),
rd: rd,
}
}

func (d *DirectIOReader) Read(p []byte) (int, error) {
// read data from the underlying reader if the buffer is empty
if d.off == d.bufSize {
var err error
d.bufSize, err = io.ReadFull(d.rd, d.buf)
if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
return 0, err
}
d.off = 0
}
// copy data to the buffer
n := copy(p, d.buf[d.off:])
d.off += n
return n, nil
}

func (d *DirectIOReader) ReadByte() (byte, error) {
panic("ReadByte not implemented, gob.Decode should not be using this")
}
5 changes: 3 additions & 2 deletions pkg/registry/file/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"reflect"
"strings"
"syscall"

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
Expand Down Expand Up @@ -284,7 +285,7 @@ func (s *StorageImpl) Get(ctx context.Context, key string, opts storage.GetOptio
// get is a helper function for Get to allow calls without locks from other methods that already have them
func (s *StorageImpl) get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
p := filepath.Join(s.root, key)
payloadFile, err := s.appFs.Open(makePayloadPath(p))
payloadFile, err := s.appFs.OpenFile(makePayloadPath(p), syscall.O_DIRECT|os.O_RDONLY, 0)
if err != nil {
if errors.Is(err, afero.ErrFileNotFound) {
if opts.IgnoreNotFound {
Expand All @@ -296,7 +297,7 @@ func (s *StorageImpl) get(ctx context.Context, key string, opts storage.GetOptio
logger.L().Ctx(ctx).Error("Get - read file failed", helpers.Error(err), helpers.String("key", key))
return err
}
decoder := gob.NewDecoder(payloadFile)
decoder := gob.NewDecoder(NewDirectIOReader(payloadFile))
err = decoder.Decode(objPtr)
if err != nil {
if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
Expand Down

0 comments on commit e9cef2c

Please sign in to comment.