commit
8a5d649d3f
|
@ -101,6 +101,12 @@
|
|||
packages = ["."]
|
||||
revision = "88950e537e7e644cd746a3102037b5d2b723e9f5"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/cespare/xxhash"
|
||||
packages = ["."]
|
||||
revision = "5c37fe3735342a2e0d01c87a907579987c8936cc"
|
||||
version = "v1.0.0"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/codahale/hdrhistogram"
|
||||
|
@ -531,6 +537,12 @@
|
|||
revision = "f58768cc1a7a7e77a3bd49e98cdd21419399b6a3"
|
||||
version = "v1.2.0"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/segmentio/kafka-go"
|
||||
packages = ["."]
|
||||
revision = "44a19ca9cf5925ba3c5153dd0d9e5b44e8dfd717"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/sergi/go-diff"
|
||||
packages = ["diffmatchpatch"]
|
||||
|
@ -706,6 +718,6 @@
|
|||
[solve-meta]
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
inputs-digest = "8037672c58ee1166ab921f553c2be239edfce5df42133e1b92b86ffb3c41144b"
|
||||
inputs-digest = "8666620edf4d5f9d8ea8917eba18447194a86b306dead9083728b9df5dd5e391"
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
||||
|
|
|
@ -0,0 +1,404 @@
|
|||
package functions
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
"github.com/influxdata/line-protocol"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/plan"
|
||||
"github.com/influxdata/platform/query/semantic"
|
||||
"github.com/pkg/errors"
|
||||
kafka "github.com/segmentio/kafka-go"
|
||||
)
|
||||
|
||||
const (
|
||||
// ToKafkaKind is the Kind for the ToKafka ifql function
|
||||
ToKafkaKind = "toKafka"
|
||||
)
|
||||
|
||||
type ToKafkaOpSpec struct {
|
||||
Brokers []string `json:"brokers"`
|
||||
Topic string `json:"topic"`
|
||||
Balancer string `json:"balancer"`
|
||||
Name string `json:"name"`
|
||||
NameColumn string `json:"name_column"` // either name or name_column must be set, if none is set try to use the "_measurement" column.
|
||||
TimeColumn string `json:"time_column"`
|
||||
TagColumns []string `json:"tag_columns"`
|
||||
ValueColumns []string `json:"value_columns"`
|
||||
MsgBufSize int `json:"msg_buffer_size"` // the maximim number of messages to buffer before sending to kafka, the library we use defaults to 100
|
||||
}
|
||||
|
||||
func init() {
|
||||
query.RegisterFunction(ToKafkaKind, createToKafkaOpSpec, ToKafkaSignature)
|
||||
query.RegisterOpSpec(ToKafkaKind,
|
||||
func() query.OperationSpec { return &ToKafkaOpSpec{} })
|
||||
plan.RegisterProcedureSpec(ToKafkaKind, newToKafkaProcedure, ToKafkaKind)
|
||||
execute.RegisterTransformation(ToKafkaKind, createToKafkaTransformation)
|
||||
}
|
||||
|
||||
// DefaultKafkaWriterFactory is a terrible name for a way to make a kafkaWriter that is injectable for testing
|
||||
var DefaultKafkaWriterFactory = func(conf kafka.WriterConfig) KafkaWriter {
|
||||
return kafka.NewWriter(conf)
|
||||
}
|
||||
|
||||
// KafkaWriter is an interface for what we need fromDefaultKafkaWriterFactory
|
||||
type KafkaWriter interface {
|
||||
io.Closer
|
||||
WriteMessages(context.Context, ...kafka.Message) error
|
||||
}
|
||||
|
||||
// ReadArgs loads a query.Arguments into ToKafkaOpSpec. It sets several default values.
|
||||
// If the time_column isn't set, it defaults to execute.TimeColLabel.
|
||||
// If the value_column isn't set it defaults to a []string{execute.DefaultValueColLabel}.
|
||||
func (o *ToKafkaOpSpec) ReadArgs(args query.Arguments) error {
|
||||
var err error
|
||||
var ok bool
|
||||
|
||||
brokers, err := args.GetRequiredArray("brokers", semantic.String)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l := brokers.Len()
|
||||
|
||||
o.Brokers = make([]string, l)
|
||||
if brokers.Len() < 1 {
|
||||
return errors.New("at least one broker is required")
|
||||
}
|
||||
for i := 0; i < l; i++ {
|
||||
o.Brokers[i] = brokers.Get(i).Str()
|
||||
}
|
||||
|
||||
o.Topic, err = args.GetRequiredString("topic")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(o.Topic) == 0 {
|
||||
return errors.New("invalid topic name")
|
||||
}
|
||||
|
||||
o.Balancer, _, err = args.GetString("balancer")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
o.Name, ok, err = args.GetString("name")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.NameColumn, ok, err = args.GetString("name_column")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.NameColumn = "_measurement"
|
||||
}
|
||||
}
|
||||
o.TimeColumn, ok, err = args.GetString("time_column")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
o.TimeColumn = execute.DefaultTimeColLabel
|
||||
}
|
||||
tagColumns, ok, err := args.GetArray("tag_columns", semantic.String)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.TagColumns = o.TagColumns[:0]
|
||||
if ok {
|
||||
for i := 0; i < tagColumns.Len(); i++ {
|
||||
o.TagColumns = append(o.TagColumns, tagColumns.Get(i).Str())
|
||||
}
|
||||
sort.Strings(o.TagColumns)
|
||||
}
|
||||
valueColumns, ok, err := args.GetArray("value_columns", semantic.String)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.ValueColumns = o.ValueColumns[:0]
|
||||
if !ok || valueColumns.Len() == 0 {
|
||||
o.ValueColumns = append(o.ValueColumns, execute.DefaultValueColLabel)
|
||||
} else {
|
||||
for i := 0; i < valueColumns.Len(); i++ {
|
||||
o.TagColumns = append(o.ValueColumns, valueColumns.Get(i).Str())
|
||||
}
|
||||
sort.Strings(o.TagColumns)
|
||||
}
|
||||
|
||||
msgBufSize, ok, err := args.GetInt("msg_buffer_size")
|
||||
o.MsgBufSize = int(msgBufSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if o.MsgBufSize < 0 || !ok {
|
||||
o.MsgBufSize = 0 // so the library will set it to the default
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
func createToKafkaOpSpec(args query.Arguments, a *query.Administration) (query.OperationSpec, error) {
|
||||
if err := a.AddParentFromArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := new(ToKafkaOpSpec)
|
||||
if err := s.ReadArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
var ToKafkaSignature = query.DefaultFunctionSignature()
|
||||
|
||||
func (ToKafkaOpSpec) Kind() query.OperationKind {
|
||||
return ToKafkaKind
|
||||
}
|
||||
|
||||
type ToKafkaProcedureSpec struct {
|
||||
Spec *ToKafkaOpSpec
|
||||
balancer kafka.Balancer
|
||||
}
|
||||
|
||||
func (o *ToKafkaProcedureSpec) Kind() plan.ProcedureKind {
|
||||
return ToKafkaKind
|
||||
}
|
||||
func (o *ToKafkaProcedureSpec) Copy() plan.ProcedureSpec {
|
||||
s := o.Spec
|
||||
res := &ToKafkaProcedureSpec{
|
||||
Spec: &ToKafkaOpSpec{
|
||||
Brokers: append([]string(nil), s.Brokers...),
|
||||
Topic: s.Topic,
|
||||
Balancer: s.Balancer,
|
||||
Name: s.Name,
|
||||
NameColumn: s.NameColumn,
|
||||
TimeColumn: s.TimeColumn,
|
||||
TagColumns: append([]string(nil), s.TagColumns...),
|
||||
ValueColumns: append([]string(nil), s.ValueColumns...),
|
||||
},
|
||||
}
|
||||
switch s.Balancer {
|
||||
case "hash", "": //hash is default for compatibility with enterprise
|
||||
res.balancer = &kafka.Hash{}
|
||||
|
||||
case "round-robin":
|
||||
res.balancer = &kafka.RoundRobin{}
|
||||
|
||||
case "least-bytes":
|
||||
res.balancer = &kafka.LeastBytes{}
|
||||
}
|
||||
return res
|
||||
}
|
||||
func newToKafkaProcedure(qs query.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) {
|
||||
spec, ok := qs.(*ToKafkaOpSpec)
|
||||
if !ok && spec != nil {
|
||||
return nil, fmt.Errorf("invalid spec type %T", qs)
|
||||
}
|
||||
return &ToKafkaProcedureSpec{Spec: spec}, nil
|
||||
}
|
||||
func createToKafkaTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
|
||||
s, ok := spec.(*ToKafkaProcedureSpec)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid spec type %T", spec)
|
||||
}
|
||||
cache := execute.NewBlockBuilderCache(a.Allocator())
|
||||
d := execute.NewDataset(id, mode, cache)
|
||||
t := NewToKafkaTransformation(d, cache, s)
|
||||
return t, d, nil
|
||||
}
|
||||
|
||||
type ToKafkaTransformation struct {
|
||||
d execute.Dataset
|
||||
cache execute.BlockBuilderCache
|
||||
spec *ToKafkaProcedureSpec
|
||||
}
|
||||
|
||||
func (t *ToKafkaTransformation) RetractBlock(id execute.DatasetID, key query.PartitionKey) error {
|
||||
return t.d.RetractBlock(key)
|
||||
}
|
||||
func NewToKafkaTransformation(d execute.Dataset, cache execute.BlockBuilderCache, spec *ToKafkaProcedureSpec) *ToKafkaTransformation {
|
||||
return &ToKafkaTransformation{
|
||||
d: d,
|
||||
cache: cache,
|
||||
spec: spec,
|
||||
}
|
||||
}
|
||||
|
||||
type toKafkaMetric struct {
|
||||
tags []*protocol.Tag
|
||||
fields []*protocol.Field
|
||||
name string
|
||||
t time.Time
|
||||
}
|
||||
|
||||
func (m *toKafkaMetric) TagList() []*protocol.Tag {
|
||||
return m.tags
|
||||
}
|
||||
func (m *toKafkaMetric) FieldList() []*protocol.Field {
|
||||
return m.fields
|
||||
}
|
||||
func (m *toKafkaMetric) truncateTagsAndFields() {
|
||||
m.fields = m.fields[:0]
|
||||
m.tags = m.tags[:0]
|
||||
}
|
||||
func (m *toKafkaMetric) Name() string {
|
||||
return m.name
|
||||
}
|
||||
func (m *toKafkaMetric) Time() time.Time {
|
||||
return m.t
|
||||
}
|
||||
|
||||
func (t *ToKafkaTransformation) Process(id execute.DatasetID, b query.Block) (err error) {
|
||||
w := DefaultKafkaWriterFactory(kafka.WriterConfig{
|
||||
Brokers: t.spec.Spec.Brokers,
|
||||
Topic: t.spec.Spec.Topic,
|
||||
Balancer: t.spec.balancer,
|
||||
BatchSize: t.spec.Spec.MsgBufSize,
|
||||
QueueCapacity: t.spec.Spec.MsgBufSize,
|
||||
})
|
||||
|
||||
defer func() {
|
||||
err2 := w.Close()
|
||||
// don't overwrite current error
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err2 != nil {
|
||||
// allow Process to return the error from the defered Close()
|
||||
err = err2
|
||||
return
|
||||
}
|
||||
}()
|
||||
pr, pw := io.Pipe() // TODO: replce the pipe with something faster
|
||||
// I'd like a linereader in line-protocol
|
||||
m := &toKafkaMetric{}
|
||||
e := protocol.NewEncoder(pw)
|
||||
e.FailOnFieldErr(true)
|
||||
e.SetFieldSortOrder(protocol.SortFields)
|
||||
cols := b.Cols()
|
||||
labels := make(map[string]idxType, len(cols))
|
||||
for i, col := range cols {
|
||||
labels[col.Label] = idxType{Idx: i, Type: col.Type}
|
||||
}
|
||||
// do time
|
||||
timeColLabel := t.spec.Spec.TimeColumn
|
||||
timeColIdx, ok := labels[timeColLabel]
|
||||
if !ok {
|
||||
return errors.New("Could not get time column")
|
||||
}
|
||||
if timeColIdx.Type != query.TTime {
|
||||
return fmt.Errorf("column %s is not of type %s", timeColLabel, timeColIdx.Type)
|
||||
}
|
||||
var measurementNameCol string
|
||||
if t.spec.Spec.Name == "" {
|
||||
measurementNameCol = t.spec.Spec.NameColumn
|
||||
}
|
||||
// check if each col is a tag or value and cache this value for the loop
|
||||
colMetadatas := b.Cols()
|
||||
isTag := make([]bool, len(colMetadatas))
|
||||
isValue := make([]bool, len(colMetadatas))
|
||||
for i, col := range colMetadatas {
|
||||
isValue[i] = sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label) < len(t.spec.Spec.ValueColumns) && t.spec.Spec.ValueColumns[sort.SearchStrings(t.spec.Spec.ValueColumns, col.Label)] == col.Label
|
||||
isTag[i] = sort.SearchStrings(t.spec.Spec.TagColumns, col.Label) < len(t.spec.Spec.TagColumns) && t.spec.Spec.TagColumns[sort.SearchStrings(t.spec.Spec.TagColumns, col.Label)] == col.Label
|
||||
}
|
||||
m.name = t.spec.Spec.Name
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
err = b.Do(func(er query.ColReader) error {
|
||||
l := er.Len()
|
||||
for i := 0; i < l; i++ {
|
||||
m.truncateTagsAndFields()
|
||||
for j, col := range er.Cols() {
|
||||
switch {
|
||||
case col.Label == timeColLabel:
|
||||
m.t = er.Times(j)[i].Time()
|
||||
case measurementNameCol != "" && measurementNameCol == col.Label:
|
||||
if col.Type != query.TString {
|
||||
return errors.New("invalid type for measurement column")
|
||||
}
|
||||
m.name = er.Strings(j)[i]
|
||||
case isTag[j]:
|
||||
if col.Type != query.TString {
|
||||
return errors.New("invalid type for measurement column")
|
||||
}
|
||||
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j)[i]})
|
||||
case isValue[j]:
|
||||
switch col.Type {
|
||||
case query.TFloat:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Floats(j)[i]})
|
||||
case query.TInt:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Ints(j)[i]})
|
||||
case query.TUInt:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j)[i]})
|
||||
case query.TString:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j)[i]})
|
||||
case query.TTime:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Times(j)[i]})
|
||||
case query.TBool:
|
||||
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Bools(j)[i]})
|
||||
default:
|
||||
return fmt.Errorf("invalid type for column %s", col.Label)
|
||||
}
|
||||
}
|
||||
}
|
||||
_, err := e.Encode(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
pw.Close()
|
||||
wg.Done()
|
||||
}()
|
||||
// write the data to kafka
|
||||
{
|
||||
scan := bufio.NewScanner(pr)
|
||||
msgBuf := make([]kafka.Message, 128)
|
||||
i := 0
|
||||
// todo, make this a little more async
|
||||
for scan.Scan() {
|
||||
v := append([]byte(nil), scan.Bytes()...) // we do this since scan.Bytes()'s result can be overwritten by calls to Scan()
|
||||
if cap(msgBuf[i].Key) != 8 {
|
||||
msgBuf[i].Key = make([]byte, 8)
|
||||
}
|
||||
binary.LittleEndian.PutUint64(msgBuf[i].Key, xxhash.Sum64(v))
|
||||
msgBuf[i].Value = v
|
||||
if i == t.spec.Spec.MsgBufSize-1 {
|
||||
if err = w.WriteMessages(context.Background(), msgBuf...); err != nil {
|
||||
return err
|
||||
}
|
||||
msgBuf = msgBuf[:0]
|
||||
i = 0
|
||||
}
|
||||
i++
|
||||
}
|
||||
// send the remainder of the messages
|
||||
if len(msgBuf) > 0 {
|
||||
err = w.WriteMessages(context.Background(), msgBuf[:i]...)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *ToKafkaTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error {
|
||||
return t.d.UpdateWatermark(pt)
|
||||
}
|
||||
|
||||
func (t *ToKafkaTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error {
|
||||
return t.d.UpdateProcessingTime(pt)
|
||||
}
|
||||
|
||||
func (t *ToKafkaTransformation) Finish(id execute.DatasetID, err error) {
|
||||
t.d.Finish(err)
|
||||
}
|
|
@ -0,0 +1,433 @@
|
|||
package functions_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/execute"
|
||||
"github.com/influxdata/platform/query/execute/executetest"
|
||||
"github.com/influxdata/platform/query/functions"
|
||||
"github.com/influxdata/platform/query/querytest"
|
||||
kafka "github.com/segmentio/kafka-go"
|
||||
)
|
||||
|
||||
// type kafkaClientMock = func
|
||||
|
||||
func TestToKafka_NewQuery(t *testing.T) {
|
||||
tests := []querytest.NewQueryTestCase{
|
||||
{
|
||||
Name: "from with database",
|
||||
Raw: `from(db:"mydb") |> toKafka(brokers:["brokerurl:8989"], name:"series1", topic:"totallynotfaketopic")`,
|
||||
Want: &query.Spec{
|
||||
Operations: []*query.Operation{
|
||||
{
|
||||
ID: "from0",
|
||||
Spec: &functions.FromOpSpec{
|
||||
Database: "mydb",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "toKafka1",
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic", //Balancer: &kafka.Hash{},
|
||||
Name: "series1",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{execute.DefaultValueColLabel},
|
||||
},
|
||||
},
|
||||
},
|
||||
Edges: []query.Edge{
|
||||
{Parent: "from0", Child: "toKafka1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
tc := tc
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
querytest.NewQueryTestHelper(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type kafkaMock struct {
|
||||
sync.Mutex
|
||||
data [][]kafka.Message
|
||||
}
|
||||
|
||||
func (k *kafkaMock) reset() {
|
||||
k.Lock()
|
||||
k.data = [][]kafka.Message{}
|
||||
k.Unlock()
|
||||
}
|
||||
|
||||
func (k *kafkaMock) Close() error { return nil }
|
||||
|
||||
func (k *kafkaMock) WriteMessages(_ context.Context, msgs ...kafka.Message) error {
|
||||
k.Lock()
|
||||
k.data = append(k.data, msgs)
|
||||
k.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestToKafka_Process(t *testing.T) {
|
||||
data := &kafkaMock{}
|
||||
functions.DefaultKafkaWriterFactory = func(_ kafka.WriterConfig) functions.KafkaWriter {
|
||||
return data
|
||||
}
|
||||
|
||||
type wanted struct {
|
||||
Block []*executetest.Block
|
||||
Result [][]kafka.Message
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
spec *functions.ToKafkaProcedureSpec
|
||||
data []query.Block
|
||||
want wanted
|
||||
}{
|
||||
{
|
||||
name: "colblock with name in _measurement",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
NameColumn: "_measurement",
|
||||
},
|
||||
},
|
||||
data: []query.Block{execute.CopyBlock(&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_measurement", Type: query.TString},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), "a", 2.0, "one"},
|
||||
{execute.Time(21), "a", 2.0, "one"},
|
||||
{execute.Time(21), "b", 1.0, "seven"},
|
||||
{execute.Time(31), "a", 3.0, "nine"},
|
||||
{execute.Time(41), "c", 4.0, "elevendyone"},
|
||||
},
|
||||
}, executetest.UnlimitedAllocator)},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("a _value=2 11"), Key: []byte{0xf1, 0xb0, 0x29, 0xd7, 0x9d, 0x04, 0x31, 0x7c}},
|
||||
{Value: []byte("a _value=2 21"), Key: []byte{0xb5, 0xc2, 0xe4, 0x78, 0x95, 0xe0, 0x62, 0x66}},
|
||||
{Value: []byte("b _value=1 21"), Key: []byte{0x0e, 0x62, 0x4e, 0xe7, 0x36, 0xac, 0x77, 0xf3}},
|
||||
{Value: []byte("a _value=3 31"), Key: []byte{0xf5, 0xd5, 0x22, 0x4d, 0x27, 0x9d, 0x8d, 0xb5}},
|
||||
{Value: []byte("c _value=4 41"), Key: []byte{0x05, 0x5b, 0xc5, 0x41, 0x67, 0x78, 0x04, 0xda}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one block with measurement name in _measurement",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
NameColumn: "_measurement",
|
||||
},
|
||||
},
|
||||
data: []query.Block{&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_measurement", Type: query.TString},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), "a", 2.0, "one"},
|
||||
{execute.Time(21), "a", 2.0, "one"},
|
||||
{execute.Time(21), "b", 1.0, "seven"},
|
||||
{execute.Time(31), "a", 3.0, "nine"},
|
||||
{execute.Time(41), "c", 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("a _value=2 11"), Key: []byte{0xf1, 0xb0, 0x29, 0xd7, 0x9d, 0x04, 0x31, 0x7c}},
|
||||
{Value: []byte("a _value=2 21"), Key: []byte{0xb5, 0xc2, 0xe4, 0x78, 0x95, 0xe0, 0x62, 0x66}},
|
||||
{Value: []byte("b _value=1 21"), Key: []byte{0x0e, 0x62, 0x4e, 0xe7, 0x36, 0xac, 0x77, 0xf3}},
|
||||
{Value: []byte("a _value=3 31"), Key: []byte{0xf5, 0xd5, 0x22, 0x4d, 0x27, 0x9d, 0x8d, 0xb5}},
|
||||
{Value: []byte("c _value=4 41"), Key: []byte{0x05, 0x5b, 0xc5, 0x41, 0x67, 0x78, 0x04, 0xda}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one block with measurement name in _measurement and tag",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
NameColumn: "_measurement",
|
||||
},
|
||||
},
|
||||
data: []query.Block{&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_measurement", Type: query.TString},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), "a", 2.0, "one"},
|
||||
{execute.Time(21), "a", 2.0, "one"},
|
||||
{execute.Time(21), "b", 1.0, "seven"},
|
||||
{execute.Time(31), "a", 3.0, "nine"},
|
||||
{execute.Time(41), "c", 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("a,fred=one _value=2 11"), Key: []byte{0xe9, 0xde, 0xc5, 0x1e, 0xfb, 0x26, 0x77, 0xfe}},
|
||||
{Value: []byte("a,fred=one _value=2 21"), Key: []byte{0x52, 0x6d, 0x0a, 0xe8, 0x1d, 0xb3, 0xe5, 0xeb}},
|
||||
{Value: []byte("b,fred=seven _value=1 21"), Key: []byte{0x18, 0x91, 0xed, 0x7e, 0x79, 0x5c, 0xc2, 0xe3}},
|
||||
{Value: []byte("a,fred=nine _value=3 31"), Key: []byte{0x75, 0x15, 0xe5, 0x3e, 0xdd, 0xfd, 0x4f, 0x9a}},
|
||||
{Value: []byte("c,fred=elevendyone _value=4 41"), Key: []byte{0xd4, 0xc9, 0xca, 0xea, 0xa6, 0x8d, 0x14, 0x4b}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one block",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
Name: "one_block",
|
||||
},
|
||||
},
|
||||
data: []query.Block{&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0},
|
||||
{execute.Time(21), 1.0},
|
||||
{execute.Time(31), 3.0},
|
||||
{execute.Time(41), 4.0},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("one_block _value=2 11"), Key: []byte{0x92, 0x7e, 0x77, 0xb1, 0x2c, 0x35, 0x13, 0x12}},
|
||||
{Value: []byte("one_block _value=1 21"), Key: []byte{0x39, 0x39, 0xb2, 0x11, 0xd1, 0x1b, 0x44, 0x57}},
|
||||
{Value: []byte("one_block _value=3 31"), Key: []byte{0xa2, 0xc1, 0x71, 0x42, 0xa8, 0xbb, 0x91, 0x67}},
|
||||
{Value: []byte("one_block _value=4 41"), Key: []byte{0x82, 0x3b, 0x2e, 0x58, 0xec, 0x53, 0x62, 0x4e}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one block with unused tag",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
Name: "one_block_w_unused_tag",
|
||||
},
|
||||
},
|
||||
data: []query.Block{&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
{execute.Time(41), 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("one_block_w_unused_tag _value=2 11"), Key: []byte{0x62, 0xda, 0xe8, 0xc3, 0x2b, 0x88, 0x74, 0x54}},
|
||||
{Value: []byte("one_block_w_unused_tag _value=1 21"), Key: []byte{0xff, 0x23, 0xa3, 0x84, 0xe4, 0xcb, 0x77, 0x79}},
|
||||
{Value: []byte("one_block_w_unused_tag _value=3 31"), Key: []byte{0xc5, 0x02, 0x43, 0x34, 0x66, 0xb6, 0x43, 0x87}},
|
||||
{Value: []byte("one_block_w_unused_tag _value=4 41"), Key: []byte{0x65, 0xd6, 0x94, 0x12, 0xfa, 0x92, 0x30, 0xff}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one block with tag",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
Name: "one_block_w_tag",
|
||||
},
|
||||
},
|
||||
data: []query.Block{&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
{execute.Time(41), 4.0, "elevendyone"},
|
||||
},
|
||||
}},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("one_block_w_tag,fred=one _value=2 11"), Key: []byte{0xca, 0xc3, 0xec, 0x04, 0x42, 0xec, 0x85, 0x84}},
|
||||
{Value: []byte("one_block_w_tag,fred=seven _value=1 21"), Key: []byte{0x6c, 0x2b, 0xb7, 0xf8, 0x98, 0xce, 0x12, 0x64}},
|
||||
{Value: []byte("one_block_w_tag,fred=nine _value=3 31"), Key: []byte{0x41, 0x73, 0x13, 0xd6, 0x5c, 0xf1, 0x18, 0xd3}},
|
||||
{Value: []byte("one_block_w_tag,fred=elevendyone _value=4 41"), Key: []byte{0x83, 0x42, 0x25, 0x68, 0x66, 0x44, 0x67, 0x14}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multi block",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
Name: "multi_block",
|
||||
},
|
||||
},
|
||||
data: []query.Block{
|
||||
&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
},
|
||||
},
|
||||
&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(51), 2.0, "one"},
|
||||
{execute.Time(61), 1.0, "seven"},
|
||||
{execute.Time(71), 3.0, "nine"},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("multi_block,fred=one _value=2 11"), Key: []byte{0x41, 0x9d, 0x7f, 0x17, 0xc8, 0x21, 0xfb, 0x69}},
|
||||
{Value: []byte("multi_block,fred=seven _value=1 21"), Key: []byte{0x8f, 0x83, 0x72, 0x66, 0x7b, 0x78, 0x77, 0x18}},
|
||||
{Value: []byte("multi_block,fred=nine _value=3 31"), Key: []byte{0x1c, 0x4a, 0x50, 0x5f, 0xa1, 0xfc, 0xf3, 0x56}},
|
||||
}, {
|
||||
{Value: []byte("multi_block,fred=one _value=2 51"), Key: []byte{0x77, 0x44, 0x9c, 0x9c, 0x68, 0xca, 0xc1, 0x13}},
|
||||
{Value: []byte("multi_block,fred=seven _value=1 61"), Key: []byte{0x48, 0x23, 0xfe, 0x07, 0x61, 0x79, 0x09, 0x74}},
|
||||
{Value: []byte("multi_block,fred=nine _value=3 71"), Key: []byte{0x74, 0xc5, 0xa3, 0x42, 0xcb, 0x91, 0x99, 0x7c}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multi collist blocks",
|
||||
spec: &functions.ToKafkaProcedureSpec{
|
||||
Spec: &functions.ToKafkaOpSpec{
|
||||
Brokers: []string{"brokerurl:8989"},
|
||||
Topic: "totallynotfaketopic",
|
||||
TimeColumn: execute.DefaultTimeColLabel,
|
||||
ValueColumns: []string{"_value"},
|
||||
TagColumns: []string{"fred"},
|
||||
Name: "multi_collist_blocks",
|
||||
},
|
||||
},
|
||||
data: []query.Block{
|
||||
execute.CopyBlock(
|
||||
&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(11), 2.0, "one"},
|
||||
{execute.Time(21), 1.0, "seven"},
|
||||
{execute.Time(31), 3.0, "nine"},
|
||||
},
|
||||
}, executetest.UnlimitedAllocator),
|
||||
&executetest.Block{
|
||||
ColMeta: []query.ColMeta{
|
||||
{Label: "_time", Type: query.TTime},
|
||||
{Label: "_value", Type: query.TFloat},
|
||||
{Label: "fred", Type: query.TString},
|
||||
},
|
||||
Data: [][]interface{}{
|
||||
{execute.Time(51), 2.0, "one"},
|
||||
{execute.Time(61), 1.0, "seven"},
|
||||
{execute.Time(71), 3.0, "nine"},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: wanted{
|
||||
Block: []*executetest.Block(nil),
|
||||
Result: [][]kafka.Message{{
|
||||
{Value: []byte("multi_collist_blocks,fred=one _value=2 11"), Key: []byte{0xfc, 0xab, 0xa3, 0x68, 0x81, 0x48, 0x7d, 0x8a}},
|
||||
{Value: []byte("multi_collist_blocks,fred=seven _value=1 21"), Key: []byte{0x9f, 0xe1, 0x82, 0x97, 0x49, 0x92, 0x56, 0x1a}},
|
||||
{Value: []byte("multi_collist_blocks,fred=nine _value=3 31"), Key: []byte{0x73, 0x3c, 0x1a, 0x62, 0xfa, 0x01, 0xcd, 0xa7}},
|
||||
}, {
|
||||
{Value: []byte("multi_collist_blocks,fred=one _value=2 51"), Key: []byte{0xb9, 0x23, 0xd6, 0x3a, 0x7e, 0x71, 0xa6, 0xde}},
|
||||
{Value: []byte("multi_collist_blocks,fred=seven _value=1 61"), Key: []byte{0x0a, 0x70, 0x1f, 0xbe, 0xfd, 0x40, 0x2f, 0xd8}},
|
||||
{Value: []byte("multi_collist_blocks,fred=nine _value=3 71"), Key: []byte{0x67, 0x4b, 0xf0, 0xf1, 0xb0, 0xf5, 0x99, 0x5a}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
executetest.ProcessTestHelper(
|
||||
t,
|
||||
tc.data,
|
||||
tc.want.Block,
|
||||
func(d execute.Dataset, c execute.BlockBuilderCache) execute.Transformation {
|
||||
return functions.NewToKafkaTransformation(d, c, tc.spec)
|
||||
},
|
||||
)
|
||||
if !cmp.Equal(tc.want.Result, data.data, cmpopts.EquateNaNs()) {
|
||||
t.Log(cmp.Diff(tc.want.Result, data.data))
|
||||
t.Fail()
|
||||
}
|
||||
data.reset()
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue