Skip to content

Commit

Permalink
get data between bookmarks (#113)
Browse files Browse the repository at this point in the history
* get data between bookmarks

* fix typo
  • Loading branch information
ToniRamirezM authored Feb 6, 2024
1 parent afbba53 commit 35ea575
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 56 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ Stream relay server included in the datastream library allows scaling the number
- GetEntry(u64 entryNumber) -> returns struct FileEntry
- GetBookmark(u8[] bookmark) -> returns u64 entryNumber
- GetFirstEventAfterBookmark(u8[] bookmark) -> returns struct FileEntry
- GetDataBetweenBookmarks(bookmarkFrom []byte, bookmarkTo []byte) ([]byte, error) -> returns the array of data, ignoring bookmarks, between the given ones

#### Update data API
- UpdateEntryData(u64 entryNumber, u32 entryType, u8[] newData)
Expand Down
54 changes: 0 additions & 54 deletions datastreamer/command_enumer.go

This file was deleted.

27 changes: 27 additions & 0 deletions datastreamer/datastreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ var (
FieldA: []byte{0, 2, 0, 0, 0, 0, 0, 0, 0},
}

testBookmark2 = TestBookmark{
FieldA: []byte{0, 3, 0, 0, 0, 0, 0, 0, 0},
}

headerEntry = TestHeader{
PacketType: 1,
HeadLength: 29,
Expand Down Expand Up @@ -225,6 +229,29 @@ func TestServer(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(1), entryNumber)

entryNumber, err = streamServer.AddStreamBookmark(testBookmark2.Encode())
require.NoError(t, err)
require.Equal(t, uint64(2), entryNumber)

entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[2].Encode())
require.NoError(t, err)
require.Equal(t, uint64(3), entryNumber)

err = streamServer.CommitAtomicOp()
require.NoError(t, err)

// Check get data between 2 bookmarks
data, err := streamServer.GetDataBetweenBookmarks(testBookmark.Encode(), testBookmark2.Encode())
require.NoError(t, err)
require.Equal(t, testEntries[1].Encode(), data)

// Truncate file
err = streamServer.TruncateFile(2)
require.NoError(t, err)

err = streamServer.StartAtomicOp()
require.NoError(t, err)

entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[2].Encode())
require.NoError(t, err)
require.Equal(t, uint64(2), entryNumber)
Expand Down
2 changes: 2 additions & 0 deletions datastreamer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,6 @@ var (
ErrBookmarkNotFound = fmt.Errorf("bookmark not found")
// ErrBookmarkMaxLength is returned when the bookmark length exceeds maximum length
ErrBookmarkMaxLength = fmt.Errorf("bookmark max length")
// ErrInvalidBookmarkRange is returned when the bookmark range is invalid
ErrInvalidBookmarkRange = fmt.Errorf("invalid bookmark range")
)
56 changes: 54 additions & 2 deletions datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
)

// Command type for the TCP client commands
//
//go:generate go run github.com/alvaroloes/enumer -type=Command
type Command uint64

// ClientStatus type for the status of the client
Expand Down Expand Up @@ -577,6 +575,55 @@ func (s *StreamServer) GetFirstEventAfterBookmark(bookmark []byte) (FileEntry, e
return iterator.Entry, err
}

// GetDataBetweenBookmarks returns the data between two bookmarks
func (s *StreamServer) GetDataBetweenBookmarks(bookmarkFrom []byte, bookmarkTo []byte) ([]byte, error) {
var err error
var response []byte

// Get entry of the from bookmark
fromEntryNum, err := s.bookmark.GetBookmark(bookmarkFrom)
if err != nil {
return response, err
}

// Get entry of the to bookmark
toEntryNum, err := s.bookmark.GetBookmark(bookmarkTo)
if err != nil {
return response, err
}

// Check if the from bookmark is greater than the to bookmark
if fromEntryNum > toEntryNum {
return response, ErrInvalidBookmarkRange
}

// Initialize file stream iterator from bookmark's entry
iterator, err := s.streamFile.iteratorFrom(fromEntryNum, true)
if err != nil {
return response, err
}

// Loop until we reach the to bookmark
for {
// Get next entry data
end, err := s.streamFile.iteratorNext(iterator)

// Loop break conditions
if err != nil || end || iterator.Entry.Number >= toEntryNum {
break
}

if iterator.Entry.Type != EtBookmark {
response = append(response, iterator.Entry.Data...)
}
}

// Close iterator
s.streamFile.iteratorEnd(iterator)

return response, err
}

// clearAtomicOp sets the current atomic operation to none
func (s *StreamServer) clearAtomicOp() {
// No atomic operation in progress and empty entries slice
Expand Down Expand Up @@ -1138,3 +1185,8 @@ func PrintResultEntry(e ResultEntry) {
log.Debugf("errorNum: [%d]", e.errorNum)
log.Debugf("errorStr: [%s]", e.errorStr)
}

// IsACommand checks if a command is a valid command
func (c Command) IsACommand() bool {
return c >= CmdStart && c <= CmdBookmark
}

0 comments on commit 35ea575

Please sign in to comment.