Allow coordinator to map points to multiple shards

Given a WritePointsRequest, MapShards will return a map
of shard ID to points.  This map can then be used to write
each point to the respective shard owners.
pull/2638/head
Jason Wilder 2015-05-08 14:27:31 -06:00 committed by Paul Dix
parent 2bc299d808
commit a6ae533572
6 changed files with 409 additions and 0 deletions

122
coordinator.go Normal file
View File

@ -0,0 +1,122 @@
package influxdb
import (
"errors"
"time"
"github.com/influxdb/influxdb/data"
"github.com/influxdb/influxdb/meta"
)
// defaultReadTimeout bounds how long Write waits for PointsWriter results.
// NOTE(review): named "Read" but used as the write-coordination timeout in
// Write — consider renaming; confirm intent.
const defaultReadTimeout = 5 * time.Second

// ErrTimeout is returned by Write when the writers do not respond within
// defaultReadTimeout.
var ErrTimeout = errors.New("timeout")
// Coordinator handles queries and writes across multiple local and remote
// data nodes.
type Coordinator struct {
	// MetaStore supplies cluster metadata: retention policies and shard
	// groups (see MapShards).
	MetaStore meta.Store
	// DataNode is the local data node; not yet referenced in this file.
	DataNode data.Node
}
// ShardMapping contains a mapping of shard IDs to the points destined for
// that shard.
type ShardMapping map[uint64][]Point
// MapShards maps each point in wp to the shard that owns it, creating any
// shard groups that do not yet exist.  The returned ShardMapping can then be
// used to write each point to the respective shard owners.
func (c *Coordinator) MapShards(wp *WritePointsRequest) (ShardMapping, error) {
	// holds the start time ranges for required shard groups
	timeRanges := map[time.Time]*meta.ShardGroupInfo{}

	rp, err := c.MetaStore.RetentionPolicy(wp.Database, wp.RetentionPolicy)
	if err != nil {
		return nil, err
	}

	// Collect the distinct shard-group start times covered by the points.
	for _, p := range wp.Points {
		timeRanges[p.Time.Truncate(rp.ShardGroupDuration)] = nil
	}

	// Create (or look up) the shard group for every required time range.
	for t := range timeRanges {
		g, err := c.MetaStore.CreateShardGroupIfNotExists(wp.Database, wp.RetentionPolicy, t)
		if err != nil {
			return nil, err
		}
		timeRanges[t] = g
	}

	shardMapping := make(ShardMapping)
	for _, p := range wp.Points {
		g := timeRanges[p.Time.Truncate(rp.ShardGroupDuration)]
		// Guard against a missing or empty group: the original indexed
		// g.Shards unchecked, which would nil-deref or divide by zero.
		if g == nil || len(g.Shards) == 0 {
			return nil, errors.New("shard group has no shards")
		}

		// Hash the series onto one of the group's shards.
		sid := p.SeriesID()
		shardInfo := g.Shards[sid%uint64(len(g.Shards))]

		// append on a missing key starts from a nil slice, so no
		// exists-check is needed.
		shardMapping[shardInfo.ID] = append(shardMapping[shardInfo.ID], p)
	}
	return shardMapping, nil
}
// Write coordinates multiple writes across local and remote data nodes
// according to the request's consistency level.
func (c *Coordinator) Write(p *WritePointsRequest) error {
	// FIXME: use the consistency level specified by the WritePointsRequest
	pol := newConsistencyPolicyN(1)
	// The mapping is computed for its error only; it is not yet consumed
	// because the writer set below is still a placeholder.
	_, err := c.MapShards(p)
	if err != nil {
		return err
	}
	// FIXME: build set of local and remote point writers
	ws := []PointsWriter{}
	// result pairs a writer's index with its write error (nil on success).
	type result struct {
		writerID int
		err error
	}
	// Buffered so writers responding after a timeout or early return do
	// not block forever on the send.
	ch := make(chan result, len(ws))
	for i, w := range ws {
		go func(id int, w PointsWriter) {
			err := w.Write(p)
			ch <- result{id, err}
		}(i, w)
	}
	// NOTE(review): defaultReadTimeout is used here as the write timeout.
	timeout := time.After(defaultReadTimeout)
	// Collect at most one result per writer, stopping as soon as the
	// consistency policy is satisfied.
	for range ws {
		select {
		case <-timeout:
			// return timeout error to caller
			return ErrTimeout
		case res := <-ch:
			if !pol.IsDone(res.writerID, res.err) {
				continue
			}
			if res.err != nil {
				return res.err
			}
			return nil
		}
	}
	// NOTE(review): reached whenever ws is empty (as it currently always
	// is) or the policy never reports done after all writers respond —
	// confirm intended behavior once writers are wired up.
	panic("unreachable or bad policy impl")
}
// Execute runs a query request across data nodes.
// TODO: not implemented yet — always returns (nil, nil).
func (c *Coordinator) Execute(q *QueryRequest) (chan *Result, error) {
	return nil, nil
}
// remoteWriter is a PointsWriter for a remote data node.
type remoteWriter struct {
	//ShardInfo []ShardInfo
	//DataNodes DataNodes
}

