commit 50be500777

@@ -1,3 +1,29 @@
 package cluster
 
-type Config struct{}
+import (
+	"time"
+
+	"github.com/influxdb/influxdb/toml"
+)
+
+const (
+	// DefaultBindAddress is the default bind address for the clustering service.
+	DefaultBindAddress = ":8087"
+
+	// DefaultShardWriterTimeout is the default timeout set on shard writers.
+	DefaultShardWriterTimeout = 5 * time.Second
+)
+
+// Config represents the configuration for the clustering service.
+type Config struct {
+	BindAddress        string        `toml:"bind-address"`
+	ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"`
+}
+
+// NewConfig returns an instance of Config with defaults.
+func NewConfig() Config {
+	return Config{
+		BindAddress:        DefaultBindAddress,
+		ShardWriterTimeout: toml.Duration(DefaultShardWriterTimeout),
+	}
+}
@@ -0,0 +1,27 @@
+package cluster_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/BurntSushi/toml"
+	"github.com/influxdb/influxdb/cluster"
+)
+
+func TestConfig_Parse(t *testing.T) {
+	// Parse configuration.
+	var c cluster.Config
+	if _, err := toml.Decode(`
+bind-address = ":8080"
+shard-writer-timeout = "10s"
+`, &c); err != nil {
+		t.Fatal(err)
+	}
+
+	// Validate configuration.
+	if c.BindAddress != ":8080" {
+		t.Fatalf("unexpected bind address: %s", c.BindAddress)
+	} else if time.Duration(c.ShardWriterTimeout) != 10*time.Second {
+		t.Fatalf("unexpected shard writer timeout: %s", c.ShardWriterTimeout)
+	}
+}
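For context, a hedged sketch (not part of the commit) of how a caller could start from these defaults and let a TOML fragment override individual fields; it uses only the Config, NewConfig, and toml types shown above:

	package main

	import (
		"log"
		"time"

		"github.com/BurntSushi/toml"
		"github.com/influxdb/influxdb/cluster"
	)

	func main() {
		c := cluster.NewConfig() // BindAddress ":8087", ShardWriterTimeout 5s
		// Keys absent from the TOML input keep their default values.
		if _, err := toml.Decode(`shard-writer-timeout = "10s"`, &c); err != nil {
			log.Fatal(err)
		}
		log.Printf("bind=%s timeout=%s", c.BindAddress, time.Duration(c.ShardWriterTimeout))
	}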
@@ -26,7 +26,8 @@ var _ = math.Inf
 
 type WriteShardRequest struct {
 	ShardID          *uint64  `protobuf:"varint,1,req" json:"ShardID,omitempty"`
-	Points           []*Point `protobuf:"bytes,2,rep" json:"Points,omitempty"`
+	OwnerID          *uint64  `protobuf:"varint,2,req" json:"OwnerID,omitempty"`
+	Points           []*Point `protobuf:"bytes,3,rep" json:"Points,omitempty"`
 	XXX_unrecognized []byte   `json:"-"`
 }
 
@@ -41,6 +42,13 @@ func (m *WriteShardRequest) GetShardID() uint64 {
 	return 0
 }
 
+func (m *WriteShardRequest) GetOwnerID() uint64 {
+	if m != nil && m.OwnerID != nil {
+		return *m.OwnerID
+	}
+	return 0
+}
+
 func (m *WriteShardRequest) GetPoints() []*Point {
 	if m != nil {
 		return m.Points
@@ -2,7 +2,8 @@ package internal;
 
 message WriteShardRequest {
     required uint64 ShardID = 1;
-    repeated Point  Points  = 2;
+    required uint64 OwnerID = 2;
+    repeated Point  Points  = 3;
 }
 
 message Field {
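After changing the message definition, the generated Go bindings in the previous hunk are refreshed; per the go:generate directive that appears later in this diff, the command is:

	protoc --gogo_out=. internal/data.proto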
@ -1,226 +0,0 @@
|
|||
package cluster
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrBindAddressRequired is returned when starting the Server
|
||||
// without providing a bind address
|
||||
ErrBindAddressRequired = errors.New("bind address required")
|
||||
|
||||
// ErrServerClosed return when closing an already closed graphite server.
|
||||
ErrServerClosed = errors.New("server already closed")
|
||||
)
|
||||
|
||||
type writer interface {
|
||||
WriteShard(shardID uint64, points []tsdb.Point) error
|
||||
}
|
||||
|
||||
// Server processes data received over raw TCP connections.
|
||||
type Server struct {
|
||||
writer writer
|
||||
listener *net.Listener
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
Logger *log.Logger
|
||||
|
||||
shutdown chan struct{}
|
||||
|
||||
mu sync.RWMutex
|
||||
// the actual addr the server opens on
|
||||
addr net.Addr
|
||||
// used to initially spin up the server, could be a zero port
|
||||
laddr string
|
||||
}
|
||||
|
||||
// NewServer returns a new instance of a Server.
|
||||
func NewServer(w writer, laddr string) *Server {
|
||||
return &Server{
|
||||
writer: w,
|
||||
laddr: laddr,
|
||||
Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags),
|
||||
shutdown: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Open instructs the Server to start processing connections
|
||||
func (s *Server) Open() error {
|
||||
if s.laddr == "" { // Make sure we have an laddr
|
||||
return ErrBindAddressRequired
|
||||
}
|
||||
|
||||
ln, err := net.Listen("tcp", s.laddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.listener = &ln
|
||||
s.addr = ln.Addr()
|
||||
s.mu.Unlock()
|
||||
|
||||
s.Logger.Println("listening on TCP connection", ln.Addr().String())
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
// Are we shutting down? If so, exit
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
conn, err := ln.Accept()
|
||||
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
|
||||
s.Logger.Println("error temporarily accepting TCP connection", err.Error())
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
s.Logger.Println("TCP listener closed")
|
||||
return
|
||||
}
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.handleConnection(conn)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close will close the listener
|
||||
func (s *Server) Close() error {
|
||||
// Stop accepting client connections
|
||||
if s.listener != nil {
|
||||
err := (*s.listener).Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return ErrServerClosed
|
||||
}
|
||||
// Shut down all handlers
|
||||
close(s.shutdown)
|
||||
s.wg.Wait()
|
||||
s.listener = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleConnection services an individual TCP connection.
|
||||
func (s *Server) handleConnection(conn net.Conn) {
|
||||
|
||||
// Start our reader up in a go routine so we don't block checking our close channel
|
||||
go func() {
|
||||
for {
|
||||
var messageType byte
|
||||
|
||||
err := binary.Read(conn, binary.LittleEndian, &messageType)
|
||||
if err != nil {
|
||||
s.Logger.Printf("unable to read message type %s", err)
|
||||
return
|
||||
}
|
||||
s.processMessage(messageType, conn)
|
||||
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
// Are we shutting down? If so, exit
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
// Are we shutting down? If so, exit
|
||||
conn.Close()
|
||||
s.wg.Done()
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) processMessage(messageType byte, conn net.Conn) {
|
||||
switch messageType {
|
||||
case writeShardRequestMessage:
|
||||
err := s.writeShardRequest(conn)
|
||||
s.writeShardResponse(conn, err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) writeShardRequest(conn net.Conn) error {
|
||||
var size int64
|
||||
if err := binary.Read(conn, binary.LittleEndian, &size); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
message := make([]byte, size)
|
||||
|
||||
reader := io.LimitReader(conn, size)
|
||||
if _, err := reader.Read(message); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var wsr WriteShardRequest
|
||||
if err := wsr.UnmarshalBinary(message); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.writer.WriteShard(wsr.ShardID(), wsr.Points())
|
||||
}
|
||||
|
||||
func (s *Server) writeShardResponse(conn net.Conn, e error) {
|
||||
var mt byte = writeShardResponseMessage
|
||||
if err := binary.Write(conn, binary.LittleEndian, &mt); err != nil {
|
||||
s.Logger.Printf("error writing shard response message type: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
var wsr WriteShardResponse
|
||||
if e != nil {
|
||||
wsr.SetCode(1)
|
||||
wsr.SetMessage(e.Error())
|
||||
} else {
|
||||
wsr.SetCode(0)
|
||||
}
|
||||
|
||||
b, err := wsr.MarshalBinary()
|
||||
if err != nil {
|
||||
s.Logger.Printf("error marshalling shard response: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
size := int64(len(b))
|
||||
|
||||
if err := binary.Write(conn, binary.LittleEndian, &size); err != nil {
|
||||
s.Logger.Printf("error writing shard response length: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := conn.Write(b); err != nil {
|
||||
s.Logger.Printf("error writing shard response: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Addr() net.Addr {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.addr
|
||||
|
||||
}
|
|
@ -1,100 +0,0 @@
|
|||
package cluster_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
type metaStore struct {
|
||||
host string
|
||||
}
|
||||
|
||||
func (m *metaStore) Node(nodeID uint64) (*meta.NodeInfo, error) {
|
||||
return &meta.NodeInfo{
|
||||
ID: nodeID,
|
||||
Host: m.host,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type testServer struct {
|
||||
writeShardFunc func(shardID uint64, points []tsdb.Point) error
|
||||
}
|
||||
|
||||
func newTestServer(f func(shardID uint64, points []tsdb.Point) error) testServer {
|
||||
return testServer{
|
||||
writeShardFunc: f,
|
||||
}
|
||||
}
|
||||
|
||||
type serverResponses []serverResponse
|
||||
type serverResponse struct {
|
||||
shardID uint64
|
||||
points []tsdb.Point
|
||||
}
|
||||
|
||||
func (t testServer) WriteShard(shardID uint64, points []tsdb.Point) error {
|
||||
return t.writeShardFunc(shardID, points)
|
||||
}
|
||||
|
||||
func writeShardSuccess(shardID uint64, points []tsdb.Point) error {
|
||||
responses <- &serverResponse{
|
||||
shardID: shardID,
|
||||
points: points,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeShardFail(shardID uint64, points []tsdb.Point) error {
|
||||
return fmt.Errorf("failed to write")
|
||||
}
|
||||
|
||||
var responses = make(chan *serverResponse, 1024)
|
||||
|
||||
func (testServer) ResponseN(n int) ([]*serverResponse, error) {
|
||||
var a []*serverResponse
|
||||
for {
|
||||
select {
|
||||
case r := <-responses:
|
||||
a = append(a, r)
|
||||
if len(a) == n {
|
||||
return a, nil
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
return a, fmt.Errorf("unexpected response count: expected: %d, actual: %d", n, len(a))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_Close_ErrServerClosed(t *testing.T) {
|
||||
var (
|
||||
ts testServer
|
||||
s = cluster.NewServer(ts, "127.0.0.1:0")
|
||||
)
|
||||
|
||||
if e := s.Open(); e != nil {
|
||||
t.Fatalf("err does not match. expected %v, got %v", nil, e)
|
||||
}
|
||||
|
||||
// Close the server
|
||||
s.Close()
|
||||
|
||||
// Try to close it again
|
||||
if err := s.Close(); err != cluster.ErrServerClosed {
|
||||
t.Fatalf("expected an error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_Close_ErrBindAddressRequired(t *testing.T) {
|
||||
var (
|
||||
ts testServer
|
||||
s = cluster.NewServer(ts, "")
|
||||
)
|
||||
if e := s.Open(); e == nil {
|
||||
t.Fatalf("exprected error %s, got nil.", cluster.ErrBindAddressRequired)
|
||||
}
|
||||
}
|
|
@ -10,7 +10,7 @@ import (
|
|||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
const defaultWriteTimeout = 5 * time.Second
|
||||
const DefaultWriteTimeout = 5 * time.Second
|
||||
|
||||
// ConsistencyLevel represent a required replication criteria before a write can
|
||||
// be returned as successful
|
||||
|
@ -31,20 +31,19 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
// ErrTimeout is returned when a write times out
|
||||
// ErrTimeout is returned when a write times out.
|
||||
ErrTimeout = errors.New("timeout")
|
||||
|
||||
// ErrPartialWrite is returned when a write partially succeeds but does
|
||||
// not meet the requested consistency level
|
||||
// not meet the requested consistency level.
|
||||
ErrPartialWrite = errors.New("partial write")
|
||||
|
||||
// ErrWriteFailed is returned when no writes succeeded
|
||||
// ErrWriteFailed is returned when no writes succeeded.
|
||||
ErrWriteFailed = errors.New("write failed")
|
||||
)
|
||||
|
||||
// Coordinator handle queries and writes across multiple local and remote
|
||||
// data nodes.
|
||||
type Coordinator struct {
|
||||
// PointsWriter handles writes across multiple local and remote data nodes.
|
||||
type PointsWriter struct {
|
||||
nodeID uint64
|
||||
mu sync.RWMutex
|
||||
closing chan struct{}
|
||||
|
@ -59,13 +58,14 @@ type Coordinator struct {
|
|||
WriteToShard(shardID uint64, points []tsdb.Point) error
|
||||
}
|
||||
|
||||
ClusterWriter interface {
|
||||
Write(shardID, ownerID uint64, points []tsdb.Point) error
|
||||
ShardWriter interface {
|
||||
WriteShard(shardID, ownerID uint64, points []tsdb.Point) error
|
||||
}
|
||||
}
|
||||
|
||||
func NewCoordinator(localID uint64) *Coordinator {
|
||||
return &Coordinator{
|
||||
// NewPointsWriter returns a new instance of PointsWriter for a node.
|
||||
func NewPointsWriter(localID uint64) *PointsWriter {
|
||||
return &PointsWriter{
|
||||
nodeID: localID,
|
||||
closing: make(chan struct{}),
|
||||
}
|
||||
|
@ -96,21 +96,21 @@ func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p tsdb.Point) {
|
|||
s.Shards[shardInfo.ID] = shardInfo
|
||||
}
|
||||
|
||||
func (c *Coordinator) Open() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.closing == nil {
|
||||
c.closing = make(chan struct{})
|
||||
func (w *PointsWriter) Open() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
if w.closing == nil {
|
||||
w.closing = make(chan struct{})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Coordinator) Close() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.closing != nil {
|
||||
close(c.closing)
|
||||
c.closing = nil
|
||||
func (w *PointsWriter) Close() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
if w.closing != nil {
|
||||
close(w.closing)
|
||||
w.closing = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -118,7 +118,7 @@ func (c *Coordinator) Close() error {
|
|||
// MapShards maps the points contained in wp to a ShardMapping. If a point
|
||||
// maps to a shard group or shard that does not currently exist, it will be
|
||||
// created before returning the mapping.
|
||||
func (c *Coordinator) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
|
||||
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
|
||||
|
||||
// Stub out the MapShards call to return a single node/shard setup
|
||||
if os.Getenv("INFLUXDB_ALPHA1") != "" {
|
||||
|
@ -136,7 +136,7 @@ func (c *Coordinator) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
|
|||
// holds the start time ranges for required shard groups
|
||||
timeRanges := map[time.Time]*meta.ShardGroupInfo{}
|
||||
|
||||
rp, err := c.MetaStore.RetentionPolicy(wp.Database, wp.RetentionPolicy)
|
||||
rp, err := w.MetaStore.RetentionPolicy(wp.Database, wp.RetentionPolicy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -147,7 +147,7 @@ func (c *Coordinator) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
|
|||
|
||||
// holds all the shard groups and shards that are required for writes
|
||||
for t := range timeRanges {
|
||||
sg, err := c.MetaStore.CreateShardGroupIfNotExists(wp.Database, wp.RetentionPolicy, t)
|
||||
sg, err := w.MetaStore.CreateShardGroupIfNotExists(wp.Database, wp.RetentionPolicy, t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -163,10 +163,9 @@ func (c *Coordinator) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
|
|||
return mapping, nil
|
||||
}
|
||||
|
||||
// Write is coordinates multiple writes across local and remote data nodes
|
||||
// according the request consistency level
|
||||
func (c *Coordinator) Write(p *WritePointsRequest) error {
|
||||
shardMappings, err := c.MapShards(p)
|
||||
// WritePoints writes across multiple local and remote data nodes according the consistency level.
|
||||
func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
|
||||
shardMappings, err := w.MapShards(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -176,13 +175,13 @@ func (c *Coordinator) Write(p *WritePointsRequest) error {
|
|||
ch := make(chan error, len(shardMappings.Points))
|
||||
for shardID, points := range shardMappings.Points {
|
||||
go func(shard *meta.ShardInfo, database, retentionPolicy string, points []tsdb.Point) {
|
||||
ch <- c.writeToShard(shard, p.Database, p.RetentionPolicy, p.ConsistencyLevel, points)
|
||||
ch <- w.writeToShard(shard, p.Database, p.RetentionPolicy, p.ConsistencyLevel, points)
|
||||
}(shardMappings.Shards[shardID], p.Database, p.RetentionPolicy, points)
|
||||
}
|
||||
|
||||
for range shardMappings.Points {
|
||||
select {
|
||||
case <-c.closing:
|
||||
case <-w.closing:
|
||||
return ErrWriteFailed
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
|
@ -195,7 +194,7 @@ func (c *Coordinator) Write(p *WritePointsRequest) error {
|
|||
|
||||
// writeToShard writes points to a shard and ensures a write consistency level has been met. If the write
// partially succeeds, ErrPartialWrite is returned.
|
||||
func (c *Coordinator) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
|
||||
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
|
||||
consistency ConsistencyLevel, points []tsdb.Point) error {
|
||||
// The required number of writes to achieve the requested consistency level
|
||||
required := len(shard.OwnerIDs)
|
||||
|
@ -211,23 +210,23 @@ func (c *Coordinator) writeToShard(shard *meta.ShardInfo, database, retentionPol
|
|||
|
||||
for _, nodeID := range shard.OwnerIDs {
|
||||
go func(shardID, nodeID uint64, points []tsdb.Point) {
|
||||
if c.nodeID == nodeID {
|
||||
err := c.Store.WriteToShard(shardID, points)
|
||||
if w.nodeID == nodeID {
|
||||
err := w.Store.WriteToShard(shardID, points)
|
||||
// If we've written to shard that should exist on the current node, but the store has
|
||||
// not actually created this shard, tell it to create it and retry the write
|
||||
if err == tsdb.ErrShardNotFound {
|
||||
err = c.Store.CreateShard(database, retentionPolicy, shardID)
|
||||
err = w.Store.CreateShard(database, retentionPolicy, shardID)
|
||||
if err != nil {
|
||||
ch <- err
|
||||
return
|
||||
}
|
||||
err = c.Store.WriteToShard(shardID, points)
|
||||
err = w.Store.WriteToShard(shardID, points)
|
||||
}
|
||||
ch <- err
|
||||
|
||||
// FIXME: When ClusterWriter is implemented, this should never be nil
|
||||
} else if c.ClusterWriter != nil {
|
||||
ch <- c.ClusterWriter.Write(shardID, nodeID, points)
|
||||
// FIXME: When ShardWriter is implemented, this should never be nil
|
||||
} else if w.ShardWriter != nil {
|
||||
ch <- w.ShardWriter.WriteShard(shardID, nodeID, points)
|
||||
} else {
|
||||
ch <- ErrWriteFailed
|
||||
}
|
||||
|
@ -235,10 +234,10 @@ func (c *Coordinator) writeToShard(shard *meta.ShardInfo, database, retentionPol
|
|||
}
|
||||
|
||||
var wrote int
|
||||
timeout := time.After(defaultWriteTimeout)
|
||||
timeout := time.After(DefaultWriteTimeout)
|
||||
for range shard.OwnerIDs {
|
||||
select {
|
||||
case <-c.closing:
|
||||
case <-w.closing:
|
||||
return ErrWriteFailed
|
||||
case <-timeout:
|
||||
// return timeout error to caller
|
|
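To make the Coordinator-to-PointsWriter rename concrete, here is a hedged fragment (not part of the commit) of how the renamed type is wired up; the field and method names come from this diff, while ms, sw, and store stand in for concrete MetaStore, ShardWriter, and Store implementations:

	w := cluster.NewPointsWriter(1) // local node ID
	w.MetaStore = ms                // resolves retention policies and shard groups
	w.ShardWriter = sw              // forwards writes to remote owners
	w.Store = store                 // local tsdb store
	if err := w.WritePoints(&cluster.WritePointsRequest{
		Database:        "mydb",
		RetentionPolicy: "myrp",
		// ConsistencyLevel and point data omitted in this sketch
	}); err != nil {
		// handle the write error
	}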
@ -8,14 +8,12 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// TestCoordinatorEnsureShardMappingOne tests that a single point maps to
|
||||
// a single shard
|
||||
func TestCoordinatorEnsureShardMappingOne(t *testing.T) {
|
||||
// Ensures the points writer maps a single point to a single shard.
|
||||
func TestPointsWriter_MapShards_One(t *testing.T) {
|
||||
ms := MetaStore{}
|
||||
rp := NewRetentionPolicy("myp", time.Hour, 3)
|
||||
|
||||
|
@ -27,7 +25,7 @@ func TestCoordinatorEnsureShardMappingOne(t *testing.T) {
|
|||
return &rp.ShardGroups[0], nil
|
||||
}
|
||||
|
||||
c := cluster.Coordinator{MetaStore: ms}
|
||||
c := cluster.PointsWriter{MetaStore: ms}
|
||||
pr := &cluster.WritePointsRequest{
|
||||
Database: "mydb",
|
||||
RetentionPolicy: "myrp",
|
||||
|
@ -48,9 +46,8 @@ func TestCoordinatorEnsureShardMappingOne(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestCoordinatorEnsureShardMappingMultiple tests that MapShards maps multiple points
|
||||
// across shard group boundaries to multiple shards
|
||||
func TestCoordinatorEnsureShardMappingMultiple(t *testing.T) {
|
||||
// Ensures the points writer maps multiple points across shard group boundaries.
|
||||
func TestPointsWriter_MapShards_Multiple(t *testing.T) {
|
||||
ms := MetaStore{}
|
||||
rp := NewRetentionPolicy("myp", time.Hour, 3)
|
||||
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
|
||||
|
@ -69,7 +66,7 @@ func TestCoordinatorEnsureShardMappingMultiple(t *testing.T) {
|
|||
panic("should not get here")
|
||||
}
|
||||
|
||||
c := cluster.Coordinator{MetaStore: ms}
|
||||
c := cluster.PointsWriter{MetaStore: ms}
|
||||
pr := &cluster.WritePointsRequest{
|
||||
Database: "mydb",
|
||||
RetentionPolicy: "myrp",
|
||||
|
@ -111,8 +108,7 @@ func TestCoordinatorEnsureShardMappingMultiple(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCoordinatorWrite(t *testing.T) {
|
||||
|
||||
func TestPointsWriter_WritePoints(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
consistency cluster.ConsistencyLevel
|
||||
|
@ -228,7 +224,7 @@ func TestCoordinatorWrite(t *testing.T) {
|
|||
// Local cluster.Node ShardWriter
|
||||
// lock on the write increment since these functions get called in parallel
|
||||
var mu sync.Mutex
|
||||
dn := &fakeShardWriter{
|
||||
sw := &fakeShardWriter{
|
||||
ShardWriteFn: func(shardID, nodeID uint64, points []tsdb.Point) error {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
@ -244,28 +240,26 @@ func TestCoordinatorWrite(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
ms := newTestMetaStore()
|
||||
c := cluster.Coordinator{
|
||||
MetaStore: ms,
|
||||
ClusterWriter: dn,
|
||||
Store: store,
|
||||
ms := NewMetaStore()
|
||||
c := cluster.PointsWriter{
|
||||
MetaStore: ms,
|
||||
ShardWriter: sw,
|
||||
Store: store,
|
||||
}
|
||||
|
||||
if err := c.Write(pr); err != test.expErr {
|
||||
t.Errorf("Coordinator.Write(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
|
||||
if err := c.WritePoints(pr); err != test.expErr {
|
||||
t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
shardID uint64
|
||||
)
|
||||
var shardID uint64
|
||||
|
||||
type fakeShardWriter struct {
|
||||
ShardWriteFn func(shardID, nodeID uint64, points []tsdb.Point) error
|
||||
}
|
||||
|
||||
func (f *fakeShardWriter) Write(shardID, nodeID uint64, points []tsdb.Point) error {
|
||||
func (f *fakeShardWriter) WriteShard(shardID, nodeID uint64, points []tsdb.Point) error {
|
||||
return f.ShardWriteFn(shardID, nodeID, points)
|
||||
}
|
||||
|
||||
|
@ -282,7 +276,7 @@ func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64
|
|||
return f.CreateShardfn(database, retentionPolicy, shardID)
|
||||
}
|
||||
|
||||
func newTestMetaStore() *MetaStore {
|
||||
func NewMetaStore() *MetaStore {
|
||||
ms := &MetaStore{}
|
||||
rp := NewRetentionPolicy("myp", time.Hour, 3)
|
||||
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
|
||||
|
@ -304,145 +298,19 @@ func newTestMetaStore() *MetaStore {
|
|||
}
|
||||
|
||||
type MetaStore struct {
|
||||
OpenFn func(path string) error
|
||||
CloseFn func() error
|
||||
|
||||
CreateContinuousQueryFn func(query string) (*meta.ContinuousQueryInfo, error)
|
||||
DropContinuousQueryFn func(query string) error
|
||||
|
||||
NodeFn func(id uint64) (*meta.NodeInfo, error)
|
||||
NodeByHostFn func(host string) (*meta.NodeInfo, error)
|
||||
CreateNodeFn func(host string) (*meta.NodeInfo, error)
|
||||
DeleteNodeFn func(id uint64) error
|
||||
|
||||
DatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||
CreateDatabaseIfNotExistsFn func(name string) (*meta.DatabaseInfo, error)
|
||||
DropDatabaseFn func(name string) error
|
||||
|
||||
RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error)
|
||||
CreateRetentionPolicyFn func(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
||||
CreateRetentionPolicyIfNotExistsFn func(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
||||
SetDefaultRetentionPolicyFn func(database, name string) error
|
||||
UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) (*meta.RetentionPolicyInfo, error)
|
||||
DeleteRetentionPolicyFn func(database, name string) error
|
||||
|
||||
ShardGroupFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
|
||||
RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error)
|
||||
CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
|
||||
DeleteShardGroupFn func(database, policy string, shardID uint64) error
|
||||
|
||||
UserFn func(username string) (*meta.UserInfo, error)
|
||||
CreateUserFn func(username, password string, admin bool) (*meta.UserInfo, error)
|
||||
UpdateUserFn func(username, password string) (*meta.UserInfo, error)
|
||||
DeleteUserFn func(username string) error
|
||||
SetPrivilegeFn func(p influxql.Privilege, username string, dbname string) error
|
||||
}
|
||||
|
||||
func (m MetaStore) Open(path string) error {
|
||||
return m.OpenFn(path)
|
||||
}
|
||||
|
||||
func (m MetaStore) Close() error {
|
||||
return m.CloseFn()
|
||||
}
|
||||
|
||||
func (m MetaStore) CreateContinuousQuery(query string) (*meta.ContinuousQueryInfo, error) {
|
||||
return m.CreateContinuousQueryFn(query)
|
||||
}
|
||||
|
||||
func (m MetaStore) DropContinuousQuery(query string) error {
|
||||
return m.DropContinuousQueryFn(query)
|
||||
}
|
||||
|
||||
func (m MetaStore) Node(id uint64) (*meta.NodeInfo, error) {
|
||||
return m.NodeFn(id)
|
||||
}
|
||||
|
||||
func (m MetaStore) NodeByHost(host string) (*meta.NodeInfo, error) {
|
||||
return m.NodeByHostFn(host)
|
||||
}
|
||||
|
||||
func (m MetaStore) CreateNode(host string) (*meta.NodeInfo, error) {
|
||||
return m.CreateNodeFn(host)
|
||||
}
|
||||
|
||||
func (m MetaStore) DeleteNode(id uint64) error {
|
||||
return m.DeleteNodeFn(id)
|
||||
}
|
||||
|
||||
func (m MetaStore) Database(name string) (*meta.DatabaseInfo, error) {
|
||||
return m.DatabaseFn(name)
|
||||
}
|
||||
|
||||
func (m MetaStore) CreateDatabase(name string) (*meta.DatabaseInfo, error) {
|
||||
return m.CreateDatabaseFn(name)
|
||||
}
|
||||
|
||||
func (m MetaStore) CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) {
|
||||
return m.CreateDatabaseIfNotExistsFn(name)
|
||||
}
|
||||
|
||||
func (m MetaStore) DropDatabase(name string) error {
|
||||
return m.DropDatabaseFn(name)
|
||||
}
|
||||
|
||||
func (m MetaStore) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
|
||||
return m.RetentionPolicyFn(database, name)
|
||||
}
|
||||
|
||||
func (m MetaStore) CreateRetentionPolicy(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
|
||||
return m.CreateRetentionPolicyFn(database, rp)
|
||||
}
|
||||
|
||||
func (m MetaStore) CreateRetentionPolicyIfNotExists(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
|
||||
return m.CreateRetentionPolicyIfNotExistsFn(database, rp)
|
||||
}
|
||||
|
||||
func (m MetaStore) SetDefaultRetentionPolicy(database, name string) error {
|
||||
return m.SetDefaultRetentionPolicyFn(database, name)
|
||||
}
|
||||
|
||||
func (m MetaStore) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate) (*meta.RetentionPolicyInfo, error) {
|
||||
return m.UpdateRetentionPolicyFn(database, name, rpu)
|
||||
}
|
||||
|
||||
func (m MetaStore) DeleteRetentionPolicy(database, name string) error {
|
||||
return m.DeleteRetentionPolicyFn(database, name)
|
||||
}
|
||||
|
||||
func (m MetaStore) ShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
|
||||
return m.ShardGroupFn(database, policy, timestamp)
|
||||
}
|
||||
|
||||
func (m MetaStore) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
|
||||
return m.CreateShardGroupIfNotExistsFn(database, policy, timestamp)
|
||||
}
|
||||
|
||||
func (m MetaStore) DeleteShardGroup(database, policy string, shardID uint64) error {
|
||||
return m.DeleteShardGroupFn(database, policy, shardID)
|
||||
}
|
||||
|
||||
func (m MetaStore) User(username string) (*meta.UserInfo, error) {
|
||||
return m.UserFn(username)
|
||||
}
|
||||
|
||||
func (m MetaStore) CreateUser(username, password string, admin bool) (*meta.UserInfo, error) {
|
||||
return m.CreateUserFn(username, password, admin)
|
||||
}
|
||||
|
||||
func (m MetaStore) UpdateUser(username, password string) (*meta.UserInfo, error) {
|
||||
return m.UpdateUserFn(username, password)
|
||||
}
|
||||
func (m MetaStore) DeleteUser(username string) error {
|
||||
return m.DeleteUserFn(username)
|
||||
}
|
||||
|
||||
func (m MetaStore) SetPrivilege(p influxql.Privilege, username string, dbname string) error {
|
||||
return m.SetPrivilegeFn(p, username, dbname)
|
||||
}
|
||||
|
||||
func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *meta.RetentionPolicyInfo {
|
||||
|
||||
shards := []meta.ShardInfo{}
|
||||
ownerIDs := []uint64{}
|
||||
for i := 1; i <= nodeCount; i++ {
|
|
@ -1,14 +0,0 @@
|
|||
package cluster
|
||||
|
||||
import (
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// QueryCoordinator provides an interface for translating queries to the
|
||||
// appropriate metastore or data node function calls.
|
||||
type QueryCoordinator struct {
|
||||
MetaStore interface {
|
||||
//...
|
||||
}
|
||||
indexes map[string]*tsdb.DatabaseIndex
|
||||
}
|
|
@ -1 +0,0 @@
|
|||
package cluster_test
|
|
@ -10,12 +10,6 @@ import (
|
|||
|
||||
//go:generate protoc --gogo_out=. internal/data.proto
|
||||
|
||||
// PointsWriter accepts a WritePointRequest from client facing endpoints such as
|
||||
// HTTP JSON API, Collectd, Graphite, OpenTSDB, etc.
|
||||
type PointsWriter interface {
|
||||
Write(p *WritePointsRequest) error
|
||||
}
|
||||
|
||||
// WritePointsRequest represents a request to write point data to the cluster
|
||||
type WritePointsRequest struct {
|
||||
Database string
|
||||
|
@ -41,17 +35,13 @@ type WriteShardResponse struct {
|
|||
pb internal.WriteShardResponse
|
||||
}
|
||||
|
||||
func (w *WriteShardRequest) ShardID() uint64 {
|
||||
return w.pb.GetShardID()
|
||||
}
|
||||
func (w *WriteShardRequest) SetShardID(id uint64) { w.pb.ShardID = &id }
|
||||
func (w *WriteShardRequest) ShardID() uint64 { return w.pb.GetShardID() }
|
||||
|
||||
func (w *WriteShardRequest) SetShardID(id uint64) {
|
||||
w.pb.ShardID = &id
|
||||
}
|
||||
func (w *WriteShardRequest) SetOwnerID(id uint64) { w.pb.OwnerID = &id }
|
||||
func (w *WriteShardRequest) OwnerID() uint64 { return w.pb.GetOwnerID() }
|
||||
|
||||
func (w *WriteShardRequest) Points() []tsdb.Point {
|
||||
return w.unmarhalPoints()
|
||||
}
|
||||
func (w *WriteShardRequest) Points() []tsdb.Point { return w.unmarshalPoints() }
|
||||
|
||||
func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
|
||||
w.AddPoints([]tsdb.Point{tsdb.NewPoint(
|
||||
|
@ -125,7 +115,7 @@ func (w *WriteShardRequest) UnmarshalBinary(buf []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (w *WriteShardRequest) unmarhalPoints() []tsdb.Point {
|
||||
func (w *WriteShardRequest) unmarshalPoints() []tsdb.Point {
|
||||
points := make([]tsdb.Point, len(w.pb.GetPoints()))
|
||||
for i, p := range w.pb.GetPoints() {
|
||||
pt := tsdb.NewPoint(
|
||||
|
@ -159,22 +149,11 @@ func (w *WriteShardRequest) unmarhalPoints() []tsdb.Point {
|
|||
return points
|
||||
}
|
||||
|
||||
func (w *WriteShardResponse) SetCode(code int) {
|
||||
c32 := int32(code)
|
||||
w.pb.Code = &c32
|
||||
}
|
||||
func (w *WriteShardResponse) SetCode(code int) { w.pb.Code = proto.Int32(int32(code)) }
|
||||
func (w *WriteShardResponse) SetMessage(message string) { w.pb.Message = &message }
|
||||
|
||||
func (w *WriteShardResponse) SetMessage(message string) {
|
||||
w.pb.Message = &message
|
||||
}
|
||||
|
||||
func (w *WriteShardResponse) Code() int {
|
||||
return int(w.pb.GetCode())
|
||||
}
|
||||
|
||||
func (w *WriteShardResponse) Message() string {
|
||||
return w.pb.GetMessage()
|
||||
}
|
||||
func (w *WriteShardResponse) Code() int { return int(w.pb.GetCode()) }
|
||||
func (w *WriteShardResponse) Message() string { return w.pb.GetMessage() }
|
||||
|
||||
// MarshalBinary encodes the object to a binary format.
|
||||
func (w *WriteShardResponse) MarshalBinary() ([]byte, error) {
|
||||
|
|
|
@ -7,12 +7,17 @@ import (
|
|||
|
||||
func TestWriteShardRequestBinary(t *testing.T) {
|
||||
sr := &WriteShardRequest{}
|
||||
sr.SetShardID(uint64(1))
|
||||
|
||||
sr.SetShardID(uint64(1))
|
||||
if exp := uint64(1); sr.ShardID() != exp {
|
||||
t.Fatalf("ShardID mismatch: got %v, exp %v", sr.ShardID(), exp)
|
||||
}
|
||||
|
||||
sr.SetOwnerID(uint64(1))
|
||||
if exp := uint64(1); sr.OwnerID() != exp {
|
||||
t.Fatalf("OwnerID mismatch: got %v, exp %v", sr.OwnerID(), exp)
|
||||
}
|
||||
|
||||
sr.AddPoint("cpu", 1.0, time.Unix(0, 0), map[string]string{"host": "serverA"})
|
||||
sr.AddPoint("cpu", 2.0, time.Unix(0, 0).Add(time.Hour), nil)
|
||||
sr.AddPoint("cpu_load", 3.0, time.Unix(0, 0).Add(time.Hour+time.Second), nil)
|
||||
|
|
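For reference, a hedged fragment (not part of the commit) showing the request surviving a round trip through its binary encoding, using only the accessors exercised above:

	var req cluster.WriteShardRequest
	req.SetShardID(1)
	req.SetOwnerID(2)
	req.AddPoint("cpu", 1.0, time.Unix(0, 0), map[string]string{"host": "serverA"})

	buf, err := req.MarshalBinary()
	if err != nil {
		// handle the marshal error
	}

	var got cluster.WriteShardRequest
	if err := got.UnmarshalBinary(buf); err != nil {
		// handle the unmarshal error
	}
	// got.ShardID() == 1, got.OwnerID() == 2, len(got.Points()) == 1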
|
@ -1,14 +1,218 @@
|
|||
package cluster
|
||||
|
||||
import "net"
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// Service processes data received over raw TCP connections.
|
||||
type Service struct {
|
||||
mu sync.RWMutex
|
||||
addr string
|
||||
ln net.Listener
|
||||
|
||||
wg sync.WaitGroup
|
||||
closing chan struct{}
|
||||
|
||||
ShardWriter interface {
|
||||
WriteShard(shardID, ownerID uint64, points []tsdb.Point) error
|
||||
}
|
||||
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// NewService returns a new instance of Service.
|
||||
func NewService(c Config) *Service {
|
||||
return &Service{}
|
||||
return &Service{
|
||||
addr: c.BindAddress,
|
||||
closing: make(chan struct{}),
|
||||
Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) Open() error { return nil }
|
||||
func (s *Service) Close() error { return nil }
|
||||
func (s *Service) Addr() net.Addr { return nil }
|
||||
// Open opens the network listener and begins serving requests.
|
||||
func (s *Service) Open() error {
|
||||
// Open TCP listener.
|
||||
ln, err := net.Listen("tcp", s.addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.ln = ln
|
||||
|
||||
s.Logger.Println("listening on TCP connection", ln.Addr().String())
|
||||
|
||||
// Begin serving connections.
|
||||
s.wg.Add(1)
|
||||
go s.serve()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// serve accepts connections from the listener and handles them.
|
||||
func (s *Service) serve() {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
// Check if the service is shutting down.
|
||||
select {
|
||||
case <-s.closing:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Accept the next connection.
|
||||
conn, err := s.ln.Accept()
|
||||
if opErr, ok := err.(*net.OpError); ok && opErr.Temporary() {
|
||||
s.Logger.Println("error temporarily accepting TCP connection", err.Error())
|
||||
continue
|
||||
} else if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Delegate connection handling to a separate goroutine.
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
s.handleConn(conn)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Close shuts down the listener and waits for all connections to finish.
|
||||
func (s *Service) Close() error {
|
||||
if s.ln != nil {
|
||||
s.ln.Close()
|
||||
}
|
||||
|
||||
// Shut down all handlers.
|
||||
close(s.closing)
|
||||
s.wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Addr returns the network address of the service.
|
||||
func (s *Service) Addr() net.Addr {
|
||||
if s.ln != nil {
|
||||
return s.ln.Addr()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleConn services an individual TCP connection.
|
||||
func (s *Service) handleConn(conn net.Conn) {
|
||||
// Ensure connection is closed when service is closed.
|
||||
closing := make(chan struct{})
|
||||
defer close(closing)
|
||||
go func() {
|
||||
select {
|
||||
case <-closing:
|
||||
case <-s.closing:
|
||||
}
|
||||
conn.Close()
|
||||
}()
|
||||
|
||||
for {
|
||||
// Read type-length-value.
|
||||
typ, buf, err := ReadTLV(conn)
|
||||
if err != nil && strings.Contains(err.Error(), "closed network connection") {
|
||||
return
|
||||
} else if err != nil {
|
||||
s.Logger.Printf("unable to read type-length-value %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Delegate message processing by type.
|
||||
switch typ {
|
||||
case writeShardRequestMessage:
|
||||
err := s.processWriteShardRequest(buf)
|
||||
s.writeShardResponse(conn, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) processWriteShardRequest(buf []byte) error {
|
||||
// Build request
|
||||
var req WriteShardRequest
|
||||
if err := req.UnmarshalBinary(buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.ShardWriter.WriteShard(req.ShardID(), req.OwnerID(), req.Points()); err != nil {
|
||||
return fmt.Errorf("write shard: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) writeShardResponse(w io.Writer, e error) {
|
||||
// Build response.
|
||||
var resp WriteShardResponse
|
||||
if e != nil {
|
||||
resp.SetCode(1)
|
||||
resp.SetMessage(e.Error())
|
||||
} else {
|
||||
resp.SetCode(0)
|
||||
}
|
||||
|
||||
// Marshal response to binary.
|
||||
buf, err := resp.MarshalBinary()
|
||||
if err != nil {
|
||||
s.Logger.Printf("error marshalling shard response: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Write to connection.
|
||||
if err := WriteTLV(w, writeShardResponseMessage, buf); err != nil {
|
||||
s.Logger.Printf("write shard response error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ReadTLV reads a type-length-value record from r.
|
||||
func ReadTLV(r io.Reader) (byte, []byte, error) {
|
||||
var typ [1]byte
|
||||
if _, err := io.ReadFull(r, typ[:]); err != nil {
|
||||
return 0, nil, fmt.Errorf("read message type: %s", err)
|
||||
}
|
||||
|
||||
// Read the size of the message.
|
||||
var sz int64
|
||||
if err := binary.Read(r, binary.BigEndian, &sz); err != nil {
|
||||
return 0, nil, fmt.Errorf("read message size: %s", err)
|
||||
}
|
||||
|
||||
// Read the value.
|
||||
buf := make([]byte, sz)
|
||||
if _, err := io.ReadFull(r, buf); err != nil {
|
||||
return 0, nil, fmt.Errorf("read message value: %s", err)
|
||||
}
|
||||
|
||||
return typ[0], buf, nil
|
||||
}
|
||||
|
||||
// WriteTLV writes a type-length-value record to w.
|
||||
func WriteTLV(w io.Writer, typ byte, buf []byte) error {
|
||||
if _, err := w.Write([]byte{typ}); err != nil {
|
||||
return fmt.Errorf("write message type: %s", err)
|
||||
}
|
||||
|
||||
// Write the size of the message.
|
||||
if err := binary.Write(w, binary.BigEndian, int64(len(buf))); err != nil {
|
||||
return fmt.Errorf("write message size: %s", err)
|
||||
}
|
||||
|
||||
// Write the value.
|
||||
if _, err := w.Write(buf); err != nil {
|
||||
return fmt.Errorf("write message value: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
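Since ReadTLV and WriteTLV replace the old hand-rolled binary framing, a small self-contained sketch (not part of the commit) of the two helpers round-tripping a message through an in-memory buffer:

	package main

	import (
		"bytes"
		"fmt"
		"log"

		"github.com/influxdb/influxdb/cluster"
	)

	func main() {
		var buf bytes.Buffer
		// The message type byte (1) is arbitrary here; the real request/response
		// types are unexported constants inside the cluster package.
		if err := cluster.WriteTLV(&buf, 1, []byte("hello")); err != nil {
			log.Fatal(err)
		}
		typ, payload, err := cluster.ReadTLV(&buf)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(typ, string(payload)) // 1 hello
	}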
|
@ -0,0 +1,71 @@
|
|||
package cluster_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
type metaStore struct {
|
||||
host string
|
||||
}
|
||||
|
||||
func (m *metaStore) Node(nodeID uint64) (*meta.NodeInfo, error) {
|
||||
return &meta.NodeInfo{
|
||||
ID: nodeID,
|
||||
Host: m.host,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type testService struct {
|
||||
writeShardFunc func(shardID, ownerID uint64, points []tsdb.Point) error
|
||||
}
|
||||
|
||||
func newTestService(f func(shardID, ownerID uint64, points []tsdb.Point) error) testService {
|
||||
return testService{
|
||||
writeShardFunc: f,
|
||||
}
|
||||
}
|
||||
|
||||
type serviceResponses []serviceResponse
|
||||
type serviceResponse struct {
|
||||
shardID uint64
|
||||
ownerID uint64
|
||||
points []tsdb.Point
|
||||
}
|
||||
|
||||
func (t testService) WriteShard(shardID, ownerID uint64, points []tsdb.Point) error {
|
||||
return t.writeShardFunc(shardID, ownerID, points)
|
||||
}
|
||||
|
||||
func writeShardSuccess(shardID, ownerID uint64, points []tsdb.Point) error {
|
||||
responses <- &serviceResponse{
|
||||
shardID: shardID,
|
||||
ownerID: ownerID,
|
||||
points: points,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeShardFail(shardID, ownerID uint64, points []tsdb.Point) error {
|
||||
return fmt.Errorf("failed to write")
|
||||
}
|
||||
|
||||
var responses = make(chan *serviceResponse, 1024)
|
||||
|
||||
func (testService) ResponseN(n int) ([]*serviceResponse, error) {
|
||||
var a []*serviceResponse
|
||||
for {
|
||||
select {
|
||||
case r := <-responses:
|
||||
a = append(a, r)
|
||||
if len(a) == n {
|
||||
return a, nil
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
return a, fmt.Errorf("unexpected response count: expected: %d, actual: %d", n, len(a))
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,149 @@
|
|||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
"gopkg.in/fatih/pool.v2"
|
||||
)
|
||||
|
||||
const (
|
||||
writeShardRequestMessage byte = iota + 1
|
||||
writeShardResponseMessage
|
||||
)
|
||||
|
||||
// ShardWriter writes a set of points to a shard.
|
||||
type ShardWriter struct {
|
||||
pool *clientPool
|
||||
timeout time.Duration
|
||||
|
||||
MetaStore interface {
|
||||
Node(id uint64) (ni *meta.NodeInfo, err error)
|
||||
}
|
||||
}
|
||||
|
||||
// NewShardWriter returns a new instance of ShardWriter.
|
||||
func NewShardWriter(timeout time.Duration) *ShardWriter {
|
||||
return &ShardWriter{
|
||||
pool: newClientPool(),
|
||||
timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []tsdb.Point) error {
|
||||
c, err := w.dial(ownerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn, ok := c.(*pool.PoolConn)
|
||||
if !ok {
|
||||
panic("wrong connection type")
|
||||
}
|
||||
defer conn.Close() // return to pool
|
||||
|
||||
// Build write request.
|
||||
var request WriteShardRequest
|
||||
request.SetShardID(shardID)
|
||||
request.SetOwnerID(ownerID)
|
||||
request.AddPoints(points)
|
||||
|
||||
// Marshal into protocol buffers.
|
||||
buf, err := request.MarshalBinary()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write request.
|
||||
conn.SetWriteDeadline(time.Now().Add(w.timeout))
|
||||
if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil {
|
||||
conn.MarkUnusable()
|
||||
return err
|
||||
}
|
||||
|
||||
// Read the response.
|
||||
conn.SetReadDeadline(time.Now().Add(w.timeout))
|
||||
_, buf, err = ReadTLV(conn)
|
||||
if err != nil {
|
||||
conn.MarkUnusable()
|
||||
return err
|
||||
}
|
||||
|
||||
// Unmarshal response.
|
||||
var response WriteShardResponse
|
||||
if err := response.UnmarshalBinary(buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if response.Code() != 0 {
|
||||
return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ShardWriter) dial(nodeID uint64) (net.Conn, error) {
|
||||
// If we don't have a connection pool for that addr yet, create one
|
||||
_, ok := c.pool.getPool(nodeID)
|
||||
if !ok {
|
||||
factory := &connFactory{nodeID: nodeID, clientPool: c.pool, timeout: c.timeout}
|
||||
factory.metaStore = c.MetaStore
|
||||
|
||||
p, err := pool.NewChannelPool(1, 3, factory.dial)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.pool.setPool(nodeID, p)
|
||||
}
|
||||
return c.pool.conn(nodeID)
|
||||
}
|
||||
|
||||
func (w *ShardWriter) Close() error {
|
||||
if w.pool == nil {
|
||||
return fmt.Errorf("client already closed")
|
||||
}
|
||||
w.pool.close()
|
||||
w.pool = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
maxConnections = 500
|
||||
maxRetries = 3
|
||||
)
|
||||
|
||||
var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections)
|
||||
|
||||
type connFactory struct {
|
||||
nodeID uint64
|
||||
timeout time.Duration
|
||||
|
||||
clientPool interface {
|
||||
size() int
|
||||
}
|
||||
|
||||
metaStore interface {
|
||||
Node(id uint64) (ni *meta.NodeInfo, err error)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *connFactory) dial() (net.Conn, error) {
|
||||
if c.clientPool.size() > maxConnections {
|
||||
return nil, errMaxConnectionsExceeded
|
||||
}
|
||||
|
||||
ni, err := c.metaStore.Node(c.nodeID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := net.DialTimeout("tcp", ni.Host, c.timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
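A condensed, hedged sketch of the intended call pattern for the new ShardWriter (the tests that follow exercise exactly this path; ms stands in for any MetaStore implementation that maps a node ID to a host):

	w := cluster.NewShardWriter(5 * time.Second)
	w.MetaStore = ms
	defer w.Close()

	if err := w.WriteShard(shardID, ownerID, points); err != nil {
		log.Printf("write shard failed: %s", err)
	}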
|
@ -0,0 +1,178 @@
|
|||
package cluster_test
|
||||
|
||||
import (
|
||||
"net"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// Ensure the shard writer can successfully write a single request.
|
||||
func TestShardWriter_WriteShard_Success(t *testing.T) {
|
||||
ts := newTestService(writeShardSuccess)
|
||||
s := cluster.NewService(cluster.Config{BindAddress: "127.0.0.1:0"})
|
||||
s.ShardWriter = ts
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
w := cluster.NewShardWriter(time.Minute)
|
||||
w.MetaStore = &metaStore{host: s.Addr().String()}
|
||||
|
||||
// Build a single point.
|
||||
now := time.Now()
|
||||
var points []tsdb.Point
|
||||
points = append(points, tsdb.NewPoint("cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now))
|
||||
|
||||
// Write to shard and close.
|
||||
if err := w.WriteShard(1, 2, points); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := w.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Validate response.
|
||||
responses, err := ts.ResponseN(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if responses[0].shardID != 1 {
|
||||
t.Fatalf("unexpected shard id: %d", responses[0].shardID)
|
||||
}
|
||||
|
||||
// Validate point.
|
||||
if p := responses[0].points[0]; p.Name() != "cpu" {
|
||||
t.Fatalf("unexpected name: %s", p.Name())
|
||||
} else if p.Fields()["value"] != int64(100) {
|
||||
t.Fatalf("unexpected 'value' field: %d", p.Fields()["value"])
|
||||
} else if p.Tags()["host"] != "server01" {
|
||||
t.Fatalf("unexpected 'host' tag: %s", p.Tags()["host"])
|
||||
} else if p.Time().UnixNano() != now.UnixNano() {
|
||||
t.Fatalf("unexpected time: %s", p.Time())
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the shard writer can successfully write multiple requests.
|
||||
func TestShardWriter_WriteShard_Multiple(t *testing.T) {
|
||||
ts := newTestService(writeShardSuccess)
|
||||
s := cluster.NewService(cluster.Config{BindAddress: "127.0.0.1:0"})
|
||||
s.ShardWriter = ts
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
w := cluster.NewShardWriter(time.Minute)
|
||||
w.MetaStore = &metaStore{host: s.Addr().String()}
|
||||
|
||||
// Build a single point.
|
||||
now := time.Now()
|
||||
var points []tsdb.Point
|
||||
points = append(points, tsdb.NewPoint("cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now))
|
||||
|
||||
// Write to shard twice and close.
|
||||
if err := w.WriteShard(1, 2, points); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := w.WriteShard(1, 2, points); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := w.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Validate response.
|
||||
responses, err := ts.ResponseN(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if responses[0].shardID != 1 {
|
||||
t.Fatalf("unexpected shard id: %d", responses[0].shardID)
|
||||
}
|
||||
|
||||
// Validate point.
|
||||
if p := responses[0].points[0]; p.Name() != "cpu" {
|
||||
t.Fatalf("unexpected name: %s", p.Name())
|
||||
} else if p.Fields()["value"] != int64(100) {
|
||||
t.Fatalf("unexpected 'value' field: %d", p.Fields()["value"])
|
||||
} else if p.Tags()["host"] != "server01" {
|
||||
t.Fatalf("unexpected 'host' tag: %s", p.Tags()["host"])
|
||||
} else if p.Time().UnixNano() != now.UnixNano() {
|
||||
t.Fatalf("unexpected time: %s", p.Time())
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the shard writer returns an error when the server fails to accept the write.
|
||||
func TestShardWriter_WriteShard_Error(t *testing.T) {
|
||||
ts := newTestService(writeShardFail)
|
||||
s := cluster.NewService(cluster.Config{BindAddress: "127.0.0.1:0"})
|
||||
s.ShardWriter = ts
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
w := cluster.NewShardWriter(time.Minute)
|
||||
w.MetaStore = &metaStore{host: s.Addr().String()}
|
||||
now := time.Now()
|
||||
|
||||
shardID := uint64(1)
|
||||
ownerID := uint64(2)
|
||||
var points []tsdb.Point
|
||||
points = append(points, tsdb.NewPoint(
|
||||
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
|
||||
))
|
||||
|
||||
if err := w.WriteShard(shardID, ownerID, points); err == nil || err.Error() != "error code 1: write shard: failed to write" {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the shard writer returns an error when dialing times out.
|
||||
func TestShardWriter_Write_ErrDialTimeout(t *testing.T) {
|
||||
ts := newTestService(writeShardSuccess)
|
||||
s := cluster.NewService(cluster.Config{BindAddress: "127.0.0.1:0"})
|
||||
s.ShardWriter = ts
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
w := cluster.NewShardWriter(time.Nanosecond)
|
||||
w.MetaStore = &metaStore{host: s.Addr().String()}
|
||||
now := time.Now()
|
||||
|
||||
shardID := uint64(1)
|
||||
ownerID := uint64(2)
|
||||
var points []tsdb.Point
|
||||
points = append(points, tsdb.NewPoint(
|
||||
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
|
||||
))
|
||||
|
||||
if err, exp := w.WriteShard(shardID, ownerID, points), "i/o timeout"; err == nil || !strings.Contains(err.Error(), exp) {
|
||||
t.Fatalf("expected error %v, to contain %s", err, exp)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the shard writer returns an error when reading times out.
|
||||
func TestShardWriter_Write_ErrReadTimeout(t *testing.T) {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
w := cluster.NewShardWriter(time.Millisecond)
|
||||
w.MetaStore = &metaStore{host: ln.Addr().String()}
|
||||
now := time.Now()
|
||||
|
||||
shardID := uint64(1)
|
||||
ownerID := uint64(2)
|
||||
var points []tsdb.Point
|
||||
points = append(points, tsdb.NewPoint(
|
||||
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
|
||||
))
|
||||
|
||||
if err := w.WriteShard(shardID, ownerID, points); err == nil || !strings.Contains(err.Error(), "i/o timeout") {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
|
@ -1,170 +0,0 @@
|
|||
package cluster
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/meta"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
"gopkg.in/fatih/pool.v2"
|
||||
)
|
||||
|
||||
const (
|
||||
writeShardRequestMessage byte = iota + 1
|
||||
writeShardResponseMessage
|
||||
)
|
||||
|
||||
const (
|
||||
maxConnections = 500
|
||||
maxRetries = 3
|
||||
)
|
||||
|
||||
var errMaxConnectionsExceeded = fmt.Errorf("can not exceed max connections of %d", maxConnections)
|
||||
|
||||
type metaStore interface {
|
||||
Node(id uint64) (ni *meta.NodeInfo, err error)
|
||||
}
|
||||
|
||||
type connFactory struct {
|
||||
metaStore metaStore
|
||||
nodeID uint64
|
||||
timeout time.Duration
|
||||
clientPool interface {
|
||||
size() int
|
||||
}
|
||||
}
|
||||
|
||||
func (c *connFactory) dial() (net.Conn, error) {
|
||||
if c.clientPool.size() > maxConnections {
|
||||
return nil, errMaxConnectionsExceeded
|
||||
}
|
||||
|
||||
nodeInfo, err := c.metaStore.Node(c.nodeID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := net.DialTimeout("tcp", nodeInfo.Host, c.timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
type Writer struct {
|
||||
pool *clientPool
|
||||
metaStore metaStore
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func NewWriter(m metaStore, timeout time.Duration) *Writer {
|
||||
return &Writer{
|
||||
pool: newClientPool(),
|
||||
metaStore: m,
|
||||
timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Writer) dial(nodeID uint64) (net.Conn, error) {
|
||||
// if we don't have a connection pool for that addr yet, create one
|
||||
_, ok := c.pool.getPool(nodeID)
|
||||
if !ok {
|
||||
factory := &connFactory{nodeID: nodeID, metaStore: c.metaStore, clientPool: c.pool, timeout: c.timeout}
|
||||
p, err := pool.NewChannelPool(1, 3, factory.dial)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.pool.setPool(nodeID, p)
|
||||
}
|
||||
return c.pool.conn(nodeID)
|
||||
}
|
||||
|
||||
func (w *Writer) Write(shardID, ownerID uint64, points []tsdb.Point) error {
|
||||
c, err := w.dial(ownerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conn, ok := c.(*pool.PoolConn)
|
||||
if !ok {
|
||||
panic("wrong connection type")
|
||||
}
|
||||
|
||||
// This will return the connection to the data pool
|
||||
defer conn.Close()
|
||||
|
||||
conn.SetWriteDeadline(time.Now().Add(w.timeout))
|
||||
var mt byte = writeShardRequestMessage
|
||||
if err := binary.Write(conn, binary.LittleEndian, &mt); err != nil {
|
||||
conn.MarkUnusable()
|
||||
return err
|
||||
}
|
||||
|
||||
var request WriteShardRequest
|
||||
request.SetShardID(shardID)
|
||||
request.AddPoints(points)
|
||||
|
||||
b, err := request.MarshalBinary()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
size := int64(len(b))
|
||||
|
||||
conn.SetWriteDeadline(time.Now().Add(w.timeout))
|
||||
if err := binary.Write(conn, binary.LittleEndian, &size); err != nil {
|
||||
conn.MarkUnusable()
|
||||
return err
|
||||
}
|
||||
|
||||
conn.SetWriteDeadline(time.Now().Add(w.timeout))
|
||||
if _, err := conn.Write(b); err != nil {
|
||||
conn.MarkUnusable()
|
||||
return err
|
||||
}
|
||||
|
||||
conn.SetReadDeadline(time.Now().Add(w.timeout))
|
||||
// read back our response
|
||||
if err := binary.Read(conn, binary.LittleEndian, &mt); err != nil {
|
||||
conn.MarkUnusable()
|
||||
return err
|
||||
}
|
||||
|
||||
conn.SetReadDeadline(time.Now().Add(w.timeout))
|
||||
if err := binary.Read(conn, binary.LittleEndian, &size); err != nil {
|
||||
conn.MarkUnusable()
|
||||
return err
|
||||
}
|
||||
|
||||
message := make([]byte, size)
|
||||
|
||||
reader := io.LimitReader(conn, size)
|
||||
conn.SetReadDeadline(time.Now().Add(w.timeout))
|
||||
_, err = reader.Read(message)
|
||||
if err != nil {
|
||||
conn.MarkUnusable()
|
||||
return err
|
||||
}
|
||||
|
||||
var response WriteShardResponse
|
||||
if err := response.UnmarshalBinary(message); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if response.Code() != 0 {
|
||||
return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) Close() error {
|
||||
if w.pool == nil {
|
||||
return fmt.Errorf("client already closed")
|
||||
}
|
||||
w.pool.close()
|
||||
w.pool = nil
|
||||
return nil
|
||||
}
|
|
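For readers following the exchange above: Write frames every request and response the same way, a one-byte message type, an int64 little-endian length, then the marshalled protobuf payload. The standalone sketch below only restates that framing; writeFramed and readFramed are illustrative helper names and are not part of the cluster package.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFramed sends one framed message: type byte, little-endian int64 length, payload.
func writeFramed(w io.Writer, msgType byte, payload []byte) error {
	if err := binary.Write(w, binary.LittleEndian, &msgType); err != nil {
		return err
	}
	size := int64(len(payload))
	if err := binary.Write(w, binary.LittleEndian, &size); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFramed reads one message framed the same way.
func readFramed(r io.Reader) (byte, []byte, error) {
	var msgType byte
	if err := binary.Read(r, binary.LittleEndian, &msgType); err != nil {
		return 0, nil, err
	}
	var size int64
	if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
		return 0, nil, err
	}
	payload := make([]byte, size)
	if _, err := io.ReadFull(r, payload); err != nil {
		return 0, nil, err
	}
	return msgType, payload, nil
}

func main() {
	var buf bytes.Buffer
	if err := writeFramed(&buf, 1, []byte("hello")); err != nil {
		panic(err)
	}
	mt, payload, err := readFramed(&buf)
	if err != nil {
		panic(err)
	}
	fmt.Println(mt, string(payload)) // 1 hello
}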
@ -1,229 +0,0 @@
|
|||
package cluster_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
func Test_WriteShardRequestSuccess(t *testing.T) {
|
||||
var (
|
||||
ts = newTestServer(writeShardSuccess)
|
||||
s = cluster.NewServer(ts, "127.0.0.1:0")
|
||||
)
|
||||
e := s.Open()
|
||||
if e != nil {
|
||||
t.Fatalf("err does not match. expected %v, got %v", nil, e)
|
||||
}
|
||||
// Close the server
|
||||
defer s.Close()
|
||||
|
||||
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Minute)
|
||||
|
||||
now := time.Now()
|
||||
|
||||
shardID := uint64(1)
|
||||
ownerID := uint64(2)
|
||||
var points []tsdb.Point
|
||||
points = append(points, tsdb.NewPoint(
|
||||
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
|
||||
))
|
||||
|
||||
if err := writer.Write(shardID, ownerID, points); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
responses, err := ts.ResponseN(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
response := responses[0]
|
||||
|
||||
if shardID != response.shardID {
|
||||
t.Fatalf("unexpected shardID. exp: %d, got %d", shardID, response.shardID)
|
||||
}
|
||||
|
||||
got := response.points[0]
|
||||
exp := points[0]
|
||||
t.Log("got: ", spew.Sdump(got))
|
||||
t.Log("exp: ", spew.Sdump(exp))
|
||||
|
||||
if got.Name() != exp.Name() {
|
||||
t.Fatal("unexpected name")
|
||||
}
|
||||
|
||||
if got.Fields()["value"] != exp.Fields()["value"] {
|
||||
t.Fatal("unexpected fields")
|
||||
}
|
||||
|
||||
if got.Tags()["host"] != exp.Tags()["host"] {
|
||||
t.Fatal("unexpected tags")
|
||||
}
|
||||
|
||||
if got.Time().UnixNano() != exp.Time().UnixNano() {
|
||||
t.Fatal("unexpected time")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_WriteShardRequestMultipleSuccess(t *testing.T) {
|
||||
var (
|
||||
ts = newTestServer(writeShardSuccess)
|
||||
s = cluster.NewServer(ts, "127.0.0.1:0")
|
||||
)
|
||||
// Start on a random port
|
||||
if e := s.Open(); e != nil {
|
||||
t.Fatalf("err does not match. expected %v, got %v", nil, e)
|
||||
}
|
||||
// Close the server
|
||||
defer s.Close()
|
||||
|
||||
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Minute)
|
||||
|
||||
now := time.Now()
|
||||
|
||||
shardID := uint64(1)
|
||||
ownerID := uint64(2)
|
||||
var points []tsdb.Point
|
||||
points = append(points, tsdb.NewPoint(
|
||||
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
|
||||
))
|
||||
|
||||
if err := writer.Write(shardID, ownerID, points); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now = time.Now()
|
||||
|
||||
points = append(points, tsdb.NewPoint(
|
||||
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
|
||||
))
|
||||
|
||||
if err := writer.Write(shardID, ownerID, points[1:]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
responses, err := ts.ResponseN(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
response := responses[0]
|
||||
|
||||
if shardID != response.shardID {
|
||||
t.Fatalf("unexpected shardID. exp: %d, got %d", shardID, response.shardID)
|
||||
}
|
||||
|
||||
got := response.points[0]
|
||||
exp := points[0]
|
||||
t.Log("got: ", spew.Sdump(got))
|
||||
t.Log("exp: ", spew.Sdump(exp))
|
||||
|
||||
if got.Name() != exp.Name() {
|
||||
t.Fatal("unexpected name")
|
||||
}
|
||||
|
||||
if got.Fields()["value"] != exp.Fields()["value"] {
|
||||
t.Fatal("unexpected fields")
|
||||
}
|
||||
|
||||
if got.Tags()["host"] != exp.Tags()["host"] {
|
||||
t.Fatal("unexpected tags")
|
||||
}
|
||||
|
||||
if got.Time().UnixNano() != exp.Time().UnixNano() {
|
||||
t.Fatal("unexpected time")
|
||||
}
|
||||
}
|
||||
func Test_WriteShardRequestError(t *testing.T) {
|
||||
var (
|
||||
ts = newTestServer(writeShardFail)
|
||||
s = cluster.NewServer(ts, "127.0.0.1:0")
|
||||
)
|
||||
// Start on a random port
|
||||
if e := s.Open(); e != nil {
|
||||
t.Fatalf("err does not match. expected %v, got %v", nil, e)
|
||||
}
|
||||
// Close the server
|
||||
defer s.Close()
|
||||
|
||||
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Minute)
|
||||
now := time.Now()
|
||||
|
||||
shardID := uint64(1)
|
||||
ownerID := uint64(2)
|
||||
var points []tsdb.Point
|
||||
points = append(points, tsdb.NewPoint(
|
||||
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
|
||||
))
|
||||
|
||||
if err, exp := writer.Write(shardID, ownerID, points), "error code 1: failed to write"; err == nil || err.Error() != exp {
|
||||
t.Fatalf("expected error %s, got %v", exp, err)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_WriterDialTimeout(t *testing.T) {
|
||||
var (
|
||||
ts = newTestServer(writeShardSuccess)
|
||||
s = cluster.NewServer(ts, "127.0.0.1:0")
|
||||
)
|
||||
// Start on a random port
|
||||
if e := s.Open(); e != nil {
|
||||
t.Fatalf("err does not match. expected %v, got %v", nil, e)
|
||||
}
|
||||
// Close the server
|
||||
defer s.Close()
|
||||
|
||||
writer := cluster.NewWriter(&metaStore{host: s.Addr().String()}, time.Nanosecond)
|
||||
now := time.Now()
|
||||
|
||||
shardID := uint64(1)
|
||||
ownerID := uint64(2)
|
||||
var points []tsdb.Point
|
||||
points = append(points, tsdb.NewPoint(
|
||||
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
|
||||
))
|
||||
|
||||
if err, exp := writer.Write(shardID, ownerID, points), "i/o timeout"; err == nil || !strings.Contains(err.Error(), exp) {
|
||||
t.Fatalf("expected error %v, to contain %s", err, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_WriterReadTimeout(t *testing.T) {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
writer := cluster.NewWriter(&metaStore{host: ln.Addr().String()}, time.Millisecond)
|
||||
now := time.Now()
|
||||
|
||||
shardID := uint64(1)
|
||||
ownerID := uint64(2)
|
||||
var points []tsdb.Point
|
||||
points = append(points, tsdb.NewPoint(
|
||||
"cpu", tsdb.Tags{"host": "server01"}, map[string]interface{}{"value": int64(100)}, now,
|
||||
))
|
||||
|
||||
err = writer.Write(shardID, ownerID, points)
|
||||
if err == nil {
|
||||
t.Fatal("expected read io timeout error")
|
||||
}
|
||||
if exp := fmt.Sprintf("read tcp %s: i/o timeout", ln.Addr().String()); exp != err.Error() {
|
||||
t.Fatalf("expected error %s, got %v", exp, err)
|
||||
}
|
||||
}
|
|
@ -15,6 +15,12 @@ import (
|
|||
"github.com/influxdb/influxdb"
|
||||
)
|
||||
|
||||
var DefaultSnapshotURL = url.URL{
|
||||
Scheme: "http",
|
||||
Host: net.JoinHostPort("127.0.0.1", strconv.Itoa(DefaultClusterPort)),
|
||||
}
|
||||
|
||||
|
||||
// BackupSuffix is a suffix added to the backup while it is still in progress.
|
||||
const BackupSuffix = ".pending"
|
||||
|
||||
|
|
|
@ -25,6 +25,9 @@ func main() {
|
|||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Wait indefinitely.
|
||||
<-(chan struct{})(nil)
|
||||
}
|
||||
|
||||
// Main represents the program execution.
|
||||
|
@ -77,9 +80,11 @@ func (m *Main) Run(args ...string) error {
|
|||
if err := help.NewCommand().Run(args...); err != nil {
|
||||
return fmt.Errorf("help: %s", err)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", name)
|
||||
}
|
||||
|
||||
return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influxd help' for usage`+"\n\n", name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ParseCommandName extracts the command name and args from the args list.
|
||||
|
|
|
@ -61,7 +61,7 @@ func (cmd *Command) Run(args ...string) error {
|
|||
|
||||
// Set parallelism.
|
||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||
fmt.Fprintf(cmd.Stderr, "GOMAXPROCS set to %d", runtime.GOMAXPROCS(0))
|
||||
fmt.Fprintf(cmd.Stderr, "GOMAXPROCS set to %d\n", runtime.GOMAXPROCS(0))
|
||||
|
||||
// Parse config
|
||||
config, err := cmd.ParseConfig(options.ConfigPath)
|
||||
|
@ -151,7 +151,7 @@ func (cmd *Command) ParseConfig(path string) (*Config, error) {
|
|||
// Use demo configuration if no config path is specified.
|
||||
if path == "" {
|
||||
fmt.Fprintln(cmd.Stdout, "no configuration provided, using default settings")
|
||||
return NewTestConfig()
|
||||
return NewDemoConfig()
|
||||
}
|
||||
|
||||
fmt.Fprintf(cmd.Stdout, "using configuration at: %s\n", path)
|
||||
|
|
|
@ -5,11 +5,8 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/url"
|
||||
"os/user"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
|
@ -61,23 +58,11 @@ const (
|
|||
DefaultStatisticsRetentionPolicy = "default"
|
||||
)
|
||||
|
||||
var DefaultSnapshotURL = url.URL{
|
||||
Scheme: "http",
|
||||
Host: net.JoinHostPort("127.0.0.1", strconv.Itoa(DefaultClusterPort)),
|
||||
}
|
||||
|
||||
// SnapshotConfig represents the configuration for a snapshot service.
|
||||
// type SnapshotConfig struct {
|
||||
// Enabled bool `toml:"enabled"`
|
||||
// }
|
||||
|
||||
// Config represents the configuration format for the influxd binary.
|
||||
type Config struct {
|
||||
Hostname string `toml:"hostname"`
|
||||
BindAddress string `toml:"bind-address"`
|
||||
ReportingEnabled bool `toml:"reporting-enabled"`
|
||||
// Version string `toml:"-"`
|
||||
// InfluxDBVersion string `toml:"-"`
|
||||
|
||||
Initialization struct {
|
||||
// JoinURLs are cluster URLs to use when joining a node to a cluster the first time it boots. After,
|
||||
|
@ -109,6 +94,7 @@ func NewConfig() *Config {
|
|||
|
||||
c.Meta = meta.NewConfig()
|
||||
c.Data = tsdb.NewConfig()
|
||||
c.Cluster = cluster.NewConfig()
|
||||
c.HTTPD = httpd.NewConfig()
|
||||
c.Monitoring = monitor.NewConfig()
|
||||
c.ContinuousQuery = continuous_querier.NewConfig()
|
||||
|
@ -116,9 +102,8 @@ func NewConfig() *Config {
|
|||
return c
|
||||
}
|
||||
|
||||
// NewTestConfig returns an instance of Config with reasonable defaults suitable
|
||||
// for testing a local server w/ broker and data nodes active
|
||||
func NewTestConfig() (*Config, error) {
|
||||
// NewDemoConfig returns the config that runs when no config is specified.
|
||||
func NewDemoConfig() (*Config, error) {
|
||||
c := NewConfig()
|
||||
|
||||
// By default, store meta and data files in current users home directory
|
||||
|
@ -131,7 +116,6 @@ func NewTestConfig() (*Config, error) {
|
|||
c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data")
|
||||
|
||||
c.Admin.Enabled = true
|
||||
c.Admin.BindAddress = ":8083"
|
||||
|
||||
c.Monitoring.Enabled = false
|
||||
//c.Snapshot.Enabled = true
|
||||
|
|
|
@ -55,8 +55,9 @@ func (cmd *PrintConfigCommand) Run(args ...string) error {
|
|||
// Returns a demo configuration if path is blank.
|
||||
func (cmd *PrintConfigCommand) parseConfig(path string) (*Config, error) {
|
||||
if path == "" {
|
||||
return NewTestConfig()
|
||||
return NewDemoConfig()
|
||||
}
|
||||
|
||||
config := NewConfig()
|
||||
if _, err := toml.DecodeFile(path, &config); err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -14,9 +14,15 @@ import (
|
|||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// Server represents a container for the metadata and storage data and services.
|
||||
// It is built using a Config and it manages the startup and shutdown of all
|
||||
// services in the proper order.
|
||||
type Server struct {
|
||||
MetaStore *meta.Store
|
||||
TSDBStore *tsdb.Store
|
||||
MetaStore *meta.Store
|
||||
TSDBStore *tsdb.Store
|
||||
QueryExecutor *tsdb.QueryExecutor
|
||||
PointsWriter *cluster.PointsWriter
|
||||
ShardWriter *cluster.ShardWriter
|
||||
|
||||
Services []Service
|
||||
}
|
||||
|
@ -29,39 +35,59 @@ func NewServer(c *Config, joinURLs string) *Server {
|
|||
TSDBStore: tsdb.NewStore(c.Data.Dir),
|
||||
}
|
||||
|
||||
// Add cluster Service
|
||||
s.Services = append(s.Services, cluster.NewService(c.Cluster))
|
||||
// Initialize query executor.
|
||||
s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore)
|
||||
s.QueryExecutor.MetaStore = s.MetaStore
|
||||
s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore}
|
||||
|
||||
// Add admin Service
|
||||
if c.Admin.Enabled {
|
||||
s.Services = append(s.Services, admin.NewService(c.Admin))
|
||||
}
|
||||
// Initialize points writer.
|
||||
s.PointsWriter = cluster.NewPointsWriter(1) // FIXME: Find ID.
|
||||
s.PointsWriter.ShardWriter = s.ShardWriter
|
||||
|
||||
// HTTP API Service
|
||||
if c.HTTPD.Enabled {
|
||||
s.Services = append(s.Services, httpd.NewService(c.HTTPD))
|
||||
}
|
||||
|
||||
// Graphite services
|
||||
// Append services.
|
||||
s.appendClusterService(c.Cluster)
|
||||
s.appendAdminService(c.Admin)
|
||||
s.appendHTTPDService(c.HTTPD)
|
||||
s.appendCollectdService(c.Collectd)
|
||||
s.appendOpenTSDBService(c.OpenTSDB)
|
||||
for _, g := range c.Graphites {
|
||||
if g.Enabled {
|
||||
s.Services = append(s.Services, graphite.NewService(g))
|
||||
}
|
||||
}
|
||||
|
||||
// Collectd service
|
||||
if c.Collectd.Enabled {
|
||||
s.Services = append(s.Services, collectd.NewService(c.Collectd))
|
||||
}
|
||||
|
||||
// OpenTSDB services
|
||||
if c.OpenTSDB.Enabled {
|
||||
s.Services = append(s.Services, opentsdb.NewService(c.OpenTSDB))
|
||||
s.appendGraphiteService(g)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Server) appendClusterService(c cluster.Config) {
|
||||
srv := cluster.NewService(c)
|
||||
srv.ShardWriter = s.ShardWriter
|
||||
s.Services = append(s.Services, srv)
|
||||
}
|
||||
|
||||
func (s *Server) appendAdminService(c admin.Config) {
|
||||
srv := admin.NewService(c)
|
||||
s.Services = append(s.Services, srv)
|
||||
}
|
||||
|
||||
func (s *Server) appendHTTPDService(c httpd.Config) {
|
||||
srv := httpd.NewService(c)
|
||||
s.Services = append(s.Services, srv)
|
||||
}
|
||||
|
||||
func (s *Server) appendCollectdService(c collectd.Config) {
|
||||
srv := collectd.NewService(c)
|
||||
s.Services = append(s.Services, srv)
|
||||
}
|
||||
|
||||
func (s *Server) appendOpenTSDBService(c opentsdb.Config) {
|
||||
srv := opentsdb.NewService(c)
|
||||
s.Services = append(s.Services, srv)
|
||||
}
|
||||
|
||||
func (s *Server) appendGraphiteService(c graphite.Config) {
|
||||
srv := graphite.NewService(c)
|
||||
s.Services = append(s.Services, srv)
|
||||
}
|
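The appendXService helpers above all funnel into the same Services slice, so Open and Close can treat every subsystem uniformly. The concrete Service interface is not shown in this diff; the sketch below assumes the minimal Open/Close contract the pattern implies, and the unwinding on failure is an illustration rather than something taken from this change.

package main

import "fmt"

// Service is an assumed minimal contract for entries in Server.Services.
type Service interface {
	Open() error
	Close() error
}

type Server struct {
	Services []Service
}

// openServices starts each service in registration order and closes the ones
// already opened if a later service fails. Error wording is illustrative.
func (s *Server) openServices() error {
	for i, svc := range s.Services {
		if err := svc.Open(); err != nil {
			for j := i - 1; j >= 0; j-- {
				s.Services[j].Close()
			}
			return fmt.Errorf("open service %d: %s", i, err)
		}
	}
	return nil
}

func main() {
	s := &Server{}
	fmt.Println(s.openServices()) // <nil> for an empty service list
}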
||||
|
||||
// Open opens the meta and data store and all services.
|
||||
func (s *Server) Open() error {
|
||||
if err := func() error {
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrFieldsRequired is returned when a point does not have any fields.
|
||||
ErrFieldsRequired = errors.New("fields required")
|
||||
|
||||
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
|
||||
ErrFieldTypeConflict = errors.New("field type conflict")
|
||||
)
|
||||
|
||||
func ErrDatabaseNotFound(name string) error { return Errorf("database not found: %s", name) }
|
||||
|
||||
func ErrMeasurementNotFound(name string) error { return Errorf("measurement not found: %s", name) }
|
||||
|
||||
func Errorf(format string, a ...interface{}) (err error) {
|
||||
if _, file, line, ok := runtime.Caller(2); ok {
|
||||
a = append(a, file, line)
|
||||
err = fmt.Errorf(format+" (%s:%d)", a...)
|
||||
} else {
|
||||
err = fmt.Errorf(format, a...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// IsClientError indicates whether an error is a known client error.
|
||||
func IsClientError(err error) bool {
|
||||
if err == ErrFieldsRequired {
|
||||
return true
|
||||
}
|
||||
if err == ErrFieldTypeConflict {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// mustMarshalJSON encodes a value to JSON.
|
||||
// This will panic if an error occurs. This should only be used internally when
|
||||
// an invalid marshal will cause corruption and a panic is appropriate.
|
||||
func mustMarshalJSON(v interface{}) []byte {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
panic("marshal: " + err.Error())
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// mustUnmarshalJSON decodes a value from JSON.
|
||||
// This will panic if an error occurs. This should only be used internally when
|
||||
// an invalid unmarshal will cause corruption and a panic is appropriate.
|
||||
func mustUnmarshalJSON(b []byte, v interface{}) {
|
||||
if err := json.Unmarshal(b, v); err != nil {
|
||||
panic("unmarshal: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// assert will panic with a given formatted message if the given condition is false.
|
||||
func assert(condition bool, msg string, v ...interface{}) {
|
||||
if !condition {
|
||||
panic(fmt.Sprintf("assert failed: "+msg, v...))
|
||||
}
|
||||
}
|
||||
|
||||
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
|
||||
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
|
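A quick usage example for the error helpers added above: Errorf walks two frames up the stack, so a constructor like ErrDatabaseNotFound tags the message with the file and line of its caller. The path and line number in the comment are illustrative only.

package main

import (
	"fmt"

	"github.com/influxdb/influxdb"
)

func main() {
	// runtime.Caller(2) inside Errorf resolves to this call site.
	err := influxdb.ErrDatabaseNotFound("db0")

	// Prints something like: database not found: db0 (/path/to/main.go:12)
	fmt.Println(err)
}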
influxdb.go
|
@ -1,240 +0,0 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/client"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
var startTime time.Time
|
||||
|
||||
func init() {
|
||||
startTime = time.Now().UTC()
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrServerOpen is returned when opening an already open server.
|
||||
ErrServerOpen = errors.New("server already open")
|
||||
|
||||
// ErrServerClosed is returned when closing an already closed server.
|
||||
ErrServerClosed = errors.New("server already closed")
|
||||
|
||||
// ErrUnableToJoin is returned when a server cannot join a cluster.
|
||||
ErrUnableToJoin = errors.New("unable to join")
|
||||
|
||||
// ErrDataNodeURLRequired is returned when creating a data node without a URL.
|
||||
ErrDataNodeURLRequired = errors.New("data node url required")
|
||||
|
||||
// ErrNoDataNodeAvailable is returned when there are no data nodes available
|
||||
ErrNoDataNodeAvailable = errors.New("data node not available")
|
||||
|
||||
// ErrDataNodeExists is returned when creating a duplicate data node.
|
||||
ErrDataNodeExists = errors.New("data node exists")
|
||||
|
||||
// ErrDataNodeNotFound is returned when dropping a non-existent data node or
|
||||
// attempting to join another data node when no data nodes exist yet
|
||||
ErrDataNodeNotFound = errors.New("data node not found")
|
||||
|
||||
// ErrDataNodeRequired is returned when using a blank data node id.
|
||||
ErrDataNodeRequired = errors.New("data node required")
|
||||
|
||||
// ErrDatabaseNameRequired is returned when creating a database without a name.
|
||||
ErrDatabaseNameRequired = errors.New("database name required")
|
||||
|
||||
// ErrDatabaseExists is returned when creating a duplicate database.
|
||||
ErrDatabaseExists = errors.New("database exists")
|
||||
|
||||
// ErrDatabaseRequired is returned when using a blank database name.
|
||||
ErrDatabaseRequired = errors.New("database required")
|
||||
|
||||
// ErrClusterAdminExists is returned when creating a duplicate admin.
|
||||
ErrClusterAdminExists = errors.New("cluster admin exists")
|
||||
|
||||
// ErrClusterAdminNotFound is returned when deleting a non-existent admin.
|
||||
ErrClusterAdminNotFound = errors.New("cluster admin not found")
|
||||
|
||||
// ErrUserExists is returned when creating a duplicate user.
|
||||
ErrUserExists = errors.New("user exists")
|
||||
|
||||
// ErrUserNotFound is returned when deleting a non-existent user.
|
||||
ErrUserNotFound = errors.New("user not found")
|
||||
|
||||
// ErrUsernameRequired is returned when using a blank username.
|
||||
ErrUsernameRequired = errors.New("username required")
|
||||
|
||||
// ErrInvalidUsername is returned when using a username with invalid characters.
|
||||
ErrInvalidUsername = errors.New("invalid username")
|
||||
|
||||
// ErrRetentionPolicyExists is returned when creating a duplicate shard space.
|
||||
ErrRetentionPolicyExists = errors.New("retention policy exists")
|
||||
|
||||
// ErrRetentionPolicyNotFound is returned when deleting a non-existent shard space.
|
||||
ErrRetentionPolicyNotFound = errors.New("retention policy not found")
|
||||
|
||||
// ErrRetentionPolicyNameRequired is returned using a blank shard space name.
|
||||
ErrRetentionPolicyNameRequired = errors.New("retention policy name required")
|
||||
|
||||
// ErrDefaultRetentionPolicyNotFound is returned when using the default
|
||||
// policy on a database but the default has not been set.
|
||||
ErrDefaultRetentionPolicyNotFound = errors.New("default retention policy not found")
|
||||
|
||||
// ErrShardNotFound is returned when attempting to access a non-existent shard
|
||||
ErrShardNotFound = errors.New("shard not found")
|
||||
|
||||
// ErrShardNotLocal is returned when a server attempts to access a shard that is not local
|
||||
ErrShardNotLocal = errors.New("shard not local")
|
||||
|
||||
// ErrShardShortRead returned when the number of bytes read from a shard is less than expected.
|
||||
ErrShardShortRead = errors.New("shard read returned insufficient data")
|
||||
|
||||
// ErrInvalidPointBuffer is returned when a buffer containing data for writing is invalid
|
||||
ErrInvalidPointBuffer = errors.New("invalid point buffer")
|
||||
|
||||
// ErrReadAccessDenied is returned when a user attempts to read
|
||||
// data that he or she does not have permission to read.
|
||||
ErrReadAccessDenied = errors.New("read access denied")
|
||||
|
||||
// ErrReadWritePermissionsRequired is returned when required read/write permissions aren't provided.
|
||||
ErrReadWritePermissionsRequired = errors.New("read/write permissions required")
|
||||
|
||||
// ErrInvalidQuery is returned when executing an unknown query type.
|
||||
ErrInvalidQuery = errors.New("invalid query")
|
||||
|
||||
// ErrMeasurementNameRequired is returned when a point does not contain a name.
|
||||
ErrMeasurementNameRequired = errors.New("measurement name required")
|
||||
|
||||
// ErrFieldsRequired is returned when a point does not any fields.
|
||||
ErrFieldsRequired = errors.New("fields required")
|
||||
|
||||
// FieldIsNull is returned when one of a point's field is null.
|
||||
ErrFieldIsNull = errors.New("field value is null")
|
||||
|
||||
// ErrFieldOverflow is returned when too many fields are created on a measurement.
|
||||
ErrFieldOverflow = errors.New("field overflow")
|
||||
|
||||
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
|
||||
ErrFieldTypeConflict = errors.New("field type conflict")
|
||||
|
||||
// ErrFieldNotFound is returned when a field cannot be found.
|
||||
ErrFieldNotFound = errors.New("field not found")
|
||||
|
||||
// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
|
||||
// there is no mapping for.
|
||||
ErrFieldUnmappedID = errors.New("field ID not mapped")
|
||||
|
||||
// ErrSeriesNotFound is returned when looking up a non-existent series by database, name and tags
|
||||
ErrSeriesNotFound = errors.New("series not found")
|
||||
|
||||
// ErrSeriesExists is returned when attempting to set the id of a series by database, name and tags that already exists
|
||||
ErrSeriesExists = errors.New("series already exists")
|
||||
|
||||
// ErrNotExecuted is returned when a statement is not executed in a query.
|
||||
// This can occur when a previous statement in the same query has errored.
|
||||
ErrNotExecuted = errors.New("not executed")
|
||||
|
||||
// ErrInvalidGrantRevoke is returned when a statement requests an invalid
|
||||
// privilege for a user on the cluster or a database.
|
||||
ErrInvalidGrantRevoke = errors.New("invalid privilege requested")
|
||||
|
||||
// ErrContinuousQueryExists is returned when creating a duplicate continuous query.
|
||||
ErrContinuousQueryExists = errors.New("continuous query already exists")
|
||||
|
||||
// ErrContinuousQueryNotFound is returned when dropping a nonexistent continuous query.
|
||||
ErrContinuousQueryNotFound = errors.New("continuous query not found")
|
||||
)
|
||||
|
||||
func ErrDatabaseNotFound(name string) error { return Errorf("database not found: %s", name) }
|
||||
|
||||
func ErrMeasurementNotFound(name string) error { return Errorf("measurement not found: %s", name) }
|
||||
|
||||
func Errorf(format string, a ...interface{}) (err error) {
|
||||
if _, file, line, ok := runtime.Caller(2); ok {
|
||||
a = append(a, file, line)
|
||||
err = fmt.Errorf(format+" (%s:%d)", a...)
|
||||
} else {
|
||||
err = fmt.Errorf(format, a...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// IsClientError indicates whether an error is a known client error.
|
||||
func IsClientError(err error) bool {
|
||||
if err == ErrFieldsRequired {
|
||||
return true
|
||||
}
|
||||
if err == ErrFieldTypeConflict {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// mustMarshal encodes a value to JSON.
|
||||
// This will panic if an error occurs. This should only be used internally when
|
||||
// an invalid marshal will cause corruption and a panic is appropriate.
|
||||
func mustMarshalJSON(v interface{}) []byte {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
panic("marshal: " + err.Error())
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// mustUnmarshalJSON decodes a value from JSON.
|
||||
// This will panic if an error occurs. This should only be used internally when
|
||||
// an invalid unmarshal will cause corruption and a panic is appropriate.
|
||||
func mustUnmarshalJSON(b []byte, v interface{}) {
|
||||
if err := json.Unmarshal(b, v); err != nil {
|
||||
panic("unmarshal: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// assert will panic with a given formatted message if the given condition is false.
|
||||
func assert(condition bool, msg string, v ...interface{}) {
|
||||
if !condition {
|
||||
panic(fmt.Sprintf("assert failed: "+msg, v...))
|
||||
}
|
||||
}
|
||||
|
||||
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
|
||||
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
|
||||
|
||||
// NormalizeBatchPoints returns a slice of Points, created by populating individual
|
||||
// points within the batch, which do not have times or tags, with the top-level
|
||||
// values.
|
||||
func NormalizeBatchPoints(bp client.BatchPoints) ([]tsdb.Point, error) {
|
||||
points := []tsdb.Point{}
|
||||
for _, p := range bp.Points {
|
||||
if p.Time.IsZero() {
|
||||
if bp.Time.IsZero() {
|
||||
p.Time = time.Now()
|
||||
} else {
|
||||
p.Time = bp.Time
|
||||
}
|
||||
}
|
||||
if p.Precision == "" && bp.Precision != "" {
|
||||
p.Precision = bp.Precision
|
||||
}
|
||||
p.Time = client.SetPrecision(p.Time, p.Precision)
|
||||
if len(bp.Tags) > 0 {
|
||||
if p.Tags == nil {
|
||||
p.Tags = make(map[string]string)
|
||||
}
|
||||
for k := range bp.Tags {
|
||||
if p.Tags[k] == "" {
|
||||
p.Tags[k] = bp.Tags[k]
|
||||
}
|
||||
}
|
||||
}
|
||||
// Need to convert from a client.Point to a influxdb.Point
|
||||
points = append(points, tsdb.NewPoint(p.Name, p.Tags, p.Fields, p.Time))
|
||||
}
|
||||
|
||||
return points, nil
|
||||
}
|
|
@ -1,76 +0,0 @@
|
|||
package influxdb_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/client"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
func TestNormalizeBatchPoints(t *testing.T) {
|
||||
now := time.Now()
|
||||
tests := []struct {
|
||||
name string
|
||||
bp client.BatchPoints
|
||||
p []tsdb.Point
|
||||
err string
|
||||
}{
|
||||
{
|
||||
name: "default",
|
||||
bp: client.BatchPoints{
|
||||
Points: []client.Point{
|
||||
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Time: now, Fields: map[string]interface{}{"value": 1.0}},
|
||||
},
|
||||
},
|
||||
p: []tsdb.Point{
|
||||
tsdb.NewPoint("cpu", map[string]string{"region": "useast"}, map[string]interface{}{"value": 1.0}, now),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "merge time",
|
||||
bp: client.BatchPoints{
|
||||
Time: now,
|
||||
Points: []client.Point{
|
||||
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Fields: map[string]interface{}{"value": 1.0}},
|
||||
},
|
||||
},
|
||||
p: []tsdb.Point{
|
||||
tsdb.NewPoint("cpu", map[string]string{"region": "useast"}, map[string]interface{}{"value": 1.0}, now),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "merge tags",
|
||||
bp: client.BatchPoints{
|
||||
Tags: map[string]string{"day": "monday"},
|
||||
Points: []client.Point{
|
||||
{Name: "cpu", Tags: map[string]string{"region": "useast"}, Time: now, Fields: map[string]interface{}{"value": 1.0}},
|
||||
{Name: "memory", Time: now, Fields: map[string]interface{}{"value": 2.0}},
|
||||
},
|
||||
},
|
||||
p: []tsdb.Point{
|
||||
tsdb.NewPoint("cpu", map[string]string{"day": "monday", "region": "useast"}, map[string]interface{}{"value": 1.0}, now),
|
||||
tsdb.NewPoint("memory", map[string]string{"day": "monday"}, map[string]interface{}{"value": 2.0}, now),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Logf("running test %q", test.name)
|
||||
p, e := influxdb.NormalizeBatchPoints(test.bp)
|
||||
if test.err == "" && e != nil {
|
||||
t.Errorf("unexpected error %v", e)
|
||||
} else if test.err != "" && e == nil {
|
||||
t.Errorf("expected error %s, got <nil>", test.err)
|
||||
} else if e != nil && test.err != e.Error() {
|
||||
t.Errorf("unexpected error. expected: %s, got %v", test.err, e)
|
||||
}
|
||||
if !reflect.DeepEqual(p, test.p) {
|
||||
t.Logf("expected: %+v", test.p)
|
||||
t.Logf("got: %+v", p)
|
||||
t.Error("failed to normalize.")
|
||||
}
|
||||
}
|
||||
}
|
|
@ -34,57 +34,57 @@ type StatementExecutor struct {
|
|||
}
|
||||
|
||||
// ExecuteStatement executes stmt against the meta store.
|
||||
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
|
||||
switch stmt := stmt.(type) {
|
||||
case *influxql.CreateDatabaseStatement:
|
||||
return e.executeCreateDatabaseStatement(stmt, user)
|
||||
return e.executeCreateDatabaseStatement(stmt)
|
||||
case *influxql.DropDatabaseStatement:
|
||||
return e.executeDropDatabaseStatement(stmt, user)
|
||||
return e.executeDropDatabaseStatement(stmt)
|
||||
case *influxql.ShowDatabasesStatement:
|
||||
return e.executeShowDatabasesStatement(stmt, user)
|
||||
return e.executeShowDatabasesStatement(stmt)
|
||||
case *influxql.ShowServersStatement:
|
||||
return e.executeShowServersStatement(stmt, user)
|
||||
return e.executeShowServersStatement(stmt)
|
||||
case *influxql.CreateUserStatement:
|
||||
return e.executeCreateUserStatement(stmt, user)
|
||||
return e.executeCreateUserStatement(stmt)
|
||||
case *influxql.SetPasswordUserStatement:
|
||||
return e.executeSetPasswordUserStatement(stmt, user)
|
||||
return e.executeSetPasswordUserStatement(stmt)
|
||||
case *influxql.DropUserStatement:
|
||||
return e.executeDropUserStatement(stmt, user)
|
||||
return e.executeDropUserStatement(stmt)
|
||||
case *influxql.ShowUsersStatement:
|
||||
return e.executeShowUsersStatement(stmt, user)
|
||||
return e.executeShowUsersStatement(stmt)
|
||||
case *influxql.GrantStatement:
|
||||
return e.executeGrantStatement(stmt, user)
|
||||
return e.executeGrantStatement(stmt)
|
||||
case *influxql.RevokeStatement:
|
||||
return e.executeRevokeStatement(stmt, user)
|
||||
return e.executeRevokeStatement(stmt)
|
||||
case *influxql.CreateRetentionPolicyStatement:
|
||||
return e.executeCreateRetentionPolicyStatement(stmt, user)
|
||||
return e.executeCreateRetentionPolicyStatement(stmt)
|
||||
case *influxql.AlterRetentionPolicyStatement:
|
||||
return e.executeAlterRetentionPolicyStatement(stmt, user)
|
||||
return e.executeAlterRetentionPolicyStatement(stmt)
|
||||
case *influxql.DropRetentionPolicyStatement:
|
||||
return e.executeDropRetentionPolicyStatement(stmt, user)
|
||||
return e.executeDropRetentionPolicyStatement(stmt)
|
||||
case *influxql.ShowRetentionPoliciesStatement:
|
||||
return e.executeShowRetentionPoliciesStatement(stmt, user)
|
||||
return e.executeShowRetentionPoliciesStatement(stmt)
|
||||
case *influxql.CreateContinuousQueryStatement:
|
||||
return e.executeCreateContinuousQueryStatement(stmt, user)
|
||||
return e.executeCreateContinuousQueryStatement(stmt)
|
||||
case *influxql.DropContinuousQueryStatement:
|
||||
return e.executeDropContinuousQueryStatement(stmt, user)
|
||||
return e.executeDropContinuousQueryStatement(stmt)
|
||||
case *influxql.ShowContinuousQueriesStatement:
|
||||
return e.executeShowContinuousQueriesStatement(stmt, user)
|
||||
return e.executeShowContinuousQueriesStatement(stmt)
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported statement type: %T", stmt))
|
||||
}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeCreateDatabaseStatement(q *influxql.CreateDatabaseStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeCreateDatabaseStatement(q *influxql.CreateDatabaseStatement) *influxql.Result {
|
||||
_, err := e.Store.CreateDatabase(q.Name)
|
||||
return &influxql.Result{Err: err}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeDropDatabaseStatement(q *influxql.DropDatabaseStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeDropDatabaseStatement(q *influxql.DropDatabaseStatement) *influxql.Result {
|
||||
return &influxql.Result{Err: e.Store.DropDatabase(q.Name)}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement) *influxql.Result {
|
||||
dis, err := e.Store.Databases()
|
||||
if err != nil {
|
||||
return &influxql.Result{Err: err}
|
||||
|
@ -97,7 +97,7 @@ func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDataba
|
|||
return &influxql.Result{Series: []*influxql.Row{row}}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersStatement) *influxql.Result {
|
||||
nis, err := e.Store.Nodes()
|
||||
if err != nil {
|
||||
return &influxql.Result{Err: err}
|
||||
|
@ -110,7 +110,7 @@ func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersS
|
|||
return &influxql.Result{Series: []*influxql.Row{row}}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) *influxql.Result {
|
||||
admin := false
|
||||
if q.Privilege != nil {
|
||||
admin = (*q.Privilege == influxql.AllPrivileges)
|
||||
|
@ -120,36 +120,36 @@ func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserSta
|
|||
return &influxql.Result{Err: err}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordUserStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPasswordUserStatement) *influxql.Result {
|
||||
return &influxql.Result{Err: e.Store.UpdateUser(q.Name, q.Password)}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeDropUserStatement(q *influxql.DropUserStatement) *influxql.Result {
|
||||
return &influxql.Result{Err: e.Store.DropUser(q.Name)}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersStatement) *influxql.Result {
|
||||
uis, err := e.Store.Users()
|
||||
if err != nil {
|
||||
return &influxql.Result{Err: err}
|
||||
}
|
||||
|
||||
row := &influxql.Row{Columns: []string{"user", "admin"}}
|
||||
for _, user := range uis {
|
||||
row.Values = append(row.Values, []interface{}{user.Name, user.Admin})
|
||||
for _, ui := range uis {
|
||||
row.Values = append(row.Values, []interface{}{ui.Name, ui.Admin})
|
||||
}
|
||||
return &influxql.Result{Series: []*influxql.Row{row}}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeGrantStatement(stmt *influxql.GrantStatement) *influxql.Result {
|
||||
return &influxql.Result{Err: e.Store.SetPrivilege(stmt.User, stmt.On, stmt.Privilege)}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) *influxql.Result {
|
||||
return &influxql.Result{Err: e.Store.SetPrivilege(stmt.User, stmt.On, influxql.NoPrivileges)}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) *influxql.Result {
|
||||
rpi := NewRetentionPolicyInfo(stmt.Name)
|
||||
rpi.Duration = stmt.Duration
|
||||
rpi.ReplicaN = stmt.Replication
|
||||
|
@ -168,7 +168,7 @@ func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql
|
|||
return &influxql.Result{Err: err}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) *influxql.Result {
|
||||
rpu := &RetentionPolicyUpdate{
|
||||
Duration: stmt.Duration,
|
||||
ReplicaN: stmt.Replication,
|
||||
|
@ -188,11 +188,11 @@ func (e *StatementExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.
|
|||
return &influxql.Result{Err: err}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeDropRetentionPolicyStatement(q *influxql.DropRetentionPolicyStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeDropRetentionPolicyStatement(q *influxql.DropRetentionPolicyStatement) *influxql.Result {
|
||||
return &influxql.Result{Err: e.Store.DropRetentionPolicy(q.Database, q.Name)}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) *influxql.Result {
|
||||
di, err := e.Store.Database(q.Database)
|
||||
if err != nil {
|
||||
return &influxql.Result{Err: err}
|
||||
|
@ -207,19 +207,19 @@ func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.Sh
|
|||
return &influxql.Result{Series: []*influxql.Row{row}}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement) *influxql.Result {
|
||||
return &influxql.Result{
|
||||
Err: e.Store.CreateContinuousQuery(q.Database, q.Name, q.Source.String()),
|
||||
}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) *influxql.Result {
|
||||
return &influxql.Result{
|
||||
Err: e.Store.DropContinuousQuery(q.Database, q.Name),
|
||||
}
|
||||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement, user *UserInfo) *influxql.Result {
|
||||
func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) *influxql.Result {
|
||||
dis, err := e.Store.Databases()
|
||||
if err != nil {
|
||||
return &influxql.Result{Err: err}
|
||||
|
|
|
@ -21,7 +21,7 @@ func TestStatementExecutor_ExecuteStatement_CreateDatabase(t *testing.T) {
|
|||
return &meta.DatabaseInfo{Name: name}, nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`CREATE DATABASE foo`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`CREATE DATABASE foo`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if res.Series != nil {
|
||||
t.Fatalf("unexpected rows: %#v", res.Series)
|
||||
|
@ -38,7 +38,7 @@ func TestStatementExecutor_ExecuteStatement_DropDatabase(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`DROP DATABASE foo`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`DROP DATABASE foo`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if res.Series != nil {
|
||||
t.Fatalf("unexpected rows: %#v", res.Series)
|
||||
|
@ -55,7 +55,7 @@ func TestStatementExecutor_ExecuteStatement_ShowDatabases(t *testing.T) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW DATABASES`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW DATABASES`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if !reflect.DeepEqual(res.Series, influxql.Rows{
|
||||
{
|
||||
|
@ -78,7 +78,7 @@ func TestStatementExecutor_ExecuteStatement_ShowDatabases_Err(t *testing.T) {
|
|||
return nil, errors.New("marker")
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW DATABASES`), nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW DATABASES`)); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ func TestStatementExecutor_ExecuteStatement_ShowServers(t *testing.T) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if !reflect.DeepEqual(res.Series, influxql.Rows{
|
||||
{
|
||||
|
@ -115,7 +115,7 @@ func TestStatementExecutor_ExecuteStatement_ShowServers_Err(t *testing.T) {
|
|||
return nil, errors.New("marker")
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`), nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`)); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -134,7 +134,7 @@ func TestStatementExecutor_ExecuteStatement_CreateUser(t *testing.T) {
|
|||
return &meta.UserInfo{Name: name, Admin: admin}, nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`CREATE USER susy WITH PASSWORD 'pass' WITH ALL PRIVILEGES`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`CREATE USER susy WITH PASSWORD 'pass' WITH ALL PRIVILEGES`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if res.Series != nil {
|
||||
t.Fatalf("unexpected rows: %#v", res.Series)
|
||||
|
@ -148,7 +148,7 @@ func TestStatementExecutor_ExecuteStatement_CreateUser_Err(t *testing.T) {
|
|||
return nil, errors.New("marker")
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`CREATE USER susy WITH PASSWORD 'pass'`), nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`CREATE USER susy WITH PASSWORD 'pass'`)); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -165,7 +165,7 @@ func TestStatementExecutor_ExecuteStatement_SetPassword(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SET PASSWORD FOR susy = 'pass' WITH ALL PRIVILEGES`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SET PASSWORD FOR susy = 'pass' WITH ALL PRIVILEGES`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if res.Series != nil {
|
||||
t.Fatalf("unexpected rows: %#v", res.Series)
|
||||
|
@ -179,7 +179,7 @@ func TestStatementExecutor_ExecuteStatement_SetPassword_Err(t *testing.T) {
|
|||
return errors.New("marker")
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SET PASSWORD FOR susy = 'pass'`), nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SET PASSWORD FOR susy = 'pass'`)); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -194,7 +194,7 @@ func TestStatementExecutor_ExecuteStatement_DropUser(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`DROP USER susy`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`DROP USER susy`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if res.Series != nil {
|
||||
t.Fatalf("unexpected rows: %#v", res.Series)
|
||||
|
@ -208,7 +208,7 @@ func TestStatementExecutor_ExecuteStatement_DropUser_Err(t *testing.T) {
|
|||
return errors.New("marker")
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`DROP USER susy`), nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`DROP USER susy`)); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -223,7 +223,7 @@ func TestStatementExecutor_ExecuteStatement_ShowUsers(t *testing.T) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW USERS`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW USERS`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if !reflect.DeepEqual(res.Series, influxql.Rows{
|
||||
{
|
||||
|
@ -245,7 +245,7 @@ func TestStatementExecutor_ExecuteStatement_ShowUsers_Err(t *testing.T) {
|
|||
return nil, errors.New("marker")
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW USERS`), nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW USERS`)); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -264,7 +264,7 @@ func TestStatementExecutor_ExecuteStatement_Grant(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`GRANT WRITE ON foo TO susy`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`GRANT WRITE ON foo TO susy`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if res.Series != nil {
|
||||
t.Fatalf("unexpected rows: %#v", res.Series)
|
||||
|
@ -278,7 +278,7 @@ func TestStatementExecutor_ExecuteStatement_Grant_Err(t *testing.T) {
|
|||
return errors.New("marker")
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`GRANT READ ON foo TO susy`), nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`GRANT READ ON foo TO susy`)); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -297,7 +297,7 @@ func TestStatementExecutor_ExecuteStatement_Revoke(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`REVOKE ALL PRIVILEGES ON foo FROM susy`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`REVOKE ALL PRIVILEGES ON foo FROM susy`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if res.Series != nil {
|
||||
t.Fatalf("unexpected rows: %#v", res.Series)
|
||||
|
@ -311,7 +311,7 @@ func TestStatementExecutor_ExecuteStatement_Revoke_Err(t *testing.T) {
|
|||
return errors.New("marker")
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`REVOKE ALL PRIVILEGES ON foo FROM susy`), nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`REVOKE ALL PRIVILEGES ON foo FROM susy`)); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -340,7 +340,7 @@ func TestStatementExecutor_ExecuteStatement_CreateRetentionPolicy(t *testing.T)
|
|||
return nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`CREATE RETENTION POLICY rp0 ON foo DURATION 2h REPLICATION 3 DEFAULT`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`CREATE RETENTION POLICY rp0 ON foo DURATION 2h REPLICATION 3 DEFAULT`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if res.Series != nil {
|
||||
t.Fatalf("unexpected rows: %#v", res.Series)
|
||||
|
@ -354,7 +354,7 @@ func TestStatementExecutor_ExecuteStatement_CreateRetentionPolicy_Err(t *testing
|
|||
return nil, errors.New("marker")
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`CREATE RETENTION POLICY rp0 ON foo DURATION 2h REPLICATION 1`), nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`CREATE RETENTION POLICY rp0 ON foo DURATION 2h REPLICATION 1`)); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -384,7 +384,7 @@ func TestStatementExecutor_ExecuteStatement_AlterRetentionPolicy(t *testing.T) {
|
|||
}
|
||||
|
||||
stmt := influxql.MustParseStatement(`ALTER RETENTION POLICY rp0 ON foo DURATION 7d REPLICATION 2 DEFAULT`)
|
||||
if res := e.ExecuteStatement(stmt, nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(stmt); res.Err != nil {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -397,7 +397,7 @@ func TestStatementExecutor_ExecuteStatement_AlterRetentionPolicy_Err(t *testing.
|
|||
}
|
||||
|
||||
stmt := influxql.MustParseStatement(`ALTER RETENTION POLICY rp0 ON foo DURATION 1m REPLICATION 4 DEFAULT`)
|
||||
if res := e.ExecuteStatement(stmt, nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(stmt); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -413,7 +413,7 @@ func TestStatementExecutor_ExecuteStatement_AlterRetentionPolicy_ErrSetDefault(t
|
|||
}
|
||||
|
||||
stmt := influxql.MustParseStatement(`ALTER RETENTION POLICY rp0 ON foo DURATION 1m REPLICATION 4 DEFAULT`)
|
||||
if res := e.ExecuteStatement(stmt, nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(stmt); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -431,7 +431,7 @@ func TestStatementExecutor_ExecuteStatement_DropRetentionPolicy(t *testing.T) {
|
|||
}
|
||||
|
||||
stmt := influxql.MustParseStatement(`DROP RETENTION POLICY rp0 ON foo`)
|
||||
if res := e.ExecuteStatement(stmt, nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(stmt); res.Err != nil {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -444,7 +444,7 @@ func TestStatementExecutor_ExecuteStatement_DropRetentionPolicy_Err(t *testing.T
|
|||
}
|
||||
|
||||
stmt := influxql.MustParseStatement(`DROP RETENTION POLICY rp0 ON foo`)
|
||||
if res := e.ExecuteStatement(stmt, nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(stmt); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -474,7 +474,7 @@ func TestStatementExecutor_ExecuteStatement_ShowRetentionPolicies(t *testing.T)
|
|||
}, nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW RETENTION POLICIES db0`), nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW RETENTION POLICIES db0`)); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if !reflect.DeepEqual(res.Series, influxql.Rows{
|
||||
{
|
||||
|
@ -496,7 +496,7 @@ func TestStatementExecutor_ExecuteStatement_ShowRetentionPolicies_Err(t *testing
|
|||
return nil, errors.New("marker")
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW RETENTION POLICIES db0`), nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW RETENTION POLICIES db0`)); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -508,7 +508,7 @@ func TestStatementExecutor_ExecuteStatement_ShowRetentionPolicies_ErrDatabaseNot
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW RETENTION POLICIES db0`), nil); res.Err != meta.ErrDatabaseNotFound {
|
||||
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW RETENTION POLICIES db0`)); res.Err != meta.ErrDatabaseNotFound {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -528,7 +528,7 @@ func TestStatementExecutor_ExecuteStatement_CreateContinuousQuery(t *testing.T)
|
|||
}
|
||||
|
||||
stmt := influxql.MustParseStatement(`CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(*) INTO db1 FROM db0 GROUP BY time(1h) END`)
|
||||
if res := e.ExecuteStatement(stmt, nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(stmt); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if res.Series != nil {
|
||||
t.Fatalf("unexpected rows: %#v", res.Series)
|
||||
|
@ -543,7 +543,7 @@ func TestStatementExecutor_ExecuteStatement_CreateContinuousQuery_Err(t *testing
|
|||
}
|
||||
|
||||
stmt := influxql.MustParseStatement(`CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(*) INTO db1 FROM db0 GROUP BY time(1h) END`)
|
||||
if res := e.ExecuteStatement(stmt, nil); res.Err == nil || res.Err.Error() != "marker" {
|
||||
if res := e.ExecuteStatement(stmt); res.Err == nil || res.Err.Error() != "marker" {
|
||||
t.Fatalf("unexpected error: %s", res.Err)
|
||||
}
|
||||
}
|
||||
|
@ -561,7 +561,7 @@ func TestStatementExecutor_ExecuteStatement_DropContinuousQuery(t *testing.T) {
|
|||
}
|
||||
|
||||
stmt := influxql.MustParseStatement(`DROP CONTINUOUS QUERY cq0 ON db0`)
|
||||
if res := e.ExecuteStatement(stmt, nil); res.Err != nil {
|
||||
if res := e.ExecuteStatement(stmt); res.Err != nil {
|
||||
t.Fatal(res.Err)
|
||||
} else if res.Series != nil {
|
||||
t.Fatalf("unexpected rows: %#v", res.Series)
|
||||
|
@ -576,7 +576,7 @@ func TestStatementExecutor_ExecuteStatement_DropContinuousQuery_Err(t *testing.T
}

stmt := influxql.MustParseStatement(`DROP CONTINUOUS QUERY cq0 ON db0`)
- if res := e.ExecuteStatement(stmt, nil); res.Err == nil || res.Err.Error() != "marker" {
+ if res := e.ExecuteStatement(stmt); res.Err == nil || res.Err.Error() != "marker" {
t.Fatalf("unexpected error: %s", res.Err)
}
}
@ -603,7 +603,7 @@ func TestStatementExecutor_ExecuteStatement_ShowContinuousQueries(t *testing.T)
}

stmt := influxql.MustParseStatement(`SHOW CONTINUOUS QUERIES`)
- if res := e.ExecuteStatement(stmt, nil); res.Err != nil {
+ if res := e.ExecuteStatement(stmt); res.Err != nil {
t.Fatal(res.Err)
} else if !reflect.DeepEqual(res.Series, influxql.Rows{
{
@ -634,7 +634,7 @@ func TestStatementExecutor_ExecuteStatement_ShowContinuousQueries_Err(t *testing
}

stmt := influxql.MustParseStatement(`SHOW CONTINUOUS QUERIES`)
- if res := e.ExecuteStatement(stmt, nil); res.Err == nil || res.Err.Error() != "marker" {
+ if res := e.ExecuteStatement(stmt); res.Err == nil || res.Err.Error() != "marker" {
t.Fatal(res.Err)
}
}
@ -651,7 +651,7 @@ func TestStatementExecutor_ExecuteStatement_Unsupported(t *testing.T) {

// Execute a SELECT statement.
NewStatementExecutor().ExecuteStatement(
- influxql.MustParseStatement(`SELECT count(*) FROM db0`), nil,
+ influxql.MustParseStatement(`SELECT count(*) FROM db0`),
)
}()

@ -1 +1,17 @@
package admin
+
+const (
+ // DefaultBindAddress is the default bind address for the HTTP server.
+ DefaultBindAddress = ":8083"
+)
+
+type Config struct {
+ Enabled bool `toml:"enabled"`
+ BindAddress string `toml:"bind-address"`
+}
+
+func NewConfig() Config {
+ return Config{
+ BindAddress: DefaultBindAddress,
+ }
+}
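As an aside, a minimal test sketch (not part of this commit) for the new admin Config; it is assumed to sit alongside config.go inside the admin package, so no import path is needed:

package admin

import "testing"

// TestNewConfig_Defaults verifies the defaults wired up by NewConfig above:
// the bind address comes from DefaultBindAddress and Enabled is left at its
// zero value (false).
func TestNewConfig_Defaults(t *testing.T) {
	c := NewConfig()
	if c.BindAddress != DefaultBindAddress {
		t.Fatalf("unexpected bind address: %s", c.BindAddress)
	}
	if c.Enabled {
		t.Fatal("expected Enabled to default to false")
	}
}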
@ -73,8 +73,3 @@ func (s *Service) serve() {
s.err <- fmt.Errorf("listener error: addr=%s, err=%s", s.Addr(), err)
}
}
-
- type Config struct {
- Enabled bool `toml:"enabled"`
- BindAddress string `toml:"bind-address"`
- }
@ -325,7 +325,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
return
}

- points, err := influxdb.NormalizeBatchPoints(bp)
+ points, err := NormalizeBatchPoints(bp)
if err != nil {
resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
return
@ -417,10 +417,9 @@ func (h *Handler) serveWritePoints(w http.ResponseWriter, r *http.Request, user
return
}

- retentionPolicy := r.Form.Get("rp")
- consistencyVal := r.Form.Get("consistency")
+ // Determine required consistency level.
consistency := cluster.ConsistencyLevelOne
- switch consistencyVal {
+ switch r.Form.Get("consistency") {
case "all":
consistency = cluster.ConsistencyLevelAll
case "any":
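To make the mapping explicit, here is a small free-standing sketch (not code from the diff) of what the switch above does with the "consistency" form value. The type name cluster.ConsistencyLevel, the constant ConsistencyLevelAny, and the "quorum" case label are assumptions inferred from the surrounding cases; the default of ConsistencyLevelOne is taken from the initialization shown above.

// parseConsistency is a hypothetical helper mirroring the handler's switch.
func parseConsistency(v string) cluster.ConsistencyLevel {
	switch v {
	case "all":
		return cluster.ConsistencyLevelAll
	case "any":
		return cluster.ConsistencyLevelAny // assumed constant; the body of the "any" case is not shown
	case "quorum":
		return cluster.ConsistencyLevelQuorum // case label assumed; the assignment appears in the next hunk
	default:
		return cluster.ConsistencyLevelOne
	}
}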
@ -431,23 +430,21 @@ func (h *Handler) serveWritePoints(w http.ResponseWriter, r *http.Request, user
consistency = cluster.ConsistencyLevelQuorum
}

- wpr := &cluster.WritePointsRequest{
+ // Write points.
+ if err := h.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: database,
- RetentionPolicy: retentionPolicy,
+ RetentionPolicy: r.Form.Get("rp"),
ConsistencyLevel: consistency,
Points: points,
+ }); influxdb.IsClientError(err) {
+ writeError(influxql.Result{Err: err}, http.StatusBadRequest)
+ return
+ } else if err != nil {
+ writeError(influxql.Result{Err: err}, http.StatusInternalServerError)
+ return
}

- if err := h.PointsWriter.Write(wpr); err != nil {
- if influxdb.IsClientError(err) {
- writeError(influxql.Result{Err: err}, http.StatusBadRequest)
- } else {
- writeError(influxql.Result{Err: err}, http.StatusInternalServerError)
- }
- return
- } else {
- w.WriteHeader(http.StatusNoContent)
- }
+ w.WriteHeader(http.StatusNoContent)
}

// serveOptions returns an empty response to comply with OPTIONS pre-flight requests
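After this change the handler needs nothing more from its PointsWriter than the single WritePoints call shown above. A hypothetical stub (not from the commit) that would satisfy it in a test, assuming the field is declared as an interface with exactly this method:

// fakePointsWriter records the request the handler builds and returns a canned error.
type fakePointsWriter struct {
	err error
	got *cluster.WritePointsRequest
}

func (f *fakePointsWriter) WritePoints(req *cluster.WritePointsRequest) error {
	f.got = req
	return f.err
}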
@ -853,3 +850,37 @@ func interfaceToString(v interface{}) string {
}
}
*/
+
+ // NormalizeBatchPoints returns a slice of Points, created by populating individual
+ // points within the batch, which do not have times or tags, with the top-level
+ // values.
+ func NormalizeBatchPoints(bp client.BatchPoints) ([]tsdb.Point, error) {
+ points := []tsdb.Point{}
+ for _, p := range bp.Points {
+ if p.Time.IsZero() {
+ if bp.Time.IsZero() {
+ p.Time = time.Now()
+ } else {
+ p.Time = bp.Time
+ }
+ }
+ if p.Precision == "" && bp.Precision != "" {
+ p.Precision = bp.Precision
+ }
+ p.Time = client.SetPrecision(p.Time, p.Precision)
+ if len(bp.Tags) > 0 {
+ if p.Tags == nil {
+ p.Tags = make(map[string]string)
+ }
+ for k := range bp.Tags {
+ if p.Tags[k] == "" {
+ p.Tags[k] = bp.Tags[k]
+ }
+ }
+ }
+ // Need to convert from a client.Point to a influxdb.Point
+ points = append(points, tsdb.NewPoint(p.Name, p.Tags, p.Fields, p.Time))
+ }
+
+ return points, nil
+ }
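A short usage sketch (not part of the diff) of the relocated helper: points that omit a timestamp or tags inherit the batch-level values. It is assumed to live in the same package as the handler; the Fields type shown as map[string]interface{} is also an assumption, only the field names used above are taken from the code.

// exampleNormalize is a hypothetical helper illustrating how batch-level
// defaults are applied to individual points.
func exampleNormalize() ([]tsdb.Point, error) {
	bp := client.BatchPoints{
		Database: "db0",
		Time:     time.Now(),
		Tags:     map[string]string{"host": "server01"},
		Points: []client.Point{
			// No Time or Tags set on the point itself.
			{Name: "cpu", Fields: map[string]interface{}{"value": 1.0}},
		},
	}
	// The returned point carries bp.Time and the "host" tag from the batch.
	return NormalizeBatchPoints(bp)
}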
File diff suppressed because it is too large
@ -1,79 +0,0 @@
- package udp
-
- import (
- "bytes"
- "encoding/json"
- "log"
- "net"
-
- "github.com/influxdb/influxdb"
- "github.com/influxdb/influxdb/client"
- "github.com/influxdb/influxdb/tsdb"
- )
-
- const (
- udpBufferSize = 65536
- )
-
- // SeriesWriter defines the interface for the destination of the data.
- type SeriesWriter interface {
- WriteSeries(database, retentionPolicy string, points []tsdb.Point) (uint64, error)
- }
-
- // UDPServer
- type UDPServer struct {
- writer SeriesWriter
- }
-
- // NewUDPServer returns a new instance of a UDPServer
- func NewUDPServer(w SeriesWriter) *UDPServer {
- u := UDPServer{
- writer: w,
- }
- return &u
- }
-
- // ListenAndServe binds the server to the given UDP interface.
- func (u *UDPServer) ListenAndServe(iface string) error {
- addr, err := net.ResolveUDPAddr("udp", iface)
- if err != nil {
- log.Printf("Failed resolve UDP address %s: %s", iface, err)
- return err
- }
-
- conn, err := net.ListenUDP("udp", addr)
- if err != nil {
- log.Printf("Failed set up UDP listener at address %s: %s", addr, err)
- return err
- }
-
- var bp client.BatchPoints
- buf := make([]byte, udpBufferSize)
-
- go func() {
- for {
- _, _, err := conn.ReadFromUDP(buf)
- if err != nil {
- log.Printf("Failed read UDP message: %s.", err)
- continue
- }
-
- dec := json.NewDecoder(bytes.NewReader(buf))
- if err := dec.Decode(&bp); err != nil {
- log.Printf("Failed decode JSON UDP message")
- continue
- }
-
- points, err := influxdb.NormalizeBatchPoints(bp)
- if err != nil {
- log.Printf("Failed normalize batch points")
- continue
- }
-
- if msgIndex, err := u.writer.WriteSeries(bp.Database, bp.RetentionPolicy, points); err != nil {
- log.Printf("Server write failed. Message index was %d: %s", msgIndex, err)
- }
- }
- }()
- return nil
- }
@ -9,6 +9,7 @@ import (
"sort"
"strings"

+ "github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
)
@ -35,12 +36,7 @@ type QueryExecutor struct {
}

// The stats service to report to
- Stats interface {
- Add(key string, delta int64)
- Inc(key string)
- Name() string
- Walk(f func(string, int64))
- }
+ Stats *influxdb.Stats

Logger *log.Logger

@ -52,6 +48,7 @@ type QueryExecutor struct {
func NewQueryExecutor(store *Store) *QueryExecutor {
return &QueryExecutor{
store: store,
+ Stats: influxdb.NewStats("query_executor"),
Logger: log.New(os.Stderr, "[query] ", log.LstdFlags),
}
}
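Since the concrete *influxdb.Stats replaces an interface that declared Add, Inc, Name and Walk, callers can keep making the same calls. A hedged illustration, assuming influxdb.Stats exposes the methods the removed interface listed; the method names come from that interface, the keys are invented for illustration only:

e := NewQueryExecutor(store)
e.Stats.Inc("queriesExecuted") // key assumed for illustration
e.Stats.Add("pointsScanned", 128)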
@ -128,7 +128,6 @@ func testStoreAndExecutor() (*Store, *QueryExecutor) {

executor := NewQueryExecutor(store)
executor.MetaStore = &testMetastore{}
- executor.Stats = &fakeStats{}

return store, executor
}
@ -211,13 +210,6 @@ func (t *testMetastore) UserCount() (int, error) {
return t.userCount, nil
}

- type fakeStats struct{}
-
- func (f *fakeStats) Add(key string, delta int64) {}
- func (f *fakeStats) Inc(key string) {}
- func (f *fakeStats) Name() string { return "test" }
- func (f *fakeStats) Walk(fun func(string, int64)) {}
-
// MustParseQuery parses an InfluxQL query. Panic on error.
func mustParseQuery(s string) *influxql.Query {
q, err := influxql.NewParser(strings.NewReader(s)).ParseQuery()
@ -24,10 +24,9 @@ var (
)

type Store struct {
- mu sync.RWMutex
path string

+ mu sync.RWMutex

databaseIndexes map[string]*DatabaseIndex
shards map[uint64]*Shard
