influxdb/influxql/point.go

311 lines
6.7 KiB
Go
Raw Normal View History

2015-11-04 21:06:06 +00:00
package influxql
import (
"bytes"
"encoding/binary"
"io"
2015-11-04 21:06:06 +00:00
"sort"
2016-02-05 17:23:35 +00:00
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/influxql/internal"
2015-11-04 21:06:06 +00:00
)
// ZeroTime is the Unix nanosecond timestamp for time.Time{}.
const ZeroTime = int64(-6795364578871345152)
// Point represents a value in a series that occurred at a given time.
type Point interface {
// Name and tags uniquely identify the series the value belongs to.
name() string
tags() Tags
// The time that the value occurred at.
time() int64
// The value at the given time.
value() interface{}
// Auxillary values passed along with the value.
aux() []interface{}
}
// Points represents a list of points.
type Points []Point
// Tags represent a map of keys and values.
// It memoizes its key so it can be used efficiently during query execution.
type Tags struct {
id string
m map[string]string
}
// NewTags returns a new instance of Tags.
func NewTags(m map[string]string) Tags {
if len(m) == 0 {
return Tags{}
}
return Tags{
id: string(encodeTags(m)),
m: m,
}
}
2016-02-05 17:23:35 +00:00
// newTagsID returns a new instance of Tags parses from a tag id.
func newTagsID(id string) Tags {
m := decodeTags([]byte(id))
if len(m) == 0 {
return Tags{}
}
return Tags{id: id, m: m}
}
2015-11-04 21:06:06 +00:00
// ID returns the string identifier for the tags.
func (t Tags) ID() string { return t.id }
// KeyValues returns the underlying map for the tags.
func (t Tags) KeyValues() map[string]string { return t.m }
// Keys returns a sorted list of all keys on the tag.
func (t *Tags) Keys() []string {
if t == nil {
return nil
}
var a []string
for k := range t.m {
a = append(a, k)
}
sort.Strings(a)
return a
}
// Value returns the value for a given key.
func (t *Tags) Value(k string) string {
if t == nil {
return ""
}
return t.m[k]
}
// Subset returns a new tags object with a subset of the keys.
func (t *Tags) Subset(keys []string) Tags {
if t.m == nil || len(keys) == 0 {
return Tags{}
}
// If keys match existing keys, simply return this tagset.
if keysMatch(t.m, keys) {
return *t
}
// Otherwise create new tag set.
m := make(map[string]string, len(keys))
for _, k := range keys {
m[k] = t.m[k]
}
return NewTags(m)
}
// Equals returns true if t equals other.
func (t *Tags) Equals(other *Tags) bool {
if t == nil && other == nil {
return true
} else if t == nil || other == nil {
return false
}
return t.id == other.id
}
// keysMatch returns true if m has exactly the same keys as listed in keys.
func keysMatch(m map[string]string, keys []string) bool {
if len(keys) != len(m) {
return false
}
for _, k := range keys {
if _, ok := m[k]; !ok {
return false
}
}
return true
}
// encodeTags converts a map of strings to an identifier.
func encodeTags(m map[string]string) []byte {
// Empty maps marshal to empty bytes.
if len(m) == 0 {
return nil
}
// Extract keys and determine final size.
sz := (len(m) * 2) - 1 // separators
keys := make([]string, 0, len(m))
for k, v := range m {
keys = append(keys, k)
sz += len(k) + len(v)
}
sort.Strings(keys)
// Generate marshaled bytes.
b := make([]byte, sz)
buf := b
for _, k := range keys {
copy(buf, k)
buf[len(k)] = '\x00'
buf = buf[len(k)+1:]
}
for i, k := range keys {
v := m[k]
copy(buf, v)
if i < len(keys)-1 {
buf[len(v)] = '\x00'
buf = buf[len(v)+1:]
}
}
return b
}
2016-02-05 17:23:35 +00:00
// decodeTags parses an identifier into a map of tags.
func decodeTags(id []byte) map[string]string {
a := bytes.Split(id, []byte{'\x00'})
// There must be an even number of segments.
if len(a) > 0 && len(a)%2 == 1 {
a = a[:len(a)-1]
}
// Return nil if there are no segments.
if len(a) == 0 {
return nil
}
// Decode key/value tags.
m := make(map[string]string)
for i := 0; i < len(a); i += 2 {
m[string(a[i])] = string(a[i+1])
}
return m
}
2016-02-05 17:23:35 +00:00
func encodeAux(aux []interface{}) []*internal.Aux {
pb := make([]*internal.Aux, len(aux))
for i := range aux {
switch v := aux[i].(type) {
case float64:
pb[i] = &internal.Aux{DataType: proto.Int32(Float), FloatValue: proto.Float64(v)}
case *float64:
pb[i] = &internal.Aux{DataType: proto.Int32(Float)}
case int64:
pb[i] = &internal.Aux{DataType: proto.Int32(Integer), IntegerValue: proto.Int64(v)}
case *int64:
pb[i] = &internal.Aux{DataType: proto.Int32(Integer)}
case string:
pb[i] = &internal.Aux{DataType: proto.Int32(String), StringValue: proto.String(v)}
case *string:
pb[i] = &internal.Aux{DataType: proto.Int32(String)}
case bool:
pb[i] = &internal.Aux{DataType: proto.Int32(Boolean), BooleanValue: proto.Bool(v)}
case *bool:
pb[i] = &internal.Aux{DataType: proto.Int32(Boolean)}
default:
pb[i] = &internal.Aux{DataType: proto.Int32(int32(Unknown))}
}
}
return pb
}
func decodeAux(pb []*internal.Aux) []interface{} {
if len(pb) == 0 {
return nil
}
2016-02-05 17:23:35 +00:00
aux := make([]interface{}, len(pb))
for i := range pb {
switch pb[i].GetDataType() {
case Float:
if pb[i].FloatValue != nil {
aux[i] = *pb[i].FloatValue
} else {
aux[i] = (*float64)(nil)
}
case Integer:
if pb[i].IntegerValue != nil {
aux[i] = *pb[i].IntegerValue
} else {
aux[i] = (*int64)(nil)
}
case String:
if pb[i].StringValue != nil {
aux[i] = *pb[i].StringValue
} else {
aux[i] = (*string)(nil)
}
case Boolean:
if pb[i].BooleanValue != nil {
aux[i] = *pb[i].BooleanValue
} else {
aux[i] = (*bool)(nil)
}
default:
aux[i] = nil
}
}
return aux
}
// NewPointDecoder decodes generic points from a reader.
type PointDecoder struct {
r io.Reader
stats IteratorStats
}
// NewPointDecoder returns a new instance of PointDecoder that reads from r.
func NewPointDecoder(r io.Reader) *PointDecoder {
return &PointDecoder{r: r}
}
// Stats returns iterator stats embedded within the stream.
func (dec *PointDecoder) Stats() IteratorStats { return dec.stats }
// DecodePoint reads from the underlying reader and unmarshals into p.
func (dec *PointDecoder) DecodePoint(p *Point) error {
for {
// Read length.
var sz uint32
if err := binary.Read(dec.r, binary.BigEndian, &sz); err != nil {
return err
}
// Read point data.
buf := make([]byte, sz)
if _, err := io.ReadFull(dec.r, buf); err != nil {
return err
}
// Unmarshal into point.
var pb internal.Point
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
// If the point contains stats then read stats and retry.
if pb.Stats != nil {
dec.stats = decodeIteratorStats(pb.Stats)
continue
}
if pb.IntegerValue != nil {
*p = decodeIntegerPoint(&pb)
} else if pb.StringValue != nil {
*p = decodeStringPoint(&pb)
} else if pb.BooleanValue != nil {
*p = decodeBooleanPoint(&pb)
} else {
*p = decodeFloatPoint(&pb)
}
return nil
}
}