wire up INTO queries.

Since INTO queries need to have absolute information about the database
to work, we need to create a loopback interface back to the cluster
in order to perform them.
pull/4409/head
Daniel Morsing 2015-09-25 18:26:05 +00:00
parent f1e0c5938f
commit 62dff895e2
5 changed files with 206 additions and 31 deletions

View File

@ -204,6 +204,18 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
return mapping, nil
}
// WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of
// a cluster structure for information. This is to avoid a circular dependency
func (w *PointsWriter) WritePointsInto(p *tsdb.IntoWriteRequest) error {
req := WritePointsRequest{
Database: p.Database,
RetentionPolicy: p.RetentionPolicy,
ConsistencyLevel: ConsistencyLevelAny,
Points: p.Points,
}
return w.WritePoints(&req)
}
// WritePoints writes across multiple local and remote data nodes according the consistency level.
func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
w.statMap.Add(statWriteReq, 1)

View File

@ -135,6 +135,9 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.PointsWriter.ShardWriter = s.ShardWriter
s.PointsWriter.HintedHandoff = s.HintedHandoff
// needed for executing into queries.
s.QueryExecutor.IntoWriter = s.PointsWriter
// Initialize the monitor
s.Monitor.Version = s.buildInfo.Version
s.Monitor.Commit = s.buildInfo.Commit

View File

