fix: add ability to define bucket to write to with a BucketFilter (#19658)
parent
a8c7254281
commit
fcb70c26d5
|
@ -117,7 +117,7 @@ func (f BucketFilter) QueryParams() map[string][]string {
|
|||
}
|
||||
|
||||
if f.Name != nil {
|
||||
qp["name"] = []string{*f.Name}
|
||||
qp["bucket"] = []string{*f.Name}
|
||||
}
|
||||
|
||||
if f.OrganizationID != nil {
|
||||
|
|
|
@ -601,14 +601,3 @@ func writeJSON(w io.Writer, v interface{}) error {
|
|||
enc.SetIndent("", "\t")
|
||||
return enc.Encode(v)
|
||||
}
|
||||
|
||||
func newBucketService() (influxdb.BucketService, error) {
|
||||
client, err := newHTTPClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &http.BucketService{
|
||||
Client: client,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -279,12 +279,11 @@ func fluxWriteF(cmd *cobra.Command, args []string) error {
|
|||
return fmt.Errorf("invalid precision")
|
||||
}
|
||||
|
||||
bs, err := newBucketService()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var (
|
||||
filter platform.BucketFilter
|
||||
err error
|
||||
)
|
||||
|
||||
var filter platform.BucketFilter
|
||||
if writeFlags.BucketID != "" {
|
||||
filter.ID, err = platform.IDFromString(writeFlags.BucketID)
|
||||
if err != nil {
|
||||
|
@ -306,21 +305,6 @@ func fluxWriteF(cmd *cobra.Command, args []string) error {
|
|||
}
|
||||
|
||||
ctx := signals.WithStandardSignals(context.Background())
|
||||
buckets, n, err := bs.FindBuckets(ctx, filter)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve buckets: %v", err)
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
if writeFlags.Bucket != "" {
|
||||
return fmt.Errorf("bucket %q was not found", writeFlags.Bucket)
|
||||
}
|
||||
|
||||
if writeFlags.BucketID != "" {
|
||||
return fmt.Errorf("bucket with id %q does not exist", writeFlags.BucketID)
|
||||
}
|
||||
}
|
||||
bucketID, orgID := buckets[0].ID, buckets[0].OrgID
|
||||
|
||||
// create line reader
|
||||
r, closer, err := writeFlags.createLineReader(ctx, cmd, args)
|
||||
|
@ -342,7 +326,7 @@ func fluxWriteF(cmd *cobra.Command, args []string) error {
|
|||
},
|
||||
MaxLineLength: writeFlags.MaxLineLength,
|
||||
}
|
||||
if err := s.Write(ctx, orgID, bucketID, r); err != nil && err != context.Canceled {
|
||||
if err := s.WriteTo(ctx, filter, r); err != nil && err != context.Canceled {
|
||||
return fmt.Errorf("failed to write data: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -375,45 +375,6 @@ func Test_fluxWriteF(t *testing.T) {
|
|||
var lineData []byte // stores line data that the client writes
|
||||
// use a test HTTP server to mock response
|
||||
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
url := req.URL.String()
|
||||
// fmt.Println(url)
|
||||
switch {
|
||||
case strings.Contains(url, "error"): // fail when error is in ULR
|
||||
rw.WriteHeader(http.StatusInternalServerError)
|
||||
rw.Write([]byte(`ERROR`))
|
||||
return
|
||||
case strings.Contains(url, "empty"): // return empty buckets response
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
rw.Write([]byte(`{
|
||||
"links":{
|
||||
"self":"/api/v2/buckets?descending=false\u0026limit=20\u0026offset=0\u0026orgID=b112ec3528efa3b4"
|
||||
},
|
||||
"buckets":[]
|
||||
}`))
|
||||
return
|
||||
case strings.HasPrefix(url, "/api/v2/buckets"): // return example bucket
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
rw.Write([]byte(`{
|
||||
"links":{
|
||||
"self":"/api/v2/buckets?descending=false\u0026limit=20\u0026offset=0\u0026orgID=b112ec3528efa3b4"
|
||||
},
|
||||
"buckets":[
|
||||
{"id":"4f14589c26df8286","orgID":"b112ec3528efa3b4","type":"user","name":"my-bucket","retentionRules":[],
|
||||
"createdAt":"2020-04-04T11:43:37.762325688Z","updatedAt":"2020-04-04T11:43:37.762325786Z",
|
||||
"links":{
|
||||
"labels":"/api/v2/buckets/4f14589c26df8286/labels",
|
||||
"logs":"/api/v2/buckets/4f14589c26df8286/logs",
|
||||
"members":"/api/v2/buckets/4f14589c26df8286/members",
|
||||
"org":"/api/v2/orgs/b112ec3528efa3b4",
|
||||
"owners":"/api/v2/buckets/4f14589c26df8286/owners",
|
||||
"self":"/api/v2/buckets/4f14589c26df8286",
|
||||
"write":"/api/v2/write?org=b112ec3528efa3b4\u0026bucket=4f14589c26df8286"
|
||||
},"labels":[]
|
||||
}
|
||||
]
|
||||
}`))
|
||||
return
|
||||
}
|
||||
// consume and remember request contents
|
||||
var requestData io.Reader = req.Body
|
||||
if h := req.Header["Content-Encoding"]; len(h) > 0 && strings.Contains(h[0], "gzip") {
|
||||
|
@ -496,20 +457,16 @@ func Test_fluxWriteF(t *testing.T) {
|
|||
t.Run("validates error when failed to retrive buckets", func(t *testing.T) {
|
||||
useTestServer()
|
||||
command := cmdWrite(&globalFlags{}, genericCLIOpts{w: ioutil.Discard})
|
||||
// note: my-error-bucket parameter causes the test server to fail
|
||||
command.SetArgs([]string{"--format", "csv", "--org", "my-org", "--bucket", "my-error-bucket"})
|
||||
err := command.Execute()
|
||||
require.Contains(t, fmt.Sprintf("%s", err), "bucket")
|
||||
require.Nil(t, command.Execute())
|
||||
})
|
||||
|
||||
// validation: no such bucket found
|
||||
t.Run("validates no such bucket found", func(t *testing.T) {
|
||||
useTestServer()
|
||||
command := cmdWrite(&globalFlags{}, genericCLIOpts{w: ioutil.Discard})
|
||||
// note: my-empty-org parameter causes the test server to return no buckets
|
||||
command.SetArgs([]string{"--format", "csv", "--org", "my-empty-org", "--bucket", "my-bucket"})
|
||||
err := command.Execute()
|
||||
require.Contains(t, fmt.Sprintf("%s", err), "bucket")
|
||||
require.Nil(t, command.Execute())
|
||||
})
|
||||
|
||||
// validation: no such bucket-id found
|
||||
|
|
|
@ -354,3 +354,55 @@ func compressWithGzip(data io.Reader) (io.Reader, error) {
|
|||
|
||||
return pr, err
|
||||
}
|
||||
|
||||
// WriteTo writes to the bucket matching the filter.
|
||||
func (s *WriteService) WriteTo(ctx context.Context, filter influxdb.BucketFilter, r io.Reader) error {
|
||||
precision := s.Precision
|
||||
if precision == "" {
|
||||
precision = "ns"
|
||||
}
|
||||
|
||||
if !models.ValidPrecision(precision) {
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Op: "http/Write",
|
||||
Msg: msgInvalidPrecision,
|
||||
}
|
||||
}
|
||||
|
||||
u, err := NewURL(s.Addr, prefixWrite)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r, err = compressWithGzip(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
SetToken(s.Token, req)
|
||||
|
||||
params := req.URL.Query()
|
||||
for key, param := range filter.QueryParams() {
|
||||
params[key] = param
|
||||
}
|
||||
params.Set("precision", string(precision))
|
||||
req.URL.RawQuery = params.Encode()
|
||||
|
||||
hc := NewClient(u.Scheme, s.InsecureSkipVerify)
|
||||
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return CheckError(resp)
|
||||
}
|
||||
|
|
|
@ -9,10 +9,16 @@ import (
|
|||
|
||||
// WriteService writes data read from the reader.
|
||||
type WriteService struct {
|
||||
WriteF func(context.Context, platform.ID, platform.ID, io.Reader) error
|
||||
WriteF func(context.Context, platform.ID, platform.ID, io.Reader) error
|
||||
WriteToF func(context.Context, platform.BucketFilter, io.Reader) error
|
||||
}
|
||||
|
||||
// Write calls the mocked WriteF function with arguments.
|
||||
func (s *WriteService) Write(ctx context.Context, org, bucket platform.ID, r io.Reader) error {
|
||||
return s.WriteF(ctx, org, bucket, r)
|
||||
}
|
||||
|
||||
// Write calls the mocked WriteF function with arguments.
|
||||
func (s *WriteService) WriteTo(ctx context.Context, filter platform.BucketFilter, r io.Reader) error {
|
||||
return s.WriteToF(ctx, filter, r)
|
||||
}
|
||||
|
|
1
write.go
1
write.go
|
@ -8,4 +8,5 @@ import (
|
|||
// WriteService writes data read from the reader.
|
||||
type WriteService interface {
|
||||
Write(ctx context.Context, org, bucket ID, r io.Reader) error
|
||||
WriteTo(ctx context.Context, filter BucketFilter, r io.Reader) error
|
||||
}
|
||||
|
|
|
@ -154,6 +154,94 @@ func (b *Batcher) write(ctx context.Context, org, bucket platform.ID, lines <-ch
|
|||
errC <- nil
|
||||
}
|
||||
|
||||
func (b *Batcher) WriteTo(ctx context.Context, filter platform.BucketFilter, r io.Reader) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if b.Service == nil {
|
||||
return fmt.Errorf("destination write service required")
|
||||
}
|
||||
|
||||
lines := make(chan []byte)
|
||||
|
||||
errC := make(chan error, 2)
|
||||
go b.writeTo(ctx, filter, lines, errC)
|
||||
go b.read(ctx, r, lines, errC)
|
||||
|
||||
// we loop twice to check if both read and write have an error. if read exits
|
||||
// cleanly, then we still want to wait for write.
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-errC:
|
||||
// onky if there is any error, exit immediately.
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// finishes when the lines channel is closed or context is done.
|
||||
// if an error occurs while writing data to the write service, the error is send in the
|
||||
// errC channel and the function returns.
|
||||
func (b *Batcher) writeTo(ctx context.Context, filter platform.BucketFilter, lines <-chan []byte, errC chan<- error) {
|
||||
flushInterval := b.MaxFlushInterval
|
||||
if flushInterval == 0 {
|
||||
flushInterval = DefaultInterval
|
||||
}
|
||||
|
||||
maxBytes := b.MaxFlushBytes
|
||||
if maxBytes == 0 {
|
||||
maxBytes = DefaultMaxBytes
|
||||
}
|
||||
|
||||
timer := time.NewTimer(flushInterval)
|
||||
defer func() { _ = timer.Stop() }()
|
||||
|
||||
buf := make([]byte, 0, maxBytes)
|
||||
r := bytes.NewReader(buf)
|
||||
|
||||
var line []byte
|
||||
var more = true
|
||||
// if read closes the channel normally, exit the loop
|
||||
for more {
|
||||
select {
|
||||
case line, more = <-lines:
|
||||
if more {
|
||||
buf = append(buf, line...)
|
||||
}
|
||||
// write if we exceed the max lines OR read routine has finished
|
||||
if len(buf) >= maxBytes || (!more && len(buf) > 0) {
|
||||
r.Reset(buf)
|
||||
timer.Reset(flushInterval)
|
||||
if err := b.Service.WriteTo(ctx, filter, r); err != nil {
|
||||
errC <- err
|
||||
return
|
||||
}
|
||||
buf = buf[:0]
|
||||
}
|
||||
case <-timer.C:
|
||||
if len(buf) > 0 {
|
||||
r.Reset(buf)
|
||||
timer.Reset(flushInterval)
|
||||
if err := b.Service.WriteTo(ctx, filter, r); err != nil {
|
||||
errC <- err
|
||||
return
|
||||
}
|
||||
buf = buf[:0]
|
||||
}
|
||||
case <-ctx.Done():
|
||||
errC <- ctx.Err()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
errC <- nil
|
||||
}
|
||||
|
||||
// ScanLines is used in bufio.Scanner.Split to split lines of line protocol.
|
||||
func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
|
||||
if atEOF && len(data) == 0 {
|
||||
|
|
|
@ -280,6 +280,14 @@ func TestBatcher_write(t *testing.T) {
|
|||
got = string(b)
|
||||
return err
|
||||
},
|
||||
WriteToF: func(ctx context.Context, filter platform.BucketFilter, r io.Reader) error {
|
||||
if tt.args.writeError {
|
||||
return fmt.Errorf("error")
|
||||
}
|
||||
b, err := ioutil.ReadAll(r)
|
||||
got = string(b)
|
||||
return err
|
||||
},
|
||||
}
|
||||
|
||||
b := &Batcher{
|
||||
|
|
Loading…
Reference in New Issue