throttle import

pull/3795/head
Cory LaNou 2015-08-21 15:06:45 -05:00
parent 0a6c8b1968
commit ace4737228
2 changed files with 74 additions and 13 deletions


@@ -27,7 +27,12 @@ var (
)
const (
default_format = "column"
// defaultFormat is the default format of the results when issuing queries
defaultFormat = "column"
// defaultPPS is the default points per second that the import will throttle at.
// By default it is 0, which means it will not throttle.
defaultPPS = 0
)
type CommandLine struct {
@@ -46,6 +51,7 @@ type CommandLine struct {
Execute string
ShowVersion bool
Import bool
PPS int // Controls how many points per second the import will allow via throttling
Path string
Compressed bool
}
@@ -60,11 +66,12 @@ func main() {
fs.StringVar(&c.Password, "password", c.Password, `Password to connect to the server. Leaving blank will prompt for password (--password="").`)
fs.StringVar(&c.Database, "database", c.Database, "Database to connect to the server.")
fs.BoolVar(&c.Ssl, "ssl", false, "Use https for connecting to cluster.")
fs.StringVar(&c.Format, "format", default_format, "Format specifies the format of the server responses: json, csv, or column.")
fs.StringVar(&c.Format, "format", defaultFormat, "Format specifies the format of the server responses: json, csv, or column.")
fs.BoolVar(&c.Pretty, "pretty", false, "Turns on pretty print for the json format.")
fs.StringVar(&c.Execute, "execute", c.Execute, "Execute command and quit.")
fs.BoolVar(&c.ShowVersion, "version", false, "Displays the InfluxDB version.")
fs.BoolVar(&c.Import, "import", false, "Import a previous database.")
fs.IntVar(&c.PPS, "pps", defaultPPS, "How many points per second the import will allow. By default it is zero and will not throttle importing.")
fs.StringVar(&c.Path, "path", "", "path to the file to import")
fs.BoolVar(&c.Compressed, "compressed", false, "set to true if the import file is compressed")
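
With the flag registered, a capped import can be requested from the shell. A hypothetical invocation (the file name and rate are illustrative only):

    influx -import -path=export.txt -compressed -pps=25000

Leaving -pps at its default of 0 preserves the old, unthrottled behavior.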
@@ -93,6 +100,8 @@ func main() {
Turns on pretty print for the json format.
-import
Import a previous database export from file
-pps
How many points per second the import will allow. By default it is zero and will not throttle importing.
-path
Path to file to import
-compressed
@@ -169,6 +178,7 @@ Examples:
config.Version = version
config.URL = u
config.Compressed = c.Compressed
config.PPS = c.PPS
i := v8.NewImporter(config)
if err := i.Import(); err != nil {
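
For code that drives the importer directly rather than through the influx binary, the same cap is the new PPS field on the v8 Config. A minimal sketch, assuming the import path github.com/influxdb/influxdb/importer/v8 and a url.URL-typed URL field (neither is shown in this diff); the server address, file name, and rate are placeholders:

    package main

    import (
        "log"
        "net/url"

        "github.com/influxdb/influxdb/importer/v8"
    )

    func main() {
        u, err := url.Parse("http://localhost:8086") // placeholder server address
        if err != nil {
            log.Fatal(err)
        }

        config := v8.NewConfig()
        config.URL = *u            // assumed to be a url.URL field
        config.Path = "export.txt" // placeholder export file
        config.Compressed = false
        config.PPS = 25000 // cap at ~25k points per second; 0 means unthrottled

        if err := v8.NewImporter(config).Import(); err != nil {
            log.Fatal(err)
        }
    }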


@@ -9,6 +9,7 @@ import (
"net/url"
"os"
"strings"
"time"
"github.com/influxdb/influxdb/client"
)
@@ -25,6 +26,7 @@ type Config struct {
Path string
Version string
Compressed bool
PPS int
}
// NewConfig returns an initialized *Config
@@ -34,14 +36,17 @@ func NewConfig() *Config {
// Importer is the importer used for importing 0.8 data
type Importer struct {
client *client.Client
database string
retentionPolicy string
config *Config
batch []string
totalInserts int
failedInserts int
totalCommands int
client *client.Client
database string
retentionPolicy string
config *Config
batch []string
totalInserts int
failedInserts int
totalCommands int
throttlePointsWritten int
lastWrite time.Time
throttle *time.Ticker
}
// NewImporter will return an initialized Importer struct
@@ -108,8 +113,19 @@ func (i *Importer) Import() error {
// Get our reader
scanner := bufio.NewScanner(r)
// Process the scanner
// Process the DDL
i.processDDL(scanner)
// Set up our throttle channel. Effectively check every microsecond if it's time to process again.
// Since there is no other activity at this point, the smaller resolution gets us much closer to the
// requested PPS.
i.throttle = time.NewTicker(time.Microsecond)
defer i.throttle.Stop()
// Prime the last write
i.lastWrite = time.Now()
// Process the DML
i.processDML(scanner)
// Check if we had any errors scanning the file
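
Receiving from the ticker's channel is what turns the throttle into a blocking wait rather than a busy spin. A standalone illustration of the idiom (not the importer's code):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Receiving from a Ticker's channel blocks until the next tick fires,
        // which gives the importer a cheap "wait, then re-check" step.
        throttle := time.NewTicker(time.Microsecond)
        defer throttle.Stop()

        start := time.Now()
        for i := 0; i < 1000; i++ {
            <-throttle.C
        }
        fmt.Println("1000 ticks took", time.Since(start))
    }

In practice the runtime cannot deliver true microsecond ticks; the ticker drops ticks to keep pace, so each receive is a short, scheduler-bounded pause, which is all the throttle needs.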
@@ -135,6 +151,7 @@ func (i *Importer) processDDL(scanner *bufio.Scanner) {
}
func (i *Importer) processDML(scanner *bufio.Scanner) {
start := time.Now()
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "# CONTEXT-DATABASE:") {
@@ -146,7 +163,7 @@ func (i *Importer) processDML(scanner *bufio.Scanner) {
if strings.HasPrefix(line, "#") {
continue
}
i.batchAccumulator(line)
i.batchAccumulator(line, start)
}
}
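
processDML switches the target database when it sees a "# CONTEXT-DATABASE:" comment and skips any other line starting with "#". A hypothetical export fragment showing what the scanner iterates over (the section markers and retention-policy line are inferred from the processDDL/processDML split and the retentionPolicy field, neither of which is spelled out in this diff):

    # DDL
    CREATE DATABASE stress
    # DML
    # CONTEXT-DATABASE: stress
    # CONTEXT-RETENTION-POLICY: default
    cpu,host=server01 value=0.64 1439856000000000000
    cpu,host=server01 value=0.71 1439856010000000000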
@@ -166,7 +183,7 @@ func (i *Importer) queryExecutor(command string) {
i.execute(command)
}
func (i *Importer) batchAccumulator(line string) {
func (i *Importer) batchAccumulator(line string, start time.Time) {
i.batch = append(i.batch, line)
if len(i.batch) == batchSize {
if e := i.batchWrite(); e != nil {
@@ -178,10 +195,44 @@ func (i *Importer) batchAccumulator(line string) {
i.totalInserts += len(i.batch)
}
i.batch = i.batch[:0]
// Give some status feedback every 100000 lines processed
processed := i.totalInserts + i.failedInserts
if processed%100000 == 0 {
since := time.Since(start)
pps := float64(processed) / since.Seconds()
log.Printf("Processed %d lines. Time elapsed: %s. Lines per second: %d", processed, since.String(), int64(pps))
}
}
}
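
As a concrete reading of the status line: if the 100000th line is accumulated 4 seconds after processDML started, since.Seconds() is 4.0, so pps = 100000 / 4.0 = 25000 and the log prints "Processed 100000 lines. Time elapsed: 4s. Lines per second: 25000".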
func (i *Importer) batchWrite() error {
// Accumulate the batch size to see how many points we have written since the last successful write
i.throttlePointsWritten += len(i.batch)
// Find out when we last wrote data
since := time.Since(i.lastWrite)
// Check to see if we've exceeded our points per second for the current timeframe
var currentPPS int
if since.Seconds() > 0 {
currentPPS = int(float64(i.throttlePointsWritten) / since.Seconds())
} else {
currentPPS = i.throttlePointsWritten
}
// If our currentPPS is greater than the PPS specified, then we wait and retry
if currentPPS > i.config.PPS && i.config.PPS != 0 {
//log.Printf("Throttling - Calculated PPS: %d. Last wrote %s ago", currentPPS, since.String())
// Wait for the next tick
<-i.throttle.C
// Back the batch size out of the count, since the recursive call below will add it again
i.throttlePointsWritten -= len(i.batch)
return i.batchWrite()
}
_, e := i.client.WriteLineProtocol(strings.Join(i.batch, "\n"), i.database, i.retentionPolicy, i.config.Precision, i.config.WriteConsistency)
i.throttlePointsWritten = 0
i.lastWrite = time.Now()
return e
}
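
To make the throttle arithmetic concrete: with config.PPS set to 20000, a 5000-point batch arriving 100ms after the last write yields currentPPS = 5000 / 0.1 = 50000, so batchWrite blocks on the ticker; once 250ms have elapsed, 5000 / 0.25 = 20000, which is no longer greater than the cap, and the write proceeds. Below is a standalone sketch of the same wait-and-recheck loop with the client write stubbed out; all names and numbers are illustrative, and the recursion is unrolled into a loop:

    package main

    import (
        "fmt"
        "strings"
        "time"
    )

    func main() {
        const maxPPS = 20000

        throttle := time.NewTicker(time.Microsecond)
        defer throttle.Stop()

        lastWrite := time.Now()
        pointsWritten := 0

        writeBatch := func(batch []string) {
            pointsWritten += len(batch)
            for {
                since := time.Since(lastWrite)
                currentPPS := pointsWritten
                if s := since.Seconds(); s > 0 {
                    currentPPS = int(float64(pointsWritten) / s)
                }
                if currentPPS <= maxPPS {
                    break // under the cap: safe to write
                }
                <-throttle.C // over the cap: wait a tick and re-check
            }
            // Stand-in for client.WriteLineProtocol(...)
            _ = strings.Join(batch, "\n")
            pointsWritten = 0
            lastWrite = time.Now()
        }

        batch := make([]string, 5000)
        start := time.Now()
        for i := 0; i < 8; i++ {
            writeBatch(batch)
        }
        // 40000 points at 20000 pps should take about 2 seconds.
        fmt.Printf("wrote 40000 points in %s\n", time.Since(start))
    }

Unrolling the recursion into a loop avoids growing the stack while waiting, but the check itself is behaviorally the same as batchWrite's.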