// Write sends the points to the remote node.
// TODO: not implemented yet — always reports success.
func (w *remoteWriter) Write(p *WritePointsRequest) error {
	return nil
}

130
coordinator_test.go Normal file
View File

@ -0,0 +1,130 @@
package influxdb_test
import (
"fmt"
"testing"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/test"
)
// TestCoordinatorWriteOne exercises Coordinator.Write with a single point
// at consistency level one.  Currently skipped.
func TestCoordinatorWriteOne(t *testing.T) {
	t.Skip("later")
	ms := test.MetaStore{}
	// NOTE(review): this stub makes RetentionPolicy fail, so Write should
	// return an error — yet the assertion below fails the test on any
	// error.  Resolve the intended behavior before un-skipping.
	ms.RetentionPolicyFn = func(db, rp string) (*meta.RetentionPolicyInfo, error) {
		return nil, fmt.Errorf("boom!")
	}
	c := influxdb.Coordinator{MetaStore: ms}
	pr := &influxdb.WritePointsRequest{
		Database:         "mydb",
		RetentionPolicy:  "myrp",
		ConsistencyLevel: influxdb.ConsistencyLevelOne,
	}
	pr.AddPoint("cpu", 1.0, time.Now(), nil)
	if err := c.Write(pr); err != nil {
		t.Fatalf("Coordinator.Write() failed: %v", err)
	}
}
// TestCoordinatorEnsureShardMappingOne tests that a single point maps to
// a single shard
func TestCoordinatorEnsureShardMappingOne(t *testing.T) {
	ms := test.MetaStore{}
	rp := test.NewRetentionPolicy("myp", time.Hour, 3)
	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
		return rp, nil
	}
	// Every point lands in the policy's first (and only) shard group.
	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
		return &rp.ShardGroups[0], nil
	}

	c := influxdb.Coordinator{MetaStore: ms}
	pr := &influxdb.WritePointsRequest{
		Database:         "mydb",
		RetentionPolicy:  "myrp",
		ConsistencyLevel: influxdb.ConsistencyLevelOne,
	}
	pr.AddPoint("cpu", 1.0, time.Now(), nil)

	var (
		shardMappings influxdb.ShardMapping
		err           error
	)
	if shardMappings, err = c.MapShards(pr); err != nil {
		// Fixed message: was the garbled "unexpected an error".
		t.Fatalf("unexpected error: %v", err)
	}

	if exp := 1; len(shardMappings) != exp {
		t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings), exp)
	}
}
// TestCoordinatorEnsureShardMappingMultiple tests that MapShards maps multiple points
// across shard group boundaries to multiple shards
func TestCoordinatorEnsureShardMappingMultiple(t *testing.T) {
	ms := test.MetaStore{}
	rp := test.NewRetentionPolicy("myp", time.Hour, 3)
	test.AttachShardGroupInfo(rp, []uint64{1, 2, 3})
	test.AttachShardGroupInfo(rp, []uint64{1, 2, 3})

	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
		return rp, nil
	}

	// Return the shard group whose [StartTime, EndTime) range contains the
	// timestamp.  Parentheses added: the original relied on && binding
	// tighter than || (same meaning, now explicit).
	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
		for i, sg := range rp.ShardGroups {
			if timestamp.Equal(sg.StartTime) || (timestamp.After(sg.StartTime) && timestamp.Before(sg.EndTime)) {
				return &rp.ShardGroups[i], nil
			}
		}
		panic("should not get here")
	}

	c := influxdb.Coordinator{MetaStore: ms}
	pr := &influxdb.WritePointsRequest{
		Database:         "mydb",
		RetentionPolicy:  "myrp",
		ConsistencyLevel: influxdb.ConsistencyLevelOne,
	}

	// Three points that range over the shardGroup duration (1h) and should map to two
	// distinct shards
	pr.AddPoint("cpu", 1.0, time.Unix(0, 0), nil)
	pr.AddPoint("cpu", 2.0, time.Unix(0, 0).Add(time.Hour), nil)
	pr.AddPoint("cpu", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil)

	var (
		shardMappings influxdb.ShardMapping
		err           error
	)
	if shardMappings, err = c.MapShards(pr); err != nil {
		// Fixed message: was the garbled "unexpected an error".
		t.Fatalf("unexpected error: %v", err)
	}

	if exp := 2; len(shardMappings) != exp {
		t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings), exp)
	}

	for _, points := range shardMappings {
		// First shard should have 1 point w/ first point added
		if len(points) == 1 && points[0].Time != pr.Points[0].Time {
			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time, pr.Points[0].Time)
		}

		// Second shard should have the last two points added
		if len(points) == 2 && points[0].Time != pr.Points[1].Time {
			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time, pr.Points[1].Time)
		}

		if len(points) == 2 && points[1].Time != pr.Points[2].Time {
			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[1].Time, pr.Points[2].Time)
		}
	}
}

