Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

Commit

Permalink
update package interface
Browse files Browse the repository at this point in the history
  • Loading branch information
keisku committed Oct 9, 2021
1 parent e30fec9 commit 11f6ef4
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 139 deletions.
39 changes: 16 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Gorilla

Gorilla provides an encoder/decoder package based on Facebook's Gorilla.
Gorilla provides a compression/decompression time-series package based on Facebook's Gorilla.

Pelkonen, T., Franklin, S., Teller, J., Cavallaro, P., Huang, Q., Meza, J., & Veeraraghavan, K. (2015). Gorilla. Proceedings of the VLDB Endowment, 8(12), 1816–1827. https://doi.org/10.14778/2824032.2824078

Expand All @@ -12,53 +12,46 @@ Pelkonen, T., Franklin, S., Teller, J., Cavallaro, P., Huang, Q., Meza, J., &#38
go get github.com/kei6u/gorilla
```

### Encoder
### Compressor

```go

buf := new(bytes.Buffer)
e := gorilla.NewEncoder(buf)
header := uint32(time.Now().Unix())

h := uint32(time.Now().Unix())
if err := e.PutHeader(h); err != nil {
c, finish, err := gorilla.NewCompressor(buf, header)
if err != nil {
return err
}

if err := e.Encode(uint32(time.Now().Unix()), 10.0); err != nil {
if err := e.Compress(uint32(time.Now().Unix()), 10.0); err != nil {
return err
}
if err := e.Encode(uint32(time.Now().Unix()), 10.5); err != nil {
if err := e.Compress(uint32(time.Now().Unix()), 10.5); err != nil {
return err
}

return e.Flush()
return finish()
```

### Decoder
### Decompressor

```go

buf := new(bytes.Buffer)

// Encoding data to buf ...
// Compressing time-series data to buf ...

h, err := d.LoadHeader()
d, h, err := gorilla.NewDecompressor(buf)
if err != nil {
return err
}

var data []*gorilla.Data
for {
in := &gorilla.Data{}
err := d.Decode(in)
if err == io.EOF {
break
}
if err != nil {
return err
}
data = append(data, in)
iter := d.Iter()
for iter.Next() {
t, v := iter.Get()
fmt.Println(t, v)
}

return data
return iter.Err()
```
79 changes: 37 additions & 42 deletions encode.go → compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ const (
firstDeltaBits = 14
)

// Encoder is a Facebook's paper based encoder.
// Compressor compresses time-series data based on Facebook's paper.
// Link to the paper: https://www.vldb.org/pvldb/vol8/p1816-teller.pdf
type Encoder struct {
type Compressor struct {
bw *bitWriter
header uint32
t uint32
Expand All @@ -22,55 +22,50 @@ type Encoder struct {
value uint64
}

// NewEncoder initializes a Facebook Gorilla based encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{
// NewCompressor initialize Compressor and returns a function to be invoked
// at the end of compressing.
func NewCompressor(w io.Writer, header uint32) (c *Compressor, finish func() error, err error) {
c = &Compressor{
header: header,
bw: newBitWriter(w),
leadingZeros: math.MaxUint8,
}
}

// PutHeader puts the starting timestamp.
func (e *Encoder) PutHeader(h uint32) error {
if err := e.bw.writeBits(uint64(h), 32); err != nil {
return fmt.Errorf("failed to write header: %w", err)
if err := c.bw.writeBits(uint64(header), 32); err != nil {
return nil, nil, fmt.Errorf("failed to write header: %w", err)
}
e.header = h
return nil
return c, c.finish, nil
}

// Encode encode and write data.
func (e *Encoder) Encode(data Data) error {
// Compress compresses time-series data and write.
func (e *Compressor) Compress(t uint32, v float64) error {
if e.t == 0 {
return e.writeFirst(data)
}
return e.write(data)
}
delta := t - e.header
e.t = t
e.tDelta = delta
e.value = math.Float64bits(v)

func (e *Encoder) writeFirst(data Data) error {
delta := data.UnixTimestamp - e.header
e.t = data.UnixTimestamp
e.tDelta = delta
e.value = math.Float64bits(data.Value)

if err := e.bw.writeBits(uint64(delta), firstDeltaBits); err != nil {
return fmt.Errorf("failed to write first delta: %w", err)
if err := e.bw.writeBits(uint64(delta), firstDeltaBits); err != nil {
return fmt.Errorf("failed to write first timestamp: %w", err)
}
if err := e.bw.writeBits(e.value, 64); err != nil {
return fmt.Errorf("failed to write first value: %w", err)
}
return nil
}
return e.bw.writeBits(e.value, 64)
return e.compress(t, v)
}

func (e *Encoder) write(data Data) error {
if err := e.writeTimestamp(data.UnixTimestamp); err != nil {
return fmt.Errorf("failed to write time series data: %w", err)
func (e *Compressor) compress(t uint32, v float64) error {
if err := e.compressTimestamp(t); err != nil {
return fmt.Errorf("failed to compress timestamp: %w", err)
}
if err := e.writeValue(data.Value); err != nil {
return fmt.Errorf("failed to write time series data: %w", err)
if err := e.compressValue(v); err != nil {
return fmt.Errorf("failed to compress value: %w", err)
}
return nil
}

// Compressing time stamps
func (e *Encoder) writeTimestamp(t uint32) error {
func (e *Compressor) compressTimestamp(t uint32) error {
// If t < e.t, delta is overflowed because it is uint32.
// And it causes unexpected EOF during decoding.
delta := t - e.t
Expand Down Expand Up @@ -141,7 +136,7 @@ func writeInt64Bits(bw *bitWriter, i int64, nbits uint) error {
return bw.writeBits(u, int(nbits))
}

func (e *Encoder) writeValue(v float64) error {
func (e *Compressor) compressValue(v float64) error {
value := math.Float64bits(v)
xor := e.value ^ value
e.value = value
Expand All @@ -154,15 +149,15 @@ func (e *Encoder) writeValue(v float64) error {
trailingZeros := trailingZeros(xor)

if err := e.bw.writeBit(one); err != nil {
return fmt.Errorf("failed to write one: %w", err)
return fmt.Errorf("failed to write one bit: %w", err)
}

if e.leadingZeros <= leadingZeros && e.trailingZeros <= trailingZeros {
// If the block of meaningful bits falls within the block of previous meaningful bits,
// i.e., there are at least as many leading zeros and as many trailing zeros as with the previous value
// use that information for the block position and just store the meaningful XORed value.
if err := e.bw.writeBit(zero); err != nil {
return fmt.Errorf("failed to write zero: %w", err)
return fmt.Errorf("failed to write zero bit: %w", err)
}
significantBits := int(64 - e.leadingZeros - e.trailingZeros)
if err := e.bw.writeBits(xor>>e.trailingZeros, significantBits); err != nil {
Expand All @@ -176,7 +171,7 @@ func (e *Encoder) writeValue(v float64) error {

// write new leading
if err := e.bw.writeBit(one); err != nil {
return fmt.Errorf("failed to write one: %w", err)
return fmt.Errorf("failed to write one bit: %w", err)
}
if err := e.bw.writeBits(uint64(leadingZeros), 5); err != nil {
return fmt.Errorf("failed to write leading zeros: %w", err)
Expand All @@ -188,7 +183,7 @@ func (e *Encoder) writeValue(v float64) error {
// So instead we write out a 0 and adjust it back to 64 on unpacking.
significantBits := 64 - leadingZeros - trailingZeros
if err := e.bw.writeBits(uint64(significantBits), 6); err != nil {
return fmt.Errorf("failed to write bits: %w", err)
return fmt.Errorf("failed to write significant bits: %w", err)
}
if err := e.bw.writeBits(xor>>e.trailingZeros, int(significantBits)); err != nil {
return fmt.Errorf("failed to write xor value")
Expand All @@ -214,8 +209,8 @@ func trailingZeros(v uint64) uint8 {
return ret
}

// Flush encodes the finish marker and flush bits with zero bits padding for byte-align.
func (e *Encoder) Flush() error {
// finish compresses the finish marker and flush bits with zero bits padding for byte-align.
func (e *Compressor) finish() error {
if e.t == 0 {
// Add finish marker with delta = 0x3FFF (firstDeltaBits = 14 bits), and first value = 0
err := e.bw.writeBits(1<<firstDeltaBits-1, firstDeltaBits)
Expand Down
6 changes: 0 additions & 6 deletions data.go

This file was deleted.

98 changes: 54 additions & 44 deletions decode.go → decompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"math"
)

// Decoder is a Facebook's paper based encoder.
// Compressor decompresses time-series data based on Facebook's paper.
// Link to the paper: https://www.vldb.org/pvldb/vol8/p1816-teller.pdf
type Decoder struct {
type Decompressor struct {
br *bitReader
header uint32
t uint32
Expand All @@ -19,81 +19,91 @@ type Decoder struct {
value uint64
}

func NewDecoder(r io.Reader) *Decoder {
return &Decoder{
// NewDecompressor initializes Decompressor and returns decompressed header.
func NewDecompressor(r io.Reader) (d *Decompressor, header uint32, err error) {
d = &Decompressor{
br: newBitReader(r),
}
h, err := d.br.readBits(32)
if err != nil {
return nil, 0, fmt.Errorf("failed to decode header: %w", err)
}
d.header = uint32(h)
return d, d.header, nil
}

// LoadHeader loads the starting timestamp.
func (d *Decoder) LoadHeader() (header uint32, err error) {
t, err := d.br.readBits(32)
if err != nil {
return 0, fmt.Errorf("failed to decode header: %w", err)
// Iter returns an iterator of decompressor.
func (d *Decompressor) Iter() *DecompressorIter {
return &DecompressorIter{0, 0, nil, d}
}

// DecompressorIter is an iterator of Decompressor.
type DecompressorIter struct {
t uint32
v float64
err error
d *Decompressor
}

// Get returns decompressed time-series data.
func (d *DecompressorIter) Get() (t uint32, v float64) {
return d.t, d.v
}

// Err returns error during decompression.
func (d *DecompressorIter) Err() error {
if d.err == io.EOF {
return nil
}
d.header = uint32(t)
return d.header, nil
return d.err
}

// Decode read and decode data, then bind it to an argument.
func (d *Decoder) Decode(data *Data) error {
var err error
var read *Data
if d.t == 0 {
read, err = d.readFirst()
if err != nil {
return err
}
// Next proceeds decompressing time-series data unitil EOF.
func (d *DecompressorIter) Next() bool {
if d.d.t == 0 {
d.t, d.v, d.err = d.d.decompressFirst()
} else {
read, err = d.read()
if err != nil {
return err
}
d.t, d.v, d.err = d.d.decompress()
}
data.UnixTimestamp = read.UnixTimestamp
data.Value = read.Value
return nil
return d.err == nil
}

func (d *Decoder) readFirst() (*Data, error) {
func (d *Decompressor) decompressFirst() (t uint32, v float64, err error) {
delta, err := d.br.readBits(firstDeltaBits)
if err != nil {
return nil, fmt.Errorf("failed to read first sample: %w", err)
return 0, 0, fmt.Errorf("failed to decompress delta at first: %w", err)
}
if delta == 1<<firstDeltaBits-1 {
return nil, io.EOF
return 0, 0, io.EOF
}

value, err := d.br.readBits(64)
if err != nil {
return nil, fmt.Errorf("failed to read first sample value: %w", err)
return 0, 0, fmt.Errorf("failed to decompress value at first: %w", err)
}

d.delta = uint32(delta)
d.t = d.header + d.delta
d.value = value

return &Data{
UnixTimestamp: d.t,
Value: math.Float64frombits(d.value),
}, nil
return d.t, math.Float64frombits(d.value), nil
}

func (d *Decoder) read() (*Data, error) {
t, err := d.readTimestamp()
func (d *Decompressor) decompress() (t uint32, v float64, err error) {
t, err = d.decompressTimestamp()
if err != nil {
return nil, err
return 0, 0, fmt.Errorf("failed to decompress timestamp: %w", err)
}

v, err := d.readValue()
v, err = d.decompressValue()
if err != nil {
return nil, err
return 0, 0, fmt.Errorf("failed to decompress value: %w", err)
}

return &Data{t, v}, nil
return t, v, nil
}

func (d *Decoder) readTimestamp() (uint32, error) {
func (d *Decompressor) decompressTimestamp() (uint32, error) {
n, err := d.dodBitsN()
if err != nil {
return 0, err
Expand Down Expand Up @@ -123,7 +133,7 @@ func (d *Decoder) readTimestamp() (uint32, error) {
return d.t, nil
}

func (d *Decoder) readValue() (float64, error) {
func (d *Decompressor) decompressValue() (float64, error) {
bit, err := d.br.readBit()
if err != nil {
return 0, fmt.Errorf("failed to read value: %w", err)
Expand Down Expand Up @@ -162,7 +172,7 @@ func (d *Decoder) readValue() (float64, error) {
}

// read delta of delta
func (d *Decoder) dodBitsN() (n uint, err error) {
func (d *Decompressor) dodBitsN() (n uint, err error) {
var dod byte
for i := 0; i < 4; i++ {
dod <<= 1
Expand Down
Loading

0 comments on commit 11f6ef4

Please sign in to comment.