169 lines
4.9 KiB
Cheetah
169 lines
4.9 KiB
Cheetah
package tsm1
|
|
|
|
|
|
{{range .}}
|
|
// Read{{.Name}}Block reads the next block as a set of {{.name}} values.
|
|
func (c *KeyCursor) Read{{.Name}}Block(buf *[]{{.Name}}Value) ([]{{.Name}}Value, error) {
|
|
// No matching blocks to decode
|
|
if len(c.current) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// First block is the oldest block containing the points we're searching for.
|
|
first := c.current[0]
|
|
*buf = (*buf)[:0]
|
|
values, err := first.r.Read{{.Name}}BlockAt(&first.entry, buf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Remove values we already read
|
|
values = {{.Name}}Values(values).Exclude(first.readMin, first.readMax)
|
|
|
|
// Remove any tombstones
|
|
tombstones := first.r.TombstoneRange(c.key)
|
|
values = c.filter{{.Name}}Values(tombstones, values)
|
|
|
|
// Check we have remaining values.
|
|
if len(values) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// Only one block with this key and time range so return it
|
|
if len(c.current) == 1 {
|
|
if len(values) > 0 {
|
|
first.markRead(values[0].UnixNano(), values[len(values)-1].UnixNano())
|
|
}
|
|
return values, nil
|
|
}
|
|
|
|
// Use the current block time range as our overlapping window
|
|
minT, maxT := first.readMin, first.readMax
|
|
if len(values) > 0 {
|
|
minT, maxT = values[0].UnixNano(), values[len(values)-1].UnixNano()
|
|
}
|
|
if c.ascending {
|
|
// Blocks are ordered by generation, we may have values in the past in later blocks, if so,
|
|
// expand the window to include the min time range to ensure values are returned in ascending
|
|
// order
|
|
for i := 1; i < len(c.current); i++ {
|
|
cur := c.current[i]
|
|
if cur.entry.MinTime < minT && !cur.read() {
|
|
minT = cur.entry.MinTime
|
|
}
|
|
}
|
|
|
|
// Find first block that overlaps our window
|
|
for i := 1; i < len(c.current); i++ {
|
|
cur := c.current[i]
|
|
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
|
// Shrink our window so it's the intersection of the first overlapping block and the
|
|
// first block. We do this to minimize the region that overlaps and needs to
|
|
// be merged.
|
|
if cur.entry.MaxTime > maxT {
|
|
maxT = cur.entry.MaxTime
|
|
}
|
|
values = {{.Name}}Values(values).Include(minT, maxT)
|
|
break
|
|
}
|
|
}
|
|
|
|
// Search the remaining blocks that overlap our window and append their values so we can
|
|
// merge them.
|
|
for i := 1; i < len(c.current); i++ {
|
|
cur := c.current[i]
|
|
// Skip this block if it doesn't contain points we looking for or they have already been read
|
|
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
|
cur.markRead(minT, maxT)
|
|
continue
|
|
}
|
|
|
|
tombstones := cur.r.TombstoneRange(c.key)
|
|
var a []{{.Name}}Value
|
|
v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, &a)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Remove any tombstoned values
|
|
v = c.filter{{.Name}}Values(tombstones, v)
|
|
|
|
// Remove values we already read
|
|
v = {{.Name}}Values(v).Exclude(cur.readMin, cur.readMax)
|
|
|
|
if len(v) > 0 {
|
|
// Only use values in the overlapping window
|
|
v = {{.Name}}Values(v).Include(minT, maxT)
|
|
|
|
// Merge the remaing values with the existing
|
|
values = {{.Name}}Values(values).Merge(v)
|
|
}
|
|
cur.markRead(minT, maxT)
|
|
}
|
|
|
|
} else {
|
|
// Blocks are ordered by generation, we may have values in the past in later blocks, if so,
|
|
// expand the window to include the max time range to ensure values are returned in descending
|
|
// order
|
|
for i := 1; i < len(c.current); i++ {
|
|
cur := c.current[i]
|
|
if cur.entry.MaxTime > maxT && !cur.read() {
|
|
maxT = cur.entry.MaxTime
|
|
}
|
|
}
|
|
|
|
// Find first block that overlaps our window
|
|
for i := 1; i < len(c.current); i++ {
|
|
cur := c.current[i]
|
|
if cur.entry.OverlapsTimeRange(minT, maxT) && !cur.read() {
|
|
// Shrink our window so it's the intersection of the first overlapping block and the
|
|
// first block. We do this to minimize the region that overlaps and needs to
|
|
// be merged.
|
|
if cur.entry.MinTime < minT {
|
|
minT = cur.entry.MinTime
|
|
}
|
|
values = {{.Name}}Values(values).Include(minT, maxT)
|
|
break
|
|
}
|
|
}
|
|
|
|
// Search the remaining blocks that overlap our window and append their values so we can
|
|
// merge them.
|
|
for i := 1; i < len(c.current); i++ {
|
|
cur := c.current[i]
|
|
// Skip this block if it doesn't contain points we looking for or they have already been read
|
|
if !cur.entry.OverlapsTimeRange(minT, maxT) || cur.read() {
|
|
cur.markRead(minT, maxT)
|
|
continue
|
|
}
|
|
|
|
tombstones := cur.r.TombstoneRange(c.key)
|
|
|
|
var a []{{.Name}}Value
|
|
v, err := cur.r.Read{{.Name}}BlockAt(&cur.entry, &a)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Remove any tombstoned values
|
|
v = c.filter{{.Name}}Values(tombstones, v)
|
|
|
|
// Remove values we already read
|
|
v = {{.Name}}Values(v).Exclude(cur.readMin, cur.readMax)
|
|
|
|
// If the block we decoded should have all of it's values included, mark it as read so we
|
|
// don't use it again.
|
|
if len(v) > 0 {
|
|
v = {{.Name}}Values(v).Include(minT, maxT)
|
|
// Merge the remaing values with the existing
|
|
values = {{.Name}}Values(v).Merge(values)
|
|
}
|
|
cur.markRead(minT, maxT)
|
|
}
|
|
}
|
|
|
|
first.markRead(minT, maxT)
|
|
|
|
return values, err
|
|
}
|
|
|
|
{{ end }}
|