Write path interfaces

This adds some placeholder interfaces for the new write path.
pull/2518/head
Jason Wilder 2015-05-07 22:47:52 -06:00
parent b8cf01dd4a
commit 229638cfaf
9 changed files with 402 additions and 2 deletions

1
.gitignore vendored
View File

@ -56,7 +56,6 @@ benchmark.log
# config file
config.toml
/data/
# test data files
integration/migration_data/

61
consistency.go Normal file
View File

@ -0,0 +1,61 @@
package influxdb
// ConsistencyLevel represents a required replication criteria before a write can
// be returned as successful.
type ConsistencyLevel int

const (
	// ConsistencyLevelAny allows for hinted handoff: potentially no write has happened yet.
	ConsistencyLevelAny ConsistencyLevel = iota
	// ConsistencyLevelOne requires at least one data node acknowledged a write.
	ConsistencyLevelOne
	// ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write.
	ConsistencyLevelQuorum
	// ConsistencyLevelAll requires all data nodes to acknowledge a write.
	ConsistencyLevelAll
)
// newConsistencyPolicyN returns a ConsistencyPolicy that is satisfied once
// need writes have been acknowledged successfully.
func newConsistencyPolicyN(need int) ConsistencyPolicy {
	p := policyNum{need: need}
	return &p
}
// newConsistencyOwnerPolicy returns a ConsistencyPolicy that is satisfied only
// once the writer identified by ownerID has responded.
func newConsistencyOwnerPolicy(ownerID int) ConsistencyPolicy {
	p := policyOwner{ownerID: ownerID}
	return &p
}
// ConsistencyPolicy verifies a write consistency level has been met.
type ConsistencyPolicy interface {
	// IsDone records the result (err, possibly nil) from the writer identified
	// by writerID and reports whether the policy is now satisfied.
	IsDone(writerID int, err error) bool
}
// policyNum implements the One, Quorum, and All consistency levels by
// counting successful and failed writes against a required count.
type policyNum struct {
	failed, succeeded, need int
}

// IsDone records the result of one write attempt and reports whether the
// required number of successful writes has been reached.
//
// On success the policy is done once succeeded >= need. On failure it always
// reports not-done: the original expression
// (need-failed-succeeded >= need-succeeded) reduces algebraically to
// failed <= 0, which is always false after the increment. The policy cannot
// know the total writer count, so it cannot detect that success has become
// impossible — callers must bound the wait themselves (e.g. with a timeout).
func (p *policyNum) IsDone(writerID int, err error) bool {
	if err == nil {
		p.succeeded++
		return p.succeeded >= p.need
	}
	p.failed++
	return false
}
// policyOwner implements consistency for a single designated owner: the
// policy is satisfied only once the owning writer responds, regardless of
// whether its write succeeded.
type policyOwner struct {
	ownerID int
}

// IsDone reports whether the responding writer is the owner. The error is
// deliberately ignored: any response from the owner completes the policy.
func (p *policyOwner) IsDone(writerID int, err error) bool {
	return writerID == p.ownerID
}

120
consistency_test.go Normal file
View File

