Importer fixes. fixes #4650 #4651

pull/4652/head
Cory LaNou 2015-11-03 16:26:17 -06:00
parent ae8b458d9e
commit ac563ad97d
2 changed files with 25 additions and 11 deletions

View File

@ -94,6 +94,8 @@
- [#4596](https://github.com/influxdb/influxdb/pull/4596): Skip empty string for start position when parsing line protocol @Thanks @ch33hau - [#4596](https://github.com/influxdb/influxdb/pull/4596): Skip empty string for start position when parsing line protocol @Thanks @ch33hau
- [#4610](https://github.com/influxdb/influxdb/pull/4610): Make internal stats names consistent with Go style. - [#4610](https://github.com/influxdb/influxdb/pull/4610): Make internal stats names consistent with Go style.
- [#4625](https://github.com/influxdb/influxdb/pull/4625): Correctly handle bad write requests. Thanks @oiooj. - [#4625](https://github.com/influxdb/influxdb/pull/4625): Correctly handle bad write requests. Thanks @oiooj.
- [#4650](https://github.com/influxdb/influxdb/issues/4650): Importer should skip empty lines
- [#4651](https://github.com/influxdb/influxdb/issues/4651): Importer doesn't flush out last batch
## v0.9.4 [2015-09-14] ## v0.9.4 [2015-09-14]

View File

@ -145,6 +145,10 @@ func (i *Importer) processDDL(scanner *bufio.Scanner) {
if strings.HasPrefix(line, "#") { if strings.HasPrefix(line, "#") {
continue continue
} }
// Skip blank lines
if strings.TrimSpace(line) == "" {
continue
}
i.queryExecutor(line) i.queryExecutor(line)
} }
} }
@ -162,8 +166,14 @@ func (i *Importer) processDML(scanner *bufio.Scanner) {
if strings.HasPrefix(line, "#") { if strings.HasPrefix(line, "#") {
continue continue
} }
// Skip blank lines
if strings.TrimSpace(line) == "" {
continue
}
i.batchAccumulator(line, start) i.batchAccumulator(line, start)
} }
// Call batchWrite one last time to flush anything out in the batch
i.batchWrite()
} }
func (i *Importer) execute(command string) { func (i *Importer) execute(command string) {
@ -185,14 +195,7 @@ func (i *Importer) queryExecutor(command string) {
func (i *Importer) batchAccumulator(line string, start time.Time) { func (i *Importer) batchAccumulator(line string, start time.Time) {
i.batch = append(i.batch, line) i.batch = append(i.batch, line)
if len(i.batch) == batchSize { if len(i.batch) == batchSize {
if e := i.batchWrite(); e != nil { i.batchWrite()
log.Println("error writing batch: ", e)
// Output failed lines to STDOUT so users can capture lines that failed to import
fmt.Println(strings.Join(i.batch, "\n"))
i.failedInserts += len(i.batch)
} else {
i.totalInserts += len(i.batch)
}
i.batch = i.batch[:0] i.batch = i.batch[:0]
// Give some status feedback every 100000 lines processed // Give some status feedback every 100000 lines processed
processed := i.totalInserts + i.failedInserts processed := i.totalInserts + i.failedInserts
@ -204,7 +207,7 @@ func (i *Importer) batchAccumulator(line string, start time.Time) {
} }
} }
func (i *Importer) batchWrite() error { func (i *Importer) batchWrite() {
// Accumulate the batch size to see how many points we have written this second // Accumulate the batch size to see how many points we have written this second
i.throttlePointsWritten += len(i.batch) i.throttlePointsWritten += len(i.batch)
@ -226,11 +229,20 @@ func (i *Importer) batchWrite() error {
// Decrement the batch size back out as it is going to get called again // Decrement the batch size back out as it is going to get called again
i.throttlePointsWritten -= len(i.batch) i.throttlePointsWritten -= len(i.batch)
return i.batchWrite() i.batchWrite()
return
} }
_, e := i.client.WriteLineProtocol(strings.Join(i.batch, "\n"), i.database, i.retentionPolicy, i.config.Precision, i.config.WriteConsistency) _, e := i.client.WriteLineProtocol(strings.Join(i.batch, "\n"), i.database, i.retentionPolicy, i.config.Precision, i.config.WriteConsistency)
if e != nil {
log.Println("error writing batch: ", e)
// Output failed lines to STDOUT so users can capture lines that failed to import
fmt.Println(strings.Join(i.batch, "\n"))
i.failedInserts += len(i.batch)
} else {
i.totalInserts += len(i.batch)
}
i.throttlePointsWritten = 0 i.throttlePointsWritten = 0
i.lastWrite = time.Now() i.lastWrite = time.Now()
return e return
} }