diff --git a/cmd/influxd/main_test.go b/cmd/influxd/main_test.go index f3e5e04e30..8c2dac9c2b 100644 --- a/cmd/influxd/main_test.go +++ b/cmd/influxd/main_test.go @@ -47,13 +47,13 @@ func TestMain_Setup(t *testing.T) { } } -func TestMain_Write(t *testing.T) { +func TestMain_WriteAndQuery(t *testing.T) { m := RunMainOrFail(t, ctx) m.SetupOrFail(t) defer m.ShutdownOrFail(t, ctx) // Execute single write against the server. - if resp, err := nethttp.DefaultClient.Do(m.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", m.Org.ID, m.Bucket.ID), `m,k=v f=0i 946684800000000000`)); err != nil { + if resp, err := nethttp.DefaultClient.Do(m.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", m.Org.ID, m.Bucket.ID), `m,k=v f=100i 946684800000000000`)); err != nil { t.Fatal(err) } else if err := resp.Body.Close(); err != nil { t.Fatal(err) @@ -64,7 +64,7 @@ func TestMain_Write(t *testing.T) { // Query server to ensure write persists. qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)` exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" + - `,result,table,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,0,f,m,v` + "\r\n\r\n" + `,result,table,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v` + "\r\n\r\n" var buf bytes.Buffer req := (http.QueryRequest{Query: qs, Org: m.Org}).WithDefaults() @@ -178,6 +178,7 @@ func (m *Main) MustNewHTTPRequest(method, rawurl, body string) *nethttp.Request if err != nil { panic(err) } + req.Header.Set("Authorization", "Token "+m.Auth.Token) return req } diff --git a/go.mod b/go.mod index 1bc73ea82d..f7a1b5232a 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/hashicorp/raft v1.0.0 // indirect github.com/imdario/mergo v0.3.6 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect - github.com/influxdata/flux v0.7.1-0.20181121233512-f499ec2a251b + github.com/influxdata/flux v0.7.1-0.20181121233512-adb2411fee9c github.com/influxdata/influxdb v0.0.0-20181017211453-9520b8d95606 github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 @@ -87,7 +87,6 @@ require ( golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f golang.org/x/sys v0.0.0-20181023152157-44b849a8bc13 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 - golang.org/x/tools v0.0.0-20181023010539-40a48ad93fbe // indirect google.golang.org/api v0.0.0-20181021000519-a2651947f503 google.golang.org/genproto v0.0.0-20181016170114-94acd270e44e // indirect google.golang.org/grpc v1.15.0 diff --git a/go.sum b/go.sum index 6f9d8518c4..a6203c7457 100644 --- a/go.sum +++ b/go.sum @@ -75,24 +75,6 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gonum/blas v0.0.0-20180125090452-e7c5890b24cf h1:ukIp7SJ4RNEkyqdn8EZDzUTOsqWUbHnwPGU3d8pc7ok= -github.com/gonum/blas v0.0.0-20180125090452-e7c5890b24cf/go.mod h1:P32wAyui1PQ58Oce/KYkOqQv8cVw1zAapXOl+dRFGbc= -github.com/gonum/diff v0.0.0-20180125090814-f0137a19aa16 h1:W+m3s4RfTnJqXW1lmuPTG8hhB/MDzmkYuUpU0tLCQAM= -github.com/gonum/diff v0.0.0-20180125090814-f0137a19aa16/go.mod h1:22dM4PLscQl+Nzf64qNBurVJvfyvZELT0iRW2l/NN70= -github.com/gonum/floats v0.0.0-20180125090339-7de1f4ea7ab5 h1:YEwYZI2QOW/49JC7hb5X5irk1J4BJc6Q37OnahdSuek= -github.com/gonum/floats v0.0.0-20180125090339-7de1f4ea7ab5/go.mod h1:PxC8OnwL11+aosOB5+iEPoV3picfs8tUpkVd0pDo+Kg= -github.com/gonum/integrate v0.0.0-20180125090255-09c2f478329f h1:z7F5ExBRmE375gIulqCx1CwAx9SRLRmDpqiizqRw3vs= -github.com/gonum/integrate v0.0.0-20180125090255-09c2f478329f/go.mod h1:pDgmNM6seYpwvPos3q+zxlXMsbve6mOIPucUnUOrI7Y= -github.com/gonum/internal v0.0.0-20180125090855-fda53f8d2571 h1:7ARdo5TcdMorNtejbaqYDEcsqOdPf3iAoUIeoMhvu7E= -github.com/gonum/internal v0.0.0-20180125090855-fda53f8d2571/go.mod h1:Pu4dmpkhSyOzRwuXkOgAvijx4o+4YMUJJo9OvPYMkks= -github.com/gonum/lapack v0.0.0-20180125091020-f0b8b25edece h1:dXIQoswVYAQd8m2jqYdxKn2lCkLlcZfSyu66k6akgis= -github.com/gonum/lapack v0.0.0-20180125091020-f0b8b25edece/go.mod h1:XA3DeT6rxh2EAE789SSiSJNqxPaC0aE9J8NTOI0Jo/A= -github.com/gonum/mathext v0.0.0-20180126232648-3ffefb3e36fc h1:1SNv0SjYNLALMvX6kOidzX0lXkx8li3u9nHq7CYgFCI= -github.com/gonum/mathext v0.0.0-20180126232648-3ffefb3e36fc/go.mod h1:fmo8aiSEWkJeiGXUJf+sPvuDgEFgqIoZSs843ePKrGg= -github.com/gonum/matrix v0.0.0-20180124231301-a41cc49d4c29 h1:Aj+poYy0aVF2abLrHVN2aAxynAGg2AO8VtIJKSnmxMA= -github.com/gonum/matrix v0.0.0-20180124231301-a41cc49d4c29/go.mod h1:0EXg4mc1CNP0HCqCz+K4ts155PXIlUywf0wqN+GfPZw= -github.com/gonum/stat v0.0.0-20180125090729-ec9c8a1062f4 h1:ljlDrxv0Wij8s9+WEYGswFmz/SEg75X832pYRsYA56Y= -github.com/gonum/stat v0.0.0-20180125090729-ec9c8a1062f4/go.mod h1:Z4GIJBJO3Wa4gD4vbwQxXXZ+WHmW6E9ixmNrwvs0iZs= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY= @@ -120,8 +102,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/influxdata/flux v0.7.1-0.20181121233512-f499ec2a251b h1:AWzGtLkrs/4EaYMPsZ6k8OLEeSOegW585e2CwFtXorA= -github.com/influxdata/flux v0.7.1-0.20181121233512-f499ec2a251b/go.mod h1:/NSM1WBmKpRAf1LYn1+raEO4TlYI9ZhIFd4UP03XWsQ= +github.com/influxdata/flux v0.7.1-0.20181121233512-adb2411fee9c h1:HOMyy8qF0X6tWHX/KDJOSYqDbCevcisWyRXCaX5OEaM= +github.com/influxdata/flux v0.7.1-0.20181121233512-adb2411fee9c/go.mod h1:MIjvKpiQLRad9/ilY4jYwpIpMAhiOycJfK9YctCUGUM= github.com/influxdata/goreleaser v0.86.2-0.20181010170531-0fd209ba67f5/go.mod h1:aVuBpDAT5VtjtUxzvBt8HOd0buzvvk7OX3H2iaviixg= github.com/influxdata/influxdb v0.0.0-20181017211453-9520b8d95606 h1:LBPg9NDkmVEGr22IrYPNsESFPjUnSgAbIgClv65dMIg= github.com/influxdata/influxdb v0.0.0-20181017211453-9520b8d95606/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= @@ -129,8 +111,8 @@ github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/q github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE= -github.com/influxdata/tdigest v0.0.0-20180711151920-a7d76c6f093a h1:vMqgISSVkIqWxCIZs8m1L4096temR7IbYyNdMiBxSPA= -github.com/influxdata/tdigest v0.0.0-20180711151920-a7d76c6f093a/go.mod h1:9GkyshztGufsdPQWjH+ifgnIr3xNUL5syI70g2dzU1o= +github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9 h1:MHTrDWmQpHq/hkq+7cw9oYAt2PqUw52TZazRA0N7PGE= +github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0= github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 h1:+TUUmaFa4YD1Q+7bH9o5NCHQGPMqZCYJiNW6lIIS9z4= github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:Wbbw6tYNvwa5dlB6304Sd+82Z3f7PmVZHVKU637d4po= github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= @@ -275,6 +257,9 @@ go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e h1:IzypfodbhbnViNUO/MEh0FzCUooG97cIGfdggUrUSyU= golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20181112044915-a3060d491354 h1:6UAgZ8309zQ9+1iWkHzfszFguqzOdHGyGkd1HmhJ+UE= +golang.org/x/exp v0.0.0-20181112044915-a3060d491354/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -298,11 +283,14 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9 h1:3sQpWsmaX/260ENaYpHTEljOMDVUlW9WHBGg9wGAXJk= -golang.org/x/tools v0.0.0-20181012181339-19e2aca3fdf9/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20181023010539-40a48ad93fbe h1:i8YNi6USHuTcWHQPvNjvHY7JmkAmn1MnN/ISnPD/ZHc= -golang.org/x/tools v0.0.0-20181023010539-40a48ad93fbe/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181121193951-91f80e683c10 h1:6aZMfwu0xab6imbp0uu++D3WXR+p0+RDYOqqb0uY8KU= +golang.org/x/tools v0.0.0-20181121193951-91f80e683c10/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk3VlUWtTg5huQpZR9flmE= +gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= +gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6 h1:4WsZyVtkthqrHTbDCJfiTs8IWNYE4uvsSDgaV6xpp+o= +gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= google.golang.org/api v0.0.0-20181021000519-a2651947f503 h1:UK7/bFlIoP9xre0fwSiXFaZZSpzmaen5MKp1sppNJ9U= google.golang.org/api v0.0.0-20181021000519-a2651947f503/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= diff --git a/http/query_service.go b/http/query_service.go index 1442f69ac6..198c1af899 100644 --- a/http/query_service.go +++ b/http/query_service.go @@ -17,7 +17,7 @@ import ( const ( queryPath = "/api/v2/querysvc" - statsTrailer = "Influx-Query-Statistics" + queryStatisticsTrailer = "Influx-Query-Statistics" ) type QueryHandler struct { @@ -73,7 +73,7 @@ func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) { // Setup headers stats, hasStats := results.(flux.Statisticser) if hasStats { - w.Header().Set("Trailer", statsTrailer) + w.Header().Set("Trailer", queryStatisticsTrailer) } // NOTE: We do not write out the headers here. @@ -109,7 +109,7 @@ func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) { return } // Write statisitcs trailer - w.Header().Set(statsTrailer, string(data)) + w.Header().Set(queryStatisticsTrailer, string(data)) } } @@ -229,7 +229,7 @@ func (s *statsResultIterator) Statistics() flux.Statistics { // readStats reads the query statisitcs off the response trailers. func (s *statsResultIterator) readStats() { - data := s.resp.Trailer.Get(statsTrailer) + data := s.resp.Trailer.Get(queryStatisticsTrailer) if data != "" { s.err = json.Unmarshal([]byte(data), &s.statisitcs) } diff --git a/query/bridges.go b/query/bridges.go index 9e65cb8b82..3ef6bac3ea 100644 --- a/query/bridges.go +++ b/query/bridges.go @@ -2,7 +2,9 @@ package query import ( "context" + "encoding/json" "io" + "net/http" "github.com/influxdata/flux" "github.com/influxdata/platform" @@ -32,8 +34,29 @@ func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *Pr return 0, err } defer results.Release() + + // Setup headers + stats, hasStats := results.(flux.Statisticser) + if hasStats { + if w, ok := w.(http.ResponseWriter); ok { + w.Header().Set("Trailer", "Influx-Query-Statistics") + } + } + encoder := req.Dialect.Encoder() - return encoder.Encode(w, results) + n, err := encoder.Encode(w, results) + if err != nil { + return n, err + } + + if w, ok := w.(http.ResponseWriter); ok { + if hasStats { + data, _ := json.Marshal(stats.Statistics()) + w.Header().Set("Influx-Query-Statistics", string(data)) + } + } + + return n, nil } // REPLQuerier implements the repl.Querier interface while consuming a QueryService diff --git a/query/functions/inputs/storage/storage.go b/query/functions/inputs/storage/storage.go index a7c3480b9b..9d89b63235 100644 --- a/query/functions/inputs/storage/storage.go +++ b/query/functions/inputs/storage/storage.go @@ -75,6 +75,8 @@ type source struct { currentTime execute.Time overflow bool + + stats flux.Statistics } func NewSource(id execute.DatasetID, r Reader, readSpec ReadSpec, bounds execute.Bounds, w execute.Window, currentTime execute.Time) execute.Source { @@ -118,6 +120,7 @@ func (s *source) run(ctx context.Context) error { if err != nil { return err } + s.stats = s.stats.Add(tables.Statistics()) for _, t := range s.ts { if err := t.UpdateWatermark(s.id, mark); err != nil { return err @@ -160,6 +163,10 @@ func (s *source) next(ctx context.Context) (flux.TableIterator, execute.Time, bo return bi, stop, true } +func (s *source) Statistics() flux.Statistics { + return s.stats +} + type GroupMode int const ( diff --git a/query/influxql/response_iterator.go b/query/influxql/response_iterator.go index 3af89ba42d..040b590ee1 100644 --- a/query/influxql/response_iterator.go +++ b/query/influxql/response_iterator.go @@ -17,7 +17,7 @@ type responseIterator struct { resultIdx int } -// NewresponseIterator constructs a responseIterator from a flux.ResultIterator. +// NewResponseIterator constructs a flux.ResultIterator from a Response. func NewResponseIterator(r *Response) flux.ResultIterator { return &responseIterator{ response: r, diff --git a/storage/reads/array_cursor.gen.go b/storage/reads/array_cursor.gen.go index 181f2bd848..d587d911d8 100644 --- a/storage/reads/array_cursor.gen.go +++ b/storage/reads/array_cursor.gen.go @@ -44,6 +44,8 @@ func (c *floatArrayFilterCursor) reset(cur cursors.FloatArrayCursor) { c.tmp.Timestamps, c.tmp.Values = nil, nil } +func (c *floatArrayFilterCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } + func (c *floatArrayFilterCursor) Next() *cursors.FloatArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] @@ -106,6 +108,10 @@ func (c *floatMultiShardArrayCursor) reset(cur cursors.FloatArrayCursor, itrs cu func (c *floatMultiShardArrayCursor) Err() error { return c.err } +func (c *floatMultiShardArrayCursor) Stats() cursors.CursorStats { + return c.FloatArrayCursor.Stats() +} + func (c *floatMultiShardArrayCursor) Next() *cursors.FloatArray { for { a := c.FloatArrayCursor.Next() @@ -177,6 +183,8 @@ func newFloatArraySumCursor(cur cursors.FloatArrayCursor) *floatArraySumCursor { } } +func (c floatArraySumCursor) Stats() cursors.CursorStats { return c.FloatArrayCursor.Stats() } + func (c floatArraySumCursor) Next() *cursors.FloatArray { a := c.FloatArrayCursor.Next() if len(a.Timestamps) == 0 { @@ -205,6 +213,10 @@ type integerFloatCountArrayCursor struct { cursors.FloatArrayCursor } +func (c *integerFloatCountArrayCursor) Stats() cursors.CursorStats { + return c.FloatArrayCursor.Stats() +} + func (c *integerFloatCountArrayCursor) Next() *cursors.IntegerArray { a := c.FloatArrayCursor.Next() if len(a.Timestamps) == 0 { @@ -231,9 +243,10 @@ type floatEmptyArrayCursor struct { var FloatEmptyArrayCursor cursors.FloatArrayCursor = &floatEmptyArrayCursor{} -func (c *floatEmptyArrayCursor) Err() error { return nil } -func (c *floatEmptyArrayCursor) Close() {} -func (c *floatEmptyArrayCursor) Next() *cursors.FloatArray { return &c.res } +func (c *floatEmptyArrayCursor) Err() error { return nil } +func (c *floatEmptyArrayCursor) Close() {} +func (c *floatEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } +func (c *floatEmptyArrayCursor) Next() *cursors.FloatArray { return &c.res } // ******************** // Integer Array Cursor @@ -260,6 +273,8 @@ func (c *integerArrayFilterCursor) reset(cur cursors.IntegerArrayCursor) { c.tmp.Timestamps, c.tmp.Values = nil, nil } +func (c *integerArrayFilterCursor) Stats() cursors.CursorStats { return c.IntegerArrayCursor.Stats() } + func (c *integerArrayFilterCursor) Next() *cursors.IntegerArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] @@ -322,6 +337,10 @@ func (c *integerMultiShardArrayCursor) reset(cur cursors.IntegerArrayCursor, itr func (c *integerMultiShardArrayCursor) Err() error { return c.err } +func (c *integerMultiShardArrayCursor) Stats() cursors.CursorStats { + return c.IntegerArrayCursor.Stats() +} + func (c *integerMultiShardArrayCursor) Next() *cursors.IntegerArray { for { a := c.IntegerArrayCursor.Next() @@ -393,6 +412,8 @@ func newIntegerArraySumCursor(cur cursors.IntegerArrayCursor) *integerArraySumCu } } +func (c integerArraySumCursor) Stats() cursors.CursorStats { return c.IntegerArrayCursor.Stats() } + func (c integerArraySumCursor) Next() *cursors.IntegerArray { a := c.IntegerArrayCursor.Next() if len(a.Timestamps) == 0 { @@ -421,6 +442,10 @@ type integerIntegerCountArrayCursor struct { cursors.IntegerArrayCursor } +func (c *integerIntegerCountArrayCursor) Stats() cursors.CursorStats { + return c.IntegerArrayCursor.Stats() +} + func (c *integerIntegerCountArrayCursor) Next() *cursors.IntegerArray { a := c.IntegerArrayCursor.Next() if len(a.Timestamps) == 0 { @@ -449,6 +474,7 @@ var IntegerEmptyArrayCursor cursors.IntegerArrayCursor = &integerEmptyArrayCurso func (c *integerEmptyArrayCursor) Err() error { return nil } func (c *integerEmptyArrayCursor) Close() {} +func (c *integerEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (c *integerEmptyArrayCursor) Next() *cursors.IntegerArray { return &c.res } // ******************** @@ -476,6 +502,8 @@ func (c *unsignedArrayFilterCursor) reset(cur cursors.UnsignedArrayCursor) { c.tmp.Timestamps, c.tmp.Values = nil, nil } +func (c *unsignedArrayFilterCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } + func (c *unsignedArrayFilterCursor) Next() *cursors.UnsignedArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] @@ -538,6 +566,10 @@ func (c *unsignedMultiShardArrayCursor) reset(cur cursors.UnsignedArrayCursor, i func (c *unsignedMultiShardArrayCursor) Err() error { return c.err } +func (c *unsignedMultiShardArrayCursor) Stats() cursors.CursorStats { + return c.UnsignedArrayCursor.Stats() +} + func (c *unsignedMultiShardArrayCursor) Next() *cursors.UnsignedArray { for { a := c.UnsignedArrayCursor.Next() @@ -609,6 +641,8 @@ func newUnsignedArraySumCursor(cur cursors.UnsignedArrayCursor) *unsignedArraySu } } +func (c unsignedArraySumCursor) Stats() cursors.CursorStats { return c.UnsignedArrayCursor.Stats() } + func (c unsignedArraySumCursor) Next() *cursors.UnsignedArray { a := c.UnsignedArrayCursor.Next() if len(a.Timestamps) == 0 { @@ -637,6 +671,10 @@ type integerUnsignedCountArrayCursor struct { cursors.UnsignedArrayCursor } +func (c *integerUnsignedCountArrayCursor) Stats() cursors.CursorStats { + return c.UnsignedArrayCursor.Stats() +} + func (c *integerUnsignedCountArrayCursor) Next() *cursors.IntegerArray { a := c.UnsignedArrayCursor.Next() if len(a.Timestamps) == 0 { @@ -665,6 +703,7 @@ var UnsignedEmptyArrayCursor cursors.UnsignedArrayCursor = &unsignedEmptyArrayCu func (c *unsignedEmptyArrayCursor) Err() error { return nil } func (c *unsignedEmptyArrayCursor) Close() {} +func (c *unsignedEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (c *unsignedEmptyArrayCursor) Next() *cursors.UnsignedArray { return &c.res } // ******************** @@ -692,6 +731,8 @@ func (c *stringArrayFilterCursor) reset(cur cursors.StringArrayCursor) { c.tmp.Timestamps, c.tmp.Values = nil, nil } +func (c *stringArrayFilterCursor) Stats() cursors.CursorStats { return c.StringArrayCursor.Stats() } + func (c *stringArrayFilterCursor) Next() *cursors.StringArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] @@ -754,6 +795,10 @@ func (c *stringMultiShardArrayCursor) reset(cur cursors.StringArrayCursor, itrs func (c *stringMultiShardArrayCursor) Err() error { return c.err } +func (c *stringMultiShardArrayCursor) Stats() cursors.CursorStats { + return c.StringArrayCursor.Stats() +} + func (c *stringMultiShardArrayCursor) Next() *cursors.StringArray { for { a := c.StringArrayCursor.Next() @@ -815,6 +860,10 @@ type integerStringCountArrayCursor struct { cursors.StringArrayCursor } +func (c *integerStringCountArrayCursor) Stats() cursors.CursorStats { + return c.StringArrayCursor.Stats() +} + func (c *integerStringCountArrayCursor) Next() *cursors.IntegerArray { a := c.StringArrayCursor.Next() if len(a.Timestamps) == 0 { @@ -843,6 +892,7 @@ var StringEmptyArrayCursor cursors.StringArrayCursor = &stringEmptyArrayCursor{} func (c *stringEmptyArrayCursor) Err() error { return nil } func (c *stringEmptyArrayCursor) Close() {} +func (c *stringEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (c *stringEmptyArrayCursor) Next() *cursors.StringArray { return &c.res } // ******************** @@ -870,6 +920,8 @@ func (c *booleanArrayFilterCursor) reset(cur cursors.BooleanArrayCursor) { c.tmp.Timestamps, c.tmp.Values = nil, nil } +func (c *booleanArrayFilterCursor) Stats() cursors.CursorStats { return c.BooleanArrayCursor.Stats() } + func (c *booleanArrayFilterCursor) Next() *cursors.BooleanArray { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] @@ -932,6 +984,10 @@ func (c *booleanMultiShardArrayCursor) reset(cur cursors.BooleanArrayCursor, itr func (c *booleanMultiShardArrayCursor) Err() error { return c.err } +func (c *booleanMultiShardArrayCursor) Stats() cursors.CursorStats { + return c.BooleanArrayCursor.Stats() +} + func (c *booleanMultiShardArrayCursor) Next() *cursors.BooleanArray { for { a := c.BooleanArrayCursor.Next() @@ -993,6 +1049,10 @@ type integerBooleanCountArrayCursor struct { cursors.BooleanArrayCursor } +func (c *integerBooleanCountArrayCursor) Stats() cursors.CursorStats { + return c.BooleanArrayCursor.Stats() +} + func (c *integerBooleanCountArrayCursor) Next() *cursors.IntegerArray { a := c.BooleanArrayCursor.Next() if len(a.Timestamps) == 0 { @@ -1021,4 +1081,5 @@ var BooleanEmptyArrayCursor cursors.BooleanArrayCursor = &booleanEmptyArrayCurso func (c *booleanEmptyArrayCursor) Err() error { return nil } func (c *booleanEmptyArrayCursor) Close() {} +func (c *booleanEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (c *booleanEmptyArrayCursor) Next() *cursors.BooleanArray { return &c.res } diff --git a/storage/reads/array_cursor.gen.go.tmpl b/storage/reads/array_cursor.gen.go.tmpl index 34edb7efbb..d209fd35dd 100644 --- a/storage/reads/array_cursor.gen.go.tmpl +++ b/storage/reads/array_cursor.gen.go.tmpl @@ -43,6 +43,8 @@ func (c *{{$type}}) reset(cur cursors.{{.Name}}ArrayCursor) { c.tmp.Timestamps, c.tmp.Values = nil, nil } +func (c *{{$type}}) Stats() cursors.CursorStats { return c.{{.Name}}ArrayCursor.Stats() } + func (c *{{$type}}) Next() {{$arrayType}} { pos := 0 c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] @@ -106,6 +108,10 @@ func (c *{{.name}}MultiShardArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor, func (c *{{.name}}MultiShardArrayCursor) Err() error { return c.err } +func (c *{{.name}}MultiShardArrayCursor) Stats() cursors.CursorStats { + return c.{{.Name}}ArrayCursor.Stats() +} + func (c *{{.name}}MultiShardArrayCursor) Next() {{$arrayType}} { for { a := c.{{.Name}}ArrayCursor.Next() @@ -182,6 +188,8 @@ func new{{$Type}}(cur cursors.{{.Name}}ArrayCursor) *{{$type}} { } } +func (c {{$type}}) Stats() cursors.CursorStats { return c.{{.Name}}ArrayCursor.Stats() } + func (c {{$type}}) Next() {{$arrayType}} { a := c.{{.Name}}ArrayCursor.Next() if len(a.Timestamps) == 0 { @@ -212,6 +220,10 @@ type integer{{.Name}}CountArrayCursor struct { cursors.{{.Name}}ArrayCursor } +func (c *integer{{.Name}}CountArrayCursor) Stats() cursors.CursorStats { + return c.{{.Name}}ArrayCursor.Stats() +} + func (c *integer{{.Name}}CountArrayCursor) Next() *cursors.IntegerArray { a := c.{{.Name}}ArrayCursor.Next() if len(a.Timestamps) == 0 { @@ -240,6 +252,7 @@ var {{.Name}}EmptyArrayCursor cursors.{{.Name}}ArrayCursor = &{{.name}}EmptyArra func (c *{{.name}}EmptyArrayCursor) Err() error { return nil } func (c *{{.name}}EmptyArrayCursor) Close() {} +func (c *{{.name}}EmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (c *{{.name}}EmptyArrayCursor) Next() {{$arrayType}} { return &c.res } {{end}} diff --git a/storage/reads/datatypes/storage_common.pb.go b/storage/reads/datatypes/storage_common.pb.go index be2bf71f53..ad9b8ac5e2 100644 --- a/storage/reads/datatypes/storage_common.pb.go +++ b/storage/reads/datatypes/storage_common.pb.go @@ -1345,7 +1345,7 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { i += n3 } if len(m.Trace) > 0 { - for k := range m.Trace { + for k, _ := range m.Trace { dAtA[i] = 0x52 i++ v := m.Trace[k] @@ -1884,7 +1884,7 @@ func (m *CapabilitiesResponse) MarshalTo(dAtA []byte) (int, error) { var l int _ = l if len(m.Caps) > 0 { - for k := range m.Caps { + for k, _ := range m.Caps { dAtA[i] = 0xa i++ v := m.Caps[k] diff --git a/storage/reads/group_resultset.go b/storage/reads/group_resultset.go index c4ef8fadaa..7a7ffdd064 100644 --- a/storage/reads/group_resultset.go +++ b/storage/reads/group_resultset.go @@ -310,6 +310,7 @@ func (c *groupNoneCursor) Tags() models.Tags { return c.row.Tags } func (c *groupNoneCursor) Keys() [][]byte { return c.keys } func (c *groupNoneCursor) PartitionKeyVals() [][]byte { return nil } func (c *groupNoneCursor) Close() { c.cur.Close() } +func (c *groupNoneCursor) Stats() cursors.CursorStats { return c.row.Query.Stats() } func (c *groupNoneCursor) Next() bool { row := c.cur.Next() @@ -366,3 +367,11 @@ func (c *groupByCursor) Cursor() cursors.Cursor { } return cur } + +func (c *groupByCursor) Stats() cursors.CursorStats { + var stats cursors.CursorStats + for _, row := range c.rows { + stats.Add(row.Query.Stats()) + } + return stats +} diff --git a/storage/reads/merge.go b/storage/reads/merge.go index 0d4acf8081..3a3f6b870f 100644 --- a/storage/reads/merge.go +++ b/storage/reads/merge.go @@ -11,6 +11,7 @@ type mergedResultSet struct { heap resultSetHeap err error first bool + stats cursors.CursorStats } func NewMergedResultSet(results []ResultSet) ResultSet { @@ -53,6 +54,8 @@ func (r *mergedResultSet) Next() bool { r.Close() } + r.stats.Add(top.Stats()) + return len(r.heap.items) > 0 } @@ -68,6 +71,10 @@ func (r *mergedResultSet) Tags() models.Tags { return r.heap.items[0].Tags() } +func (r *mergedResultSet) Stats() cursors.CursorStats { + return r.stats +} + type resultSetHeap struct { items []ResultSet } diff --git a/storage/reads/reader.go b/storage/reads/reader.go index f4cfeeec1f..bbfe753894 100644 --- a/storage/reads/reader.go +++ b/storage/reads/reader.go @@ -57,8 +57,11 @@ type tableIterator struct { s Store readSpec fstorage.ReadSpec predicate *datatypes.Predicate + stats flux.Statistics } +func (bi *tableIterator) Statistics() flux.Statistics { return bi.stats } + func (bi *tableIterator) Do(f func(flux.Table) error) error { src, err := bi.s.GetSource(bi.readSpec) if err != nil { @@ -189,6 +192,7 @@ READ: } table.Close() + bi.stats = bi.stats.Add(table.Statistics()) table = nil } return rs.Err() diff --git a/storage/reads/resultset.go b/storage/reads/resultset.go index cd0ead91fa..83e3850c23 100644 --- a/storage/reads/resultset.go +++ b/storage/reads/resultset.go @@ -68,3 +68,7 @@ func (r *resultSet) Cursor() cursors.Cursor { func (r *resultSet) Tags() models.Tags { return r.row.Tags } + +// Stats returns the stats for the underlying cursors. +// Available after resultset has been scanned. +func (r *resultSet) Stats() cursors.CursorStats { return r.row.Query.Stats() } diff --git a/storage/reads/store.go b/storage/reads/store.go index 891a47cba8..d8dd47b380 100644 --- a/storage/reads/store.go +++ b/storage/reads/store.go @@ -26,6 +26,8 @@ type ResultSet interface { // Err returns the first error encountered by the ResultSet. Err() error + + Stats() cursors.CursorStats } type GroupResultSet interface { @@ -68,6 +70,8 @@ type GroupCursor interface { // Err returns the first error encountered by the GroupCursor. Err() error + + Stats() cursors.CursorStats } type Store interface { diff --git a/storage/reads/stream_reader.gen.go b/storage/reads/stream_reader.gen.go index 516db4c5ee..370caade54 100644 --- a/storage/reads/stream_reader.gen.go +++ b/storage/reads/stream_reader.gen.go @@ -56,6 +56,8 @@ func (c *floatCursorStreamReader) readFrame() { } } +func (c *floatCursorStreamReader) Stats() cursors.CursorStats { return cursors.CursorStats{} } + type integerCursorStreamReader struct { fr *frameReader a cursors.IntegerArray @@ -99,6 +101,8 @@ func (c *integerCursorStreamReader) readFrame() { } } +func (c *integerCursorStreamReader) Stats() cursors.CursorStats { return cursors.CursorStats{} } + type unsignedCursorStreamReader struct { fr *frameReader a cursors.UnsignedArray @@ -142,6 +146,8 @@ func (c *unsignedCursorStreamReader) readFrame() { } } +func (c *unsignedCursorStreamReader) Stats() cursors.CursorStats { return cursors.CursorStats{} } + type stringCursorStreamReader struct { fr *frameReader a cursors.StringArray @@ -185,6 +191,8 @@ func (c *stringCursorStreamReader) readFrame() { } } +func (c *stringCursorStreamReader) Stats() cursors.CursorStats { return cursors.CursorStats{} } + type booleanCursorStreamReader struct { fr *frameReader a cursors.BooleanArray @@ -227,3 +235,5 @@ func (c *booleanCursorStreamReader) readFrame() { } } } + +func (c *booleanCursorStreamReader) Stats() cursors.CursorStats { return cursors.CursorStats{} } diff --git a/storage/reads/stream_reader.gen.go.tmpl b/storage/reads/stream_reader.gen.go.tmpl index d3ff5788fc..7da4e2d800 100644 --- a/storage/reads/stream_reader.gen.go.tmpl +++ b/storage/reads/stream_reader.gen.go.tmpl @@ -51,4 +51,7 @@ func (c *{{.name}}CursorStreamReader) readFrame() { } } } + +func (c *{{.name}}CursorStreamReader) Stats() cursors.CursorStats { return cursors.CursorStats{} } + {{end}} diff --git a/storage/reads/stream_reader.go b/storage/reads/stream_reader.go index 59a21838cf..5f5a5f6e49 100644 --- a/storage/reads/stream_reader.go +++ b/storage/reads/stream_reader.go @@ -38,9 +38,10 @@ func NewResultSetStreamReader(stream StreamReader) *ResultSetStreamReader { return r } -func (r *ResultSetStreamReader) Err() error { return r.fr.err } -func (r *ResultSetStreamReader) Close() { r.fr.state = stateDone } -func (r *ResultSetStreamReader) Cursor() cursors.Cursor { return r.cur.cursor() } +func (r *ResultSetStreamReader) Err() error { return r.fr.err } +func (r *ResultSetStreamReader) Close() { r.fr.state = stateDone } +func (r *ResultSetStreamReader) Cursor() cursors.Cursor { return r.cur.cursor() } +func (r *ResultSetStreamReader) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (r *ResultSetStreamReader) Next() bool { if r.fr.state == stateReadSeries { @@ -181,6 +182,7 @@ func (gc *groupCursorStreamReader) Tags() models.Tags { return gc.tags func (gc *groupCursorStreamReader) Keys() [][]byte { return gc.tagKeys } func (gc *groupCursorStreamReader) PartitionKeyVals() [][]byte { return gc.partitionKeyVals } func (gc *groupCursorStreamReader) Cursor() cursors.Cursor { return gc.cur.cursor() } +func (gc *groupCursorStreamReader) Stats() cursors.CursorStats { return cursors.CursorStats{} } func (gc *groupCursorStreamReader) Next() bool { if gc.fr.state == stateReadSeries { diff --git a/storage/reads/table.gen.go b/storage/reads/table.gen.go index f019fc6f4b..61ea1e21e7 100644 --- a/storage/reads/table.gen.go +++ b/storage/reads/table.gen.go @@ -55,6 +55,20 @@ func (t *floatTable) Close() { t.mu.Unlock() } +func (t *floatTable) Statistics() flux.Statistics { + t.mu.Lock() + cur := t.cur + t.mu.Unlock() + if cur == nil { + return flux.Statistics{} + } + cs := cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + func (t *floatTable) Do(f func(flux.ColReader) error) error { t.mu.Lock() defer func() { @@ -223,6 +237,17 @@ func (t *floatGroupTable) advanceCursor() bool { return false } +func (t *floatGroupTable) Statistics() flux.Statistics { + if t.cur == nil { + return flux.Statistics{} + } + cs := t.cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + // // *********** Integer *********** // @@ -262,6 +287,20 @@ func (t *integerTable) Close() { t.mu.Unlock() } +func (t *integerTable) Statistics() flux.Statistics { + t.mu.Lock() + cur := t.cur + t.mu.Unlock() + if cur == nil { + return flux.Statistics{} + } + cs := cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + func (t *integerTable) Do(f func(flux.ColReader) error) error { t.mu.Lock() defer func() { @@ -430,6 +469,17 @@ func (t *integerGroupTable) advanceCursor() bool { return false } +func (t *integerGroupTable) Statistics() flux.Statistics { + if t.cur == nil { + return flux.Statistics{} + } + cs := t.cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + // // *********** Unsigned *********** // @@ -469,6 +519,20 @@ func (t *unsignedTable) Close() { t.mu.Unlock() } +func (t *unsignedTable) Statistics() flux.Statistics { + t.mu.Lock() + cur := t.cur + t.mu.Unlock() + if cur == nil { + return flux.Statistics{} + } + cs := cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + func (t *unsignedTable) Do(f func(flux.ColReader) error) error { t.mu.Lock() defer func() { @@ -637,6 +701,17 @@ func (t *unsignedGroupTable) advanceCursor() bool { return false } +func (t *unsignedGroupTable) Statistics() flux.Statistics { + if t.cur == nil { + return flux.Statistics{} + } + cs := t.cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + // // *********** String *********** // @@ -676,6 +751,20 @@ func (t *stringTable) Close() { t.mu.Unlock() } +func (t *stringTable) Statistics() flux.Statistics { + t.mu.Lock() + cur := t.cur + t.mu.Unlock() + if cur == nil { + return flux.Statistics{} + } + cs := cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + func (t *stringTable) Do(f func(flux.ColReader) error) error { t.mu.Lock() defer func() { @@ -844,6 +933,17 @@ func (t *stringGroupTable) advanceCursor() bool { return false } +func (t *stringGroupTable) Statistics() flux.Statistics { + if t.cur == nil { + return flux.Statistics{} + } + cs := t.cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + // // *********** Boolean *********** // @@ -883,6 +983,20 @@ func (t *booleanTable) Close() { t.mu.Unlock() } +func (t *booleanTable) Statistics() flux.Statistics { + t.mu.Lock() + cur := t.cur + t.mu.Unlock() + if cur == nil { + return flux.Statistics{} + } + cs := cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + func (t *booleanTable) Do(f func(flux.ColReader) error) error { t.mu.Lock() defer func() { @@ -1050,3 +1164,14 @@ func (t *booleanGroupTable) advanceCursor() bool { } return false } + +func (t *booleanGroupTable) Statistics() flux.Statistics { + if t.cur == nil { + return flux.Statistics{} + } + cs := t.cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} diff --git a/storage/reads/table.gen.go.tmpl b/storage/reads/table.gen.go.tmpl index 56b3f27188..54aaf1f02e 100644 --- a/storage/reads/table.gen.go.tmpl +++ b/storage/reads/table.gen.go.tmpl @@ -49,6 +49,20 @@ func (t *{{.name}}Table) Close() { t.mu.Unlock() } +func (t *{{.name}}Table) Statistics() flux.Statistics { + t.mu.Lock() + cur := t.cur + t.mu.Unlock() + if cur == nil { + return flux.Statistics{} + } + cs := cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error { t.mu.Lock() defer func() { @@ -217,4 +231,15 @@ func (t *{{.name}}GroupTable) advanceCursor() bool { return false } +func (t *{{.name}}GroupTable) Statistics() flux.Statistics { + if t.cur == nil { + return flux.Statistics{} + } + cs := t.cur.Stats() + return flux.Statistics{ + ScannedValues: cs.ScannedValues, + ScannedBytes: cs.ScannedBytes, + } +} + {{end}} diff --git a/storage/reads/table.go b/storage/reads/table.go index 524bc47780..4940c7d0a1 100644 --- a/storage/reads/table.go +++ b/storage/reads/table.go @@ -33,6 +33,8 @@ type table struct { err error cancelled int32 + + stats flux.Statistics } func newTable( @@ -221,6 +223,8 @@ func newTableNoPoints( func (t *tableNoPoints) Close() {} +func (t *tableNoPoints) Statistics() flux.Statistics { return flux.Statistics{} } + func (t *tableNoPoints) Do(f func(flux.ColReader) error) error { if t.isCancelled() { return nil @@ -258,3 +262,5 @@ func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error { t.closeDone() return t.err } + +func (t *groupTableNoPoints) Statistics() flux.Statistics { return flux.Statistics{} } diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 8010aad2b0..fa1e4d94f7 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -219,6 +219,8 @@ func (ts tables) Do(f func(flux.Table) error) error { return nil } +func (ts tables) Statistics() flux.Statistics { return flux.Statistics{} } + type system struct { name string svc *fakeQueryService diff --git a/tsdb/cursors/cursor.go b/tsdb/cursors/cursor.go index baacce4809..3b53a539f4 100644 --- a/tsdb/cursors/cursor.go +++ b/tsdb/cursors/cursor.go @@ -11,6 +11,7 @@ const DefaultMaxPointsPerBlock = 1000 type Cursor interface { Close() Err() error + Stats() CursorStats } type IntegerArrayCursor interface { @@ -49,6 +50,28 @@ type CursorRequest struct { type CursorIterator interface { Next(ctx context.Context, r *CursorRequest) (Cursor, error) + Stats() CursorStats } type CursorIterators []CursorIterator + +// Stats returns the aggregate stats of all cursor iterators. +func (a CursorIterators) Stats() CursorStats { + var stats CursorStats + for _, itr := range a { + stats.Add(itr.Stats()) + } + return stats +} + +// CursorStats represents stats collected by a cursor. +type CursorStats struct { + ScannedValues int // number of values scanned + ScannedBytes int // number of uncompressed bytes scanned +} + +// Add adds other to s and updates s. +func (s *CursorStats) Add(other CursorStats) { + s.ScannedValues += other.ScannedValues + s.ScannedBytes += other.ScannedBytes +} diff --git a/tsdb/internal/meta.pb.go b/tsdb/internal/meta.pb.go index bf28a630a7..67941cc265 100644 --- a/tsdb/internal/meta.pb.go +++ b/tsdb/internal/meta.pb.go @@ -30,7 +30,7 @@ func (m *Series) Reset() { *m = Series{} } func (m *Series) String() string { return proto.CompactTextString(m) } func (*Series) ProtoMessage() {} func (*Series) Descriptor() ([]byte, []int) { - return fileDescriptor_meta_7ee628d194324698, []int{0} + return fileDescriptor_meta_22ef3744f5766343, []int{0} } func (m *Series) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Series.Unmarshal(m, b) @@ -76,7 +76,7 @@ func (m *Tag) Reset() { *m = Tag{} } func (m *Tag) String() string { return proto.CompactTextString(m) } func (*Tag) ProtoMessage() {} func (*Tag) Descriptor() ([]byte, []int) { - return fileDescriptor_meta_7ee628d194324698, []int{1} + return fileDescriptor_meta_22ef3744f5766343, []int{1} } func (m *Tag) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Tag.Unmarshal(m, b) @@ -122,7 +122,7 @@ func (m *MeasurementFields) Reset() { *m = MeasurementFields{} } func (m *MeasurementFields) String() string { return proto.CompactTextString(m) } func (*MeasurementFields) ProtoMessage() {} func (*MeasurementFields) Descriptor() ([]byte, []int) { - return fileDescriptor_meta_7ee628d194324698, []int{2} + return fileDescriptor_meta_22ef3744f5766343, []int{2} } func (m *MeasurementFields) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MeasurementFields.Unmarshal(m, b) @@ -168,7 +168,7 @@ func (m *Field) Reset() { *m = Field{} } func (m *Field) String() string { return proto.CompactTextString(m) } func (*Field) ProtoMessage() {} func (*Field) Descriptor() ([]byte, []int) { - return fileDescriptor_meta_7ee628d194324698, []int{3} + return fileDescriptor_meta_22ef3744f5766343, []int{3} } func (m *Field) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Field.Unmarshal(m, b) @@ -213,7 +213,7 @@ func (m *MeasurementFieldSet) Reset() { *m = MeasurementFieldSet{} } func (m *MeasurementFieldSet) String() string { return proto.CompactTextString(m) } func (*MeasurementFieldSet) ProtoMessage() {} func (*MeasurementFieldSet) Descriptor() ([]byte, []int) { - return fileDescriptor_meta_7ee628d194324698, []int{4} + return fileDescriptor_meta_22ef3744f5766343, []int{4} } func (m *MeasurementFieldSet) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MeasurementFieldSet.Unmarshal(m, b) @@ -248,9 +248,9 @@ func init() { proto.RegisterType((*MeasurementFieldSet)(nil), "platform.tsdb.MeasurementFieldSet") } -func init() { proto.RegisterFile("internal/meta.proto", fileDescriptor_meta_7ee628d194324698) } +func init() { proto.RegisterFile("internal/meta.proto", fileDescriptor_meta_22ef3744f5766343) } -var fileDescriptor_meta_7ee628d194324698 = []byte{ +var fileDescriptor_meta_22ef3744f5766343 = []byte{ // 242 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0x3f, 0x4f, 0xc3, 0x40, 0x0c, 0xc5, 0x95, 0xe6, 0x8f, 0xc0, 0x80, 0x04, 0x6e, 0x87, 0x8c, 0xd1, 0x0d, 0xa8, 0x03, 0xa4, diff --git a/tsdb/tsm1/array_cursor.gen.go b/tsdb/tsm1/array_cursor.gen.go index e2e52017a7..78eb70d48f 100644 --- a/tsdb/tsm1/array_cursor.gen.go +++ b/tsdb/tsm1/array_cursor.gen.go @@ -10,6 +10,7 @@ import ( "sort" "github.com/influxdata/platform/tsdb" + "github.com/influxdata/platform/tsdb/cursors" ) // Array Cursors @@ -27,8 +28,9 @@ type floatArrayAscendingCursor struct { keyCursor *KeyCursor } - end int64 - res *tsdb.FloatArray + end int64 + res *tsdb.FloatArray + stats cursors.CursorStats } func newFloatArrayAscendingCursor() *floatArrayAscendingCursor { @@ -47,7 +49,7 @@ func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, t }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -65,6 +67,8 @@ func (c *floatArrayAscendingCursor) Close() { c.tsm.values = nil } +func (c *floatArrayAscendingCursor) Stats() cursors.CursorStats { return c.stats } + // Next returns the next key/value for the cursor. func (c *floatArrayAscendingCursor) Next() *tsdb.FloatArray { pos := 0 @@ -140,16 +144,25 @@ func (c *floatArrayAscendingCursor) Next() *tsdb.FloatArray { c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] + c.stats.ScannedValues += len(c.res.Values) + + c.stats.ScannedBytes += len(c.res.Values) * 8 + return c.res } func (c *floatArrayAscendingCursor) nextTSM() *tsdb.FloatArray { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = 0 return c.tsm.values } +func (c *floatArrayAscendingCursor) readArrayBlock() *tsdb.FloatArray { + values, _ := c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + return values +} + type floatArrayDescendingCursor struct { cache struct { values Values @@ -163,8 +176,9 @@ type floatArrayDescendingCursor struct { keyCursor *KeyCursor } - end int64 - res *tsdb.FloatArray + end int64 + res *tsdb.FloatArray + stats cursors.CursorStats } func newFloatArrayDescendingCursor() *floatArrayDescendingCursor { @@ -192,7 +206,7 @@ func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, } c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -218,6 +232,8 @@ func (c *floatArrayDescendingCursor) Close() { c.tsm.values = nil } +func (c *floatArrayDescendingCursor) Stats() cursors.CursorStats { return c.stats } + func (c *floatArrayDescendingCursor) Next() *tsdb.FloatArray { pos := 0 cvals := c.cache.values @@ -292,11 +308,21 @@ func (c *floatArrayDescendingCursor) Next() *tsdb.FloatArray { func (c *floatArrayDescendingCursor) nextTSM() *tsdb.FloatArray { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = len(c.tsm.values.Timestamps) - 1 return c.tsm.values } +func (c *floatArrayDescendingCursor) readArrayBlock() *tsdb.FloatArray { + values, _ := c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + + c.stats.ScannedValues += len(values.Values) + + c.stats.ScannedBytes += len(values.Values) * 8 + + return values +} + type integerArrayAscendingCursor struct { cache struct { values Values @@ -310,8 +336,9 @@ type integerArrayAscendingCursor struct { keyCursor *KeyCursor } - end int64 - res *tsdb.IntegerArray + end int64 + res *tsdb.IntegerArray + stats cursors.CursorStats } func newIntegerArrayAscendingCursor() *integerArrayAscendingCursor { @@ -330,7 +357,7 @@ func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -348,6 +375,8 @@ func (c *integerArrayAscendingCursor) Close() { c.tsm.values = nil } +func (c *integerArrayAscendingCursor) Stats() cursors.CursorStats { return c.stats } + // Next returns the next key/value for the cursor. func (c *integerArrayAscendingCursor) Next() *tsdb.IntegerArray { pos := 0 @@ -423,16 +452,25 @@ func (c *integerArrayAscendingCursor) Next() *tsdb.IntegerArray { c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] + c.stats.ScannedValues += len(c.res.Values) + + c.stats.ScannedBytes += len(c.res.Values) * 8 + return c.res } func (c *integerArrayAscendingCursor) nextTSM() *tsdb.IntegerArray { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = 0 return c.tsm.values } +func (c *integerArrayAscendingCursor) readArrayBlock() *tsdb.IntegerArray { + values, _ := c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + return values +} + type integerArrayDescendingCursor struct { cache struct { values Values @@ -446,8 +484,9 @@ type integerArrayDescendingCursor struct { keyCursor *KeyCursor } - end int64 - res *tsdb.IntegerArray + end int64 + res *tsdb.IntegerArray + stats cursors.CursorStats } func newIntegerArrayDescendingCursor() *integerArrayDescendingCursor { @@ -475,7 +514,7 @@ func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values } c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -501,6 +540,8 @@ func (c *integerArrayDescendingCursor) Close() { c.tsm.values = nil } +func (c *integerArrayDescendingCursor) Stats() cursors.CursorStats { return c.stats } + func (c *integerArrayDescendingCursor) Next() *tsdb.IntegerArray { pos := 0 cvals := c.cache.values @@ -575,11 +616,21 @@ func (c *integerArrayDescendingCursor) Next() *tsdb.IntegerArray { func (c *integerArrayDescendingCursor) nextTSM() *tsdb.IntegerArray { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = len(c.tsm.values.Timestamps) - 1 return c.tsm.values } +func (c *integerArrayDescendingCursor) readArrayBlock() *tsdb.IntegerArray { + values, _ := c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + + c.stats.ScannedValues += len(values.Values) + + c.stats.ScannedBytes += len(values.Values) * 8 + + return values +} + type unsignedArrayAscendingCursor struct { cache struct { values Values @@ -593,8 +644,9 @@ type unsignedArrayAscendingCursor struct { keyCursor *KeyCursor } - end int64 - res *tsdb.UnsignedArray + end int64 + res *tsdb.UnsignedArray + stats cursors.CursorStats } func newUnsignedArrayAscendingCursor() *unsignedArrayAscendingCursor { @@ -613,7 +665,7 @@ func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -631,6 +683,8 @@ func (c *unsignedArrayAscendingCursor) Close() { c.tsm.values = nil } +func (c *unsignedArrayAscendingCursor) Stats() cursors.CursorStats { return c.stats } + // Next returns the next key/value for the cursor. func (c *unsignedArrayAscendingCursor) Next() *tsdb.UnsignedArray { pos := 0 @@ -706,16 +760,25 @@ func (c *unsignedArrayAscendingCursor) Next() *tsdb.UnsignedArray { c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] + c.stats.ScannedValues += len(c.res.Values) + + c.stats.ScannedBytes += len(c.res.Values) * 8 + return c.res } func (c *unsignedArrayAscendingCursor) nextTSM() *tsdb.UnsignedArray { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = 0 return c.tsm.values } +func (c *unsignedArrayAscendingCursor) readArrayBlock() *tsdb.UnsignedArray { + values, _ := c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + return values +} + type unsignedArrayDescendingCursor struct { cache struct { values Values @@ -729,8 +792,9 @@ type unsignedArrayDescendingCursor struct { keyCursor *KeyCursor } - end int64 - res *tsdb.UnsignedArray + end int64 + res *tsdb.UnsignedArray + stats cursors.CursorStats } func newUnsignedArrayDescendingCursor() *unsignedArrayDescendingCursor { @@ -758,7 +822,7 @@ func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Value } c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -784,6 +848,8 @@ func (c *unsignedArrayDescendingCursor) Close() { c.tsm.values = nil } +func (c *unsignedArrayDescendingCursor) Stats() cursors.CursorStats { return c.stats } + func (c *unsignedArrayDescendingCursor) Next() *tsdb.UnsignedArray { pos := 0 cvals := c.cache.values @@ -858,11 +924,21 @@ func (c *unsignedArrayDescendingCursor) Next() *tsdb.UnsignedArray { func (c *unsignedArrayDescendingCursor) nextTSM() *tsdb.UnsignedArray { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = len(c.tsm.values.Timestamps) - 1 return c.tsm.values } +func (c *unsignedArrayDescendingCursor) readArrayBlock() *tsdb.UnsignedArray { + values, _ := c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + + c.stats.ScannedValues += len(values.Values) + + c.stats.ScannedBytes += len(values.Values) * 8 + + return values +} + type stringArrayAscendingCursor struct { cache struct { values Values @@ -876,8 +952,9 @@ type stringArrayAscendingCursor struct { keyCursor *KeyCursor } - end int64 - res *tsdb.StringArray + end int64 + res *tsdb.StringArray + stats cursors.CursorStats } func newStringArrayAscendingCursor() *stringArrayAscendingCursor { @@ -896,7 +973,7 @@ func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -914,6 +991,8 @@ func (c *stringArrayAscendingCursor) Close() { c.tsm.values = nil } +func (c *stringArrayAscendingCursor) Stats() cursors.CursorStats { return c.stats } + // Next returns the next key/value for the cursor. func (c *stringArrayAscendingCursor) Next() *tsdb.StringArray { pos := 0 @@ -989,16 +1068,27 @@ func (c *stringArrayAscendingCursor) Next() *tsdb.StringArray { c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] + c.stats.ScannedValues += len(c.res.Values) + + for _, v := range c.res.Values { + c.stats.ScannedBytes += len(v) + } + return c.res } func (c *stringArrayAscendingCursor) nextTSM() *tsdb.StringArray { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = 0 return c.tsm.values } +func (c *stringArrayAscendingCursor) readArrayBlock() *tsdb.StringArray { + values, _ := c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + return values +} + type stringArrayDescendingCursor struct { cache struct { values Values @@ -1012,8 +1102,9 @@ type stringArrayDescendingCursor struct { keyCursor *KeyCursor } - end int64 - res *tsdb.StringArray + end int64 + res *tsdb.StringArray + stats cursors.CursorStats } func newStringArrayDescendingCursor() *stringArrayDescendingCursor { @@ -1041,7 +1132,7 @@ func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, } c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -1067,6 +1158,8 @@ func (c *stringArrayDescendingCursor) Close() { c.tsm.values = nil } +func (c *stringArrayDescendingCursor) Stats() cursors.CursorStats { return c.stats } + func (c *stringArrayDescendingCursor) Next() *tsdb.StringArray { pos := 0 cvals := c.cache.values @@ -1141,11 +1234,23 @@ func (c *stringArrayDescendingCursor) Next() *tsdb.StringArray { func (c *stringArrayDescendingCursor) nextTSM() *tsdb.StringArray { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = len(c.tsm.values.Timestamps) - 1 return c.tsm.values } +func (c *stringArrayDescendingCursor) readArrayBlock() *tsdb.StringArray { + values, _ := c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + + c.stats.ScannedValues += len(values.Values) + + for _, v := range values.Values { + c.stats.ScannedBytes += len(v) + } + + return values +} + type booleanArrayAscendingCursor struct { cache struct { values Values @@ -1159,8 +1264,9 @@ type booleanArrayAscendingCursor struct { keyCursor *KeyCursor } - end int64 - res *tsdb.BooleanArray + end int64 + res *tsdb.BooleanArray + stats cursors.CursorStats } func newBooleanArrayAscendingCursor() *booleanArrayAscendingCursor { @@ -1179,7 +1285,7 @@ func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -1197,6 +1303,8 @@ func (c *booleanArrayAscendingCursor) Close() { c.tsm.values = nil } +func (c *booleanArrayAscendingCursor) Stats() cursors.CursorStats { return c.stats } + // Next returns the next key/value for the cursor. func (c *booleanArrayAscendingCursor) Next() *tsdb.BooleanArray { pos := 0 @@ -1272,16 +1380,25 @@ func (c *booleanArrayAscendingCursor) Next() *tsdb.BooleanArray { c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] + c.stats.ScannedValues += len(c.res.Values) + + c.stats.ScannedBytes += len(c.res.Values) * 1 + return c.res } func (c *booleanArrayAscendingCursor) nextTSM() *tsdb.BooleanArray { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = 0 return c.tsm.values } +func (c *booleanArrayAscendingCursor) readArrayBlock() *tsdb.BooleanArray { + values, _ := c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + return values +} + type booleanArrayDescendingCursor struct { cache struct { values Values @@ -1295,8 +1412,9 @@ type booleanArrayDescendingCursor struct { keyCursor *KeyCursor } - end int64 - res *tsdb.BooleanArray + end int64 + res *tsdb.BooleanArray + stats cursors.CursorStats } func newBooleanArrayDescendingCursor() *booleanArrayDescendingCursor { @@ -1324,7 +1442,7 @@ func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values } c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -1350,6 +1468,8 @@ func (c *booleanArrayDescendingCursor) Close() { c.tsm.values = nil } +func (c *booleanArrayDescendingCursor) Stats() cursors.CursorStats { return c.stats } + func (c *booleanArrayDescendingCursor) Next() *tsdb.BooleanArray { pos := 0 cvals := c.cache.values @@ -1424,7 +1544,17 @@ func (c *booleanArrayDescendingCursor) Next() *tsdb.BooleanArray { func (c *booleanArrayDescendingCursor) nextTSM() *tsdb.BooleanArray { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = len(c.tsm.values.Timestamps) - 1 return c.tsm.values } + +func (c *booleanArrayDescendingCursor) readArrayBlock() *tsdb.BooleanArray { + values, _ := c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + + c.stats.ScannedValues += len(values.Values) + + c.stats.ScannedBytes += len(values.Values) * 1 + + return values +} diff --git a/tsdb/tsm1/array_cursor.gen.go.tmpl b/tsdb/tsm1/array_cursor.gen.go.tmpl index 88a397ae2f..ecf97151ef 100644 --- a/tsdb/tsm1/array_cursor.gen.go.tmpl +++ b/tsdb/tsm1/array_cursor.gen.go.tmpl @@ -4,6 +4,7 @@ import ( "sort" "github.com/influxdata/platform/tsdb" + "github.com/influxdata/platform/tsdb/cursors" ) // Array Cursors @@ -26,8 +27,9 @@ type {{$type}} struct { keyCursor *KeyCursor } - end int64 - res {{$arrayType}} + end int64 + res {{$arrayType}} + stats cursors.CursorStats } func new{{$Type}}() *{{$type}} { @@ -46,7 +48,7 @@ c.end = end }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -64,6 +66,8 @@ func (c *{{$type}}) Close() { c.tsm.values = nil } +func (c *{{$type}}) Stats() cursors.CursorStats { return c.stats } + // Next returns the next key/value for the cursor. func (c *{{$type}}) Next() {{$arrayType}} { pos := 0 @@ -139,16 +143,30 @@ func (c *{{$type}}) Next() {{$arrayType}} { c.res.Timestamps = c.res.Timestamps[:pos] c.res.Values = c.res.Values[:pos] + c.stats.ScannedValues += len(c.res.Values) + {{if eq .Name "String" }} + for _, v := range c.res.Values { + c.stats.ScannedBytes += len(v) + } + {{else}} + c.stats.ScannedBytes += len(c.res.Values) * {{.Size}} + {{end}} + return c.res } func (c *{{$type}}) nextTSM() {{$arrayType}} { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = 0 return c.tsm.values } +func (c *{{$type}}) readArrayBlock() {{$arrayType}} { + values, _ := c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + return values +} + {{$type := print .name "ArrayDescendingCursor"}} {{$Type := print .Name "ArrayDescendingCursor"}} @@ -165,8 +183,9 @@ type {{$type}} struct { keyCursor *KeyCursor } - end int64 - res {{$arrayType}} + end int64 + res {{$arrayType}} + stats cursors.CursorStats } func new{{$Type}}() *{{$type}} { @@ -194,7 +213,7 @@ func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *Key } c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) @@ -220,6 +239,8 @@ func (c *{{$type}}) Close() { c.tsm.values = nil } +func (c *{{$type}}) Stats() cursors.CursorStats { return c.stats } + func (c *{{$type}}) Next() {{$arrayType}} { pos := 0 cvals := c.cache.values @@ -294,9 +315,24 @@ func (c *{{$type}}) Next() {{$arrayType}} { func (c *{{$type}}) nextTSM() {{$arrayType}} { c.tsm.keyCursor.Next() - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + c.tsm.values = c.readArrayBlock() c.tsm.pos = len(c.tsm.values.Timestamps) - 1 return c.tsm.values } +func (c *{{$type}}) readArrayBlock() {{$arrayType}} { + values, _ := c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + + c.stats.ScannedValues += len(values.Values) + {{if eq .Name "String" }} + for _, v := range values.Values { + c.stats.ScannedBytes += len(v) + } + {{else}} + c.stats.ScannedBytes += len(values.Values) * {{.Size}} + {{end}} + + return values +} + {{end}} diff --git a/tsdb/tsm1/array_cursor_iterator.go b/tsdb/tsm1/array_cursor_iterator.go index 1ec5f74273..83236eaf9d 100644 --- a/tsdb/tsm1/array_cursor_iterator.go +++ b/tsdb/tsm1/array_cursor_iterator.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/influxdb/query" "github.com/influxdata/platform/models" "github.com/influxdata/platform/tsdb" + "github.com/influxdata/platform/tsdb/cursors" ) type arrayCursorIterator struct { @@ -70,3 +71,31 @@ func (q *arrayCursorIterator) seriesFieldKeyBytes(name []byte, tags models.Tags, q.key = append(q.key, field...) return q.key } + +// Stats returns the cumulative stats for all cursors. +func (q *arrayCursorIterator) Stats() cursors.CursorStats { + var stats cursors.CursorStats + if cur := q.asc.Float; cur != nil { + stats.Add(cur.Stats()) + } else if cur := q.asc.Integer; cur != nil { + stats.Add(cur.Stats()) + } else if cur := q.asc.Unsigned; cur != nil { + stats.Add(cur.Stats()) + } else if cur := q.asc.Boolean; cur != nil { + stats.Add(cur.Stats()) + } else if cur := q.asc.String; cur != nil { + stats.Add(cur.Stats()) + } + if cur := q.desc.Float; cur != nil { + stats.Add(cur.Stats()) + } else if cur := q.desc.Integer; cur != nil { + stats.Add(cur.Stats()) + } else if cur := q.desc.Unsigned; cur != nil { + stats.Add(cur.Stats()) + } else if cur := q.desc.Boolean; cur != nil { + stats.Add(cur.Stats()) + } else if cur := q.desc.String; cur != nil { + stats.Add(cur.Stats()) + } + return stats +} diff --git a/tsdb/tsm1/iterator.gen.go.tmpldata b/tsdb/tsm1/iterator.gen.go.tmpldata index 3e230721cb..648898fbdb 100644 --- a/tsdb/tsm1/iterator.gen.go.tmpldata +++ b/tsdb/tsm1/iterator.gen.go.tmpldata @@ -4,34 +4,39 @@ "name":"float", "Type":"float64", "ValueType":"FloatValue", - "Nil":"0" + "Nil":"0", + "Size":"8" }, { "Name":"Integer", "name":"integer", "Type":"int64", "ValueType":"IntegerValue", - "Nil":"0" + "Nil":"0", + "Size":"8" }, { "Name":"Unsigned", "name":"unsigned", "Type":"uint64", "ValueType":"UnsignedValue", - "Nil":"0" + "Nil":"0", + "Size":"8" }, { "Name":"String", "name":"string", "Type":"string", "ValueType":"StringValue", - "Nil":"\"\"" + "Nil":"\"\"", + "Size":"0" }, { "Name":"Boolean", "name":"boolean", "Type":"bool", "ValueType":"BooleanValue", - "Nil":"false" + "Nil":"false", + "Size":"1" } ]