From 8d9a063cc267ba6f3b885fcf433efce5da1fe30a Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 28 Nov 2014 14:14:04 -0500 Subject: [PATCH] Implement setting of new series id --- database.go | 20 +++++++++++++++++--- server.go | 29 ++++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/database.go b/database.go index c2049cf63b..951a062b76 100644 --- a/database.go +++ b/database.go @@ -23,7 +23,7 @@ type Database struct { series map[string]*Series // series by name defaultRetentionPolicy string - maxFieldID uint64 // largest field id in use + maxFieldID uint32 // largest field id in use } // newDatabase returns an instance of Database associated with a server. @@ -325,6 +325,20 @@ func (db *Database) applyCreateShardIfNotExists(id uint64, policy string, timest return nil, true } +func (db *Database) applySetSeriesId(name string, tags map[string]string) error { + db.mu.Lock() + defer db.mu.Unlock() + id := db.maxFieldID + // TODO: we need to save the db maxFieldId after this + db.maxFieldID += uint32(1) + var err error + db.server.meta.mustUpdate(func(tx *metatx) error { + err = tx.setSeriesId(id, db.name, name, tags) + return err + }) + return err +} + // WriteSeries writes series data to the database. func (db *Database) WriteSeries(retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error { // Find retention policy matching the series and split points by shard. @@ -491,7 +505,7 @@ func (db *Database) applyWriteSeries(id uint64, t int64, values map[uint8]interf */ // ExecuteQuery executes a query against a database. -func (db *Database) ExecuteQuery(q influxql.Query) error { +func (db *Database) ExecuteQuery(q *influxql.Query) error { panic("not yet implemented: Database.ExecuteQuery()") // TODO } @@ -566,7 +580,7 @@ func (db *Database) UnmarshalJSON(data []byte) error { type databaseJSON struct { Name string `json:"name,omitempty"` DefaultRetentionPolicy string `json:"defaultRetentionPolicy,omitempty"` - MaxFieldID uint64 `json:"maxFieldID,omitempty"` + MaxFieldID uint32 `json:"maxFieldID,omitempty"` Users []*DBUser `json:"users,omitempty"` Policies []*RetentionPolicy `json:"policies,omitempty"` Shards []*Shard `json:"shards,omitempty"` diff --git a/server.go b/server.go index db6499e8a0..e1f8a29bc0 100644 --- a/server.go +++ b/server.go @@ -651,9 +651,7 @@ func (s *Server) applySetSeriesId(m *messaging.Message) error { return ErrDatabaseNotFound } - // TODO: finish this up - - return nil + return db.applySetSeriesId(c.Name, c.Tags) } type setSeriesIdCommand struct { @@ -834,6 +832,11 @@ func (tx *metatx) databases() (a []*Database) { // saveDatabase persists a database to the metastore. func (tx *metatx) saveDatabase(db *Database) error { + // TODO: ask ben how to make these two operations occur in a single atomic transaction + _, err := tx.Bucket([]byte("Series")).CreateBucketIfNotExists([]byte(db.name)) + if err != nil { + return err + } return tx.Bucket([]byte("Databases")).Put([]byte(db.name), mustMarshalJSON(db)) } @@ -867,8 +870,24 @@ func (tx *metatx) getSeriesId(database, name string, tags map[string]string) (ui } // the value is the bytes for a uint32, return it - x := (*uint32)(unsafe.Pointer(&v[0])) - return *x, nil + return *(*uint32)(unsafe.Pointer(&v[0])), nil +} + +// sets the series id for the database, name, and tags. +func (tx *metatx) setSeriesId(id uint32, database, name string, tags map[string]string) error { + b, err := tx.Bucket([]byte("Series")).Bucket([]byte(database)).CreateBucketIfNotExists([]byte(name)) + if err != nil { + return err + } + + tagBytes, err := tagsToBytes(tags) + if err != nil { + return err + } + + idBytes := make([]byte, 4) + *(*uint32)(unsafe.Pointer(&idBytes[0])) = id + return b.Put(tagBytes, idBytes) } // used to convert the tag set to bytes for use as a key in bolt