74
points.go Normal file
View File

@ -0,0 +1,74 @@
package influxdb
import (
"hash/fnv"
"sort"
"time"
)
// Point defines the values that will be written to the database.
type Point struct {
	Name   string
	Tags   Tags
	Time   time.Time
	Fields map[string]interface{}
}

// SeriesID returns a 64-bit FNV-1a hash identifying the series this point
// belongs to.  The hashed key has the form
// <measurementName>|<tagKey>|<tagKey>|<tagValue>|<tagValue>
// (e.g. cpu|host|servera); the '|' after the name is omitted when the
// point has no tags.
func (p *Point) SeriesID() uint64 {
	// TODO pick a better hashing that guarantees uniqueness
	// TODO create a cache for faster lookup
	h := fnv.New64a()

	// FNV-1a hashes a byte stream, so the key pieces can be fed to the
	// hasher directly instead of first concatenating them into a buffer.
	h.Write([]byte(p.Name))
	if encodedTags := p.Tags.Marshal(); len(encodedTags) > 0 {
		h.Write([]byte{'|'})
		h.Write(encodedTags)
	}
	return h.Sum64()
}
type Tags map[string]string
func (t Tags) Marshal() []byte {
// Empty maps marshal to empty bytes.
if len(t) == 0 {
return nil
}
// Extract keys and determine final size.
sz := (len(t) * 2) - 1 // separators
keys := make([]string, 0, len(t))
for k, v := range t {
keys = append(keys, k)
sz += len(k) + len(v)
}
sort.Strings(keys)
// Generate marshaled bytes.
b := make([]byte, sz)
buf := b
for _, k := range keys {
copy(buf, k)
buf[len(k)] = '|'
buf = buf[len(k)+1:]
}
for i, k := range keys {
v := t[k]
copy(buf, v)
if i < len(keys)-1 {
buf[len(v)] = '|'
buf = buf[len(v)+1:]
}
}
return b
}

24
points_test.go Normal file
View File

