Merge pull request #3975 from benbjohnson/copy-shard

Copier service
pull/3815/head
Ben Johnson 2015-09-03 13:07:49 -06:00
commit 0163945898
11 changed files with 580 additions and 0 deletions

View File

@ -9,6 +9,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3916](https://github.com/influxdb/influxdb/pull/3916): New statistics and diagnostics support. Graphite first to be instrumented.
- [#3901](https://github.com/influxdb/influxdb/pull/3901): Add consistency level option to influx cli Thanks @takayuki
- [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT
- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service
### Bugfixes
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.

View File

@ -18,6 +18,7 @@ import (
"github.com/influxdb/influxdb/services/admin"
"github.com/influxdb/influxdb/services/collectd"
"github.com/influxdb/influxdb/services/continuous_querier"
"github.com/influxdb/influxdb/services/copier"
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/hh"
"github.com/influxdb/influxdb/services/httpd"
@ -57,6 +58,7 @@ type Server struct {
// These references are required for the tcp muxer.
ClusterService *cluster.Service
SnapshotterService *snapshotter.Service
CopierService *copier.Service
Monitor *monitor.Monitor
@ -134,6 +136,7 @@ func NewServer(c *Config, version string) (*Server, error) {
s.appendClusterService(c.Cluster)
s.appendPrecreatorService(c.Precreator)
s.appendSnapshotterService()
s.appendCopierService()
s.appendAdminService(c.Admin)
s.appendContinuousQueryService(c.ContinuousQuery)
s.appendHTTPDService(c.HTTPD)
@ -170,6 +173,13 @@ func (s *Server) appendSnapshotterService() {
s.SnapshotterService = srv
}
func (s *Server) appendCopierService() {
srv := copier.NewService()
srv.TSDBStore = s.TSDBStore
s.Services = append(s.Services, srv)
s.CopierService = srv
}
func (s *Server) appendRetentionPolicyService(c retention.Config) {
if !c.Enabled {
return
@ -324,6 +334,7 @@ func (s *Server) Open() error {
s.ClusterService.Listener = mux.Listen(cluster.MuxHeader)
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
s.CopierService.Listener = mux.Listen(copier.MuxHeader)
go mux.Serve(ln)
// Open meta store.

View File

@ -0,0 +1,57 @@
// Code generated by protoc-gen-gogo.
// source: internal/internal.proto
// DO NOT EDIT!
/*
Package internal is a generated protocol buffer package.
It is generated from these files:
internal/internal.proto
It has these top-level messages:
Request
Response
*/
package internal
import proto "github.com/gogo/protobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type Request struct {
ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (m *Request) GetShardID() uint64 {
if m != nil && m.ShardID != nil {
return *m.ShardID
}
return 0
}
type Response struct {
Error *string `protobuf:"bytes,1,opt" json:"Error,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Response) Reset() { *m = Response{} }
func (m *Response) String() string { return proto.CompactTextString(m) }
func (*Response) ProtoMessage() {}
func (m *Response) GetError() string {
if m != nil && m.Error != nil {
return *m.Error
}
return ""
}
func init() {
}

View File

@ -0,0 +1,9 @@
package internal;
message Request {
required uint64 ShardID = 1;
}
message Response {
optional string Error = 1;
}

261
services/copier/service.go Normal file
View File

@ -0,0 +1,261 @@
package copier
import (
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/influxdb/influxdb/services/copier/internal"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/tsdb"
)
//go:generate protoc --gogo_out=. internal/internal.proto
// MuxHeader is the header byte used for the TCP muxer.
const MuxHeader = 6
// Service manages the listener for the endpoint.
type Service struct {
wg sync.WaitGroup
err chan error
TSDBStore interface {
Shard(id uint64) *tsdb.Shard
}
Listener net.Listener
Logger *log.Logger
}
// NewService returns a new instance of Service.
func NewService() *Service {
return &Service{
err: make(chan error),
Logger: log.New(os.Stderr, "[copier] ", log.LstdFlags),
}
}
// Open starts the service.
func (s *Service) Open() error {
s.Logger.Println("Starting copier service")
s.wg.Add(1)
go s.serve()
return nil
}
// Close implements the Service interface.
func (s *Service) Close() error {
if s.Listener != nil {
s.Listener.Close()
}
s.wg.Wait()
return nil
}
// SetLogger sets the internal logger to the logger passed in.
func (s *Service) SetLogger(l *log.Logger) {
s.Logger = l
}
// Err returns a channel for fatal out-of-band errors.
func (s *Service) Err() <-chan error { return s.err }
// serve serves shard copy requests from the listener.
func (s *Service) serve() {
defer s.wg.Done()
for {
// Wait for next connection.
conn, err := s.Listener.Accept()
if err != nil && strings.Contains(err.Error(), "connection closed") {
s.Logger.Println("copier listener closed")
return
} else if err != nil {
s.Logger.Println("error accepting copier request: ", err.Error())
continue
}
// Handle connection in separate goroutine.
s.wg.Add(1)
go func(conn net.Conn) {
defer s.wg.Done()
defer conn.Close()
if err := s.handleConn(conn); err != nil {
s.Logger.Println(err)
}
}(conn)
}
}
// handleConn processes conn. This is run in a separate goroutine.
func (s *Service) handleConn(conn net.Conn) error {
// Read request from connection.
req, err := s.readRequest(conn)
if err != nil {
return fmt.Errorf("read request: %s", err)
}
// Retrieve shard.
sh := s.TSDBStore.Shard(req.GetShardID())
// Return error response if the shard doesn't exist.
if sh == nil {
if err := s.writeResponse(conn, &internal.Response{
Error: proto.String(fmt.Sprintf("shard not found: id=%d", req.GetShardID())),
}); err != nil {
return fmt.Errorf("write error response: %s", err)
}
return nil
}
// Write successful response.
if err := s.writeResponse(conn, &internal.Response{}); err != nil {
return fmt.Errorf("write response: %s", err)
}
// Write shard to response.
if _, err := sh.WriteTo(conn); err != nil {
return fmt.Errorf("write shard: %s", err)
}
return nil
}
// readRequest reads and unmarshals a Request from r.
func (s *Service) readRequest(r io.Reader) (*internal.Request, error) {
// Read request length.
var n uint32
if err := binary.Read(r, binary.BigEndian, &n); err != nil {
return nil, fmt.Errorf("read request length: %s", err)
}
// Read body.
buf := make([]byte, n)
if _, err := io.ReadFull(r, buf); err != nil {
return nil, fmt.Errorf("read request: %s", err)
}
// Unmarshal request.
req := &internal.Request{}
if err := proto.Unmarshal(buf, req); err != nil {
return nil, fmt.Errorf("unmarshal request: %s", err)
}
return req, nil
}
// writeResponse marshals and writes a Response to w.
func (s *Service) writeResponse(w io.Writer, resp *internal.Response) error {
// Marshal the response to a byte slice.
buf, err := proto.Marshal(resp)
if err != nil {
return fmt.Errorf("marshal error: %s", err)
}
// Write response length to writer.
if err := binary.Write(w, binary.BigEndian, uint32(len(buf))); err != nil {
return fmt.Errorf("write response length error: %s", err)
}
// Write body to writer.
if _, err := w.Write(buf); err != nil {
return fmt.Errorf("write body error: %s", err)
}
return nil
}
// Client represents a client for connecting remotely to a copier service.
type Client struct {
host string
}
// NewClient return a new instance of Client.
func NewClient(host string) *Client {
return &Client{
host: host,
}
}
// ShardReader returns a reader for streaming shard data.
// Returned ReadCloser must be closed by the caller.
func (c *Client) ShardReader(id uint64) (io.ReadCloser, error) {
// Connect to remote server.
conn, err := tcp.Dial("tcp", c.host, MuxHeader)
if err != nil {
return nil, err
}
// Send request to server.
if err := c.writeRequest(conn, &internal.Request{ShardID: proto.Uint64(id)}); err != nil {
return nil, fmt.Errorf("write request: %s", err)
}
// Read response from the server.
resp, err := c.readResponse(conn)
if err != nil {
return nil, fmt.Errorf("read response: %s", err)
}
// If there was an error then return it and close connection.
if resp.GetError() != "" {
conn.Close()
return nil, errors.New(resp.GetError())
}
// Returning remaining stream for caller to consume.
return conn, nil
}
// writeRequest marshals and writes req to w.
func (c *Client) writeRequest(w io.Writer, req *internal.Request) error {
// Marshal request.
buf, err := proto.Marshal(req)
if err != nil {
return fmt.Errorf("marshal request: %s", err)
}
// Write request length.
if err := binary.Write(w, binary.BigEndian, uint32(len(buf))); err != nil {
return fmt.Errorf("write request length: %s", err)
}
// Send request to server.
if _, err := w.Write(buf); err != nil {
return fmt.Errorf("write request body: %s", err)
}
return nil
}
// readResponse reads and unmarshals a Response from r.
func (c *Client) readResponse(r io.Reader) (*internal.Response, error) {
// Read response length.
var n uint32
if err := binary.Read(r, binary.BigEndian, &n); err != nil {
return nil, fmt.Errorf("read response length: %s", err)
}
// Read response.
buf := make([]byte, n)
if _, err := io.ReadFull(r, buf); err != nil {
return nil, fmt.Errorf("read response: %s", err)
}
// Unmarshal response.
resp := &internal.Response{}
if err := proto.Unmarshal(buf, resp); err != nil {
return nil, fmt.Errorf("unmarshal response: %s", err)
}
return resp, nil
}

View File

@ -0,0 +1,184 @@
package copier_test
import (
"bytes"
"encoding/binary"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"testing"
"github.com/influxdb/influxdb/services/copier"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/tsdb"
_ "github.com/influxdb/influxdb/tsdb/engine"
)
// Ensure the service can return shard data.
func TestService_handleConn(t *testing.T) {
s := MustOpenService()
defer s.Close()
// Mock shard.
sh := MustOpenShard(123)
defer sh.Close()
s.TSDBStore.ShardFn = func(id uint64) *tsdb.Shard {
if id != 123 {
t.Fatalf("unexpected id: %d", id)
}
return sh.Shard
}
// Create client and request shard from service.
c := copier.NewClient(s.Addr().String())
r, err := c.ShardReader(123)
if err != nil {
t.Fatal(err)
} else if r == nil {
t.Fatal("expected reader")
}
defer r.Close()
// Slurp from reader.
var n uint64
if err := binary.Read(r, binary.BigEndian, &n); err != nil {
t.Fatal(err)
}
buf := make([]byte, n)
if _, err := io.ReadFull(r, buf); err != nil {
t.Fatal(err)
}
// Read database from disk.
exp, err := ioutil.ReadFile(sh.Path())
if err != nil {
t.Fatal(err)
}
// Trim expected bytes since bolt won't read beyond the HWM.
exp = exp[0:len(buf)]
// Compare disk and reader contents.
if !bytes.Equal(exp, buf) {
t.Fatalf("data mismatch: exp=len(%d), got=len(%d)", len(exp), len(buf))
}
}
// Ensure the service can return an error to the client.
func TestService_handleConn_Error(t *testing.T) {
s := MustOpenService()
defer s.Close()
// Mock missing shard.
s.TSDBStore.ShardFn = func(id uint64) *tsdb.Shard { return nil }
// Create client and request shard from service.
c := copier.NewClient(s.Addr().String())
r, err := c.ShardReader(123)
if err == nil || err.Error() != `shard not found: id=123` {
t.Fatalf("unexpected error: %s", err)
} else if r != nil {
t.Fatal("expected nil reader")
}
}
// Service represents a test wrapper for copier.Service.
type Service struct {
*copier.Service
ln net.Listener
TSDBStore ServiceTSDBStore
}
// NewService returns a new instance of Service.
func NewService() *Service {
s := &Service{
Service: copier.NewService(),
}
s.Service.TSDBStore = &s.TSDBStore
if !testing.Verbose() {
s.SetLogger(log.New(ioutil.Discard, "", 0))
}
return s
}
// MustOpenService returns a new, opened service. Panic on error.
func MustOpenService() *Service {
// Open randomly assigned port.
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}
// Start muxer.
mux := tcp.NewMux()
// Create new service and attach mux'd listener.
s := NewService()
s.ln = ln
s.Listener = mux.Listen(copier.MuxHeader)
go mux.Serve(ln)
if err := s.Open(); err != nil {
panic(err)
}
return s
}
// Close shuts down the service and the attached listener.
func (s *Service) Close() error {
s.ln.Close()
err := s.Service.Close()
return err
}
// Addr returns the address of the service.
func (s *Service) Addr() net.Addr { return s.ln.Addr() }
// ServiceTSDBStore is a mock that implements copier.Service.TSDBStore.
type ServiceTSDBStore struct {
ShardFn func(id uint64) *tsdb.Shard
}
func (ss *ServiceTSDBStore) Shard(id uint64) *tsdb.Shard { return ss.ShardFn(id) }
// Shard is a test wrapper for tsdb.Shard.
type Shard struct {
*tsdb.Shard
path string
}
// MustOpenShard returns a temporary, opened shard.
func MustOpenShard(id uint64) *Shard {
path, err := ioutil.TempDir("", "copier-")
if err != nil {
panic(err)
}
sh := &Shard{
Shard: tsdb.NewShard(id,
tsdb.NewDatabaseIndex(),
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
tsdb.NewEngineOptions(),
),
path: path,
}
if err := sh.Open(); err != nil {
sh.Close()
panic(err)
}
return sh
}
func (sh *Shard) Close() error {
err := sh.Shard.Close()
os.RemoveAll(sh.Path())
return err
}

View File

@ -126,3 +126,17 @@ func (ln *listener) Close() error { return nil }
// Addr always returns nil.
func (ln *listener) Addr() net.Addr { return nil }
// Dial connects to a remote mux listener with a given header byte.
func Dial(network, address string, header byte) (net.Conn, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
if _, err := conn.Write([]byte{header}); err != nil {
return nil, fmt.Errorf("write mux header: %s", err)
}
return conn, nil
}

View File

@ -33,6 +33,8 @@ type Engine interface {
DeleteSeries(keys []string) error
DeleteMeasurement(name string, seriesKeys []string) error
SeriesCount() (n int, err error)
io.WriterTo
}
// NewEngineFunc creates a new engine.

View File

@ -524,6 +524,25 @@ func (e *Engine) Begin(writable bool) (tsdb.Tx, error) {
// DB returns the underlying Bolt database.
func (e *Engine) DB() *bolt.DB { return e.db }
// WriteTo writes the length and contents of the engine to w.
func (e *Engine) WriteTo(w io.Writer) (n int64, err error) {
tx, err := e.db.Begin(false)
if err != nil {
return 0, err
}
defer tx.Rollback()
// Write size.
if err := binary.Write(w, binary.BigEndian, uint64(tx.Size())); err != nil {
return 0, err
}
// Write data.
n, err = tx.WriteTo(w)
n += 8 // size header
return
}
// Tx represents a transaction.
type Tx struct {
*bolt.Tx

View File

@ -574,6 +574,25 @@ func (e *Engine) SeriesBucketStats(key string) (stats bolt.BucketStats, err erro
return stats, err
}
// WriteTo writes the length and contents of the engine to w.
func (e *Engine) WriteTo(w io.Writer) (n int64, err error) {
tx, err := e.db.Begin(false)
if err != nil {
return 0, err
}
defer tx.Rollback()
// Write size.
if err := binary.Write(w, binary.BigEndian, uint64(tx.Size())); err != nil {
return 0, err
}
// Write data.
n, err = tx.WriteTo(w)
n += 8 // size header
return
}
// Stats represents internal engine statistics.
type Stats struct {
Size int64 // BoltDB data size

View File

@ -382,6 +382,9 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*SeriesCreate, []*Fie
// SeriesCount returns the number of series buckets on the shard.
func (s *Shard) SeriesCount() (int, error) { return s.engine.SeriesCount() }
// WriteTo writes the shard's data to w.
func (s *Shard) WriteTo(w io.Writer) (int64, error) { return s.engine.WriteTo(w) }
type MeasurementFields struct {
Fields map[string]*Field `json:"fields"`
Codec *FieldCodec