@ -0,0 +1,120 @@
package influxdb
import (
"fmt"
"testing"
)
// TestConsistencyOne verifies that a policy requiring a single write is
// satisfied by the first successful acknowledgement.
func TestConsistencyOne(t *testing.T) {
	policy := newConsistencyPolicyN(1)
	if done := policy.IsDone(0, nil); !done {
		t.Errorf("ack one mismatch: got %v, exp %v", done, true)
	}
}
// TestConsistencyOneError verifies that a failed write does not satisfy a
// policy requiring one acknowledgement.
func TestConsistencyOneError(t *testing.T) {
	policy := newConsistencyPolicyN(1)
	if done := policy.IsDone(0, fmt.Errorf("foo")); done {
		t.Errorf("ack one error mismatch: got %v, exp %v", done, false)
	}
}
// TestConsistencyOneMultiple verifies that a policy requiring one write stays
// satisfied as additional successful writes are recorded.
// Fix: the failure messages were copy-pasted from the error test; no error is
// involved here.
func TestConsistencyOneMultiple(t *testing.T) {
	ap := newConsistencyPolicyN(1)
	if got := ap.IsDone(0, nil); got != true {
		t.Errorf("ack one mismatch: got %v, exp %v", got, true)
	}
	if got := ap.IsDone(1, nil); got != true {
		t.Errorf("ack one mismatch: got %v, exp %v", got, true)
	}
}
// TestConsistencyAll verifies that a policy requiring three writes is only
// satisfied once all three writers have acknowledged.
// Fix: messages were copy-pasted from the "one error" test.
func TestConsistencyAll(t *testing.T) {
	ap := newConsistencyPolicyN(3)
	if got := ap.IsDone(0, nil); got != false {
		t.Errorf("ack all mismatch: got %v, exp %v", got, false)
	}
	if got := ap.IsDone(1, nil); got != false {
		t.Errorf("ack all mismatch: got %v, exp %v", got, false)
	}
	if got := ap.IsDone(2, nil); got != true {
		t.Errorf("ack all mismatch: got %v, exp %v", got, true)
	}
}
// TestConsistencyAllError verifies that a policy requiring all three writes
// is never satisfied when one of the writers fails.
// Fix: the final Errorf printed "exp true" while the condition expects false;
// messages were also copy-pasted from the "one error" test.
func TestConsistencyAllError(t *testing.T) {
	ap := newConsistencyPolicyN(3)
	if got := ap.IsDone(0, nil); got != false {
		t.Errorf("ack all error mismatch: got %v, exp %v", got, false)
	}
	if got := ap.IsDone(1, fmt.Errorf("foo")); got != false {
		t.Errorf("ack all error mismatch: got %v, exp %v", got, false)
	}
	// exp is false here: only 2 of the 3 required writes can ever succeed.
	if got := ap.IsDone(2, nil); got != false {
		t.Errorf("ack all error mismatch: got %v, exp %v", got, false)
	}
}
// TestConsistencyQuorumError verifies that a quorum (2) policy is still
// satisfied when one writer fails but two succeed.
// Fix: messages were copy-pasted from the "one error" test.
func TestConsistencyQuorumError(t *testing.T) {
	ap := newConsistencyPolicyN(2)
	if got := ap.IsDone(0, nil); got != false {
		t.Errorf("ack quorum error mismatch: got %v, exp %v", got, false)
	}
	if got := ap.IsDone(1, fmt.Errorf("foo")); got != false {
		t.Errorf("ack quorum error mismatch: got %v, exp %v", got, false)
	}
	if got := ap.IsDone(2, nil); got != true {
		t.Errorf("ack quorum error mismatch: got %v, exp %v", got, true)
	}
}
// TestConsistencyOwner verifies that an owner policy is only satisfied when
// the designated owner (writer 2) responds.
// Fix: messages were copy-pasted from the "one error" test.
func TestConsistencyOwner(t *testing.T) {
	ap := newConsistencyOwnerPolicy(2)
	// non-owner, not done
	if got := ap.IsDone(0, nil); got != false {
		t.Errorf("ack owner mismatch: got %v, exp %v", got, false)
	}
	// non-owner, not done
	if got := ap.IsDone(1, nil); got != false {
		t.Errorf("ack owner mismatch: got %v, exp %v", got, false)
	}
	// owner, should be done
	if got := ap.IsDone(2, nil); got != true {
		t.Errorf("ack owner mismatch: got %v, exp %v", got, true)
	}
}
// TestConsistencyOwnerError verifies that the owner policy completes on the
// owner's response even when that response is an error.
// Fix: messages were copy-pasted from the "one error" test.
func TestConsistencyOwnerError(t *testing.T) {
	ap := newConsistencyOwnerPolicy(2)
	// non-owner succeeds, should not be done
	if got := ap.IsDone(0, nil); got != false {
		t.Errorf("ack owner error mismatch: got %v, exp %v", got, false)
	}
	// non-owner failed, should not be done
	if got := ap.IsDone(1, fmt.Errorf("foo")); got != false {
		t.Errorf("ack owner error mismatch: got %v, exp %v", got, false)
	}
	// owner failed, still done
	if got := ap.IsDone(2, fmt.Errorf("foo")); got != true {
		t.Errorf("ack owner error mismatch: got %v, exp %v", got, true)
	}
}

70
coordinator.go Normal file
View File

@ -0,0 +1,70 @@
package influxdb
import (
"errors"
"time"
)
// defaultReadTimeout bounds how long the coordinator waits on writer
// responses. NOTE(review): the name says "read" but it is used as the write
// timeout in Coordinator.Write — confirm intent.
const defaultReadTimeout = 5 * time.Second

