diff --git a/mock/storage_reads.go b/mock/storage_reads.go new file mode 100644 index 0000000000..acfef80ef1 --- /dev/null +++ b/mock/storage_reads.go @@ -0,0 +1,187 @@ +package mock + +import ( + "github.com/influxdata/platform/models" + "github.com/influxdata/platform/storage/reads" + "github.com/influxdata/platform/storage/reads/datatypes" + "github.com/influxdata/platform/tsdb/cursors" + "google.golang.org/grpc/metadata" +) + +type ResponseStream struct { + SendFunc func(*datatypes.ReadResponse) error + SetTrailerFunc func(metadata.MD) +} + +func NewResponseStream() *ResponseStream { + return &ResponseStream{ + SendFunc: func(*datatypes.ReadResponse) error { return nil }, + SetTrailerFunc: func(mds metadata.MD) {}, + } +} + +func (s *ResponseStream) Send(r *datatypes.ReadResponse) error { + return s.SendFunc(r) +} + +func (s *ResponseStream) SetTrailer(m metadata.MD) { + s.SetTrailerFunc(m) +} + +type ResultSet struct { + NextFunc func() bool + CursorFunc func() cursors.Cursor + TagsFunc func() models.Tags + CloseFunc func() + ErrFunc func() error + StatsFunc func() cursors.CursorStats +} + +func NewResultSet() *ResultSet { + return &ResultSet{ + NextFunc: func() bool { return false }, + CursorFunc: func() cursors.Cursor { return nil }, + TagsFunc: func() models.Tags { return nil }, + CloseFunc: func() {}, + ErrFunc: func() error { return nil }, + StatsFunc: func() cursors.CursorStats { return cursors.CursorStats{} }, + } +} + +func (rs *ResultSet) Next() bool { + return rs.NextFunc() +} + +func (rs *ResultSet) Cursor() cursors.Cursor { + return rs.CursorFunc() +} + +func (rs *ResultSet) Tags() models.Tags { + return rs.TagsFunc() +} + +func (rs *ResultSet) Close() { + rs.CloseFunc() +} + +func (rs *ResultSet) Err() error { + return rs.ErrFunc() +} + +func (rs *ResultSet) Stats() cursors.CursorStats { + return rs.StatsFunc() +} + +type GroupResultSet struct { + NextFunc func() reads.GroupCursor + CloseFunc func() + ErrFunc func() error +} + +func NewGroupResultSet() *GroupResultSet { + return &GroupResultSet{ + NextFunc: func() reads.GroupCursor { return nil }, + CloseFunc: func() {}, + ErrFunc: func() error { return nil }, + } +} + +func (rs *GroupResultSet) Next() reads.GroupCursor { + return rs.NextFunc() +} + +func (rs *GroupResultSet) Close() { + rs.CloseFunc() +} + +func (rs *GroupResultSet) Err() error { + return rs.ErrFunc() +} + +type IntegerArrayCursor struct { + CloseFunc func() + Errfunc func() error + StatsFunc func() cursors.CursorStats + NextFunc func() *cursors.IntegerArray +} + +func NewIntegerArrayCursor() *IntegerArrayCursor { + return &IntegerArrayCursor{ + CloseFunc: func() {}, + Errfunc: func() error { return nil }, + StatsFunc: func() cursors.CursorStats { return cursors.CursorStats{} }, + NextFunc: func() *cursors.IntegerArray { return &cursors.IntegerArray{} }, + } +} + +func (c *IntegerArrayCursor) Close() { + c.CloseFunc() +} + +func (c *IntegerArrayCursor) Err() error { + return c.Errfunc() +} + +func (c *IntegerArrayCursor) Stats() cursors.CursorStats { + return c.StatsFunc() +} + +func (c *IntegerArrayCursor) Next() *cursors.IntegerArray { + return c.NextFunc() +} + +type GroupCursor struct { + NextFunc func() bool + CursorFunc func() cursors.Cursor + TagsFunc func() models.Tags + KeysFunc func() [][]byte + PartitionKeyValsFunc func() [][]byte + CloseFunc func() + ErrFunc func() error + StatsFunc func() cursors.CursorStats +} + +func NewGroupCursor() *GroupCursor { + return &GroupCursor{ + NextFunc: func() bool { return false }, + CursorFunc: func() cursors.Cursor { return nil }, + TagsFunc: func() models.Tags { return nil }, + KeysFunc: func() [][]byte { return nil }, + PartitionKeyValsFunc: func() [][]byte { return nil }, + CloseFunc: func() {}, + ErrFunc: func() error { return nil }, + StatsFunc: func() cursors.CursorStats { return cursors.CursorStats{} }, + } +} + +func (c *GroupCursor) Next() bool { + return c.NextFunc() +} + +func (c *GroupCursor) Cursor() cursors.Cursor { + return c.CursorFunc() +} + +func (c *GroupCursor) Tags() models.Tags { + return c.TagsFunc() +} + +func (c *GroupCursor) Keys() [][]byte { + return c.KeysFunc() +} + +func (c *GroupCursor) PartitionKeyVals() [][]byte { + return c.PartitionKeyValsFunc() +} + +func (c *GroupCursor) Close() { + c.CloseFunc() +} + +func (c *GroupCursor) Err() error { + return c.ErrFunc() +} + +func (c *GroupCursor) Stats() cursors.CursorStats { + return c.StatsFunc() +} diff --git a/storage/reads/response_writer.go b/storage/reads/response_writer.go index b4066262ae..061163f0d4 100644 --- a/storage/reads/response_writer.go +++ b/storage/reads/response_writer.go @@ -2,6 +2,7 @@ package reads import ( "fmt" + "google.golang.org/grpc/metadata" "github.com/influxdata/platform/models" "github.com/influxdata/platform/storage/reads/datatypes" @@ -10,6 +11,9 @@ import ( type ResponseStream interface { Send(*datatypes.ReadResponse) error + // SetTrailer sets the trailer metadata which will be sent with the RPC status. + // When called more than once, all the provided metadata will be merged. + SetTrailer(metadata.MD) } const ( @@ -73,10 +77,16 @@ func (w *ResponseWriter) WriteResultSet(rs ResultSet) error { } } + stats := rs.Stats() + w.stream.SetTrailer(metadata.Pairs( + "scanned-bytes", fmt.Sprint(stats.ScannedBytes), + "scanned-values", fmt.Sprint(stats.ScannedValues))) + return nil } func (w *ResponseWriter) WriteGroupResultSet(rs GroupResultSet) error { + stats := cursors.CursorStats{} gc := rs.Next() for gc != nil { w.startGroup(gc.Keys(), gc.PartitionKeyVals()) @@ -93,11 +103,16 @@ func (w *ResponseWriter) WriteGroupResultSet(rs GroupResultSet) error { gc.Close() return w.err } + stats.Add(gc.Stats()) } gc.Close() gc = rs.Next() } + w.stream.SetTrailer(metadata.Pairs( + "scanned-bytes", fmt.Sprint(stats.ScannedBytes), + "scanned-values", fmt.Sprint(stats.ScannedValues))) + return nil } diff --git a/storage/reads/response_writer_test.go b/storage/reads/response_writer_test.go new file mode 100644 index 0000000000..3b287c5b77 --- /dev/null +++ b/storage/reads/response_writer_test.go @@ -0,0 +1,124 @@ +package reads_test + +import ( + "fmt" + "github.com/influxdata/platform/mock" + "github.com/influxdata/platform/storage/reads" + "github.com/influxdata/platform/tsdb/cursors" + "google.golang.org/grpc/metadata" + "reflect" + "testing" +) + +func TestResponseWriter_WriteResultSet_Stats(t *testing.T) { + scannedValues := 37 + scannedBytes := 41 + + var gotTrailer metadata.MD = nil + + stream := mock.NewResponseStream() + stream.SetTrailerFunc = func(trailer metadata.MD) { + if gotTrailer != nil { + t.Error("trailer expected to be set once, but SetTrailer was called more than once") + } else { + gotTrailer = trailer + } + } + + rs := mock.NewResultSet() + rs.StatsFunc = func() cursors.CursorStats { + return cursors.CursorStats{ + ScannedValues: scannedValues, + ScannedBytes: scannedBytes, + } + } + nextHasBeenCalledOnce := false + rs.NextFunc = func() bool { // Returns true exactly once + if !nextHasBeenCalledOnce { + nextHasBeenCalledOnce = true + return true + } + return false + } + cursorHasBeenCalledOnce := false + rs.CursorFunc = func() cursors.Cursor { + if !cursorHasBeenCalledOnce { + cursorHasBeenCalledOnce = true + return mock.NewIntegerArrayCursor() + } + return nil + } + + // This is what we're testing. + rw := reads.NewResponseWriter(stream, 0) + err := rw.WriteResultSet(rs) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(gotTrailer.Get("scanned-values"), []string{fmt.Sprint(scannedValues)}) { + t.Errorf("expected scanned-values '%v' but got '%v'", []string{fmt.Sprint(scannedValues)}, gotTrailer.Get("scanned-values")) + } + if !reflect.DeepEqual(gotTrailer.Get("scanned-bytes"), []string{fmt.Sprint(scannedBytes)}) { + t.Errorf("expected scanned-bytes '%v' but got '%v'", []string{fmt.Sprint(scannedBytes)}, gotTrailer.Get("scanned-bytes")) + } +} + +func TestResponseWriter_WriteGroupResultSet_Stats(t *testing.T) { + scannedValues := 37 + scannedBytes := 41 + + var gotTrailer metadata.MD = nil + + stream := mock.NewResponseStream() + stream.SetTrailerFunc = func(trailer metadata.MD) { + if gotTrailer != nil { + t.Error("trailer expected to be set once, but SetTrailer was called more than once") + } else { + gotTrailer = trailer + } + } + + gc := mock.NewGroupCursor() + gc.StatsFunc = func() cursors.CursorStats { + return cursors.CursorStats{ + ScannedValues: scannedValues, + ScannedBytes: scannedBytes, + } + } + cNextHasBeenCalledOnce := false + gc.NextFunc = func() bool { + if !cNextHasBeenCalledOnce { + cNextHasBeenCalledOnce = true + return true + } + return false + } + gc.CursorFunc = func() cursors.Cursor { + return mock.NewIntegerArrayCursor() + } + + rs := mock.NewGroupResultSet() + rsNextHasBeenCalledOnce := false + rs.NextFunc = func() reads.GroupCursor { + if !rsNextHasBeenCalledOnce { + rsNextHasBeenCalledOnce = true + return gc + } + return nil + } + + // This is what we're testing. + rw := reads.NewResponseWriter(stream, 0) + err := rw.WriteGroupResultSet(rs) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(gotTrailer.Get("scanned-values"), []string{fmt.Sprint(scannedValues)}) { + t.Errorf("expected scanned-values '%v' but got '%v'", []string{fmt.Sprint(scannedValues)}, gotTrailer.Get("scanned-values")) + } + if !reflect.DeepEqual(gotTrailer.Get("scanned-bytes"), []string{fmt.Sprint(scannedBytes)}) { + t.Errorf("expected scanned-bytes '%v' but got '%v'", []string{fmt.Sprint(scannedBytes)}, gotTrailer.Get("scanned-bytes")) + } +}