2018-10-11 02:45:11 +00:00
package main
import (
"context"
2020-06-30 06:11:21 +00:00
"encoding/csv"
2018-10-11 02:45:11 +00:00
"fmt"
"io"
2020-05-28 22:28:45 +00:00
"log"
2020-06-10 08:16:19 +00:00
"net/http"
"net/url"
2018-10-11 02:45:11 +00:00
"os"
2020-10-02 03:07:37 +00:00
"regexp"
"strconv"
2018-10-11 02:45:11 +00:00
"strings"
2020-09-29 10:41:53 +00:00
"github.com/fujiwara/shapeio"
2020-04-03 17:39:20 +00:00
platform "github.com/influxdata/influxdb/v2"
2020-06-10 08:16:19 +00:00
ihttp "github.com/influxdata/influxdb/v2/http"
2020-04-03 17:39:20 +00:00
"github.com/influxdata/influxdb/v2/kit/signals"
"github.com/influxdata/influxdb/v2/models"
2020-05-28 22:28:45 +00:00
"github.com/influxdata/influxdb/v2/pkg/csv2lp"
2020-04-03 17:39:20 +00:00
"github.com/influxdata/influxdb/v2/write"
2018-10-11 02:45:11 +00:00
"github.com/spf13/cobra"
)
2020-03-23 09:00:06 +00:00
// Values accepted by the --format flag of the write command.
const (
	// inputFormatCsv selects CSV input, which is converted to line protocol before writing.
	inputFormatCsv = "csv"
	// inputFormatLineProtocol selects InfluxDB line protocol input.
	inputFormatLineProtocol = "lp"
)
2020-03-26 20:09:34 +00:00
type writeFlagsType struct {
2020-05-28 22:28:45 +00:00
org organization
BucketID string
Bucket string
Precision string
Format string
Headers [ ] string
2020-06-10 08:16:19 +00:00
Files [ ] string
URLs [ ] string
2020-05-28 22:28:45 +00:00
Debug bool
SkipRowOnError bool
SkipHeader int
2020-09-29 17:27:20 +00:00
MaxLineLength int
2020-05-28 22:28:45 +00:00
IgnoreDataTypeInColumnName bool
Encoding string
2020-06-30 06:11:21 +00:00
ErrorsFile string
2020-10-02 03:07:37 +00:00
RateLimit string
2018-10-11 02:45:11 +00:00
}
2020-03-26 20:09:34 +00:00
var writeFlags writeFlagsType
2020-02-19 20:43:06 +00:00
func cmdWrite ( f * globalFlags , opt genericCLIOpts ) * cobra . Command {
2020-03-24 07:42:56 +00:00
cmd := opt . newCmd ( "write" , fluxWriteF , true )
2020-03-24 07:21:07 +00:00
cmd . Args = cobra . MaximumNArgs ( 1 )
2020-02-19 20:43:06 +00:00
cmd . Short = "Write points to InfluxDB"
2020-03-24 07:42:56 +00:00
cmd . Long = ` Write data to InfluxDB via stdin, or add an entire file specified with the -f flag `
2018-10-11 02:45:11 +00:00
2020-06-24 21:19:03 +00:00
f . registerFlags ( cmd )
2020-03-25 18:19:38 +00:00
writeFlags . org . register ( cmd , true )
2020-01-10 00:34:30 +00:00
opts := flagOpts {
{
DestP : & writeFlags . BucketID ,
Flag : "bucket-id" ,
Desc : "The ID of destination bucket" ,
Persistent : true ,
} ,
{
DestP : & writeFlags . Bucket ,
Flag : "bucket" ,
Short : 'b' ,
EnvVar : "BUCKET_NAME" ,
Desc : "The name of destination bucket" ,
Persistent : true ,
} ,
{
DestP : & writeFlags . Precision ,
Flag : "precision" ,
Short : 'p' ,
Default : "ns" ,
Desc : "Precision of the timestamps of the lines" ,
Persistent : true ,
} ,
2018-10-11 02:45:11 +00:00
}
2020-01-10 00:34:30 +00:00
opts . mustRegister ( cmd )
2020-03-25 17:43:50 +00:00
cmd . PersistentFlags ( ) . StringVar ( & writeFlags . Format , "format" , "" , "Input format, either lp (Line Protocol) or csv (Comma Separated Values). Defaults to lp unless '.csv' extension" )
2020-05-28 22:28:45 +00:00
cmd . PersistentFlags ( ) . StringArrayVar ( & writeFlags . Headers , "header" , [ ] string { } , "Header prepends lines to input data; Example --header HEADER1 --header HEADER2" )
2020-06-10 08:16:19 +00:00
cmd . PersistentFlags ( ) . StringArrayVarP ( & writeFlags . Files , "file" , "f" , [ ] string { } , "The path to the file to import" )
2020-06-10 12:42:04 +00:00
cmd . PersistentFlags ( ) . StringArrayVarP ( & writeFlags . URLs , "url" , "u" , [ ] string { } , "The URL to import data from" )
2020-05-28 22:28:45 +00:00
cmd . PersistentFlags ( ) . BoolVar ( & writeFlags . Debug , "debug" , false , "Log CSV columns to stderr before reading data rows" )
cmd . PersistentFlags ( ) . BoolVar ( & writeFlags . SkipRowOnError , "skipRowOnError" , false , "Log CSV data errors to stderr and continue with CSV processing" )
cmd . PersistentFlags ( ) . IntVar ( & writeFlags . SkipHeader , "skipHeader" , 0 , "Skip the first <n> rows from input data" )
2020-09-29 17:27:20 +00:00
cmd . PersistentFlags ( ) . IntVar ( & writeFlags . MaxLineLength , "max-line-length" , 16_000_000 , "Specifies the maximum number of bytes that can be read for a single line" )
2020-05-28 22:28:45 +00:00
cmd . Flag ( "skipHeader" ) . NoOptDefVal = "1" // skipHeader flag value is optional, skip the first header when unspecified
cmd . PersistentFlags ( ) . BoolVar ( & writeFlags . IgnoreDataTypeInColumnName , "xIgnoreDataTypeInColumnName" , false , "Ignores dataType which could be specified after ':' in column name" )
cmd . PersistentFlags ( ) . MarkHidden ( "xIgnoreDataTypeInColumnName" ) // should be used only upon explicit advice
cmd . PersistentFlags ( ) . StringVar ( & writeFlags . Encoding , "encoding" , "UTF-8" , "Character encoding of input files or stdin" )
2020-07-01 03:12:55 +00:00
cmd . PersistentFlags ( ) . StringVar ( & writeFlags . ErrorsFile , "errors-file" , "" , "The path to the file to write rejected rows to" )
2020-10-02 03:07:37 +00:00
cmd . PersistentFlags ( ) . StringVar ( & writeFlags . RateLimit , "rate-limit" , "" , "Throttles write, examples: \"5 MB / 5 min\" , \"17kBs\". \"\" (default) disables throttling." )
2020-03-25 17:43:50 +00:00
2020-03-26 20:09:34 +00:00
cmdDryRun := opt . newCmd ( "dryrun" , fluxWriteDryrunF , false )
2020-03-25 17:43:50 +00:00
cmdDryRun . Args = cobra . MaximumNArgs ( 1 )
cmdDryRun . Short = "Write to stdout instead of InfluxDB"
cmdDryRun . Long = ` Write protocol lines to stdout instead of InfluxDB. Troubleshoot conversion from CSV to line protocol. `
2020-06-24 21:19:03 +00:00
f . registerFlags ( cmdDryRun )
2020-03-25 17:43:50 +00:00
cmd . AddCommand ( cmdDryRun )
2020-01-10 00:34:30 +00:00
return cmd
2018-10-11 02:45:11 +00:00
}
2020-05-28 22:28:45 +00:00
// dump logs the write flags together with the positional arguments. It is a
// troubleshooting aid and does nothing unless --debug is set.
func (writeFlags *writeFlagsType) dump(args []string) {
	if !writeFlags.Debug {
		return
	}
	log.Printf("WriteFlags%+v args:%v", *writeFlags, args)
}
2020-03-26 20:09:34 +00:00
// createLineReader uses writeFlags and cli arguments to create a reader that produces line protocol
2020-06-12 03:53:20 +00:00
func ( writeFlags * writeFlagsType ) createLineReader ( ctx context . Context , cmd * cobra . Command , args [ ] string ) ( io . Reader , io . Closer , error ) {
2020-05-28 22:28:45 +00:00
files := writeFlags . Files
if len ( args ) > 0 && len ( args [ 0 ] ) > 1 && args [ 0 ] [ 0 ] == '@' {
2020-03-25 17:43:50 +00:00
// backward compatibility: @ in arg denotes a file
2020-05-28 22:28:45 +00:00
files = append ( files , args [ 0 ] [ 1 : ] )
args = args [ : 0 ]
2020-03-24 07:21:07 +00:00
}
2020-06-10 08:16:19 +00:00
readers := make ( [ ] io . Reader , 0 , 2 * len ( writeFlags . Headers ) + 2 * len ( files ) + 2 * len ( writeFlags . URLs ) + 1 )
closers := make ( [ ] io . Closer , 0 , len ( files ) + len ( writeFlags . URLs ) )
2020-05-28 22:28:45 +00:00
// validate input format
if len ( writeFlags . Format ) > 0 && writeFlags . Format != inputFormatLineProtocol && writeFlags . Format != inputFormatCsv {
return nil , csv2lp . MultiCloser ( closers ... ) , fmt . Errorf ( "unsupported input format: %s" , writeFlags . Format )
}
// validate and setup decoding of files/stdin if encoding is supplied
decode , err := csv2lp . CreateDecoder ( writeFlags . Encoding )
if err != nil {
return nil , csv2lp . MultiCloser ( closers ... ) , err
}
// prepend header lines
if len ( writeFlags . Headers ) > 0 {
for _ , header := range writeFlags . Headers {
readers = append ( readers , strings . NewReader ( header ) , strings . NewReader ( "\n" ) )
2018-10-11 02:45:11 +00:00
}
2020-05-28 22:28:45 +00:00
if len ( writeFlags . Format ) == 0 {
2020-03-23 09:00:06 +00:00
writeFlags . Format = inputFormatCsv
}
2018-10-11 02:45:11 +00:00
}
2020-05-28 22:28:45 +00:00
// add files
if len ( files ) > 0 {
for _ , file := range files {
f , err := os . Open ( file )
if err != nil {
return nil , csv2lp . MultiCloser ( closers ... ) , fmt . Errorf ( "failed to open %q: %v" , file , err )
}
closers = append ( closers , f )
readers = append ( readers , decode ( f ) , strings . NewReader ( "\n" ) )
if len ( writeFlags . Format ) == 0 && strings . HasSuffix ( file , ".csv" ) {
writeFlags . Format = inputFormatCsv
}
}
2020-03-23 09:00:06 +00:00
}
2018-10-11 02:45:11 +00:00
2020-06-10 08:16:19 +00:00
// #18349 allow URL data sources, a simple alternative to `curl -f -s http://... | influx write ...`
if len ( writeFlags . URLs ) > 0 {
2020-06-11 17:25:26 +00:00
client := http . DefaultClient
2020-06-10 08:16:19 +00:00
for _ , addr := range writeFlags . URLs {
u , err := url . Parse ( addr )
if err != nil {
return nil , csv2lp . MultiCloser ( closers ... ) , fmt . Errorf ( "failed to open %q: %v" , addr , err )
}
2020-06-12 03:53:20 +00:00
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , addr , nil )
2020-06-11 17:25:26 +00:00
if err != nil {
return nil , csv2lp . MultiCloser ( closers ... ) , fmt . Errorf ( "failed to open %q: %v" , addr , err )
}
resp , err := client . Do ( req )
2020-06-10 08:16:19 +00:00
if err != nil {
return nil , csv2lp . MultiCloser ( closers ... ) , fmt . Errorf ( "failed to open %q: %v" , addr , err )
}
closers = append ( closers , resp . Body )
if resp . StatusCode / 100 != 2 {
return nil , csv2lp . MultiCloser ( closers ... ) , fmt . Errorf ( "failed to open %q: response status_code=%d" , addr , resp . StatusCode )
}
readers = append ( readers , decode ( resp . Body ) , strings . NewReader ( "\n" ) )
if len ( writeFlags . Format ) == 0 &&
( strings . HasSuffix ( u . Path , ".csv" ) || strings . HasPrefix ( resp . Header . Get ( "Content-Type" ) , "text/csv" ) ) {
writeFlags . Format = inputFormatCsv
}
}
}
2020-05-28 22:28:45 +00:00
// add stdin or a single argument
switch {
case len ( args ) == 0 :
// use also stdIn if it is a terminal
if ! isCharacterDevice ( cmd . InOrStdin ( ) ) {
readers = append ( readers , decode ( cmd . InOrStdin ( ) ) )
}
case args [ 0 ] == "-" :
// "-" also means stdin
readers = append ( readers , decode ( cmd . InOrStdin ( ) ) )
default :
readers = append ( readers , strings . NewReader ( args [ 0 ] ) )
}
// skipHeader lines when set
if writeFlags . SkipHeader != 0 {
// find the last non-string reader (stdin or file)
for i := len ( readers ) - 1 ; i >= 0 ; i -- {
_ , stringReader := readers [ i ] . ( * strings . Reader )
if ! stringReader { // ignore headers and new lines
readers [ i ] = csv2lp . SkipHeaderLinesReader ( writeFlags . SkipHeader , readers [ i ] )
break
}
}
}
2020-06-30 06:11:21 +00:00
// create writer for errors-file, if supplied
var errorsFile * csv . Writer
var rowSkippedListener func ( * csv2lp . CsvToLineReader , error , [ ] string )
if writeFlags . ErrorsFile != "" {
writer , err := os . Create ( writeFlags . ErrorsFile )
if err != nil {
return nil , csv2lp . MultiCloser ( closers ... ) , fmt . Errorf ( "failed to create %q: %v" , writeFlags . ErrorsFile , err )
}
closers = append ( closers , writer )
errorsFile = csv . NewWriter ( writer )
rowSkippedListener = func ( source * csv2lp . CsvToLineReader , lineError error , row [ ] string ) {
log . Println ( lineError )
errorsFile . Comma = source . Comma ( )
errorsFile . Write ( [ ] string { fmt . Sprintf ( "# error : %v" , lineError ) } )
if err := errorsFile . Write ( row ) ; err != nil {
log . Printf ( "Unable to write to error-file: %v\n" , err )
}
errorsFile . Flush ( ) // flush is required
}
}
2020-05-28 22:28:45 +00:00
// concatenate readers
r := io . MultiReader ( readers ... )
2020-03-23 09:00:06 +00:00
if writeFlags . Format == inputFormatCsv {
2020-05-28 22:28:45 +00:00
csvReader := csv2lp . CsvToLineProtocol ( r )
csvReader . LogTableColumns ( writeFlags . Debug )
csvReader . SkipRowOnError ( writeFlags . SkipRowOnError )
csvReader . Table . IgnoreDataTypeInColumnName ( writeFlags . IgnoreDataTypeInColumnName )
// change LineNumber to report file/stdin line numbers properly
csvReader . LineNumber = writeFlags . SkipHeader - len ( writeFlags . Headers )
2020-06-30 06:11:21 +00:00
csvReader . RowSkipped = rowSkippedListener
2020-05-28 22:28:45 +00:00
r = csvReader
2018-10-11 02:45:11 +00:00
}
2020-09-29 10:41:53 +00:00
// throttle reader if requested
2020-10-02 03:07:37 +00:00
rateLimit , err := ToBytesPerSecond ( writeFlags . RateLimit )
if err != nil {
return nil , csv2lp . MultiCloser ( closers ... ) , err
}
if rateLimit > 0.0 {
2020-09-29 10:41:53 +00:00
// LineReader ensures that original reader is consumed in the smallest possible
// units (at most one protocol line) to avoid bigger pauses in throttling
r = csv2lp . NewLineReader ( r )
throttledReader := shapeio . NewReaderWithContext ( r , ctx )
2020-10-02 03:07:37 +00:00
throttledReader . SetRateLimit ( rateLimit )
2020-09-29 10:41:53 +00:00
r = throttledReader
}
2020-05-28 22:28:45 +00:00
return r , csv2lp . MultiCloser ( closers ... ) , nil
2020-03-26 20:09:34 +00:00
}
func fluxWriteF ( cmd * cobra . Command , args [ ] string ) error {
2020-05-28 22:28:45 +00:00
writeFlags . dump ( args ) // print flags when in Debug mode
2020-03-26 20:09:34 +00:00
// validate InfluxDB flags
if err := writeFlags . org . validOrgFlags ( & flags ) ; err != nil {
return err
}
2018-10-11 02:45:11 +00:00
2020-03-26 20:09:34 +00:00
if writeFlags . Bucket != "" && writeFlags . BucketID != "" {
return fmt . Errorf ( "please specify one of bucket or bucket-id" )
}
if ! models . ValidPrecision ( writeFlags . Precision ) {
return fmt . Errorf ( "invalid precision" )
}
2020-10-05 21:05:49 +00:00
var (
filter platform . BucketFilter
err error
)
2020-03-26 20:09:34 +00:00
if writeFlags . BucketID != "" {
filter . ID , err = platform . IDFromString ( writeFlags . BucketID )
2020-03-23 09:00:06 +00:00
if err != nil {
2020-03-26 20:09:34 +00:00
return fmt . Errorf ( "failed to decode bucket-id: %v" , err )
2020-03-23 09:00:06 +00:00
}
2020-03-26 20:09:34 +00:00
}
if writeFlags . Bucket != "" {
filter . Name = & writeFlags . Bucket
}
if writeFlags . org . id != "" {
filter . OrganizationID , err = platform . IDFromString ( writeFlags . org . id )
if err != nil {
return fmt . Errorf ( "failed to decode org-id id: %v" , err )
2020-03-23 09:00:06 +00:00
}
2020-03-26 20:09:34 +00:00
}
if writeFlags . org . name != "" {
filter . Org = & writeFlags . org . name
}
2020-03-23 09:00:06 +00:00
2020-06-12 03:53:20 +00:00
ctx := signals . WithStandardSignals ( context . Background ( ) )
2020-03-26 20:09:34 +00:00
// create line reader
2020-06-12 03:53:20 +00:00
r , closer , err := writeFlags . createLineReader ( ctx , cmd , args )
2020-03-26 20:09:34 +00:00
if closer != nil {
defer closer . Close ( )
}
if err != nil {
return err
}
2020-08-14 19:57:18 +00:00
ac := flags . config ( )
2020-03-26 20:09:34 +00:00
// write to InfluxDB
s := write . Batcher {
2020-06-10 08:16:19 +00:00
Service : & ihttp . WriteService {
2020-08-14 19:57:18 +00:00
Addr : ac . Host ,
Token : ac . Token ,
2020-03-26 20:09:34 +00:00
Precision : writeFlags . Precision ,
InsecureSkipVerify : flags . skipVerify ,
} ,
2020-09-29 17:27:20 +00:00
MaxLineLength : writeFlags . MaxLineLength ,
2020-03-26 20:09:34 +00:00
}
2020-10-05 21:05:49 +00:00
if err := s . WriteTo ( ctx , filter , r ) ; err != nil && err != context . Canceled {
2020-03-26 20:09:34 +00:00
return fmt . Errorf ( "failed to write data: %v" , err )
2018-10-24 20:51:28 +00:00
}
2019-01-22 19:34:01 +00:00
2018-10-24 20:51:28 +00:00
return nil
2018-10-11 02:45:11 +00:00
}
2020-03-26 20:09:34 +00:00
func fluxWriteDryrunF ( cmd * cobra . Command , args [ ] string ) error {
2020-05-28 22:28:45 +00:00
writeFlags . dump ( args ) // print flags when in Debug mode
2020-03-26 20:09:34 +00:00
// create line reader
2020-06-12 03:53:20 +00:00
ctx := signals . WithStandardSignals ( context . Background ( ) )
r , closer , err := writeFlags . createLineReader ( ctx , cmd , args )
2020-03-26 20:09:34 +00:00
if closer != nil {
defer closer . Close ( )
}
if err != nil {
return err
}
// dry run
2020-05-28 22:28:45 +00:00
_ , err = io . Copy ( cmd . OutOrStdout ( ) , r )
2020-03-26 20:09:34 +00:00
if err != nil {
return fmt . Errorf ( "failed: %v" , err )
}
return nil
}
2020-05-28 22:28:45 +00:00
// isCharacterDevice reports whether reader is an *os.File backed by a character
// device, such as an interactive terminal. It returns false for any reader
// that is not an *os.File, and for a file whose metadata cannot be read.
func isCharacterDevice(reader io.Reader) bool {
	if f, ok := reader.(*os.File); ok {
		if info, err := f.Stat(); err == nil {
			return info.Mode()&os.ModeCharDevice != 0
		}
	}
	return false
}
2020-10-02 03:07:37 +00:00
// Lookup data for ToBytesPerSecond.
var (
	// rateLimitRegexp matches a space-free rate limit made of:
	// COUNT, a byte unit, an optional "/", an optional TIME count and a time unit.
	rateLimitRegexp = regexp.MustCompile(`^(\d*\.?\d*)(B|kB|MB)/?(\d*)?(s|sec|m|min)$`)
	// bytesUnitMultiplier converts a byte unit to bytes (binary multiples).
	bytesUnitMultiplier = map[string]float64{"B": 1, "kB": 1024, "MB": 1_048_576}
	// timeUnitMultiplier converts a time unit to seconds.
	timeUnitMultiplier = map[string]float64{"s": 1, "sec": 1, "m": 60, "min": 60}
)

