Ensure importer emits errors to stderr (#8833)

pull/8863/head
Edd Robinson 2017-09-22 17:17:20 +01:00 committed by GitHub
parent e0caa2e0fd
commit ccb6f7451f
2 changed files with 16 additions and 11 deletions

View File

@ -29,6 +29,7 @@
- [#8851](https://github.com/influxdata/influxdb/pull/8851): Improve performance of `Include` and `Exclude` functions
- [#8854](https://github.com/influxdata/influxdb/pull/8854): Report the task status for a query.
- [#8853](https://github.com/influxdata/influxdb/pull/8853): Reduce allocations, improve `readEntries` performance by simplifying loop
- [#8830](https://github.com/influxdata/influxdb/issues/8830): Separate importer log statements to stdout and stderr.
### Bugfixes

View File

@ -44,14 +44,19 @@ type Importer struct {
throttlePointsWritten int
lastWrite time.Time
throttle *time.Ticker
stderrLogger *log.Logger
stdoutLogger *log.Logger
}
// NewImporter will return an intialized Importer struct
func NewImporter(config Config) *Importer {
config.UserAgent = fmt.Sprintf("influxDB importer/%s", config.Version)
return &Importer{
config: config,
batch: make([]string, 0, batchSize),
config: config,
batch: make([]string, 0, batchSize),
stdoutLogger: log.New(os.Stdout, "", log.LstdFlags),
stderrLogger: log.New(os.Stderr, "", log.LstdFlags),
}
}
@ -74,9 +79,9 @@ func (i *Importer) Import() error {
defer func() {
if i.totalInserts > 0 {
log.Printf("Processed %d commands\n", i.totalCommands)
log.Printf("Processed %d inserts\n", i.totalInserts)
log.Printf("Failed %d inserts\n", i.failedInserts)
i.stdoutLogger.Printf("Processed %d commands\n", i.totalCommands)
i.stdoutLogger.Printf("Processed %d inserts\n", i.totalInserts)
i.stdoutLogger.Printf("Failed %d inserts\n", i.failedInserts)
}
}()
@ -183,11 +188,11 @@ func (i *Importer) processDML(scanner *bufio.Scanner) {
func (i *Importer) execute(command string) {
response, err := i.client.Query(client.Query{Command: command, Database: i.database})
if err != nil {
log.Printf("error: %s\n", err)
i.stderrLogger.Printf("error: %s\n", err)
return
}
if err := response.Error(); err != nil {
log.Printf("error: %s\n", response.Error())
i.stderrLogger.Printf("error: %s\n", response.Error())
}
}
@ -206,7 +211,7 @@ func (i *Importer) batchAccumulator(line string, start time.Time) {
if processed%100000 == 0 {
since := time.Since(start)
pps := float64(processed) / since.Seconds()
log.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
}
}
}
@ -239,9 +244,8 @@ func (i *Importer) batchWrite() {
_, e := i.client.WriteLineProtocol(strings.Join(i.batch, "\n"), i.database, i.retentionPolicy, i.config.Precision, i.config.WriteConsistency)
if e != nil {
log.Println("error writing batch: ", e)
// Output failed lines to STDOUT so users can capture lines that failed to import
fmt.Println(strings.Join(i.batch, "\n"))
i.stderrLogger.Println("error writing batch: ", e)
i.stderrLogger.Println(strings.Join(i.batch, "\n"))
i.failedInserts += len(i.batch)
} else {
i.totalInserts += len(i.batch)