2015-05-28 21:47:47 +00:00
|
|
|
package cluster
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
import (
|
|
|
|
"encoding/binary"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"log"
|
|
|
|
"net"
|
|
|
|
"os"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
2015-05-29 19:50:05 +00:00
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
"github.com/influxdb/influxdb/tsdb"
|
|
|
|
)
|
|
|
|
|
2015-06-05 01:40:48 +00:00
|
|
|
// MaxMessageSize is the exclusive upper bound, in bytes, on the value portion
// of a type-length-value record. ReadTLV rejects any record whose declared
// size is greater than or equal to this limit.
const MaxMessageSize = 1 << 30 // 1GB
|
|
|
|
|
2015-06-05 22:54:12 +00:00
|
|
|
// MuxHeader is the header byte that identifies this service's connections
// in the TCP mux.
const MuxHeader = 2
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
// Service processes data received over raw TCP connections.
|
2015-05-28 21:47:47 +00:00
|
|
|
type Service struct {
|
2015-06-05 22:54:12 +00:00
|
|
|
mu sync.RWMutex
|
2015-05-30 20:00:46 +00:00
|
|
|
|
|
|
|
wg sync.WaitGroup
|
|
|
|
closing chan struct{}
|
|
|
|
|
2015-06-05 22:54:12 +00:00
|
|
|
Listener net.Listener
|
|
|
|
|
2015-06-02 20:45:52 +00:00
|
|
|
TSDBStore interface {
|
|
|
|
WriteToShard(shardID uint64, points []tsdb.Point) error
|
2015-05-30 20:00:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Logger *log.Logger
|
2015-05-28 21:47:47 +00:00
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
// NewService returns a new instance of Service.
|
2015-05-29 19:50:05 +00:00
|
|
|
func NewService(c Config) *Service {
|
2015-05-30 20:00:46 +00:00
|
|
|
return &Service{
|
|
|
|
closing: make(chan struct{}),
|
|
|
|
Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags),
|
|
|
|
}
|
2015-05-28 21:47:47 +00:00
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
// Open opens the network listener and begins serving requests.
|
|
|
|
func (s *Service) Open() error {
|
|
|
|
// Begin serving conections.
|
|
|
|
s.wg.Add(1)
|
|
|
|
go s.serve()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-06-03 15:58:39 +00:00
|
|
|
// SetLogger sets the internal logger to the logger passed in.
|
|
|
|
func (s *Service) SetLogger(l *log.Logger) {
|
|
|
|
s.Logger = l
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
// serve accepts connections from the listener and handles them.
|
|
|
|
func (s *Service) serve() {
|
|
|
|
defer s.wg.Done()
|
|
|
|
|
|
|
|
for {
|
|
|
|
// Check if the service is shutting down.
|
|
|
|
select {
|
|
|
|
case <-s.closing:
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
// Accept the next connection.
|
2015-06-05 22:54:12 +00:00
|
|
|
conn, err := s.Listener.Accept()
|
2015-05-30 20:00:46 +00:00
|
|
|
if opErr, ok := err.(*net.OpError); ok && opErr.Temporary() {
|
|
|
|
s.Logger.Println("error temporarily accepting TCP connection", err.Error())
|
|
|
|
continue
|
|
|
|
} else if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delegate connection handling to a separate goroutine.
|
|
|
|
s.wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer s.wg.Done()
|
|
|
|
s.handleConn(conn)
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close shuts down the listener and waits for all connections to finish.
|
|
|
|
func (s *Service) Close() error {
|
2015-06-05 22:54:12 +00:00
|
|
|
if s.Listener != nil {
|
|
|
|
s.Listener.Close()
|
2015-05-30 20:00:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Shut down all handlers.
|
|
|
|
close(s.closing)
|
|
|
|
s.wg.Wait()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// handleConn services an individual TCP connection.
|
|
|
|
func (s *Service) handleConn(conn net.Conn) {
|
|
|
|
// Ensure connection is closed when service is closed.
|
|
|
|
closing := make(chan struct{})
|
|
|
|
defer close(closing)
|
|
|
|
go func() {
|
|
|
|
select {
|
|
|
|
case <-closing:
|
|
|
|
case <-s.closing:
|
|
|
|
}
|
|
|
|
conn.Close()
|
|
|
|
}()
|
|
|
|
|
|
|
|
for {
|
|
|
|
// Read type-length-value.
|
|
|
|
typ, buf, err := ReadTLV(conn)
|
|
|
|
if err != nil && strings.Contains(err.Error(), "closed network connection") {
|
|
|
|
return
|
|
|
|
} else if err != nil {
|
|
|
|
s.Logger.Printf("unable to read type-length-value %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delegate message processing by type.
|
|
|
|
switch typ {
|
|
|
|
case writeShardRequestMessage:
|
|
|
|
err := s.processWriteShardRequest(buf)
|
|
|
|
s.writeShardResponse(conn, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Service) processWriteShardRequest(buf []byte) error {
|
|
|
|
// Build request
|
|
|
|
var req WriteShardRequest
|
|
|
|
if err := req.UnmarshalBinary(buf); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-06-02 20:45:52 +00:00
|
|
|
if err := s.TSDBStore.WriteToShard(req.ShardID(), req.Points()); err != nil {
|
2015-05-30 20:00:46 +00:00
|
|
|
return fmt.Errorf("write shard: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Service) writeShardResponse(w io.Writer, e error) {
|
|
|
|
// Build response.
|
|
|
|
var resp WriteShardResponse
|
|
|
|
if e != nil {
|
|
|
|
resp.SetCode(1)
|
|
|
|
resp.SetMessage(e.Error())
|
|
|
|
} else {
|
|
|
|
resp.SetCode(0)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Marshal response to binary.
|
|
|
|
buf, err := resp.MarshalBinary()
|
|
|
|
if err != nil {
|
|
|
|
s.Logger.Printf("error marshalling shard response: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Write to connection.
|
|
|
|
if err := WriteTLV(w, writeShardResponseMessage, buf); err != nil {
|
|
|
|
s.Logger.Printf("write shard response error: %s", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReadTLV reads a type-length-value record from r.
|
|
|
|
func ReadTLV(r io.Reader) (byte, []byte, error) {
|
|
|
|
var typ [1]byte
|
|
|
|
if _, err := io.ReadFull(r, typ[:]); err != nil {
|
|
|
|
return 0, nil, fmt.Errorf("read message type: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read the size of the message.
|
|
|
|
var sz int64
|
|
|
|
if err := binary.Read(r, binary.BigEndian, &sz); err != nil {
|
|
|
|
return 0, nil, fmt.Errorf("read message size: %s", err)
|
|
|
|
}
|
|
|
|
|
2015-06-05 01:40:48 +00:00
|
|
|
if sz == 0 {
|
|
|
|
return 0, nil, fmt.Errorf("invalid message size: %d", sz)
|
|
|
|
}
|
|
|
|
|
|
|
|
if sz >= MaxMessageSize {
|
|
|
|
return 0, nil, fmt.Errorf("max message size of %d exceeded: %d", MaxMessageSize, sz)
|
|
|
|
}
|
|
|
|
|
2015-05-30 20:00:46 +00:00
|
|
|
// Read the value.
|
|
|
|
buf := make([]byte, sz)
|
|
|
|
if _, err := io.ReadFull(r, buf); err != nil {
|
|
|
|
return 0, nil, fmt.Errorf("read message value: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return typ[0], buf, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// WriteTLV writes a type-length-value record to w.
//
// The record is the type byte typ, then len(buf) as a big-endian int64, then
// buf itself. The type and length are sent together as one 9-byte header in a
// single Write call, followed by one Write call for the value.
//
// WriteTLV does not validate the length. ReadTLV rejects records whose value
// is empty or is MaxMessageSize bytes or larger, so such records can be
// written but not read back.
func WriteTLV(w io.Writer, typ byte, buf []byte) error {
	// Build the header in one fixed-size buffer so the type byte and the
	// length do not go out as two separate tiny writes.
	var hdr [9]byte
	hdr[0] = typ
	binary.BigEndian.PutUint64(hdr[1:], uint64(len(buf)))
	if _, err := w.Write(hdr[:]); err != nil {
		return fmt.Errorf("write message header: %s", err)
	}

	// Write the value.
	if _, err := w.Write(buf); err != nil {
		return fmt.Errorf("write message value: %s", err)
	}

	return nil
}
|