// ToBytesPerSecond converts rate from string to number. The supplied string
// value format must be COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional.
// All spaces are ignored, they can help with formatting. Units are case-sensitive.
// Examples: "5 MB / 5 min", "17kBs", "5.1MB5m", "1MB/s"; "17kbs" is rejected.
//
// An empty (or all-space) value yields 0 and no error. The write command treats
// 0 as "throttling disabled".
// An error is returned when the value does not match the format, when COUNT is
// not a number, or when TIME is zero or does not fit in 32 bits.
func ToBytesPerSecond(rateLimit string) (float64, error) {
	// ignore all spaces
	strVal := strings.ReplaceAll(rateLimit, " ", "")
	if len(strVal) == 0 {
		return 0, nil
	}
	matches := rateLimitRegexp.FindStringSubmatch(strVal)
	if matches == nil {
		return 0, fmt.Errorf("invalid rate limit %q: it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional, rexpexp: %v", strVal, rateLimitRegexp)
	}
	bytes, err := strconv.ParseFloat(matches[1], 64)
	if err != nil {
		return 0, fmt.Errorf("invalid rate limit %q: '%v' is not count of bytes: %v", strVal, matches[1], err)
	}
	bytes *= bytesUnitMultiplier[matches[2]]

	seconds := 1.0 // the TIME count is optional, for example 5kBs or 1MB/s
	if len(matches[3]) > 0 {
		count, err := strconv.ParseUint(matches[3], 10, 32)
		if err != nil {
			return 0, fmt.Errorf("invalid rate limit %q: time is out of range: %v", strVal, err)
		}
		if count == 0 {
			return 0, fmt.Errorf("invalid rate limit %q: possitive time expected but %v supplied", strVal, matches[3])
		}
		seconds = float64(count)
	}
	seconds *= timeUnitMultiplier[matches[4]]
	return bytes / seconds, nil
}