diff --git a/coordinator.go b/coordinator.go
new file mode 100644
index 0000000000..6a091a4c61
--- /dev/null
+++ b/coordinator.go
@@ -0,0 +1,122 @@
+package influxdb
+
+import (
+	"errors"
+	"time"
+
+	"github.com/influxdb/influxdb/data"
+	"github.com/influxdb/influxdb/meta"
+)
+
+const defaultReadTimeout = 5 * time.Second
+
+var ErrTimeout = errors.New("timeout")
+
+// Coordinator handles queries and writes across multiple local and remote
+// data nodes.
+type Coordinator struct {
+	MetaStore meta.Store
+	DataNode  data.Node
+}
+
+// ShardMapping contains a mapping of shard IDs to points
+type ShardMapping map[uint64][]Point
+
+func (c *Coordinator) MapShards(wp *WritePointsRequest) (ShardMapping, error) {
+
+	// holds the start time ranges for required shard groups
+	timeRanges := map[time.Time]*meta.ShardGroupInfo{}
+
+	rp, err := c.MetaStore.RetentionPolicy(wp.Database, wp.RetentionPolicy)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, p := range wp.Points {
+		timeRanges[p.Time.Truncate(rp.ShardGroupDuration)] = nil
+	}
+
+	// holds all the shard groups and shards that are required for writes
+	for t := range timeRanges {
+		g, err := c.MetaStore.CreateShardGroupIfNotExists(wp.Database, wp.RetentionPolicy, t)
+		if err != nil {
+			return nil, err
+		}
+		timeRanges[t] = g
+	}
+
+	shardMapping := make(ShardMapping)
+	for _, p := range wp.Points {
+		g := timeRanges[p.Time.Truncate(rp.ShardGroupDuration)]
+
+		sid := p.SeriesID()
+		shardInfo := g.Shards[sid%uint64(len(g.Shards))]
+		points, ok := shardMapping[shardInfo.ID]
+		if !ok {
+			shardMapping[shardInfo.ID] = []Point{p}
+		} else {
+			shardMapping[shardInfo.ID] = append(points, p)
+		}
+	}
+	return shardMapping, nil
+}
+
+// Write coordinates multiple writes across local and remote data nodes
+// according to the request's consistency level
+func (c *Coordinator) Write(p *WritePointsRequest) error {
+
+	// FIXME: use the consistency level specified by the WritePointsRequest
+	pol := newConsistencyPolicyN(1)
+
+	_, err := c.MapShards(p)
+	if err != nil {
+		return err
+	}
+
+	// FIXME: build set of local and remote point writers
+	ws := []PointsWriter{}
+
+	type result struct {
+		writerID int
+		err      error
+	}
+	ch := make(chan result, len(ws))
+	for i, w := range ws {
+		go func(id int, w PointsWriter) {
+			err := w.Write(p)
+			ch <- result{id, err}
+		}(i, w)
+	}
+	timeout := time.After(defaultReadTimeout)
+	for range ws {
+		select {
+		case <-timeout:
+			// return timeout error to caller
+			return ErrTimeout
+		case res := <-ch:
+			if !pol.IsDone(res.writerID, res.err) {
+				continue
+			}
+			if res.err != nil {
+				return res.err
+			}
+			return nil
+		}
+
+	}
+	panic("unreachable or bad policy impl")
+}
+
+func (c *Coordinator) Execute(q *QueryRequest) (chan *Result, error) {
+	return nil, nil
+}
+
+// remoteWriter is a PointsWriter for a remote data node
+type remoteWriter struct {
+	//ShardInfo []ShardInfo
+	//DataNodes DataNodes
+}
+
+func (w *remoteWriter) Write(p *WritePointsRequest) error {
+	return nil
+}
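Reviewer note: the selection logic in MapShards boils down to two steps: truncate each point's timestamp to the retention policy's ShardGroupDuration to find (or create) its shard group, then take the point's series ID modulo the number of shards in that group to pick the target shard. Below is a minimal standalone sketch of that arithmetic; the shard/shardGroup types and the "cpu|host|servera" key are simplified stand-ins for illustration, not the real influxdb/meta types.

package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

// shard and shardGroup are simplified stand-ins for the meta package types.
type shard struct{ ID uint64 }
type shardGroup struct{ Shards []shard }

// seriesID mimics Point.SeriesID: FNV-64a over the "name|tagKeys|tagValues" key.
func seriesID(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64()
}

func main() {
	shardGroupDuration := time.Hour
	group := shardGroup{Shards: []shard{{ID: 1}, {ID: 2}, {ID: 3}}}

	// Step 1: truncating the timestamp picks the shard group's time window.
	ts := time.Unix(0, 0).Add(90 * time.Minute)
	window := ts.Truncate(shardGroupDuration) // 01:00 UTC, the second one-hour group

	// Step 2: the series key deterministically selects one shard in the group.
	sid := seriesID("cpu|host|servera")
	target := group.Shards[sid%uint64(len(group.Shards))]

	fmt.Println(window.UTC(), "-> shard", target.ID)
}

Because the window and the series hash are both deterministic, all points for the same series in the same time window map to the same shard, which is what the tests below rely on.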
diff --git a/coordinator_test.go b/coordinator_test.go
new file mode 100644
index 0000000000..6966ba1f1b
--- /dev/null
+++ b/coordinator_test.go
@@ -0,0 +1,130 @@
+package influxdb_test
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/influxdb/influxdb"
+	"github.com/influxdb/influxdb/meta"
+	"github.com/influxdb/influxdb/test"
+)
+
+func TestCoordinatorWriteOne(t *testing.T) {
+	t.Skip("later")
+	ms := test.MetaStore{}
+	ms.RetentionPolicyFn = func(db, rp string) (*meta.RetentionPolicyInfo, error) {
+		return nil, fmt.Errorf("boom!")
+	}
+	c := influxdb.Coordinator{MetaStore: ms}
+
+	pr := &influxdb.WritePointsRequest{
+		Database:         "mydb",
+		RetentionPolicy:  "myrp",
+		ConsistencyLevel: influxdb.ConsistencyLevelOne,
+	}
+	pr.AddPoint("cpu", 1.0, time.Now(), nil)
+
+	if err := c.Write(pr); err != nil {
+		t.Fatalf("Coordinator.Write() failed: %v", err)
+	}
+}
+
+// TestCoordinatorEnsureShardMappingOne tests that a single point maps to
+// a single shard
+func TestCoordinatorEnsureShardMappingOne(t *testing.T) {
+	ms := test.MetaStore{}
+	rp := test.NewRetentionPolicy("myp", time.Hour, 3)
+
+	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
+		return rp, nil
+	}
+
+	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
+		return &rp.ShardGroups[0], nil
+	}
+
+	c := influxdb.Coordinator{MetaStore: ms}
+	pr := &influxdb.WritePointsRequest{
+		Database:         "mydb",
+		RetentionPolicy:  "myrp",
+		ConsistencyLevel: influxdb.ConsistencyLevelOne,
+	}
+	pr.AddPoint("cpu", 1.0, time.Now(), nil)
+
+	var (
+		shardMappings influxdb.ShardMapping
+		err           error
+	)
+	if shardMappings, err = c.MapShards(pr); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if exp := 1; len(shardMappings) != exp {
+		t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings), exp)
+	}
+}
+
+// TestCoordinatorEnsureShardMappingMultiple tests that MapShards maps multiple points
+// across shard group boundaries to multiple shards
+func TestCoordinatorEnsureShardMappingMultiple(t *testing.T) {
+	ms := test.MetaStore{}
+	rp := test.NewRetentionPolicy("myp", time.Hour, 3)
+	test.AttachShardGroupInfo(rp, []uint64{1, 2, 3})
+	test.AttachShardGroupInfo(rp, []uint64{1, 2, 3})
+
+	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
+		return rp, nil
+	}
+
+	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
+		for i, sg := range rp.ShardGroups {
+			if timestamp.Equal(sg.StartTime) || timestamp.After(sg.StartTime) && timestamp.Before(sg.EndTime) {
+				return &rp.ShardGroups[i], nil
+			}
+		}
+		panic("should not get here")
+	}
+
+	c := influxdb.Coordinator{MetaStore: ms}
+	pr := &influxdb.WritePointsRequest{
+		Database:         "mydb",
+		RetentionPolicy:  "myrp",
+		ConsistencyLevel: influxdb.ConsistencyLevelOne,
+	}
+
+	// Three points that range over the shard group duration (1h) and should map to two
+	// distinct shards
+	pr.AddPoint("cpu", 1.0, time.Unix(0, 0), nil)
+	pr.AddPoint("cpu", 2.0, time.Unix(0, 0).Add(time.Hour), nil)
+	pr.AddPoint("cpu", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil)
+
+	var (
+		shardMappings influxdb.ShardMapping
+		err           error
+	)
+	if shardMappings, err = c.MapShards(pr); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if exp := 2; len(shardMappings) != exp {
+		t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings), exp)
+	}
+
+	for _, points := range shardMappings {
+		// First shard should have 1 point w/ first point added
+		if len(points) == 1 && points[0].Time != pr.Points[0].Time {
+			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time, pr.Points[0].Time)
+		}
+
+		// Second shard should have the last two points added
+		if len(points) == 2 && points[0].Time != pr.Points[1].Time {
+			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time, pr.Points[1].Time)
+		}
+
+		if len(points) == 2 && points[1].Time != pr.Points[2].Time {
+			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[1].Time, pr.Points[2].Time)
+		}
+
+	}
+}
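Reviewer note: the write path that the skipped TestCoordinatorWriteOne will eventually exercise fans the request out to every PointsWriter in its own goroutine, collects results on a buffered channel, and gives up via time.After once the deadline passes. The sketch below shows that pattern in isolation; the quorum type is only a stand-in for whatever newConsistencyPolicyN returns (its implementation is not part of this diff), and the plain func() error writers replace the real PointsWriter interface.

package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("timeout")

// quorum is an assumed, simplified consistency policy: done after n successes.
type quorum struct{ need, got int }

func (q *quorum) IsDone(writerID int, err error) bool {
	if err == nil {
		q.got++
	}
	return q.got >= q.need
}

func write(writers []func() error, timeout time.Duration) error {
	pol := &quorum{need: 1}

	type result struct {
		writerID int
		err      error
	}
	// Buffered so slow writers can finish after we stop listening.
	ch := make(chan result, len(writers))
	for i, w := range writers {
		go func(id int, w func() error) { ch <- result{id, w()} }(i, w)
	}

	deadline := time.After(timeout)
	for range writers {
		select {
		case <-deadline:
			return errTimeout
		case res := <-ch:
			if pol.IsDone(res.writerID, res.err) {
				return res.err
			}
		}
	}
	return errors.New("consistency level not reached")
}

func main() {
	ok := func() error { return nil }
	slow := func() error { time.Sleep(time.Second); return nil }
	// The fast writer satisfies the policy, so this returns nil well before the deadline.
	fmt.Println(write([]func() error{slow, ok}, 100*time.Millisecond))
}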
got %v, exp %v", points[0].Time, pr.Points[1].Time) + } + + if len(points) == 2 && points[1].Time != pr.Points[2].Time { + t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[1].Time, pr.Points[2].Time) + } + + } +} diff --git a/points.go b/points.go new file mode 100644 index 0000000000..bb82e75f00 --- /dev/null +++ b/points.go @@ -0,0 +1,74 @@ +package influxdb + +import ( + "hash/fnv" + "sort" + "time" +) + +// Point defines the values that will be written to the database +type Point struct { + Name string + Tags Tags + Time time.Time + Fields map[string]interface{} +} + +func (p *Point) SeriesID() uint64 { + + // |||| + // cpu|host|servera + encodedTags := p.Tags.Marshal() + size := len(p.Name) + len(encodedTags) + if len(encodedTags) > 0 { + size++ + } + b := make([]byte, 0, size) + b = append(b, p.Name...) + if len(encodedTags) > 0 { + b = append(b, '|') + } + b = append(b, encodedTags...) + // TODO pick a better hashing that guarantees uniqueness + // TODO create a cash for faster lookup + h := fnv.New64a() + h.Write(b) + sum := h.Sum64() + return sum +} + +type Tags map[string]string + +func (t Tags) Marshal() []byte { + // Empty maps marshal to empty bytes. + if len(t) == 0 { + return nil + } + + // Extract keys and determine final size. + sz := (len(t) * 2) - 1 // separators + keys := make([]string, 0, len(t)) + for k, v := range t { + keys = append(keys, k) + sz += len(k) + len(v) + } + sort.Strings(keys) + + // Generate marshaled bytes. + b := make([]byte, sz) + buf := b + for _, k := range keys { + copy(buf, k) + buf[len(k)] = '|' + buf = buf[len(k)+1:] + } + for i, k := range keys { + v := t[k] + copy(buf, v) + if i < len(keys)-1 { + buf[len(v)] = '|' + buf = buf[len(v)+1:] + } + } + return b +} diff --git a/points_test.go b/points_test.go new file mode 100644 index 0000000000..68db5b6d85 --- /dev/null +++ b/points_test.go @@ -0,0 +1,24 @@ +package influxdb_test + +import ( + "testing" + + "github.com/influxdb/influxdb" +) + +var tags = influxdb.Tags{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"} + +func TestMarshal(t *testing.T) { + got := tags.Marshal() + if exp := "apple|foo|host|region|orange|bar|serverA|uswest"; string(got) != exp { + t.Log("got: ", string(got)) + t.Log("exp: ", exp) + t.Error("invalid match") + } +} + +func BenchmarkMarshal(b *testing.B) { + for i := 0; i < b.N; i++ { + tags.Marshal() + } +} diff --git a/shard_test.go b/shard_test.go new file mode 100644 index 0000000000..2c80e84db4 --- /dev/null +++ b/shard_test.go @@ -0,0 +1,45 @@ +package influxdb + +import ( + "reflect" + "testing" + "time" +) + +func TestShardWrite(t *testing.T) { + // Enable when shard can convert a WritePointsRequest to stored data. + // Needs filed encoding/types saved on the shard + t.Skip("not implemented yet") + + sh := &Shard{ID: 1} + + pt := Point{ + Name: "cpu", + Tags: map[string]string{"host": "server"}, + Time: time.Unix(1, 2), + Fields: map[string]interface{}{"value": 1.0}, + } + pr := &WritePointsRequest{ + Database: "foo", + RetentionPolicy: "default", + Points: []Point{ + pt}, + } + + if err := sh.Write(pr); err != nil { + t.Errorf("LocalWriter.Write() failed: %v", err) + } + + p, err := sh.Read(pt.Time) + if err != nil { + t.Fatalf("LocalWriter.Read() failed: %v", err) + } + + if exp := 1; len(p) != exp { + t.Fatalf("LocalWriter.Read() points len mismatch. got %v, exp %v", len(p), exp) + } + + if !reflect.DeepEqual(p[0], pt) { + t.Fatalf("LocalWriter.Read() point mismatch. 
got %v, exp %v", p[0], pt) + } +} diff --git a/test/meta_store.go b/test/meta_store.go index 1ea5f0e684..06e0f65239 100644 --- a/test/meta_store.go +++ b/test/meta_store.go @@ -14,11 +14,16 @@ var ( ) type MetaStore struct { +<<<<<<< HEAD OpenFn func(path string) error CloseFn func() error CreateContinuousQueryFn func(query string) (*meta.ContinuousQueryInfo, error) DropContinuousQueryFn func(query string) error +======= + CreateContinuousQueryFn func(q *influxql.CreateContinuousQueryStatement) (*meta.ContinuousQueryInfo, error) + DropContinuousQueryFn func(q *influxql.DropContinuousQueryStatement) error +>>>>>>> Allow coordinator to map points to multiple shards NodeFn func(id uint64) (*meta.NodeInfo, error) NodeByHostFn func(host string) (*meta.NodeInfo, error) @@ -48,6 +53,7 @@ type MetaStore struct { SetPrivilegeFn func(p influxql.Privilege, username string, dbname string) error } +<<<<<<< HEAD func (m MetaStore) Open(path string) error { return m.OpenFn(path) } @@ -62,6 +68,14 @@ func (m MetaStore) CreateContinuousQuery(query string) (*meta.ContinuousQueryInf func (m MetaStore) DropContinuousQuery(query string) error { return m.DropContinuousQueryFn(query) +======= +func (m MetaStore) CreateContinuousQuery(q *influxql.CreateContinuousQueryStatement) (*meta.ContinuousQueryInfo, error) { + return m.CreateContinuousQueryFn(q) +} + +func (m MetaStore) DropContinuousQuery(q *influxql.DropContinuousQueryStatement) error { + return m.DropContinuousQueryFn(q) +>>>>>>> Allow coordinator to map points to multiple shards } func (m MetaStore) Node(id uint64) (*meta.NodeInfo, error) {