// ErrTimeout is returned when a write does not complete within the timeout.
var ErrTimeout = errors.New("timeout")
// Coordinator handles queries and writes across multiple local and remote
// data nodes.
type Coordinator struct {
}
// Write coordinates multiple writes across local and remote data nodes
// according to the request consistency level. It fans the request out to
// every writer concurrently and returns as soon as the consistency policy
// is satisfied, or ErrTimeout if no decision is reached in time.
func (c *Coordinator) Write(p *WritePointsRequest) error {
	// FIXME: use the consistency level specified by the WritePointsRequest
	pol := newConsistencyPolicyN(1)
	// FIXME: build set of local and remote point writers
	ws := []PointsWriter{}
	// result pairs a writer's index with the outcome of its write.
	type result struct {
		writerID int
		err      error
	}
	// Buffered to len(ws) so writers that finish after we return never
	// block on the send.
	ch := make(chan result, len(ws))
	for i, w := range ws {
		go func(id int, w PointsWriter) {
			err := w.Write(p)
			ch <- result{id, err}
		}(i, w)
	}
	// NOTE(review): defaultReadTimeout bounds a *write* here — confirm naming.
	timeout := time.After(defaultReadTimeout)
	// Collect at most one response per writer, returning as soon as the
	// policy reports done.
	for range ws {
		select {
		case <-timeout:
			// return timeout error to caller
			return ErrTimeout
		case res := <-ch:
			if !pol.IsDone(res.writerID, res.err) {
				continue
			}
			if res.err != nil {
				return res.err
			}
			return nil
		}
	}
	// Reached only if every writer responded without ever satisfying the
	// policy — a policy implementation bug.
	panic("unreachable or bad policy impl")
}
// Execute runs a query across the cluster. Placeholder: it currently returns
// no results and no error.
func (c *Coordinator) Execute(q *QueryRequest) (chan *Result, error) {
	return nil, nil
}
// remoteWriter is a PointsWriter for a remote data node.
type remoteWriter struct {
	//ShardInfo []ShardInfo
	//DataNodes DataNodes
}

// Write sends the write request to the remote node. Placeholder: not
// implemented yet; it always reports success.
func (w *remoteWriter) Write(p *WritePointsRequest) error {
	return nil
}

18
data/node.go Normal file
View File

@ -0,0 +1,18 @@
package data
// NewDataNode returns a new, empty data node.
func NewDataNode() *Node {
	return &Node{}
}
// Node is a local data node. Placeholder: it will manage local shard data.
type Node struct {
	//ms meta.Store
}
// Open starts the node. Placeholder: shard opening and AE startup
// (presumably anti-entropy — confirm) are not implemented yet.
func (n *Node) Open() error {
	// Open shards
	// Start AE for Node
	return nil
}
// Close stops the node. Placeholder: no resources are held yet.
func (n *Node) Close() error { return nil }
// Init initializes the node. Placeholder: not implemented yet.
func (n *Node) Init() error { return nil }

5
meta/store.go Normal file
View File

@ -0,0 +1,5 @@
package meta
// Store provides access to the cluster's configuration and
// metadata.
type Store interface{}

View File

