Update backup/restore to use MetaClient
parent
2e8cfce7be
commit
8456169855
|
@ -281,7 +281,6 @@ func (cmd *Command) downloadAndVerify(req *snapshotter.Request, path string, val
|
|||
|
||||
// download downloads a snapshot of either the metastore or a shard from a host to a given path.
|
||||
func (cmd *Command) download(req *snapshotter.Request, path string) error {
|
||||
// FIXME This needs to use the meta client now to download the snapshot
|
||||
// Create local file to write to.
|
||||
f, err := os.Create(path)
|
||||
if err != nil {
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
|
||||
"github.com/influxdb/influxdb/cmd/influxd/backup"
|
||||
"github.com/influxdb/influxdb/cmd/influxd/help"
|
||||
"github.com/influxdb/influxdb/cmd/influxd/restore"
|
||||
"github.com/influxdb/influxdb/cmd/influxd/run"
|
||||
)
|
||||
|
||||
|
@ -118,6 +119,11 @@ func (m *Main) Run(args ...string) error {
|
|||
if err := name.Run(args...); err != nil {
|
||||
return fmt.Errorf("backup: %s", err)
|
||||
}
|
||||
case "restore":
|
||||
name := restore.NewCommand()
|
||||
if err := name.Run(args...); err != nil {
|
||||
return fmt.Errorf("restore: %s", err)
|
||||
}
|
||||
case "config":
|
||||
if err := run.NewPrintConfigCommand().Run(args...); err != nil {
|
||||
return fmt.Errorf("config: %s", err)
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -160,6 +162,7 @@ func (cmd *Command) unpackMeta() error {
|
|||
// Copy meta config and remove peers so it starts in single mode.
|
||||
c := cmd.MetaConfig
|
||||
c.JoinPeers = nil
|
||||
c.LoggingEnabled = false
|
||||
|
||||
// Initialize meta store.
|
||||
store := meta.NewService(c)
|
||||
|
@ -175,15 +178,20 @@ func (cmd *Command) unpackMeta() error {
|
|||
select {
|
||||
case err := <-store.Err():
|
||||
return err
|
||||
default:
|
||||
}
|
||||
|
||||
client := meta.NewClient([]string{store.HTTPAddr()}, false)
|
||||
client.SetLogger(log.New(ioutil.Discard, "", 0))
|
||||
if err := client.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// Force set the full metadata.
|
||||
if err := store.SetData(&data); err != nil {
|
||||
if err := client.SetData(&data); err != nil {
|
||||
return fmt.Errorf("set data: %s", err)
|
||||
}
|
||||
|
||||
fmt.Fprintln(cmd.Stdout, "Metastore restore successful")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -357,4 +365,4 @@ func (ln *nopListener) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ln *nopListener) Addr() net.Addr { return nil }
|
||||
func (ln *nopListener) Addr() net.Addr { return &net.TCPAddr{} }
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
)
|
||||
|
||||
func TestServer_BackupAndRestore(t *testing.T) {
|
||||
t.Skip("The backup package needs to use the meta client now to download the snapshot")
|
||||
config := NewConfig()
|
||||
config.Data.Engine = "tsm1"
|
||||
config.Data.Dir, _ = ioutil.TempDir("", "data_backup")
|
||||
|
|
|
@ -880,6 +880,14 @@ func (c *Client) DropSubscription(database, rp, name string) error {
|
|||
)
|
||||
}
|
||||
|
||||
func (c *Client) SetData(data *Data) error {
|
||||
return c.retryUntilExec(internal.Command_SetDataCommand, internal.E_SetDataCommand_Command,
|
||||
&internal.SetDataCommand{
|
||||
Data: data.marshal(),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (c *Client) ExecuteStatement(stmt influxql.Statement) *influxql.Result {
|
||||
return c.executor.ExecuteStatement(stmt)
|
||||
}
|
||||
|
@ -893,7 +901,15 @@ func (c *Client) WaitForDataChanged() chan struct{} {
|
|||
}
|
||||
|
||||
func (c *Client) MarshalBinary() ([]byte, error) {
|
||||
return nil, nil
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.cacheData.MarshalBinary()
|
||||
}
|
||||
|
||||
func (c *Client) SetLogger(l *log.Logger) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.logger = l
|
||||
}
|
||||
|
||||
func (c *Client) index() uint64 {
|
||||
|
|
|
@ -3,6 +3,7 @@ package meta
|
|||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
|
@ -39,8 +40,13 @@ func NewService(c *Config) *Service {
|
|||
https: c.HTTPSEnabled,
|
||||
cert: c.HTTPSCertificate,
|
||||
err: make(chan error),
|
||||
Logger: log.New(os.Stderr, "[meta] ", log.LstdFlags),
|
||||
}
|
||||
if c.LoggingEnabled {
|
||||
s.Logger = log.New(os.Stderr, "[meta] ", log.LstdFlags)
|
||||
} else {
|
||||
s.Logger = log.New(ioutil.Discard, "", 0)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue