influxdb/cmd/influx_tools/parquet/batcher.go

package parquet

import (
	"context"
	"fmt"
	"sort"

	"go.uber.org/zap"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxql"
)
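
// row represents a single output record: a timestamp together with the tag
// set of its series and the field values collected for that timestamp.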
type row struct {
	timestamp int64
	tags      map[string]string
	fields    map[string]interface{}
}
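
// batcher reads all series of a single measurement from a shard and
// accumulates the data into batches of rows ordered by timestamp.
//
// A minimal drain loop might look like the following sketch; populating the
// series and resolution fields happens elsewhere, and writeRows is a
// hypothetical sink:
//
//	b := &batcher{measurement: []byte("cpu"), shard: shard, logger: logger}
//	if err := b.init(); err != nil {
//		return err
//	}
//	for {
//		rows, err := b.next(ctx)
//		if err != nil {
//			return err
//		}
//		if len(rows) == 0 {
//			break
//		}
//		writeRows(rows) // hypothetical sink
//	}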
type batcher struct {
	measurement     []byte
	shard           *tsdb.Shard
	typeResolutions map[string]influxql.DataType
	converter       map[string]func(interface{}) (interface{}, error)
	nameResolutions map[string]string
	series          []seriesEntry
	start           int64
	logger          *zap.SugaredLogger
}
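
// init sets up the value converters for fields with conflicting types and
// resets the batch window to the beginning of time.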
func (b *batcher) init() error {
	// Setup the type converters for the conflicting fields
	b.converter = make(map[string]func(interface{}) (interface{}, error), len(b.typeResolutions))
	for field, ftype := range b.typeResolutions {
		switch ftype {
		case influxql.Float:
			b.converter[field] = toFloat
		case influxql.Unsigned:
			b.converter[field] = toUint
		case influxql.Integer:
			b.converter[field] = toInt
		case influxql.Boolean:
			b.converter[field] = toBool
		case influxql.String:
			b.converter[field] = toString
		default:
			return fmt.Errorf("unknown converter %v for field %q", ftype, field)
		}
	}

	b.start = models.MinNanoTime

	return nil
}
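
// reset restarts the batcher at the beginning of time so the data can be
// iterated again from the first batch.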
func (b *batcher) reset() {
	b.start = models.MinNanoTime
}
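
// next returns the next batch of rows, sorted by timestamp. Because the
// cursors of the individual fields advance independently, a batch only
// contains rows up to the smallest per-field end timestamp; everything
// after that point is left for the following call. It returns an empty
// batch once the data is exhausted.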
func (b *batcher) next(ctx context.Context) ([]row, error) {
	// Iterate over the series and fields and accumulate the data row-wise
	iter, err := b.shard.CreateCursorIterator(ctx)
	if err != nil {
		return nil, fmt.Errorf("getting cursor iterator for %q failed: %w", string(b.measurement), err)
	}

	data := make(map[string]map[int64]row, len(b.series))
	end := models.MaxNanoTime
	var rowCount int
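
	// Accumulate the values per series and timestamp so that the fields of a
	// series merge into a single row per timestamp.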
	for _, s := range b.series {
		data[s.key] = make(map[int64]row, tsdb.DefaultMaxPointsPerBlock)
		tags := make(map[string]string, len(s.tags))
		for _, t := range s.tags {
			tags[string(t.Key)] = string(t.Value)
		}

		for field := range s.fields {
			cursor, err := iter.Next(ctx,
				&tsdb.CursorRequest{
					Name:      b.measurement,
					Tags:      s.tags,
					Field:     field,
					Ascending: true,
					StartTime: b.start,
					EndTime:   models.MaxNanoTime,
				},
			)
			if err != nil {
				return nil, fmt.Errorf("getting cursor for %s-%s failed: %w", s.key, field, err)
			}
			if cursor == nil {
				continue
			}

			// Prepare mappings
			fname := field
			if n, found := b.nameResolutions[field]; found {
				fname = n
			}
			converter := identity
			if c, found := b.converter[field]; found {
				converter = c
			}
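
			// fieldEnd tracks the last timestamp read for this field; the
			// minimum across all fields marks the point up to which every
			// row is guaranteed to be complete.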
			fieldEnd := models.MaxNanoTime

			c, err := newValueCursor(cursor)
			if err != nil {
				return nil, fmt.Errorf("creating value cursor failed: %w", err)
			}

			for {
				// Check whether there is more data for this cursor
				_, ok := c.peek()
				if !ok {
					break
				}
				timestamp, value := c.next()

				v, err := converter(value)
				if err != nil {
					b.logger.Errorf("converting %v of field %q failed: %v", value, field, err)
					continue
				}
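
				// Add the value to the row of this timestamp, creating the
				// row on first encounter.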
				if _, found := data[s.key][timestamp]; !found {
					data[s.key][timestamp] = row{
						timestamp: timestamp,
						tags:      tags,
						fields:    make(map[string]interface{}),
					}
					rowCount++
				}
				data[s.key][timestamp].fields[fname] = v
				fieldEnd = timestamp
			}

			c.close()
			end = min(end, fieldEnd)
		}
	}

	if len(data) == 0 {
		return nil, nil
	}

	// Extract the rows ordered by timestamp
	rows := make([]row, 0, rowCount)
	for _, tmap := range data {
		for _, r := range tmap {
			rows = append(rows, r)
		}
	}
	sort.Slice(rows, func(i, j int) bool { return rows[i].timestamp < rows[j].timestamp })

	// Only include rows at or before the end timestamp to avoid duplicate or
	// incomplete entries caused by cursors that were not fully drained
	n := sort.Search(len(rows), func(i int) bool { return rows[i].timestamp > end })

	// Continue the next batch one nanosecond after the last complete
	// timestamp so the boundary row is not exported twice
	b.start = end + 1

	return rows[:n], nil
}