package snapshotter

import (
	"encoding"
	"encoding/json"
	"fmt"
	"log"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/influxdb/influxdb"
	"github.com/influxdb/influxdb/services/meta"
	"github.com/influxdb/influxdb/tsdb"
)

// MuxHeader is the header byte used for the TCP muxer.
const MuxHeader = 3

// Service manages the listener for the snapshot endpoint.
//
// MetaClient, TSDBStore and Listener are not set by NewService; they must be
// assigned by the caller before Open is called, because the accept loop and
// the request handlers dereference them without nil checks.
type Service struct {
	// wg tracks the accept loop and every in-flight connection handler so
	// that Close can wait for all of them to finish.
	wg sync.WaitGroup
	// err is the channel exposed through Err.
	err chan error

	// MetaClient provides the serialized meta data and database lookups
	// used to answer requests.
	MetaClient interface {
		encoding.BinaryMarshaler
		Database(name string) (*meta.DatabaseInfo, error)
	}

	// TSDBStore is used to back up shards and to resolve shard paths.
	TSDBStore *tsdb.Store

	// Listener is the listener on which snapshot requests are accepted.
	Listener net.Listener
	// Logger receives the service's log output.
	Logger *log.Logger
}

// NewService returns a new instance of Service that logs to stderr with the
// "[snapshot] " prefix.
func NewService() *Service {
	return &Service{
		err:    make(chan error),
		Logger: log.New(os.Stderr, "[snapshot] ", log.LstdFlags),
	}
}

// Open starts the service by launching the accept loop in its own goroutine.
// It always returns nil.
func (s *Service) Open() error {
	s.Logger.Println("Starting snapshot service")

	s.wg.Add(1)
	go s.serve()
	return nil
}

// Close implements the Service interface. It closes the listener, if one is
// set, and then blocks until the accept loop and all connection handlers have
// returned. It always returns nil.
func (s *Service) Close() error {
	if s.Listener != nil {
		s.Listener.Close()
	}
	s.wg.Wait()
	return nil
}

// SetLogger sets the internal logger to the logger passed in.
func (s *Service) SetLogger(l *log.Logger) {
	s.Logger = l
}

// Err returns a channel for fatal out-of-band errors.
func (s *Service) Err() <-chan error { return s.err }

// serve serves snapshot requests from the listener. It returns once the
// listener reports that it has been closed; any other accept error is logged
// and the loop keeps going.
func (s *Service) serve() {
	defer s.wg.Done()

	for {
		// Wait for next connection.
		conn, err := s.Listener.Accept()
		if err != nil {
			// Accept errors can only be classified by message here. Besides
			// the "connection closed" text, also recognise the message the
			// standard library's listeners return after Close; otherwise
			// this loop would spin forever and Close would never return.
			msg := err.Error()
			if strings.Contains(msg, "connection closed") ||
				strings.Contains(msg, "use of closed network connection") {
				s.Logger.Println("snapshot listener closed")
				return
			}
			s.Logger.Println("error accepting snapshot request: ", err.Error())
			continue
		}

		// Handle connection in separate goroutine.
		s.wg.Add(1)
		go func(conn net.Conn) {
			defer s.wg.Done()
			defer conn.Close()
			if err := s.handleConn(conn); err != nil {
				s.Logger.Println(err)
			}
		}(conn)
	}
}

// handleConn processes conn. This is run in a separate goroutine.
func (s *Service) handleConn(conn net.Conn) error { r, err := s.readRequest(conn) if err != nil { return fmt.Errorf("read request: %s", err) } switch r.Type { case RequestShardBackup: if err := s.TSDBStore.BackupShard(r.ShardID, r.Since, conn); err != nil { return err } case RequestMetastoreBackup: // Retrieve and serialize the current meta data. buf, err := s.MetaClient.MarshalBinary() if err != nil { return fmt.Errorf("marshal meta: %s", err) } if _, err := conn.Write(buf); err != nil { return err } case RequestDatabaseInfo: return s.writeDatabaseInfo(conn, r.Database) case RequestRetentionPolicyInfo: return s.writeRetentionPolicyInfo(conn, r.Database, r.RetentionPolicy) default: return fmt.Errorf("request type unknown: %v", r.Type) } return nil } // writeDatabaseInfo will write the relative paths of all shards in the database on // this server into the connection func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error { res := Response{} db, err := s.MetaClient.Database(database) if err != nil { return err } if db == nil { return influxdb.ErrDatabaseNotFound(database) } for _, rp := range db.RetentionPolicies { for _, sg := range rp.ShardGroups { for _, sh := range sg.Shards { // ignore if the shard isn't on the server if s.TSDBStore.Shard(sh.ID) == nil { continue } path, err := s.TSDBStore.ShardRelativePath(sh.ID) if err != nil { return err } res.Paths = append(res.Paths, path) } } } if err := json.NewEncoder(conn).Encode(res); err != nil { return fmt.Errorf("encode resonse: %s", err.Error()) } return nil } // writeDatabaseInfo will write the relative paths of all shards in the retention policy on // this server into the connection func (s *Service) writeRetentionPolicyInfo(conn net.Conn, database, retentionPolicy string) error { res := Response{} db, err := s.MetaClient.Database(database) if err != nil { return err } if db == nil { return influxdb.ErrDatabaseNotFound(database) } var ret *meta.RetentionPolicyInfo for _, rp := range 
db.RetentionPolicies { if rp.Name == retentionPolicy { ret = &rp break } } if ret == nil { return influxdb.ErrRetentionPolicyNotFound(retentionPolicy) } for _, sg := range ret.ShardGroups { for _, sh := range sg.Shards { // ignore if the shard isn't on the server if s.TSDBStore.Shard(sh.ID) == nil { continue } path, err := s.TSDBStore.ShardRelativePath(sh.ID) if err != nil { return err } res.Paths = append(res.Paths, path) } } if err := json.NewEncoder(conn).Encode(res); err != nil { return fmt.Errorf("encode resonse: %s", err.Error()) } return nil } // readRequest Unmarshals a request object from the conn func (s *Service) readRequest(conn net.Conn) (Request, error) { var r Request if err := json.NewDecoder(conn).Decode(&r); err != nil { return r, err } return r, nil } type RequestType uint8 const ( RequestShardBackup RequestType = iota RequestMetastoreBackup RequestDatabaseInfo RequestRetentionPolicyInfo ) // Request represents a request for a specific backup or for information // about the shards on this server for a database or retention policy type Request struct { Type RequestType Database string RetentionPolicy string ShardID uint64 Since time.Time } // Response contains the relative paths for all the shards on this server // that are in the requested database or retention policy type Response struct { Paths []string }