fix: Fixes #697 data race in execute test
parent
a2e7ba3422
commit
575c8138cc
|
@ -44,8 +44,8 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
Procedures: map[plan.ProcedureID]*plan.Procedure{
|
||||
plan.ProcedureIDFromOperationID("from"): {
|
||||
ID: plan.ProcedureIDFromOperationID("from"),
|
||||
Spec: &testFromProcedureSource{
|
||||
data: []query.Table{&executetest.Table{
|
||||
Spec: newTestFromProcedureSource(
|
||||
[]*executetest.Table{&executetest.Table{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
|
@ -61,7 +61,7 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
{execute.Time(0), execute.Time(5), execute.Time(4), 5.0},
|
||||
},
|
||||
}},
|
||||
},
|
||||
),
|
||||
Bounds: &plan.BoundsSpec{
|
||||
Start: values.ConvertTime(time.Unix(0, 1)),
|
||||
Stop: values.ConvertTime(time.Unix(0, 5)),
|
||||
|
@ -114,8 +114,8 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
Procedures: map[plan.ProcedureID]*plan.Procedure{
|
||||
plan.ProcedureIDFromOperationID("from"): {
|
||||
ID: plan.ProcedureIDFromOperationID("from"),
|
||||
Spec: &testFromProcedureSource{
|
||||
data: []query.Table{&executetest.Table{
|
||||
Spec: newTestFromProcedureSource(
|
||||
[]*executetest.Table{&executetest.Table{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
|
@ -131,7 +131,7 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
{execute.Time(0), execute.Time(5), execute.Time(4), int64(5)},
|
||||
},
|
||||
}},
|
||||
},
|
||||
),
|
||||
Bounds: &plan.BoundsSpec{
|
||||
Start: values.ConvertTime(time.Unix(0, 1)),
|
||||
Stop: values.ConvertTime(time.Unix(0, 5)),
|
||||
|
@ -218,8 +218,8 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
Procedures: map[plan.ProcedureID]*plan.Procedure{
|
||||
plan.ProcedureIDFromOperationID("from"): {
|
||||
ID: plan.ProcedureIDFromOperationID("from"),
|
||||
Spec: &testFromProcedureSource{
|
||||
data: []query.Table{
|
||||
Spec: newTestFromProcedureSource(
|
||||
[]*executetest.Table{
|
||||
&executetest.Table{
|
||||
KeyCols: []string{"_start", "_stop", "_key"},
|
||||
ColMeta: []query.ColMeta{
|
||||
|
@ -286,7 +286,7 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
),
|
||||
Parents: nil,
|
||||
Bounds: &plan.BoundsSpec{
|
||||
Start: values.ConvertTime(time.Unix(0, 1)),
|
||||
|
@ -437,8 +437,8 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
Procedures: map[plan.ProcedureID]*plan.Procedure{
|
||||
plan.ProcedureIDFromOperationID("from"): {
|
||||
ID: plan.ProcedureIDFromOperationID("from"),
|
||||
Spec: &testFromProcedureSource{
|
||||
data: []query.Table{&executetest.Table{
|
||||
Spec: newTestFromProcedureSource(
|
||||
[]*executetest.Table{&executetest.Table{
|
||||
KeyCols: []string{"_start", "_stop"},
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_start", Type: query.TTime},
|
||||
|
@ -454,7 +454,7 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
{execute.Time(0), execute.Time(5), execute.Time(4), 5.0},
|
||||
},
|
||||
}},
|
||||
},
|
||||
),
|
||||
Bounds: &plan.BoundsSpec{
|
||||
Start: values.ConvertTime(time.Unix(0, 1)),
|
||||
Stop: values.ConvertTime(time.Unix(0, 5)),
|
||||
|
@ -565,10 +565,21 @@ func TestExecutor_Execute(t *testing.T) {
|
|||
}
|
||||
|
||||
type testFromProcedureSource struct {
|
||||
data []query.Table
|
||||
data []*executetest.Table
|
||||
ts []execute.Transformation
|
||||
}
|
||||
|
||||
func newTestFromProcedureSource(data []*executetest.Table) *testFromProcedureSource {
|
||||
p := &testFromProcedureSource{
|
||||
data: data,
|
||||
}
|
||||
// Normalize the data before anything can read it
|
||||
for _, tbl := range p.data {
|
||||
tbl.Normalize()
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *testFromProcedureSource) Kind() plan.ProcedureKind {
|
||||
return "from-test"
|
||||
}
|
||||
|
@ -585,11 +596,11 @@ func (p *testFromProcedureSource) Run(ctx context.Context) {
|
|||
id := execute.DatasetID(uuid.NewV4())
|
||||
for _, t := range p.ts {
|
||||
var max execute.Time
|
||||
for _, b := range p.data {
|
||||
t.Process(id, b)
|
||||
stopIdx := execute.ColIdx(execute.DefaultStopColLabel, b.Cols())
|
||||
for _, tbl := range p.data {
|
||||
t.Process(id, tbl)
|
||||
stopIdx := execute.ColIdx(execute.DefaultStopColLabel, tbl.Cols())
|
||||
if stopIdx >= 0 {
|
||||
if s := b.Key().ValueTime(stopIdx); s > max {
|
||||
if s := tbl.Key().ValueTime(stopIdx); s > max {
|
||||
max = s
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue