From 1d5a7bf9c82e6f4eaa321291f9abc219fb9282e3 Mon Sep 17 00:00:00 2001
From: "Jonathan A. Sternberg" <jonathan@influxdata.com>
Date: Thu, 11 Jun 2020 09:32:27 -0500
Subject: [PATCH] fix(storage/flux): fix the empty call for storage/flux
 (#18446)

The tables produced by `storage/flux` didn't previously pass our table
tests. The `Empty()` call is supposed to return false if the table was
ever not empty, but reading the table or calling `Done()` would cause
the table implementations here to return that they were always empty.
This messes up the csv encoder which then believes that it just emitted
an empty table.

The table tests for valid table implementations states that this is an
error for the table implementation. This change introduces a simple test
for `ReadFilter` and also runs the table tests on the filter iterator.
---
 storage/flux/table.gen.go           |  30 ++---
 storage/flux/table.gen.go.tmpl      |   6 +-
 storage/flux/table.go               |   7 +-
 storage/flux/table_internal_test.go |   7 ++
 storage/flux/table_test.go          | 181 ++++++++++++++++++++++++++++
 5 files changed, 212 insertions(+), 19 deletions(-)
 create mode 100644 storage/flux/table_internal_test.go

diff --git a/storage/flux/table.gen.go b/storage/flux/table.gen.go
index 05ae594ad1..3931f9eae1 100644
--- a/storage/flux/table.gen.go
+++ b/storage/flux/table.gen.go
@@ -47,7 +47,7 @@ func newFloatTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -137,7 +137,7 @@ func newFloatWindowTable(
 		t.nextTS = start + (every - start%every)
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -332,7 +332,7 @@ func newFloatGroupTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -445,7 +445,7 @@ func newIntegerTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -535,7 +535,7 @@ func newIntegerWindowTable(
 		t.nextTS = start + (every - start%every)
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -730,7 +730,7 @@ func newIntegerGroupTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -843,7 +843,7 @@ func newUnsignedTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -933,7 +933,7 @@ func newUnsignedWindowTable(
 		t.nextTS = start + (every - start%every)
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -1128,7 +1128,7 @@ func newUnsignedGroupTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -1241,7 +1241,7 @@ func newStringTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -1331,7 +1331,7 @@ func newStringWindowTable(
 		t.nextTS = start + (every - start%every)
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -1526,7 +1526,7 @@ func newStringGroupTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -1639,7 +1639,7 @@ func newBooleanTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -1729,7 +1729,7 @@ func newBooleanWindowTable(
 		t.nextTS = start + (every - start%every)
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -1924,7 +1924,7 @@ func newBooleanGroupTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
diff --git a/storage/flux/table.gen.go.tmpl b/storage/flux/table.gen.go.tmpl
index 54f39057d0..a417df6fb1 100644
--- a/storage/flux/table.gen.go.tmpl
+++ b/storage/flux/table.gen.go.tmpl
@@ -41,7 +41,7 @@ func new{{.Name}}Table(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -131,7 +131,7 @@ func new{{.Name}}WindowTable(
 		t.nextTS = start + (every - start % every)
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
@@ -326,7 +326,7 @@ func new{{.Name}}GroupTable(
 		cur:   cur,
 	}
 	t.readTags(tags)
-	t.advance()
+	t.init(t.advance)
 
 	return t
 }
diff --git a/storage/flux/table.go b/storage/flux/table.go
index 063700f755..e691dd9274 100644
--- a/storage/flux/table.go
+++ b/storage/flux/table.go
@@ -27,6 +27,7 @@ type table struct {
 	done chan struct{}
 
 	colBufs *colReader
+	empty   bool
 
 	err error
 
@@ -59,7 +60,7 @@ func newTable(
 func (t *table) Key() flux.GroupKey   { return t.key }
 func (t *table) Cols() []flux.ColMeta { return t.cols }
 func (t *table) Err() error           { return t.err }
-func (t *table) Empty() bool          { return t.colBufs == nil || t.colBufs.l == 0 }
+func (t *table) Empty() bool          { return t.empty }
 
 func (t *table) Cancel() {
 	atomic.StoreInt32(&t.cancelled, 1)
@@ -69,6 +70,10 @@ func (t *table) isCancelled() bool {
 	return atomic.LoadInt32(&t.cancelled) != 0
 }
 
+func (t *table) init(advance func() bool) {
+	t.empty = !advance()
+}
+
 func (t *table) do(f func(flux.ColReader) error, advance func() bool) error {
 	// Mark this table as having been used. If this doesn't
 	// succeed, then this has already been invoked somewhere else.
diff --git a/storage/flux/table_internal_test.go b/storage/flux/table_internal_test.go
new file mode 100644
index 0000000000..182a78387d
--- /dev/null
+++ b/storage/flux/table_internal_test.go
@@ -0,0 +1,7 @@
+package storageflux
+
+import "sync/atomic"
+
+func (t *table) IsDone() bool {
+	return atomic.LoadInt32(&t.used) != 0
+}
diff --git a/storage/flux/table_test.go b/storage/flux/table_test.go
index 7d613ae076..a1779018b8 100644
--- a/storage/flux/table_test.go
+++ b/storage/flux/table_test.go
@@ -83,6 +83,187 @@ func (r *StorageReader) ReadWindowAggregate(ctx context.Context, spec query.Read
 	return wr.ReadWindowAggregate(ctx, spec, alloc)
 }
 
+func TestStorageReader_ReadFilter(t *testing.T) {
+	reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
+		tagsSpec := &gen.TagsSpec{
+			Tags: []*gen.TagValuesSpec{
+				{
+					TagKey: "t0",
+					Values: func() gen.CountableSequence {
+						return gen.NewCounterByteSequence("a-%s", 0, 3)
+					},
+				},
+			},
+		}
+		spec := gen.Spec{
+			OrgID:    org,
+			BucketID: bucket,
+			Measurements: []gen.MeasurementSpec{
+				{
+					Name:     "m0",
+					TagsSpec: tagsSpec,
+					FieldValuesSpec: &gen.FieldValuesSpec{
+						Name: "f0",
+						TimeSequenceSpec: gen.TimeSequenceSpec{
+							Count: math.MaxInt32,
+							Delta: 10 * time.Second,
+						},
+						DataType: models.Float,
+						Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
+							return gen.NewTimeFloatValuesSequence(
+								spec.Count,
+								gen.NewTimestampSequenceFromSpec(spec),
+								gen.NewFloatArrayValuesSequence([]float64{1.0, 2.0, 3.0}),
+							)
+						},
+					},
+				},
+			},
+		}
+		tr := gen.TimeRange{
+			Start: mustParseTime("2019-11-25T00:00:00Z"),
+			End:   mustParseTime("2019-11-25T00:00:30Z"),
+		}
+		return gen.NewSeriesGeneratorFromSpec(&spec, tr), tr
+	})
+	defer reader.Close()
+
+	mem := &memory.Allocator{}
+	ti, err := reader.ReadFilter(context.Background(), query.ReadFilterSpec{
+		OrganizationID: reader.Org,
+		BucketID:       reader.Bucket,
+		Bounds:         reader.Bounds,
+	}, mem)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	makeTable := func(t0 string) *executetest.Table {
+		start, stop := reader.Bounds.Start, reader.Bounds.Stop
+		return &executetest.Table{
+			KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},
+			ColMeta: []flux.ColMeta{
+				{Label: "_start", Type: flux.TTime},
+				{Label: "_stop", Type: flux.TTime},
+				{Label: "_time", Type: flux.TTime},
+				{Label: "_value", Type: flux.TFloat},
+				{Label: "_field", Type: flux.TString},
+				{Label: "_measurement", Type: flux.TString},
+				{Label: "t0", Type: flux.TString},
+			},
+			Data: [][]interface{}{
+				{start, stop, Time("2019-11-25T00:00:00Z"), 1.0, "f0", "m0", t0},
+				{start, stop, Time("2019-11-25T00:00:10Z"), 2.0, "f0", "m0", t0},
+				{start, stop, Time("2019-11-25T00:00:20Z"), 3.0, "f0", "m0", t0},
+			},
+		}
+	}
+
+	want := []*executetest.Table{
+		makeTable("a-0"),
+		makeTable("a-1"),
+		makeTable("a-2"),
+	}
+	executetest.NormalizeTables(want)
+	sort.Sort(executetest.SortedTables(want))
+
+	var got []*executetest.Table
+	if err := ti.Do(func(table flux.Table) error {
+		t, err := executetest.ConvertTable(table)
+		if err != nil {
+			return err
+		}
+		got = append(got, t)
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+	executetest.NormalizeTables(got)
+	sort.Sort(executetest.SortedTables(got))
+
+	// compare these two
+	if diff := cmp.Diff(want, got); diff != "" {
+		t.Errorf("unexpected results -want/+got:\n%s", diff)
+	}
+}
+
+func TestStorageReader_Table(t *testing.T) {
+	reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
+		tagsSpec := &gen.TagsSpec{
+			Tags: []*gen.TagValuesSpec{
+				{
+					TagKey: "t0",
+					Values: func() gen.CountableSequence {
+						return gen.NewCounterByteSequence("a-%s", 0, 3)
+					},
+				},
+			},
+		}
+		spec := gen.Spec{
+			OrgID:    org,
+			BucketID: bucket,
+			Measurements: []gen.MeasurementSpec{
+				{
+					Name:     "m0",
+					TagsSpec: tagsSpec,
+					FieldValuesSpec: &gen.FieldValuesSpec{
+						Name: "f0",
+						TimeSequenceSpec: gen.TimeSequenceSpec{
+							Count: math.MaxInt32,
+							Delta: 10 * time.Second,
+						},
+						DataType: models.Float,
+						Values: func(spec gen.TimeSequenceSpec) gen.TimeValuesSequence {
+							return gen.NewTimeFloatValuesSequence(
+								spec.Count,
+								gen.NewTimestampSequenceFromSpec(spec),
+								gen.NewFloatArrayValuesSequence([]float64{1.0, 2.0, 3.0}),
+							)
+						},
+					},
+				},
+			},
+		}
+		tr := gen.TimeRange{
+			Start: mustParseTime("2019-11-25T00:00:00Z"),
+			End:   mustParseTime("2019-11-25T00:00:30Z"),
+		}
+		return gen.NewSeriesGeneratorFromSpec(&spec, tr), tr
+	})
+	defer reader.Close()
+
+	for _, tc := range []struct {
+		name  string
+		newFn func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator
+	}{
+		{
+			name: "ReadFilter",
+			newFn: func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator {
+				ti, err := reader.ReadFilter(context.Background(), query.ReadFilterSpec{
+					OrganizationID: reader.Org,
+					BucketID:       reader.Bucket,
+					Bounds:         reader.Bounds,
+				}, alloc)
+				if err != nil {
+					t.Fatal(err)
+				}
+				return ti
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			executetest.RunTableTests(t, executetest.TableTest{
+				NewFn: tc.newFn,
+				IsDone: func(table flux.Table) bool {
+					return table.(interface {
+						IsDone() bool
+					}).IsDone()
+				},
+			})
+		})
+	}
+}
+
 func TestStorageReader_ReadWindowAggregate(t *testing.T) {
 	reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
 		tagsSpec := &gen.TagsSpec{