make sure we are done processsing block before process exits
parent
593e8f85c0
commit
e11c351853
|
@ -11,6 +11,7 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/line-protocol"
|
"github.com/influxdata/line-protocol"
|
||||||
|
@ -341,8 +342,9 @@ func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Block) erro
|
||||||
isValue[i] = sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label) < len(t.spec.Spec.ValueColumns) && t.spec.Spec.ValueColumns[sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label)] == col.Label
|
isValue[i] = sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label) < len(t.spec.Spec.ValueColumns) && t.spec.Spec.ValueColumns[sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label)] == col.Label
|
||||||
isTag[i] = sort.SearchStrings(t.spec.Spec.TagColumns, col.Label) < len(t.spec.Spec.TagColumns) && t.spec.Spec.TagColumns[sort.SearchStrings(t.spec.Spec.TagColumns, col.Label)] == col.Label
|
isTag[i] = sort.SearchStrings(t.spec.Spec.TagColumns, col.Label) < len(t.spec.Spec.TagColumns) && t.spec.Spec.TagColumns[sort.SearchStrings(t.spec.Spec.TagColumns, col.Label)] == col.Label
|
||||||
}
|
}
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
var err error
|
var err error
|
||||||
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
m.name = t.spec.Spec.Name
|
m.name = t.spec.Spec.Name
|
||||||
b.Do(func(er query.ColReader) error {
|
b.Do(func(er query.ColReader) error {
|
||||||
|
@ -391,6 +393,7 @@ func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Block) erro
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
pw.Close()
|
pw.Close()
|
||||||
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
req, err := http.NewRequest(t.spec.Spec.Method, t.spec.Spec.Addr, pr)
|
req, err := http.NewRequest(t.spec.Spec.Method, t.spec.Spec.Addr, pr)
|
||||||
|
@ -413,8 +416,8 @@ func (t *ToHTTPTransformation) Process(id execute.DatasetID, b query.Block) erro
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
defer resp.Body.Close()
|
resp.Body.Close()
|
||||||
|
|
||||||
return req.Body.Close()
|
return req.Body.Close()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue