177 lines
3.9 KiB
Go
177 lines
3.9 KiB
Go
package format
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
|
|
"github.com/influxdata/influxdb/models"
|
|
"github.com/influxdata/influxdb/tsdb"
|
|
"github.com/influxdata/influxql"
|
|
)
|
|
|
|
// ConflictWriter is a Writer that redirects conflicting data to an alternate output.
|
|
type ConflictWriter struct {
|
|
w Writer
|
|
c Writer
|
|
bw aggregateBucketWriter
|
|
}
|
|
|
|
// NewConflictWriter returns a Writer that redirects invalid point data to the conflict Writer.
|
|
func NewConflictWriter(w, conflict Writer) *ConflictWriter {
|
|
return &ConflictWriter{w: w, c: conflict}
|
|
}
|
|
|
|
func (cw *ConflictWriter) NewBucket(start, end int64) (bw BucketWriter, err error) {
|
|
cw.bw.w, err = cw.w.NewBucket(start, end)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cw.bw.c, err = cw.c.NewBucket(start, end)
|
|
if err != nil {
|
|
cw.bw.w.Close()
|
|
return nil, err
|
|
}
|
|
return &cw.bw, nil
|
|
}
|
|
|
|
func (cw *ConflictWriter) Close() error {
|
|
// we care if either error and prioritize the conflict writer lower.
|
|
cerr := cw.c.Close()
|
|
if err := cw.w.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return cerr
|
|
}
|
|
|
|
// bucketState tracks where an aggregateBucketWriter is within the
// BeginSeries / Write*Cursor / EndSeries call sequence.
type bucketState int

const (
	// beginSeriesBucketState is the zero value and the state EndSeries leaves
	// the writer in: no series is currently open.
	beginSeriesBucketState bucketState = iota

	// writeBucketState is set by BeginSeries: a series is open on the primary
	// bucket writer only.
	writeBucketState

	// writeConflictsBucketState is set by conflictState: the current series
	// has also been opened on the conflict bucket writer.
	writeConflictsBucketState
)
|
|
|
|
type aggregateBucketWriter struct {
|
|
w BucketWriter
|
|
c BucketWriter
|
|
|
|
state bucketState
|
|
|
|
// current series
|
|
name []byte
|
|
field []byte
|
|
typ influxql.DataType
|
|
tags models.Tags
|
|
mf map[string]influxql.DataType
|
|
}
|
|
|
|
func (bw *aggregateBucketWriter) Err() error {
|
|
switch {
|
|
case bw.w.Err() != nil:
|
|
return bw.w.Err()
|
|
case bw.c.Err() != nil:
|
|
return bw.c.Err()
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (bw *aggregateBucketWriter) BeginSeries(name, field []byte, typ influxql.DataType, tags models.Tags) {
|
|
bw.w.BeginSeries(name, field, typ, tags)
|
|
|
|
if !bytes.Equal(bw.name, name) {
|
|
// new measurement
|
|
bw.name = append(bw.name[:0], name...)
|
|
bw.mf = make(map[string]influxql.DataType)
|
|
}
|
|
|
|
bw.field = append(bw.field[:0], field...)
|
|
bw.tags = tags
|
|
|
|
var ok bool
|
|
bw.typ, ok = bw.mf[string(field)]
|
|
if !ok {
|
|
bw.mf[string(field)] = typ
|
|
bw.typ = typ
|
|
}
|
|
|
|
bw.state = writeBucketState
|
|
}
|
|
|
|
func (bw *aggregateBucketWriter) EndSeries() {
|
|
switch {
|
|
case bw.state == writeBucketState:
|
|
bw.w.EndSeries()
|
|
case bw.state == writeConflictsBucketState:
|
|
bw.w.EndSeries()
|
|
bw.c.EndSeries()
|
|
default:
|
|
panic(fmt.Sprintf("ConflictWriter state: got=%v, exp=%v,%v", bw.state, writeBucketState, writeConflictsBucketState))
|
|
}
|
|
bw.state = beginSeriesBucketState
|
|
}
|
|
|
|
func (bw *aggregateBucketWriter) conflictState(other influxql.DataType) {
|
|
if bw.state == writeBucketState {
|
|
bw.c.BeginSeries(bw.name, bw.field, bw.typ, bw.tags)
|
|
bw.state = writeConflictsBucketState
|
|
}
|
|
}
|
|
|
|
func (bw *aggregateBucketWriter) WriteIntegerCursor(cur tsdb.IntegerArrayCursor) {
|
|
if bw.typ == influxql.Integer {
|
|
bw.w.WriteIntegerCursor(cur)
|
|
} else {
|
|
bw.conflictState(influxql.Integer)
|
|
bw.c.WriteIntegerCursor(cur)
|
|
}
|
|
}
|
|
|
|
func (bw *aggregateBucketWriter) WriteFloatCursor(cur tsdb.FloatArrayCursor) {
|
|
if bw.typ == influxql.Float {
|
|
bw.w.WriteFloatCursor(cur)
|
|
} else {
|
|
bw.conflictState(influxql.Float)
|
|
bw.c.WriteFloatCursor(cur)
|
|
}
|
|
}
|
|
|
|
func (bw *aggregateBucketWriter) WriteUnsignedCursor(cur tsdb.UnsignedArrayCursor) {
|
|
if bw.typ == influxql.Unsigned {
|
|
bw.w.WriteUnsignedCursor(cur)
|
|
} else {
|
|
bw.conflictState(influxql.Unsigned)
|
|
bw.c.WriteUnsignedCursor(cur)
|
|
}
|
|
}
|
|
|
|
func (bw *aggregateBucketWriter) WriteBooleanCursor(cur tsdb.BooleanArrayCursor) {
|
|
if bw.typ == influxql.Boolean {
|
|
bw.w.WriteBooleanCursor(cur)
|
|
} else {
|
|
bw.conflictState(influxql.Boolean)
|
|
bw.c.WriteBooleanCursor(cur)
|
|
}
|
|
}
|
|
|
|
func (bw *aggregateBucketWriter) WriteStringCursor(cur tsdb.StringArrayCursor) {
|
|
if bw.typ == influxql.String {
|
|
bw.w.WriteStringCursor(cur)
|
|
} else {
|
|
bw.conflictState(influxql.String)
|
|
bw.c.WriteStringCursor(cur)
|
|
}
|
|
}
|
|
|
|
func (bw *aggregateBucketWriter) Close() error {
|
|
// we care if either error and prioritize the conflict writer lower.
|
|
cerr := bw.c.Close()
|
|
if err := bw.w.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return cerr
|
|
}
|