refactor(storage): remove no points tables and streamReader interface
These tables were previously used to perform meta queries. Meta queries are now answered using a specific API, and as a result, these tables can go away.pull/13735/head
parent
95aa194498
commit
5d90683b07
|
@ -13,17 +13,11 @@ import (
|
|||
"github.com/influxdata/influxdb/tsdb/cursors"
|
||||
)
|
||||
|
||||
type streamCursor interface {
|
||||
streamCursor()
|
||||
}
|
||||
|
||||
type floatCursorStreamReader struct {
|
||||
fr *frameReader
|
||||
a cursors.FloatArray
|
||||
}
|
||||
|
||||
func (c *floatCursorStreamReader) streamCursor() {}
|
||||
|
||||
func (c *floatCursorStreamReader) Close() {
|
||||
for c.fr.state == stateReadFloatPoints {
|
||||
c.readFrame()
|
||||
|
@ -71,8 +65,6 @@ type integerCursorStreamReader struct {
|
|||
a cursors.IntegerArray
|
||||
}
|
||||
|
||||
func (c *integerCursorStreamReader) streamCursor() {}
|
||||
|
||||
func (c *integerCursorStreamReader) Close() {
|
||||
for c.fr.state == stateReadIntegerPoints {
|
||||
c.readFrame()
|
||||
|
@ -120,8 +112,6 @@ type unsignedCursorStreamReader struct {
|
|||
a cursors.UnsignedArray
|
||||
}
|
||||
|
||||
func (c *unsignedCursorStreamReader) streamCursor() {}
|
||||
|
||||
func (c *unsignedCursorStreamReader) Close() {
|
||||
for c.fr.state == stateReadUnsignedPoints {
|
||||
c.readFrame()
|
||||
|
@ -169,8 +159,6 @@ type stringCursorStreamReader struct {
|
|||
a cursors.StringArray
|
||||
}
|
||||
|
||||
func (c *stringCursorStreamReader) streamCursor() {}
|
||||
|
||||
func (c *stringCursorStreamReader) Close() {
|
||||
for c.fr.state == stateReadStringPoints {
|
||||
c.readFrame()
|
||||
|
@ -218,8 +206,6 @@ type booleanCursorStreamReader struct {
|
|||
a cursors.BooleanArray
|
||||
}
|
||||
|
||||
func (c *booleanCursorStreamReader) streamCursor() {}
|
||||
|
||||
func (c *booleanCursorStreamReader) Close() {
|
||||
for c.fr.state == stateReadBooleanPoints {
|
||||
c.readFrame()
|
||||
|
|
|
@ -7,18 +7,12 @@ import (
|
|||
"github.com/influxdata/influxdb/tsdb/cursors"
|
||||
)
|
||||
|
||||
type streamCursor interface {
|
||||
streamCursor()
|
||||
}
|
||||
|
||||
{{range .}}
|
||||
type {{.name}}CursorStreamReader struct {
|
||||
fr *frameReader
|
||||
a cursors.{{.Name}}Array
|
||||
}
|
||||
|
||||
func (c *{{.name}}CursorStreamReader) streamCursor() {}
|
||||
|
||||
func (c *{{.name}}CursorStreamReader) Close() {
|
||||
for c.fr.state == stateRead{{.Name}}Points {
|
||||
c.readFrame()
|
||||
|
|
|
@ -3,7 +3,6 @@ package reads
|
|||
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@types.tmpldata table.gen.go.tmpl
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/apache/arrow/go/arrow/array"
|
||||
|
@ -12,7 +11,6 @@ import (
|
|||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/memory"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb/cursors"
|
||||
)
|
||||
|
||||
type table struct {
|
||||
|
@ -158,111 +156,6 @@ func (t *table) closeDone() {
|
|||
}
|
||||
}
|
||||
|
||||
// hasPoints returns true if the next block from cur has data. If cur is not
|
||||
// nil, it will be closed.
|
||||
func hasPoints(cur cursors.Cursor) bool {
|
||||
if cur == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// TODO(sgc): this is a temporary fix to identify a remote cursor
|
||||
// which will not stream points causing hasPoints to return false.
|
||||
// This is the cause of https://github.com/influxdata/idpe/issues/2774
|
||||
if _, ok := cur.(streamCursor); ok {
|
||||
cur.Close()
|
||||
return true
|
||||
}
|
||||
|
||||
res := false
|
||||
switch cur := cur.(type) {
|
||||
case cursors.IntegerArrayCursor:
|
||||
a := cur.Next()
|
||||
res = a.Len() > 0
|
||||
case cursors.FloatArrayCursor:
|
||||
a := cur.Next()
|
||||
res = a.Len() > 0
|
||||
case cursors.UnsignedArrayCursor:
|
||||
a := cur.Next()
|
||||
res = a.Len() > 0
|
||||
case cursors.BooleanArrayCursor:
|
||||
a := cur.Next()
|
||||
res = a.Len() > 0
|
||||
case cursors.StringArrayCursor:
|
||||
a := cur.Next()
|
||||
res = a.Len() > 0
|
||||
default:
|
||||
panic(fmt.Sprintf("unreachable: %T", cur))
|
||||
}
|
||||
cur.Close()
|
||||
return res
|
||||
}
|
||||
|
||||
type tableNoPoints struct {
|
||||
table
|
||||
}
|
||||
|
||||
func newTableNoPoints(
|
||||
done chan struct{},
|
||||
bounds execute.Bounds,
|
||||
key flux.GroupKey,
|
||||
cols []flux.ColMeta,
|
||||
tags models.Tags,
|
||||
defs [][]byte,
|
||||
alloc *memory.Allocator,
|
||||
) *tableNoPoints {
|
||||
t := &tableNoPoints{
|
||||
table: newTable(done, bounds, key, cols, defs, alloc),
|
||||
}
|
||||
t.readTags(tags)
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *tableNoPoints) Close() {}
|
||||
|
||||
func (t *tableNoPoints) Statistics() cursors.CursorStats { return cursors.CursorStats{} }
|
||||
|
||||
func (t *tableNoPoints) Do(f func(flux.ColReader) error) error {
|
||||
if t.isCancelled() {
|
||||
return nil
|
||||
}
|
||||
t.err = f(t)
|
||||
t.closeDone()
|
||||
return t.err
|
||||
}
|
||||
|
||||
type groupTableNoPoints struct {
|
||||
table
|
||||
}
|
||||
|
||||
func newGroupTableNoPoints(
|
||||
done chan struct{},
|
||||
bounds execute.Bounds,
|
||||
key flux.GroupKey,
|
||||
cols []flux.ColMeta,
|
||||
defs [][]byte,
|
||||
alloc *memory.Allocator,
|
||||
) *groupTableNoPoints {
|
||||
t := &groupTableNoPoints{
|
||||
table: newTable(done, bounds, key, cols, defs, alloc),
|
||||
}
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *groupTableNoPoints) Close() {}
|
||||
|
||||
func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error {
|
||||
if t.isCancelled() {
|
||||
return nil
|
||||
}
|
||||
t.err = f(t)
|
||||
t.closeDone()
|
||||
return t.err
|
||||
}
|
||||
|
||||
func (t *groupTableNoPoints) Statistics() cursors.CursorStats { return cursors.CursorStats{} }
|
||||
|
||||
func (t *floatTable) toArrowBuffer(vs []float64) *array.Float64 {
|
||||
return arrow.NewFloat(vs, t.alloc)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue