influxdb/util.go

155 lines
3.4 KiB
Go
Raw Normal View History

2014-10-22 05:32:19 +00:00
package influxdb
2014-10-23 04:21:48 +00:00
import (
"encoding/json"
"fmt"
"github.com/influxdb/influxdb/protocol"
)
type TimePrecision int
const (
MicrosecondPrecision TimePrecision = iota
MillisecondPrecision
SecondPrecision
)
2014-10-22 05:32:19 +00:00
type serializedSeries struct {
2014-10-23 04:21:48 +00:00
Name string `json:"name"`
Columns []string `json:"columns"`
Points [][]interface{} `json:"points"`
2014-10-22 05:32:19 +00:00
}
//func SortSerializedSeries(s []*serializedSeries) {
// sort.Sort(BySerializedSeriesNameAsc{s})
//}
func (s serializedSeries) Series(precision TimePrecision) (*protocol.Series, error) {
2014-10-23 04:21:48 +00:00
points := make([]*protocol.Point, 0, len(s.Points))
if hasDuplicates(s.Columns) {
2014-10-22 05:32:19 +00:00
return nil, fmt.Errorf("Cannot have duplicate field names")
}
2014-10-23 04:21:48 +00:00
for _, point := range s.Points {
if len(point) != len(s.Columns) {
2014-10-22 05:32:19 +00:00
return nil, fmt.Errorf("invalid payload")
}
values := make([]*protocol.FieldValue, 0, len(point))
var timestamp *int64
var sequence *uint64
2014-10-23 04:21:48 +00:00
for idx, field := range s.Columns {
2014-10-22 05:32:19 +00:00
value := point[idx]
if field == "time" {
switch x := value.(type) {
case json.Number:
f, err := x.Float64()
if err != nil {
return nil, err
}
_timestamp := int64(f)
switch precision {
case SecondPrecision:
_timestamp *= 1000
fallthrough
case MillisecondPrecision:
_timestamp *= 1000
}
timestamp = &_timestamp
continue
default:
return nil, fmt.Errorf("time field must be float but is %T (%v)", value, value)
}
}
if field == "sequence_number" {
switch x := value.(type) {
case json.Number:
f, err := x.Float64()
if err != nil {
return nil, err
}
_sequenceNumber := uint64(f)
sequence = &_sequenceNumber
continue
default:
return nil, fmt.Errorf("sequence_number field must be float but is %T (%v)", value, value)
}
}
switch v := value.(type) {
case string:
values = append(values, &protocol.FieldValue{StringValue: &v})
case json.Number:
i, err := v.Int64()
if err == nil {
values = append(values, &protocol.FieldValue{Int64Value: &i})
break
}
f, err := v.Float64()
if err != nil {
return nil, err
}
values = append(values, &protocol.FieldValue{DoubleValue: &f})
case bool:
values = append(values, &protocol.FieldValue{BoolValue: &v})
case nil:
2014-10-23 04:21:48 +00:00
trueValue := true
values = append(values, &protocol.FieldValue{IsNull: &trueValue})
2014-10-22 05:32:19 +00:00
default:
// if we reached this line then the dynamic type didn't match
return nil, fmt.Errorf("Unknown type %T", value)
}
}
points = append(points, &protocol.Point{
Values: values,
Timestamp: timestamp,
SequenceNumber: sequence,
})
}
2014-10-23 04:21:48 +00:00
fields := removeTimestampFieldDefinition(s.Columns)
2014-10-22 05:32:19 +00:00
series := &protocol.Series{
2014-10-23 04:21:48 +00:00
Name: protocol.String(s.Name),
2014-10-22 05:32:19 +00:00
Fields: fields,
Points: points,
}
return series, nil
}
2014-10-23 04:21:48 +00:00
func hasDuplicates(ss []string) bool {
m := make(map[string]struct{}, len(ss))
for _, s := range ss {
if _, ok := m[s]; ok {
return true
}
m[s] = struct{}{}
}
return false
}
func removeField(fields []string, name string) []string {
index := -1
for idx, field := range fields {
if field == name {
index = idx
break
}
}
if index == -1 {
return fields
}
return append(fields[:index], fields[index+1:]...)
}
func removeTimestampFieldDefinition(fields []string) []string {
fields = removeField(fields, "time")
return removeField(fields, "sequence_number")
}