diff --git a/CHANGELOG.md b/CHANGELOG.md index 0436ac30df..c901d8d8ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ - [#9353](https://github.com/influxdata/influxdb/pull/9353): Fix panic in msgpack httpd WriteResponse error handler. - [#9335](https://github.com/influxdata/influxdb/pull/9335): Prevent race condition caused by WaitGroup re-use - [#9386](https://github.com/influxdata/influxdb/issues/9386): Fix stddev() call to report itself as always returning a float. +- [#9336](https://github.com/influxdata/influxdb/issues/9336): Fix `influx -import` importing to incorrect databases when export file had multiple databases. Thanks @wwilfinger! ## v1.4.3 [unreleased] diff --git a/importer/v8/importer.go b/importer/v8/importer.go index c9e1439de3..ad1f024f50 100644 --- a/importer/v8/importer.go +++ b/importer/v8/importer.go @@ -42,6 +42,7 @@ type Importer struct { failedInserts int totalCommands int throttlePointsWritten int + startTime time.Time lastWrite time.Time throttle *time.Ticker @@ -167,7 +168,7 @@ func (i *Importer) processDDL(scanner *bufio.Reader) error { } func (i *Importer) processDML(scanner *bufio.Reader) error { - start := time.Now() + i.startTime = time.Now() for { line, err := scanner.ReadString(byte('\n')) if err != nil && err != io.EOF { @@ -178,9 +179,11 @@ func (i *Importer) processDML(scanner *bufio.Reader) error { return nil } if strings.HasPrefix(line, "# CONTEXT-DATABASE:") { + i.batchWrite() i.database = strings.TrimSpace(strings.Split(line, ":")[1]) } if strings.HasPrefix(line, "# CONTEXT-RETENTION-POLICY:") { + i.batchWrite() i.retentionPolicy = strings.TrimSpace(strings.Split(line, ":")[1]) } if strings.HasPrefix(line, "#") { @@ -190,7 +193,7 @@ func (i *Importer) processDML(scanner *bufio.Reader) error { if strings.TrimSpace(line) == "" { continue } - i.batchAccumulator(line, start) + i.batchAccumulator(line) } } @@ -210,18 +213,10 @@ func (i *Importer) queryExecutor(command string) { i.execute(command) } -func (i *Importer) batchAccumulator(line string, start time.Time) { +func (i *Importer) batchAccumulator(line string) { i.batch = append(i.batch, line) if len(i.batch) == batchSize { i.batchWrite() - i.batch = i.batch[:0] - // Give some status feedback every 100000 lines processed - processed := i.totalInserts + i.failedInserts - if processed%100000 == 0 { - since := time.Since(start) - pps := float64(processed) / since.Seconds() - i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps)) - } } } @@ -259,6 +254,16 @@ func (i *Importer) batchWrite() { } else { i.totalInserts += len(i.batch) } + + // Give some status feedback every 100000 lines processed + processed := i.totalInserts + i.failedInserts + if processed%100000 == 0 { + since := time.Since(i.startTime) + pps := float64(processed) / since.Seconds() + i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps)) + } + + i.batch = i.batch[:0] i.throttlePointsWritten = 0 i.lastWrite = time.Now() } diff --git a/tests/export_import_test.go b/tests/export_import_test.go new file mode 100644 index 0000000000..6f91a27d90 --- /dev/null +++ b/tests/export_import_test.go @@ -0,0 +1,181 @@ +package tests + +import ( + "io/ioutil" + "net" + "os" + "path/filepath" + "testing" + "time" + + influx "github.com/influxdata/influxdb/cmd/influx/cli" + "github.com/influxdata/influxdb/cmd/influx_inspect/export" + "strconv" +) + +func TestServer_ExportAndImport(t *testing.T) { + t.Parallel() + + config := NewConfig() + + config.HTTPD.BindAddress = freePort() + + config.Data.Engine = "tsm1" + config.Data.CacheSnapshotMemorySize = 1 + + s := OpenServer(config) + defer s.Close() + + if _, ok := s.(*RemoteServer); ok { + t.Skip("Skipping. Cannot influx_inspect export on remote server") + } + + exportDir, _ := ioutil.TempDir("", "backup") + defer os.RemoveAll(exportDir) + exportFile := filepath.Join(exportDir, "export") + + func() { + tests := make([]Test, 2) + tests = append(tests, Test{ + db: "db0", + rp: "rp0", + writes: Writes{ + &Write{data: "myseries,host=A value=23 1000000"}, + &Write{data: "myseries,host=B value=24 5000000"}, + &Write{data: "myseries,host=C value=25 9000000"}, + }, + queries: []*Query{ + &Query{ + name: "Test data should from db0 be present", + command: `SELECT * FROM "db0"."rp0"."myseries"`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23],["1970-01-01T00:00:00.005Z","B",24],["1970-01-01T00:00:00.009Z","C",25]]}]}]}`, + }, + }, + }) + + tests = append(tests, Test{ + db: "db1", + rp: "rp1", + writes: Writes{ + &Write{data: "myseries,host=D value=13 1000000"}, + &Write{data: "myseries,host=E value=14 5000000"}, + &Write{data: "myseries,host=F value=15 9000000"}, + }, + queries: []*Query{ + &Query{ + name: "Test data should from db1 be present", + command: `SELECT * FROM "db1"."rp1"."myseries"`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","D",13],["1970-01-01T00:00:00.005Z","E",14],["1970-01-01T00:00:00.009Z","F",15]]}]}]}`, + }, + }, + }) + + for _, test := range tests { + for _, query := range test.queries { + t.Run(query.name, func(t *testing.T) { + if !test.initialized { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Skipf("SKIP:: %s", query.name) + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + }) + } + } + }() + + // wait for the snapshot to write + // Don't think we necessarily need this since influx_inspect export will + // write from the WAL file as well. + time.Sleep(time.Second) + + // Export the data we just wrote + exportCmd := export.NewCommand() + if err := exportCmd.Run("-out", exportFile, "-datadir", config.Data.Dir, "-waldir", config.Data.WALDir); err != nil { + t.Fatalf("error exporting: %s", err.Error()) + } + + // Drop the database to start anew + _, err := s.Query("DROP DATABASE db0") + if err != nil { + t.Fatalf("error dropping database: %s", err.Error()) + } + + // Import with influx -import + + // Nasty code to get the httpd service listener port + host, port, err := net.SplitHostPort(config.HTTPD.BindAddress) + if err != nil { + t.Fatal(err) + } + portInt, err := strconv.Atoi(port) + if err != nil { + t.Fatal(err) + } + + importCmd := influx.New("unknown") + + importCmd.Host = host + importCmd.Port = portInt + importCmd.Import = true + importCmd.ClientConfig.Precision = "ns" + importCmd.ImporterConfig.Path = exportFile + + if err := importCmd.Run(); err != nil { + t.Fatalf("error importing: %s", err.Error()) + } + + func() { + tests := make([]Test, 2) + tests = append(tests, Test{ + db: "db0", + rp: "rp0", + queries: []*Query{ + &Query{ + name: "Test data from db0 should have been imported", + command: `SELECT * FROM "db0"."rp0"."myseries"`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23],["1970-01-01T00:00:00.005Z","B",24],["1970-01-01T00:00:00.009Z","C",25]]}]}]}`, + }, + }, + }) + + tests = append(tests, Test{ + db: "db1", + rp: "rp1", + queries: []*Query{ + &Query{ + name: "Test data from db1 should have been imported", + command: `SELECT * FROM "db1"."rp1"."myseries"`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","D",13],["1970-01-01T00:00:00.005Z","E",14],["1970-01-01T00:00:00.009Z","F",15]]}]}]}`, + }, + }, + }) + + for _, test := range tests { + for _, query := range test.queries { + t.Run(query.name, func(t *testing.T) { + if !test.initialized { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Skipf("SKIP:: %s", query.name) + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + }) + } + } + }() +}