influxdb/cmd/influx_tools/internal/format/binary/writer.go

369 lines
7.3 KiB
Go

package binary
import (
"bufio"
"fmt"
"io"
"time"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/tlv"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
"google.golang.org/protobuf/proto"
)
// Writer encodes an export in the binary format: the Magic bytes and a Header
// message, followed by buckets, each containing series of typed point batches.
// Every message is written as a TLV record through a buffered writer.
//
// Errors are sticky: the first one is kept in err and every later write
// becomes a no-op.
type Writer struct {
	// w is the buffered destination for all encoded output.
	w *bufio.Writer
	// buf is scratch space reused when marshaling each protobuf message.
	buf []byte

	// db and rp are the database and retention policy recorded in the Header.
	db string
	rp string
	// duration is the shard duration recorded in the Header.
	duration time.Duration

	// err holds the first error encountered; Close sets it to
	// ErrWriteAfterClose when no other error is pending.
	err error
	// bw is the currently open bucket, or nil when none is open.
	bw *bucketWriter
	// state is the current position in the header/bucket/series/points sequence.
	state writeState
	// wroteHeader is set once writeHeader has run.
	wroteHeader bool

	// msg holds message values that are reused across writes.
	msg struct {
		bucketHeader BucketHeader
		bucketFooter BucketFooter
		seriesHeader SeriesHeader
		seriesFooter SeriesFooter
	}

	// stats accumulates the counters printed by WriteStats; counts is indexed
	// by FieldType masked to its low three bits.
	stats struct {
		series int
		counts [8]struct {
			series int
			values int
		}
	}
}
// writeState is the Writer's position in the output sequence. Out-of-order
// calls are treated as programming errors and cause a panic.
type writeState int

