Implement codec's encode

pull/1580/head
Philip O'Toole 2015-02-16 11:40:38 -08:00
parent 8cdee6a997
commit 6db74aa9bb
2 changed files with 110 additions and 90 deletions

View File

@ -182,72 +182,6 @@ func (m *Measurement) FieldByName(name string) *Field {
return nil return nil
} }
// EncodeFields encodes a set of field ids and values to a byte slice.
func (m *Measurement) EncodeFields(values map[uint8]interface{}) []byte {
// Sort fields for consistency.
fieldIDs := make([]uint8, 0, len(values))
for fieldID := range values {
fieldIDs = append(fieldIDs, fieldID)
}
sort.Sort(uint8Slice(fieldIDs))
// Allocate byte slice and write field count.
b := make([]byte, 1, 10)
b[0] = byte(len(values))
// Write out each field.
for _, fieldID := range fieldIDs {
var buf []byte
field := m.Field(fieldID)
if field == nil {
panic(fmt.Sprintf("field ID %d has no mapping", fieldID))
}
switch field.Type {
case influxql.Number:
v := values[fieldID]
// Convert integers to floats.
if intval, ok := v.(int); ok {
v = float64(intval)
}
buf = make([]byte, 9)
binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v.(float64)))
case influxql.Boolean:
v := values[fieldID].(bool)
// Only 1 byte need for a boolean.
buf = make([]byte, 2)
if v {
buf[1] = byte(1)
}
case influxql.String:
v := values[fieldID].(string)
if len(v) > maxStringLength {
v = v[:maxStringLength]
}
// Make a buffer for field ID, the string length, and the string.
buf = make([]byte, len(v)+3)
// Set the string length, then copy the string itself.
binary.BigEndian.PutUint16(buf[1:3], uint16(len(v)))
for k, v := range []byte(v) {
buf[k+3] = byte(v)
}
default:
panic(fmt.Sprintf("unsupported value type: %T", values[fieldID]))
}
// Always set the field ID as the leading byte.
buf[0] = fieldID
// Append temp buffer to the end.
b = append(b, buf...)
}
return b
}
// DecodeFields decodes a byte slice into a set of field ids and values. // DecodeFields decodes a byte slice into a set of field ids and values.
func (m *Measurement) DecodeFields(b []byte) map[uint8]interface{} { func (m *Measurement) DecodeFields(b []byte) map[uint8]interface{} {
if len(b) == 0 { if len(b) == 0 {
@ -349,27 +283,6 @@ func (m *Measurement) seriesByTags(tags map[string]string) *Series {
return m.series[string(marshalTags(tags))] return m.series[string(marshalTags(tags))]
} }
// mapValues converts a map of values with string keys to field id keys.
//
// If a field exists, but its type is different, return an error. If error is
// nil, but at least 1 field has not been mapped, nil is returned as the map.
func (m *Measurement) mapValues(values map[string]interface{}) (map[uint8]interface{}, error) {
other := make(map[uint8]interface{}, len(values))
for k, v := range values {
f := m.FieldByName(k)
if f == nil {
panic(fmt.Sprintf("field does not exist for %s", k))
} else if influxql.InspectDataType(v) != f.Type {
return nil, fmt.Errorf("field %s is not of type %s", k, f.Type)
} else {
other[f.ID] = v
}
}
// All fields exist, and are of the expected type.
return other, nil
}
func (m *Measurement) seriesIDsAndFilters(stmt *influxql.SelectStatement) (seriesIDs, map[uint32]influxql.Expr) { func (m *Measurement) seriesIDsAndFilters(stmt *influxql.SelectStatement) (seriesIDs, map[uint32]influxql.Expr) {
seriesIdsToExpr := make(map[uint32]influxql.Expr) seriesIdsToExpr := make(map[uint32]influxql.Expr)
if stmt.Condition == nil { if stmt.Condition == nil {
@ -770,6 +683,105 @@ type Field struct {
// Fields represents a list of fields. // Fields represents a list of fields.
type Fields []*Field type Fields []*Field
// FieldCodec providecs encoding and decoding functionality for the fields of a given
// Measurement. It is a distinct type to avoid locking writes on this node while
// potentially long-running queries are executing.
//
// It is not affected by changes to the Measurement object after codec creation.
type FieldCodec struct {
fieldsByID map[uint8]*Field
fieldsByName map[string]*Field
}
// NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with
// a RLock that protects the Measurement.
func NewFieldCodec(m *Measurement) *FieldCodec {
fieldsByID := make(map[uint8]*Field, len(m.Fields))
fieldsByName := make(map[string]*Field, len(m.Fields))
for _, f := range m.Fields {
fieldsByID[f.ID] = f
fieldsByName[f.Name] = f
}
return &FieldCodec{fieldsByID: fieldsByID, fieldsByName: fieldsByName}
}
// EncodeFields converts a map of values with string keys to a byte slice of field
// IDs and values.
//
// If a field exists in the codec, but its type is different, an error is returned. If
// a field is not present in the codec, the system panics.
func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) {
// Allocate byte slice and write field count.
b := make([]byte, 1, 10)
b[0] = byte(len(values))
for k, v := range values {
field := f.fieldsByName[k]
if field == nil {
panic(fmt.Sprintf("field does not exist for %s", k))
} else if influxql.InspectDataType(v) != field.Type {
return nil, fmt.Errorf("field %s is not of type %s", k, field.Type)
}
var buf []byte
switch field.Type {
case influxql.Number:
var value float64
// Convert integers to floats.
if intval, ok := v.(int); ok {
value = float64(intval)
} else {
value = v.(float64)
}
buf = make([]byte, 9)
binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(value))
case influxql.Boolean:
value := v.(bool)
// Only 1 byte need for a boolean.
buf = make([]byte, 2)
if value {
buf[1] = byte(1)
}
case influxql.String:
value := v.(string)
if len(value) > maxStringLength {
value = value[:maxStringLength]
}
// Make a buffer for field ID, the string length, and the string.
buf = make([]byte, len(value)+3)
// Set the string length, then copy the string itself.
binary.BigEndian.PutUint16(buf[1:3], uint16(len(value)))
for i, c := range []byte(value) {
buf[i+3] = byte(c)
}
default:
panic(fmt.Sprintf("unsupported value type: %T", v))
}
// Always set the field ID as the leading byte.
buf[0] = field.ID
// Append temp buffer to the end.
b = append(b, buf...)
}
return b, nil
}
// DecodeByID
func (f *FieldCodec) DecodeByID(fieldId uint8, b []byte) (interface{}, error) {
return 0, nil
}
// DecodeByName
func (f *FieldCodec) DecodeByName(fieldName uint8, b []byte) (interface{}, error) {
return 0, nil
}
// Series belong to a Measurement and represent unique time series in a database // Series belong to a Measurement and represent unique time series in a database
type Series struct { type Series struct {
ID uint32 ID uint32

View File

@ -1662,15 +1662,23 @@ func (s *Server) writePoint(database, retentionPolicy string, point *Point) (uin
return 0, err return 0, err
} }
// Convert string-key/values to fieldID-key/values. // Get a field codec.
rawValues, err := m.mapValues(values) s.mu.RLock()
codec := NewFieldCodec(m)
s.mu.RUnlock()
if codec == nil {
panic("field codec is nil")
}
// Convert string-key/values to encoded fields.
encodedFields, err := codec.EncodeFields(values)
if err != nil { if err != nil {
return 0, err return 0, err
} }
// Encode point header. // Encode point header.
data := marshalPointHeader(seriesID, timestamp.UnixNano()) data := marshalPointHeader(seriesID, timestamp.UnixNano())
data = append(data, m.EncodeFields(rawValues)...) data = append(data, encodedFields...)
// Publish "raw write series" message on shard's topic to broker. // Publish "raw write series" message on shard's topic to broker.
return s.client.Publish(&messaging.Message{ return s.client.Publish(&messaging.Message{