Skip to content

Commit

Permalink
improve stream client API (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpunish3r authored Mar 1, 2024
1 parent 35ea575 commit ebf261c
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 112 deletions.
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ Command format sent by the client:
If not started terminates the connection.

### Header
### GetHeader
Gets the current stream file header (`HeaderEntry` format defined in the [STREAM FILE](#stream-file) section), so stream clients can know the total number of entries and the size of the stream file.

Command format sent by the client:
Expand All @@ -99,7 +99,7 @@ Command format sent by the client:
If streaming already started terminates the connection.

### Entry
### GetEntry
Gets the data from the entry (`entryNumber`) in the format `FileEntry` defined in the [STREAM FILE](#stream-file) section).

Command format sent by the client:
Expand All @@ -109,7 +109,7 @@ Command format sent by the client:
If streaming already started terminates the connection.

### Bookmark
### GetBookmark
Gets the data from the entry pointed by the bookmark (`bookmark`) in the format `FileEntry` defined in the [STREAM FILE](#stream-file) section).

Command format sent by the client:
Expand Down Expand Up @@ -165,18 +165,18 @@ Stream relay server included in the datastream library allows scaling the number

### CLIENT API
- Create and start a datastream client (`StreamClient`) using the `NewClient` function followed by the `Start` function.
- Executes server commands by calling `ExecCommand`
- Executes server commands by calling `ExecCommandStart`, `ExecCommandStartBookmark`, `ExecCommandGetHeader`, `ExecCommandGetEntry`, `ExecCommandGetBookmark`, or `ExecCommandStop`.

#### Streaming API
- ExecCommand(datastreamer.CmdStart) -> starts receiving stream from the entry number specified by setting `.FromEntry` field
- ExecCommand(datastreamer.CmdStartBookmark) -> starts receiving stream from the entry pointed by bookmark specified by setting `.FromBookmark` field
- ExecCommand(datastreamer.CmdStop) -> stops receiving stream
- SetProcessEntryFunc(f `ProcessEntryFunc`) -> sets the callback function for each entry received. Overrides default function that just prints the entry fields.
- ExecCommandStart(fromEntry): Initiates the stream starting from the entry number specified in the parameter.
- ExecCommandStartBookmark(fromBookmark): Initiates the stream starting from the entry pointed by the bookmark specified in the parameter.
- ExecCommandStop(): Stops receiving stream.
- SetProcessEntryFunc(f `ProcessEntryFunc`): Sets the callback function for each entry received. Overrides default function that just prints the entry fields.

#### Query data API
- ExecCommand(datastreamer.CmdHeader) -> gets data stream file header info and fills the `.Header` field
- ExecCommand(datastreamer.CmdEntry) -> gets entry data from entry number and fills the `.Entry` field
- ExecCommand(datastreamer.CmdBookmark) -> gets entry data pointed by bookmark and fills the `.Entry` field
- ExecCommandGetHeader() -> returns struct HeaderEntry: Fetches stream file header info and returns it.
- ExecCommandGetEntry(fromEntry) -> returns struct FileEntry: Fetches entry data from the specified entry number and returns it.
- ExecCommandGetBookmark(fromBookmark) -> returns struct FileEntry: Fetches entry data pointed by the specified bookmark and returns it.

## DATASTREAM CLI DEMO APP
Build the binary datastream demo app (`dsapp`):
Expand Down
40 changes: 18 additions & 22 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,12 +421,12 @@ func runClient(ctx *cli.Context) error {

// Query file header information
if queryHeader {
err = c.ExecCommand(datastreamer.CmdHeader)
header, err := c.ExecCommandGetHeader()
if err != nil {
log.Infof("Error: %v", err)
} else {
log.Infof("QUERY HEADER: TotalEntries[%d] TotalLength[%d] Version[%d] SystemID[%d]",
c.Header.TotalEntries, c.Header.TotalLength, c.Header.Version, c.Header.SystemID)
header.TotalEntries, header.TotalLength, header.Version, header.SystemID)
}
return nil
}
Expand All @@ -437,12 +437,11 @@ func runClient(ctx *cli.Context) error {
if err != nil {
return err
}
c.FromEntry = uint64(qEntry)
err = c.ExecCommand(datastreamer.CmdEntry)
entry, err := c.ExecCommandGetEntry(uint64(qEntry))
if err != nil {
log.Infof("Error: %v", err)
} else {
log.Infof("QUERY ENTRY %d: Entry[%d] Length[%d] Type[%d] Data[%v]", qEntry, c.Entry.Number, c.Entry.Length, c.Entry.Type, c.Entry.Data)
log.Infof("QUERY ENTRY %d: Entry[%d] Length[%d] Type[%d] Data[%v]", qEntry, entry.Number, entry.Length, entry.Type, entry.Data)
}
return nil
}
Expand All @@ -454,19 +453,17 @@ func runClient(ctx *cli.Context) error {
return err
}
qBook := []byte{bookType} // nolint:gomnd
qBook = binary.BigEndian.AppendUint64(qBook, uint64(qBookmark))
c.FromBookmark = qBook
err = c.ExecCommand(datastreamer.CmdBookmark)
entry, err := c.ExecCommandGetBookmark(binary.BigEndian.AppendUint64(qBook, uint64(qBookmark)))
if err != nil {
log.Infof("Error: %v", err)
} else {
log.Infof("QUERY BOOKMARK (%d)%v: Entry[%d] Length[%d] Type[%d] Data[%v]", bookType, qBook, c.Entry.Number, c.Entry.Length, c.Entry.Type, c.Entry.Data)
log.Infof("QUERY BOOKMARK (%d)%v: Entry[%d] Length[%d] Type[%d] Data[%v]", bookType, qBook, entry.Number, entry.Length, entry.Type, entry.Data)
}
return nil
}

// Command header: Get status
err = c.ExecCommand(datastreamer.CmdHeader)
header, err := c.ExecCommandGetHeader()
if err != nil {
return err
}
Expand All @@ -478,24 +475,23 @@ func runClient(ctx *cli.Context) error {
return err
}
bookmark := []byte{bookType} // nolint:gomnd
bookmark = binary.BigEndian.AppendUint64(bookmark, uint64(fromBookNum))
c.FromBookmark = bookmark
err = c.ExecCommand(datastreamer.CmdStartBookmark)
err = c.ExecCommandStartBookmark(binary.BigEndian.AppendUint64(bookmark, uint64(fromBookNum)))
if err != nil {
return err
}
} else {
// Command start: Sync and start streaming receive from entry number
var fromEntry uint64
if from == "latest" { // nolint:gomnd
c.FromEntry = c.Header.TotalEntries
fromEntry = header.TotalEntries
} else {
fromNum, err := strconv.Atoi(from)
if err != nil {
return err
}
c.FromEntry = uint64(fromNum)
fromEntry = uint64(fromNum)
}
err = c.ExecCommand(datastreamer.CmdStart)
err = c.ExecCommandStart(fromEntry)
if err != nil {
return err
}
Expand All @@ -507,7 +503,7 @@ func runClient(ctx *cli.Context) error {
<-interruptSignal

// Command stop: Stop streaming
err = c.ExecCommand(datastreamer.CmdStop)
err = c.ExecCommandStop()
if err != nil {
return err
}
Expand All @@ -527,8 +523,8 @@ func checkEntryBlockSanity(e *datastreamer.FileEntry, c *datastreamer.StreamClie
// Sanity check initialization
if !initSanityEntry {
initSanityEntry = true
if c.FromEntry > 0 {
sanityEntry = c.FromEntry
if c.GetFromStream() > 0 {
sanityEntry = c.GetFromStream()
} else {
sanityEntry = 0
}
Expand Down Expand Up @@ -641,9 +637,9 @@ func checkEntryBlockSanity(e *datastreamer.FileEntry, c *datastreamer.StreamClie
}

// Sanity check end condition
if e.Number+1 >= c.Header.TotalEntries {
if e.Number+1 >= c.GetTotalEntries() {
log.Infof("SANITY CHECK finished! From entry [%d] to entry [%d]. Latest L2block[%d], Bookmark0[%d], Bookmark1[%d]",
c.FromEntry, c.Header.TotalEntries-1, sanityBlock-1, sanityBookmark0-1, sanityBookmark1-1)
c.GetFromStream(), c.GetTotalEntries()-1, sanityBlock-1, sanityBookmark0-1, sanityBookmark1-1)
return errors.New("sanity check finished")
}

Expand Down Expand Up @@ -671,7 +667,7 @@ func doDumpBatchData(e *datastreamer.FileEntry, c *datastreamer.StreamClient, s
batchNumber := binary.BigEndian.Uint64(e.Data[0:8])
if batchNumber < dumpBatchNumber {
return nil
} else if (batchNumber > dumpBatchNumber) || (e.Number+1 >= c.Header.TotalEntries) {
} else if (batchNumber > dumpBatchNumber) || (e.Number+1 >= c.GetTotalEntries()) {
log.Infof("DUMP BATCH finished! First entry[%d], last entry[%d], first block[%d], last block[%d], total tx[%d]",
dumpEntryFirst, dumpEntryLast, dumpBlockFirst, dumpBlockLast, dumpTotalTx)

Expand Down
47 changes: 26 additions & 21 deletions datastreamer/datastreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,44 +441,49 @@ func TestServer(t *testing.T) {
}

func TestClient(t *testing.T) {
var fromBookmark []byte
var fromEntry uint64
var entry datastreamer.FileEntry
var header datastreamer.HeaderEntry

client, err := datastreamer.NewClient(fmt.Sprintf("localhost:%d", config.Port), streamType)
require.NoError(t, err)

err = client.Start()
require.NoError(t, err)

// Case: Query data from not existing bookmark -> FAIL
client.FromBookmark = nonAddedBookmark.Encode()
err = client.ExecCommand(datastreamer.CmdBookmark)
fromBookmark = nonAddedBookmark.Encode()
_, err = client.ExecCommandGetBookmark(fromBookmark)
require.EqualError(t, datastreamer.ErrBookmarkNotFound, err.Error())

// Case: Query data from existing bookmark -> OK
client.FromBookmark = testBookmark.Encode()
err = client.ExecCommand(datastreamer.CmdBookmark)
fromBookmark = testBookmark.Encode()
_, err = client.ExecCommandGetBookmark(fromBookmark)
require.NoError(t, err)

// Case: Query data for entry number that doesn't exist -> FAIL
client.FromEntry = 5000
err = client.ExecCommand(datastreamer.CmdEntry)
fromEntry = 5000
_, err = client.ExecCommandGetEntry(fromEntry)
require.EqualError(t, datastreamer.ErrEntryNotFound, err.Error())

// Case: Query data for entry number that exists -> OK
client.FromEntry = 2
err = client.ExecCommand(datastreamer.CmdEntry)
fromEntry = 2
entry, err = client.ExecCommandGetEntry(fromEntry)
require.NoError(t, err)
require.Equal(t, testEntries[2], TestEntry{}.Decode(client.Entry.Data))
require.Equal(t, testEntries[2], TestEntry{}.Decode(entry.Data))

// Case: Query data for entry number that exists -> OK
client.FromEntry = 1
err = client.ExecCommand(datastreamer.CmdEntry)
fromEntry = 1
entry, err = client.ExecCommandGetEntry(fromEntry)
require.NoError(t, err)
require.Equal(t, testEntries[1], TestEntry{}.Decode(client.Entry.Data))
require.Equal(t, testEntries[1], TestEntry{}.Decode(entry.Data))

// Case: Query header info -> OK
err = client.ExecCommand(datastreamer.CmdHeader)
header, err = client.ExecCommandGetHeader()
require.NoError(t, err)
require.Equal(t, headerEntry.TotalEntries, client.Header.TotalEntries)
require.Equal(t, headerEntry.TotalLength, client.Header.TotalLength)
require.Equal(t, headerEntry.TotalEntries, header.TotalEntries)
require.Equal(t, headerEntry.TotalLength, header.TotalLength)

// Case: Start sync from not existing entry -> FAIL
// client.FromEntry = 22
Expand All @@ -496,8 +501,8 @@ func TestClient(t *testing.T) {
// require.NoError(t, err)

// Case: Start sync from existing bookmark -> OK
client.FromBookmark = testBookmark.Encode()
err = client.ExecCommand(datastreamer.CmdStartBookmark)
fromBookmark = testBookmark.Encode()
err = client.ExecCommandStartBookmark(fromBookmark)
require.NoError(t, err)

// Case: Query entry data with streaming started -> FAIL
Expand All @@ -511,12 +516,12 @@ func TestClient(t *testing.T) {
// require.EqualError(t, datastreamer.ErrResultCommandError, err.Error())

// Case: Stop receiving streaming -> OK
err = client.ExecCommand(datastreamer.CmdStop)
err = client.ExecCommandStop()
require.NoError(t, err)

// Case: Query entry data after stop the streaming -> OK
client.FromEntry = 2
err = client.ExecCommand(datastreamer.CmdEntry)
fromEntry = 2
entry, err = client.ExecCommandGetEntry(fromEntry)
require.NoError(t, err)
require.Equal(t, testEntries[2], TestEntry{}.Decode(client.Entry.Data))
require.Equal(t, testEntries[2], TestEntry{}.Decode(entry.Data))
}
2 changes: 1 addition & 1 deletion datastreamer/streambookmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (b *StreamBookmark) GetBookmark(bookmark []byte) (uint64, error) {
// Get the bookmark from DB
entry, err := b.db.Get(bookmark, nil)
if err == leveldb.ErrNotFound {
log.Infof("Bookmark not found [%v]: %v", bookmark, err)
// log.Infof("Bookmark not found [%v]: %v", bookmark, err)
return 0, err
} else if err != nil {
log.Errorf("Error getting bookmark [%v]: %v", bookmark, err)
Expand Down
Loading

0 comments on commit ebf261c

Please sign in to comment.