-
Notifications
You must be signed in to change notification settings - Fork 0
/
iterator.go
194 lines (160 loc) · 5.3 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// Copyright 2017 Pilosa Corp.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pilosa
import (
"fmt"
"github.com/m3dbx/pilosa/roaring"
)
// iterator is the common contract for walking a sequence of row/column ID
// pairs.
type iterator interface {
	// Seek repositions the iterator relative to the given row/column pair.
	Seek(rowID, columnID uint64)

	// Next yields the following pair. eof is true when no pair is available.
	Next() (rowID, columnID uint64, eof bool)
}
// bufIterator decorates an iterator with a one-pair buffer so that the pair
// most recently read can be pushed back and handed out again.
type bufIterator struct {
	// buf caches the last result read from itr. full is set while that
	// result is waiting to be returned again by the next read.
	buf struct {
		rowID, columnID uint64
		eof, full       bool
	}

	// itr is the wrapped iterator.
	itr iterator
}
// newBufIterator wraps itr in a bufIterator whose buffer starts out empty.
func newBufIterator(itr iterator) *bufIterator {
	wrapped := new(bufIterator)
	wrapped.itr = itr
	return wrapped
}
// Seek discards any buffered pair and forwards the seek for rowID/columnID
// to the wrapped iterator.
func (itr *bufIterator) Seek(rowID, columnID uint64) {
	// Whatever sits in the buffer belongs to the old position; drop it so
	// the next read comes from the repositioned iterator.
	itr.buf.full = false

	inner := itr.itr
	inner.Seek(rowID, columnID)
}
// Next returns the next pair.
// When a pair has been pushed back with Unread, that pair is returned and the
// buffer is marked empty; otherwise a fresh pair is read from the wrapped
// iterator.
func (itr *bufIterator) Next() (rowID, columnID uint64, eof bool) {
	pending := &itr.buf
	if !pending.full {
		// Nothing was unread: pull a fresh result and keep a copy in the
		// buffer so a later Unread can replay it.
		pending.rowID, pending.columnID, pending.eof = itr.itr.Next()
	}
	pending.full = false
	return pending.rowID, pending.columnID, pending.eof
}
// Peek returns the next pair without consuming it: the pair is read via Next
// and immediately pushed back with Unread, so the following Next returns it
// again.
func (itr *bufIterator) Peek() (rowID, columnID uint64, eof bool) {
	row, col, done := itr.Next()
	itr.Unread()
	return row, col, done
}
// Unread marks the buffered pair (the result of the latest read) as pending,
// so the next call to Next returns it again.
// It panics when a pair is already pending, since only one can be buffered.
func (itr *bufIterator) Unread() {
	if b := &itr.buf; !b.full {
		b.full = true
		return
	}
	panic("pilosa.BufIterator: buffer full")
}
// limitIterator wraps an iterator and cuts it off after a maximum row/column
// pair.
type limitIterator struct {
	// itr is the wrapped iterator.
	itr iterator

	// maxRowID and maxColumnID form the last pair that is still returned.
	maxRowID, maxColumnID uint64

	// eof is set by Next once the limit is exceeded or the wrapped iterator
	// is exhausted; while it is set, Next returns EOF immediately.
	eof bool
}
// newLimitIterator wraps itr so that iteration stops after the pair
// (maxRowID, maxColumnID).
func newLimitIterator(itr iterator, maxRowID, maxColumnID uint64) *limitIterator { // nolint: unparam
	limited := new(limitIterator)
	limited.itr = itr
	limited.maxRowID, limited.maxColumnID = maxRowID, maxColumnID
	return limited
}
// Seek moves the underlying iterator to the given row/column pair.
//
// It also clears the EOF flag latched by an earlier Next, so an iterator that
// ran past its limit (or drained its source) can be repositioned and read
// again. Without the reset, Next would keep reporting EOF even after a Seek
// back to a position inside the limit.
func (itr *limitIterator) Seek(rowID, columnID uint64) {
	itr.eof = false
	itr.itr.Seek(rowID, columnID)
}
// Next returns the next row/column ID pair from the underlying iterator.
// EOF (0, 0, true) is returned instead when the underlying iterator is
// exhausted or yields a pair beyond (maxRowID, maxColumnID). In either case
// the iterator's eof flag is set, and Next reports EOF without consulting the
// underlying iterator for as long as that flag stays set.
func (itr *limitIterator) Next() (rowID, columnID uint64, eof bool) {
	if !itr.eof {
		row, col, done := itr.itr.Next()
		switch {
		case done, row > itr.maxRowID, row == itr.maxRowID && col > itr.maxColumnID:
			// Source drained or limit exceeded: latch EOF.
			itr.eof = true
		default:
			return row, col, false
		}
	}
	return 0, 0, true
}
// sliceIterator iterates over row/column ID pairs held in two parallel
// slices: the pair at position k is (rowIDs[k], columnIDs[k]).
type sliceIterator struct {
	rowIDs    []uint64
	columnIDs []uint64

	// i is the position of the next pair to return.
	i int

	// n is the number of pairs.
	n int
}
// newSliceIterator builds an iterator over the pairs
// (rowIDs[k], columnIDs[k]). The slices are used as given, not copied.
// It panics when the two slices differ in length.
func newSliceIterator(rowIDs, columnIDs []uint64) *sliceIterator {
	rows, cols := len(rowIDs), len(columnIDs)
	if rows != cols {
		panic(fmt.Sprintf("pilosa.SliceIterator: pair length mismatch: %d != %d", rows, cols))
	}

	itr := new(sliceIterator)
	itr.rowIDs, itr.columnIDs, itr.n = rowIDs, columnIDs, rows
	return itr
}
// Seek positions the cursor on the first pair, scanning from the start, whose
// row is greater than bseek, or whose row equals bseek and whose column is at
// least pseek. bseek is the row ID to seek and pseek the column ID.
// When no pair qualifies the cursor is placed past the end, so Next reports
// EOF.
func (itr *sliceIterator) Seek(bseek, pseek uint64) {
	pos := 0
	for ; pos < itr.n; pos++ {
		row, col := itr.rowIDs[pos], itr.columnIDs[pos]
		if row > bseek || (row == bseek && col >= pseek) {
			break
		}
	}
	// pos equals itr.n here when the scan ran off the end without a match.
	itr.i = pos
}
// Next returns the pair under the cursor and advances the cursor by one.
// Once the cursor is past the last pair it returns (0, 0, true).
func (itr *sliceIterator) Next() (rowID, columnID uint64, eof bool) {
	if pos := itr.i; pos < itr.n {
		row, col := itr.rowIDs[pos], itr.columnIDs[pos]
		itr.i = pos + 1
		return row, col, false
	}
	return 0, 0, true
}
// roaringIterator wraps a roaring.Iterator and exposes its values as
// row/column ID pairs: each value is split into a row ID (value / ShardWidth)
// and a column ID (value % ShardWidth).
type roaringIterator struct {
// itr is the wrapped roaring iterator.
itr *roaring.Iterator
}
// newRoaringIterator returns a roaringIterator that reads from itr.
func newRoaringIterator(itr *roaring.Iterator) *roaringIterator {
	wrapped := new(roaringIterator)
	wrapped.itr = itr
	return wrapped
}
// Seek forwards to the wrapped roaring iterator's Seek, passing the combined
// position bseek*ShardWidth + pseek. bseek is the row ID and pseek the
// column ID.
func (itr *roaringIterator) Seek(bseek, pseek uint64) {
	pos := bseek*ShardWidth + pseek
	itr.itr.Seek(pos)
}
// Next reads the next value from the wrapped roaring iterator and splits it
// into a row ID (value / ShardWidth) and a column ID (value % ShardWidth).
// eof is passed through from the wrapped iterator.
func (itr *roaringIterator) Next() (rowID, columnID uint64, eof bool) {
	pos, done := itr.itr.Next()
	rowID, columnID = pos/ShardWidth, pos%ShardWidth
	return rowID, columnID, done
}