@ -20,8 +20,10 @@ import (
"sync"
"time"
"github.com/influxdb/influxdb/data"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/meta"
"golang.org/x/crypto/bcrypt"
)
@ -55,6 +57,39 @@ const (
NoChunkingSize = 0
)
// Service represents a long running task that is managed by a Server.
type Service interface {
	Open() error
	Close() error
}
// QueryExecutor executes a query across multiple data nodes.
type QueryExecutor interface {
	Execute(q *QueryRequest) (chan *Result, error)
}
// QueryRequest represents a request to run a query across the cluster.
type QueryRequest struct {
	Query     *influxql.Query
	Database  string
	User      *User
	ChunkSize int
}
// PointsWriter accepts a WritePointsRequest from client facing endpoints such
// as the HTTP JSON API, Collectd, Graphite, OpenTSDB, etc.
type PointsWriter interface {
	Write(p *WritePointsRequest) error
}
// WritePointsRequest represents a request to write point data to the cluster.
type WritePointsRequest struct {
	Database         string
	RetentionPolicy  string
	ConsistencyLevel ConsistencyLevel
	Points           []Point
}
// Server represents a collection of metadata and raw metric data.
type Server struct {
mu sync.RWMutex
@ -101,6 +136,21 @@ type Server struct {
// Build information.
Version string
CommitHash string
// The local data node that manages local shard data
dn data.Node
// The meta store for accessing and updating cluster and schema data
ms meta.Store
// The services running on this node
services []Service
// Handles write request for local and remote nodes
pw PointsWriter
// Handles queries for local and remote nodes
qe QueryExecutor
}
// NewServer returns a new instance of Server.
@ -115,6 +165,8 @@ func NewServer() *Server {
shards: make(map[uint64]*Shard),
stats: NewStats("server"),
Logger: log.New(os.Stderr, "[server] ", log.LstdFlags),
pw: &Coordinator{},
qe: &Coordinator{},
}
// Server will always return with authentication enabled.
// This ensures that disabling authentication must be an explicit decision.
@ -123,6 +175,24 @@ func NewServer() *Server {
return &s
}
// openServices opens every registered service in order, stopping at and
// returning the first failure.
func (s *Server) openServices() error {
	for _, svc := range s.services {
		if err := svc.Open(); err != nil {
			return err
		}
	}
	return nil
}
// closeServices closes every registered service in order, stopping at and
// returning the first failure.
func (s *Server) closeServices() error {
	for _, svc := range s.services {
		if err := svc.Close(); err != nil {
			return err
		}
	}
	return nil
}
// BrokerURLs returns the broker URLs reported by the server's messaging
// client.
func (s *Server) BrokerURLs() []url.URL {
	return s.client.URLs()
}
@ -225,7 +295,7 @@ func (s *Server) Open(path string, client MessagingClient) error {
// TODO: Associate series ids with shards.
return nil
return s.openServices()
}
// opened returns true when the server is open. Must be called under lock.
@ -243,6 +313,10 @@ func (s *Server) close() error {
return ErrServerClosed
}
if err := s.closeServices(); err != nil {
return err
}
if s.rpDone != nil {
close(s.rpDone)
s.rpDone = nil

View File

@ -266,6 +266,14 @@ func (s *Shard) HasDataNodeID(id uint64) bool {
return false
}
// Write stores point data in this shard. Placeholder: not implemented yet;
// it always reports success.
func (s *Shard) Write(r *WritePointsRequest) error {
	return nil
}
// Read returns the points stored at the given timestamp. Placeholder: not
// implemented yet; it always returns nil.
func (s *Shard) Read(timestamp time.Time) ([]Point, error) {
	return nil, nil
}
// readSeries reads encoded series data from a shard.
func (s *Shard) readSeries(seriesID uint64, timestamp int64) (values []byte, err error) {
err = s.store.View(func(tx *bolt.Tx) error {

45
shard_test.go Normal file
View File

@ -0,0 +1,45 @@
package influxdb
import (
"reflect"
"testing"
"time"
)
// TestShardWrite exercises a write/read round trip on a single shard.
func TestShardWrite(t *testing.T) {
	// Enable when shard can convert a WritePointsRequest to stored data.
	// Needs field encoding/types saved on the shard.
	t.Skip("not implemented yet")
	sh := &Shard{ID: 1}
	pt := Point{
		Name:      "cpu",
		Tags:      map[string]string{"host": "server"},
		Timestamp: time.Unix(1, 2),
		Fields:    map[string]interface{}{"value": 1.0},
	}
	pr := &WritePointsRequest{
		Database:        "foo",
		RetentionPolicy: "default",
		Points: []Point{
			pt},
	}
	if err := sh.Write(pr); err != nil {
		t.Errorf("LocalWriter.Write() failed: %v", err)
	}
	// Read back at the same timestamp and expect exactly the written point.
	p, err := sh.Read(pt.Timestamp)
	if err != nil {
		t.Fatalf("LocalWriter.Read() failed: %v", err)
	}
	if exp := 1; len(p) != exp {
		t.Fatalf("LocalWriter.Read() points len mismatch. got %v, exp %v", len(p), exp)
	}
	if !reflect.DeepEqual(p[0], pt) {
		t.Fatalf("LocalWriter.Read() point mismatch. got %v, exp %v", p[0], pt)
	}
}