369 lines
7.3 KiB
Go
369 lines
7.3 KiB
Go
package binary
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format"
|
|
"github.com/influxdata/influxdb/cmd/influx_tools/internal/tlv"
|
|
"github.com/influxdata/influxdb/models"
|
|
"github.com/influxdata/influxdb/tsdb"
|
|
"github.com/influxdata/influxql"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
type Writer struct {
|
|
w *bufio.Writer
|
|
buf []byte
|
|
db, rp string
|
|
duration time.Duration
|
|
err error
|
|
bw *bucketWriter
|
|
state writeState
|
|
wroteHeader bool
|
|
|
|
msg struct {
|
|
bucketHeader BucketHeader
|
|
bucketFooter BucketFooter
|
|
seriesHeader SeriesHeader
|
|
seriesFooter SeriesFooter
|
|
}
|
|
|
|
stats struct {
|
|
series int
|
|
counts [8]struct {
|
|
series, values int
|
|
}
|
|
}
|
|
}
|
|
|
|
type writeState int
|
|
|
|
const (
|
|
writeHeader writeState = iota
|
|
writeBucket
|
|
writeSeries
|
|
writeSeriesHeader
|
|
writePoints
|
|
)
|
|
|
|
func NewWriter(w io.Writer, database, rp string, duration time.Duration) *Writer {
|
|
var wr *bufio.Writer
|
|
if wr, _ = w.(*bufio.Writer); wr == nil {
|
|
wr = bufio.NewWriter(w)
|
|
}
|
|
return &Writer{w: wr, db: database, rp: rp, duration: duration}
|
|
}
|
|
|
|
func (w *Writer) WriteStats(o io.Writer) {
|
|
fmt.Fprintf(o, "total series: %d\n", w.stats.series)
|
|
|
|
for i := 0; i < 5; i++ {
|
|
ft := FieldType(i)
|
|
fmt.Fprintf(o, "%s unique series: %d\n", ft, w.stats.counts[i].series)
|
|
fmt.Fprintf(o, "%s total values : %d\n", ft, w.stats.counts[i].values)
|
|
}
|
|
}
|
|
|
|
func (w *Writer) NewBucket(start, end int64) (format.BucketWriter, error) {
|
|
if w.state == writeHeader {
|
|
w.writeHeader()
|
|
}
|
|
|
|
if w.err != nil {
|
|
return nil, w.err
|
|
}
|
|
|
|
if w.state != writeBucket {
|
|
panic(fmt.Sprintf("writer state: got=%v, exp=%v", w.state, writeBucket))
|
|
}
|
|
|
|
w.bw = &bucketWriter{w: w, start: start, end: end}
|
|
w.writeBucketHeader(start, end)
|
|
|
|
return w.bw, w.err
|
|
}
|
|
|
|
func (w *Writer) Close() error {
|
|
if w.err == ErrWriteAfterClose {
|
|
return nil
|
|
}
|
|
if w.err != nil {
|
|
return w.err
|
|
}
|
|
|
|
w.err = ErrWriteAfterClose
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *Writer) writeHeader() {
|
|
w.state = writeBucket
|
|
w.wroteHeader = true
|
|
|
|
w.write(Magic[:])
|
|
|
|
h := Header{
|
|
Version: Header_Version0,
|
|
Database: w.db,
|
|
RetentionPolicy: w.rp,
|
|
ShardDuration: int64(w.duration),
|
|
}
|
|
w.writeTypeMessage(HeaderType, &h)
|
|
}
|
|
|
|
func (w *Writer) writeBucketHeader(start, end int64) {
|
|
w.state = writeSeries
|
|
w.msg.bucketHeader.Start = start
|
|
w.msg.bucketHeader.End = end
|
|
w.writeTypeMessage(BucketHeaderType, &w.msg.bucketHeader)
|
|
}
|
|
|
|
func (w *Writer) writeBucketFooter() {
|
|
w.state = writeBucket
|
|
w.writeTypeMessage(BucketFooterType, &w.msg.bucketFooter)
|
|
}
|
|
|
|
func (w *Writer) writeSeriesHeader(key, field []byte, ft FieldType) {
|
|
w.state = writePoints
|
|
w.stats.series++
|
|
w.stats.counts[ft&7].series++
|
|
|
|
w.msg.seriesHeader.SeriesKey = key
|
|
w.msg.seriesHeader.Field = field
|
|
w.msg.seriesHeader.FieldType = ft
|
|
w.writeTypeMessage(SeriesHeaderType, &w.msg.seriesHeader)
|
|
}
|
|
|
|
func (w *Writer) writeSeriesFooter(ft FieldType, count int) {
|
|
w.stats.counts[ft&7].values += count
|
|
w.writeTypeMessage(SeriesFooterType, &w.msg.seriesFooter)
|
|
}
|
|
|
|
func (w *Writer) write(p []byte) {
|
|
if w.err != nil {
|
|
return
|
|
}
|
|
_, w.err = w.w.Write(p)
|
|
}
|
|
|
|
func (w *Writer) writeTypeMessage(typ MessageType, msg proto.Message) {
|
|
if w.err != nil {
|
|
return
|
|
}
|
|
|
|
w.buf, w.err = proto.MarshalOptions{}.MarshalAppend(w.buf[0:0], msg)
|
|
w.writeTypeBytes(typ)
|
|
}
|
|
|
|
func (w *Writer) writeTypeBytes(typ MessageType) {
|
|
if w.err != nil {
|
|
return
|
|
}
|
|
w.err = tlv.WriteTLV(w.w, byte(typ), w.buf)
|
|
}
|
|
|
|
type bucketWriter struct {
|
|
w *Writer
|
|
err error
|
|
start, end int64
|
|
key []byte
|
|
field []byte
|
|
n int
|
|
closed bool
|
|
}
|
|
|
|
func (bw *bucketWriter) Err() error {
|
|
if bw.w.err != nil {
|
|
return bw.w.err
|
|
}
|
|
return bw.err
|
|
}
|
|
|
|
func (bw *bucketWriter) hasErr() bool {
|
|
return bw.w.err != nil || bw.err != nil
|
|
}
|
|
|
|
func (bw *bucketWriter) BeginSeries(name, field []byte, typ influxql.DataType, tags models.Tags) {
|
|
if bw.hasErr() {
|
|
return
|
|
}
|
|
|
|
if bw.w.state != writeSeries {
|
|
panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writeSeries))
|
|
}
|
|
bw.w.state = writeSeriesHeader
|
|
|
|
bw.key = models.AppendMakeKey(bw.key[:0], name, tags)
|
|
bw.field = field
|
|
}
|
|
|
|
func (bw *bucketWriter) EndSeries() {
|
|
if bw.hasErr() {
|
|
return
|
|
}
|
|
|
|
if bw.w.state != writePoints && bw.w.state != writeSeriesHeader {
|
|
panic(fmt.Sprintf("writer state: got=%v, exp=%v,%v", bw.w.state, writeSeriesHeader, writePoints))
|
|
}
|
|
if bw.w.state == writePoints {
|
|
bw.w.writeSeriesFooter(FieldType_IntegerFieldType, bw.n)
|
|
}
|
|
bw.w.state = writeSeries
|
|
}
|
|
|
|
func (bw *bucketWriter) WriteIntegerCursor(cur tsdb.IntegerArrayCursor) {
|
|
if bw.hasErr() {
|
|
return
|
|
}
|
|
|
|
if bw.w.state == writeSeriesHeader {
|
|
bw.w.writeSeriesHeader(bw.key, bw.field, FieldType_IntegerFieldType)
|
|
}
|
|
|
|
if bw.w.state != writePoints {
|
|
panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
|
|
}
|
|
|
|
var msg IntegerPoints
|
|
for {
|
|
a := cur.Next()
|
|
if a.Len() == 0 {
|
|
break
|
|
}
|
|
|
|
bw.n += a.Len()
|
|
msg.Timestamps = a.Timestamps
|
|
msg.Values = a.Values
|
|
bw.w.writeTypeMessage(IntegerPointsType, &msg)
|
|
}
|
|
}
|
|
|
|
func (bw *bucketWriter) WriteFloatCursor(cur tsdb.FloatArrayCursor) {
|
|
if bw.hasErr() {
|
|
return
|
|
}
|
|
|
|
if bw.w.state == writeSeriesHeader {
|
|
bw.w.writeSeriesHeader(bw.key, bw.field, FieldType_FloatFieldType)
|
|
}
|
|
|
|
if bw.w.state != writePoints {
|
|
panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
|
|
}
|
|
|
|
var msg FloatPoints
|
|
for {
|
|
a := cur.Next()
|
|
if a.Len() == 0 {
|
|
break
|
|
}
|
|
|
|
bw.n += a.Len()
|
|
msg.Timestamps = a.Timestamps
|
|
msg.Values = a.Values
|
|
bw.w.writeTypeMessage(FloatPointsType, &msg)
|
|
}
|
|
}
|
|
|
|
func (bw *bucketWriter) WriteUnsignedCursor(cur tsdb.UnsignedArrayCursor) {
|
|
if bw.hasErr() {
|
|
return
|
|
}
|
|
|
|
if bw.w.state == writeSeriesHeader {
|
|
bw.w.writeSeriesHeader(bw.key, bw.field, FieldType_UnsignedFieldType)
|
|
}
|
|
|
|
if bw.w.state != writePoints {
|
|
panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
|
|
}
|
|
|
|
var msg UnsignedPoints
|
|
for {
|
|
a := cur.Next()
|
|
if a.Len() == 0 {
|
|
break
|
|
}
|
|
|
|
bw.n += a.Len()
|
|
msg.Timestamps = a.Timestamps
|
|
msg.Values = a.Values
|
|
bw.w.writeTypeMessage(UnsignedPointsType, &msg)
|
|
}
|
|
}
|
|
|
|
func (bw *bucketWriter) WriteBooleanCursor(cur tsdb.BooleanArrayCursor) {
|
|
if bw.hasErr() {
|
|
return
|
|
}
|
|
|
|
if bw.w.state == writeSeriesHeader {
|
|
bw.w.writeSeriesHeader(bw.key, bw.field, FieldType_BooleanFieldType)
|
|
}
|
|
|
|
if bw.w.state != writePoints {
|
|
panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
|
|
}
|
|
|
|
var msg BooleanPoints
|
|
for {
|
|
a := cur.Next()
|
|
if a.Len() == 0 {
|
|
break
|
|
}
|
|
|
|
bw.n += a.Len()
|
|
msg.Timestamps = a.Timestamps
|
|
msg.Values = a.Values
|
|
bw.w.writeTypeMessage(BooleanPointsType, &msg)
|
|
}
|
|
}
|
|
|
|
func (bw *bucketWriter) WriteStringCursor(cur tsdb.StringArrayCursor) {
|
|
if bw.hasErr() {
|
|
return
|
|
}
|
|
|
|
if bw.w.state == writeSeriesHeader {
|
|
bw.w.writeSeriesHeader(bw.key, bw.field, FieldType_StringFieldType)
|
|
}
|
|
|
|
if bw.w.state != writePoints {
|
|
panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints))
|
|
}
|
|
|
|
var msg StringPoints
|
|
for {
|
|
a := cur.Next()
|
|
if a.Len() == 0 {
|
|
break
|
|
}
|
|
|
|
bw.n += a.Len()
|
|
msg.Timestamps = a.Timestamps
|
|
msg.Values = a.Values
|
|
bw.w.writeTypeMessage(StringPointsType, &msg)
|
|
}
|
|
}
|
|
|
|
func (bw *bucketWriter) Close() error {
|
|
if bw.closed {
|
|
return nil
|
|
}
|
|
|
|
bw.closed = true
|
|
|
|
if bw.hasErr() {
|
|
return bw.Err()
|
|
}
|
|
|
|
bw.w.bw = nil
|
|
bw.w.writeBucketFooter()
|
|
bw.err = ErrWriteBucketAfterClose
|
|
|
|
return bw.w.w.Flush()
|
|
}
|