Merge pull request #1165 from influxdb/write_series
Implement writes series data on databasepull/1177/merge
commit
1e8e1f223c
289
database.go
289
database.go
|
@ -6,9 +6,10 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
"github.com/influxdb/influxdb/influxql"
|
"github.com/influxdb/influxdb/influxql"
|
||||||
// "github.com/influxdb/influxdb/messaging"
|
"github.com/influxdb/influxdb/messaging"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Database represents a collection of retention policies.
|
// Database represents a collection of retention policies.
|
||||||
|
@ -23,7 +24,6 @@ type Database struct {
|
||||||
series map[string]*Series // series by name
|
series map[string]*Series // series by name
|
||||||
|
|
||||||
defaultRetentionPolicy string
|
defaultRetentionPolicy string
|
||||||
maxFieldID uint64 // largest field id in use
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newDatabase returns an instance of Database associated with a server.
|
// newDatabase returns an instance of Database associated with a server.
|
||||||
|
@ -75,11 +75,11 @@ func (db *Database) CreateUser(username, password string, read, write []*Matcher
|
||||||
// TODO: Authorization.
|
// TODO: Authorization.
|
||||||
|
|
||||||
c := &createDBUserCommand{
|
c := &createDBUserCommand{
|
||||||
Database: db.Name(),
|
Database: db.Name(),
|
||||||
Username: username,
|
Username: username,
|
||||||
Password: password,
|
Password: password,
|
||||||
ReadFrom: read,
|
ReadFrom: read,
|
||||||
WriteTo: write,
|
WriteTo: write,
|
||||||
}
|
}
|
||||||
_, err := db.server.broadcast(createDBUserMessageType, c)
|
_, err := db.server.broadcast(createDBUserMessageType, c)
|
||||||
return err
|
return err
|
||||||
|
@ -288,11 +288,34 @@ func (db *Database) RetentionPolicies() []*RetentionPolicy {
|
||||||
return policies
|
return policies
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateShardIfNotExists creates a shard for a retention policy for a given timestamp.
|
// CreateShardsIfNotExist creates all the shards for a retention policy for the interval a timestamp falls into. Note that multiple shards can be created for each bucket of time.
|
||||||
func (db *Database) CreateShardIfNotExists(space string, timestamp time.Time) error {
|
func (db *Database) CreateShardsIfNotExists(policy string, timestamp time.Time) ([]*Shard, error) {
|
||||||
c := &createShardIfNotExistsSpaceCommand{Database: db.name, Space: space, Timestamp: timestamp}
|
db.mu.RLock()
|
||||||
_, err := db.server.broadcast(createShardIfNotExistsMessageType, c)
|
p := db.policies[policy]
|
||||||
return err
|
db.mu.RUnlock()
|
||||||
|
if p == nil {
|
||||||
|
return nil, ErrRetentionPolicyNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
c := &createShardIfNotExistsCommand{Database: db.name, Policy: policy, Timestamp: timestamp}
|
||||||
|
if _, err := db.server.broadcast(createShardIfNotExistsMessageType, c); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.shardsByTimestamp(timestamp), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// createShardIfNotExists returns the shard for a given retention policy, series, and timestamp. If it doesn't exist, it will create all shards for the given timestamp
|
||||||
|
func (db *Database) createShardIfNotExists(policy *RetentionPolicy, id uint32, timestamp time.Time) (*Shard, error) {
|
||||||
|
if s := policy.shardBySeriesTimestamp(id, timestamp); s != nil {
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := db.CreateShardsIfNotExists(policy.Name, timestamp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return policy.shardBySeriesTimestamp(id, timestamp), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Database) applyCreateShardIfNotExists(id uint64, policy string, timestamp time.Time) (error, bool) {
|
func (db *Database) applyCreateShardIfNotExists(id uint64, policy string, timestamp time.Time) (error, bool) {
|
||||||
|
@ -331,134 +354,128 @@ func (db *Database) applyCreateShardIfNotExists(id uint64, policy string, timest
|
||||||
return nil, true
|
return nil, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteSeries writes series data to the database.
|
func (db *Database) applyCreateSeriesIfNotExists(name string, tags map[string]string) error {
|
||||||
func (db *Database) WriteSeries(name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error {
|
|
||||||
panic("not yet implemented: Database.WriteSeries()")
|
|
||||||
|
|
||||||
/* TEMPORARILY REMOVED FOR PROTOBUFS.
|
|
||||||
// Find retention policy matching the series and split points by shard.
|
|
||||||
db.mu.Lock()
|
db.mu.Lock()
|
||||||
name := db.name
|
defer db.mu.Unlock()
|
||||||
space := db.retentionPolicyBySeries(series.GetName())
|
db.server.meta.mustUpdate(func(tx *metatx) error {
|
||||||
db.mu.Unlock()
|
return tx.createSeriesIfNotExists(db.name, name, tags)
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure there is a space available.
|
// WriteSeries writes series data to the database.
|
||||||
if space == nil {
|
func (db *Database) WriteSeries(retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error {
|
||||||
|
// Find retention policy matching the series and split points by shard.
|
||||||
|
db.mu.RLock()
|
||||||
|
rp, ok := db.policies[retentionPolicy]
|
||||||
|
db.mu.RUnlock()
|
||||||
|
|
||||||
|
// Ensure the policy was found
|
||||||
|
if !ok {
|
||||||
return ErrRetentionPolicyNotFound
|
return ErrRetentionPolicyNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group points by shard.
|
// get the id for the series and tagset
|
||||||
pointsByShard, unassigned := space.Split(series.Points)
|
id, err := db.createSeriesIfNotExists(name, tags)
|
||||||
|
if err != nil {
|
||||||
// Request shard creation for timestamps for missing shards.
|
return err
|
||||||
for _, p := range unassigned {
|
|
||||||
timestamp := time.Unix(0, p.GetTimestamp())
|
|
||||||
if err := db.CreateShardIfNotExists(space.Name, timestamp); err != nil {
|
|
||||||
return fmt.Errorf("create shard(%s/%d): %s", space.Name, timestamp.Format(time.RFC3339Nano), err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to split the points again. Fail if it doesn't work this time.
|
// now write it into the shard
|
||||||
pointsByShard, unassigned = space.Split(series.Points)
|
s, err := db.createShardIfNotExists(rp, id, timestamp)
|
||||||
if len(unassigned) > 0 {
|
if err != nil {
|
||||||
return fmt.Errorf("unmatched points in space(%s): %#v", unassigned)
|
return fmt.Errorf("create shard(%s/%d): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish each group of points.
|
data, err := marshalPoint(id, timestamp, values)
|
||||||
for shardID, points := range pointsByShard {
|
if err != nil {
|
||||||
// Marshal series into protobuf format.
|
return err
|
||||||
req := &protocol.WriteSeriesRequest{
|
|
||||||
Database: proto.String(name),
|
|
||||||
Series: &protocol.Series{
|
|
||||||
Name: series.Name,
|
|
||||||
Fields: series.Fields,
|
|
||||||
FieldIds: series.FieldIds,
|
|
||||||
ShardId: proto.Uint64(shardID),
|
|
||||||
Points: points,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
data, err := proto.Marshal(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish "write series" message on shard's topic to broker.
|
|
||||||
m := &messaging.Message{
|
|
||||||
Type: writeSeriesMessageType,
|
|
||||||
TopicID: shardID,
|
|
||||||
Data: data,
|
|
||||||
}
|
|
||||||
index, err := db.server.client.Publish(m)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := db.server.sync(index); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
// Publish "write series" message on shard's topic to broker.
|
||||||
*/
|
m := &messaging.Message{
|
||||||
|
Type: writeSeriesMessageType,
|
||||||
|
TopicID: s.ID,
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = db.server.client.Publish(m)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TEMPORARILY REMOVED FOR PROTOBUFS.
|
func marshalPoint(seriesID uint32, timestamp time.Time, values map[string]interface{}) ([]byte, error) {
|
||||||
func (db *Database) applyWriteSeries(id uint64, t int64, values map[uint8]interface{}) error {
|
b := make([]byte, 12)
|
||||||
db.mu.Lock()
|
*(*uint32)(unsafe.Pointer(&b[0])) = seriesID
|
||||||
defer db.mu.Unlock()
|
*(*int64)(unsafe.Pointer(&b[4])) = timestamp.UnixNano()
|
||||||
shard := db.shard(s.GetShardId())
|
|
||||||
|
d, err := json.Marshal(values)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return append(b, d...), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func unmarshalPoint(data []byte) (uint32, time.Time, map[string]interface{}, error) {
|
||||||
|
id := *(*uint32)(unsafe.Pointer(&data[0]))
|
||||||
|
ts := *(*int64)(unsafe.Pointer(&data[4]))
|
||||||
|
timestamp := time.Unix(0, ts)
|
||||||
|
var v map[string]interface{}
|
||||||
|
|
||||||
|
err := json.Unmarshal(data[12:], &v)
|
||||||
|
return id, timestamp, v, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// seriesID returns the unique id of a series and tagset and a bool indicating if it was found
|
||||||
|
func (db *Database) seriesID(name string, tags map[string]string) (uint32, bool) {
|
||||||
|
var id uint32
|
||||||
|
var err error
|
||||||
|
db.server.meta.view(func(tx *metatx) error {
|
||||||
|
id, err = tx.seriesID(db.name, name, tags)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return uint32(0), false
|
||||||
|
}
|
||||||
|
return id, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) createSeriesIfNotExists(name string, tags map[string]string) (uint32, error) {
|
||||||
|
if id, ok := db.seriesID(name, tags); ok {
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
c := &createSeriesIfNotExistsCommand{
|
||||||
|
Database: db.Name(),
|
||||||
|
Name: name,
|
||||||
|
Tags: tags,
|
||||||
|
}
|
||||||
|
_, err := db.server.broadcast(createSeriesIfNotExistsMessageType, c)
|
||||||
|
if err != nil {
|
||||||
|
return uint32(0), err
|
||||||
|
}
|
||||||
|
id, ok := db.seriesID(name, tags)
|
||||||
|
if !ok {
|
||||||
|
return uint32(0), ErrSeriesNotFound
|
||||||
|
}
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *Database) applyWriteSeries(shardID uint64, overwrite bool, data []byte) error {
|
||||||
|
db.mu.RLock()
|
||||||
|
s := db.shards[shardID]
|
||||||
|
db.mu.RUnlock()
|
||||||
|
|
||||||
// Find shard.
|
// Find shard.
|
||||||
if s == nil {
|
if s == nil {
|
||||||
return ErrShardNotFound
|
return ErrShardNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find or create series.
|
|
||||||
var changed bool
|
|
||||||
var series *Series
|
|
||||||
if series = db.series[s.GetName()]; series == nil {
|
|
||||||
series = &Series{Name: s.GetName()}
|
|
||||||
db.series[s.GetName()] = series
|
|
||||||
changed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assign field ids.
|
|
||||||
s.FieldIds = nil
|
|
||||||
for _, name := range s.GetFields() {
|
|
||||||
// Find field on series.
|
|
||||||
var fieldID uint64
|
|
||||||
for _, f := range series.Fields {
|
|
||||||
if f.Name == name {
|
|
||||||
fieldID = f.ID
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new field, if not exists.
|
|
||||||
if fieldID == 0 {
|
|
||||||
db.maxFieldID++
|
|
||||||
fieldID = db.maxFieldID
|
|
||||||
series.Fields = append(series.Fields, &Field{ID: fieldID, Name: name})
|
|
||||||
changed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Append the field id.
|
|
||||||
s.FieldIds = append(s.FieldIds, fieldID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Perist to metastore if changed.
|
|
||||||
if changed {
|
|
||||||
db.server.meta.mustUpdate(func(tx *metatx) error {
|
|
||||||
return tx.saveDatabase(db)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write to shard.
|
// Write to shard.
|
||||||
return shard.writeSeries(s)
|
return s.writeSeries(overwrite, data)
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
// ExecuteQuery executes a query against a database.
|
// ExecuteQuery executes a query against a database.
|
||||||
func (db *Database) ExecuteQuery(q influxql.Query) error {
|
func (db *Database) ExecuteQuery(q *influxql.Query) error {
|
||||||
panic("not yet implemented: Database.ExecuteQuery()") // TODO
|
panic("not yet implemented: Database.ExecuteQuery()") // TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -473,7 +490,6 @@ func (db *Database) MarshalJSON() ([]byte, error) {
|
||||||
var o databaseJSON
|
var o databaseJSON
|
||||||
o.Name = db.name
|
o.Name = db.name
|
||||||
o.DefaultRetentionPolicy = db.defaultRetentionPolicy
|
o.DefaultRetentionPolicy = db.defaultRetentionPolicy
|
||||||
o.MaxFieldID = db.maxFieldID
|
|
||||||
for _, u := range db.users {
|
for _, u := range db.users {
|
||||||
o.Users = append(o.Users, u)
|
o.Users = append(o.Users, u)
|
||||||
}
|
}
|
||||||
|
@ -500,7 +516,6 @@ func (db *Database) UnmarshalJSON(data []byte) error {
|
||||||
// Copy over properties from intermediate type.
|
// Copy over properties from intermediate type.
|
||||||
db.name = o.Name
|
db.name = o.Name
|
||||||
db.defaultRetentionPolicy = o.DefaultRetentionPolicy
|
db.defaultRetentionPolicy = o.DefaultRetentionPolicy
|
||||||
db.maxFieldID = o.MaxFieldID
|
|
||||||
|
|
||||||
// Copy users.
|
// Copy users.
|
||||||
db.users = make(map[string]*DBUser)
|
db.users = make(map[string]*DBUser)
|
||||||
|
@ -533,7 +548,6 @@ func (db *Database) UnmarshalJSON(data []byte) error {
|
||||||
type databaseJSON struct {
|
type databaseJSON struct {
|
||||||
Name string `json:"name,omitempty"`
|
Name string `json:"name,omitempty"`
|
||||||
DefaultRetentionPolicy string `json:"defaultRetentionPolicy,omitempty"`
|
DefaultRetentionPolicy string `json:"defaultRetentionPolicy,omitempty"`
|
||||||
MaxFieldID uint64 `json:"maxFieldID,omitempty"`
|
|
||||||
Users []*DBUser `json:"users,omitempty"`
|
Users []*DBUser `json:"users,omitempty"`
|
||||||
Policies []*RetentionPolicy `json:"policies,omitempty"`
|
Policies []*RetentionPolicy `json:"policies,omitempty"`
|
||||||
Shards []*Shard `json:"shards,omitempty"`
|
Shards []*Shard `json:"shards,omitempty"`
|
||||||
|
@ -571,33 +585,26 @@ func NewRetentionPolicy(name string) *RetentionPolicy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
// shardBySeriesTimestamp returns the shard in the space that owns a given timestamp for a given series id.
|
||||||
// SplitPoints groups a set of points by shard id.
|
|
||||||
// Also returns a list of timestamps that did not match an existing shard.
|
|
||||||
func (rp *RetentionPolicy) Split(a []*protocol.Point) (points map[uint64][]*protocol.Point, unassigned []*protocol.Point) {
|
|
||||||
points = make(map[uint64][]*protocol.Point)
|
|
||||||
for _, p := range a {
|
|
||||||
if s := rp.ShardByTimestamp(time.Unix(0, p.GetTimestamp())); s != nil {
|
|
||||||
points[s.ID] = append(points[s.ID], p)
|
|
||||||
} else {
|
|
||||||
unassigned = append(unassigned, p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
// ShardByTimestamp returns the shard in the space that owns a given timestamp.
|
|
||||||
// Returns nil if the shard does not exist.
|
// Returns nil if the shard does not exist.
|
||||||
func (rp *RetentionPolicy) ShardByTimestamp(timestamp time.Time) *Shard {
|
func (rp *RetentionPolicy) shardBySeriesTimestamp(id uint32, timestamp time.Time) *Shard {
|
||||||
for _, s := range rp.Shards {
|
shards := rp.shardsByTimestamp(timestamp)
|
||||||
if timeBetween(timestamp, s.StartTime, s.EndTime) {
|
if len(shards) > 0 {
|
||||||
return s
|
return shards[int(id)%len(shards)]
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rp *RetentionPolicy) shardsByTimestamp(timestamp time.Time) []*Shard {
|
||||||
|
shards := make([]*Shard, 0, rp.SplitN)
|
||||||
|
for _, s := range rp.Shards {
|
||||||
|
if timeBetween(timestamp, s.StartTime, s.EndTime) {
|
||||||
|
shards = append(shards, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return shards
|
||||||
|
}
|
||||||
|
|
||||||
// MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
|
// MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
|
||||||
func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
|
func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
|
||||||
return json.Marshal(&retentionPolicyJSON{
|
return json.Marshal(&retentionPolicyJSON{
|
||||||
|
|
|
@ -394,38 +394,30 @@ func TestDatabase_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testin
|
||||||
|
|
||||||
// Ensure the database can write data to the database.
|
// Ensure the database can write data to the database.
|
||||||
func TestDatabase_WriteSeries(t *testing.T) {
|
func TestDatabase_WriteSeries(t *testing.T) {
|
||||||
t.Skip("pending")
|
|
||||||
|
|
||||||
/* TEMPORARILY REMOVED FOR PROTOBUFS.
|
|
||||||
s := OpenServer(NewMessagingClient())
|
s := OpenServer(NewMessagingClient())
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
s.CreateDatabase("foo")
|
s.CreateDatabase("foo")
|
||||||
db := s.Database("foo")
|
db := s.Database("foo")
|
||||||
db.CreateRetentionPolicies(&influxdb.RetentionPolicy{Name: "myspace", Duration: 1 * time.Hour})
|
db.CreateRetentionPolicy(&influxdb.RetentionPolicy{Name: "myspace", Duration: 1 * time.Hour})
|
||||||
db.CreateUser("susy", "pass", nil)
|
db.CreateUser("susy", "pass", nil, nil)
|
||||||
|
|
||||||
// Write series with one point to the database.
|
// Write series with one point to the database.
|
||||||
timestamp := mustParseMicroTime("2000-01-01T00:00:00Z")
|
timestamp := mustParseTime("2000-01-01T00:00:00Z")
|
||||||
series := &protocol.Series{
|
|
||||||
Name: proto.String("cpu_load"),
|
name := "cpu_load"
|
||||||
Fields: []string{"myval"},
|
tags := map[string]string{"host": "servera.influx.com", "region": "uswest"}
|
||||||
Points: []*protocol.Point{
|
values := map[string]interface{}{"value": 23.2}
|
||||||
{
|
|
||||||
Values: []*protocol.FieldValue{{Int64Value: proto.Int64(100)}},
|
if err := db.WriteSeries("myspace", name, tags, timestamp, values); err != nil {
|
||||||
Timestamp: proto.Int64(timestamp),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if err := db.WriteSeries(series); err != nil {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute a query and record all series found.
|
t.Skip("pending")
|
||||||
q := mustParseQuery(`select myval from cpu_load`)
|
// TODO: Execute a query and record all series found.
|
||||||
if err := db.ExecuteQuery(q); err != nil {
|
// q := mustParseQuery(`select myval from cpu_load`)
|
||||||
t.Fatal(err)
|
// if err := db.ExecuteQuery(q); err != nil {
|
||||||
}
|
// t.Fatal(err)
|
||||||
*/
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
// mustParseQuery parses a query string into a query object. Panic on error.
|
// mustParseQuery parses a query string into a query object. Panic on error.
|
||||||
|
@ -443,12 +435,11 @@ func TestDatabase_CreateShardIfNotExist(t *testing.T) {
|
||||||
s.CreateDatabase("foo")
|
s.CreateDatabase("foo")
|
||||||
db := s.Database("foo")
|
db := s.Database("foo")
|
||||||
|
|
||||||
rp := &influxdb.RetentionPolicy{Name: "bar"}
|
if err := db.CreateRetentionPolicy(&influxdb.RetentionPolicy{Name: "bar"}); err != nil {
|
||||||
if err := db.CreateRetentionPolicy(rp); err != nil {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.CreateShardIfNotExists("bar", time.Time{}); err != nil {
|
if _, err := db.CreateShardsIfNotExists("bar", time.Time{}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -263,9 +263,9 @@ func (h *Handler) serveAuthenticateDBUser(w http.ResponseWriter, r *http.Request
|
||||||
func (h *Handler) serveDBUsers(w http.ResponseWriter, r *http.Request) {}
|
func (h *Handler) serveDBUsers(w http.ResponseWriter, r *http.Request) {}
|
||||||
|
|
||||||
type userJSON struct {
|
type userJSON struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Password string `json:"password"`
|
Password string `json:"password"`
|
||||||
IsAdmin bool `json:"isAdmin"`
|
IsAdmin bool `json:"isAdmin"`
|
||||||
ReadFrom []*Matcher `json:"readFrom"`
|
ReadFrom []*Matcher `json:"readFrom"`
|
||||||
WriteTo []*Matcher `json:"writeTo"`
|
WriteTo []*Matcher `json:"writeTo"`
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,7 +112,7 @@ func TestHandler_Shards(t *testing.T) {
|
||||||
srvr.CreateDatabase("foo")
|
srvr.CreateDatabase("foo")
|
||||||
db := srvr.Database("foo")
|
db := srvr.Database("foo")
|
||||||
db.CreateRetentionPolicy(influxdb.NewRetentionPolicy("bar"))
|
db.CreateRetentionPolicy(influxdb.NewRetentionPolicy("bar"))
|
||||||
db.CreateShardIfNotExists("bar", time.Time{})
|
db.CreateShardsIfNotExists("bar", time.Time{})
|
||||||
s := NewHTTPServer(srvr)
|
s := NewHTTPServer(srvr)
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
status, body := MustHTTP("GET", s.URL+`/db/foo/shards`, "")
|
status, body := MustHTTP("GET", s.URL+`/db/foo/shards`, "")
|
||||||
|
|
|
@ -65,4 +65,10 @@ var (
|
||||||
|
|
||||||
// ErrInvalidQuery is returned when executing an unknown query type.
|
// ErrInvalidQuery is returned when executing an unknown query type.
|
||||||
ErrInvalidQuery = errors.New("invalid query")
|
ErrInvalidQuery = errors.New("invalid query")
|
||||||
|
|
||||||
|
// ErrSeriesNotFound is returned when looking up a non-existent series by database, name and tags
|
||||||
|
ErrSeriesNotFound = errors.New("series not found")
|
||||||
|
|
||||||
|
// ErrSeriesExists is returned when attempting to set the id of a series by database, name and tags that already exists
|
||||||
|
ErrSeriesExists = errors.New("series already exists")
|
||||||
)
|
)
|
||||||
|
|
144
server.go
144
server.go
|
@ -7,8 +7,10 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
"github.com/boltdb/bolt"
|
||||||
"github.com/influxdb/influxdb/messaging"
|
"github.com/influxdb/influxdb/messaging"
|
||||||
|
@ -49,6 +51,7 @@ const (
|
||||||
dbUserSetPasswordMessageType = messaging.MessageType(0x09)
|
dbUserSetPasswordMessageType = messaging.MessageType(0x09)
|
||||||
createShardIfNotExistsMessageType = messaging.MessageType(0x0a)
|
createShardIfNotExistsMessageType = messaging.MessageType(0x0a)
|
||||||
setDefaultRetentionPolicyMessageType = messaging.MessageType(0x0b)
|
setDefaultRetentionPolicyMessageType = messaging.MessageType(0x0b)
|
||||||
|
createSeriesIfNotExistsMessageType = messaging.MessageType(0x0c)
|
||||||
|
|
||||||
// per-topic messages
|
// per-topic messages
|
||||||
writeSeriesMessageType = messaging.MessageType(0x80)
|
writeSeriesMessageType = messaging.MessageType(0x80)
|
||||||
|
@ -66,8 +69,9 @@ type Server struct {
|
||||||
|
|
||||||
meta *metastore // metadata store
|
meta *metastore // metadata store
|
||||||
|
|
||||||
databases map[string]*Database // databases by name
|
databases map[string]*Database // databases by name
|
||||||
admins map[string]*ClusterAdmin // admins by name
|
databasesByShard map[uint64]*Database // databases by shard id
|
||||||
|
admins map[string]*ClusterAdmin // admins by name
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer returns a new instance of Server.
|
// NewServer returns a new instance of Server.
|
||||||
|
@ -75,11 +79,12 @@ type Server struct {
|
||||||
func NewServer(client MessagingClient) *Server {
|
func NewServer(client MessagingClient) *Server {
|
||||||
assert(client != nil, "messaging client required")
|
assert(client != nil, "messaging client required")
|
||||||
return &Server{
|
return &Server{
|
||||||
client: client,
|
client: client,
|
||||||
meta: &metastore{},
|
meta: &metastore{},
|
||||||
databases: make(map[string]*Database),
|
databases: make(map[string]*Database),
|
||||||
admins: make(map[string]*ClusterAdmin),
|
databasesByShard: make(map[uint64]*Database),
|
||||||
errors: make(map[uint64]error),
|
admins: make(map[string]*ClusterAdmin),
|
||||||
|
errors: make(map[uint64]error),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,6 +170,10 @@ func (s *Server) load() error {
|
||||||
for _, db := range tx.databases() {
|
for _, db := range tx.databases() {
|
||||||
db.server = s
|
db.server = s
|
||||||
s.databases[db.name] = db
|
s.databases[db.name] = db
|
||||||
|
|
||||||
|
for sh := range db.shards {
|
||||||
|
s.databasesByShard[sh] = db
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load cluster admins.
|
// Load cluster admins.
|
||||||
|
@ -312,7 +321,7 @@ type deleteDatabaseCommand struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) applyCreateShardIfNotExists(m *messaging.Message) error {
|
func (s *Server) applyCreateShardIfNotExists(m *messaging.Message) error {
|
||||||
var c createShardIfNotExistsSpaceCommand
|
var c createShardIfNotExistsCommand
|
||||||
mustUnmarshalJSON(m.Data, &c)
|
mustUnmarshalJSON(m.Data, &c)
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
@ -322,8 +331,10 @@ func (s *Server) applyCreateShardIfNotExists(m *messaging.Message) error {
|
||||||
return ErrDatabaseNotFound
|
return ErrDatabaseNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
shardID := m.Index
|
||||||
|
|
||||||
// Check if a matching shard already exists.
|
// Check if a matching shard already exists.
|
||||||
if err, ok := db.applyCreateShardIfNotExists(m.Index, c.Space, c.Timestamp); err != nil {
|
if err, ok := db.applyCreateShardIfNotExists(shardID, c.Policy, c.Timestamp); err != nil {
|
||||||
return err
|
return err
|
||||||
} else if !ok {
|
} else if !ok {
|
||||||
return nil
|
return nil
|
||||||
|
@ -334,12 +345,14 @@ func (s *Server) applyCreateShardIfNotExists(m *messaging.Message) error {
|
||||||
return tx.saveDatabase(db)
|
return tx.saveDatabase(db)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
s.databasesByShard[shardID] = db
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type createShardIfNotExistsSpaceCommand struct {
|
type createShardIfNotExistsCommand struct {
|
||||||
Database string `json:"name"`
|
Database string `json:"name"`
|
||||||
Space string `json:"space"`
|
Policy string `json:"policy"`
|
||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -637,29 +650,41 @@ type setDefaultRetentionPolicyCommand struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TEMPORARILY REMOVED FOR PROTOBUFS.
|
func (s *Server) applyCreateSeriesIfNotExists(m *messaging.Message) error {
|
||||||
func (s *Server) applyWriteSeries(m *messaging.Message) error {
|
var c createSeriesIfNotExistsCommand
|
||||||
req := &protocol.WriteSeriesRequest{}
|
mustUnmarshalJSON(m.Data, &c)
|
||||||
if err := proto.Unmarshal(m.Data, req); err != nil {
|
|
||||||
panic("unmarshal request: " + err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
db := s.databases[c.Database]
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
// Retrieve the database.
|
|
||||||
db := s.databases[req.GetDatabase()]
|
|
||||||
if db == nil {
|
if db == nil {
|
||||||
return ErrDatabaseNotFound
|
return ErrDatabaseNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.applyWriteSeries(id, t, values); err != nil {
|
return db.applyCreateSeriesIfNotExists(c.Name, c.Tags)
|
||||||
return err
|
}
|
||||||
|
|
||||||
|
type createSeriesIfNotExistsCommand struct {
|
||||||
|
Database string `json:"database"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) applyWriteSeries(m *messaging.Message) error {
|
||||||
|
// Retrieve the database.
|
||||||
|
s.mu.RLock()
|
||||||
|
db := s.databasesByShard[m.TopicID]
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
if db == nil {
|
||||||
|
return ErrDatabaseNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
// TODO: enable some way to specify if the data should be overwritten
|
||||||
|
overwrite := true
|
||||||
|
return db.applyWriteSeries(m.TopicID, overwrite, m.Data)
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
// processor runs in a separate goroutine and processes all incoming broker messages.
|
// processor runs in a separate goroutine and processes all incoming broker messages.
|
||||||
func (s *Server) processor(done chan struct{}) {
|
func (s *Server) processor(done chan struct{}) {
|
||||||
|
@ -676,6 +701,8 @@ func (s *Server) processor(done chan struct{}) {
|
||||||
// Process message.
|
// Process message.
|
||||||
var err error
|
var err error
|
||||||
switch m.Type {
|
switch m.Type {
|
||||||
|
case writeSeriesMessageType:
|
||||||
|
err = s.applyWriteSeries(m)
|
||||||
case createDatabaseMessageType:
|
case createDatabaseMessageType:
|
||||||
err = s.applyCreateDatabase(m)
|
err = s.applyCreateDatabase(m)
|
||||||
case deleteDatabaseMessageType:
|
case deleteDatabaseMessageType:
|
||||||
|
@ -698,10 +725,8 @@ func (s *Server) processor(done chan struct{}) {
|
||||||
err = s.applyCreateShardIfNotExists(m)
|
err = s.applyCreateShardIfNotExists(m)
|
||||||
case setDefaultRetentionPolicyMessageType:
|
case setDefaultRetentionPolicyMessageType:
|
||||||
err = s.applySetDefaultRetentionPolicy(m)
|
err = s.applySetDefaultRetentionPolicy(m)
|
||||||
case writeSeriesMessageType:
|
case createSeriesIfNotExistsMessageType:
|
||||||
/* TEMPORARILY REMOVED FOR PROTOBUFS.
|
err = s.applyCreateSeriesIfNotExists(m)
|
||||||
err = s.applyWriteSeries(m)
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync high water mark and errors.
|
// Sync high water mark and errors.
|
||||||
|
@ -807,6 +832,10 @@ func (tx *metatx) databases() (a []*Database) {
|
||||||
|
|
||||||
// saveDatabase persists a database to the metastore.
|
// saveDatabase persists a database to the metastore.
|
||||||
func (tx *metatx) saveDatabase(db *Database) error {
|
func (tx *metatx) saveDatabase(db *Database) error {
|
||||||
|
_, err := tx.Bucket([]byte("Series")).CreateBucketIfNotExists([]byte(db.name))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return tx.Bucket([]byte("Databases")).Put([]byte(db.name), mustMarshalJSON(db))
|
return tx.Bucket([]byte("Databases")).Put([]byte(db.name), mustMarshalJSON(db))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -815,6 +844,65 @@ func (tx *metatx) deleteDatabase(name string) error {
|
||||||
return tx.Bucket([]byte("Databases")).Delete([]byte(name))
|
return tx.Bucket([]byte("Databases")).Delete([]byte(name))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// returns a unique series id by database, name and tags. Returns ErrSeriesNotFound
|
||||||
|
func (tx *metatx) seriesID(database, name string, tags map[string]string) (uint32, error) {
|
||||||
|
// get the bucket that holds series data for the database
|
||||||
|
b := tx.Bucket([]byte("Series")).Bucket([]byte(database))
|
||||||
|
if b == nil {
|
||||||
|
return uint32(0), ErrSeriesNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the bucket that holds tag data for the series name
|
||||||
|
b = b.Bucket([]byte(name))
|
||||||
|
if b == nil {
|
||||||
|
return uint32(0), ErrSeriesNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// look up the id of the tagset
|
||||||
|
tagBytes := tagsToBytes(tags)
|
||||||
|
v := b.Get(tagBytes)
|
||||||
|
if v == nil {
|
||||||
|
return uint32(0), ErrSeriesNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// the value is the bytes for a uint32, return it
|
||||||
|
return *(*uint32)(unsafe.Pointer(&v[0])), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// sets the series id for the database, name, and tags.
|
||||||
|
func (tx *metatx) createSeriesIfNotExists(database, name string, tags map[string]string) error {
|
||||||
|
db := tx.Bucket([]byte("Series")).Bucket([]byte(database))
|
||||||
|
b, err := db.CreateBucketIfNotExists([]byte(name))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
id, _ := db.NextSequence()
|
||||||
|
|
||||||
|
tagBytes := tagsToBytes(tags)
|
||||||
|
|
||||||
|
idBytes := make([]byte, 4)
|
||||||
|
*(*uint32)(unsafe.Pointer(&idBytes[0])) = uint32(id)
|
||||||
|
|
||||||
|
return b.Put(tagBytes, idBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// used to convert the tag set to bytes for use as a key in bolt
|
||||||
|
func tagsToBytes(tags map[string]string) []byte {
|
||||||
|
s := make([]string, 0, len(tags))
|
||||||
|
// pull out keys to sort
|
||||||
|
for k := range tags {
|
||||||
|
s = append(s, k)
|
||||||
|
}
|
||||||
|
sort.Strings(s)
|
||||||
|
|
||||||
|
// now append on the key values in key sorted order
|
||||||
|
for _, k := range s {
|
||||||
|
s = append(s, tags[k])
|
||||||
|
}
|
||||||
|
return []byte(strings.Join(s, "|"))
|
||||||
|
}
|
||||||
|
|
||||||
// series returns a series by database and name.
|
// series returns a series by database and name.
|
||||||
func (tx *metatx) series(database, name string) (s *Series) {
|
func (tx *metatx) series(database, name string) (s *Series) {
|
||||||
b := tx.Bucket([]byte("Series")).Bucket([]byte(database))
|
b := tx.Bucket([]byte("Series")).Bucket([]byte(database))
|
||||||
|
|
|
@ -290,12 +290,6 @@ func mustParseTime(s string) time.Time {
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
// mustParseMicroTime parses an IS0-8601 string into microseconds since epoch.
|
|
||||||
// Panic on error.
|
|
||||||
func mustParseMicroTime(s string) int64 {
|
|
||||||
return mustParseTime(s).UnixNano() / int64(time.Microsecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
// errstr is an ease-of-use function to convert an error to a string.
|
// errstr is an ease-of-use function to convert an error to a string.
|
||||||
func errstr(err error) string {
|
func errstr(err error) string {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
9
shard.go
9
shard.go
|
@ -60,7 +60,14 @@ func (s *Shard) close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// write writes series data to a shard.
|
// write writes series data to a shard.
|
||||||
func (s *Shard) writeSeries(name string, tags map[string]string, value interface{}) error {
|
func (s *Shard) writeSeries(overwrite bool, data []byte) error {
|
||||||
|
id, timestamp, values, err := unmarshalPoint(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: make this work
|
||||||
|
fmt.Println("writeSeries: ", id, timestamp, values)
|
||||||
return s.store.Update(func(tx *bolt.Tx) error {
|
return s.store.Update(func(tx *bolt.Tx) error {
|
||||||
// TODO
|
// TODO
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue