From 7b8decb80f30e0aa59a7833a2160a3d06693e1a1 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Mon, 4 Sep 2023 10:43:37 +1000 Subject: [PATCH] fix: add closed check, expose storage.ErrClosed --- v2/storage/deferred/deferredcarwriter.go | 35 +++++++++++++++---- v2/storage/deferred/deferredcarwriter_test.go | 33 +++++++++++++++++ v2/storage/storage.go | 8 ++--- 3 files changed, 65 insertions(+), 11 deletions(-) diff --git a/v2/storage/deferred/deferredcarwriter.go b/v2/storage/deferred/deferredcarwriter.go index d6d5f107..7d5d3c10 100644 --- a/v2/storage/deferred/deferredcarwriter.go +++ b/v2/storage/deferred/deferredcarwriter.go @@ -29,6 +29,10 @@ var _ io.Closer = (*DeferredCarWriter)(nil) // DeferredCarWriter is threadsafe, and can be used concurrently. // Closing the writer will close, but not delete, the underlying file. // +// DeferredCarWriter only implements the storage.WritableStorage interface and +// is not intended as a general purpose storage implementation. It only supports +// storage Put() and Get() operations. +// // This utility is useful for cases where a CAR will be streamed but an error // may occur before any content is written. In this case, the CAR file will not // be created, and the output stream will not be written to. In the case of an @@ -45,11 +49,12 @@ type DeferredCarWriter struct { outPath string outStream io.Writer - lk sync.Mutex - f *os.File - w carstorage.WritableCar - putCb []putCb - opts []carv2.Option + lk sync.Mutex + f *os.File + closed bool + w carstorage.WritableCar + putCb []putCb + opts []carv2.Option } // NewDeferredCarWriterForPath creates a DeferredCarWriter that will write to a @@ -89,6 +94,10 @@ func (dcw *DeferredCarWriter) Has(ctx context.Context, key string) (bool, error) dcw.lk.Lock() defer dcw.lk.Unlock() + if dcw.closed { + return false, carstorage.ErrClosed + } + if dcw.w == nil { // shortcut, haven't written anything, don't even initialise return false, nil } @@ -107,6 +116,10 @@ func (dcw *DeferredCarWriter) Put(ctx context.Context, key string, content []byt dcw.lk.Lock() defer dcw.lk.Unlock() + if dcw.closed { + return carstorage.ErrClosed + } + if dcw.putCb != nil { // call all callbacks, remove those that were only needed once for i := 0; i < len(dcw.putCb); i++ { @@ -150,11 +163,18 @@ func (dcw *DeferredCarWriter) writer() (carstorage.WritableCar, error) { } // Close closes the underlying file, if one was created. -func (dcw *DeferredCarWriter) Close() error { +func (dcw *DeferredCarWriter) Close() (err error) { dcw.lk.Lock() defer dcw.lk.Unlock() - err := dcw.w.Finalize() + if dcw.closed { + return carstorage.ErrClosed + } + dcw.closed = true + + if dcw.w != nil { + err = dcw.w.Finalize() + } if dcw.f != nil { defer func() { dcw.f = nil }() @@ -163,6 +183,7 @@ func (dcw *DeferredCarWriter) Close() error { err = err2 } } + return err } diff --git a/v2/storage/deferred/deferredcarwriter_test.go b/v2/storage/deferred/deferredcarwriter_test.go index 2a5994f2..e2976d62 100644 --- a/v2/storage/deferred/deferredcarwriter_test.go +++ b/v2/storage/deferred/deferredcarwriter_test.go @@ -12,6 +12,7 @@ import ( "github.com/ipfs/go-cid" carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/storage" deferred "github.com/ipld/go-car/v2/storage/deferred" mh "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" @@ -218,6 +219,38 @@ func TestDeferredCarWriterPutCb(t *testing.T) { require.Equal(t, 1, pc3) } +func TestDeferredCarWriterWriteAfterClose(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + testCid1, testData1 := randBlock() + testCid2, testData2 := randBlock() + + var buf bytes.Buffer + cw := deferred.NewDeferredCarWriterForStream(&buf, []cid.Cid{testCid1}) + // no writes + req.NoError(cw.Close()) + + req.ErrorIs(cw.Put(ctx, testCid1.KeyString(), testData1), storage.ErrClosed) + _, err := cw.Has(ctx, testCid1.KeyString()) + req.ErrorIs(err, storage.ErrClosed) + req.ErrorIs(cw.Close(), storage.ErrClosed) + + // with writes + + buf = bytes.Buffer{} + cw = deferred.NewDeferredCarWriterForStream(&buf, []cid.Cid{testCid1}) + + req.NoError(cw.Put(ctx, testCid1.KeyString(), testData1)) + req.NoError(cw.Put(ctx, testCid2.KeyString(), testData2)) + req.NoError(cw.Close()) + + req.ErrorIs(cw.Put(ctx, testCid1.KeyString(), testData1), storage.ErrClosed) + _, err = cw.Has(ctx, testCid1.KeyString()) + req.ErrorIs(err, storage.ErrClosed) + req.ErrorIs(cw.Close(), storage.ErrClosed) +} + func randBlock() (cid.Cid, []byte) { data := make([]byte, 1024) rngLk.Lock() diff --git a/v2/storage/storage.go b/v2/storage/storage.go index b98e70d8..a912d7e1 100644 --- a/v2/storage/storage.go +++ b/v2/storage/storage.go @@ -18,7 +18,7 @@ import ( ipldstorage "github.com/ipld/go-ipld-prime/storage" ) -var errClosed = errors.New("cannot use a CARv2 storage after closing") +var ErrClosed = errors.New("cannot use a CAR storage after closing") type ReaderAtWriterAt interface { io.ReaderAt @@ -314,7 +314,7 @@ func (sc *StorageCar) Put(ctx context.Context, keyStr string, data []byte) error defer sc.mu.Unlock() if sc.closed { - return errClosed + return ErrClosed } idx, ok := sc.idx.(*index.InsertionIndex) @@ -361,7 +361,7 @@ func (sc *StorageCar) Has(ctx context.Context, keyStr string) (bool, error) { defer sc.mu.RUnlock() if sc.closed { - return false, errClosed + return false, ErrClosed } if idx, ok := sc.idx.(*index.InsertionIndex); ok && sc.writer != nil { @@ -443,7 +443,7 @@ func (sc *StorageCar) GetStream(ctx context.Context, keyStr string) (io.ReadClos defer sc.mu.RUnlock() if sc.closed { - return nil, errClosed + return nil, ErrClosed } _, offset, size, err := store.FindCid(