Merge pull request #9352 from wwilfinger/walter/influx-inspect-export-import

Fix imports of multiple databases in one export file
Jonathan A. Sternberg 2018-02-05 14:22:00 -06:00 committed by GitHub
commit 9aeae7ce82
3 changed files with 198 additions and 11 deletions
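What this fixes: `influx_inspect export` can write several databases into a single export file, switching targets with `# CONTEXT-DATABASE:`/`# CONTEXT-RETENTION-POLICY:` lines, but `influx -import` only flushed its accumulated batch once it filled. Points buffered before a context switch were therefore written under whatever database was current when the batch finally flushed. The fix flushes the batch before either context line takes effect. An illustrative export file with two databases (the values are made up; the `# CONTEXT-*` markers are the ones `processDML` matches on below):

```
# DDL
CREATE DATABASE db0 WITH NAME rp0
CREATE DATABASE db1 WITH NAME rp1
# DML
# CONTEXT-DATABASE:db0
# CONTEXT-RETENTION-POLICY:rp0
myseries,host=A value=23 1000000
# CONTEXT-DATABASE:db1
# CONTEXT-RETENTION-POLICY:rp1
myseries,host=D value=13 1000000
```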

CHANGELOG.md

@@ -39,6 +39,7 @@
 - [#9353](https://github.com/influxdata/influxdb/pull/9353): Fix panic in msgpack httpd WriteResponse error handler.
 - [#9335](https://github.com/influxdata/influxdb/pull/9335): Prevent race condition caused by WaitGroup re-use.
 - [#9386](https://github.com/influxdata/influxdb/issues/9386): Fix stddev() call to report itself as always returning a float.
+- [#9336](https://github.com/influxdata/influxdb/issues/9336): Fix `influx -import` importing to incorrect databases when the export file contained multiple databases. Thanks @wwilfinger!
 
 ## v1.4.3 [unreleased]

importer/v8/importer.go

@@ -42,6 +42,7 @@ type Importer struct {
 	failedInserts         int
 	totalCommands         int
 	throttlePointsWritten int
+	startTime             time.Time
 	lastWrite             time.Time
 	throttle              *time.Ticker
@@ -167,7 +168,7 @@ func (i *Importer) processDDL(scanner *bufio.Reader) error {
 }
 
 func (i *Importer) processDML(scanner *bufio.Reader) error {
-	start := time.Now()
+	i.startTime = time.Now()
 	for {
 		line, err := scanner.ReadString(byte('\n'))
 		if err != nil && err != io.EOF {
@@ -178,9 +179,11 @@ func (i *Importer) processDML(scanner *bufio.Reader) error {
 			return nil
 		}
 		if strings.HasPrefix(line, "# CONTEXT-DATABASE:") {
+			i.batchWrite()
 			i.database = strings.TrimSpace(strings.Split(line, ":")[1])
 		}
 		if strings.HasPrefix(line, "# CONTEXT-RETENTION-POLICY:") {
+			i.batchWrite()
 			i.retentionPolicy = strings.TrimSpace(strings.Split(line, ":")[1])
 		}
 		if strings.HasPrefix(line, "#") {
@@ -190,7 +193,7 @@ func (i *Importer) processDML(scanner *bufio.Reader) error {
 		if strings.TrimSpace(line) == "" {
 			continue
 		}
-		i.batchAccumulator(line, start)
+		i.batchAccumulator(line)
 	}
 }
@@ -210,18 +213,10 @@ func (i *Importer) queryExecutor(command string) {
 	i.execute(command)
 }
 
-func (i *Importer) batchAccumulator(line string, start time.Time) {
+func (i *Importer) batchAccumulator(line string) {
 	i.batch = append(i.batch, line)
 	if len(i.batch) == batchSize {
 		i.batchWrite()
-		i.batch = i.batch[:0]
-		// Give some status feedback every 100000 lines processed
-		processed := i.totalInserts + i.failedInserts
-		if processed%100000 == 0 {
-			since := time.Since(start)
-			pps := float64(processed) / since.Seconds()
-			i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
-		}
 	}
 }
@@ -259,6 +254,16 @@ func (i *Importer) batchWrite() {
 	} else {
 		i.totalInserts += len(i.batch)
 	}
+
+	// Give some status feedback every 100000 lines processed
+	processed := i.totalInserts + i.failedInserts
+	if processed%100000 == 0 {
+		since := time.Since(i.startTime)
+		pps := float64(processed) / since.Seconds()
+		i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
+	}
+
+	i.batch = i.batch[:0]
 	i.throttlePointsWritten = 0
 	i.lastWrite = time.Now()
 }
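The crux of the change is the pair of new `i.batchWrite()` calls in `processDML`: they drain buffered points under the old database and retention policy before the context switches, and the progress logging moves into `batchWrite` (measuring from the importer-level `startTime`) so it fires no matter which call site triggered the flush. A minimal, self-contained sketch of the flush-before-retarget pattern (the `batcher` type here is hypothetical, not the importer's actual API):

```go
package main

import "fmt"

// batcher buffers lines and writes them to the currently selected
// database, mimicking the importer's batch/flush behavior.
type batcher struct {
	db    string
	batch []string
}

// setDatabase is the fix in miniature: flush pending lines *before*
// retargeting, so they land in the database they were read under.
func (b *batcher) setDatabase(db string) {
	b.flush()
	b.db = db
}

func (b *batcher) add(line string) {
	b.batch = append(b.batch, line)
	if len(b.batch) == 3 { // tiny batch size for the demo
		b.flush()
	}
}

func (b *batcher) flush() {
	if len(b.batch) == 0 {
		return
	}
	fmt.Printf("write %d line(s) to %q\n", len(b.batch), b.db)
	b.batch = b.batch[:0]
}

func main() {
	b := &batcher{}
	b.setDatabase("db0")
	b.add("myseries,host=A value=23 1000000")
	b.add("myseries,host=B value=24 5000000")
	// Without the flush inside setDatabase, the two buffered db0 points
	// would be written to db1 once the batch eventually filled.
	b.setDatabase("db1")
	b.add("myseries,host=D value=13 1000000")
	b.flush()
}
```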

tests/export_import_test.go (new file)

@@ -0,0 +1,181 @@
package tests

import (
	"io/ioutil"
	"net"
	"os"
	"path/filepath"
	"strconv"
	"testing"
	"time"

	influx "github.com/influxdata/influxdb/cmd/influx/cli"
	"github.com/influxdata/influxdb/cmd/influx_inspect/export"
)
func TestServer_ExportAndImport(t *testing.T) {
	t.Parallel()

	config := NewConfig()
	config.HTTPD.BindAddress = freePort()
	config.Data.Engine = "tsm1"
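	// Assumption: CacheSnapshotMemorySize is in bytes; 1 forces the cache to
	// snapshot to TSM files after almost every write, so the export below has
	// on-disk data to read.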
	config.Data.CacheSnapshotMemorySize = 1

	s := OpenServer(config)
	defer s.Close()

	if _, ok := s.(*RemoteServer); ok {
		t.Skip("Skipping; cannot run influx_inspect export against a remote server")
	}

	exportDir, _ := ioutil.TempDir("", "backup")
	defer os.RemoveAll(exportDir)
	exportFile := filepath.Join(exportDir, "export")
	func() {
		tests := make([]Test, 0, 2)
		tests = append(tests, Test{
			db: "db0",
			rp: "rp0",
			writes: Writes{
				&Write{data: "myseries,host=A value=23 1000000"},
				&Write{data: "myseries,host=B value=24 5000000"},
				&Write{data: "myseries,host=C value=25 9000000"},
			},
			queries: []*Query{
				&Query{
					name:    "Test data from db0 should be present",
					command: `SELECT * FROM "db0"."rp0"."myseries"`,
					exp:     `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23],["1970-01-01T00:00:00.005Z","B",24],["1970-01-01T00:00:00.009Z","C",25]]}]}]}`,
				},
			},
		})
		tests = append(tests, Test{
			db: "db1",
			rp: "rp1",
			writes: Writes{
				&Write{data: "myseries,host=D value=13 1000000"},
				&Write{data: "myseries,host=E value=14 5000000"},
				&Write{data: "myseries,host=F value=15 9000000"},
			},
			queries: []*Query{
				&Query{
					name:    "Test data from db1 should be present",
					command: `SELECT * FROM "db1"."rp1"."myseries"`,
					exp:     `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","D",13],["1970-01-01T00:00:00.005Z","E",14],["1970-01-01T00:00:00.009Z","F",15]]}]}]}`,
				},
			},
		})

		for _, test := range tests {
			for _, query := range test.queries {
				t.Run(query.name, func(t *testing.T) {
					if !test.initialized {
						if err := test.init(s); err != nil {
							t.Fatalf("test init failed: %s", err)
						}
					}
					if query.skip {
						t.Skipf("SKIP:: %s", query.name)
					}
					if err := query.Execute(s); err != nil {
						t.Error(query.Error(err))
					} else if !query.success() {
						t.Error(query.failureMessage())
					}
				})
			}
		}
	}()
	// Wait for the cache snapshot to be written. We may not strictly need
	// this, since influx_inspect export reads from the WAL as well.
	time.Sleep(time.Second)

	// Export the data we just wrote
	exportCmd := export.NewCommand()
	if err := exportCmd.Run("-out", exportFile, "-datadir", config.Data.Dir, "-waldir", config.Data.WALDir); err != nil {
		t.Fatalf("error exporting: %s", err.Error())
	}
	// Drop the database to start anew
	_, err := s.Query("DROP DATABASE db0")
	if err != nil {
		t.Fatalf("error dropping database: %s", err.Error())
	}
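	// Only db0 is dropped here; db1's points are still present, and importing
	// identical points again simply overwrites them in place.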
	// Import with influx -import
	// Nasty code to get the httpd service listener port
	host, port, err := net.SplitHostPort(config.HTTPD.BindAddress)
	if err != nil {
		t.Fatal(err)
	}
	portInt, err := strconv.Atoi(port)
	if err != nil {
		t.Fatal(err)
	}

	importCmd := influx.New("unknown")
	importCmd.Host = host
	importCmd.Port = portInt
	importCmd.Import = true
	importCmd.ClientConfig.Precision = "ns"
	importCmd.ImporterConfig.Path = exportFile
	if err := importCmd.Run(); err != nil {
		t.Fatalf("error importing: %s", err.Error())
	}
	func() {
		tests := make([]Test, 0, 2)
		tests = append(tests, Test{
			db: "db0",
			rp: "rp0",
			queries: []*Query{
				&Query{
					name:    "Test data from db0 should have been imported",
					command: `SELECT * FROM "db0"."rp0"."myseries"`,
					exp:     `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23],["1970-01-01T00:00:00.005Z","B",24],["1970-01-01T00:00:00.009Z","C",25]]}]}]}`,
				},
			},
		})
		tests = append(tests, Test{
			db: "db1",
			rp: "rp1",
			queries: []*Query{
				&Query{
					name:    "Test data from db1 should have been imported",
					command: `SELECT * FROM "db1"."rp1"."myseries"`,
					exp:     `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","D",13],["1970-01-01T00:00:00.005Z","E",14],["1970-01-01T00:00:00.009Z","F",15]]}]}]}`,
				},
			},
		})

		for _, test := range tests {
			for _, query := range test.queries {
				t.Run(query.name, func(t *testing.T) {
					if !test.initialized {
						if err := test.init(s); err != nil {
							t.Fatalf("test init failed: %s", err)
						}
					}
					if query.skip {
						t.Skipf("SKIP:: %s", query.name)
					}
					if err := query.Execute(s); err != nil {
						t.Error(query.Error(err))
					} else if !query.success() {
						t.Error(query.failureMessage())
					}
				})
			}
		}
	}()
}
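For reference, the new test should be runnable on its own with stock Go tooling from an influxdb checkout (standard `go test` flags; the package path follows from the file's location above):

```
go test -run TestServer_ExportAndImport ./tests
```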