From 24c1f21e4e281739ebc84831d4fb554e62ecd7b4 Mon Sep 17 00:00:00 2001
From: "j. Emrys Landivar (docmerlin)"
Date: Fri, 2 Aug 2019 11:29:27 -0500
Subject: [PATCH] WIP

Route the points produced by the `to` flux function through a new
storage.BufferedPointsWriter instead of writing each batch straight to
the PointsWriter dependency, and flush whatever is still buffered when
the transformation finishes without error.
---
 query/stdlib/influxdata/influxdb/to.go | 10 +++-
 storage/points_writer.go               | 79 ++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 1 deletion(-)

diff --git a/query/stdlib/influxdata/influxdb/to.go b/query/stdlib/influxdata/influxdb/to.go
index 5e5f37eb48..0fbc3f270d 100644
--- a/query/stdlib/influxdata/influxdb/to.go
+++ b/query/stdlib/influxdata/influxdb/to.go
@@ -30,6 +30,8 @@ const ToKind = influxdb.ToKind
 // TODO(jlapacik) remove this once we have execute.DefaultFieldColLabel
 const defaultFieldColLabel = "_field"
 const DefaultMeasurementColLabel = "_measurement"
+// DefaultBufferSize is the number of points the `to` transformation buffers before writing them out.
+const DefaultBufferSize = 1 << 14
 
 // ToOpSpec is the flux.OperationSpec for the `to` flux function.
 type ToOpSpec struct {
@@ -274,6 +276,7 @@ type ToTransformation struct {
 	spec               *ToProcedureSpec
 	implicitTagColumns bool
 	deps               ToDependencies
+	buf                *storage.BufferedPointsWriter
 }
 
 // RetractTable retracts the table for the transformation for the `to` flux function.
@@ -300,6 +303,7 @@ func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.T
 		spec:               spec,
 		implicitTagColumns: spec.Spec.TagColumns == nil,
 		deps:               deps,
+		buf:                storage.NewBufferedPointsWriter(DefaultBufferSize, deps.PointsWriter),
 	}, nil
 }
 
@@ -403,6 +407,10 @@ func (t *ToTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute
 
 // Finish is called after the `to` flux function's transformation is done processing.
 func (t *ToTransformation) Finish(id execute.DatasetID, err error) {
+	if err == nil {
+		// Write out any points still held in the buffer before finishing.
+		err = t.buf.Flush(t.ctx)
+	}
 	t.d.Finish(err)
 }
 
@@ -654,7 +662,7 @@ func writeTable(ctx context.Context, t *ToTransformation, tbl flux.Table) error
 				return err
 			}
 		}
-		return d.PointsWriter.WritePoints(ctx, points)
+		return t.buf.WritePoints(ctx, points)
 	})
 }
 
diff --git a/storage/points_writer.go b/storage/points_writer.go
index da98ddbcb0..e214ef9a1f 100644
--- a/storage/points_writer.go
+++ b/storage/points_writer.go
@@ -2,6 +2,7 @@ package storage
 
 import (
 	"context"
+	"sync"
 
 	"github.com/influxdata/influxdb/models"
 )
@@ -10,3 +11,81 @@ import (
 type PointsWriter interface {
 	WritePoints(context.Context, []models.Point) error
 }
+
+// BufferedPointsWriter batches points in a fixed-size buffer and forwards
+// them to an underlying PointsWriter in larger writes.
+//
+// Once the underlying writer returns an error, that error is remembered and
+// returned by every later WritePoints and Flush call.
+//
+// None of the methods acquire the embedded Mutex; callers that share a
+// writer between goroutines must lock it themselves.
+type BufferedPointsWriter struct {
+	sync.Mutex
+	buf []models.Point // fixed-size staging area; buf[:n] holds pending points
+	n   int            // number of points currently buffered
+	wr  PointsWriter   // destination for flushed points
+	err error          // first error returned by wr, if any
+}
+
+// NewBufferedPointsWriter returns a BufferedPointsWriter that buffers up to
+// size points before writing them to pointswriter. A size of zero or less
+// disables buffering, so every write is passed straight through.
+func NewBufferedPointsWriter(size int, pointswriter PointsWriter) *BufferedPointsWriter {
+	if size < 0 {
+		size = 0
+	}
+	return &BufferedPointsWriter{
+		buf: make([]models.Point, size),
+		wr:  pointswriter,
+	}
+}
+
+// WritePoints buffers p, flushing to the underlying PointsWriter whenever the
+// buffer fills up. Points may remain buffered on return; call Flush to force
+// them out. It returns the first error reported by the underlying writer.
+func (b *BufferedPointsWriter) WritePoints(ctx context.Context, p []models.Point) error {
+	for len(p) > b.Available() && b.err == nil {
+		if b.Buffered() == 0 {
+			// Large write, empty buffer.
+			// Write directly from p to avoid copy.
+			b.err = b.wr.WritePoints(ctx, p)
+			return b.err
+		}
+		// Top up the buffer, flush it, and continue with the points that
+		// did not fit.
+		n := copy(b.buf[b.n:], p)
+		b.n += n
+		b.err = b.Flush(ctx)
+		p = p[n:]
+	}
+	if b.err != nil {
+		return b.err
+	}
+	b.n += copy(b.buf[b.n:], p)
+	return nil
+}
+
+// Available returns how many models.Points are unused in the buffer.
+func (b *BufferedPointsWriter) Available() int { return len(b.buf) - b.n }
+
+// Buffered returns the number of models.Points that have been written into the current buffer.
+func (b *BufferedPointsWriter) Buffered() int { return b.n }
+
+// Flush writes any buffered data to the underlying PointsWriter.
+// The buffer is emptied only if that write succeeds.
+func (b *BufferedPointsWriter) Flush(ctx context.Context) error {
+	if b.err != nil {
+		return b.err
+	}
+	if b.n == 0 {
+		return nil
+	}
+
+	b.err = b.wr.WritePoints(ctx, b.buf[:b.n])
+	if b.err != nil {
+		return b.err
+	}
+	b.n = 0
+	return nil
+}