reading and writing yo!

pull/2722/head
Cory LaNou 2015-06-01 11:20:57 -06:00 committed by InfluxDB
parent 1df33af196
commit 3597565955
11 changed files with 154 additions and 49 deletions

View File

@ -44,11 +44,11 @@ var (
// PointsWriter handles writes across multiple local and remote data nodes.
type PointsWriter struct {
nodeID uint64
mu sync.RWMutex
closing chan struct{}
MetaStore interface {
NodeID() uint64
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
}
@ -64,9 +64,8 @@ type PointsWriter struct {
}
// NewPointsWriter returns a new instance of PointsWriter for a node.
func NewPointsWriter(localID uint64) *PointsWriter {
func NewPointsWriter() *PointsWriter {
return &PointsWriter{
nodeID: localID,
closing: make(chan struct{}),
}
}
@ -210,7 +209,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
for _, nodeID := range shard.OwnerIDs {
go func(shardID, nodeID uint64, points []tsdb.Point) {
if w.nodeID == nodeID {
if w.MetaStore.NodeID() == nodeID {
err := w.Store.WriteToShard(shardID, points)
// If we've written to shard that should exist on the current node, but the store has
// not actually created this shard, tell it to create it and retry the write

View File

@ -17,6 +17,7 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
ms := MetaStore{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
ms.NodeIDFn = func() uint64 { return 1 }
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
return rp, nil
}
@ -53,6 +54,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
ms.NodeIDFn = func() uint64 { return 1 }
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
return rp, nil
}
@ -241,6 +243,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
}
ms := NewMetaStore()
ms.NodeIDFn = func() uint64 { return 1 }
c := cluster.PointsWriter{
MetaStore: ms,
ShardWriter: sw,
@ -298,10 +301,13 @@ func NewMetaStore() *MetaStore {
}
type MetaStore struct {
NodeIDFn func() uint64
RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error)
CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
}
func (m MetaStore) NodeID() uint64 { return m.NodeIDFn() }
func (m MetaStore) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
return m.RetentionPolicyFn(database, name)
}

View File

@ -47,7 +47,7 @@ func (s *Service) Open() error {
}
s.ln = ln
s.Logger.Println("listening on TCP connection", ln.Addr().String())
s.Logger.Println("listening on TCP:", ln.Addr().String())
// Begin serving conections.
s.wg.Add(1)

View File

@ -37,14 +37,11 @@ const (
DefaultAPIReadTimeout = 5 * time.Second
// DefaultHostName represents the default host name to use if it is never provided
DefaultHostName = "localhost"
DefaultHostname = "localhost"
// DefaultBindAddress represents the bind address to use if none is specified
DefaultBindAddress = "0.0.0.0"
// DefaultClusterPort represents the default port the cluster runs ons.
DefaultClusterPort = 8086
// DefaultOpenTSDBDatabaseName is the default OpenTSDB database if none is specified
DefaultOpenTSDBDatabaseName = "opentsdb"
@ -89,7 +86,7 @@ type Config struct {
// NewConfig returns an instance of Config with reasonable defaults.
func NewConfig() *Config {
c := &Config{}
c.Hostname = DefaultHostName
c.Hostname = DefaultHostname
c.BindAddress = DefaultBindAddress
c.Meta = meta.NewConfig()

View File

@ -3,6 +3,7 @@ package run
import (
"fmt"
"net"
"time"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
@ -31,7 +32,7 @@ type Server struct {
func NewServer(c *Config, joinURLs string) *Server {
// Construct base meta store and data store.
s := &Server{
MetaStore: meta.NewStore(c.Meta.Dir),
MetaStore: meta.NewStore(c.Meta.Dir, c.Hostname),
TSDBStore: tsdb.NewStore(c.Data.Dir),
}
@ -40,8 +41,13 @@ func NewServer(c *Config, joinURLs string) *Server {
s.QueryExecutor.MetaStore = s.MetaStore
s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore}
// Set the shard writer
s.ShardWriter = cluster.NewShardWriter(time.Duration(c.Cluster.ShardWriterTimeout))
// Initialize points writer.
s.PointsWriter = cluster.NewPointsWriter(1) // FIXME: Find ID.
s.PointsWriter = cluster.NewPointsWriter()
s.PointsWriter.MetaStore = s.MetaStore
s.PointsWriter.Store = s.TSDBStore
s.PointsWriter.ShardWriter = s.ShardWriter
// Append services.
@ -70,6 +76,9 @@ func (s *Server) appendAdminService(c admin.Config) {
func (s *Server) appendHTTPDService(c httpd.Config) {
srv := httpd.NewService(c)
srv.Handler.MetaStore = s.MetaStore
srv.Handler.QueryExecutor = s.QueryExecutor
srv.Handler.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}

View File

@ -4,10 +4,12 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"time"
@ -33,6 +35,9 @@ type Store struct {
path string
opened bool
id uint64 // local node id
host string // local hostname
data *Data
raft *raft.Raft
peers raft.PeerStore
@ -56,9 +61,10 @@ type Store struct {
}
// NewStore returns a new instance of Store.
func NewStore(path string) *Store {
func NewStore(path, host string) *Store {
return &Store{
path: path,
host: host,
data: &Data{},
HeartbeatTimeout: DefaultHeartbeatTimeout,
ElectionTimeout: DefaultElectionTimeout,
@ -72,8 +78,12 @@ func NewStore(path string) *Store {
// Returns an empty string when the store is closed.
func (s *Store) Path() string { return s.path }
// IDPath returns the path to the local node ID file.
func (s *Store) IDPath() string { return filepath.Join(s.path, "id") }
// Open opens and initializes the raft store.
func (s *Store) Open() error {
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
@ -83,7 +93,6 @@ func (s *Store) Open() error {
}
s.opened = true
if err := func() error {
// Create the root directory if it doesn't already exist.
if err := os.MkdirAll(s.path, 0777); err != nil {
return fmt.Errorf("mkdir all: %s", err)
@ -133,12 +142,24 @@ func (s *Store) Open() error {
}
s.raft = r
// Load existing ID.
if err := s.readID(); err != nil {
return fmt.Errorf("read id: %s", err)
}
return nil
}(); err != nil {
s.close()
return err
}
// If the ID doesn't exist then create a new node.
if s.id == 0 {
if err := s.createLocalNode(); err != nil {
return fmt.Errorf("create local node: %s", err)
}
}
return nil
}
@ -172,6 +193,52 @@ func (s *Store) close() error {
return nil
}
// readID reads the local node ID from the ID file.
func (s *Store) readID() error {
b, err := ioutil.ReadFile(s.IDPath())
if os.IsNotExist(err) {
s.id = 0
return nil
} else if err != nil {
return fmt.Errorf("read file: %s", err)
}
id, err := strconv.ParseUint(string(b), 10, 64)
if err != nil {
return fmt.Errorf("parse id: %s", err)
}
s.id = id
s.Logger.Printf("read local node id: %d", s.id)
return nil
}
// createLocalNode creates the node for this local instance.
// Writes the id of the node to file on success.
func (s *Store) createLocalNode() error {
// Wait for leader.
<-s.LeaderCh()
// Create new node.
ni, err := s.CreateNode(s.host)
if err != nil {
return fmt.Errorf("create node: %s", err)
}
// Write node id to file.
if err := ioutil.WriteFile(s.IDPath(), []byte(strconv.FormatUint(ni.ID, 10)), 0666); err != nil {
return fmt.Errorf("write file: %s", err)
}
// Set ID locally.
s.id = ni.ID
s.Logger.Printf("created local node: id=%d, host=%s", s.id, s.host)
return nil
}
// LeaderCh returns a channel that notifies on leadership change.
// Panics when the store has not been opened yet.
func (s *Store) LeaderCh() <-chan bool {
@ -181,6 +248,10 @@ func (s *Store) LeaderCh() <-chan bool {
return s.raft.LeaderCh()
}
// NodeID returns the identifier for the local node.
// Panics if the node has not joined the cluster.
func (s *Store) NodeID() uint64 { return s.id }
// Node returns a node by id.
func (s *Store) Node(id uint64) (ni *NodeInfo, err error) {
err = s.read(func(data *Data) error {

View File

@ -12,6 +12,7 @@ type Config struct {
func NewConfig() Config {
return Config{
Enabled: true,
BindAddress: ":8086",
LogEnabled: true,
}
}

View File

@ -59,14 +59,16 @@ type Handler struct {
}
QueryExecutor interface {
ExecuteQuery(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error)
ExecuteQuery(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error)
}
SeriesWriter interface {
WriteSeries(database, retentionPolicy string, points []tsdb.Point) error
}
PointsWriter cluster.PointsWriter
PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}
Logger *log.Logger
loggingEnabled bool // Log every HTTP access.
@ -193,7 +195,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
// Execute query.
w.Header().Add("content-type", "application/json")
results, err := h.QueryExecutor.ExecuteQuery(query, db, user, chunkSize)
results, err := h.QueryExecutor.ExecuteQuery(query, db, chunkSize)
if _, ok := err.(meta.AuthError); ok {
w.WriteHeader(http.StatusUnauthorized)
@ -751,7 +753,7 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *meta.U
return
}
res, err := h.QueryExecutor.ExecuteQuery(query, db, user, DefaultChunkSize)
res, err := h.QueryExecutor.ExecuteQuery(query, db, DefaultChunkSize)
if err != nil {
w.Write([]byte("*** SERVER-SIDE ERROR. MISSING DATA ***"))
w.Write(delim)
@ -801,7 +803,7 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *meta.U
// Return all the measurements from the given DB
func (h *Handler) showMeasurements(db string, user *meta.UserInfo) ([]string, error) {
var measurements []string
c, err := h.QueryExecutor.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user, 0)
c, err := h.QueryExecutor.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, 0)
if err != nil {
return measurements, err
}

View File

@ -2,36 +2,50 @@ package httpd
import (
"fmt"
"log"
"net"
"net/http"
"os"
"strings"
)
// Service manages the listener and handler for an HTTP endpoint.
type Service struct {
listener net.Listener
ln net.Listener
addr string
err chan error
Handler Handler
Handler *Handler
Logger *log.Logger
}
// NewService returns a new instance of Service.
func NewService(c Config) *Service {
return &Service{
s := &Service{
addr: c.BindAddress,
err: make(chan error),
Handler: NewHandler(
c.AuthEnabled,
c.LogEnabled,
"FIXME",
),
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
}
s.Handler.Logger = s.Logger
return s
}
// Open starts the service
func (s *Service) Open() error {
// Open listener.
listener, err := net.Listen("tcp", s.addr)
ln, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
s.listener = listener
s.ln = ln
s.Logger.Println("listening on HTTP:", ln.Addr().String())
// Begin listening for requests in a separate goroutine.
go s.serve()
@ -40,8 +54,8 @@ func (s *Service) Open() error {
// Close closes the underlying listener.
func (s *Service) Close() error {
if s.listener != nil {
return s.listener.Close()
if s.ln != nil {
return s.ln.Close()
}
return nil
}
@ -51,8 +65,8 @@ func (s *Service) Err() <-chan error { return s.err }
// Addr returns the listener's address. Returns nil if listener is closed.
func (s *Service) Addr() net.Addr {
if s.listener != nil {
return s.listener.Addr()
if s.ln != nil {
return s.ln.Addr()
}
return nil
}
@ -61,7 +75,7 @@ func (s *Service) Addr() net.Addr {
func (s *Service) serve() {
// The listener was closed so exit
// See https://github.com/golang/go/issues/4373
err := http.Serve(s.listener, &s.Handler)
err := http.Serve(s.ln, s.Handler)
if err != nil && !strings.Contains(err.Error(), "closed") {
s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err)
}

View File

@ -1,12 +1,18 @@
echo "creating database"
#!/bin/bash
set -e
echo "> creating database"
curl -G http://localhost:8086/query --data-urlencode "q=CREATE DATABASE foo"
echo "creating retention policy"
echo ""
echo "> creating retention policy"
curl -G http://localhost:8086/query --data-urlencode "q=CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION 3 DEFAULT"
echo "inserting data"
curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"time": "2015-01-26T22:01:11.703Z","fields": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write
echo ""
echo "> inserting data"
curl -v -X POST "http://localhost:8086/write_points?db=foo&rp=bar" -d 'cpu,host=server01 value=1'
echo "querying data"
curl -G http://localhost:8086/query --data-urlencode "db=foo" --data-urlencode "q=SELECT sum(value) FROM \"foo\".\"bar\".cpu GROUP BY time(1h)"
echo ""
echo "> querying data"
curl -G http://localhost:8086/query --data-urlencode "db=foo" --data-urlencode "q=SELECT * FROM \"foo\".\"bar\".cpu"

View File

@ -40,7 +40,7 @@ type QueryExecutor struct {
Logger *log.Logger
// the local daata store
// the local data store
store *Store
}
@ -125,7 +125,7 @@ func (q *QueryExecutor) Authorize(u *meta.UserInfo, query *influxql.Query, datab
// ExecuteQuery executes an InfluxQL query against the server.
// It sends results down the passed in chan and closes it when done. It will close the chan
// on the first statement that throws an error.
func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (chan *influxql.Result, error) {
func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) {
q.Stats.Add("queriesRx", int64(len(query.Statements)))
// Execute each statement. Keep the iterator external so we can