commit
6d188d9703
|
@ -16,6 +16,7 @@
|
|||
- [#4348](https://github.com/influxdb/influxdb/pull/4348): Public ApplyTemplate function for graphite parser.
|
||||
- [#4178](https://github.com/influxdb/influxdb/pull/4178): Support fields in graphite parser. Thanks @roobert!
|
||||
- [#4291](https://github.com/influxdb/influxdb/pull/4291): Added ALTER DATABASE RENAME. Thanks @linearb
|
||||
- [#4409](https://github.com/influxdb/influxdb/pull/4291): wire up INTO queries.
|
||||
|
||||
### Bugfixes
|
||||
- [#4389](https://github.com/influxdb/influxdb/pull/4389): Don't add a new segment file on each hinted-handoff purge cycle.
|
||||
|
|
|
@ -204,6 +204,18 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
|
|||
return mapping, nil
|
||||
}
|
||||
|
||||
// WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of
|
||||
// a cluster structure for information. This is to avoid a circular dependency
|
||||
func (w *PointsWriter) WritePointsInto(p *tsdb.IntoWriteRequest) error {
|
||||
req := WritePointsRequest{
|
||||
Database: p.Database,
|
||||
RetentionPolicy: p.RetentionPolicy,
|
||||
ConsistencyLevel: ConsistencyLevelAny,
|
||||
Points: p.Points,
|
||||
}
|
||||
return w.WritePoints(&req)
|
||||
}
|
||||
|
||||
// WritePoints writes across multiple local and remote data nodes according the consistency level.
|
||||
func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
|
||||
w.statMap.Add(statWriteReq, 1)
|
||||
|
|
|
@ -135,6 +135,9 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|||
s.PointsWriter.ShardWriter = s.ShardWriter
|
||||
s.PointsWriter.HintedHandoff = s.HintedHandoff
|
||||
|
||||
// needed for executing INTO queries.
|
||||
s.QueryExecutor.IntoWriter = s.PointsWriter
|
||||
|
||||
// Initialize the monitor
|
||||
s.Monitor.Version = s.buildInfo.Version
|
||||
s.Monitor.Commit = s.buildInfo.Commit
|
||||
|
@ -299,7 +302,6 @@ func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
|
|||
srv := continuous_querier.NewService(c)
|
||||
srv.MetaStore = s.MetaStore
|
||||
srv.QueryExecutor = s.QueryExecutor
|
||||
srv.PointsWriter = s.PointsWriter
|
||||
s.Services = append(s.Services, srv)
|
||||
}
|
||||
|
||||
|
|
|
@ -4962,3 +4962,57 @@ func TestServer_Query_FieldWithMultiplePeriodsMeasurementPrefixMatch(t *testing.
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_Query_IntoTarget(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := OpenServer(NewConfig(), "")
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf(`foo value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`foo value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
|
||||
fmt.Sprintf(`foo value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()),
|
||||
fmt.Sprintf(`foo value=4 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:30Z").UnixNano()),
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.write = strings.Join(writes, "\n")
|
||||
|
||||
test.addQueries([]*Query{
|
||||
&Query{
|
||||
name: "into",
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT value AS something INTO baz FROM foo`,
|
||||
exp: `{"results":[{"series":[{"name":"result","columns":["time","written"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "confirm results",
|
||||
params: url.Values{"db": []string{"db0"}},
|
||||
command: `SELECT something FROM baz`,
|
||||
exp: `{"results":[{"series":[{"name":"baz","columns":["time","something"],"values":[["2000-01-01T00:00:00Z",1],["2000-01-01T00:00:10Z",2],["2000-01-01T00:00:20Z",3],["2000-01-01T00:00:30Z",4]]}]}]}`,
|
||||
},
|
||||
}...)
|
||||
|
||||
if err := test.init(s); err != nil {
|
||||
t.Fatalf("test init failed: %s", err)
|
||||
}
|
||||
|
||||
for _, query := range test.queries {
|
||||
if query.skip {
|
||||
t.Logf("SKIP:: %s", query.name)
|
||||
continue
|
||||
}
|
||||
if err := query.Execute(s); err != nil {
|
||||
t.Error(query.Error(err))
|
||||
} else if !query.success() {
|
||||
t.Error(query.failureMessage())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,10 +11,8 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/models"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
|
@ -48,11 +46,6 @@ type metaStore interface {
|
|||
Database(name string) (*meta.DatabaseInfo, error)
|
||||
}
|
||||
|
||||
// pointsWriter is an internal interface to make testing easier.
|
||||
type pointsWriter interface {
|
||||
WritePoints(p *cluster.WritePointsRequest) error
|
||||
}
|
||||
|
||||
// RunRequest is a request to run one or more CQs.
|
||||
type RunRequest struct {
|
||||
// Now tells the CQ serivce what the current time is.
|
||||
|
@ -79,7 +72,6 @@ func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool {
|
|||
type Service struct {
|
||||
MetaStore metaStore
|
||||
QueryExecutor queryExecutor
|
||||
PointsWriter pointsWriter
|
||||
Config *Config
|
||||
RunInterval time.Duration
|
||||
// RunCh can be used by clients to signal service to run CQs.
|
||||
|
@ -119,7 +111,6 @@ func (s *Service) Open() error {
|
|||
|
||||
assert(s.MetaStore != nil, "MetaStore is nil")
|
||||
assert(s.QueryExecutor != nil, "QueryExecutor is nil")
|
||||
assert(s.PointsWriter != nil, "PointsWriter is nil")
|
||||
|
||||
s.stop = make(chan struct{})
|
||||
s.wg = &sync.WaitGroup{}
|
||||
|
@ -331,104 +322,17 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read all rows from the result channel.
|
||||
points := make([]models.Point, 0, 100)
|
||||
for result := range ch {
|
||||
if result.Err != nil {
|
||||
return result.Err
|
||||
}
|
||||
|
||||
for _, row := range result.Series {
|
||||
// Get the measurement name for the result.
|
||||
measurement := cq.intoMeasurement()
|
||||
if measurement == "" {
|
||||
measurement = row.Name
|
||||
}
|
||||
// Convert the result row to points.
|
||||
part, err := s.convertRowToPoints(measurement, row)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(part) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// If the points have any nil values, can't write.
|
||||
// This happens if the CQ is created and running before data is written to the measurement.
|
||||
for _, p := range part {
|
||||
fields := p.Fields()
|
||||
for _, v := range fields {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
points = append(points, part...)
|
||||
}
|
||||
// There is only one statement, so we will only ever receive one result
|
||||
res, ok := <-ch
|
||||
if !ok {
|
||||
panic("result channel was closed")
|
||||
}
|
||||
|
||||
if len(points) == 0 {
|
||||
return nil
|
||||
if res.Err != nil {
|
||||
return res.Err
|
||||
}
|
||||
|
||||
// Create a write request for the points.
|
||||
req := &cluster.WritePointsRequest{
|
||||
Database: cq.intoDB(),
|
||||
RetentionPolicy: cq.intoRP(),
|
||||
ConsistencyLevel: cluster.ConsistencyLevelAny,
|
||||
Points: points,
|
||||
}
|
||||
|
||||
// Write the request.
|
||||
if err := s.PointsWriter.WritePoints(req); err != nil {
|
||||
s.Logger.Println(err)
|
||||
return err
|
||||
}
|
||||
|
||||
s.statMap.Add(statPointsWritten, int64(len(points)))
|
||||
if s.loggingEnabled {
|
||||
s.Logger.Printf("wrote %d point(s) to %s.%s", len(points), cq.intoDB(), cq.intoRP())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// convertRowToPoints will convert a query result Row into Points that can be written back in.
|
||||
// Used for continuous and INTO queries
|
||||
func (s *Service) convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
|
||||
// figure out which parts of the result are the time and which are the fields
|
||||
timeIndex := -1
|
||||
fieldIndexes := make(map[string]int)
|
||||
for i, c := range row.Columns {
|
||||
if c == "time" {
|
||||
timeIndex = i
|
||||
} else {
|
||||
fieldIndexes[c] = i
|
||||
}
|
||||
}
|
||||
|
||||
if timeIndex == -1 {
|
||||
return nil, errors.New("error finding time index in result")
|
||||
}
|
||||
|
||||
points := make([]models.Point, 0, len(row.Values))
|
||||
for _, v := range row.Values {
|
||||
vals := make(map[string]interface{})
|
||||
for fieldName, fieldIndex := range fieldIndexes {
|
||||
vals[fieldName] = v[fieldIndex]
|
||||
}
|
||||
|
||||
p := models.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))
|
||||
|
||||
points = append(points, p)
|
||||
}
|
||||
|
||||
return points, nil
|
||||
}
|
||||
|
||||
// ContinuousQuery is a local wrapper / helper around continuous queries.
|
||||
type ContinuousQuery struct {
|
||||
Database string
|
||||
|
@ -437,16 +341,8 @@ type ContinuousQuery struct {
|
|||
q *influxql.SelectStatement
|
||||
}
|
||||
|
||||
func (cq *ContinuousQuery) intoDB() string {
|
||||
if cq.q.Target.Measurement.Database != "" {
|
||||
return cq.q.Target.Measurement.Database
|
||||
}
|
||||
return cq.Database
|
||||
}
|
||||
|
||||
func (cq *ContinuousQuery) intoRP() string { return cq.q.Target.Measurement.RetentionPolicy }
|
||||
func (cq *ContinuousQuery) setIntoRP(rp string) { cq.q.Target.Measurement.RetentionPolicy = rp }
|
||||
func (cq *ContinuousQuery) intoMeasurement() string { return cq.q.Target.Measurement.Name }
|
||||
func (cq *ContinuousQuery) intoRP() string { return cq.q.Target.Measurement.RetentionPolicy }
|
||||
func (cq *ContinuousQuery) setIntoRP(rp string) { cq.q.Target.Measurement.RetentionPolicy = rp }
|
||||
|
||||
// NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement
|
||||
func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*ContinuousQuery, error) {
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -38,95 +37,6 @@ func TestOpenAndClose(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Test ExecuteContinuousQuery.
|
||||
func TestExecuteContinuousQuery(t *testing.T) {
|
||||
s := NewTestService(t)
|
||||
dbis, _ := s.MetaStore.Databases()
|
||||
dbi := dbis[0]
|
||||
cqi := dbi.ContinuousQueries[0]
|
||||
|
||||
pointCnt := 100
|
||||
qe := s.QueryExecutor.(*QueryExecutor)
|
||||
qe.Results = []*influxql.Result{genResult(1, pointCnt)}
|
||||
|
||||
pw := s.PointsWriter.(*PointsWriter)
|
||||
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
|
||||
if len(p.Points) != pointCnt {
|
||||
return fmt.Errorf("exp = %d, got = %d", pointCnt, len(p.Points))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test ExecuteContinuousQuery when INTO measurements are taken from the FROM clause.
|
||||
func TestExecuteContinuousQuery_ReferenceSource(t *testing.T) {
|
||||
s := NewTestService(t)
|
||||
dbis, _ := s.MetaStore.Databases()
|
||||
dbi := dbis[2]
|
||||
cqi := dbi.ContinuousQueries[0]
|
||||
|
||||
rowCnt := 2
|
||||
pointCnt := 1
|
||||
qe := s.QueryExecutor.(*QueryExecutor)
|
||||
qe.Results = []*influxql.Result{genResult(rowCnt, pointCnt)}
|
||||
|
||||
pw := s.PointsWriter.(*PointsWriter)
|
||||
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
|
||||
if len(p.Points) != pointCnt*rowCnt {
|
||||
return fmt.Errorf("exp = %d, got = %d", pointCnt, len(p.Points))
|
||||
}
|
||||
|
||||
exp := "cpu,host=server01 value=0"
|
||||
got := p.Points[0].String()
|
||||
if !strings.Contains(got, exp) {
|
||||
return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp)
|
||||
}
|
||||
|
||||
exp = "cpu2,host=server01 value=0"
|
||||
got = p.Points[1].String()
|
||||
if !strings.Contains(got, exp) {
|
||||
return fmt.Errorf("\n\tExpected ':MEASUREMENT' to be expanded to the measurement name(s) in the FROM regexp.\n\tqry = %s\n\texp = %s\n\tgot = %s\n", cqi.Query, got, exp)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := s.ExecuteContinuousQuery(&dbi, &cqi, time.Now())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test the service happy path.
|
||||
func TestContinuousQueryService(t *testing.T) {
|
||||
s := NewTestService(t)
|
||||
|
||||
pointCnt := 100
|
||||
qe := s.QueryExecutor.(*QueryExecutor)
|
||||
qe.Results = []*influxql.Result{genResult(1, pointCnt)}
|
||||
|
||||
pw := s.PointsWriter.(*PointsWriter)
|
||||
ch := make(chan int, 10)
|
||||
defer close(ch)
|
||||
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
|
||||
ch <- len(p.Points)
|
||||
return nil
|
||||
}
|
||||
|
||||
s.Open()
|
||||
if cnt, err := waitInt(ch, time.Second); err != nil {
|
||||
t.Error(err)
|
||||
} else if cnt != pointCnt {
|
||||
t.Errorf("exp = %d, got = %d", pointCnt, cnt)
|
||||
}
|
||||
s.Close()
|
||||
}
|
||||
|
||||
// Test Run method.
|
||||
func TestContinuousQueryService_Run(t *testing.T) {
|
||||
s := NewTestService(t)
|
||||
|
@ -148,7 +58,9 @@ func TestContinuousQueryService_Run(t *testing.T) {
|
|||
if callCnt >= expectCallCnt {
|
||||
done <- struct{}{}
|
||||
}
|
||||
return nil, nil
|
||||
dummych := make(chan *influxql.Result, 1)
|
||||
dummych <- &influxql.Result{}
|
||||
return dummych, nil
|
||||
}
|
||||
|
||||
s.Open()
|
||||
|
@ -280,7 +192,6 @@ func NewTestService(t *testing.T) *Service {
|
|||
ms := NewMetaStore(t)
|
||||
s.MetaStore = ms
|
||||
s.QueryExecutor = NewQueryExecutor(t)
|
||||
s.PointsWriter = NewPointsWriter(t)
|
||||
s.RunInterval = time.Millisecond
|
||||
|
||||
// Set Logger to write to dev/null so stdout isn't polluted.
|
||||
|
@ -406,21 +317,19 @@ func (ms *MetaStore) CreateContinuousQuery(database, name, query string) error {
|
|||
|
||||
// QueryExecutor is a mock query executor.
|
||||
type QueryExecutor struct {
|
||||
ExecuteQueryFn func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
|
||||
Results []*influxql.Result
|
||||
ResultInterval time.Duration
|
||||
Err error
|
||||
ErrAfterResult int
|
||||
StopRespondingAfter int
|
||||
t *testing.T
|
||||
ExecuteQueryFn func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
|
||||
Results []*influxql.Result
|
||||
ResultInterval time.Duration
|
||||
Err error
|
||||
ErrAfterResult int
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
// NewQueryExecutor returns a *QueryExecutor.
|
||||
func NewQueryExecutor(t *testing.T) *QueryExecutor {
|
||||
return &QueryExecutor{
|
||||
ErrAfterResult: -1,
|
||||
StopRespondingAfter: -1,
|
||||
t: t,
|
||||
ErrAfterResult: -1,
|
||||
t: t,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -450,15 +359,15 @@ func (qe *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, ch
|
|||
ch <- &influxql.Result{Err: qe.Err}
|
||||
close(ch)
|
||||
return
|
||||
} else if i == qe.StopRespondingAfter {
|
||||
qe.t.Log("ExecuteQuery(): StopRespondingAfter")
|
||||
return
|
||||
}
|
||||
ch <- r
|
||||
n++
|
||||
time.Sleep(qe.ResultInterval)
|
||||
}
|
||||
qe.t.Logf("ExecuteQuery(): all (%d) results sent", n)
|
||||
if n == 0 {
|
||||
ch <- &influxql.Result{Err: qe.Err}
|
||||
}
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
"github.com/influxdb/influxdb/models"
|
||||
"time"
|
||||
)
|
||||
|
||||
// convertRowToPoints will convert a query result Row into Points that can be written back in.
|
||||
// Used for INTO queries
|
||||
func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
|
||||
// figure out which parts of the result are the time and which are the fields
|
||||
timeIndex := -1
|
||||
fieldIndexes := make(map[string]int)
|
||||
for i, c := range row.Columns {
|
||||
if c == "time" {
|
||||
timeIndex = i
|
||||
} else {
|
||||
fieldIndexes[c] = i
|
||||
}
|
||||
}
|
||||
|
||||
if timeIndex == -1 {
|
||||
return nil, errors.New("error finding time index in result")
|
||||
}
|
||||
|
||||
points := make([]models.Point, 0, len(row.Values))
|
||||
for _, v := range row.Values {
|
||||
vals := make(map[string]interface{})
|
||||
for fieldName, fieldIndex := range fieldIndexes {
|
||||
vals[fieldName] = v[fieldIndex]
|
||||
}
|
||||
|
||||
p := models.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))
|
||||
|
||||
points = append(points, p)
|
||||
}
|
||||
|
||||
return points, nil
|
||||
}
|
||||
|
||||
func intoDB(stmt *influxql.SelectStatement) (string, error) {
|
||||
if stmt.Target.Measurement.Database != "" {
|
||||
return stmt.Target.Measurement.Database, nil
|
||||
}
|
||||
return "", errNoDatabaseInTarget
|
||||
}
|
||||
|
||||
var errNoDatabaseInTarget = errors.New("no database in target")
|
||||
|
||||
func intoRP(stmt *influxql.SelectStatement) string { return stmt.Target.Measurement.RetentionPolicy }
|
||||
func intoMeasurement(stmt *influxql.SelectStatement) string { return stmt.Target.Measurement.Name }
|
|
@ -46,6 +46,10 @@ type QueryExecutor struct {
|
|||
CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (Mapper, error)
|
||||
}
|
||||
|
||||
IntoWriter interface {
|
||||
WritePointsInto(p *IntoWriteRequest) error
|
||||
}
|
||||
|
||||
Logger *log.Logger
|
||||
QueryLogEnabled bool
|
||||
|
||||
|
@ -53,6 +57,13 @@ type QueryExecutor struct {
|
|||
Store *Store
|
||||
}
|
||||
|
||||
// partial copy of cluster.WriteRequest
|
||||
type IntoWriteRequest struct {
|
||||
Database string
|
||||
RetentionPolicy string
|
||||
Points []models.Point
|
||||
}
|
||||
|
||||
// NewQueryExecutor returns an initialized QueryExecutor
|
||||
func NewQueryExecutor(store *Store) *QueryExecutor {
|
||||
return &QueryExecutor{
|
||||
|
@ -275,34 +286,6 @@ func (q *QueryExecutor) PlanSelect(stmt *influxql.SelectStatement, chunkSize int
|
|||
return executor, nil
|
||||
}
|
||||
|
||||
// executeSelectStatement plans and executes a select statement against a database.
|
||||
func (q *QueryExecutor) executeSelectStatement(statementID int, stmt *influxql.SelectStatement, results chan *influxql.Result, chunkSize int) error {
|
||||
// Plan statement execution.
|
||||
e, err := q.PlanSelect(stmt, chunkSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Execute plan.
|
||||
ch := e.Execute()
|
||||
|
||||
// Stream results from the channel. We should send an empty result if nothing comes through.
|
||||
resultSent := false
|
||||
for row := range ch {
|
||||
if row.Err != nil {
|
||||
return row.Err
|
||||
}
|
||||
resultSent = true
|
||||
results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}}
|
||||
}
|
||||
|
||||
if !resultSent {
|
||||
results <- &influxql.Result{StatementID: statementID, Series: make([]*models.Row, 0)}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// expandSources expands regex sources and removes duplicates.
|
||||
// NOTE: sources must be normalized (db and rp set) before calling this function.
|
||||
func (q *QueryExecutor) expandSources(sources influxql.Sources) (influxql.Sources, error) {
|
||||
|
@ -697,15 +680,47 @@ func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statemen
|
|||
|
||||
// Execute plan.
|
||||
ch := e.Execute()
|
||||
|
||||
var writeerr error
|
||||
var intoNum int64
|
||||
var isinto bool
|
||||
// Stream results from the channel. We should send an empty result if nothing comes through.
|
||||
resultSent := false
|
||||
for row := range ch {
|
||||
// We had a write error. Continue draining results from the channel
|
||||
// so we don't hang the goroutine in the executor.
|
||||
if writeerr != nil {
|
||||
continue
|
||||
}
|
||||
if row.Err != nil {
|
||||
return row.Err
|
||||
}
|
||||
resultSent = true
|
||||
results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}}
|
||||
selectstmt, ok := stmt.(*influxql.SelectStatement)
|
||||
if ok && selectstmt.Target != nil {
|
||||
isinto = true
|
||||
// this is a into query. Write results back to database
|
||||
writeerr = q.writeInto(row, selectstmt)
|
||||
intoNum += int64(len(row.Values))
|
||||
} else {
|
||||
resultSent = true
|
||||
results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}}
|
||||
}
|
||||
}
|
||||
if writeerr != nil {
|
||||
return writeerr
|
||||
} else if isinto {
|
||||
results <- &influxql.Result{
|
||||
StatementID: statementID,
|
||||
Series: []*models.Row{{
|
||||
Name: "result",
|
||||
// it seems weird to give a time here, but so much stuff breaks if you don't
|
||||
Columns: []string{"time", "written"},
|
||||
Values: [][]interface{}{{
|
||||
time.Unix(0, 0).UTC(),
|
||||
intoNum,
|
||||
}},
|
||||
}},
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if !resultSent {
|
||||
|
@ -715,6 +730,44 @@ func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statemen
|
|||
return nil
|
||||
}
|
||||
|
||||
func (q *QueryExecutor) writeInto(row *models.Row, selectstmt *influxql.SelectStatement) error {
|
||||
// It might seem a bit weird that this is where we do this, since we will have to
|
||||
// convert rows back to points. The Executors (both aggregate and raw) are complex
|
||||
// enough that changing them to write back to the DB is going to be clumsy
|
||||
//
|
||||
// it might seem weird to have the write be in the QueryExecutor, but the interweaving of
|
||||
// limitedRowWriter and ExecuteAggregate/Raw makes it ridiculously hard to make sure that the
|
||||
// results will be the same as when queried normally.
|
||||
measurement := intoMeasurement(selectstmt)
|
||||
intodb, err := intoDB(selectstmt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rp := intoRP(selectstmt)
|
||||
points, err := convertRowToPoints(measurement, row)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, p := range points {
|
||||
fields := p.Fields()
|
||||
for _, v := range fields {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
req := &IntoWriteRequest{
|
||||
Database: intodb,
|
||||
RetentionPolicy: rp,
|
||||
Points: points,
|
||||
}
|
||||
err = q.IntoWriter.WritePointsInto(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *QueryExecutor) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) *influxql.Result {
|
||||
// Check for time in WHERE clause (not supported).
|
||||
if influxql.HasTimeExpr(stmt.Condition) {
|
||||
|
|
Loading…
Reference in New Issue