http(write): support timestamp precision
parent
087e7b4f1d
commit
d2ecf8aad6
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/http"
|
||||
"github.com/influxdata/platform/kit/signals"
|
||||
"github.com/influxdata/platform/models"
|
||||
"github.com/influxdata/platform/write"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
|
@ -25,10 +26,11 @@ var writeCmd = &cobra.Command{
|
|||
}
|
||||
|
||||
var writeFlags struct {
|
||||
OrgID string
|
||||
Org string
|
||||
BucketID string
|
||||
Bucket string
|
||||
OrgID string
|
||||
Org string
|
||||
BucketID string
|
||||
Bucket string
|
||||
Precision string
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -55,6 +57,12 @@ func init() {
|
|||
if h := viper.GetString("BUCKET_NAME"); h != "" {
|
||||
writeFlags.Bucket = h
|
||||
}
|
||||
|
||||
writeCmd.PersistentFlags().StringVarP(&writeFlags.Org, "precision", "p", "ns", "precision of the timestamps of the lines")
|
||||
viper.BindEnv("PRECISION")
|
||||
if p := viper.GetString("PRECISION"); p != "" {
|
||||
writeFlags.Precision = p
|
||||
}
|
||||
}
|
||||
|
||||
func fluxWriteF(cmd *cobra.Command, args []string) error {
|
||||
|
@ -70,6 +78,11 @@ func fluxWriteF(cmd *cobra.Command, args []string) error {
|
|||
return fmt.Errorf("Please specify one of bucket or bucket-id")
|
||||
}
|
||||
|
||||
if !models.ValidPrecision(writeFlags.Precision) {
|
||||
cmd.Usage()
|
||||
return fmt.Errorf("invalid precision")
|
||||
}
|
||||
|
||||
bs := &http.BucketService{
|
||||
Addr: flags.host,
|
||||
Token: flags.token,
|
||||
|
@ -125,8 +138,9 @@ func fluxWriteF(cmd *cobra.Command, args []string) error {
|
|||
|
||||
s := write.Batcher{
|
||||
Service: &http.WriteService{
|
||||
Addr: flags.host,
|
||||
Token: flags.token,
|
||||
Addr: flags.host,
|
||||
Token: flags.token,
|
||||
Precision: writeFlags.Precision,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -10,7 +10,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
ErrorHeader = "X-Influx-Error"
|
||||
// ErrorHeader is the standard location for influx errors to be reported.
|
||||
ErrorHeader = "X-Influx-Error"
|
||||
// ReferenceHeader is the header for the reference error reference code.
|
||||
ReferenceHeader = "X-Influx-Reference"
|
||||
|
||||
errorHeaderMaxLength = 256
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/platform"
|
||||
pcontext "github.com/influxdata/platform/context"
|
||||
|
@ -74,7 +75,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
req := decodeWriteRequest(ctx, r)
|
||||
req, err := decodeWriteRequest(ctx, r)
|
||||
if err != nil {
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
|
@ -148,7 +149,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
points, err := models.ParsePoints(data)
|
||||
points, err := models.ParsePointsWithPrecision(data, time.Now(), req.Precision)
|
||||
if err != nil {
|
||||
logger.Info("Error parsing points", zap.Error(err))
|
||||
EncodeError(ctx, err, w)
|
||||
|
@ -170,30 +171,50 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func decodeWriteRequest(ctx context.Context, r *http.Request) *postWriteRequest {
|
||||
func decodeWriteRequest(ctx context.Context, r *http.Request) (*postWriteRequest, error) {
|
||||
qp := r.URL.Query()
|
||||
p := qp.Get("precision")
|
||||
if p == "" {
|
||||
p = "ns"
|
||||
}
|
||||
|
||||
if !models.ValidPrecision(p) {
|
||||
return nil, errors.InvalidDataf("invalid precision")
|
||||
}
|
||||
|
||||
return &postWriteRequest{
|
||||
Bucket: qp.Get("bucket"),
|
||||
Org: qp.Get("org"),
|
||||
}
|
||||
Bucket: qp.Get("bucket"),
|
||||
Org: qp.Get("org"),
|
||||
Precision: p,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type postWriteRequest struct {
|
||||
Org string
|
||||
Bucket string
|
||||
Org string
|
||||
Bucket string
|
||||
Precision string
|
||||
}
|
||||
|
||||
// WriteService sends data over HTTP to influxdb via line protocol.
|
||||
type WriteService struct {
|
||||
Addr string
|
||||
Token string
|
||||
Precision string
|
||||
InsecureSkipVerify bool
|
||||
}
|
||||
|
||||
var _ platform.WriteService = (*WriteService)(nil)
|
||||
|
||||
func (s *WriteService) Write(ctx context.Context, orgID, bucketID platform.ID, r io.Reader) error {
|
||||
precision := s.Precision
|
||||
if precision == "" {
|
||||
precision = "ns"
|
||||
}
|
||||
|
||||
if !models.ValidPrecision(precision) {
|
||||
return fmt.Errorf("invalid precision")
|
||||
}
|
||||
|
||||
u, err := newURL(s.Addr, writePath)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -226,6 +247,7 @@ func (s *WriteService) Write(ctx context.Context, orgID, bucketID platform.ID, r
|
|||
params := req.URL.Query()
|
||||
params.Set("org", string(org))
|
||||
params.Set("bucket", string(bucket))
|
||||
params.Set("precision", string(precision))
|
||||
req.URL.RawQuery = params.Encode()
|
||||
|
||||
hc := newClient(u.Scheme, s.InsecureSkipVerify)
|
||||
|
|
|
@ -315,6 +315,16 @@ func ParseName(buf []byte) []byte {
|
|||
return UnescapeMeasurement(name)
|
||||
}
|
||||
|
||||
// ValidPrecision checks if the precision is known.
|
||||
func ValidPrecision(precision string) bool {
|
||||
switch precision {
|
||||
case "n", "ns", "u", "us", "ms", "s":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// ParsePointsWithPrecision is similar to ParsePoints, but allows the
|
||||
// caller to provide a precision for time.
|
||||
//
|
||||
|
|
Loading…
Reference in New Issue