WIP
parent
f0c506faff
commit
24c1f21e4e
|
@ -30,6 +30,7 @@ const ToKind = influxdb.ToKind
|
|||
// TODO(jlapacik) remove this once we have execute.DefaultFieldColLabel
|
||||
const defaultFieldColLabel = "_field"
|
||||
const DefaultMeasurementColLabel = "_measurement"
|
||||
const DefaultBufferSize = 1 << 14
|
||||
|
||||
// ToOpSpec is the flux.OperationSpec for the `to` flux function.
|
||||
type ToOpSpec struct {
|
||||
|
@ -274,6 +275,7 @@ type ToTransformation struct {
|
|||
spec *ToProcedureSpec
|
||||
implicitTagColumns bool
|
||||
deps ToDependencies
|
||||
buf *storage.BufferedPointsWriter
|
||||
}
|
||||
|
||||
// RetractTable retracts the table for the transformation for the `to` flux function.
|
||||
|
@ -300,6 +302,7 @@ func NewToTransformation(ctx context.Context, d execute.Dataset, cache execute.T
|
|||
spec: spec,
|
||||
implicitTagColumns: spec.Spec.TagColumns == nil,
|
||||
deps: deps,
|
||||
buf: storage.NewBufferedPointsWriter(DefaultBufferSize, deps.PointsWriter),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -403,6 +406,9 @@ func (t *ToTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute
|
|||
|
||||
// Finish is called after the `to` flux function's transformation is done processing.
|
||||
func (t *ToTransformation) Finish(id execute.DatasetID, err error) {
|
||||
if err == nil {
|
||||
err = t.buf.Flush(t.ctx)
|
||||
}
|
||||
t.d.Finish(err)
|
||||
}
|
||||
|
||||
|
@ -654,7 +660,7 @@ func writeTable(ctx context.Context, t *ToTransformation, tbl flux.Table) error
|
|||
return err
|
||||
}
|
||||
}
|
||||
return d.PointsWriter.WritePoints(ctx, points)
|
||||
return t.buf.WritePoints(ctx, points)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package storage
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
@ -10,3 +11,62 @@ import (
|
|||
type PointsWriter interface {
|
||||
WritePoints(context.Context, []models.Point) error
|
||||
}
|
||||
|
||||
type BufferedPointsWriter struct {
|
||||
sync.Mutex
|
||||
buf []models.Point
|
||||
n int
|
||||
wr PointsWriter
|
||||
err error
|
||||
}
|
||||
|
||||
func NewBufferedPointsWriter(size int, pointswriter PointsWriter) *BufferedPointsWriter {
|
||||
return &BufferedPointsWriter{
|
||||
buf: make([]models.Point, size),
|
||||
wr: pointswriter,
|
||||
}
|
||||
}
|
||||
|
||||
// WritePoints writes the points to the underlying PointsWriter.
|
||||
func (b *BufferedPointsWriter) WritePoints(ctx context.Context, p []models.Point) error {
|
||||
for len(p) > b.Available() && b.err == nil {
|
||||
var n int
|
||||
if b.Buffered() == 0 {
|
||||
// Large write, empty buffer.
|
||||
// Write directly from p to avoid copy.
|
||||
b.err = b.wr.WritePoints(ctx, p)
|
||||
} else {
|
||||
b.n += copy(b.buf[b.n:], p)
|
||||
b.err = b.Flush(ctx)
|
||||
}
|
||||
p = p[n:]
|
||||
}
|
||||
if b.err != nil {
|
||||
return b.err
|
||||
}
|
||||
b.n += copy(b.buf[b.n:], p)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Available returns how many models.Points are unused in the buffer.
|
||||
func (b *BufferedPointsWriter) Available() int { return len(b.buf) - b.n }
|
||||
|
||||
// Buffered returns the number of models.Points that have been written into the current buffer.
|
||||
func (b *BufferedPointsWriter) Buffered() int { return len(b.buf) }
|
||||
|
||||
// Flush writes any buffered data to the underlying PointsWriter.
|
||||
func (b *BufferedPointsWriter) Flush(ctx context.Context) error {
|
||||
if b.err != nil {
|
||||
return b.err
|
||||
}
|
||||
if b.n == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
b.err = b.wr.WritePoints(ctx, b.buf[:b.n])
|
||||
if b.err != nil {
|
||||
return b.err
|
||||
}
|
||||
b.n = 0
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue