Merge pull request #2722 from influxdb/influxd

Basic influxd service wired up for reading/writing
pull/2724/head
Cory LaNou 2015-06-01 12:37:56 -06:00
commit ed8470d50d
13 changed files with 192 additions and 128 deletions

View File

@ -44,11 +44,11 @@ var (
// PointsWriter handles writes across multiple local and remote data nodes.
type PointsWriter struct {
nodeID uint64
mu sync.RWMutex
closing chan struct{}
MetaStore interface {
NodeID() uint64
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
}
@ -64,9 +64,8 @@ type PointsWriter struct {
}
// NewPointsWriter returns a new instance of PointsWriter for a node.
func NewPointsWriter(localID uint64) *PointsWriter {
func NewPointsWriter() *PointsWriter {
return &PointsWriter{
nodeID: localID,
closing: make(chan struct{}),
}
}
@ -210,7 +209,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
for _, nodeID := range shard.OwnerIDs {
go func(shardID, nodeID uint64, points []tsdb.Point) {
if w.nodeID == nodeID {
if w.MetaStore.NodeID() == nodeID {
err := w.Store.WriteToShard(shardID, points)
// If we've written to shard that should exist on the current node, but the store has
// not actually created this shard, tell it to create it and retry the write
@ -223,13 +222,10 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
err = w.Store.WriteToShard(shardID, points)
}
ch <- err
// FIXME: When ShardWriter is implemented, this should never be nil
} else if w.ShardWriter != nil {
ch <- w.ShardWriter.WriteShard(shardID, nodeID, points)
} else {
ch <- ErrWriteFailed
return
}
ch <- w.ShardWriter.WriteShard(shardID, nodeID, points)
}(shard.ID, nodeID, points)
}
@ -245,11 +241,12 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
case err := <-ch:
// If the write returned an error, continue to the next response
if err != nil {
// FIXME
println(err.Error())
continue
}
wrote += 1
}
}

View File

@ -17,6 +17,7 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
ms := MetaStore{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
ms.NodeIDFn = func() uint64 { return 1 }
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
return rp, nil
}
@ -53,6 +54,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
ms.NodeIDFn = func() uint64 { return 1 }
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
return rp, nil
}
@ -241,6 +243,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
}
ms := NewMetaStore()
ms.NodeIDFn = func() uint64 { return 1 }
c := cluster.PointsWriter{
MetaStore: ms,
ShardWriter: sw,
@ -298,10 +301,13 @@ func NewMetaStore() *MetaStore {
}
type MetaStore struct {
NodeIDFn func() uint64
RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error)
CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
}
func (m MetaStore) NodeID() uint64 { return m.NodeIDFn() }
func (m MetaStore) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
return m.RetentionPolicyFn(database, name)
}

View File

@ -47,7 +47,7 @@ func (s *Service) Open() error {
}
s.ln = ln
s.Logger.Println("listening on TCP connection", ln.Addr().String())
s.Logger.Println("listening on TCP:", ln.Addr().String())
// Begin serving conections.
s.wg.Add(1)

View File

