fix(storage): BufferedPointsWriter is for a single org / bucket only

Fixes experimental.to tests
pull/19446/head
Stuart Carnie 2020-08-03 15:15:00 -07:00
parent 4124968b28
commit 0a644dceed
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
3 changed files with 18 additions and 15 deletions

View File

@ -266,7 +266,7 @@ func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.T
cache: cache,
spec: spec.Spec,
deps: deps,
buf: storage.NewBufferedPointsWriter(influxdb.DefaultBufferSize, deps.PointsWriter),
buf: storage.NewBufferedPointsWriter(orgID, *bucketID, influxdb.DefaultBufferSize, deps.PointsWriter),
}, nil
}
@ -459,6 +459,6 @@ func (t *ToTransformation) writeTable(ctx context.Context, tbl flux.Table) error
}
}
return t.buf.WritePoints(ctx, t.orgID, t.bucketID, points)
return t.buf.WritePoints(ctx, points)
})
}

View File

@ -351,7 +351,7 @@ func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.T
spec: toSpec,
implicitTagColumns: spec.TagColumns == nil,
deps: deps,
buf: storage.NewBufferedPointsWriter(DefaultBufferSize, deps.PointsWriter),
buf: storage.NewBufferedPointsWriter(*orgID, *bucketID, DefaultBufferSize, deps.PointsWriter),
}, nil
}
@ -657,7 +657,7 @@ func writeTable(ctx context.Context, t *ToTransformation, tbl flux.Table) (err e
}
}
return t.buf.WritePoints(ctx, t.OrgID, t.BucketID, points)
return t.buf.WritePoints(ctx, points)
})
}

View File

@ -68,27 +68,30 @@ func (w *LoggingPointsWriter) WritePoints(ctx context.Context, orgID influxdb.ID
}
type BufferedPointsWriter struct {
buf []models.Point
n int
wr PointsWriter
err error
buf []models.Point
orgID influxdb.ID
bucketID influxdb.ID
n int
wr PointsWriter
err error
}
//TODO - org id bucket id
func NewBufferedPointsWriter(size int, pointswriter PointsWriter) *BufferedPointsWriter {
func NewBufferedPointsWriter(orgID influxdb.ID, bucketID influxdb.ID, size int, pointswriter PointsWriter) *BufferedPointsWriter {
return &BufferedPointsWriter{
buf: make([]models.Point, size),
wr: pointswriter,
buf: make([]models.Point, size),
orgID: orgID,
bucketID: bucketID,
wr: pointswriter,
}
}
// WritePoints writes the points to the underlying PointsWriter.
func (b *BufferedPointsWriter) WritePoints(ctx context.Context, orgID influxdb.ID, bucketID influxdb.ID, p []models.Point) error {
func (b *BufferedPointsWriter) WritePoints(ctx context.Context, p []models.Point) error {
for len(p) > b.Available() && b.err == nil {
if b.Buffered() == 0 {
// Large write, empty buffer.
// Write directly from p to avoid copy.
b.err = b.wr.WritePoints(ctx, orgID, bucketID, p)
b.err = b.wr.WritePoints(ctx, b.orgID, b.bucketID, p)
return b.err
}
n := copy(b.buf[b.n:], p)
@ -118,7 +121,7 @@ func (b *BufferedPointsWriter) Flush(ctx context.Context) error {
return nil
}
b.err = b.wr.WritePoints(ctx, 0, 0, b.buf[:b.n])
b.err = b.wr.WritePoints(ctx, b.orgID, b.bucketID, b.buf[:b.n])
if b.err != nil {
return b.err
}