fix(http): fix passing of bucket ID by write-handler client (#20679)

pull/20680/head^2
Daniel Moran 2021-02-02 17:34:40 -05:00 committed by GitHub
parent a0d962cc23
commit 2920b55e19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 134 additions and 132 deletions

View File

@ -62,6 +62,7 @@ Replacement `tsi1` indexes will be automatically generated on startup for shards
1. [20495](https://github.com/influxdata/influxdb/pull/20495): Update Flux functions list in UI to reflect that `v1` package was renamed to `schema`.
1. [20669](https://github.com/influxdata/influxdb/pull/20669): Remove blank lines from payloads sent by `influx write`.
1. [20657](https://github.com/influxdata/influxdb/pull/20657): Allow for creating users without initial passwords in `influx user create`.
1. [20679](https://github.com/influxdata/influxdb/pull/20679): Fix incorrect "bucket not found" errors when passing `--bucket-id` to `influx write`.
## v2.0.3 [2020-12-14]

View File

@ -292,66 +292,6 @@ type WriteService struct {
var _ influxdb.WriteService = (*WriteService)(nil)
func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r io.Reader) error {
precision := s.Precision
if precision == "" {
precision = "ns"
}
if !models.ValidPrecision(precision) {
return &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/Write",
Msg: msgInvalidPrecision,
}
}
u, err := NewURL(s.Addr, prefixWrite)
if err != nil {
return err
}
r, err = compressWithGzip(r)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), r)
if err != nil {
return err
}
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
req.Header.Set("Content-Encoding", "gzip")
SetToken(s.Token, req)
org, err := orgID.Encode()
if err != nil {
return err
}
bucket, err := bucketID.Encode()
if err != nil {
return err
}
params := req.URL.Query()
params.Set("org", string(org))
params.Set("bucket", string(bucket))
params.Set("precision", string(precision))
req.URL.RawQuery = params.Encode()
hc := NewClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
return CheckError(resp)
}
func compressWithGzip(data io.Reader) (io.Reader, error) {
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
@ -401,10 +341,21 @@ func (s *WriteService) WriteTo(ctx context.Context, filter influxdb.BucketFilter
SetToken(s.Token, req)
params := req.URL.Query()
for key, param := range filter.QueryParams() {
params[key] = param
// In other CLI commands that take either an ID or a name as input, the ID
// is prioritized and used to short-circuit looking up the name. We simulate
// the same behavior here for a consistent experience.
if filter.OrganizationID != nil && filter.OrganizationID.Valid() {
params.Set("org", filter.OrganizationID.String())
} else if filter.Org != nil && *filter.Org != "" {
params.Set("org", *filter.Org)
}
params.Set("precision", string(precision))
if filter.ID != nil && filter.ID.Valid() {
params.Set("bucket", filter.ID.String())
} else if filter.Name != nil && *filter.Name != "" {
params.Set("bucket", *filter.Name)
}
params.Set("precision", precision)
req.URL.RawQuery = params.Encode()
hc := NewClient(u.Scheme, s.InsecureSkipVerify)

View File

@ -18,39 +18,104 @@ import (
"github.com/influxdata/influxdb/v2/mock"
influxtesting "github.com/influxdata/influxdb/v2/testing"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestWriteService_Write(t *testing.T) {
func TestWriteService_WriteTo(t *testing.T) {
type args struct {
org influxdb.ID
bucket influxdb.ID
r io.Reader
org string
orgId influxdb.ID
bucket string
bucketId influxdb.ID
r io.Reader
}
orgId := influxdb.ID(1)
org := "org"
bucketId := influxdb.ID(2)
bucket := "bucket"
tests := []struct {
name string
args args
status int
want string
wantErr bool
name string
args args
status int
want string
wantFilters influxdb.BucketFilter
wantErr bool
}{
{
name: "write with org and bucket IDs",
args: args{
org: 1,
bucket: 2,
orgId: orgId,
bucketId: bucketId,
r: strings.NewReader("m,t1=v1 f1=2"),
},
status: http.StatusNoContent,
want: "m,t1=v1 f1=2",
wantFilters: influxdb.BucketFilter{
ID: &bucketId,
OrganizationID: &orgId,
},
},
{
name: "write with org ID and bucket name",
args: args{
orgId: orgId,
bucket: bucket,
r: strings.NewReader("m,t1=v1 f1=2"),
},
status: http.StatusNoContent,
want: "m,t1=v1 f1=2",
wantFilters: influxdb.BucketFilter{
Name: &bucket,
OrganizationID: &orgId,
},
},
{
name: "write with org name and bucket ID",
args: args{
org: org,
bucketId: bucketId,
r: strings.NewReader("m,t1=v1 f1=2"),
},
status: http.StatusNoContent,
want: "m,t1=v1 f1=2",
wantFilters: influxdb.BucketFilter{
ID: &bucketId,
Org: &org,
},
},
{
name: "write with org and bucket names",
args: args{
org: org,
bucket: bucket,
r: strings.NewReader("m,t1=v1 f1=2"),
},
status: http.StatusNoContent,
want: "m,t1=v1 f1=2",
wantFilters: influxdb.BucketFilter{
Name: &bucket,
Org: &org,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var org, bucket *influxdb.ID
var org, bucket *string
var orgId, bucketId *influxdb.ID
var lp []byte
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
org, _ = influxdb.IDFromString(r.URL.Query().Get("org"))
bucket, _ = influxdb.IDFromString(r.URL.Query().Get("bucket"))
orgStr := r.URL.Query().Get("org")
bucketStr := r.URL.Query().Get("bucket")
var err error
if orgId, err = influxdb.IDFromString(orgStr); err != nil {
org = &orgStr
}
if bucketId, err = influxdb.IDFromString(bucketStr); err != nil {
bucket = &bucketStr
}
defer r.Body.Close()
in, _ := gzip.NewReader(r.Body)
defer in.Close()
@ -60,20 +125,17 @@ func TestWriteService_Write(t *testing.T) {
s := &WriteService{
Addr: ts.URL,
}
if err := s.Write(context.Background(), tt.args.org, tt.args.bucket, tt.args.r); (err != nil) != tt.wantErr {
t.Errorf("WriteService.Write() error = %v, wantErr %v", err, tt.wantErr)
}
if got, want := *org, tt.args.org; got != want {
t.Errorf("WriteService.Write() org = %v, want %v", got, want)
}
if got, want := *bucket, tt.args.bucket; got != want {
t.Errorf("WriteService.Write() bucket = %v, want %v", got, want)
}
if got, want := string(lp), tt.want; got != want {
t.Errorf("WriteService.Write() = %v, want %v", got, want)
}
err := s.WriteTo(
context.Background(),
influxdb.BucketFilter{ID: &tt.args.bucketId, Name: &tt.args.bucket, OrganizationID: &tt.args.orgId, Org: &tt.args.org},
tt.args.r,
)
require.Equalf(t, err != nil, tt.wantErr, "error didn't match expectations: %v", err)
require.Equal(t, tt.wantFilters.OrganizationID, orgId)
require.Equal(t, tt.wantFilters.Org, org)
require.Equal(t, tt.wantFilters.ID, bucketId)
require.Equal(t, tt.wantFilters.Name, bucket)
require.Equal(t, tt.want, string(lp))
})
}
}

View File

@ -190,7 +190,7 @@ func (qt *Test) init(ctx context.Context, t *testing.T, p *tests.DefaultPipeline
func (qt *Test) writeTestData(ctx context.Context, t *testing.T, c *tests.Client) {
t.Helper()
for _, w := range qt.writes {
err := c.Write(ctx, qt.orgID, qt.bucketID, strings.NewReader(w.data))
err := c.WriteTo(ctx, influxdb.BucketFilter{ID: &qt.bucketID, OrganizationID: &qt.orgID}, strings.NewReader(w.data))
require.NoError(t, err)
}
}

View File

@ -9,16 +9,10 @@ import (
// WriteService writes data read from the reader.
type WriteService struct {
WriteF func(context.Context, platform.ID, platform.ID, io.Reader) error
WriteToF func(context.Context, platform.BucketFilter, io.Reader) error
}
// Write calls the mocked WriteF function with arguments.
func (s *WriteService) Write(ctx context.Context, org, bucket platform.ID, r io.Reader) error {
return s.WriteF(ctx, org, bucket, r)
}
// Write calls the mocked WriteF function with arguments.
// WriteTo calls the mocked WriteToF function with arguments.
func (s *WriteService) WriteTo(ctx context.Context, filter platform.BucketFilter, r io.Reader) error {
return s.WriteToF(ctx, filter, r)
}

View File

@ -84,7 +84,14 @@ func (c *Client) MustWriteBatch(points string) {
// WriteBatch writes the current batch of points to the HTTP endpoint.
func (c *Client) WriteBatch(points string) error {
return c.WriteService.Write(context.Background(), c.OrgID, c.BucketID, strings.NewReader(points))
return c.WriteService.WriteTo(
context.Background(),
influxdb.BucketFilter{
ID: &c.BucketID,
OrganizationID: &c.OrgID,
},
strings.NewReader(points),
)
}
// Query returns the CSV response from a flux query to the HTTP API.

View File

@ -7,6 +7,5 @@ import (
// WriteService writes data read from the reader.
type WriteService interface {
Write(ctx context.Context, org, bucket ID, r io.Reader) error
WriteTo(ctx context.Context, filter BucketFilter, r io.Reader) error
}

View File

@ -35,13 +35,6 @@ type Batcher struct {
Service platform.WriteService // Service receives batches flushed from Batcher.
}
// Write reads r in batches and writes to a target specified by org and bucket.
func (b *Batcher) Write(ctx context.Context, org, bucket platform.ID, r io.Reader) error {
return b.writeBytes(ctx, r, func(batch []byte) error {
return b.Service.Write(ctx, org, bucket, bytes.NewReader(batch))
})
}
// WriteTo reads r in batches and writes to a target specified by filter.
func (b *Batcher) WriteTo(ctx context.Context, filter platform.BucketFilter, r io.Reader) error {
return b.writeBytes(ctx, r, func(batch []byte) error {

View File

@ -291,16 +291,7 @@ func TestBatcher_write(t *testing.T) {
writeCalled := false
var got string
svc := &mock.WriteService{
WriteF: func(ctx context.Context, org, bucket platform.ID, r io.Reader) error {
writeCalled = true
if tt.args.writeError {
return fmt.Errorf("error")
}
b, err := ioutil.ReadAll(r)
got = string(b)
return err
},
WriteToF: func(ctx context.Context, filter platform.BucketFilter, r io.Reader) error {
WriteToF: func(ctx context.Context, _ platform.BucketFilter, r io.Reader) error {
writeCalled = true
if tt.args.writeError {
return fmt.Errorf("error")
@ -317,7 +308,7 @@ func TestBatcher_write(t *testing.T) {
Service: svc,
}
writeFn := func(batch []byte) error {
return svc.Write(ctx, tt.args.org, tt.args.bucket, bytes.NewReader(batch))
return svc.WriteTo(ctx, platform.BucketFilter{ID: &tt.args.bucket, OrganizationID: &tt.args.org}, bytes.NewReader(batch))
}
go b.write(ctx, writeFn, tt.args.lines, tt.args.errC)
@ -348,7 +339,7 @@ func TestBatcher_write(t *testing.T) {
}
}
func TestBatcher_Write(t *testing.T) {
func TestBatcher_WriteTo(t *testing.T) {
createReader := func(data string) func() io.Reader {
if data == "error" {
return func() io.Reader {
@ -424,7 +415,7 @@ func TestBatcher_Write(t *testing.T) {
gotFlushes int
)
svc := &mock.WriteService{
WriteF: func(ctx context.Context, org, bucket platform.ID, r io.Reader) error {
WriteToF: func(ctx context.Context, _ platform.BucketFilter, r io.Reader) error {
if tt.args.writeError {
return fmt.Errorf("error")
}
@ -442,7 +433,11 @@ func TestBatcher_Write(t *testing.T) {
}
ctx := context.Background()
if err := b.Write(ctx, tt.args.org, tt.args.bucket, tt.args.r()); (err != nil) != tt.wantErr {
if err := b.WriteTo(
ctx,
platform.BucketFilter{ID: &tt.args.bucket, OrganizationID: &tt.args.org},
tt.args.r(),
); (err != nil) != tt.wantErr {
t.Errorf("Batcher.Write() error = %v, wantErr %v", err, tt.wantErr)
}
@ -462,7 +457,7 @@ func TestBatcher_Write(t *testing.T) {
gotFlushes int
)
svc := &mock.WriteService{
WriteToF: func(ctx context.Context, filter platform.BucketFilter, r io.Reader) error {
WriteToF: func(ctx context.Context, _ platform.BucketFilter, r io.Reader) error {
if tt.args.writeError {
return fmt.Errorf("error")
}
@ -498,9 +493,12 @@ func TestBatcher_Write(t *testing.T) {
func TestBatcher_WriteTimeout(t *testing.T) {
// mocking the write service here to either return an error
// or get back all the bytes from the reader.
bucketId := platform.ID(2)
orgId := platform.ID(1)
var got string
svc := &mock.WriteService{
WriteF: func(ctx context.Context, org, bucket platform.ID, r io.Reader) error {
WriteToF: func(ctx context.Context, filter platform.BucketFilter, r io.Reader) error {
b, err := ioutil.ReadAll(r)
got = string(b)
return err
@ -517,19 +515,16 @@ func TestBatcher_WriteTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if err := b.Write(ctx, platform.ID(1), platform.ID(2), r); err != context.DeadlineExceeded {
if err := b.WriteTo(ctx, platform.BucketFilter{ID: &bucketId, OrganizationID: &orgId}, r); err != context.DeadlineExceeded {
t.Errorf("Batcher.Write() with timeout error = %v", err)
}
if got != "" {
t.Errorf(" Batcher.Write() with timeout got %s", got)
}
require.Empty(t, got, "Batcher.Write() with timeout received data")
}
func TestBatcher_WriteWithoutService(t *testing.T) {
b := Batcher{}
err := b.Write(context.Background(), platform.ID(1), platform.ID(1), strings.NewReader("m1,t1=v1 f1=1"))
if err == nil || !strings.Contains(err.Error(), "write service required") {
t.Errorf(" Batcher.Write() error expected, but got %v", err)
}
err := b.WriteTo(context.Background(), platform.BucketFilter{}, strings.NewReader("m1,t1=v1 f1=1"))
require.Error(t, err)
require.Contains(t, err.Error(), "write service required")
}