refactoring based on feedback

pull/3502/head
Cory LaNou 2015-08-03 13:40:48 -05:00
parent ea58609392
commit 76367d5161
4 changed files with 231 additions and 262 deletions

View File

@ -19,12 +19,14 @@ import (
)
const (
// DEFAULT_HOST is the default host used to connect to an InfluxDB instance
DEFAULT_HOST = "localhost"
// DEFAULT_PORT is the default port used to connect to an InfluxDB instance
DEFAULT_PORT = 8086
// DEFAULT_TIMEOUT is the default connection timeout used to connect to an InfluxDB instance
DEFAULT_TIMEOUT = 0
// DefaultHost is the default host used to connect to an InfluxDB instance
DefaultHost = "localhost"
// DefaultPort is the default port used to connect to an InfluxDB instance
DefaultPort = 8086
// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
DefaultTimeout = 0
)
// Query is used to send a command to the server. Both Command and Database are required.
@ -46,14 +48,14 @@ func ParseConnectionString(path string, ssl bool) (url.URL, error) {
}
port = i
if h[0] == "" {
host = DEFAULT_HOST
host = DefaultHost
} else {
host = h[0]
}
} else {
host = path
// If they didn't specify a port, always use the default port
port = DEFAULT_PORT
port = DefaultPort
}
u := url.URL{
@ -87,6 +89,7 @@ func NewConfig(u url.URL, username, password, userAgent string, timeout time.Dur
Username: username,
Password: password,
UserAgent: userAgent,
Timeout: timeout,
}
}
@ -176,7 +179,8 @@ func (c *Client) Query(q Query) (*Response, error) {
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) Write(bp BatchPoints) (*Response, error) {
c.url.Path = "write"
u := c.url
u.Path = "write"
var b bytes.Buffer
for _, p := range bp.Points {
@ -202,7 +206,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
}
}
req, err := http.NewRequest("POST", c.url.String(), &b)
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return nil, err
}
@ -212,10 +216,10 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Add("db", bp.Database)
params.Add("rp", bp.RetentionPolicy)
params.Add("precision", bp.Precision)
params.Add("consistency", bp.WriteConsistency)
params.Set("db", bp.Database)
params.Set("rp", bp.RetentionPolicy)
params.Set("precision", bp.Precision)
params.Set("consistency", bp.WriteConsistency)
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
@ -226,7 +230,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil && err.Error() != "EOF" {
if err != nil {
return nil, err
}
@ -243,12 +247,12 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) {
c.url.Path = "write"
u := c.url
u.Path = "write"
var b bytes.Buffer
b.WriteString(data)
r := strings.NewReader(data)
req, err := http.NewRequest("POST", c.url.String(), &b)
req, err := http.NewRequest("POST", u.String(), r)
if err != nil {
return nil, err
}
@ -258,10 +262,10 @@ func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, w
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Add("db", database)
params.Add("rp", retentionPolicy)
params.Add("precision", precision)
params.Add("consistency", writeConsistency)
params.Set("db", database)
params.Set("rp", retentionPolicy)
params.Set("precision", precision)
params.Set("consistency", writeConsistency)
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
@ -272,12 +276,12 @@ func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, w
var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil && err.Error() != "EOF" {
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
err := fmt.Errorf(string(body))
response.Err = err
return &response, err
}

View File

@ -1,222 +0,0 @@
package importer
import (
"bufio"
"compress/gzip"
"fmt"
"io"
"log"
"net/url"
"os"
"strings"
"sync"
"github.com/influxdb/influxdb/client"
)
const batchSize = 5000
// V8Config is the config used to initialize a V8 importer
type V8Config struct {
username, password string
url url.URL
precision string
writeConsistency string
file, version string
compressed bool
}
// NewV8Config returns an initialized *V8Config
func NewV8Config(username, password, precision, writeConsistency, file, version string, u url.URL, compressed bool) *V8Config {
return &V8Config{
username: username,
password: password,
precision: precision,
writeConsistency: writeConsistency,
file: file,
version: version,
url: u,
compressed: compressed,
}
}
// V8 is the importer used for importing 0.8 data
type V8 struct {
client *client.Client
database string
retentionPolicy string
config *V8Config
wg sync.WaitGroup
line, command chan string
done chan struct{}
batch []string
totalInserts, failedInserts, totalCommands int
}
// NewV8 will return an intialized V8 struct
func NewV8(config *V8Config) *V8 {
return &V8{
config: config,
done: make(chan struct{}),
line: make(chan string),
command: make(chan string),
batch: make([]string, 0, batchSize),
}
}
// Import processes the specified file in the V8Config and writes the data to the databases in chukes specified by batchSize
func (v8 *V8) Import() error {
// Create a client and try to connect
config := client.NewConfig(v8.config.url, v8.config.username, v8.config.password, v8.config.version, client.DEFAULT_TIMEOUT)
cl, err := client.NewClient(config)
if err != nil {
return fmt.Errorf("could not create client %s", err)
}
v8.client = cl
if _, _, e := v8.client.Ping(); e != nil {
return fmt.Errorf("failed to connect to %s\n", v8.client.Addr())
}
// Validate args
if v8.config.file == "" {
return fmt.Errorf("file argument required")
}
defer func() {
v8.wg.Wait()
if v8.totalInserts > 0 {
log.Printf("Processed %d commands\n", v8.totalCommands)
log.Printf("Processed %d inserts\n", v8.totalInserts)
log.Printf("Failed %d inserts\n", v8.failedInserts)
}
}()
// Open the file
f, err := os.Open(v8.config.file)
if err != nil {
return err
}
defer f.Close()
var r io.Reader
// If gzipped, wrap in a gzip reader
if v8.config.compressed {
gr, err := gzip.NewReader(f)
if err != nil {
return err
}
defer gr.Close()
// Set the reader to the gzip reader
r = gr
} else {
// Standard text file so our reader can just be the file
r = f
}
// start our accumulator
go v8.batchAccumulator()
// start our command executor
go v8.queryExecutor()
// Get our reader
scanner := bufio.NewScanner(r)
// Process the scanner
v8.processDDL(scanner)
v8.processDML(scanner)
// Signal go routines we are done
close(v8.done)
// Check if we had any errors scanning the file
if err := scanner.Err(); err != nil {
return fmt.Errorf("reading standard input: %s", err)
}
return nil
}
func (v8 *V8) processDDL(scanner *bufio.Scanner) {
for scanner.Scan() {
line := scanner.Text()
// If we find the DML token, we are done with DDL
if strings.HasPrefix(line, "# DML") {
return
}
if strings.HasPrefix(line, "#") {
continue
}
v8.command <- line
}
}
func (v8 *V8) processDML(scanner *bufio.Scanner) {
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "# CONTEXT-DATABASE:") {
v8.database = strings.TrimSpace(strings.Split(line, ":")[1])
}
if strings.HasPrefix(line, "# CONTEXT-RETENTION-POLICY:") {
v8.retentionPolicy = strings.TrimSpace(strings.Split(line, ":")[1])
}
if strings.HasPrefix(line, "#") {
continue
}
v8.line <- line
}
}
func (v8 *V8) execute(command string) {
response, err := v8.client.Query(client.Query{Command: command, Database: v8.database})
if err != nil {
log.Printf("error: %s\n", err)
return
}
if err := response.Error(); err != nil {
log.Printf("error: %s\n", response.Error())
}
}
func (v8 *V8) queryExecutor() {
v8.wg.Add(1)
defer v8.wg.Done()
for {
select {
case c := <-v8.command:
v8.totalCommands++
v8.execute(c)
case <-v8.done:
return
}
}
}
func (v8 *V8) batchAccumulator() {
v8.wg.Add(1)
defer v8.wg.Done()
for {
select {
case l := <-v8.line:
v8.batch = append(v8.batch, l)
if len(v8.batch) == batchSize {
if e := v8.batchWrite(); e != nil {
log.Println("error writing batch: ", e)
v8.failedInserts += len(v8.batch)
} else {
v8.totalInserts += len(v8.batch)
}
v8.batch = v8.batch[:0]
}
case <-v8.done:
v8.totalInserts += len(v8.batch)
return
}
}
}
func (v8 *V8) batchWrite() error {
_, e := v8.client.WriteLineProtocol(strings.Join(v8.batch, "\n"), v8.database, v8.retentionPolicy, v8.config.precision, v8.config.writeConsistency)
return e
}

