From 2920b55e194d921591106f47df0cb2da7121f0bf Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Tue, 2 Feb 2021 17:34:40 -0500 Subject: [PATCH] fix(http): fix passing of bucket ID by write-handler client (#20679) --- CHANGELOG.md | 1 + http/write_handler.go | 77 ++++-------------- http/write_handler_test.go | 118 +++++++++++++++++++++------- influxql/_v1tests/server_helpers.go | 2 +- mock/write_service.go | 8 +- tests/client.go | 9 ++- write.go | 1 - write/batcher.go | 7 -- write/batcher_test.go | 43 +++++----- 9 files changed, 134 insertions(+), 132 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df608b9bdd..a3d91f5fbd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ Replacement `tsi1` indexes will be automatically generated on startup for shards 1. [20495](https://github.com/influxdata/influxdb/pull/20495): Update Flux functions list in UI to reflect that `v1` package was renamed to `schema`. 1. [20669](https://github.com/influxdata/influxdb/pull/20669): Remove blank lines from payloads sent by `influx write`. 1. [20657](https://github.com/influxdata/influxdb/pull/20657): Allow for creating users without initial passwords in `influx user create`. +1. [20679](https://github.com/influxdata/influxdb/pull/20679): Fix incorrect "bucket not found" errors when passing `--bucket-id` to `influx write`. ## v2.0.3 [2020-12-14] diff --git a/http/write_handler.go b/http/write_handler.go index eff6ed334c..82ebd1ec94 100644 --- a/http/write_handler.go +++ b/http/write_handler.go @@ -292,66 +292,6 @@ type WriteService struct { var _ influxdb.WriteService = (*WriteService)(nil) -func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r io.Reader) error { - precision := s.Precision - if precision == "" { - precision = "ns" - } - - if !models.ValidPrecision(precision) { - return &influxdb.Error{ - Code: influxdb.EInvalid, - Op: "http/Write", - Msg: msgInvalidPrecision, - } - } - - u, err := NewURL(s.Addr, prefixWrite) - if err != nil { - return err - } - - r, err = compressWithGzip(r) - if err != nil { - return err - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), r) - if err != nil { - return err - } - - req.Header.Set("Content-Type", "text/plain; charset=utf-8") - req.Header.Set("Content-Encoding", "gzip") - SetToken(s.Token, req) - - org, err := orgID.Encode() - if err != nil { - return err - } - - bucket, err := bucketID.Encode() - if err != nil { - return err - } - - params := req.URL.Query() - params.Set("org", string(org)) - params.Set("bucket", string(bucket)) - params.Set("precision", string(precision)) - req.URL.RawQuery = params.Encode() - - hc := NewClient(u.Scheme, s.InsecureSkipVerify) - - resp, err := hc.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - - return CheckError(resp) -} - func compressWithGzip(data io.Reader) (io.Reader, error) { pr, pw := io.Pipe() gw := gzip.NewWriter(pw) @@ -401,10 +341,21 @@ func (s *WriteService) WriteTo(ctx context.Context, filter influxdb.BucketFilter SetToken(s.Token, req) params := req.URL.Query() - for key, param := range filter.QueryParams() { - params[key] = param + + // In other CLI commands that take either an ID or a name as input, the ID + // is prioritized and used to short-circuit looking up the name. We simulate + // the same behavior here for a consistent experience. + if filter.OrganizationID != nil && filter.OrganizationID.Valid() { + params.Set("org", filter.OrganizationID.String()) + } else if filter.Org != nil && *filter.Org != "" { + params.Set("org", *filter.Org) } - params.Set("precision", string(precision)) + if filter.ID != nil && filter.ID.Valid() { + params.Set("bucket", filter.ID.String()) + } else if filter.Name != nil && *filter.Name != "" { + params.Set("bucket", *filter.Name) + } + params.Set("precision", precision) req.URL.RawQuery = params.Encode() hc := NewClient(u.Scheme, s.InsecureSkipVerify) diff --git a/http/write_handler_test.go b/http/write_handler_test.go index 5cd84f93df..dec2c74ddb 100644 --- a/http/write_handler_test.go +++ b/http/write_handler_test.go @@ -18,39 +18,104 @@ import ( "github.com/influxdata/influxdb/v2/mock" influxtesting "github.com/influxdata/influxdb/v2/testing" "github.com/influxdata/influxdb/v2/tsdb" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) -func TestWriteService_Write(t *testing.T) { +func TestWriteService_WriteTo(t *testing.T) { type args struct { - org influxdb.ID - bucket influxdb.ID - r io.Reader + org string + orgId influxdb.ID + bucket string + bucketId influxdb.ID + r io.Reader } + + orgId := influxdb.ID(1) + org := "org" + bucketId := influxdb.ID(2) + bucket := "bucket" + tests := []struct { - name string - args args - status int - want string - wantErr bool + name string + args args + status int + want string + wantFilters influxdb.BucketFilter + wantErr bool }{ { + name: "write with org and bucket IDs", args: args{ - org: 1, - bucket: 2, + orgId: orgId, + bucketId: bucketId, + r: strings.NewReader("m,t1=v1 f1=2"), + }, + status: http.StatusNoContent, + want: "m,t1=v1 f1=2", + wantFilters: influxdb.BucketFilter{ + ID: &bucketId, + OrganizationID: &orgId, + }, + }, + { + name: "write with org ID and bucket name", + args: args{ + orgId: orgId, + bucket: bucket, r: strings.NewReader("m,t1=v1 f1=2"), }, status: http.StatusNoContent, want: "m,t1=v1 f1=2", + wantFilters: influxdb.BucketFilter{ + Name: &bucket, + OrganizationID: &orgId, + }, + }, + { + name: "write with org name and bucket ID", + args: args{ + org: org, + bucketId: bucketId, + r: strings.NewReader("m,t1=v1 f1=2"), + }, + status: http.StatusNoContent, + want: "m,t1=v1 f1=2", + wantFilters: influxdb.BucketFilter{ + ID: &bucketId, + Org: &org, + }, + }, + { + name: "write with org and bucket names", + args: args{ + org: org, + bucket: bucket, + r: strings.NewReader("m,t1=v1 f1=2"), + }, + status: http.StatusNoContent, + want: "m,t1=v1 f1=2", + wantFilters: influxdb.BucketFilter{ + Name: &bucket, + Org: &org, + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var org, bucket *influxdb.ID + var org, bucket *string + var orgId, bucketId *influxdb.ID var lp []byte ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - org, _ = influxdb.IDFromString(r.URL.Query().Get("org")) - bucket, _ = influxdb.IDFromString(r.URL.Query().Get("bucket")) + orgStr := r.URL.Query().Get("org") + bucketStr := r.URL.Query().Get("bucket") + var err error + if orgId, err = influxdb.IDFromString(orgStr); err != nil { + org = &orgStr + } + if bucketId, err = influxdb.IDFromString(bucketStr); err != nil { + bucket = &bucketStr + } defer r.Body.Close() in, _ := gzip.NewReader(r.Body) defer in.Close() @@ -60,20 +125,17 @@ func TestWriteService_Write(t *testing.T) { s := &WriteService{ Addr: ts.URL, } - if err := s.Write(context.Background(), tt.args.org, tt.args.bucket, tt.args.r); (err != nil) != tt.wantErr { - t.Errorf("WriteService.Write() error = %v, wantErr %v", err, tt.wantErr) - } - if got, want := *org, tt.args.org; got != want { - t.Errorf("WriteService.Write() org = %v, want %v", got, want) - } - - if got, want := *bucket, tt.args.bucket; got != want { - t.Errorf("WriteService.Write() bucket = %v, want %v", got, want) - } - - if got, want := string(lp), tt.want; got != want { - t.Errorf("WriteService.Write() = %v, want %v", got, want) - } + err := s.WriteTo( + context.Background(), + influxdb.BucketFilter{ID: &tt.args.bucketId, Name: &tt.args.bucket, OrganizationID: &tt.args.orgId, Org: &tt.args.org}, + tt.args.r, + ) + require.Equalf(t, err != nil, tt.wantErr, "error didn't match expectations: %v", err) + require.Equal(t, tt.wantFilters.OrganizationID, orgId) + require.Equal(t, tt.wantFilters.Org, org) + require.Equal(t, tt.wantFilters.ID, bucketId) + require.Equal(t, tt.wantFilters.Name, bucket) + require.Equal(t, tt.want, string(lp)) }) } } diff --git a/influxql/_v1tests/server_helpers.go b/influxql/_v1tests/server_helpers.go index 3f426bcb71..a2b8a5c3cf 100644 --- a/influxql/_v1tests/server_helpers.go +++ b/influxql/_v1tests/server_helpers.go @@ -190,7 +190,7 @@ func (qt *Test) init(ctx context.Context, t *testing.T, p *tests.DefaultPipeline func (qt *Test) writeTestData(ctx context.Context, t *testing.T, c *tests.Client) { t.Helper() for _, w := range qt.writes { - err := c.Write(ctx, qt.orgID, qt.bucketID, strings.NewReader(w.data)) + err := c.WriteTo(ctx, influxdb.BucketFilter{ID: &qt.bucketID, OrganizationID: &qt.orgID}, strings.NewReader(w.data)) require.NoError(t, err) } } diff --git a/mock/write_service.go b/mock/write_service.go index 0ea2c6f5d0..51bdc1b191 100644 --- a/mock/write_service.go +++ b/mock/write_service.go @@ -9,16 +9,10 @@ import ( // WriteService writes data read from the reader. type WriteService struct { - WriteF func(context.Context, platform.ID, platform.ID, io.Reader) error WriteToF func(context.Context, platform.BucketFilter, io.Reader) error } -// Write calls the mocked WriteF function with arguments. -func (s *WriteService) Write(ctx context.Context, org, bucket platform.ID, r io.Reader) error { - return s.WriteF(ctx, org, bucket, r) -} - -// Write calls the mocked WriteF function with arguments. +// WriteTo calls the mocked WriteToF function with arguments. func (s *WriteService) WriteTo(ctx context.Context, filter platform.BucketFilter, r io.Reader) error { return s.WriteToF(ctx, filter, r) } diff --git a/tests/client.go b/tests/client.go index 1a648df553..a547f532fb 100644 --- a/tests/client.go +++ b/tests/client.go @@ -84,7 +84,14 @@ func (c *Client) MustWriteBatch(points string) { // WriteBatch writes the current batch of points to the HTTP endpoint. func (c *Client) WriteBatch(points string) error { - return c.WriteService.Write(context.Background(), c.OrgID, c.BucketID, strings.NewReader(points)) + return c.WriteService.WriteTo( + context.Background(), + influxdb.BucketFilter{ + ID: &c.BucketID, + OrganizationID: &c.OrgID, + }, + strings.NewReader(points), + ) } // Query returns the CSV response from a flux query to the HTTP API. diff --git a/write.go b/write.go index 16f06029e8..0ab0f4c76e 100644 --- a/write.go +++ b/write.go @@ -7,6 +7,5 @@ import ( // WriteService writes data read from the reader. type WriteService interface { - Write(ctx context.Context, org, bucket ID, r io.Reader) error WriteTo(ctx context.Context, filter BucketFilter, r io.Reader) error } diff --git a/write/batcher.go b/write/batcher.go index 32245cd40c..02dd6abb8c 100644 --- a/write/batcher.go +++ b/write/batcher.go @@ -35,13 +35,6 @@ type Batcher struct { Service platform.WriteService // Service receives batches flushed from Batcher. } -// Write reads r in batches and writes to a target specified by org and bucket. -func (b *Batcher) Write(ctx context.Context, org, bucket platform.ID, r io.Reader) error { - return b.writeBytes(ctx, r, func(batch []byte) error { - return b.Service.Write(ctx, org, bucket, bytes.NewReader(batch)) - }) -} - // WriteTo reads r in batches and writes to a target specified by filter. func (b *Batcher) WriteTo(ctx context.Context, filter platform.BucketFilter, r io.Reader) error { return b.writeBytes(ctx, r, func(batch []byte) error { diff --git a/write/batcher_test.go b/write/batcher_test.go index fc09188e38..f0c24cab55 100644 --- a/write/batcher_test.go +++ b/write/batcher_test.go @@ -291,16 +291,7 @@ func TestBatcher_write(t *testing.T) { writeCalled := false var got string svc := &mock.WriteService{ - WriteF: func(ctx context.Context, org, bucket platform.ID, r io.Reader) error { - writeCalled = true - if tt.args.writeError { - return fmt.Errorf("error") - } - b, err := ioutil.ReadAll(r) - got = string(b) - return err - }, - WriteToF: func(ctx context.Context, filter platform.BucketFilter, r io.Reader) error { + WriteToF: func(ctx context.Context, _ platform.BucketFilter, r io.Reader) error { writeCalled = true if tt.args.writeError { return fmt.Errorf("error") @@ -317,7 +308,7 @@ func TestBatcher_write(t *testing.T) { Service: svc, } writeFn := func(batch []byte) error { - return svc.Write(ctx, tt.args.org, tt.args.bucket, bytes.NewReader(batch)) + return svc.WriteTo(ctx, platform.BucketFilter{ID: &tt.args.bucket, OrganizationID: &tt.args.org}, bytes.NewReader(batch)) } go b.write(ctx, writeFn, tt.args.lines, tt.args.errC) @@ -348,7 +339,7 @@ func TestBatcher_write(t *testing.T) { } } -func TestBatcher_Write(t *testing.T) { +func TestBatcher_WriteTo(t *testing.T) { createReader := func(data string) func() io.Reader { if data == "error" { return func() io.Reader { @@ -424,7 +415,7 @@ func TestBatcher_Write(t *testing.T) { gotFlushes int ) svc := &mock.WriteService{ - WriteF: func(ctx context.Context, org, bucket platform.ID, r io.Reader) error { + WriteToF: func(ctx context.Context, _ platform.BucketFilter, r io.Reader) error { if tt.args.writeError { return fmt.Errorf("error") } @@ -442,7 +433,11 @@ func TestBatcher_Write(t *testing.T) { } ctx := context.Background() - if err := b.Write(ctx, tt.args.org, tt.args.bucket, tt.args.r()); (err != nil) != tt.wantErr { + if err := b.WriteTo( + ctx, + platform.BucketFilter{ID: &tt.args.bucket, OrganizationID: &tt.args.org}, + tt.args.r(), + ); (err != nil) != tt.wantErr { t.Errorf("Batcher.Write() error = %v, wantErr %v", err, tt.wantErr) } @@ -462,7 +457,7 @@ func TestBatcher_Write(t *testing.T) { gotFlushes int ) svc := &mock.WriteService{ - WriteToF: func(ctx context.Context, filter platform.BucketFilter, r io.Reader) error { + WriteToF: func(ctx context.Context, _ platform.BucketFilter, r io.Reader) error { if tt.args.writeError { return fmt.Errorf("error") } @@ -498,9 +493,12 @@ func TestBatcher_Write(t *testing.T) { func TestBatcher_WriteTimeout(t *testing.T) { // mocking the write service here to either return an error // or get back all the bytes from the reader. + bucketId := platform.ID(2) + orgId := platform.ID(1) + var got string svc := &mock.WriteService{ - WriteF: func(ctx context.Context, org, bucket platform.ID, r io.Reader) error { + WriteToF: func(ctx context.Context, filter platform.BucketFilter, r io.Reader) error { b, err := ioutil.ReadAll(r) got = string(b) return err @@ -517,19 +515,16 @@ func TestBatcher_WriteTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - if err := b.Write(ctx, platform.ID(1), platform.ID(2), r); err != context.DeadlineExceeded { + if err := b.WriteTo(ctx, platform.BucketFilter{ID: &bucketId, OrganizationID: &orgId}, r); err != context.DeadlineExceeded { t.Errorf("Batcher.Write() with timeout error = %v", err) } - if got != "" { - t.Errorf(" Batcher.Write() with timeout got %s", got) - } + require.Empty(t, got, "Batcher.Write() with timeout received data") } func TestBatcher_WriteWithoutService(t *testing.T) { b := Batcher{} - err := b.Write(context.Background(), platform.ID(1), platform.ID(1), strings.NewReader("m1,t1=v1 f1=1")) - if err == nil || !strings.Contains(err.Error(), "write service required") { - t.Errorf(" Batcher.Write() error expected, but got %v", err) - } + err := b.WriteTo(context.Background(), platform.BucketFilter{}, strings.NewReader("m1,t1=v1 f1=1")) + require.Error(t, err) + require.Contains(t, err.Error(), "write service required") }