@ -0,0 +1,24 @@
package influxdb_test
import (
"testing"
"github.com/influxdb/influxdb"
)
// tags is the fixture shared by TestMarshal and BenchmarkMarshal.
var tags = influxdb.Tags{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"}
// TestMarshal verifies that Tags.Marshal emits the sorted keys followed by
// their values, all separated by '|'.
func TestMarshal(t *testing.T) {
	got := tags.Marshal()
	// Single got/exp failure message, matching the "got %v, exp %v"
	// convention used by the other tests in this package, instead of the
	// original Log/Log/Error("invalid match") triple.
	if exp := "apple|foo|host|region|orange|bar|serverA|uswest"; string(got) != exp {
		t.Errorf("Tags.Marshal() mismatch. got %q, exp %q", string(got), exp)
	}
}
// BenchmarkMarshal measures the cost of marshaling the shared four-tag
// fixture.
func BenchmarkMarshal(b *testing.B) {
	for i := 0; i < b.N; i++ {
		tags.Marshal()
	}
}

45
shard_test.go Normal file
View File

@ -0,0 +1,45 @@
package influxdb
import (
"reflect"
"testing"
"time"
)
// TestShardWrite round-trips a single point through a Shard: write it,
// read it back by timestamp, and compare.  Currently skipped.
func TestShardWrite(t *testing.T) {
	// Enable when shard can convert a WritePointsRequest to stored data.
	// Needs field encoding/types saved on the shard ("filed" in the
	// original comment — assumed typo).
	t.Skip("not implemented yet")
	sh := &Shard{ID: 1}
	pt := Point{
		Name:   "cpu",
		Tags:   map[string]string{"host": "server"},
		Time:   time.Unix(1, 2),
		Fields: map[string]interface{}{"value": 1.0},
	}
	pr := &WritePointsRequest{
		Database:        "foo",
		RetentionPolicy: "default",
		Points: []Point{
			pt},
	}
	if err := sh.Write(pr); err != nil {
		t.Errorf("LocalWriter.Write() failed: %v", err)
	}
	p, err := sh.Read(pt.Time)
	if err != nil {
		t.Fatalf("LocalWriter.Read() failed: %v", err)
	}
	if exp := 1; len(p) != exp {
		t.Fatalf("LocalWriter.Read() points len mismatch. got %v, exp %v", len(p), exp)
	}
	if !reflect.DeepEqual(p[0], pt) {
		t.Fatalf("LocalWriter.Read() point mismatch. got %v, exp %v", p[0], pt)
	}
}

View File

@ -14,11 +14,16 @@ var (
)
type MetaStore struct {
<<<<<<< HEAD
OpenFn func(path string) error
CloseFn func() error
CreateContinuousQueryFn func(query string) (*meta.ContinuousQueryInfo, error)
DropContinuousQueryFn func(query string) error
=======
CreateContinuousQueryFn func(q *influxql.CreateContinuousQueryStatement) (*meta.ContinuousQueryInfo, error)
DropContinuousQueryFn func(q *influxql.DropContinuousQueryStatement) error
>>>>>>> Allow coordinator to map points to multiple shards
NodeFn func(id uint64) (*meta.NodeInfo, error)
NodeByHostFn func(host string) (*meta.NodeInfo, error)
@ -48,6 +53,7 @@ type MetaStore struct {
SetPrivilegeFn func(p influxql.Privilege, username string, dbname string) error
}
<<<<<<< HEAD
func (m MetaStore) Open(path string) error {
return m.OpenFn(path)
}
@ -62,6 +68,14 @@ func (m MetaStore) CreateContinuousQuery(query string) (*meta.ContinuousQueryInf
func (m MetaStore) DropContinuousQuery(query string) error {
return m.DropContinuousQueryFn(query)
=======
func (m MetaStore) CreateContinuousQuery(q *influxql.CreateContinuousQueryStatement) (*meta.ContinuousQueryInfo, error) {
return m.CreateContinuousQueryFn(q)
}
func (m MetaStore) DropContinuousQuery(q *influxql.DropContinuousQueryStatement) error {
return m.DropContinuousQueryFn(q)
>>>>>>> Allow coordinator to map points to multiple shards
}
func (m MetaStore) Node(id uint64) (*meta.NodeInfo, error) {