From ca7cc021eeccf08f65f32413bdb5c8708203162f Mon Sep 17 00:00:00 2001
From: "Jonathan A. Sternberg"
Date: Fri, 9 Feb 2018 09:53:19 -0600
Subject: [PATCH 1/2] Revert "Merge pull request #9352 from
 wwilfinger/walter/influx-inspect-export-import"

This reverts commit 9aeae7ce82034004e3558bab8397f776498a983e, reversing
changes made to 35b44cc2f0ac78c26a852ea1f0a5992a2f440a76.

The contributor was unable to sign the contributor license agreement so
we have to revert this commit.
---
 CHANGELOG.md                |   1 -
 importer/v8/importer.go     |  27 +++---
 tests/export_import_test.go | 181 ------------------------------------
 3 files changed, 11 insertions(+), 198 deletions(-)
 delete mode 100644 tests/export_import_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index de7d3a191e..b0e682247a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,7 +40,6 @@ v1.5.0 [unreleased]
 - [#9353](https://github.com/influxdata/influxdb/pull/9353): Fix panic in msgpack httpd WriteResponse error handler.
 - [#9335](https://github.com/influxdata/influxdb/pull/9335): Prevent race condition caused by WaitGroup re-use
 - [#9386](https://github.com/influxdata/influxdb/issues/9386): Fix stddev() call to report itself as always returning a float.
-- [#9336](https://github.com/influxdata/influxdb/issues/9336): Fix `influx -import` importing to incorrect databases when export file had multiple databases. Thanks @wwilfinger!
 - [#9401](https://github.com/influxdata/influxdb/pull/9401): Fix windows history file location.
 
 v1.4.3 [unreleased]
diff --git a/importer/v8/importer.go b/importer/v8/importer.go
index ad1f024f50..c9e1439de3 100644
--- a/importer/v8/importer.go
+++ b/importer/v8/importer.go
@@ -42,7 +42,6 @@ type Importer struct {
 	failedInserts         int
 	totalCommands         int
 	throttlePointsWritten int
-	startTime             time.Time
 	lastWrite             time.Time
 	throttle              *time.Ticker
 
@@ -168,7 +167,7 @@ func (i *Importer) processDDL(scanner *bufio.Reader) error {
 }
 
 func (i *Importer) processDML(scanner *bufio.Reader) error {
-	i.startTime = time.Now()
+	start := time.Now()
 	for {
 		line, err := scanner.ReadString(byte('\n'))
 		if err != nil && err != io.EOF {
@@ -179,11 +178,9 @@ func (i *Importer) processDML(scanner *bufio.Reader) error {
 			return nil
 		}
 		if strings.HasPrefix(line, "# CONTEXT-DATABASE:") {
-			i.batchWrite()
 			i.database = strings.TrimSpace(strings.Split(line, ":")[1])
 		}
 		if strings.HasPrefix(line, "# CONTEXT-RETENTION-POLICY:") {
-			i.batchWrite()
 			i.retentionPolicy = strings.TrimSpace(strings.Split(line, ":")[1])
 		}
 		if strings.HasPrefix(line, "#") {
@@ -193,7 +190,7 @@ func (i *Importer) processDML(scanner *bufio.Reader) error {
 		if strings.TrimSpace(line) == "" {
 			continue
 		}
-		i.batchAccumulator(line)
+		i.batchAccumulator(line, start)
 	}
 }
 
@@ -213,10 +210,18 @@ func (i *Importer) queryExecutor(command string) {
 	i.execute(command)
 }
 
-func (i *Importer) batchAccumulator(line string) {
+func (i *Importer) batchAccumulator(line string, start time.Time) {
 	i.batch = append(i.batch, line)
 	if len(i.batch) == batchSize {
 		i.batchWrite()
+		i.batch = i.batch[:0]
+		// Give some status feedback every 100000 lines processed
+		processed := i.totalInserts + i.failedInserts
+		if processed%100000 == 0 {
+			since := time.Since(start)
+			pps := float64(processed) / since.Seconds()
+			i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
+		}
 	}
 }
 
@@ -254,16 +259,6 @@ func (i *Importer) batchWrite() {
 	} else {
 		i.totalInserts += len(i.batch)
 	}
-
-	// Give some status feedback every 100000 lines processed
-	processed := i.totalInserts + i.failedInserts
-	if processed%100000 == 0 {
-		since := time.Since(i.startTime)
-		pps := float64(processed) / since.Seconds()
-		i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
-	}
-
-	i.batch = i.batch[:0]
 	i.throttlePointsWritten = 0
 	i.lastWrite = time.Now()
 }
diff --git a/tests/export_import_test.go b/tests/export_import_test.go
deleted file mode 100644
index 6f91a27d90..0000000000
--- a/tests/export_import_test.go
+++ /dev/null
@@ -1,181 +0,0 @@
-package tests
-
-import (
-	"io/ioutil"
-	"net"
-	"os"
-	"path/filepath"
-	"testing"
-	"time"
-
-	influx "github.com/influxdata/influxdb/cmd/influx/cli"
-	"github.com/influxdata/influxdb/cmd/influx_inspect/export"
-	"strconv"
-)
-
-func TestServer_ExportAndImport(t *testing.T) {
-	t.Parallel()
-
-	config := NewConfig()
-
-	config.HTTPD.BindAddress = freePort()
-
-	config.Data.Engine = "tsm1"
-	config.Data.CacheSnapshotMemorySize = 1
-
-	s := OpenServer(config)
-	defer s.Close()
-
-	if _, ok := s.(*RemoteServer); ok {
-		t.Skip("Skipping. Cannot influx_inspect export on remote server")
-	}
-
-	exportDir, _ := ioutil.TempDir("", "backup")
-	defer os.RemoveAll(exportDir)
-	exportFile := filepath.Join(exportDir, "export")
-
-	func() {
-		tests := make([]Test, 2)
-		tests = append(tests, Test{
-			db: "db0",
-			rp: "rp0",
-			writes: Writes{
-				&Write{data: "myseries,host=A value=23 1000000"},
-				&Write{data: "myseries,host=B value=24 5000000"},
-				&Write{data: "myseries,host=C value=25 9000000"},
-			},
-			queries: []*Query{
-				&Query{
-					name:    "Test data should from db0 be present",
-					command: `SELECT * FROM "db0"."rp0"."myseries"`,
-					exp:     `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23],["1970-01-01T00:00:00.005Z","B",24],["1970-01-01T00:00:00.009Z","C",25]]}]}]}`,
-				},
-			},
-		})
-
-		tests = append(tests, Test{
-			db: "db1",
-			rp: "rp1",
-			writes: Writes{
-				&Write{data: "myseries,host=D value=13 1000000"},
-				&Write{data: "myseries,host=E value=14 5000000"},
-				&Write{data: "myseries,host=F value=15 9000000"},
-			},
-			queries: []*Query{
-				&Query{
-					name:    "Test data should from db1 be present",
-					command: `SELECT * FROM "db1"."rp1"."myseries"`,
-					exp:     `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","D",13],["1970-01-01T00:00:00.005Z","E",14],["1970-01-01T00:00:00.009Z","F",15]]}]}]}`,
-				},
-			},
-		})
-
-		for _, test := range tests {
-			for _, query := range test.queries {
-				t.Run(query.name, func(t *testing.T) {
-					if !test.initialized {
-						if err := test.init(s); err != nil {
-							t.Fatalf("test init failed: %s", err)
-						}
-					}
-					if query.skip {
-						t.Skipf("SKIP:: %s", query.name)
-					}
-					if err := query.Execute(s); err != nil {
-						t.Error(query.Error(err))
-					} else if !query.success() {
-						t.Error(query.failureMessage())
-					}
-				})
-			}
-		}
-	}()
-
-	// wait for the snapshot to write
-	// Don't think we necessarily need this since influx_inspect export will
-	// write from the WAL file as well.
-	time.Sleep(time.Second)
-
-	// Export the data we just wrote
-	exportCmd := export.NewCommand()
-	if err := exportCmd.Run("-out", exportFile, "-datadir", config.Data.Dir, "-waldir", config.Data.WALDir); err != nil {
-		t.Fatalf("error exporting: %s", err.Error())
-	}
-
-	// Drop the database to start anew
-	_, err := s.Query("DROP DATABASE db0")
-	if err != nil {
-		t.Fatalf("error dropping database: %s", err.Error())
-	}
-
-	// Import with influx -import
-
-	// Nasty code to get the httpd service listener port
-	host, port, err := net.SplitHostPort(config.HTTPD.BindAddress)
-	if err != nil {
-		t.Fatal(err)
-	}
-	portInt, err := strconv.Atoi(port)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	importCmd := influx.New("unknown")
-
-	importCmd.Host = host
-	importCmd.Port = portInt
-	importCmd.Import = true
-	importCmd.ClientConfig.Precision = "ns"
-	importCmd.ImporterConfig.Path = exportFile
-
-	if err := importCmd.Run(); err != nil {
-		t.Fatalf("error importing: %s", err.Error())
-	}
-
-	func() {
-		tests := make([]Test, 2)
-		tests = append(tests, Test{
-			db: "db0",
-			rp: "rp0",
-			queries: []*Query{
-				&Query{
-					name:    "Test data from db0 should have been imported",
-					command: `SELECT * FROM "db0"."rp0"."myseries"`,
-					exp:     `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23],["1970-01-01T00:00:00.005Z","B",24],["1970-01-01T00:00:00.009Z","C",25]]}]}]}`,
-				},
-			},
-		})
-
-		tests = append(tests, Test{
-			db: "db1",
-			rp: "rp1",
-			queries: []*Query{
-				&Query{
-					name:    "Test data from db1 should have been imported",
-					command: `SELECT * FROM "db1"."rp1"."myseries"`,
-					exp:     `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","D",13],["1970-01-01T00:00:00.005Z","E",14],["1970-01-01T00:00:00.009Z","F",15]]}]}]}`,
-				},
-			},
-		})
-
-		for _, test := range tests {
-			for _, query := range test.queries {
-				t.Run(query.name, func(t *testing.T) {
-					if !test.initialized {
-						if err := test.init(s); err != nil {
-							t.Fatalf("test init failed: %s", err)
-						}
-					}
-					if query.skip {
-						t.Skipf("SKIP:: %s", query.name)
-					}
-					if err := query.Execute(s); err != nil {
-						t.Error(query.Error(err))
-					} else if !query.success() {
-						t.Error(query.failureMessage())
-					}
-				})
-			}
-		}
-	}()
-}

From c58ca8d0ea48b34d31eb235a600f4e0615e87f68 Mon Sep 17 00:00:00 2001
From: "Jonathan A. Sternberg"
Date: Fri, 9 Feb 2018 09:58:20 -0600
Subject: [PATCH 2/2] Fix imports of multiple databases in a single import
 file from `influx -import`

If multiple databases were specified, then the earlier writes would be
written to the later database and/or retention policy because points
were only written out after the batch size was reached. This forces the
batcher to flush whenever the database context switches.
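
As an illustration, an export file of the shape the deleted test in the
previous patch exercised looks like this (database names, retention
policies, and points taken from that test):

    # DML
    # CONTEXT-DATABASE: db0
    # CONTEXT-RETENTION-POLICY: rp0
    myseries,host=A value=23 1000000
    # CONTEXT-DATABASE: db1
    # CONTEXT-RETENTION-POLICY: rp1
    myseries,host=D value=13 1000000

With a batch size greater than one, the db0 point could still be
sitting in the accumulated batch when the CONTEXT-DATABASE line for db1
was parsed, so it ended up being written to db1. Flushing the batch
before i.database or i.retentionPolicy changes writes every point under
the context that was active when the point was read.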
---
 importer/v8/importer.go | 32 +++++++++++++++++++++-----------
 1 file changed, 21 insertions(+), 11 deletions(-)

diff --git a/importer/v8/importer.go b/importer/v8/importer.go
index c9e1439de3..bba7576ad2 100644
--- a/importer/v8/importer.go
+++ b/importer/v8/importer.go
@@ -42,6 +42,7 @@ type Importer struct {
 	failedInserts         int
 	totalCommands         int
 	throttlePointsWritten int
+	startTime             time.Time
 	lastWrite             time.Time
 	throttle              *time.Ticker
 
@@ -167,7 +168,7 @@ func (i *Importer) processDDL(scanner *bufio.Reader) error {
 }
 
 func (i *Importer) processDML(scanner *bufio.Reader) error {
-	start := time.Now()
+	i.startTime = time.Now()
 	for {
 		line, err := scanner.ReadString(byte('\n'))
 		if err != nil && err != io.EOF {
@@ -178,9 +179,11 @@ func (i *Importer) processDML(scanner *bufio.Reader) error {
 			return nil
 		}
 		if strings.HasPrefix(line, "# CONTEXT-DATABASE:") {
+			i.batchWrite()
 			i.database = strings.TrimSpace(strings.Split(line, ":")[1])
 		}
 		if strings.HasPrefix(line, "# CONTEXT-RETENTION-POLICY:") {
+			i.batchWrite()
 			i.retentionPolicy = strings.TrimSpace(strings.Split(line, ":")[1])
 		}
 		if strings.HasPrefix(line, "#") {
@@ -190,7 +193,7 @@ func (i *Importer) processDML(scanner *bufio.Reader) error {
 		if strings.TrimSpace(line) == "" {
 			continue
 		}
-		i.batchAccumulator(line, start)
+		i.batchAccumulator(line)
 	}
 }
 
@@ -210,22 +213,19 @@ func (i *Importer) queryExecutor(command string) {
 	i.execute(command)
 }
 
-func (i *Importer) batchAccumulator(line string, start time.Time) {
+func (i *Importer) batchAccumulator(line string) {
 	i.batch = append(i.batch, line)
 	if len(i.batch) == batchSize {
 		i.batchWrite()
-		i.batch = i.batch[:0]
-		// Give some status feedback every 100000 lines processed
-		processed := i.totalInserts + i.failedInserts
-		if processed%100000 == 0 {
-			since := time.Since(start)
-			pps := float64(processed) / since.Seconds()
-			i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
-		}
 	}
 }
 
 func (i *Importer) batchWrite() {
+	// Exit early if there are no points in the batch.
+	if len(i.batch) == 0 {
+		return
+	}
+
 	// Accumulate the batch size to see how many points we have written this second
 	i.throttlePointsWritten += len(i.batch)
 
@@ -261,4 +261,14 @@ func (i *Importer) batchWrite() {
 	}
 	i.throttlePointsWritten = 0
 	i.lastWrite = time.Now()
+
+	// Clear the batch and record the number of processed points.
+	i.batch = i.batch[:0]
+	// Give some status feedback every 100000 lines processed
+	processed := i.totalInserts + i.failedInserts
+	if processed%100000 == 0 {
+		since := time.Since(i.startTime)
+		pps := float64(processed) / since.Seconds()
+		i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
+	}
 }
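
-- 
To exercise the fix end to end, export from a local instance that holds
more than one database and re-import the result. This is a sketch: the
paths are placeholders, and the influx flags mirror the settings the
deleted test applied programmatically (Import, ImporterConfig.Path, and
ClientConfig.Precision):

    influx_inspect export -out /tmp/export -datadir ~/.influxdb/data -waldir ~/.influxdb/wal
    influx -import -path=/tmp/export -precision=ns

After the import, points that follow a "# CONTEXT-DATABASE:" line in the
export file should land in that database rather than in whichever
database the file names last.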