@ -4956,3 +4956,57 @@ func TestServer_Query_FieldWithMultiplePeriodsMeasurementPrefixMatch(t *testing.
}
}
}
func TestServer_Query_IntoTarget(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}
writes := []string{
fmt.Sprintf(`foo value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`foo value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
fmt.Sprintf(`foo value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()),
fmt.Sprintf(`foo value=4 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:30Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.write = strings.Join(writes, "\n")
test.addQueries([]*Query{
&Query{
name: "into",
params: url.Values{"db": []string{"db0"}},
command: `SELECT value AS something INTO baz FROM foo`,
exp: `{"results":[{"series":[{"name":"result","columns":["time","written"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
},
&Query{
name: "confirm results",
params: url.Values{"db": []string{"db0"}},
command: `SELECT something FROM baz`,
exp: `{"results":[{"series":[{"name":"baz","columns":["time","something"],"values":[["2000-01-01T00:00:00Z",1],["2000-01-01T00:00:10Z",2],["2000-01-01T00:00:20Z",3],["2000-01-01T00:00:30Z",4]]}]}]}`,
},
}...)
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

53
tsdb/into.go Normal file
View File

@ -0,0 +1,53 @@
package tsdb
import (
"errors"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
"time"
)
// convertRowToPoints will convert a query result Row into Points that can be written back in.
// Used for INTO queries
func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
// figure out which parts of the result are the time and which are the fields
timeIndex := -1
fieldIndexes := make(map[string]int)
for i, c := range row.Columns {
if c == "time" {
timeIndex = i
} else {
fieldIndexes[c] = i
}
}
if timeIndex == -1 {
return nil, errors.New("error finding time index in result")
}
points := make([]models.Point, 0, len(row.Values))
for _, v := range row.Values {
vals := make(map[string]interface{})
for fieldName, fieldIndex := range fieldIndexes {
vals[fieldName] = v[fieldIndex]
}
p := models.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))
points = append(points, p)
}
return points, nil
}
func intoDB(stmt *influxql.SelectStatement) (string, error) {
if stmt.Target.Measurement.Database != "" {
return stmt.Target.Measurement.Database, nil
}
return "", errNoDatabaseInTarget
}
var errNoDatabaseInTarget = errors.New("no database in target")
func intoRP(stmt *influxql.SelectStatement) string { return stmt.Target.Measurement.RetentionPolicy }
func intoMeasurement(stmt *influxql.SelectStatement) string { return stmt.Target.Measurement.Name }

View File

@ -46,6 +46,10 @@ type QueryExecutor struct {
CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (Mapper, error)
}
IntoWriter interface {
WritePointsInto(p *IntoWriteRequest) error
}
Logger *log.Logger
QueryLogEnabled bool
@ -53,6 +57,13 @@ type QueryExecutor struct {
Store *Store
}
// partial copy of cluster.WriteRequest
type IntoWriteRequest struct {
Database string
RetentionPolicy string
Points []models.Point
}
// NewQueryExecutor returns an initialized QueryExecutor
func NewQueryExecutor(store *Store) *QueryExecutor {
return &QueryExecutor{
@ -275,34 +286,6 @@ func (q *QueryExecutor) PlanSelect(stmt *influxql.SelectStatement, chunkSize int
return executor, nil
}
// executeSelectStatement plans and executes a select statement against a database.
func (q *QueryExecutor) executeSelectStatement(statementID int, stmt *influxql.SelectStatement, results chan *influxql.Result, chunkSize int) error {
// Plan statement execution.
e, err := q.PlanSelect(stmt, chunkSize)
if err != nil {
return err
}
// Execute plan.
ch := e.Execute()
// Stream results from the channel. We should send an empty result if nothing comes through.
resultSent := false
for row := range ch {
if row.Err != nil {
return row.Err
}
resultSent = true
results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}}
}
if !resultSent {
results <- &influxql.Result{StatementID: statementID, Series: make([]*models.Row, 0)}
}
return nil
}
// expandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (q *QueryExecutor) expandSources(sources influxql.Sources) (influxql.Sources, error) {
@ -697,15 +680,47 @@ func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statemen
// Execute plan.
ch := e.Execute()
var writeerr error
var intoNum int64
var isinto bool
// Stream results from the channel. We should send an empty result if nothing comes through.
resultSent := false
for row := range ch {
// We had a write error. Continue draining results from the channel
// so we don't hang the goroutine in the executor.
if writeerr != nil {
continue
}
if row.Err != nil {
return row.Err
}
resultSent = true
results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}}
selectstmt, ok := stmt.(*influxql.SelectStatement)
if ok && selectstmt.Target != nil {
isinto = true
// this is a into query. Write results back to database
writeerr = q.writeInto(row, selectstmt)
intoNum += int64(len(row.Values))
} else {
resultSent = true
results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}}
}
}
if writeerr != nil {
return writeerr
} else if isinto {
results <- &influxql.Result{
StatementID: statementID,
Series: []*models.Row{{
Name: "result",
// it seems weird to give a time here, but so much stuff breaks if you don't
Columns: []string{"time", "written"},
Values: [][]interface{}{{
time.Unix(0, 0).UTC(),
intoNum,
}},
}},
}
return nil
}
if !resultSent {
@ -715,6 +730,44 @@ func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statemen
return nil
}
func (q *QueryExecutor) writeInto(row *models.Row, selectstmt *influxql.SelectStatement) error {
// It might seem a bit weird that this is where we do this, since we will have to
// convert rows back to points. The Executors (both aggregate and raw) are complex
// enough that changing them to write back to the DB is going to be clumsy
//
// it might seem weird to have the write be in the QueryExecutor, but the interweaving of
// limitedRowWriter and ExecuteAggregate/Raw makes it ridiculously hard to make sure that the
// results will be the same as when queried normally.
measurement := intoMeasurement(selectstmt)
intodb, err := intoDB(selectstmt)
if err != nil {
return err
}
rp := intoRP(selectstmt)
points, err := convertRowToPoints(measurement, row)
if err != nil {
return err
}
for _, p := range points {
fields := p.Fields()
for _, v := range fields {
if v == nil {
return nil
}
}
}
req := &IntoWriteRequest{
Database: intodb,
RetentionPolicy: rp,
Points: points,
}
err = q.IntoWriter.WritePointsInto(req)
if err != nil {
return err
}
return nil
}
func (q *QueryExecutor) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) *influxql.Result {
// Check for time in WHERE clause (not supported).
if influxql.HasTimeExpr(stmt.Condition) {