2018-10-11 02:45:11 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"os"
|
|
|
|
"strings"
|
|
|
|
|
2019-01-08 00:37:16 +00:00
|
|
|
platform "github.com/influxdata/influxdb"
|
|
|
|
"github.com/influxdata/influxdb/http"
|
|
|
|
"github.com/influxdata/influxdb/kit/signals"
|
|
|
|
"github.com/influxdata/influxdb/models"
|
|
|
|
"github.com/influxdata/influxdb/write"
|
2018-10-11 02:45:11 +00:00
|
|
|
"github.com/spf13/cobra"
|
|
|
|
"github.com/spf13/viper"
|
|
|
|
)
|
|
|
|
|
|
|
|
var writeCmd = &cobra.Command{
|
2018-10-23 14:17:23 +00:00
|
|
|
Use: "write line protocol or @/path/to/points.txt",
|
2019-01-14 18:54:53 +00:00
|
|
|
Short: "Write points to InfluxDB",
|
|
|
|
Long: `Write a single line of line protocol to InfluxDB,
|
|
|
|
or add an entire file specified with an @ prefix.`,
|
2018-10-11 02:45:11 +00:00
|
|
|
Args: cobra.ExactArgs(1),
|
2019-01-22 18:19:26 +00:00
|
|
|
RunE: wrapCheckSetup(fluxWriteF),
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// writeFlags holds the values of the write command's flags. Org/OrgID and
// Bucket/BucketID are name/ID pairs; fluxWriteF rejects a pair in which both
// members are set. Precision is the timestamp precision handed to the write
// service.
var writeFlags struct {
	OrgID, Org       string
	BucketID, Bucket string
	Precision        string
}
|
|
|
|
|
|
|
|
func init() {
|
2019-01-14 18:54:53 +00:00
|
|
|
writeCmd.PersistentFlags().StringVar(&writeFlags.OrgID, "org-id", "", "The ID of the organization that owns the bucket")
|
2018-10-11 02:45:11 +00:00
|
|
|
viper.BindEnv("ORG_ID")
|
|
|
|
if h := viper.GetString("ORG_ID"); h != "" {
|
|
|
|
writeFlags.OrgID = h
|
|
|
|
}
|
|
|
|
|
2019-01-14 18:54:53 +00:00
|
|
|
writeCmd.PersistentFlags().StringVarP(&writeFlags.Org, "org", "o", "", "The name of the organization that owns the bucket")
|
2018-10-11 02:45:11 +00:00
|
|
|
viper.BindEnv("ORG")
|
|
|
|
if h := viper.GetString("ORG"); h != "" {
|
|
|
|
writeFlags.Org = h
|
|
|
|
}
|
|
|
|
|
2019-01-14 18:54:53 +00:00
|
|
|
writeCmd.PersistentFlags().StringVar(&writeFlags.BucketID, "bucket-id", "", "The ID of destination bucket")
|
2018-10-11 02:45:11 +00:00
|
|
|
viper.BindEnv("BUCKET_ID")
|
|
|
|
if h := viper.GetString("BUCKET_ID"); h != "" {
|
|
|
|
writeFlags.BucketID = h
|
|
|
|
}
|
|
|
|
|
2019-01-14 18:54:53 +00:00
|
|
|
writeCmd.PersistentFlags().StringVarP(&writeFlags.Bucket, "bucket", "b", "", "The name of destination bucket")
|
2018-10-11 02:45:11 +00:00
|
|
|
viper.BindEnv("BUCKET_NAME")
|
2018-10-23 14:17:23 +00:00
|
|
|
if h := viper.GetString("BUCKET_NAME"); h != "" {
|
2018-10-11 02:45:11 +00:00
|
|
|
writeFlags.Bucket = h
|
|
|
|
}
|
2018-10-26 02:23:50 +00:00
|
|
|
|
2019-01-14 18:54:53 +00:00
|
|
|
writeCmd.PersistentFlags().StringVarP(&writeFlags.Precision, "precision", "p", "ns", "Precision of the timestamps of the lines")
|
2018-10-26 02:23:50 +00:00
|
|
|
viper.BindEnv("PRECISION")
|
|
|
|
if p := viper.GetString("PRECISION"); p != "" {
|
|
|
|
writeFlags.Precision = p
|
|
|
|
}
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|
|
|
|
|
2018-10-23 14:59:52 +00:00
|
|
|
func fluxWriteF(cmd *cobra.Command, args []string) error {
|
2018-10-23 14:17:23 +00:00
|
|
|
ctx := context.Background()
|
2018-10-11 02:45:11 +00:00
|
|
|
|
|
|
|
if writeFlags.Org != "" && writeFlags.OrgID != "" {
|
|
|
|
cmd.Usage()
|
2018-11-21 14:22:35 +00:00
|
|
|
return fmt.Errorf("please specify one of org or org-id")
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if writeFlags.Bucket != "" && writeFlags.BucketID != "" {
|
|
|
|
cmd.Usage()
|
2018-11-21 14:22:35 +00:00
|
|
|
return fmt.Errorf("please specify one of bucket or bucket-id")
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|
|
|
|
|
2018-10-26 02:23:50 +00:00
|
|
|
if !models.ValidPrecision(writeFlags.Precision) {
|
|
|
|
cmd.Usage()
|
|
|
|
return fmt.Errorf("invalid precision")
|
|
|
|
}
|
|
|
|
|
2018-10-11 02:45:11 +00:00
|
|
|
bs := &http.BucketService{
|
|
|
|
Addr: flags.host,
|
|
|
|
Token: flags.token,
|
|
|
|
}
|
|
|
|
|
2018-10-12 02:49:47 +00:00
|
|
|
var err error
|
2018-10-11 02:45:11 +00:00
|
|
|
filter := platform.BucketFilter{}
|
2018-10-12 02:49:47 +00:00
|
|
|
|
2018-10-11 02:45:11 +00:00
|
|
|
if writeFlags.BucketID != "" {
|
2018-10-12 02:49:47 +00:00
|
|
|
filter.ID, err = platform.IDFromString(writeFlags.BucketID)
|
2018-10-11 02:45:11 +00:00
|
|
|
if err != nil {
|
2019-01-22 19:34:01 +00:00
|
|
|
return fmt.Errorf("failed to decode bucket-id: %v", err)
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|
|
|
|
}
|
2018-10-12 02:49:47 +00:00
|
|
|
if writeFlags.Bucket != "" {
|
|
|
|
filter.Name = &writeFlags.Bucket
|
|
|
|
}
|
2018-10-11 02:45:11 +00:00
|
|
|
|
|
|
|
if writeFlags.OrgID != "" {
|
2018-10-12 02:49:47 +00:00
|
|
|
filter.OrganizationID, err = platform.IDFromString(writeFlags.OrgID)
|
2018-10-11 02:45:11 +00:00
|
|
|
if err != nil {
|
2019-01-22 19:34:01 +00:00
|
|
|
return fmt.Errorf("failed to decode org-id id: %v", err)
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if writeFlags.Org != "" {
|
2019-04-09 21:26:54 +00:00
|
|
|
filter.Org = &writeFlags.Org
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|
|
|
|
|
2018-10-23 14:17:23 +00:00
|
|
|
buckets, n, err := bs.FindBuckets(ctx, filter)
|
2018-10-11 02:45:11 +00:00
|
|
|
if err != nil {
|
2019-01-22 19:34:01 +00:00
|
|
|
return fmt.Errorf("failed to retrieve buckets: %v", err)
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|
|
|
|
|
2018-10-23 14:17:23 +00:00
|
|
|
if n == 0 {
|
2019-01-22 19:34:01 +00:00
|
|
|
if writeFlags.Bucket != "" {
|
|
|
|
return fmt.Errorf("bucket %q was not found", writeFlags.Bucket)
|
|
|
|
}
|
|
|
|
|
|
|
|
if writeFlags.BucketID != "" {
|
|
|
|
return fmt.Errorf("bucket with id %q does not exist", writeFlags.BucketID)
|
|
|
|
}
|
2018-10-23 14:17:23 +00:00
|
|
|
}
|
|
|
|
|
2019-04-10 21:16:35 +00:00
|
|
|
bucketID, orgID := buckets[0].ID, buckets[0].OrgID
|
2018-10-11 02:45:11 +00:00
|
|
|
|
|
|
|
var r io.Reader
|
|
|
|
if args[0] == "-" {
|
|
|
|
r = os.Stdin
|
|
|
|
} else if len(args[0]) > 0 && args[0][0] == '@' {
|
|
|
|
f, err := os.Open(args[0][1:])
|
|
|
|
if err != nil {
|
2019-01-22 19:34:01 +00:00
|
|
|
return fmt.Errorf("failed to open %q: %v", args[0][1:], err)
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
r = f
|
|
|
|
} else {
|
|
|
|
r = strings.NewReader(args[0])
|
|
|
|
}
|
|
|
|
|
2018-10-24 20:51:28 +00:00
|
|
|
s := write.Batcher{
|
|
|
|
Service: &http.WriteService{
|
2018-10-26 02:23:50 +00:00
|
|
|
Addr: flags.host,
|
|
|
|
Token: flags.token,
|
|
|
|
Precision: writeFlags.Precision,
|
2018-10-24 20:51:28 +00:00
|
|
|
},
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|
|
|
|
|
2018-10-24 20:51:28 +00:00
|
|
|
ctx = signals.WithStandardSignals(ctx)
|
2019-01-22 19:34:01 +00:00
|
|
|
if err := s.Write(ctx, orgID, bucketID, r); err != nil && err != context.Canceled {
|
|
|
|
return fmt.Errorf("failed to write data: %v", err)
|
2018-10-24 20:51:28 +00:00
|
|
|
}
|
2019-01-22 19:34:01 +00:00
|
|
|
|
2018-10-24 20:51:28 +00:00
|
|
|
return nil
|
2018-10-11 02:45:11 +00:00
|
|
|
}
|