Backup node.json with metastore backup
parent
0328ac1a7e
commit
716714364a
|
@ -1,6 +1,7 @@
|
|||
package backup
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
|
@ -14,7 +15,6 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/services/meta"
|
||||
"github.com/influxdb/influxdb/services/snapshotter"
|
||||
"github.com/influxdb/influxdb/tcp"
|
||||
)
|
||||
|
@ -228,15 +228,17 @@ func (cmd *Command) backupMetastore() error {
|
|||
}
|
||||
|
||||
return cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error {
|
||||
var data meta.Data
|
||||
binData, err := ioutil.ReadFile(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if data.UnmarshalBinary(binData) != nil {
|
||||
|
||||
magic := btou64(binData[:8])
|
||||
if magic != snapshotter.BackupMagicHeader {
|
||||
cmd.Logger.Println("Invalid metadata blob, ensure the metadata service is running (default port 8088)")
|
||||
return errors.New("invalid metadata received")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -363,3 +365,7 @@ func retentionAndShardFromPath(path string) (retention, shard string, err error)
|
|||
|
||||
return a[1], a[2], nil
|
||||
}
|
||||
|
||||
func btou64(b []byte) uint64 {
|
||||
return binary.BigEndian.Uint64(b)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package restore
|
|||
import (
|
||||
"archive/tar"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
|
@ -17,6 +18,7 @@ import (
|
|||
|
||||
"github.com/influxdb/influxdb/cmd/influxd/backup"
|
||||
"github.com/influxdb/influxdb/services/meta"
|
||||
"github.com/influxdb/influxdb/services/snapshotter"
|
||||
)
|
||||
|
||||
// Command represents the program execution for "influxd restore".
|
||||
|
@ -153,9 +155,30 @@ func (cmd *Command) unpackMeta() error {
|
|||
return fmt.Errorf("copy: %s", err)
|
||||
}
|
||||
|
||||
b := buf.Bytes()
|
||||
var i int
|
||||
|
||||
// Make sure the file is actually a meta store backup file
|
||||
magic := btou64(b[:8])
|
||||
if magic != snapshotter.BackupMagicHeader {
|
||||
return fmt.Errorf("invalid metadata file")
|
||||
}
|
||||
i += 8
|
||||
|
||||
// Size of the meta store bytes
|
||||
length := int(btou64(b[i : i+8]))
|
||||
i += 8
|
||||
metaBytes := b[i : i+length]
|
||||
i += int(length)
|
||||
|
||||
// Size of the node.json bytes
|
||||
length = int(btou64(b[i : i+8]))
|
||||
i += 8
|
||||
nodeBytes := b[i:]
|
||||
|
||||
// Unpack into metadata.
|
||||
var data meta.Data
|
||||
if err := data.UnmarshalBinary(buf.Bytes()); err != nil {
|
||||
if err := data.UnmarshalBinary(metaBytes); err != nil {
|
||||
return fmt.Errorf("unmarshal: %s", err)
|
||||
}
|
||||
|
||||
|
@ -164,6 +187,16 @@ func (cmd *Command) unpackMeta() error {
|
|||
c.JoinPeers = nil
|
||||
c.LoggingEnabled = false
|
||||
|
||||
// Create the meta dir
|
||||
if os.MkdirAll(c.Dir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write node.json back to meta dir
|
||||
if err := ioutil.WriteFile(filepath.Join(c.Dir, "node.json"), nodeBytes, 0655); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize meta store.
|
||||
store := meta.NewService(c)
|
||||
store.RaftListener = newNopListener()
|
||||
|
@ -366,3 +399,14 @@ func (ln *nopListener) Close() error {
|
|||
}
|
||||
|
||||
func (ln *nopListener) Addr() net.Addr { return &net.TCPAddr{} }
|
||||
|
||||
// u64tob converts a uint64 into an 8-byte slice.
|
||||
func u64tob(v uint64) []byte {
|
||||
b := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(b, v)
|
||||
return b
|
||||
}
|
||||
|
||||
func btou64(b []byte) uint64 {
|
||||
return binary.BigEndian.Uint64(b)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -75,6 +76,15 @@ func TestServer_BackupAndRestore(t *testing.T) {
|
|||
t.Fatalf("error restoring: %s", err.Error())
|
||||
}
|
||||
|
||||
// Make sure node.json was restored
|
||||
nodePath := filepath.Join(config.Meta.Dir, "node.json")
|
||||
if _, err := os.Stat(nodePath); err != nil || os.IsNotExist(err) {
|
||||
t.Fatalf("node.json should exist")
|
||||
}
|
||||
// Need to remove the restored node.json because the metaservers it points to are to the ephemeral ports
|
||||
// of itself. Opening the server below will hang indefinitely if we try to use them.
|
||||
os.RemoveAll(nodePath)
|
||||
|
||||
// now open it up and verify we're good
|
||||
s := OpenServer(config, "")
|
||||
defer s.Close()
|
||||
|
|
|
@ -230,6 +230,7 @@ func (s *Server) appendSnapshotterService() {
|
|||
srv := snapshotter.NewService()
|
||||
srv.TSDBStore = s.TSDBStore
|
||||
srv.MetaClient = s.MetaClient
|
||||
srv.Node = s.Node
|
||||
s.Services = append(s.Services, srv)
|
||||
s.SnapshotterService = srv
|
||||
}
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package snapshotter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
|
@ -16,14 +18,22 @@ import (
|
|||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// MuxHeader is the header byte used for the TCP muxer.
|
||||
const MuxHeader = 3
|
||||
const (
|
||||
// MuxHeader is the header byte used for the TCP muxer.
|
||||
MuxHeader = 3
|
||||
|
||||
// BackupMagicHeader is the first 8 bytes used to identify and validate
|
||||
// a metastore backup file
|
||||
BackupMagicHeader = 0x59590101
|
||||
)
|
||||
|
||||
// Service manages the listener for the snapshot endpoint.
|
||||
type Service struct {
|
||||
wg sync.WaitGroup
|
||||
err chan error
|
||||
|
||||
Node *influxdb.Node
|
||||
|
||||
MetaClient interface {
|
||||
encoding.BinaryMarshaler
|
||||
Database(name string) (*meta.DatabaseInfo, error)
|
||||
|
@ -109,12 +119,7 @@ func (s *Service) handleConn(conn net.Conn) error {
|
|||
return err
|
||||
}
|
||||
case RequestMetastoreBackup:
|
||||
// Retrieve and serialize the current meta data.
|
||||
buf, err := s.MetaClient.MarshalBinary()
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal meta: %s", err)
|
||||
}
|
||||
if _, err := conn.Write(buf); err != nil {
|
||||
if err := s.writeMetaStore(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
case RequestDatabaseInfo:
|
||||
|
@ -128,6 +133,41 @@ func (s *Service) handleConn(conn net.Conn) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) writeMetaStore(conn net.Conn) error {
|
||||
// Retrieve and serialize the current meta data.
|
||||
buf, err := s.MetaClient.MarshalBinary()
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal meta: %s", err)
|
||||
}
|
||||
|
||||
var b bytes.Buffer
|
||||
enc := json.NewEncoder(&b)
|
||||
if err := enc.Encode(s.Node); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := conn.Write(u64tob(BackupMagicHeader)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := conn.Write(u64tob(uint64(len(buf)))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := conn.Write(buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := conn.Write(u64tob(uint64(b.Len()))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := b.WriteTo(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeDatabaseInfo will write the relative paths of all shards in the database on
|
||||
// this server into the connection
|
||||
func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error {
|
||||
|
@ -247,3 +287,14 @@ type Request struct {
|
|||
type Response struct {
|
||||
Paths []string
|
||||
}
|
||||
|
||||
// u64tob converts a uint64 into an 8-byte slice.
|
||||
func u64tob(v uint64) []byte {
|
||||
b := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(b, v)
|
||||
return b
|
||||
}
|
||||
|
||||
func btou64(b []byte) uint64 {
|
||||
return binary.BigEndian.Uint64(b)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue