diff --git a/CHANGELOG.md b/CHANGELOG.md index 44a8cb8477..4b086702d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,7 +40,6 @@ v1.5.0 [unreleased] - [#9353](https://github.com/influxdata/influxdb/pull/9353): Fix panic in msgpack httpd WriteResponse error handler. - [#9335](https://github.com/influxdata/influxdb/pull/9335): Prevent race condition caused by WaitGroup re-use - [#9386](https://github.com/influxdata/influxdb/issues/9386): Fix stddev() call to report itself as always returning a float. -- [#9336](https://github.com/influxdata/influxdb/issues/9336): Fix `influx -import` importing to incorrect databases when export file had multiple databases. Thanks @wwilfinger! - [#9401](https://github.com/influxdata/influxdb/pull/9401): Fix windows history file location. - [#9403](https://github.com/influxdata/influxdb/pull/9403): Do not explicitly specify ports 80 or 443 when they are the default port. - [#8878](https://github.com/influxdata/influxdb/pull/8878): Do not report an error when dropping a CQ on a non-existent DB/RP. diff --git a/importer/v8/importer.go b/importer/v8/importer.go index ad1f024f50..bba7576ad2 100644 --- a/importer/v8/importer.go +++ b/importer/v8/importer.go @@ -221,6 +221,11 @@ func (i *Importer) batchAccumulator(line string) { } func (i *Importer) batchWrite() { + // Exit early if there are no points in the batch. + if len(i.batch) == 0 { + return + } + // Accumulate the batch size to see how many points we have written this second i.throttlePointsWritten += len(i.batch) @@ -254,7 +259,11 @@ func (i *Importer) batchWrite() { } else { i.totalInserts += len(i.batch) } + i.throttlePointsWritten = 0 + i.lastWrite = time.Now() + // Clear the batch and record the number of processed points. + i.batch = i.batch[:0] // Give some status feedback every 100000 lines processed processed := i.totalInserts + i.failedInserts if processed%100000 == 0 { @@ -262,8 +271,4 @@ func (i *Importer) batchWrite() { pps := float64(processed) / since.Seconds() i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps)) } - - i.batch = i.batch[:0] - i.throttlePointsWritten = 0 - i.lastWrite = time.Now() } diff --git a/tests/export_import_test.go b/tests/export_import_test.go deleted file mode 100644 index 6f91a27d90..0000000000 --- a/tests/export_import_test.go +++ /dev/null @@ -1,181 +0,0 @@ -package tests - -import ( - "io/ioutil" - "net" - "os" - "path/filepath" - "testing" - "time" - - influx "github.com/influxdata/influxdb/cmd/influx/cli" - "github.com/influxdata/influxdb/cmd/influx_inspect/export" - "strconv" -) - -func TestServer_ExportAndImport(t *testing.T) { - t.Parallel() - - config := NewConfig() - - config.HTTPD.BindAddress = freePort() - - config.Data.Engine = "tsm1" - config.Data.CacheSnapshotMemorySize = 1 - - s := OpenServer(config) - defer s.Close() - - if _, ok := s.(*RemoteServer); ok { - t.Skip("Skipping. Cannot influx_inspect export on remote server") - } - - exportDir, _ := ioutil.TempDir("", "backup") - defer os.RemoveAll(exportDir) - exportFile := filepath.Join(exportDir, "export") - - func() { - tests := make([]Test, 2) - tests = append(tests, Test{ - db: "db0", - rp: "rp0", - writes: Writes{ - &Write{data: "myseries,host=A value=23 1000000"}, - &Write{data: "myseries,host=B value=24 5000000"}, - &Write{data: "myseries,host=C value=25 9000000"}, - }, - queries: []*Query{ - &Query{ - name: "Test data should from db0 be present", - command: `SELECT * FROM "db0"."rp0"."myseries"`, - exp: `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23],["1970-01-01T00:00:00.005Z","B",24],["1970-01-01T00:00:00.009Z","C",25]]}]}]}`, - }, - }, - }) - - tests = append(tests, Test{ - db: "db1", - rp: "rp1", - writes: Writes{ - &Write{data: "myseries,host=D value=13 1000000"}, - &Write{data: "myseries,host=E value=14 5000000"}, - &Write{data: "myseries,host=F value=15 9000000"}, - }, - queries: []*Query{ - &Query{ - name: "Test data should from db1 be present", - command: `SELECT * FROM "db1"."rp1"."myseries"`, - exp: `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","D",13],["1970-01-01T00:00:00.005Z","E",14],["1970-01-01T00:00:00.009Z","F",15]]}]}]}`, - }, - }, - }) - - for _, test := range tests { - for _, query := range test.queries { - t.Run(query.name, func(t *testing.T) { - if !test.initialized { - if err := test.init(s); err != nil { - t.Fatalf("test init failed: %s", err) - } - } - if query.skip { - t.Skipf("SKIP:: %s", query.name) - } - if err := query.Execute(s); err != nil { - t.Error(query.Error(err)) - } else if !query.success() { - t.Error(query.failureMessage()) - } - }) - } - } - }() - - // wait for the snapshot to write - // Don't think we necessarily need this since influx_inspect export will - // write from the WAL file as well. - time.Sleep(time.Second) - - // Export the data we just wrote - exportCmd := export.NewCommand() - if err := exportCmd.Run("-out", exportFile, "-datadir", config.Data.Dir, "-waldir", config.Data.WALDir); err != nil { - t.Fatalf("error exporting: %s", err.Error()) - } - - // Drop the database to start anew - _, err := s.Query("DROP DATABASE db0") - if err != nil { - t.Fatalf("error dropping database: %s", err.Error()) - } - - // Import with influx -import - - // Nasty code to get the httpd service listener port - host, port, err := net.SplitHostPort(config.HTTPD.BindAddress) - if err != nil { - t.Fatal(err) - } - portInt, err := strconv.Atoi(port) - if err != nil { - t.Fatal(err) - } - - importCmd := influx.New("unknown") - - importCmd.Host = host - importCmd.Port = portInt - importCmd.Import = true - importCmd.ClientConfig.Precision = "ns" - importCmd.ImporterConfig.Path = exportFile - - if err := importCmd.Run(); err != nil { - t.Fatalf("error importing: %s", err.Error()) - } - - func() { - tests := make([]Test, 2) - tests = append(tests, Test{ - db: "db0", - rp: "rp0", - queries: []*Query{ - &Query{ - name: "Test data from db0 should have been imported", - command: `SELECT * FROM "db0"."rp0"."myseries"`, - exp: `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23],["1970-01-01T00:00:00.005Z","B",24],["1970-01-01T00:00:00.009Z","C",25]]}]}]}`, - }, - }, - }) - - tests = append(tests, Test{ - db: "db1", - rp: "rp1", - queries: []*Query{ - &Query{ - name: "Test data from db1 should have been imported", - command: `SELECT * FROM "db1"."rp1"."myseries"`, - exp: `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","D",13],["1970-01-01T00:00:00.005Z","E",14],["1970-01-01T00:00:00.009Z","F",15]]}]}]}`, - }, - }, - }) - - for _, test := range tests { - for _, query := range test.queries { - t.Run(query.name, func(t *testing.T) { - if !test.initialized { - if err := test.init(s); err != nil { - t.Fatalf("test init failed: %s", err) - } - } - if query.skip { - t.Skipf("SKIP:: %s", query.name) - } - if err := query.Execute(s); err != nil { - t.Error(query.Error(err)) - } else if !query.success() { - t.Error(query.failureMessage()) - } - }) - } - } - }() -}