View File

@ -17,7 +17,7 @@ import (
"text/tabwriter"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/cmd/influx/importer"
"github.com/influxdb/influxdb/importer/v8"
"github.com/peterh/liner"
)
@ -46,7 +46,7 @@ type CommandLine struct {
Execute string
ShowVersion bool
Import bool
File string
Path string
Compressed bool
}
@ -54,8 +54,8 @@ func main() {
c := CommandLine{}
fs := flag.NewFlagSet("InfluxDB shell version "+version, flag.ExitOnError)
fs.StringVar(&c.Host, "host", client.DEFAULT_HOST, "Influxdb host to connect to.")
fs.IntVar(&c.Port, "port", client.DEFAULT_PORT, "Influxdb port to connect to.")
fs.StringVar(&c.Host, "host", client.DefaultHost, "Influxdb host to connect to.")
fs.IntVar(&c.Port, "port", client.DefaultPort, "Influxdb port to connect to.")
fs.StringVar(&c.Username, "username", c.Username, "Username to connect to the server.")
fs.StringVar(&c.Password, "password", c.Password, `Password to connect to the server. Leaving blank will prompt for password (--password="").`)
fs.StringVar(&c.Database, "database", c.Database, "Database to connect to the server.")
@ -65,7 +65,7 @@ func main() {
fs.StringVar(&c.Execute, "execute", c.Execute, "Execute command and quit.")
fs.BoolVar(&c.ShowVersion, "version", false, "Displays the InfluxDB version.")
fs.BoolVar(&c.Import, "import", false, "Import a previous database.")
fs.StringVar(&c.File, "file", "", "file to import")
fs.StringVar(&c.Path, "path", "", "path to the file to import")
fs.BoolVar(&c.Compressed, "compressed", false, "set to true if the import file is compressed")
// Define our own custom usage to print
@ -142,10 +142,9 @@ Examples:
if err := c.ExecuteQuery(c.Execute); err != nil {
c.Line.Close()
os.Exit(1)
} else {
c.Line.Close()
os.Exit(0)
}
c.Line.Close()
os.Exit(0)
}
if c.Import {
@ -156,17 +155,24 @@ Examples:
return
}
config := importer.NewV8Config(c.Username, c.Password, "ns", "any", c.File, version, u, c.Compressed)
config := v8.NewConfig()
config.Username = c.Username
config.Password = c.Password
config.Precision = "ns"
config.WriteConsistency = "any"
config.Path = c.Path
config.Version = version
config.URL = u
config.Compressed = c.Compressed
v8 := importer.NewV8(config)
if err := v8.Import(); err != nil {
i := v8.NewImporter(config)
if err := i.Import(); err != nil {
fmt.Printf("ERROR: %s\n", err)
c.Line.Close()
os.Exit(1)
} else {
c.Line.Close()
os.Exit(0)
}
c.Line.Close()
os.Exit(0)
}
showVersion()
@ -263,7 +269,7 @@ func (c *CommandLine) connect(cmd string) {
return
}
config := client.NewConfig(u, c.Username, c.Password, "InfluxDBShell/"+version, client.DEFAULT_TIMEOUT)
config := client.NewConfig(u, c.Username, c.Password, "InfluxDBShell/"+version, client.DefaultTimeout)
cl, err := client.NewClient(config)
if err != nil {
fmt.Printf("Could not create client %s", err)

181
importer/v8/importer.go Normal file
View File

@ -0,0 +1,181 @@
package v8
import (
"bufio"
"compress/gzip"
"fmt"
"io"
"log"
"net/url"
"os"
"strings"
"github.com/influxdb/influxdb/client"
)
const batchSize = 5000
// Config is the config used to initialize a Importer importer
type Config struct {
Username string
Password string
URL url.URL
Precision string
WriteConsistency string
Path string
Version string
Compressed bool
}
// NewConfig returns an initialized *Config
func NewConfig() *Config {
return &Config{}
}
// Importer is the importer used for importing 0.8 data
type Importer struct {
client *client.Client
database string
retentionPolicy string
config *Config
batch []string
totalInserts int
failedInserts int
totalCommands int
}
// NewImporter will return an intialized Importer struct
func NewImporter(config *Config) *Importer {
return &Importer{
config: config,
batch: make([]string, 0, batchSize),
}
}
// Import processes the specified file in the Config and writes the data to the databases in chunks specified by batchSize
func (i *Importer) Import() error {
// Create a client and try to connect
config := client.NewConfig(i.config.URL, i.config.Username, i.config.Password, i.config.Version, client.DefaultTimeout)
cl, err := client.NewClient(config)
if err != nil {
return fmt.Errorf("could not create client %s", err)
}
i.client = cl
if _, _, e := i.client.Ping(); e != nil {
return fmt.Errorf("failed to connect to %s\n", i.client.Addr())
}
// Validate args
if i.config.Path == "" {
return fmt.Errorf("file argument required")
}
defer func() {
if i.totalInserts > 0 {
log.Printf("Processed %d commands\n", i.totalCommands)
log.Printf("Processed %d inserts\n", i.totalInserts)
log.Printf("Failed %d inserts\n", i.failedInserts)
}
}()
// Open the file
f, err := os.Open(i.config.Path)
if err != nil {
return err
}
defer f.Close()
var r io.Reader
// If gzipped, wrap in a gzip reader
if i.config.Compressed {
gr, err := gzip.NewReader(f)
if err != nil {
return err
}
defer gr.Close()
// Set the reader to the gzip reader
r = gr
} else {
// Standard text file so our reader can just be the file
r = f
}
// Get our reader
scanner := bufio.NewScanner(r)
// Process the scanner
i.processDDL(scanner)
i.processDML(scanner)
// Check if we had any errors scanning the file
if err := scanner.Err(); err != nil {
return fmt.Errorf("reading standard input: %s", err)
}
return nil
}
func (i *Importer) processDDL(scanner *bufio.Scanner) {
for scanner.Scan() {
line := scanner.Text()
// If we find the DML token, we are done with DDL
if strings.HasPrefix(line, "# DML") {
return
}
if strings.HasPrefix(line, "#") {
continue
}
i.queryExecutor(line)
}
}
func (i *Importer) processDML(scanner *bufio.Scanner) {
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "# CONTEXT-DATABASE:") {
i.database = strings.TrimSpace(strings.Split(line, ":")[1])
}
if strings.HasPrefix(line, "# CONTEXT-RETENTION-POLICY:") {
i.retentionPolicy = strings.TrimSpace(strings.Split(line, ":")[1])
}
if strings.HasPrefix(line, "#") {
continue
}
i.batchAccumulator(line)
}
}
func (i *Importer) execute(command string) {
response, err := i.client.Query(client.Query{Command: command, Database: i.database})
if err != nil {
log.Printf("error: %s\n", err)
return
}
if err := response.Error(); err != nil {
log.Printf("error: %s\n", response.Error())
}
}
func (i *Importer) queryExecutor(command string) {
i.totalCommands++
i.execute(command)
}
func (i *Importer) batchAccumulator(line string) {
i.batch = append(i.batch, line)
if len(i.batch) == batchSize {
if e := i.batchWrite(); e != nil {
log.Println("error writing batch: ", e)
i.failedInserts += len(i.batch)
} else {
i.totalInserts += len(i.batch)
}
i.batch = i.batch[:0]
}
}
func (i *Importer) batchWrite() error {
_, e := i.client.WriteLineProtocol(strings.Join(i.batch, "\n"), i.database, i.retentionPolicy, i.config.Precision, i.config.WriteConsistency)
return e
}