diff --git a/.gitignore b/.gitignore
index 212af7d4d7..147b881227 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,7 +56,6 @@ benchmark.log
 
 # config file
 config.toml
-/data/
 
 # test data files
 integration/migration_data/
diff --git a/consistency.go b/consistency.go
new file mode 100644
index 0000000000..50d17e8eca
--- /dev/null
+++ b/consistency.go
@@ -0,0 +1,61 @@
+package influxdb
+
+// ConsistencyLevel represents the replication criteria that must be met before a
+// write is returned as successful
+type ConsistencyLevel int
+
+const (
+	// ConsistencyLevelAny allows for hinted handoff; potentially no write has happened yet
+	ConsistencyLevelAny ConsistencyLevel = iota
+
+	// ConsistencyLevelOne requires at least one data node to acknowledge a write
+	ConsistencyLevelOne
+
+	// ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write
+	ConsistencyLevelQuorum
+
+	// ConsistencyLevelAll requires all data nodes to acknowledge a write
+	ConsistencyLevelAll
+)
+
+func newConsistencyPolicyN(need int) ConsistencyPolicy {
+	return &policyNum{
+		need: need,
+	}
+}
+
+func newConsistencyOwnerPolicy(ownerID int) ConsistencyPolicy {
+	return &policyOwner{
+		ownerID: ownerID,
+	}
+}
+
+// ConsistencyPolicy verifies a write consistency level has been met
+type ConsistencyPolicy interface {
+	IsDone(writerID int, err error) bool
+}
+
+// policyNum implements One, Quorum, and All
+type policyNum struct {
+	failed, succeeded, need int
+}
+
+// IsDone determines if the policy has been satisfied based on the given
+// writerID or error
+func (p *policyNum) IsDone(writerID int, err error) bool {
+	if err == nil {
+		p.succeeded++
+		return p.succeeded >= p.need
+	}
+	p.failed++
+	return p.need-p.failed-p.succeeded >= p.need-p.succeeded
+
+}
+
+type policyOwner struct {
+	ownerID int
+}
+
+func (p *policyOwner) IsDone(writerID int, err error) bool {
+	return p.ownerID == writerID
+}
diff --git a/consistency_test.go b/consistency_test.go
new file mode 100644
index 0000000000..cc4dc34d23
--- /dev/null
+++ b/consistency_test.go
@@ -0,0 +1,120 @@
+package influxdb
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestConsistencyOne(t *testing.T) {
+	ap := newConsistencyPolicyN(1)
+
+	if got := ap.IsDone(0, nil); got != true {
+		t.Errorf("ack one mismatch: got %v, exp %v", got, true)
+	}
+}
+
+func TestConsistencyOneError(t *testing.T) {
+	ap := newConsistencyPolicyN(1)
+
+	if got := ap.IsDone(0, fmt.Errorf("foo")); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+}
+
+func TestConsistencyOneMultiple(t *testing.T) {
+	ap := newConsistencyPolicyN(1)
+
+	if got := ap.IsDone(0, nil); got != true {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, true)
+	}
+
+	if got := ap.IsDone(1, nil); got != true {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, true)
+	}
+}
+
+func TestConsistencyAll(t *testing.T) {
+	ap := newConsistencyPolicyN(3)
+
+	if got := ap.IsDone(0, nil); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+
+	if got := ap.IsDone(1, nil); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+
+	if got := ap.IsDone(2, nil); got != true {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, true)
+	}
+}
+
+func TestConsistencyAllError(t *testing.T) {
+	ap := newConsistencyPolicyN(3)
+
+	if got := ap.IsDone(0, nil); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+
+	if got := ap.IsDone(1, fmt.Errorf("foo")); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+
+	if got := ap.IsDone(2, nil); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+}
+
+func TestConsistencyQuorumError(t *testing.T) {
+	ap := newConsistencyPolicyN(2)
+
+	if got := ap.IsDone(0, nil); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+
+	if got := ap.IsDone(1, fmt.Errorf("foo")); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+
+	if got := ap.IsDone(2, nil); got != true {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, true)
+	}
+}
+
+func TestConsistencyOwner(t *testing.T) {
+	ap := newConsistencyOwnerPolicy(2)
+
+	// non-owner, not done
+	if got := ap.IsDone(0, nil); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+
+	// non-owner, not done
+	if got := ap.IsDone(1, nil); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+
+	// owner, should be done
+	if got := ap.IsDone(2, nil); got != true {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, true)
+	}
+}
+
+func TestConsistencyOwnerError(t *testing.T) {
+	ap := newConsistencyOwnerPolicy(2)
+
+	// non-owner succeeds, should not be done
+	if got := ap.IsDone(0, nil); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+
+	// non-owner failed, should not be done
+	if got := ap.IsDone(1, fmt.Errorf("foo")); got != false {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, false)
+	}
+
+	// owner failed
+	if got := ap.IsDone(2, fmt.Errorf("foo")); got != true {
+		t.Errorf("ack one error mismatch: got %v, exp %v", got, true)
+	}
+}
diff --git a/coordinator.go b/coordinator.go
new file mode 100644
index 0000000000..8f6ad84bab
--- /dev/null
+++ b/coordinator.go
@@ -0,0 +1,70 @@
+package influxdb
+
+import (
+	"errors"
+	"time"
+)
+
+const defaultReadTimeout = 5 * time.Second
+
+var ErrTimeout = errors.New("timeout")
+
+// Coordinator handles queries and writes across multiple local and remote
+// data nodes.
+type Coordinator struct {
+}
+
+// Write coordinates multiple writes across local and remote data nodes
+// according to the request consistency level
+func (c *Coordinator) Write(p *WritePointsRequest) error {
+
+	// FIXME: use the consistency level specified by the WritePointsRequest
+	pol := newConsistencyPolicyN(1)
+
+	// FIXME: build set of local and remote point writers
+	ws := []PointsWriter{}
+
+	type result struct {
+		writerID int
+		err      error
+	}
+	ch := make(chan result, len(ws))
+	for i, w := range ws {
+		go func(id int, w PointsWriter) {
+			err := w.Write(p)
+			ch <- result{id, err}
+		}(i, w)
+	}
+	timeout := time.After(defaultReadTimeout)
+	for range ws {
+		select {
+		case <-timeout:
+			// return timeout error to caller
+			return ErrTimeout
+		case res := <-ch:
+			if !pol.IsDone(res.writerID, res.err) {
+				continue
+			}
+			if res.err != nil {
+				return res.err
+			}
+			return nil
+		}
+
+	}
+	panic("unreachable or bad policy impl")
+}
+
+func (c *Coordinator) Execute(q *QueryRequest) (chan *Result, error) {
+	return nil, nil
+}
+
+// remoteWriter is a PointsWriter for a remote data node
+type remoteWriter struct {
+	//ShardInfo []ShardInfo
+	//DataNodes DataNodes
+}
+
+func (w *remoteWriter) Write(p *WritePointsRequest) error {
+	return nil
+}
diff --git a/data/node.go b/data/node.go
new file mode 100644
index 0000000000..29c3a0d828
--- /dev/null
+++ b/data/node.go
@@ -0,0 +1,18 @@
+package data
+
+func NewDataNode() *Node {
+	return &Node{}
+}
+
+type Node struct {
+	//ms meta.Store
+}
+
+func (n *Node) Open() error {
+	// Open shards
+	// Start AE for Node
+	return nil
+}
+
+func (n *Node) Close() error { return nil }
+func (n *Node) Init() error  { return nil }
diff --git a/meta/store.go b/meta/store.go
new file mode 100644
index 0000000000..e0beebb655
--- /dev/null
+++ b/meta/store.go
@@ -0,0 +1,5 @@
+package meta
+
+// Store provides access to the cluster's configuration and
+// metadata
+type Store interface{}
diff --git a/server.go b/server.go
index 697be7e4f8..6842adfee3 100644
--- a/server.go
+++ b/server.go
@@ -20,8 +20,10 @@ import (
 	"sync"
 	"time"
 
+	"github.com/influxdb/influxdb/data"
 	"github.com/influxdb/influxdb/influxql"
 	"github.com/influxdb/influxdb/messaging"
+	"github.com/influxdb/influxdb/meta"
 	"golang.org/x/crypto/bcrypt"
 )
@@ -55,6 +57,39 @@ const (
 	NoChunkingSize = 0
 )
 
+// Service represents a long-running task that is managed by a Server
+type Service interface {
+	Open() error
+	Close() error
+}
+
+// QueryExecutor executes a query across multiple data nodes
+type QueryExecutor interface {
+	Execute(q *QueryRequest) (chan *Result, error)
+}
+
+// QueryRequest represents a request to run a query across the cluster
+type QueryRequest struct {
+	Query     *influxql.Query
+	Database  string
+	User      *User
+	ChunkSize int
+}
+
+// PointsWriter accepts a WritePointsRequest from client-facing endpoints such as
+// the HTTP JSON API, Collectd, Graphite, OpenTSDB, etc.
+type PointsWriter interface {
+	Write(p *WritePointsRequest) error
+}
+
+// WritePointsRequest represents a request to write point data to the cluster
+type WritePointsRequest struct {
+	Database         string
+	RetentionPolicy  string
+	ConsistencyLevel ConsistencyLevel
+	Points           []Point
+}
+
 // Server represents a collection of metadata and raw metric data.
 type Server struct {
 	mu sync.RWMutex
@@ -101,6 +136,21 @@ type Server struct {
 	// Build information.
 	Version    string
 	CommitHash string
+
+	// The local data node that manages local shard data
+	dn data.Node
+
+	// The meta store for accessing and updating cluster and schema data
+	ms meta.Store
+
+	// The services running on this node
+	services []Service
+
+	// Handles write requests for local and remote nodes
+	pw PointsWriter
+
+	// Handles queries for local and remote nodes
+	qe QueryExecutor
 }
 
 // NewServer returns a new instance of Server.
@@ -115,6 +165,8 @@ func NewServer() *Server {
 		shards:    make(map[uint64]*Shard),
 		stats:     NewStats("server"),
 		Logger:    log.New(os.Stderr, "[server] ", log.LstdFlags),
+		pw:        &Coordinator{},
+		qe:        &Coordinator{},
 	}
 	// Server will always return with authentication enabled.
 	// This ensures that disabling authentication must be an explicit decision.
@@ -123,6 +175,24 @@
 	return &s
 }
 
+func (s *Server) openServices() error {
+	for _, n := range s.services {
+		if err := n.Open(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (s *Server) closeServices() error {
+	for _, n := range s.services {
+		if err := n.Close(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (s *Server) BrokerURLs() []url.URL {
 	return s.client.URLs()
 }
@@ -225,7 +295,7 @@ func (s *Server) Open(path string, client MessagingClient) error {
 
 	// TODO: Associate series ids with shards.
 
-	return nil
+	return s.openServices()
 }
 
 // opened returns true when the server is open. Must be called under lock.
@@ -243,6 +313,10 @@
 		return ErrServerClosed
 	}
 
+	if err := s.closeServices(); err != nil {
+		return err
+	}
+
 	if s.rpDone != nil {
 		close(s.rpDone)
 		s.rpDone = nil
diff --git a/shard.go b/shard.go
index e672137f82..0316a189c1 100644
--- a/shard.go
+++ b/shard.go
@@ -266,6 +266,14 @@ func (s *Shard) HasDataNodeID(id uint64) bool {
 	return false
 }
 
+func (s *Shard) Write(r *WritePointsRequest) error {
+	return nil
+}
+
+func (s *Shard) Read(timestamp time.Time) ([]Point, error) {
+	return nil, nil
+}
+
 // readSeries reads encoded series data from a shard.
 func (s *Shard) readSeries(seriesID uint64, timestamp int64) (values []byte, err error) {
 	err = s.store.View(func(tx *bolt.Tx) error {
diff --git a/shard_test.go b/shard_test.go
new file mode 100644
index 0000000000..d69496201c
--- /dev/null
+++ b/shard_test.go
@@ -0,0 +1,45 @@
+package influxdb
+
+import (
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestShardWrite(t *testing.T) {
+	// Enable when shard can convert a WritePointsRequest to stored data.
+	// Needs field encoding/types saved on the shard
+	t.Skip("not implemented yet")
+
+	sh := &Shard{ID: 1}
+
+	pt := Point{
+		Name:      "cpu",
+		Tags:      map[string]string{"host": "server"},
+		Timestamp: time.Unix(1, 2),
+		Fields:    map[string]interface{}{"value": 1.0},
+	}
+	pr := &WritePointsRequest{
+		Database:        "foo",
+		RetentionPolicy: "default",
+		Points: []Point{
+			pt},
+	}
+
+	if err := sh.Write(pr); err != nil {
+		t.Errorf("LocalWriter.Write() failed: %v", err)
+	}
+
+	p, err := sh.Read(pt.Timestamp)
+	if err != nil {
+		t.Fatalf("LocalWriter.Read() failed: %v", err)
+	}
+
+	if exp := 1; len(p) != exp {
+		t.Fatalf("LocalWriter.Read() points len mismatch. got %v, exp %v", len(p), exp)
+	}
+
+	if !reflect.DeepEqual(p[0], pt) {
+		t.Fatalf("LocalWriter.Read() point mismatch. got %v, exp %v", p[0], pt)
+	}
+}
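
For reviewers, a minimal sketch (not part of the change set) of how the quorum policy behaves when driven the same way Coordinator.Write drains writer results. It assumes it compiles inside the influxdb package alongside consistency.go, since newConsistencyPolicyN and policyNum are unexported; the writer results and "node down" error are made up for illustration.

package influxdb

import "fmt"

// ExampleConsistencyPolicy_quorum drives a quorum-of-2 policy over three
// writer results; one failed write does not prevent the policy from
// reporting done once the second success arrives.
func ExampleConsistencyPolicy_quorum() {
	pol := newConsistencyPolicyN(2) // quorum of 2 out of 3 writers

	results := []error{nil, fmt.Errorf("node down"), nil}
	for id, err := range results {
		if pol.IsDone(id, err) {
			fmt.Printf("quorum reached after writer %d\n", id)
			break
		}
	}
	// Output: quorum reached after writer 2
}

Note that policyNum only counts acknowledgements: its failure branch never reports done, so when the required count can no longer be reached Coordinator.Write falls back to the timeout, or to the trailing panic once every writer has responded.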