diff --git a/influxql/v1tests/query_test.go b/influxql/v1tests/query_test.go index eafc811a0a..21d55e67d8 100644 --- a/influxql/v1tests/query_test.go +++ b/influxql/v1tests/query_test.go @@ -183,3 +183,153 @@ func TestServer_Query_ShowDatabases(t *testing.T) { test.Run(context.Background(), t, s) } + +func TestServer_Query_Subquery(t *testing.T) { + writes := []string{ + fmt.Sprintf(`request,region=west,status=200 duration_ms=100 %d`, mustParseTime(time.RFC3339Nano, "2004-04-09T01:00:00Z").UnixNano()), + fmt.Sprintf(`request,region=west,status=200 duration_ms=100 %d`, mustParseTime(time.RFC3339Nano, "2004-04-09T01:00:10Z").UnixNano()), + fmt.Sprintf(`request,region=west,status=200 duration_ms=100 %d`, mustParseTime(time.RFC3339Nano, "2004-04-09T01:00:20Z").UnixNano()), + fmt.Sprintf(`request,region=west,status=204 duration_ms=100 %d`, mustParseTime(time.RFC3339Nano, "2004-04-09T01:00:30Z").UnixNano()), + fmt.Sprintf(`request,region=west,status=204 duration_ms=100 %d`, mustParseTime(time.RFC3339Nano, "2004-04-09T01:00:40Z").UnixNano()), + fmt.Sprintf(`request,region=west,status=500 duration_ms=200 %d`, mustParseTime(time.RFC3339Nano, "2004-04-09T01:00:00Z").UnixNano()), + fmt.Sprintf(`request,region=west,status=500 duration_ms=200 %d`, mustParseTime(time.RFC3339Nano, "2004-04-09T01:00:10Z").UnixNano()), + fmt.Sprintf(`request,region=west,status=500 duration_ms=200 %d`, mustParseTime(time.RFC3339Nano, "2004-04-09T01:00:20Z").UnixNano()), + fmt.Sprintf(`request,region=west,status=504 duration_ms=200 %d`, mustParseTime(time.RFC3339Nano, "2004-04-09T01:00:30Z").UnixNano()), + fmt.Sprintf(`request,region=west,status=504 duration_ms=200 %d`, mustParseTime(time.RFC3339Nano, "2004-04-09T01:00:40Z").UnixNano()), + } + + ctx := context.Background() + s := NewTestServer(ctx, t, "db0", "rp0", writes...) + + cases := []Query{ + { + // This test verifies that data cached from the storage layer + // is complete in order to satisfy the two subqueries. + name: "different tag predicates for same field", + params: url.Values{"db": []string{"db0"}}, + command: ` + SELECT SUM(success) as sum_success, SUM(requests) as sum_fail + FROM ( + SELECT duration_ms as success + FROM request + WHERE status !~ /^5.*$/ AND region = 'west' + ), ( + SELECT duration_ms as requests + FROM request + WHERE status =~ /^5.*$/ AND region = 'west' + ) +`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"request","columns":["time","sum_success","sum_fail"],"values":[["1970-01-01T00:00:00Z",500,1000]]}]}]}`, + }, + { + name: "different time predicates for same field", + params: url.Values{"db": []string{"db0"}}, + command: ` + SELECT COUNT(r1) as r1, COUNT(r2) as r2 + FROM ( + SELECT duration_ms as r1 + FROM request + WHERE time >= '2004-04-09T01:00:00Z' AND time <= '2004-04-09T01:00:20Z' + ), ( + SELECT duration_ms as r2 + FROM request + WHERE time >= '2004-04-09T01:00:10Z' AND time <= '2004-04-09T01:00:40Z' + ) +`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"request","columns":["time","r1","r2"],"values":[["1970-01-01T00:00:00Z",6,8]]}]}]}`, + }, + { + name: "outer query with narrower time range than subqueries", + params: url.Values{"db": []string{"db0"}}, + command: ` + SELECT COUNT(r1) as r1, COUNT(r2) as r2 + FROM ( + SELECT duration_ms as r1 + FROM request + WHERE time >= '2004-04-09T01:00:00Z' AND time <= '2004-04-09T01:00:20Z' + ), ( + SELECT duration_ms as r2 + FROM request + WHERE time >= '2004-04-09T01:00:10Z' AND time <= '2004-04-09T01:00:40Z' + ) + WHERE time >= '2004-04-09T01:00:20Z' AND time <= '2004-04-09T01:00:30Z' +`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"request","columns":["time","r1","r2"],"values":[["2004-04-09T01:00:20Z",2,4]]}]}]}`, + }, + { + name: "outer query with narrower time range than subqueries using aggregates", + params: url.Values{"db": []string{"db0"}}, + command: ` + SELECT r1 as r1, r2 as r2 + FROM ( + SELECT COUNT(duration_ms) as r1 + FROM request + WHERE time >= '2004-04-09T01:00:00Z' AND time <= '2004-04-09T01:00:20Z' + ), ( + SELECT COUNT(duration_ms) as r2 + FROM request + WHERE time >= '2004-04-09T01:00:10Z' AND time <= '2004-04-09T01:00:40Z' + ) + WHERE time >= '2004-04-09T01:00:20Z' AND time <= '2004-04-09T01:00:30Z' +`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"request","columns":["time","r1","r2"],"values":[["2004-04-09T01:00:20Z",2,null],["2004-04-09T01:00:20Z",null,4]]}]}]}`, + }, + { + name: "outer query with no time range and subqueries using aggregates", + params: url.Values{"db": []string{"db0"}}, + command: ` + SELECT r1 as r1, r2 as r2 + FROM ( + SELECT COUNT(duration_ms) as r1 + FROM request + WHERE time >= '2004-04-09T01:00:00Z' AND time <= '2004-04-09T01:00:20Z' + ), ( + SELECT COUNT(duration_ms) as r2 + FROM request + WHERE time >= '2004-04-09T01:00:10Z' AND time <= '2004-04-09T01:00:40Z' + ) +`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"request","columns":["time","r1","r2"],"values":[["2004-04-09T01:00:00Z",6,null],["2004-04-09T01:00:10Z",null,8]]}]}]}`, + }, + { + name: "outer query with narrower time range than subqueries no aggregate", + params: url.Values{"db": []string{"db0"}}, + command: ` + SELECT r1 as r1, r2 as r2 + FROM ( + SELECT duration_ms as r1 + FROM request + WHERE time >= '2004-04-09T01:00:00Z' AND time <= '2004-04-09T01:00:20Z' + ), ( + SELECT duration_ms as r2 + FROM request + WHERE time >= '2004-04-09T01:00:10Z' AND time <= '2004-04-09T01:00:40Z' + ) + WHERE time >= '2004-04-09T01:00:20Z' AND time <= '2004-04-09T01:00:30Z' +`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"request","columns":["time","r1","r2"],"values":[["2004-04-09T01:00:20Z",100,null],["2004-04-09T01:00:20Z",null,100],["2004-04-09T01:00:20Z",200,null],["2004-04-09T01:00:20Z",null,200],["2004-04-09T01:00:30Z",null,200],["2004-04-09T01:00:30Z",null,100]]}]}]}`, + }, + { + name: "outer query with time range", + params: url.Values{"db": []string{"db0"}}, + command: ` + SELECT COUNT(r1) as r1, COUNT(r2) as r2 + FROM ( + SELECT duration_ms as r1 + FROM request + ), ( + SELECT duration_ms as r2 + FROM request + ) + WHERE time >= '2004-04-09T01:00:20Z' AND time <= '2004-04-09T01:00:30Z' +`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"request","columns":["time","r1","r2"],"values":[["2004-04-09T01:00:20Z",4,4]]}]}]}`, + }, + } + + for _, q := range cases { + t.Run(q.name, func(t *testing.T) { + s.Execute(ctx, t, q) + }) + } +} diff --git a/influxql/v1tests/test_server.go b/influxql/v1tests/test_server.go new file mode 100644 index 0000000000..aca585f794 --- /dev/null +++ b/influxql/v1tests/test_server.go @@ -0,0 +1,82 @@ +package v1tests + +import ( + "context" + "strings" + "testing" + + "github.com/influxdata/influxdb/v2" + icontext "github.com/influxdata/influxdb/v2/context" + "github.com/influxdata/influxdb/v2/tests" + "github.com/influxdata/influxdb/v2/tests/pipeline" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type TestServer struct { + db string + rp string + p *tests.Pipeline + fx pipeline.BaseFixture + auth *influxdb.Authorization +} + +func NewTestServer(ctx context.Context, t *testing.T, db, rp string, writes ...string) *TestServer { + require.Greater(t, len(writes), 0) + + p := OpenServer(t) + t.Cleanup(func() { + _ = p.Close() + }) + + fx := pipeline.NewBaseFixture(t, p.Pipeline, p.DefaultOrgID, p.DefaultBucketID) + + // write test data + err := fx.Admin.WriteTo(ctx, influxdb.BucketFilter{ID: &p.DefaultBucketID, OrganizationID: &p.DefaultOrgID}, strings.NewReader(strings.Join(writes, "\n"))) + require.NoError(t, err) + + p.Flush() + + writeOrg, err := influxdb.NewPermissionAtID(p.DefaultOrgID, influxdb.WriteAction, influxdb.OrgsResourceType, p.DefaultOrgID) + require.NoError(t, err) + + bucketWritePerm, err := influxdb.NewPermissionAtID(p.DefaultBucketID, influxdb.WriteAction, influxdb.BucketsResourceType, p.DefaultOrgID) + require.NoError(t, err) + + bucketReadPerm, err := influxdb.NewPermissionAtID(p.DefaultBucketID, influxdb.ReadAction, influxdb.BucketsResourceType, p.DefaultOrgID) + require.NoError(t, err) + + auth := tests.MakeAuthorization(p.DefaultOrgID, p.DefaultUserID, []influxdb.Permission{*writeOrg, *bucketWritePerm, *bucketReadPerm}) + ctx = icontext.SetAuthorizer(ctx, auth) + err = p.Launcher. + DBRPMappingService(). + Create(ctx, &influxdb.DBRPMapping{ + Database: db, + RetentionPolicy: rp, + Default: true, + OrganizationID: p.DefaultOrgID, + BucketID: p.DefaultBucketID, + }) + require.NoError(t, err) + + return &TestServer{ + p: p.Pipeline, + db: db, + rp: rp, + fx: fx, + auth: auth, + } +} + +func (qr *TestServer) Execute(ctx context.Context, t *testing.T, query Query) { + t.Helper() + ctx = icontext.SetAuthorizer(ctx, qr.auth) + if query.skip != "" { + t.Skipf("SKIP:: %s", query.skip) + } + err := query.Execute(ctx, t, qr.db, qr.fx.Admin) + assert.NoError(t, err) + assert.Equal(t, query.exp, query.got, + "%s: unexpected results\nquery: %s\nparams: %v\nexp: %s\nactual: %s\n", + query.name, query.command, query.params, query.exp, query.got) +}