package reads import ( "sync" "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/platform/models" "github.com/influxdata/platform/tsdb/cursors" "github.com/pkg/errors" ) {{range .}} // // *********** {{.Name}} *********** // type {{.name}}Table struct { table valBuf []{{.Type}} mu sync.Mutex cur cursors.{{.Name}}ArrayCursor } func new{{.Name}}Table( done chan struct{}, cur cursors.{{.Name}}ArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, ) *{{.name}}Table { t := &{{.name}}Table{ table: newTable(done, bounds, key, cols, defs), cur: cur, } t.readTags(tags) t.advance() return t } func (t *{{.name}}Table) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } t.mu.Unlock() } func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error { t.mu.Lock() defer func() { t.closeDone() t.mu.Unlock() }() if !t.Empty() { t.err = f(t) for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } return t.err } func (t *{{.name}}Table) advance() bool { a := t.cur.Next() t.l = a.Len() if t.l == 0 { return false } if cap(t.timeBuf) < t.l { t.timeBuf = make([]execute.Time, t.l) } else { t.timeBuf = t.timeBuf[:t.l] } for i := range a.Timestamps { t.timeBuf[i] = execute.Time(a.Timestamps[i]) } if cap(t.valBuf) < t.l { t.valBuf = make([]{{.Type}}, t.l) } else { t.valBuf = t.valBuf[:t.l] } copy(t.valBuf, a.Values) t.colBufs[timeColIdx] = t.timeBuf t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() return true } // group table type {{.name}}GroupTable struct { table valBuf []{{.Type}} mu sync.Mutex gc GroupCursor cur cursors.{{.Name}}ArrayCursor } func new{{.Name}}GroupTable( done chan struct{}, gc GroupCursor, cur cursors.{{.Name}}ArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, ) *{{.name}}GroupTable { t := &{{.name}}GroupTable{ table: newTable(done, bounds, key, cols, defs), gc: gc, cur: cur, } t.readTags(tags) t.advance() return t } func (t *{{.name}}GroupTable) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } if t.gc != nil { t.gc.Close() t.gc = nil } t.mu.Unlock() } func (t *{{.name}}GroupTable) Do(f func(flux.ColReader) error) error { t.mu.Lock() defer func() { t.closeDone() t.mu.Unlock() }() if !t.Empty() { t.err = f(t) for !t.isCancelled() && t.err == nil && t.advance() { t.err = f(t) } } return t.err } func (t *{{.name}}GroupTable) advance() bool { RETRY: a := t.cur.Next() t.l = a.Len() if t.l == 0 { if t.advanceCursor() { goto RETRY } return false } if cap(t.timeBuf) < t.l { t.timeBuf = make([]execute.Time, t.l) } else { t.timeBuf = t.timeBuf[:t.l] } for i := range a.Timestamps { t.timeBuf[i] = execute.Time(a.Timestamps[i]) } if cap(t.valBuf) < t.l { t.valBuf = make([]{{.Type}}, t.l) } else { t.valBuf = t.valBuf[:t.l] } copy(t.valBuf, a.Values) t.colBufs[timeColIdx] = t.timeBuf t.colBufs[valueColIdx] = t.valBuf t.appendTags() t.appendBounds() return true } func (t *{{.name}}GroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { cur := t.gc.Cursor() if cur == nil { continue } if typedCur, ok := cur.(cursors.{{.Name}}ArrayCursor); !ok { // TODO(sgc): error or skip? cur.Close() t.err = errors.Errorf("expected {{.name}} cursor type, got %T", cur) return false } else { t.readTags(t.gc.Tags()) t.cur = typedCur return true } } return false } {{end}}