From 47cd03f3d3a3ae1ca4287c132743ce5b1d10d391 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 13 Jan 2015 10:16:43 -0700 Subject: [PATCH] Partial engine integration. --- database.go | 94 ++++++++++++++++++++++++++++++++++++++++-------- influxdb.go | 4 +++ server.go | 97 +++++++++++++++++++++++++++++++++++++++++++------- server_test.go | 36 ++++++++++++++----- shard.go | 4 +++ 5 files changed, 200 insertions(+), 35 deletions(-) diff --git a/database.go b/database.go index 0033be4fd8..f93a29a4e5 100644 --- a/database.go +++ b/database.go @@ -7,6 +7,8 @@ import ( "sort" "strings" "time" + + "github.com/influxdb/influxdb/influxql" ) // database is a collection of retention policies and shards. It also has methods @@ -120,7 +122,7 @@ func NewMeasurement(name string) *Measurement { // CreateFieldIfNotExists creates a new field with an autoincrementing ID. // Returns an error if 255 fields have already been created on the measurement. -func (m *Measurement) createFieldIfNotExists(name string, typ FieldType) (*Field, error) { +func (m *Measurement) createFieldIfNotExists(name string, typ influxql.DataType) (*Field, error) { // Ignore if the field already exists. if f := m.FieldByName(name); f != nil { return f, nil @@ -280,21 +282,11 @@ type Measurements []*Measurement // Field represents a series field. type Field struct { - ID uint8 `json:"id,omitempty"` - Name string `json:"name,omitempty"` - Type FieldType `json:"field"` + ID uint8 `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Type influxql.DataType `json:"type,omitempty"` } -type FieldType int - -const ( - Int64 FieldType = iota - Float64 - String - Boolean - Binary -) - // Fields represents a list of fields. type Fields []*Field @@ -306,6 +298,16 @@ type Series struct { measurement *Measurement } +// match returns true if all tags match the series' tags. +func (s *Series) match(tags map[string]string) bool { + for k, v := range tags { + if s.Tags[k] != v { + return false + } + } + return true +} + // RetentionPolicy represents a policy for creating new shards in a database and how long they're kept around for. type RetentionPolicy struct { // Unique name within database. Required. @@ -737,3 +739,67 @@ func marshalTags(tags map[string]string) []byte { } return []byte(strings.Join(s, "|")) } + +// dbi is an interface the query engine to communicate with the database during planning. +type dbi struct { + server *Server + db *database +} + +// MatchSeries returns a list of series data ids matching a name and tags. +func (dbi *dbi) MatchSeries(name string, tags map[string]string) (a []uint32) { + // Find measurement by name. + m := dbi.db.measurements[name] + if m == nil { + return nil + } + + // Match each series on the measurement by tagset. + // TODO: Use paul's fancy index. + for _, s := range m.seriesByID { + if s.match(tags) { + a = append(a, s.ID) + } + } + return +} + +// SeriesTagValues returns a slice of tag values for a series. +func (dbi *dbi) SeriesTagValues(seriesID uint32, keys []string) []string { + // Find series by id. + s := dbi.db.series[seriesID] + + // Lookup value for each key. + values := make([]string, len(keys)) + for i, keys := range keys { + values[i] = s.Tags[keys] + } + return values +} + +// Field returns the id and data type for a series field. +// Returns id of zero if not a field. +func (dbi *dbi) Field(name, field string) (fieldID uint8, typ influxql.DataType) { + // Find measurement by name. + m := dbi.db.measurements[name] + if m == nil { + return 0, influxql.Unknown + } + + // Find field by name. + f := m.FieldByName(name) + if f == nil { + return 0, influxql.Unknown + } + + return f.ID, f.Type +} + +// CreateIterator returns an iterator given a series data id, field id, & field data type. +func (dbi *dbi) CreateIterator(seriesID uint32, fieldID uint8, typ influxql.DataType, min, max time.Time, interval time.Duration) influxql.Iterator { + // TODO: Find shard group. + // TODO: Find shard for series. + // TODO: Open bolt cursor. + // TODO: Return wrapper cursor. + panic("TODO") +} diff --git a/influxdb.go b/influxdb.go index 76cc964f0e..2467894aca 100644 --- a/influxdb.go +++ b/influxdb.go @@ -95,6 +95,10 @@ var ( // ErrSeriesExists is returned when attempting to set the id of a series by database, name and tags that already exists ErrSeriesExists = errors.New("series already exists") + + // ErrNotExecuted is returned when a statement is not executed in a query. + // This can occur when a previous statement in the same query has errorred. + ErrNotExecuted = errors.New("not executed") ) // mustMarshal encodes a value to JSON. diff --git a/server.go b/server.go index afe0f03cab..26f0d39d02 100644 --- a/server.go +++ b/server.go @@ -17,6 +17,7 @@ import ( "time" "code.google.com/p/go.crypto/bcrypt" + "github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/messaging" ) @@ -247,8 +248,9 @@ func (s *Server) setClient(client MessagingClient) error { // Start goroutine to read messages from the broker. if client != nil { - s.done = make(chan struct{}, 0) - go s.processor(client, s.done) + done := make(chan struct{}, 0) + s.done = done + go s.processor(client, done) } return nil @@ -1211,18 +1213,18 @@ type createSeriesIfNotExistsCommand struct { } // WriteSeries writes series data to the database. -func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error { +func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) (uint64, error) { // Find the id for the series and tagset seriesID, err := s.createSeriesIfNotExists(database, name, tags) if err != nil { - return err + return 0, err } // If the retention policy is not set, use the default for this database. if retentionPolicy == "" { rp, err := s.DefaultRetentionPolicy(database) if err != nil { - return fmt.Errorf("failed to determine default retention policy: %s", err.Error()) + return 0, fmt.Errorf("failed to determine default retention policy: %s", err.Error()) } retentionPolicy = rp.Name } @@ -1230,15 +1232,15 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st // Retrieve measurement. m, err := s.measurement(database, name) if err != nil { - return err + return 0, err } else if m == nil { - return ErrMeasurementNotFound + return 0, ErrMeasurementNotFound } // Retrieve shard group. g, err := s.createShardGroupIfNotExists(database, retentionPolicy, timestamp) if err != nil { - return fmt.Errorf("create shard(%s/%d): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err) + return 0, fmt.Errorf("create shard(%s/%d): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err) } // Find appropriate shard within the shard group. @@ -1246,7 +1248,7 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st // Ignore requests that have no values. if len(values) == 0 { - return nil + return 0, nil } // Convert string-key/values to fieldID-key/values. @@ -1268,7 +1270,7 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st TopicID: sh.ID, Data: data, }) - return err + return 0, err } // If we can successfully encode the string keys to raw field ids then @@ -1279,12 +1281,11 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st data = append(data, marshalValues(rawValues)...) // Publish "raw write series" message on shard's topic to broker. - _, err = s.client.Publish(&messaging.Message{ + return s.client.Publish(&messaging.Message{ Type: writeRawSeriesMessageType, TopicID: sh.ID, Data: data, }) - return err } type writeSeriesCommand struct { @@ -1331,7 +1332,7 @@ func (s *Server) applyWriteSeries(m *messaging.Message) error { // Find or create fields. // If too many fields are on the measurement then log the issue. // If any other error occurs then exit. - f, err := mm.createFieldIfNotExists(k, Float64) + f, err := mm.createFieldIfNotExists(k, influxql.Number) if err == ErrFieldOverflow { log.Printf("no more fields allowed: %s::%s", mm.Name, k) continue @@ -1505,6 +1506,70 @@ func (s *Server) measurement(database, name string) (*Measurement, error) { return db.measurements[name], nil } +// ExecuteQuery executes an InfluxQL query against the server. +// Returns a resultset for each statement in the query. +// Stops on first execution error that occurs. +func (s *Server) Execute(q *influxql.Query, database string) []*Result { + // Build empty resultsets. + results := make([]*Result, len(q.Statements)) + + // Execute each statement. + for i, st := range q.Statements { + switch st := st.(type) { + case *influxql.SelectStatement: + results[i] = s.executeSelectStatement(st, database) + } + } + + // Fill any empty results after error. + for i, res := range results { + if res == nil { + results[i] = &Result{Error: ErrNotExecuted} + } + } + + return results +} + +// plans and executes a select statement against a database. +func (s *Server) executeSelectStatement(stmt *influxql.SelectStatement, database string) *Result { + // Plan statement execution. + e, err := s.planSelectStatement(stmt, database) + if err != nil { + return &Result{Error: err} + } + + // Execute plan. + ch, err := e.Execute() + if err != nil { + return &Result{Error: err} + } + + // Read all rows from channel. + res := &Result{Rows: make([]*influxql.Row, 0)} + for row := range ch { + res.Rows = append(res.Rows, row) + } + + return res +} + +// plans a selection statement under lock. +func (s *Server) planSelectStatement(stmt *influxql.SelectStatement, database string) (*influxql.Executor, error) { + s.mu.Lock() + defer s.mu.Unlock() + + // Find database. + db := s.databases[database] + if db == nil { + return nil, ErrDatabaseNotFound + } + + // Plan query. + p := influxql.NewPlanner(&dbi{server: s, db: db}) + return p.Plan(stmt) +} + // processor runs in a separate goroutine and processes all incoming broker messages. func (s *Server) processor(client MessagingClient, done chan struct{}) { for { @@ -1561,6 +1626,12 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) { } } +// Result represents a resultset returned from a single statement. +type Result struct { + Rows []*influxql.Row `json:"rows"` + Error error `json:"error"` +} + // MessagingClient represents the client used to receive messages from brokers. type MessagingClient interface { // Publishes a message to the broker. diff --git a/server_test.go b/server_test.go index 0356f823b4..d99d87d052 100644 --- a/server_test.go +++ b/server_test.go @@ -505,29 +505,46 @@ func TestServer_WriteSeries(t *testing.T) { // Write series with one point to the database. tags := map[string]string{"host": "servera.influx.com", "region": "uswest"} - values := map[string]interface{}{"value": 23.2} - if err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z"), values); err != nil { + index, err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z"), map[string]interface{}{"value": 23.2}) + if err != nil { t.Fatal(err) + } else if err = s.Sync(index); err != nil { + t.Fatalf("sync error: %s", err) } - time.Sleep(1 * time.Second) // TEMP // Write another point 10 seconds later so it goes through "raw series". - if err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:10Z"), values); err != nil { + index, err = s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:10Z"), map[string]interface{}{"value": 100}) + if err != nil { t.Fatal(err) + } else if err = s.Sync(index); err != nil { + t.Fatalf("sync error: %s", err) } - time.Sleep(1 * time.Second) // TEMP // Verify a subscription was made. if !subscribed { t.Fatal("expected subscription") } - // Retrieve series data point. + // Retrieve first series data point. if v, err := s.ReadSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z")); err != nil { t.Fatal(err) - } else if !reflect.DeepEqual(v, values) { + } else if !reflect.DeepEqual(v, map[string]interface{}{"value": 23.2}) { t.Fatalf("values mismatch: %#v", v) } + + // Retrieve second series data point. + if v, err := s.ReadSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:10Z")); err != nil { + t.Fatal(err) + } else if mustMarshalJSON(v) != mustMarshalJSON(map[string]interface{}{"value": 100}) { + t.Fatalf("values mismatch: %#v", mustMarshalJSON(v)) + } + + // Retrieve non-existent series data point. + if v, err := s.ReadSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:01:00Z")); err != nil { + t.Fatal(err) + } else if v != nil { + t.Fatalf("expected nil values: %#v", v) + } } func TestServer_CreateShardGroupIfNotExist(t *testing.T) { @@ -563,8 +580,11 @@ func TestServer_Measurements(t *testing.T) { tags := map[string]string{"host": "servera.influx.com", "region": "uswest"} values := map[string]interface{}{"value": 23.2} - if err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, timestamp, values); err != nil { + index, err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, timestamp, values) + if err != nil { t.Fatal(err) + } else if err = s.Sync(index); err != nil { + t.Fatal("sync error: %s", err) } expectedMeasurementNames := []string{"cpu_load"} diff --git a/shard.go b/shard.go index 379505d13e..63bdc3a7ad 100644 --- a/shard.go +++ b/shard.go @@ -185,6 +185,10 @@ func marshalValues(values map[uint8]interface{}) []byte { // unmarshalValues decodes a byte slice into a set of field ids and values. func unmarshalValues(b []byte) map[uint8]interface{} { + if len(b) == 0 { + return nil + } + // Read the field count from the field byte. n := int(b[0])