// Writer states.
const (
	writeHeader       writeState = 0 // file header not yet written
	writeBucket       writeState = 1 // ready to open a new bucket
	writeSeries       writeState = 2 // inside a bucket, ready for BeginSeries
	writeSeriesHeader writeState = 3 // series begun; header written by the first cursor call
	writePoints       writeState = 4 // series header written; point batches may follow
)
// NewWriter returns a Writer that encodes to w for the given database,
// retention policy and shard duration. If w is already a non-nil
// *bufio.Writer it is used as is; otherwise it is wrapped in a new one.
// The file header is not written until the first call to NewBucket.
func NewWriter(w io.Writer, database, rp string, duration time.Duration) *Writer {
	buffered, _ := w.(*bufio.Writer)
	if buffered == nil {
		buffered = bufio.NewWriter(w)
	}
	return &Writer{
		w:        buffered,
		db:       database,
		rp:       rp,
		duration: duration,
	}
}
// WriteStats prints the total number of series written. For each of the
// first five FieldType values, it also prints the series and value counters
// recorded for that type. Write errors on o are ignored.
func (w *Writer) WriteStats(o io.Writer) {
	fmt.Fprintf(o, "total series: %d\n", w.stats.series)
	for idx := range w.stats.counts[:5] {
		ft := FieldType(idx)
		c := w.stats.counts[idx]
		fmt.Fprintf(o, "%s unique series: %d\n", ft, c.series)
		fmt.Fprintf(o, "%s total values : %d\n", ft, c.values)
	}
}
// NewBucket writes the file header if that has not happened yet. It then
// opens a bucket for the given start and end bounds and writes its
// BucketHeader.
//
// It returns the pending error, including ErrWriteAfterClose after Close,
// without opening a bucket. It panics if a bucket is already open. If
// writing the BucketHeader fails, the new bucket is returned together with
// the error.
func (w *Writer) NewBucket(start, end int64) (format.BucketWriter, error) {
	if w.state == writeHeader {
		w.writeHeader()
	}
	if err := w.err; err != nil {
		return nil, err
	}
	if got := w.state; got != writeBucket {
		panic(fmt.Sprintf("writer state: got=%v, exp=%v", got, writeBucket))
	}

	bucket := &bucketWriter{w: w, start: start, end: end}
	w.bw = bucket
	w.writeBucketHeader(start, end)
	return bucket, w.err
}
// Close marks the Writer as closed, so later NewBucket calls return
// ErrWriteAfterClose and writes become no-ops. It returns any earlier write
// error, and nil on a repeated call.
//
// Close does not flush or close an open bucket. Output is flushed by
// bucketWriter.Close.
func (w *Writer) Close() error {
	switch w.err {
	case nil:
		w.err = ErrWriteAfterClose
		return nil
	case ErrWriteAfterClose:
		return nil
	default:
		return w.err
	}
}
// writeHeader advances the state to writeBucket and emits the Magic bytes.
// It then writes a Header message carrying the database, retention policy
// and shard duration.
func (w *Writer) writeHeader() {
	w.state, w.wroteHeader = writeBucket, true
	w.write(Magic[:])
	w.writeTypeMessage(HeaderType, &Header{
		Version:         Header_Version0,
		Database:        w.db,
		RetentionPolicy: w.rp,
		ShardDuration:   int64(w.duration),
	})
}
// writeBucketHeader moves to the writeSeries state and stores the bucket
// bounds in the reused BucketHeader message before writing it.
func (w *Writer) writeBucketHeader(start, end int64) {
	w.state = writeSeries
	hdr := &w.msg.bucketHeader
	hdr.Start, hdr.End = start, end
	w.writeTypeMessage(BucketHeaderType, hdr)
}
// writeBucketFooter returns the Writer to the writeBucket state, so another
// bucket can be opened, and writes the reused BucketFooter message.
func (w *Writer) writeBucketFooter() {
	footer := &w.msg.bucketFooter
	w.state = writeBucket
	w.writeTypeMessage(BucketFooterType, footer)
}
// writeSeriesHeader moves to the writePoints state and bumps the series
// counters. It then fills the reused SeriesHeader with the series key, field
// name and field type, and writes it.
func (w *Writer) writeSeriesHeader(key, field []byte, ft FieldType) {
	w.state = writePoints

	w.stats.series++
	w.stats.counts[ft&7].series++ // mask keeps the index inside the 8-entry array

	hdr := &w.msg.seriesHeader
	hdr.SeriesKey, hdr.Field, hdr.FieldType = key, field, ft
	w.writeTypeMessage(SeriesHeaderType, hdr)
}
// writeSeriesFooter adds count to the value total for ft and writes the reused
// SeriesFooter message. It leaves the write state unchanged.
func (w *Writer) writeSeriesFooter(ft FieldType, count int) {
	slot := &w.stats.counts[ft&7]
	slot.values += count
	w.writeTypeMessage(SeriesFooterType, &w.msg.seriesFooter)
}
// write copies p to the buffered output unless an error is already pending.
// A failed write is recorded in w.err.
func (w *Writer) write(p []byte) {
	if w.err == nil {
		_, w.err = w.w.Write(p)
	}
}
// writeTypeMessage marshals msg into the reused scratch buffer and writes it
// as a TLV record tagged with typ. A marshal error is recorded in w.err, so
// nothing is written. The call is a no-op while an error is pending.
func (w *Writer) writeTypeMessage(typ MessageType, msg proto.Message) {
	if w.err != nil {
		return
	}
	var opts proto.MarshalOptions
	w.buf, w.err = opts.MarshalAppend(w.buf[:0], msg)
	w.writeTypeBytes(typ)
}
// writeTypeBytes writes the current contents of w.buf as a TLV record of
// type typ, unless an error is already pending.
func (w *Writer) writeTypeBytes(typ MessageType) {
	if w.err == nil {
		w.err = tlv.WriteTLV(w.w, byte(typ), w.buf)
	}
}
// bucketWriter writes the series of a single bucket through its parent
// Writer. NewBucket returns it as a format.BucketWriter.
type bucketWriter struct {
	w   *Writer // parent writer owning the output and the state machine
	err error   // set to ErrWriteBucketAfterClose by Close

	start int64 // bucket start, as passed to NewBucket
	end   int64 // bucket end, as passed to NewBucket

	key    []byte // series key built by BeginSeries; buffer reused across series
	field  []byte // field name passed to BeginSeries (not copied)
	n      int    // values counted by the cursor methods, reported by EndSeries
	closed bool   // true once Close has been called
}
// Err returns the parent Writer's error if it has one. Otherwise it returns
// the bucket's own error, which is ErrWriteBucketAfterClose once the bucket
// is closed.
func (bw *bucketWriter) Err() error {
	if err := bw.w.err; err != nil {
		return err
	}
	return bw.err
}
// hasErr reports whether the parent Writer or this bucket holds an error.
func (bw *bucketWriter) hasErr() bool {
	return bw.Err() != nil
}
// BeginSeries starts a series for field in measurement name with the given
// tags. The series header is written later, by the first Write*Cursor call,
// once the field type is known; typ is not used here.
//
// It panics unless the writer is between series. field is retained without
// copying, so it must stay unchanged until the first cursor call.
func (bw *bucketWriter) BeginSeries(name, field []byte, typ influxql.DataType, tags models.Tags) {
	if bw.hasErr() {
		return
	}
	if st := bw.w.state; st != writeSeries {
		panic(fmt.Sprintf("writer state: got=%v, exp=%v", st, writeSeries))
	}

	bw.w.state = writeSeriesHeader
	// Build the key into the buffer left over from the previous series.
	bw.key = models.AppendMakeKey(bw.key[:0], name, tags)
	bw.field = field
}
// EndSeries finishes the current series. If a series header was written
// (a Write*Cursor method ran), it writes the SeriesFooter and credits this
// series' value count to the statistics of the series' field type. It then
// returns to the writeSeries state.
//
// It panics if no series is in progress. It is a no-op while an error is
// pending.
func (bw *bucketWriter) EndSeries() {
	if bw.hasErr() {
		return
	}
	if bw.w.state != writePoints && bw.w.state != writeSeriesHeader {
		panic(fmt.Sprintf("writer state: got=%v, exp=%v,%v", bw.w.state, writeSeriesHeader, writePoints))
	}
	if bw.w.state == writePoints {
		// The writePoints state is only reached through writeSeriesHeader,
		// so the reused SeriesHeader still carries this series' field type.
		// Hard-coding the integer type misattributed every other type's values.
		bw.w.writeSeriesFooter(bw.w.msg.seriesHeader.FieldType, bw.n)
	}
	// n is a per-series count; without a reset the value totals grow
	// cumulatively across series.
	bw.n = 0
	bw.w.state = writeSeries
}
// WriteIntegerCursor writes every batch read from cur as an IntegerPoints
// message in the current series. If the series header has not been written
// yet, it is written first with the integer field type.
//
// Reading stops as soon as the Writer records an error. It panics if no
// series has been begun.
func (bw *bucketWriter) WriteIntegerCursor(cur tsdb.IntegerArrayCursor) {
	if bw.hasErr() {
		return
	}
	if bw.w.state == writeSeriesHeader {
		bw.w.writeSeriesHeader(bw.key, bw.field, FieldType_IntegerFieldType)
	}
	if bw.w.state != writePoints {
		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
	}
	var msg IntegerPoints
	// Once a write fails, every later write is a no-op, so reading further
	// batches from the cursor is wasted work.
	for bw.w.err == nil {
		a := cur.Next()
		if a.Len() == 0 {
			break
		}
		bw.n += a.Len()
		msg.Timestamps = a.Timestamps
		msg.Values = a.Values
		bw.w.writeTypeMessage(IntegerPointsType, &msg)
	}
}
// WriteFloatCursor writes every batch read from cur as a FloatPoints message
// in the current series. If the series header has not been written yet, it
// is written first with the float field type.
//
// Reading stops as soon as the Writer records an error. It panics if no
// series has been begun.
func (bw *bucketWriter) WriteFloatCursor(cur tsdb.FloatArrayCursor) {
	if bw.hasErr() {
		return
	}
	if bw.w.state == writeSeriesHeader {
		bw.w.writeSeriesHeader(bw.key, bw.field, FieldType_FloatFieldType)
	}
	if bw.w.state != writePoints {
		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
	}
	var msg FloatPoints
	// Once a write fails, every later write is a no-op, so reading further
	// batches from the cursor is wasted work.
	for bw.w.err == nil {
		a := cur.Next()
		if a.Len() == 0 {
			break
		}
		bw.n += a.Len()
		msg.Timestamps = a.Timestamps
		msg.Values = a.Values
		bw.w.writeTypeMessage(FloatPointsType, &msg)
	}
}
// WriteUnsignedCursor writes every batch read from cur as an UnsignedPoints
// message in the current series. If the series header has not been written
// yet, it is written first with the unsigned field type.
//
// Reading stops as soon as the Writer records an error. It panics if no
// series has been begun.
func (bw *bucketWriter) WriteUnsignedCursor(cur tsdb.UnsignedArrayCursor) {
	if bw.hasErr() {
		return
	}
	if bw.w.state == writeSeriesHeader {
		bw.w.writeSeriesHeader(bw.key, bw.field, FieldType_UnsignedFieldType)
	}
	if bw.w.state != writePoints {
		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
	}
	var msg UnsignedPoints
	// Once a write fails, every later write is a no-op, so reading further
	// batches from the cursor is wasted work.
	for bw.w.err == nil {
		a := cur.Next()
		if a.Len() == 0 {
			break
		}
		bw.n += a.Len()
		msg.Timestamps = a.Timestamps
		msg.Values = a.Values
		bw.w.writeTypeMessage(UnsignedPointsType, &msg)
	}
}
// WriteBooleanCursor writes every batch read from cur as a BooleanPoints
// message in the current series. If the series header has not been written
// yet, it is written first with the boolean field type.
//
// Reading stops as soon as the Writer records an error. It panics if no
// series has been begun.
func (bw *bucketWriter) WriteBooleanCursor(cur tsdb.BooleanArrayCursor) {
	if bw.hasErr() {
		return
	}
	if bw.w.state == writeSeriesHeader {
		bw.w.writeSeriesHeader(bw.key, bw.field, FieldType_BooleanFieldType)
	}
	if bw.w.state != writePoints {
		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
	}
	var msg BooleanPoints
	// Once a write fails, every later write is a no-op, so reading further
	// batches from the cursor is wasted work.
	for bw.w.err == nil {
		a := cur.Next()
		if a.Len() == 0 {
			break
		}
		bw.n += a.Len()
		msg.Timestamps = a.Timestamps
		msg.Values = a.Values
		bw.w.writeTypeMessage(BooleanPointsType, &msg)
	}
}
// WriteStringCursor writes every batch read from cur as a StringPoints
// message in the current series. If the series header has not been written
// yet, it is written first with the string field type.
//
// Reading stops as soon as the Writer records an error, for example a
// marshal failure. It panics if no series has been begun.
func (bw *bucketWriter) WriteStringCursor(cur tsdb.StringArrayCursor) {
	if bw.hasErr() {
		return
	}
	if bw.w.state == writeSeriesHeader {
		bw.w.writeSeriesHeader(bw.key, bw.field, FieldType_StringFieldType)
	}
	if bw.w.state != writePoints {
		panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
	}
	var msg StringPoints
	// Once a write fails, every later write is a no-op, so reading further
	// batches from the cursor is wasted work.
	for bw.w.err == nil {
		a := cur.Next()
		if a.Len() == 0 {
			break
		}
		bw.n += a.Len()
		msg.Timestamps = a.Timestamps
		msg.Values = a.Values
		bw.w.writeTypeMessage(StringPointsType, &msg)
	}
}
// Close ends the bucket. It detaches the bucket from its Writer, writes the
// BucketFooter (returning the Writer to the writeBucket state) and flushes
// the buffered output. Afterwards Err reports ErrWriteBucketAfterClose and
// the write methods are no-ops.
//
// A repeated call returns nil. If an error is already pending, nothing is
// written and that error is returned. A footer or flush failure is recorded
// in the Writer's sticky error, so a later NewBucket or Writer.Close also
// reports it.
func (bw *bucketWriter) Close() error {
	if bw.closed {
		return nil
	}
	bw.closed = true
	if bw.hasErr() {
		return bw.Err()
	}
	bw.w.bw = nil
	bw.w.writeBucketFooter()
	bw.err = ErrWriteBucketAfterClose
	if bw.w.err == nil {
		// Every other write path records failures in w.err. Do the same for
		// Flush so Writer.Close cannot return nil after a failed flush.
		bw.w.err = bw.w.w.Flush()
	}
	return bw.w.err
}