influxdb/storage/points_writer.go

128 lines
3.1 KiB
Go
Raw Normal View History

2018-10-05 11:43:56 +00:00
package storage
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/models"
)
2018-10-05 11:43:56 +00:00
// PointsWriter describes the ability to write points into a storage engine.
type PointsWriter interface {
2020-07-23 17:28:57 +00:00
WritePoints(ctx context.Context, orgID influxdb.ID, bucketID influxdb.ID, points []models.Point) error
2018-10-05 11:43:56 +00:00
}
2019-08-02 16:29:27 +00:00
// LoggingPointsWriter wraps an underlying points writer but writes logs to
// another bucket when an error occurs.
//
// When a write through Underlying fails, WritePoints looks up the bucket named
// LogBucketName in the organization of the failed write and records the error
// message there as a point.
type LoggingPointsWriter struct {
// Wrapped points writer. Errored writes from here will be logged.
// The same writer is also used to write the log point itself.
Underlying PointsWriter
// Service used to look up logging bucket.
// It is queried with the organization ID of the failed write and LogBucketName.
BucketFinder BucketFinder
// Name of the bucket to log to.
LogBucketName string
}
// WritePoints writes points to the underlying PointsWriter. Logs on error.
2020-07-23 17:28:57 +00:00
func (w *LoggingPointsWriter) WritePoints(ctx context.Context, orgID influxdb.ID, bucketID influxdb.ID, p []models.Point) error {
if len(p) == 0 {
return nil
}
// Write to underlying writer and exit immediately if successful.
2020-07-23 17:28:57 +00:00
err := w.Underlying.WritePoints(ctx, orgID, bucketID, p)
if err == nil {
return nil
}
// Attempt to lookup log bucket.
bkts, n, e := w.BucketFinder.FindBuckets(ctx, influxdb.BucketFilter{
OrganizationID: &orgID,
Name: &w.LogBucketName,
})
if e != nil {
return e
} else if n == 0 {
return fmt.Errorf("logging bucket not found: %q", w.LogBucketName)
}
// Log error to bucket.
pt, e := models.NewPoint(
2020-04-24 14:58:24 +00:00
"write_errors",
nil,
models.Fields{"error": err.Error()},
time.Now(),
)
if e != nil {
return e
}
2020-04-24 14:58:24 +00:00
if e := w.Underlying.WritePoints(ctx, orgID, bkts[0].ID, []models.Point{pt}); e != nil {
return e
}
return err
}
2019-08-02 16:29:27 +00:00
type BufferedPointsWriter struct {
buf []models.Point
n int
wr PointsWriter
err error
}
2020-07-23 17:28:57 +00:00
//TODO - org id bucket id
2019-08-02 16:29:27 +00:00
func NewBufferedPointsWriter(size int, pointswriter PointsWriter) *BufferedPointsWriter {
return &BufferedPointsWriter{
buf: make([]models.Point, size),
wr: pointswriter,
}
}
// WritePoints writes the points to the underlying PointsWriter.
func (b *BufferedPointsWriter) WritePoints(ctx context.Context, orgID influxdb.ID, bucketID influxdb.ID, p []models.Point) error {
2019-08-02 16:29:27 +00:00
for len(p) > b.Available() && b.err == nil {
if b.Buffered() == 0 {
// Large write, empty buffer.
// Write directly from p to avoid copy.
b.err = b.wr.WritePoints(ctx, orgID, bucketID, p)
2019-08-03 04:52:33 +00:00
return b.err
2019-08-02 16:29:27 +00:00
}
2019-08-03 04:52:33 +00:00
n := copy(b.buf[b.n:], p)
b.n += n
b.err = b.Flush(ctx)
2019-08-02 16:29:27 +00:00
p = p[n:]
}
if b.err != nil {
return b.err
}
b.n += copy(b.buf[b.n:], p)
return nil
}
// Available returns how many models.Points are unused in the buffer.
func (b *BufferedPointsWriter) Available() int { return len(b.buf) - b.n }
// Buffered returns the number of models.Points that have been written into the current buffer.
2019-08-03 04:52:33 +00:00
func (b *BufferedPointsWriter) Buffered() int { return b.n }
2019-08-02 16:29:27 +00:00
// Flush writes any buffered data to the underlying PointsWriter.
func (b *BufferedPointsWriter) Flush(ctx context.Context) error {
if b.err != nil {
return b.err
}
if b.n == 0 {
return nil
}
2020-07-23 17:28:57 +00:00
b.err = b.wr.WritePoints(ctx, 0, 0, b.buf[:b.n])
2019-08-02 16:29:27 +00:00
if b.err != nil {
return b.err
}
b.n = 0
return nil
}