diff --git a/cluster/points_writer.go b/cluster/points_writer.go index 1119f497b4..b1b3ca9f3c 100644 --- a/cluster/points_writer.go +++ b/cluster/points_writer.go @@ -44,11 +44,11 @@ var ( // PointsWriter handles writes across multiple local and remote data nodes. type PointsWriter struct { - nodeID uint64 mu sync.RWMutex closing chan struct{} MetaStore interface { + NodeID() uint64 RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) } @@ -64,9 +64,8 @@ type PointsWriter struct { } // NewPointsWriter returns a new instance of PointsWriter for a node. -func NewPointsWriter(localID uint64) *PointsWriter { +func NewPointsWriter() *PointsWriter { return &PointsWriter{ - nodeID: localID, closing: make(chan struct{}), } } @@ -210,7 +209,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo for _, nodeID := range shard.OwnerIDs { go func(shardID, nodeID uint64, points []tsdb.Point) { - if w.nodeID == nodeID { + if w.MetaStore.NodeID() == nodeID { err := w.Store.WriteToShard(shardID, points) // If we've written to shard that should exist on the current node, but the store has // not actually created this shard, tell it to create it and retry the write diff --git a/cluster/points_writer_test.go b/cluster/points_writer_test.go index 9579736c7d..0a8ccbc589 100644 --- a/cluster/points_writer_test.go +++ b/cluster/points_writer_test.go @@ -17,6 +17,7 @@ func TestPointsWriter_MapShards_One(t *testing.T) { ms := MetaStore{} rp := NewRetentionPolicy("myp", time.Hour, 3) + ms.NodeIDFn = func() uint64 { return 1 } ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) { return rp, nil } @@ -53,6 +54,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) { AttachShardGroupInfo(rp, []uint64{1, 2, 3}) AttachShardGroupInfo(rp, []uint64{1, 2, 3}) + ms.NodeIDFn = func() uint64 { return 1 } ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) { return rp, nil } @@ -241,6 +243,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { } ms := NewMetaStore() + ms.NodeIDFn = func() uint64 { return 1 } c := cluster.PointsWriter{ MetaStore: ms, ShardWriter: sw, @@ -298,10 +301,13 @@ func NewMetaStore() *MetaStore { } type MetaStore struct { + NodeIDFn func() uint64 RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error) CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) } +func (m MetaStore) NodeID() uint64 { return m.NodeIDFn() } + func (m MetaStore) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) { return m.RetentionPolicyFn(database, name) } diff --git a/cluster/service.go b/cluster/service.go index 726a1fb674..d2d0a6f616 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -47,7 +47,7 @@ func (s *Service) Open() error { } s.ln = ln - s.Logger.Println("listening on TCP connection", ln.Addr().String()) + s.Logger.Println("listening on TCP:", ln.Addr().String()) // Begin serving conections. s.wg.Add(1) diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index acf490dd11..09bb597b8c 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -37,14 +37,11 @@ const ( DefaultAPIReadTimeout = 5 * time.Second // DefaultHostName represents the default host name to use if it is never provided - DefaultHostName = "localhost" + DefaultHostname = "localhost" // DefaultBindAddress represents the bind address to use if none is specified DefaultBindAddress = "0.0.0.0" - // DefaultClusterPort represents the default port the cluster runs ons. - DefaultClusterPort = 8086 - // DefaultOpenTSDBDatabaseName is the default OpenTSDB database if none is specified DefaultOpenTSDBDatabaseName = "opentsdb" @@ -89,7 +86,7 @@ type Config struct { // NewConfig returns an instance of Config with reasonable defaults. func NewConfig() *Config { c := &Config{} - c.Hostname = DefaultHostName + c.Hostname = DefaultHostname c.BindAddress = DefaultBindAddress c.Meta = meta.NewConfig() diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 3e682989c3..b721cf7c3a 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -3,6 +3,7 @@ package run import ( "fmt" "net" + "time" "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/meta" @@ -31,7 +32,7 @@ type Server struct { func NewServer(c *Config, joinURLs string) *Server { // Construct base meta store and data store. s := &Server{ - MetaStore: meta.NewStore(c.Meta.Dir), + MetaStore: meta.NewStore(c.Meta.Dir, c.Hostname), TSDBStore: tsdb.NewStore(c.Data.Dir), } @@ -40,8 +41,13 @@ func NewServer(c *Config, joinURLs string) *Server { s.QueryExecutor.MetaStore = s.MetaStore s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore} + // Set the shard writer + s.ShardWriter = cluster.NewShardWriter(time.Duration(c.Cluster.ShardWriterTimeout)) + // Initialize points writer. - s.PointsWriter = cluster.NewPointsWriter(1) // FIXME: Find ID. + s.PointsWriter = cluster.NewPointsWriter() + s.PointsWriter.MetaStore = s.MetaStore + s.PointsWriter.Store = s.TSDBStore s.PointsWriter.ShardWriter = s.ShardWriter // Append services. @@ -70,6 +76,9 @@ func (s *Server) appendAdminService(c admin.Config) { func (s *Server) appendHTTPDService(c httpd.Config) { srv := httpd.NewService(c) + srv.Handler.MetaStore = s.MetaStore + srv.Handler.QueryExecutor = s.QueryExecutor + srv.Handler.PointsWriter = s.PointsWriter s.Services = append(s.Services, srv) } diff --git a/meta/store.go b/meta/store.go index f8941dc9ec..9043dffca9 100644 --- a/meta/store.go +++ b/meta/store.go @@ -4,10 +4,12 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "net" "os" "path/filepath" + "strconv" "sync" "time" @@ -33,6 +35,9 @@ type Store struct { path string opened bool + id uint64 // local node id + host string // local hostname + data *Data raft *raft.Raft peers raft.PeerStore @@ -56,9 +61,10 @@ type Store struct { } // NewStore returns a new instance of Store. -func NewStore(path string) *Store { +func NewStore(path, host string) *Store { return &Store{ path: path, + host: host, data: &Data{}, HeartbeatTimeout: DefaultHeartbeatTimeout, ElectionTimeout: DefaultElectionTimeout, @@ -72,18 +78,21 @@ func NewStore(path string) *Store { // Returns an empty string when the store is closed. func (s *Store) Path() string { return s.path } +// IDPath returns the path to the local node ID file. +func (s *Store) IDPath() string { return filepath.Join(s.path, "id") } + // Open opens and initializes the raft store. func (s *Store) Open() error { - s.mu.Lock() - defer s.mu.Unlock() - - // Check if store has already been opened. - if s.opened { - return ErrStoreOpen - } - s.opened = true - if err := func() error { + s.mu.Lock() + defer s.mu.Unlock() + + // Check if store has already been opened. + if s.opened { + return ErrStoreOpen + } + s.opened = true + // Create the root directory if it doesn't already exist. if err := os.MkdirAll(s.path, 0777); err != nil { return fmt.Errorf("mkdir all: %s", err) @@ -133,12 +142,24 @@ func (s *Store) Open() error { } s.raft = r + // Load existing ID. + if err := s.readID(); err != nil { + return fmt.Errorf("read id: %s", err) + } + return nil }(); err != nil { s.close() return err } + // If the ID doesn't exist then create a new node. + if s.id == 0 { + if err := s.createLocalNode(); err != nil { + return fmt.Errorf("create local node: %s", err) + } + } + return nil } @@ -172,6 +193,52 @@ func (s *Store) close() error { return nil } +// readID reads the local node ID from the ID file. +func (s *Store) readID() error { + b, err := ioutil.ReadFile(s.IDPath()) + if os.IsNotExist(err) { + s.id = 0 + return nil + } else if err != nil { + return fmt.Errorf("read file: %s", err) + } + + id, err := strconv.ParseUint(string(b), 10, 64) + if err != nil { + return fmt.Errorf("parse id: %s", err) + } + s.id = id + + s.Logger.Printf("read local node id: %d", s.id) + + return nil +} + +// createLocalNode creates the node for this local instance. +// Writes the id of the node to file on success. +func (s *Store) createLocalNode() error { + // Wait for leader. + <-s.LeaderCh() + + // Create new node. + ni, err := s.CreateNode(s.host) + if err != nil { + return fmt.Errorf("create node: %s", err) + } + + // Write node id to file. + if err := ioutil.WriteFile(s.IDPath(), []byte(strconv.FormatUint(ni.ID, 10)), 0666); err != nil { + return fmt.Errorf("write file: %s", err) + } + + // Set ID locally. + s.id = ni.ID + + s.Logger.Printf("created local node: id=%d, host=%s", s.id, s.host) + + return nil +} + // LeaderCh returns a channel that notifies on leadership change. // Panics when the store has not been opened yet. func (s *Store) LeaderCh() <-chan bool { @@ -181,6 +248,10 @@ func (s *Store) LeaderCh() <-chan bool { return s.raft.LeaderCh() } +// NodeID returns the identifier for the local node. +// Panics if the node has not joined the cluster. +func (s *Store) NodeID() uint64 { return s.id } + // Node returns a node by id. func (s *Store) Node(id uint64) (ni *NodeInfo, err error) { err = s.read(func(data *Data) error { diff --git a/services/httpd/config.go b/services/httpd/config.go index a5c6f9a48a..b350464243 100644 --- a/services/httpd/config.go +++ b/services/httpd/config.go @@ -11,7 +11,8 @@ type Config struct { func NewConfig() Config { return Config{ - Enabled: true, - LogEnabled: true, + Enabled: true, + BindAddress: ":8086", + LogEnabled: true, } } diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 35e44492cc..42ccbd2d4a 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -59,14 +59,16 @@ type Handler struct { } QueryExecutor interface { - ExecuteQuery(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error) + ExecuteQuery(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) } SeriesWriter interface { WriteSeries(database, retentionPolicy string, points []tsdb.Point) error } - PointsWriter cluster.PointsWriter + PointsWriter interface { + WritePoints(p *cluster.WritePointsRequest) error + } Logger *log.Logger loggingEnabled bool // Log every HTTP access. @@ -193,7 +195,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. // Execute query. w.Header().Add("content-type", "application/json") - results, err := h.QueryExecutor.ExecuteQuery(query, db, user, chunkSize) + results, err := h.QueryExecutor.ExecuteQuery(query, db, chunkSize) if _, ok := err.(meta.AuthError); ok { w.WriteHeader(http.StatusUnauthorized) @@ -751,7 +753,7 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *meta.U return } - res, err := h.QueryExecutor.ExecuteQuery(query, db, user, DefaultChunkSize) + res, err := h.QueryExecutor.ExecuteQuery(query, db, DefaultChunkSize) if err != nil { w.Write([]byte("*** SERVER-SIDE ERROR. MISSING DATA ***")) w.Write(delim) @@ -801,7 +803,7 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *meta.U // Return all the measurements from the given DB func (h *Handler) showMeasurements(db string, user *meta.UserInfo) ([]string, error) { var measurements []string - c, err := h.QueryExecutor.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user, 0) + c, err := h.QueryExecutor.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, 0) if err != nil { return measurements, err } diff --git a/services/httpd/service.go b/services/httpd/service.go index a74777f61e..933badbb67 100644 --- a/services/httpd/service.go +++ b/services/httpd/service.go @@ -2,36 +2,50 @@ package httpd import ( "fmt" + "log" "net" "net/http" + "os" "strings" ) // Service manages the listener and handler for an HTTP endpoint. type Service struct { - listener net.Listener - addr string - err chan error + ln net.Listener + addr string + err chan error - Handler Handler + Handler *Handler + + Logger *log.Logger } // NewService returns a new instance of Service. func NewService(c Config) *Service { - return &Service{ + s := &Service{ addr: c.BindAddress, err: make(chan error), + Handler: NewHandler( + c.AuthEnabled, + c.LogEnabled, + "FIXME", + ), + Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags), } + s.Handler.Logger = s.Logger + return s } // Open starts the service func (s *Service) Open() error { // Open listener. - listener, err := net.Listen("tcp", s.addr) + ln, err := net.Listen("tcp", s.addr) if err != nil { return err } - s.listener = listener + s.ln = ln + + s.Logger.Println("listening on HTTP:", ln.Addr().String()) // Begin listening for requests in a separate goroutine. go s.serve() @@ -40,8 +54,8 @@ func (s *Service) Open() error { // Close closes the underlying listener. func (s *Service) Close() error { - if s.listener != nil { - return s.listener.Close() + if s.ln != nil { + return s.ln.Close() } return nil } @@ -51,8 +65,8 @@ func (s *Service) Err() <-chan error { return s.err } // Addr returns the listener's address. Returns nil if listener is closed. func (s *Service) Addr() net.Addr { - if s.listener != nil { - return s.listener.Addr() + if s.ln != nil { + return s.ln.Addr() } return nil } @@ -61,7 +75,7 @@ func (s *Service) Addr() net.Addr { func (s *Service) serve() { // The listener was closed so exit // See https://github.com/golang/go/issues/4373 - err := http.Serve(s.listener, &s.Handler) + err := http.Serve(s.ln, s.Handler) if err != nil && !strings.Contains(err.Error(), "closed") { s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err) } diff --git a/tests/create_write_single_query.sh b/tests/create_write_single_query.sh index 1a17d14b4b..90f19f992e 100755 --- a/tests/create_write_single_query.sh +++ b/tests/create_write_single_query.sh @@ -1,12 +1,18 @@ -echo "creating database" +#!/bin/bash +set -e + +echo "> creating database" curl -G http://localhost:8086/query --data-urlencode "q=CREATE DATABASE foo" -echo "creating retention policy" +echo "" +echo "> creating retention policy" curl -G http://localhost:8086/query --data-urlencode "q=CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION 3 DEFAULT" -echo "inserting data" -curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"time": "2015-01-26T22:01:11.703Z","fields": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write +echo "" +echo "> inserting data" +curl -v -X POST "http://localhost:8086/write_points?db=foo&rp=bar" -d 'cpu,host=server01 value=1' -echo "querying data" -curl -G http://localhost:8086/query --data-urlencode "db=foo" --data-urlencode "q=SELECT sum(value) FROM \"foo\".\"bar\".cpu GROUP BY time(1h)" +echo "" +echo "> querying data" +curl -G http://localhost:8086/query --data-urlencode "db=foo" --data-urlencode "q=SELECT * FROM \"foo\".\"bar\".cpu" diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index a55996098e..2540b8096b 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -40,7 +40,7 @@ type QueryExecutor struct { Logger *log.Logger - // the local daata store + // the local data store store *Store } @@ -125,7 +125,7 @@ func (q *QueryExecutor) Authorize(u *meta.UserInfo, query *influxql.Query, datab // ExecuteQuery executes an InfluxQL query against the server. // It sends results down the passed in chan and closes it when done. It will close the chan // on the first statement that throws an error. -func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (chan *influxql.Result, error) { +func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) { q.Stats.Add("queriesRx", int64(len(query.Statements))) // Execute each statement. Keep the iterator external so we can