@ -37,14 +37,11 @@ const (
DefaultAPIReadTimeout = 5 * time.Second
// DefaultHostName represents the default host name to use if it is never provided
DefaultHostName = "localhost"
DefaultHostname = "localhost"
// DefaultBindAddress represents the bind address to use if none is specified
DefaultBindAddress = "0.0.0.0"
// DefaultClusterPort represents the default port the cluster runs ons.
DefaultClusterPort = 8086
// DefaultOpenTSDBDatabaseName is the default OpenTSDB database if none is specified
DefaultOpenTSDBDatabaseName = "opentsdb"
@ -89,7 +86,7 @@ type Config struct {
// NewConfig returns an instance of Config with reasonable defaults.
func NewConfig() *Config {
c := &Config{}
c.Hostname = DefaultHostName
c.Hostname = DefaultHostname
c.BindAddress = DefaultBindAddress
c.Meta = meta.NewConfig()

View File

@ -3,6 +3,7 @@ package run
import (
"fmt"
"net"
"time"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
@ -31,7 +32,7 @@ type Server struct {
func NewServer(c *Config, joinURLs string) *Server {
// Construct base meta store and data store.
s := &Server{
MetaStore: meta.NewStore(c.Meta.Dir),
MetaStore: meta.NewStore(c.Meta.Dir, c.Hostname),
TSDBStore: tsdb.NewStore(c.Data.Dir),
}
@ -40,8 +41,13 @@ func NewServer(c *Config, joinURLs string) *Server {
s.QueryExecutor.MetaStore = s.MetaStore
s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore}
// Set the shard writer
s.ShardWriter = cluster.NewShardWriter(time.Duration(c.Cluster.ShardWriterTimeout))
// Initialize points writer.
s.PointsWriter = cluster.NewPointsWriter(1) // FIXME: Find ID.
s.PointsWriter = cluster.NewPointsWriter()
s.PointsWriter.MetaStore = s.MetaStore
s.PointsWriter.Store = s.TSDBStore
s.PointsWriter.ShardWriter = s.ShardWriter
// Append services.
@ -70,6 +76,9 @@ func (s *Server) appendAdminService(c admin.Config) {
func (s *Server) appendHTTPDService(c httpd.Config) {
srv := httpd.NewService(c)
srv.Handler.MetaStore = s.MetaStore
srv.Handler.QueryExecutor = s.QueryExecutor
srv.Handler.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}

View File

@ -4,10 +4,12 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"time"
@ -33,6 +35,9 @@ type Store struct {
path string
opened bool
id uint64 // local node id
host string // local hostname
data *Data
raft *raft.Raft
peers raft.PeerStore
@ -56,9 +61,10 @@ type Store struct {
}
// NewStore returns a new instance of Store.
func NewStore(path string) *Store {
func NewStore(path, host string) *Store {
return &Store{
path: path,
host: host,
data: &Data{},
HeartbeatTimeout: DefaultHeartbeatTimeout,
ElectionTimeout: DefaultElectionTimeout,
@ -72,18 +78,21 @@ func NewStore(path string) *Store {
// Returns an empty string when the store is closed.
func (s *Store) Path() string { return s.path }
// IDPath returns the path to the local node ID file.
func (s *Store) IDPath() string { return filepath.Join(s.path, "id") }
// Open opens and initializes the raft store.
func (s *Store) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
// Check if store has already been opened.
if s.opened {
return ErrStoreOpen
}
s.opened = true
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
// Check if store has already been opened.
if s.opened {
return ErrStoreOpen
}
s.opened = true
// Create the root directory if it doesn't already exist.
if err := os.MkdirAll(s.path, 0777); err != nil {
return fmt.Errorf("mkdir all: %s", err)
@ -133,12 +142,24 @@ func (s *Store) Open() error {
}
s.raft = r
// Load existing ID.
if err := s.readID(); err != nil {
return fmt.Errorf("read id: %s", err)
}
return nil
}(); err != nil {
s.close()
return err
}
// If the ID doesn't exist then create a new node.
if s.id == 0 {
if err := s.createLocalNode(); err != nil {
return fmt.Errorf("create local node: %s", err)
}
}
return nil
}
@ -172,6 +193,52 @@ func (s *Store) close() error {
return nil
}
// readID reads the local node ID from the ID file.
func (s *Store) readID() error {
b, err := ioutil.ReadFile(s.IDPath())
if os.IsNotExist(err) {
s.id = 0
return nil
} else if err != nil {
return fmt.Errorf("read file: %s", err)
}
id, err := strconv.ParseUint(string(b), 10, 64)
if err != nil {
return fmt.Errorf("parse id: %s", err)
}
s.id = id
s.Logger.Printf("read local node id: %d", s.id)
return nil
}
// createLocalNode creates the node for this local instance.
// Writes the id of the node to file on success.
func (s *Store) createLocalNode() error {
// Wait for leader.
<-s.LeaderCh()
// Create new node.
ni, err := s.CreateNode(s.host)
if err != nil {
return fmt.Errorf("create node: %s", err)
}
// Write node id to file.
if err := ioutil.WriteFile(s.IDPath(), []byte(strconv.FormatUint(ni.ID, 10)), 0666); err != nil {
return fmt.Errorf("write file: %s", err)
}
// Set ID locally.
s.id = ni.ID
s.Logger.Printf("created local node: id=%d, host=%s", s.id, s.host)
return nil
}
// LeaderCh returns a channel that notifies on leadership change.
// Panics when the store has not been opened yet.
func (s *Store) LeaderCh() <-chan bool {
@ -181,6 +248,10 @@ func (s *Store) LeaderCh() <-chan bool {
return s.raft.LeaderCh()
}
// NodeID returns the identifier for the local node.
// Panics if the node has not joined the cluster.
func (s *Store) NodeID() uint64 { return s.id }
// Node returns a node by id.
func (s *Store) Node(id uint64) (ni *NodeInfo, err error) {
err = s.read(func(data *Data) error {

View File

@ -25,15 +25,6 @@ func TestStore_Open(t *testing.T) {
}
defer s.Close() // idempotent
// Wait for leadership change.
select {
case <-s.LeaderCh():
case <-time.After(1 * time.Second):
t.Fatal("no leadership")
}
time.Sleep(100 * time.Millisecond)
// Close store.
if err := s.Close(); err != nil {
t.Fatal(err)
@ -54,19 +45,18 @@ func TestStore_Open_ErrStoreOpen(t *testing.T) {
func TestStore_CreateNode(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create node.
if ni, err := s.CreateNode("host0"); err != nil {
t.Fatal(err)
} else if *ni != (meta.NodeInfo{ID: 1, Host: "host0"}) {
} else if *ni != (meta.NodeInfo{ID: 2, Host: "host0"}) {
t.Fatalf("unexpected node: %#v", ni)
}
// Create another node.
if ni, err := s.CreateNode("host1"); err != nil {
t.Fatal(err)
} else if *ni != (meta.NodeInfo{ID: 2, Host: "host1"}) {
} else if *ni != (meta.NodeInfo{ID: 3, Host: "host1"}) {
t.Fatalf("unexpected node: %#v", ni)
}
}
@ -75,7 +65,6 @@ func TestStore_CreateNode(t *testing.T) {
func TestStore_CreateNode_ErrNodeExists(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create node.
if _, err := s.CreateNode("host0"); err != nil {
@ -92,7 +81,6 @@ func TestStore_CreateNode_ErrNodeExists(t *testing.T) {
func TestStore_Node(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create nodes.
for i := 0; i < 3; i++ {
@ -102,9 +90,9 @@ func TestStore_Node(t *testing.T) {
}
// Find second node.
if ni, err := s.Node(2); err != nil {
if ni, err := s.Node(3); err != nil {
t.Fatal(err)
} else if *ni != (meta.NodeInfo{ID: 2, Host: "host1"}) {
} else if *ni != (meta.NodeInfo{ID: 3, Host: "host1"}) {
t.Fatalf("unexpected node: %#v", ni)
}
}
@ -113,7 +101,6 @@ func TestStore_Node(t *testing.T) {
func TestStore_NodeByHost(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create nodes.
for i := 0; i < 3; i++ {
@ -125,7 +112,7 @@ func TestStore_NodeByHost(t *testing.T) {
// Find second node.
if ni, err := s.NodeByHost("host1"); err != nil {
t.Fatal(err)
} else if *ni != (meta.NodeInfo{ID: 2, Host: "host1"}) {
} else if *ni != (meta.NodeInfo{ID: 3, Host: "host1"}) {
t.Fatalf("unexpected node: %#v", ni)
}
}
@ -134,7 +121,6 @@ func TestStore_NodeByHost(t *testing.T) {
func TestStore_DeleteNode(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create nodes.
for i := 0; i < 3; i++ {
@ -144,18 +130,18 @@ func TestStore_DeleteNode(t *testing.T) {
}
// Remove second node.
if err := s.DeleteNode(2); err != nil {
if err := s.DeleteNode(3); err != nil {
t.Fatal(err)
}
// Ensure remaining nodes are correct.
if ni, _ := s.Node(1); *ni != (meta.NodeInfo{ID: 1, Host: "host0"}) {
if ni, _ := s.Node(2); *ni != (meta.NodeInfo{ID: 2, Host: "host0"}) {
t.Fatalf("unexpected node(1): %#v", ni)
}
if ni, _ := s.Node(2); ni != nil {
if ni, _ := s.Node(3); ni != nil {
t.Fatalf("unexpected node(2): %#v", ni)
}
if ni, _ := s.Node(3); *ni != (meta.NodeInfo{ID: 3, Host: "host2"}) {
if ni, _ := s.Node(4); *ni != (meta.NodeInfo{ID: 4, Host: "host2"}) {
t.Fatalf("unexpected node(3): %#v", ni)
}
}
@ -164,7 +150,6 @@ func TestStore_DeleteNode(t *testing.T) {
func TestStore_DeleteNode_ErrNodeNotFound(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
if err := s.DeleteNode(2); err != meta.ErrNodeNotFound {
t.Fatalf("unexpected error: %s", err)
@ -175,7 +160,6 @@ func TestStore_DeleteNode_ErrNodeNotFound(t *testing.T) {
func TestStore_CreateDatabase(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create database.
if di, err := s.CreateDatabase("db0"); err != nil {
@ -196,7 +180,6 @@ func TestStore_CreateDatabase(t *testing.T) {
func TestStore_DropDatabase(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create databases.
for i := 0; i < 3; i++ {
@ -226,7 +209,6 @@ func TestStore_DropDatabase(t *testing.T) {
func TestStore_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
if err := s.DropDatabase("no_such_database"); err != meta.ErrDatabaseNotFound {
t.Fatalf("unexpected error: %s", err)
@ -237,7 +219,6 @@ func TestStore_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
func TestStore_CreateRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
@ -265,7 +246,6 @@ func TestStore_CreateRetentionPolicy(t *testing.T) {
func TestStore_DropRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
@ -300,7 +280,6 @@ func TestStore_DropRetentionPolicy(t *testing.T) {
func TestStore_SetDefaultRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
@ -324,7 +303,6 @@ func TestStore_SetDefaultRetentionPolicy(t *testing.T) {
func TestStore_UpdateRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
@ -359,7 +337,6 @@ func TestStore_UpdateRetentionPolicy(t *testing.T) {
func TestStore_CreateShardGroup(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create node & database.
if _, err := s.CreateNode("host0"); err != nil {
@ -382,7 +359,6 @@ func TestStore_CreateShardGroup(t *testing.T) {
func TestStore_DeleteShardGroup(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create node, database, policy, & group.
if _, err := s.CreateNode("host0"); err != nil {
@ -405,7 +381,6 @@ func TestStore_DeleteShardGroup(t *testing.T) {
func TestStore_CreateContinuousQuery(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create query.
if _, err := s.CreateDatabase("db0"); err != nil {
@ -419,7 +394,6 @@ func TestStore_CreateContinuousQuery(t *testing.T) {
func TestStore_CreateContinuousQuery_ErrContinuousQueryExists(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create continuous query.
if _, err := s.CreateDatabase("db0"); err != nil {
@ -438,7 +412,6 @@ func TestStore_CreateContinuousQuery_ErrContinuousQueryExists(t *testing.T) {
func TestStore_DropContinuousQuery(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create queries.
if _, err := s.CreateDatabase("db0"); err != nil {
@ -471,7 +444,6 @@ func TestStore_DropContinuousQuery(t *testing.T) {
func TestStore_CreateUser(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create user.
if ui, err := s.CreateUser("susy", "pass", true); err != nil {
@ -485,7 +457,6 @@ func TestStore_CreateUser(t *testing.T) {
func TestStore_DropUser(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create users.
if _, err := s.CreateUser("susy", "pass", true); err != nil {
@ -513,7 +484,6 @@ func TestStore_DropUser(t *testing.T) {
func TestStore_UpdateUser(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
// Create users.
if _, err := s.CreateUser("susy", "pass", true); err != nil {
@ -545,7 +515,6 @@ func TestStore_UpdateUser(t *testing.T) {
func TestStore_UserCount(t *testing.T) {
s := MustOpenStore()
defer s.Close()
<-s.LeaderCh()
if count, err := s.UserCount(); count != 0 && err != nil {
t.Fatalf("expected user count to be 0 but was %d", count)
@ -572,7 +541,7 @@ type Store struct {
// NewStore returns a new test wrapper for Store.
func NewStore(path string) *Store {
s := &Store{
Store: meta.NewStore(path),
Store: meta.NewStore(path, "localhost"),
}
s.HeartbeatTimeout = 50 * time.Millisecond
s.ElectionTimeout = 50 * time.Millisecond

View File

@ -11,7 +11,8 @@ type Config struct {
func NewConfig() Config {
return Config{
Enabled: true,
LogEnabled: true,
Enabled: true,
BindAddress: ":8086",
LogEnabled: true,
}
}

View File

@ -59,15 +59,13 @@ type Handler struct {
}
QueryExecutor interface {
ExecuteQuery(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error)
ExecuteQuery(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error)
}
SeriesWriter interface {
WriteSeries(database, retentionPolicy string, points []tsdb.Point) error
PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}
PointsWriter cluster.PointsWriter
Logger *log.Logger
loggingEnabled bool // Log every HTTP access.
WriteTrace bool // Detailed logging of write path
@ -193,7 +191,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
// Execute query.
w.Header().Add("content-type", "application/json")
results, err := h.QueryExecutor.ExecuteQuery(query, db, user, chunkSize)
results, err := h.QueryExecutor.ExecuteQuery(query, db, chunkSize)
if _, ok := err.(meta.AuthError); ok {
w.WriteHeader(http.StatusUnauthorized)
@ -336,12 +334,18 @@ func (h *Handler) serveWriteJSON(w http.ResponseWriter, r *http.Request, body []
return
}
if err := h.SeriesWriter.WriteSeries(bp.Database, bp.RetentionPolicy, points); err != nil {
if influxdb.IsClientError(err) {
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
} else {
resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
}
// Convert the json batch struct to a points writer struct
if err := h.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: bp.Database,
RetentionPolicy: bp.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: points,
}); influxdb.IsClientError(err) {
resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
} else if err != nil {
resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
return
}
@ -751,7 +755,7 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *meta.U
return
}
res, err := h.QueryExecutor.ExecuteQuery(query, db, user, DefaultChunkSize)
res, err := h.QueryExecutor.ExecuteQuery(query, db, DefaultChunkSize)
if err != nil {
w.Write([]byte("*** SERVER-SIDE ERROR. MISSING DATA ***"))
w.Write(delim)
@ -801,7 +805,7 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *meta.U
// Return all the measurements from the given DB
func (h *Handler) showMeasurements(db string, user *meta.UserInfo) ([]string, error) {
var measurements []string
c, err := h.QueryExecutor.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user, 0)
c, err := h.QueryExecutor.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, 0)
if err != nil {
return measurements, err
}

View File

@ -133,7 +133,7 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) {
// Ensure the handler returns results from a query (including nil results).
func TestHandler_Query(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error) {
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
if q.String() != `SELECT * FROM bar` {
t.Fatalf("unexpected query: %s", q.String())
} else if db != `foo` {
@ -158,7 +158,7 @@ func TestHandler_Query(t *testing.T) {
// Ensure the handler merges results from the same statement.
func TestHandler_Query_MergeResults(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error) {
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
return NewResultChan(
&influxql.Result{StatementID: 1, Series: influxql.Rows{{Name: "series0"}}},
&influxql.Result{StatementID: 1, Series: influxql.Rows{{Name: "series1"}}},
@ -177,7 +177,7 @@ func TestHandler_Query_MergeResults(t *testing.T) {
// Ensure the handler can parse chunked and chunk size query parameters.
func TestHandler_Query_Chunked(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error) {
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
if chunkSize != 2 {
t.Fatalf("unexpected chunk size: %d", chunkSize)
}
@ -223,7 +223,7 @@ func TestHandler_Query_ErrInvalidQuery(t *testing.T) {
// Ensure the handler returns a status 401 if the user is not authorized.
func TestHandler_Query_ErrUnauthorized(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error) {
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
return nil, meta.NewAuthError("marker")
}
@ -237,7 +237,7 @@ func TestHandler_Query_ErrUnauthorized(t *testing.T) {
// Ensure the handler returns a status 500 if an error is returned from the query executor.
func TestHandler_Query_ErrExecuteQuery(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error) {
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
return nil, errors.New("marker")
}
@ -251,7 +251,7 @@ func TestHandler_Query_ErrExecuteQuery(t *testing.T) {
// Ensure the handler returns a status 200 if an error is returned in the result.
func TestHandler_Query_ErrResult(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error) {
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
return NewResultChan(&influxql.Result{Err: errors.New("measurement not found")}), nil
}
@ -267,7 +267,7 @@ func TestHandler_Query_ErrResult(t *testing.T) {
// Ensure the handler returns a status 401 if an auth error is returned from the result.
func TestHandler_Query_Result_ErrUnauthorized(t *testing.T) {
h := NewHandler(false)
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error) {
h.QueryExecutor.ExecuteQueryFn = func(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
return NewResultChan(&influxql.Result{Err: meta.NewAuthError("marker")}), nil
}
@ -376,7 +376,6 @@ type Handler struct {
*httpd.Handler
MetaStore HandlerMetaStore
QueryExecutor HandlerQueryExecutor
SeriesWriter HandlerSeriesWriter
}
// NewHandler returns a new instance of Handler.
@ -386,7 +385,6 @@ func NewHandler(requireAuthentication bool) *Handler {
}
h.Handler.MetaStore = &h.MetaStore
h.Handler.QueryExecutor = &h.QueryExecutor
h.Handler.SeriesWriter = &h.SeriesWriter
return h
}
@ -411,20 +409,11 @@ func (s *HandlerMetaStore) Users() ([]meta.UserInfo, error) {
// HandlerQueryExecutor is a mock implementation of Handler.QueryExecutor.
type HandlerQueryExecutor struct {
ExecuteQueryFn func(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error)
ExecuteQueryFn func(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error)
}
func (e *HandlerQueryExecutor) ExecuteQuery(q *influxql.Query, db string, user *meta.UserInfo, chunkSize int) (<-chan *influxql.Result, error) {
return e.ExecuteQueryFn(q, db, user, chunkSize)
}
// HandlerSeriesWriter is a mock implementation of Handler.SeriesWriter.
type HandlerSeriesWriter struct {
WriteSeriesFn func(database, retentionPolicy string, points []tsdb.Point) error
}
func (w *HandlerSeriesWriter) WriteSeries(database, retentionPolicy string, points []tsdb.Point) error {
return w.WriteSeriesFn(database, retentionPolicy, points)
func (e *HandlerQueryExecutor) ExecuteQuery(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
return e.ExecuteQueryFn(q, db, chunkSize)
}
// MustNewRequest returns a new HTTP request. Panic on error.

View File

@ -2,36 +2,50 @@ package httpd
import (
"fmt"
"log"
"net"
"net/http"
"os"
"strings"
)
// Service manages the listener and handler for an HTTP endpoint.
type Service struct {
listener net.Listener
addr string
err chan error
ln net.Listener
addr string
err chan error
Handler Handler
Handler *Handler
Logger *log.Logger
}
// NewService returns a new instance of Service.
func NewService(c Config) *Service {
return &Service{
s := &Service{
addr: c.BindAddress,
err: make(chan error),
Handler: NewHandler(
c.AuthEnabled,
c.LogEnabled,
"FIXME",
),
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
}
s.Handler.Logger = s.Logger
return s
}
// Open starts the service
func (s *Service) Open() error {
// Open listener.
listener, err := net.Listen("tcp", s.addr)
ln, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
s.listener = listener
s.ln = ln
s.Logger.Println("listening on HTTP:", ln.Addr().String())
// Begin listening for requests in a separate goroutine.
go s.serve()
@ -40,8 +54,8 @@ func (s *Service) Open() error {
// Close closes the underlying listener.
func (s *Service) Close() error {
if s.listener != nil {
return s.listener.Close()
if s.ln != nil {
return s.ln.Close()
}
return nil
}
@ -51,8 +65,8 @@ func (s *Service) Err() <-chan error { return s.err }
// Addr returns the listener's address. Returns nil if listener is closed.
func (s *Service) Addr() net.Addr {
if s.listener != nil {
return s.listener.Addr()
if s.ln != nil {
return s.ln.Addr()
}
return nil
}
@ -61,7 +75,7 @@ func (s *Service) Addr() net.Addr {
func (s *Service) serve() {
// The listener was closed so exit
// See https://github.com/golang/go/issues/4373
err := http.Serve(s.listener, &s.Handler)
err := http.Serve(s.ln, s.Handler)
if err != nil && !strings.Contains(err.Error(), "closed") {
s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err)
}

View File

@ -1,12 +1,19 @@
echo "creating database"
#!/bin/bash
set -e
echo "> creating database"
curl -G http://localhost:8086/query --data-urlencode "q=CREATE DATABASE foo"
echo "creating retention policy"
curl -G http://localhost:8086/query --data-urlencode "q=CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION 3 DEFAULT"
echo ""
echo "> creating retention policy"
curl -G http://localhost:8086/query --data-urlencode "q=CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION 1 DEFAULT"
echo "inserting data"
curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"time": "2015-01-26T22:01:11.703Z","fields": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write
echo ""
echo "> inserting data"
curl -v -X POST "http://localhost:8086/write?db=foo&rp=bar" -d 'cpu,host=server01 value=1.0'
curl -v -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server02"},"fields": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write
echo "querying data"
curl -G http://localhost:8086/query --data-urlencode "db=foo" --data-urlencode "q=SELECT sum(value) FROM \"foo\".\"bar\".cpu GROUP BY time(1h)"
echo ""
echo "> querying data"
curl -G http://localhost:8086/query --data-urlencode "db=foo" --data-urlencode "pretty=true" --data-urlencode "q=SELECT * FROM \"foo\".\"bar\".cpu"

View File

@ -40,7 +40,7 @@ type QueryExecutor struct {
Logger *log.Logger
// the local daata store
// the local data store
store *Store
}
@ -125,7 +125,7 @@ func (q *QueryExecutor) Authorize(u *meta.UserInfo, query *influxql.Query, datab
// ExecuteQuery executes an InfluxQL query against the server.
// It sends results down the passed in chan and closes it when done. It will close the chan
// on the first statement that throws an error.
func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (chan *influxql.Result, error) {
func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) {
q.Stats.Add("queriesRx", int64(len(query.Statements)))
// Execute each statement. Keep the iterator external so we can