feat: make sure the query plan nodes have unique ids (#19879)
parent
0853370cda
commit
17fcd2dce2
2
go.mod
2
go.mod
|
@ -47,7 +47,7 @@ require (
|
|||
github.com/hashicorp/vault/api v1.0.2
|
||||
github.com/imdario/mergo v0.3.9 // indirect
|
||||
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6
|
||||
github.com/influxdata/flux v0.92.0
|
||||
github.com/influxdata/flux v0.92.1-0.20201102163214-958736c09599
|
||||
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
|
||||
github.com/influxdata/pkg-config v0.2.5
|
||||
|
|
4
go.sum
4
go.sum
|
@ -324,8 +324,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
|
|||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6 h1:OtjKkeWDjUbyMi82C7XXy7Tvm2LXMwiBBXyFIGNPaGA=
|
||||
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og=
|
||||
github.com/influxdata/flux v0.92.0 h1:EJDZiE0l2pb8FAfkDyGtcLx5i1hP0wP3GBYLuPCTXyE=
|
||||
github.com/influxdata/flux v0.92.0/go.mod h1:9csju6RUyFbwxcIR0Nyr8Z+fh2O4axq0zJE6DGHg1Cc=
|
||||
github.com/influxdata/flux v0.92.1-0.20201102163214-958736c09599 h1:DzLmQmQLf8LYuLwArt5nm6uH8O4NbyPcbLf1WwpCZsU=
|
||||
github.com/influxdata/flux v0.92.1-0.20201102163214-958736c09599/go.mod h1:9csju6RUyFbwxcIR0Nyr8Z+fh2O4axq0zJE6DGHg1Cc=
|
||||
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=
|
||||
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA=
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
|
||||
|
|
|
@ -2488,16 +2488,16 @@ func TestService(t *testing.T) {
|
|||
Name: "view name",
|
||||
},
|
||||
Properties: influxdb.SingleStatViewProperties{
|
||||
Type: influxdb.ViewPropertyTypeSingleStat,
|
||||
DecimalPlaces: influxdb.DecimalPlaces{IsEnforced: true, Digits: 1},
|
||||
Note: "a note",
|
||||
Queries: []influxdb.DashboardQuery{newQuery()},
|
||||
Prefix: "pre",
|
||||
TickPrefix: "false",
|
||||
ShowNoteWhenEmpty: true,
|
||||
Suffix: "suf",
|
||||
TickSuffix: "true",
|
||||
ViewColors: []influxdb.ViewColor{{Type: "text", Hex: "red"}},
|
||||
Type: influxdb.ViewPropertyTypeSingleStat,
|
||||
DecimalPlaces: influxdb.DecimalPlaces{IsEnforced: true, Digits: 1},
|
||||
Note: "a note",
|
||||
Queries: []influxdb.DashboardQuery{newQuery()},
|
||||
Prefix: "pre",
|
||||
TickPrefix: "false",
|
||||
ShowNoteWhenEmpty: true,
|
||||
Suffix: "suf",
|
||||
TickSuffix: "true",
|
||||
ViewColors: []influxdb.ViewColor{{Type: "text", Hex: "red"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -2509,16 +2509,16 @@ func TestService(t *testing.T) {
|
|||
Name: "view name",
|
||||
},
|
||||
Properties: influxdb.SingleStatViewProperties{
|
||||
Type: influxdb.ViewPropertyTypeSingleStat,
|
||||
DecimalPlaces: influxdb.DecimalPlaces{IsEnforced: true, Digits: 1},
|
||||
Note: "a note",
|
||||
Queries: []influxdb.DashboardQuery{newQuery()},
|
||||
Prefix: "pre",
|
||||
TickPrefix: "false",
|
||||
ShowNoteWhenEmpty: true,
|
||||
Suffix: "suf",
|
||||
TickSuffix: "true",
|
||||
ViewColors: []influxdb.ViewColor{{Type: "text", Hex: "red"}},
|
||||
Type: influxdb.ViewPropertyTypeSingleStat,
|
||||
DecimalPlaces: influxdb.DecimalPlaces{IsEnforced: true, Digits: 1},
|
||||
Note: "a note",
|
||||
Queries: []influxdb.DashboardQuery{newQuery()},
|
||||
Prefix: "pre",
|
||||
TickPrefix: "false",
|
||||
ShowNoteWhenEmpty: true,
|
||||
Suffix: "suf",
|
||||
TickSuffix: "true",
|
||||
ViewColors: []influxdb.ViewColor{{Type: "text", Hex: "red"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -10,7 +10,7 @@ require (
|
|||
github.com/go-kit/kit v0.10.0 // indirect
|
||||
github.com/google/go-cmp v0.5.0
|
||||
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
|
||||
github.com/influxdata/flux v0.91.0
|
||||
github.com/influxdata/flux v0.92.1-0.20201102163214-958736c09599
|
||||
github.com/influxdata/influxdb/v2 v2.0.0-00010101000000-000000000000
|
||||
github.com/influxdata/influxql v1.0.1 // indirect
|
||||
github.com/influxdata/promql/v2 v2.12.0
|
||||
|
|
|
@ -265,6 +265,8 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
|
|||
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
|
||||
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
|
||||
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
|
||||
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
|
||||
|
@ -393,8 +395,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
|
|||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6 h1:OtjKkeWDjUbyMi82C7XXy7Tvm2LXMwiBBXyFIGNPaGA=
|
||||
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og=
|
||||
github.com/influxdata/flux v0.91.0 h1:af7BlrhXcw9Ka7jwCa2tl2x86szjSl40mxME3c884HM=
|
||||
github.com/influxdata/flux v0.91.0/go.mod h1:9csju6RUyFbwxcIR0Nyr8Z+fh2O4axq0zJE6DGHg1Cc=
|
||||
github.com/influxdata/flux v0.92.1-0.20201102163214-958736c09599 h1:DzLmQmQLf8LYuLwArt5nm6uH8O4NbyPcbLf1WwpCZsU=
|
||||
github.com/influxdata/flux v0.92.1-0.20201102163214-958736c09599/go.mod h1:9csju6RUyFbwxcIR0Nyr8Z+fh2O4axq0zJE6DGHg1Cc=
|
||||
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=
|
||||
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA=
|
||||
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
|
||||
|
|
|
@ -96,7 +96,7 @@ func (rule PushDownGroupRule) Rewrite(ctx context.Context, node plan.Node) (plan
|
|||
}
|
||||
}
|
||||
|
||||
return plan.CreatePhysicalNode("ReadGroup", &ReadGroupPhysSpec{
|
||||
return plan.CreateUniquePhysicalNode(ctx, "ReadGroup", &ReadGroupPhysSpec{
|
||||
ReadRangePhysSpec: *src.Copy().(*ReadRangePhysSpec),
|
||||
GroupMode: grp.GroupMode,
|
||||
GroupKeys: grp.GroupKeys,
|
||||
|
@ -120,7 +120,7 @@ func (rule PushDownRangeRule) Rewrite(ctx context.Context, node plan.Node) (plan
|
|||
fromNode := node.Predecessors()[0]
|
||||
fromSpec := fromNode.ProcedureSpec().(*FromStorageProcedureSpec)
|
||||
rangeSpec := node.ProcedureSpec().(*universe.RangeProcedureSpec)
|
||||
return plan.CreatePhysicalNode("ReadRange", &ReadRangePhysSpec{
|
||||
return plan.CreateUniquePhysicalNode(ctx, "ReadRange", &ReadRangePhysSpec{
|
||||
Bucket: fromSpec.Bucket.Name,
|
||||
BucketID: fromSpec.Bucket.ID,
|
||||
Bounds: rangeSpec.Bounds,
|
||||
|
@ -279,7 +279,7 @@ func (rule PushDownReadTagKeysRule) Rewrite(ctx context.Context, pn plan.Node) (
|
|||
|
||||
// We have passed all of the necessary prerequisites
|
||||
// so construct the procedure spec.
|
||||
return plan.CreatePhysicalNode("ReadTagKeys", &ReadTagKeysPhysSpec{
|
||||
return plan.CreateUniquePhysicalNode(ctx, "ReadTagKeys", &ReadTagKeysPhysSpec{
|
||||
ReadRangePhysSpec: *fromSpec.Copy().(*ReadRangePhysSpec),
|
||||
}), true, nil
|
||||
}
|
||||
|
@ -350,7 +350,7 @@ func (rule PushDownReadTagValuesRule) Rewrite(ctx context.Context, pn plan.Node)
|
|||
|
||||
// We have passed all of the necessary prerequisites
|
||||
// so construct the procedure spec.
|
||||
return plan.CreatePhysicalNode("ReadTagValues", &ReadTagValuesPhysSpec{
|
||||
return plan.CreateUniquePhysicalNode(ctx, "ReadTagValues", &ReadTagValuesPhysSpec{
|
||||
ReadRangePhysSpec: *fromSpec.Copy().(*ReadRangePhysSpec),
|
||||
TagKey: tagKey,
|
||||
}), true, nil
|
||||
|
@ -743,7 +743,7 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
|
|||
}
|
||||
|
||||
// Rule passes.
|
||||
return plan.CreatePhysicalNode("ReadWindowAggregate", &ReadWindowAggregatePhysSpec{
|
||||
return plan.CreateUniquePhysicalNode(ctx, "ReadWindowAggregate", &ReadWindowAggregatePhysSpec{
|
||||
ReadRangePhysSpec: *fromSpec.Copy().(*ReadRangePhysSpec),
|
||||
Aggregates: []plan.ProcedureKind{fnNode.Kind()},
|
||||
WindowEvery: windowSpec.Window.Every,
|
||||
|
@ -813,7 +813,7 @@ func (PushDownWindowAggregateByTimeRule) Rewrite(ctx context.Context, pn plan.No
|
|||
|
||||
// Rule passes.
|
||||
windowAggregateSpec.TimeColumn = duplicateSpec.Column
|
||||
return plan.CreatePhysicalNode("ReadWindowAggregateByTime", windowAggregateSpec), true, nil
|
||||
return plan.CreateUniquePhysicalNode(ctx, "ReadWindowAggregateByTime", windowAggregateSpec), true, nil
|
||||
}
|
||||
|
||||
// PushDownBareAggregateRule is a rule that allows pushing down of aggregates
|
||||
|
@ -838,7 +838,7 @@ func (p PushDownBareAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
|
|||
fromNode := fnNode.Predecessors()[0]
|
||||
fromSpec := fromNode.ProcedureSpec().(*ReadRangePhysSpec)
|
||||
|
||||
return plan.CreatePhysicalNode("ReadWindowAggregate", &ReadWindowAggregatePhysSpec{
|
||||
return plan.CreateUniquePhysicalNode(ctx, "ReadWindowAggregate", &ReadWindowAggregatePhysSpec{
|
||||
ReadRangePhysSpec: *fromSpec.Copy().(*ReadRangePhysSpec),
|
||||
Aggregates: []plan.ProcedureKind{fnNode.Kind()},
|
||||
WindowEvery: flux.ConvertDuration(math.MaxInt64 * time.Nanosecond),
|
||||
|
@ -903,7 +903,7 @@ func (p GroupWindowAggregateTransposeRule) Rewrite(ctx context.Context, pn plan.
|
|||
}
|
||||
|
||||
// Perform the rewrite by replacing each of the nodes.
|
||||
newFromNode := plan.CreatePhysicalNode("ReadWindowAggregate", &ReadWindowAggregatePhysSpec{
|
||||
newFromNode := plan.CreateUniquePhysicalNode(ctx, "ReadWindowAggregate", &ReadWindowAggregatePhysSpec{
|
||||
ReadRangePhysSpec: *fromSpec.ReadRangePhysSpec.Copy().(*ReadRangePhysSpec),
|
||||
Aggregates: []plan.ProcedureKind{fnNode.Kind()},
|
||||
WindowEvery: windowSpec.Window.Every,
|
||||
|
@ -920,7 +920,7 @@ func (p GroupWindowAggregateTransposeRule) Rewrite(ctx context.Context, pn plan.
|
|||
if !execute.ContainsStr(groupKeys, execute.DefaultStopColLabel) {
|
||||
groupKeys = append(groupKeys, execute.DefaultStopColLabel)
|
||||
}
|
||||
newGroupNode := plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{
|
||||
newGroupNode := plan.CreateUniquePhysicalNode(ctx, "group", &universe.GroupProcedureSpec{
|
||||
GroupMode: flux.GroupModeBy,
|
||||
GroupKeys: groupKeys,
|
||||
})
|
||||
|
@ -935,7 +935,7 @@ func (p GroupWindowAggregateTransposeRule) Rewrite(ctx context.Context, pn plan.
|
|||
// Replace the spec for the function if needed.
|
||||
switch spec := fnNode.ProcedureSpec().(type) {
|
||||
case *universe.CountProcedureSpec:
|
||||
newFnNode := plan.CreatePhysicalNode("sum", &universe.SumProcedureSpec{
|
||||
newFnNode := plan.CreateUniquePhysicalNode(ctx, "sum", &universe.SumProcedureSpec{
|
||||
AggregateConfig: spec.AggregateConfig,
|
||||
})
|
||||
plan.ReplaceNode(fnNode, newFnNode)
|
||||
|
@ -984,7 +984,7 @@ func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (pl
|
|||
switch pn.Kind() {
|
||||
case universe.CountKind:
|
||||
// ReadGroup() -> count => ReadGroup(count)
|
||||
node := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
ReadRangePhysSpec: group.ReadRangePhysSpec,
|
||||
GroupMode: group.GroupMode,
|
||||
GroupKeys: group.GroupKeys,
|
||||
|
@ -993,7 +993,7 @@ func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (pl
|
|||
return node, true, nil
|
||||
case universe.SumKind:
|
||||
// ReadGroup() -> sum => ReadGroup(sum)
|
||||
node := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
ReadRangePhysSpec: group.ReadRangePhysSpec,
|
||||
GroupMode: group.GroupMode,
|
||||
GroupKeys: group.GroupKeys,
|
||||
|
@ -1002,7 +1002,7 @@ func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (pl
|
|||
return node, true, nil
|
||||
case universe.FirstKind:
|
||||
// ReadGroup() -> first => ReadGroup(first)
|
||||
node := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
ReadRangePhysSpec: group.ReadRangePhysSpec,
|
||||
GroupMode: group.GroupMode,
|
||||
GroupKeys: group.GroupKeys,
|
||||
|
@ -1011,7 +1011,7 @@ func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (pl
|
|||
return node, true, nil
|
||||
case universe.LastKind:
|
||||
// ReadGroup() -> last => ReadGroup(last)
|
||||
node := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
ReadRangePhysSpec: group.ReadRangePhysSpec,
|
||||
GroupMode: group.GroupMode,
|
||||
GroupKeys: group.GroupKeys,
|
||||
|
@ -1020,7 +1020,7 @@ func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (pl
|
|||
return node, true, nil
|
||||
case universe.MinKind:
|
||||
// ReadGroup() -> min => ReadGroup(min)
|
||||
node := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
ReadRangePhysSpec: group.ReadRangePhysSpec,
|
||||
GroupMode: group.GroupMode,
|
||||
GroupKeys: group.GroupKeys,
|
||||
|
@ -1029,7 +1029,7 @@ func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (pl
|
|||
return node, true, nil
|
||||
case universe.MaxKind:
|
||||
// ReadGroup() -> max => ReadGroup(max)
|
||||
node := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
node := plan.CreateUniquePhysicalNode(ctx, "ReadGroupAggregate", &ReadGroupPhysSpec{
|
||||
ReadRangePhysSpec: group.ReadRangePhysSpec,
|
||||
GroupMode: group.GroupMode,
|
||||
GroupKeys: group.GroupKeys,
|
||||
|
|
|
@ -61,10 +61,10 @@ func TestAuthedPasswordService_SetPassword(t *testing.T) {
|
|||
defer ctrl.Finish()
|
||||
influxdb.OperPermissions()
|
||||
auth := influxdb.Authorization{
|
||||
ID: authID,
|
||||
OrgID: orgID,
|
||||
UserID: userID,
|
||||
Status: influxdb.Active,
|
||||
ID: authID,
|
||||
OrgID: orgID,
|
||||
UserID: userID,
|
||||
Status: influxdb.Active,
|
||||
}
|
||||
ctx := context.Background()
|
||||
ctx = icontext.SetAuthorizer(ctx, &auth)
|
||||
|
|
Loading…
Reference in New Issue