Implement setting of new series id
parent
d08bc0297d
commit
8d9a063cc2
20
database.go
20
database.go
|
@ -23,7 +23,7 @@ type Database struct {
|
||||||
series map[string]*Series // series by name
|
series map[string]*Series // series by name
|
||||||
|
|
||||||
defaultRetentionPolicy string
|
defaultRetentionPolicy string
|
||||||
maxFieldID uint64 // largest field id in use
|
maxFieldID uint32 // largest field id in use
|
||||||
}
|
}
|
||||||
|
|
||||||
// newDatabase returns an instance of Database associated with a server.
|
// newDatabase returns an instance of Database associated with a server.
|
||||||
|
@ -325,6 +325,20 @@ func (db *Database) applyCreateShardIfNotExists(id uint64, policy string, timest
|
||||||
return nil, true
|
return nil, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *Database) applySetSeriesId(name string, tags map[string]string) error {
|
||||||
|
db.mu.Lock()
|
||||||
|
defer db.mu.Unlock()
|
||||||
|
id := db.maxFieldID
|
||||||
|
// TODO: we need to save the db maxFieldId after this
|
||||||
|
db.maxFieldID += uint32(1)
|
||||||
|
var err error
|
||||||
|
db.server.meta.mustUpdate(func(tx *metatx) error {
|
||||||
|
err = tx.setSeriesId(id, db.name, name, tags)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// WriteSeries writes series data to the database.
|
// WriteSeries writes series data to the database.
|
||||||
func (db *Database) WriteSeries(retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error {
|
func (db *Database) WriteSeries(retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error {
|
||||||
// Find retention policy matching the series and split points by shard.
|
// Find retention policy matching the series and split points by shard.
|
||||||
|
@ -491,7 +505,7 @@ func (db *Database) applyWriteSeries(id uint64, t int64, values map[uint8]interf
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// ExecuteQuery executes a query against a database.
|
// ExecuteQuery executes a query against a database.
|
||||||
func (db *Database) ExecuteQuery(q influxql.Query) error {
|
func (db *Database) ExecuteQuery(q *influxql.Query) error {
|
||||||
panic("not yet implemented: Database.ExecuteQuery()") // TODO
|
panic("not yet implemented: Database.ExecuteQuery()") // TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -566,7 +580,7 @@ func (db *Database) UnmarshalJSON(data []byte) error {
|
||||||
type databaseJSON struct {
|
type databaseJSON struct {
|
||||||
Name string `json:"name,omitempty"`
|
Name string `json:"name,omitempty"`
|
||||||
DefaultRetentionPolicy string `json:"defaultRetentionPolicy,omitempty"`
|
DefaultRetentionPolicy string `json:"defaultRetentionPolicy,omitempty"`
|
||||||
MaxFieldID uint64 `json:"maxFieldID,omitempty"`
|
MaxFieldID uint32 `json:"maxFieldID,omitempty"`
|
||||||
Users []*DBUser `json:"users,omitempty"`
|
Users []*DBUser `json:"users,omitempty"`
|
||||||
Policies []*RetentionPolicy `json:"policies,omitempty"`
|
Policies []*RetentionPolicy `json:"policies,omitempty"`
|
||||||
Shards []*Shard `json:"shards,omitempty"`
|
Shards []*Shard `json:"shards,omitempty"`
|
||||||
|
|
29
server.go
29
server.go
|
@ -651,9 +651,7 @@ func (s *Server) applySetSeriesId(m *messaging.Message) error {
|
||||||
return ErrDatabaseNotFound
|
return ErrDatabaseNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: finish this up
|
return db.applySetSeriesId(c.Name, c.Tags)
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type setSeriesIdCommand struct {
|
type setSeriesIdCommand struct {
|
||||||
|
@ -834,6 +832,11 @@ func (tx *metatx) databases() (a []*Database) {
|
||||||
|
|
||||||
// saveDatabase persists a database to the metastore.
|
// saveDatabase persists a database to the metastore.
|
||||||
func (tx *metatx) saveDatabase(db *Database) error {
|
func (tx *metatx) saveDatabase(db *Database) error {
|
||||||
|
// TODO: ask ben how to make these two operations occur in a single atomic transaction
|
||||||
|
_, err := tx.Bucket([]byte("Series")).CreateBucketIfNotExists([]byte(db.name))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return tx.Bucket([]byte("Databases")).Put([]byte(db.name), mustMarshalJSON(db))
|
return tx.Bucket([]byte("Databases")).Put([]byte(db.name), mustMarshalJSON(db))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -867,8 +870,24 @@ func (tx *metatx) getSeriesId(database, name string, tags map[string]string) (ui
|
||||||
}
|
}
|
||||||
|
|
||||||
// the value is the bytes for a uint32, return it
|
// the value is the bytes for a uint32, return it
|
||||||
x := (*uint32)(unsafe.Pointer(&v[0]))
|
return *(*uint32)(unsafe.Pointer(&v[0])), nil
|
||||||
return *x, nil
|
}
|
||||||
|
|
||||||
|
// sets the series id for the database, name, and tags.
|
||||||
|
func (tx *metatx) setSeriesId(id uint32, database, name string, tags map[string]string) error {
|
||||||
|
b, err := tx.Bucket([]byte("Series")).Bucket([]byte(database)).CreateBucketIfNotExists([]byte(name))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tagBytes, err := tagsToBytes(tags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
idBytes := make([]byte, 4)
|
||||||
|
*(*uint32)(unsafe.Pointer(&idBytes[0])) = id
|
||||||
|
return b.Put(tagBytes, idBytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
// used to convert the tag set to bytes for use as a key in bolt
|
// used to convert the tag set to bytes for use as a key in bolt
|
||||||
|
|
Loading…
Reference in New Issue