diff --git a/database.go b/database.go index e2998e51b2..ab9ef77986 100644 --- a/database.go +++ b/database.go @@ -182,72 +182,6 @@ func (m *Measurement) FieldByName(name string) *Field { return nil } -// EncodeFields encodes a set of field ids and values to a byte slice. -func (m *Measurement) EncodeFields(values map[uint8]interface{}) []byte { - // Sort fields for consistency. - fieldIDs := make([]uint8, 0, len(values)) - for fieldID := range values { - fieldIDs = append(fieldIDs, fieldID) - } - sort.Sort(uint8Slice(fieldIDs)) - - // Allocate byte slice and write field count. - b := make([]byte, 1, 10) - b[0] = byte(len(values)) - - // Write out each field. - for _, fieldID := range fieldIDs { - var buf []byte - - field := m.Field(fieldID) - if field == nil { - panic(fmt.Sprintf("field ID %d has no mapping", fieldID)) - } - switch field.Type { - case influxql.Number: - v := values[fieldID] - // Convert integers to floats. - if intval, ok := v.(int); ok { - v = float64(intval) - } - - buf = make([]byte, 9) - binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v.(float64))) - case influxql.Boolean: - v := values[fieldID].(bool) - - // Only 1 byte need for a boolean. - buf = make([]byte, 2) - if v { - buf[1] = byte(1) - } - case influxql.String: - v := values[fieldID].(string) - if len(v) > maxStringLength { - v = v[:maxStringLength] - } - // Make a buffer for field ID, the string length, and the string. - buf = make([]byte, len(v)+3) - - // Set the string length, then copy the string itself. - binary.BigEndian.PutUint16(buf[1:3], uint16(len(v))) - for k, v := range []byte(v) { - buf[k+3] = byte(v) - } - default: - panic(fmt.Sprintf("unsupported value type: %T", values[fieldID])) - } - - // Always set the field ID as the leading byte. - buf[0] = fieldID - - // Append temp buffer to the end. - b = append(b, buf...) - } - - return b -} - // DecodeFields decodes a byte slice into a set of field ids and values. func (m *Measurement) DecodeFields(b []byte) map[uint8]interface{} { if len(b) == 0 { @@ -349,27 +283,6 @@ func (m *Measurement) seriesByTags(tags map[string]string) *Series { return m.series[string(marshalTags(tags))] } -// mapValues converts a map of values with string keys to field id keys. -// -// If a field exists, but its type is different, return an error. If error is -// nil, but at least 1 field has not been mapped, nil is returned as the map. -func (m *Measurement) mapValues(values map[string]interface{}) (map[uint8]interface{}, error) { - other := make(map[uint8]interface{}, len(values)) - for k, v := range values { - f := m.FieldByName(k) - if f == nil { - panic(fmt.Sprintf("field does not exist for %s", k)) - } else if influxql.InspectDataType(v) != f.Type { - return nil, fmt.Errorf("field %s is not of type %s", k, f.Type) - } else { - other[f.ID] = v - } - } - - // All fields exist, and are of the expected type. - return other, nil -} - func (m *Measurement) seriesIDsAndFilters(stmt *influxql.SelectStatement) (seriesIDs, map[uint32]influxql.Expr) { seriesIdsToExpr := make(map[uint32]influxql.Expr) if stmt.Condition == nil { @@ -770,6 +683,105 @@ type Field struct { // Fields represents a list of fields. type Fields []*Field +// FieldCodec providecs encoding and decoding functionality for the fields of a given +// Measurement. It is a distinct type to avoid locking writes on this node while +// potentially long-running queries are executing. +// +// It is not affected by changes to the Measurement object after codec creation. +type FieldCodec struct { + fieldsByID map[uint8]*Field + fieldsByName map[string]*Field +} + +// NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with +// a RLock that protects the Measurement. +func NewFieldCodec(m *Measurement) *FieldCodec { + fieldsByID := make(map[uint8]*Field, len(m.Fields)) + fieldsByName := make(map[string]*Field, len(m.Fields)) + for _, f := range m.Fields { + fieldsByID[f.ID] = f + fieldsByName[f.Name] = f + } + return &FieldCodec{fieldsByID: fieldsByID, fieldsByName: fieldsByName} +} + +// EncodeFields converts a map of values with string keys to a byte slice of field +// IDs and values. +// +// If a field exists in the codec, but its type is different, an error is returned. If +// a field is not present in the codec, the system panics. +func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) { + // Allocate byte slice and write field count. + b := make([]byte, 1, 10) + b[0] = byte(len(values)) + + for k, v := range values { + field := f.fieldsByName[k] + if field == nil { + panic(fmt.Sprintf("field does not exist for %s", k)) + } else if influxql.InspectDataType(v) != field.Type { + return nil, fmt.Errorf("field %s is not of type %s", k, field.Type) + } + + var buf []byte + + switch field.Type { + case influxql.Number: + var value float64 + // Convert integers to floats. + if intval, ok := v.(int); ok { + value = float64(intval) + } else { + value = v.(float64) + } + + buf = make([]byte, 9) + binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(value)) + case influxql.Boolean: + value := v.(bool) + + // Only 1 byte need for a boolean. + buf = make([]byte, 2) + if value { + buf[1] = byte(1) + } + case influxql.String: + value := v.(string) + if len(value) > maxStringLength { + value = value[:maxStringLength] + } + // Make a buffer for field ID, the string length, and the string. + buf = make([]byte, len(value)+3) + + // Set the string length, then copy the string itself. + binary.BigEndian.PutUint16(buf[1:3], uint16(len(value))) + for i, c := range []byte(value) { + buf[i+3] = byte(c) + } + default: + panic(fmt.Sprintf("unsupported value type: %T", v)) + } + + // Always set the field ID as the leading byte. + buf[0] = field.ID + + // Append temp buffer to the end. + b = append(b, buf...) + } + + return b, nil +} + +// DecodeByID +func (f *FieldCodec) DecodeByID(fieldId uint8, b []byte) (interface{}, error) { + return 0, nil +} + +// DecodeByName +func (f *FieldCodec) DecodeByName(fieldName uint8, b []byte) (interface{}, error) { + return 0, nil +} + // Series belong to a Measurement and represent unique time series in a database type Series struct { ID uint32 diff --git a/server.go b/server.go index ebc4f8d0e7..90e7300599 100644 --- a/server.go +++ b/server.go @@ -1662,15 +1662,23 @@ func (s *Server) writePoint(database, retentionPolicy string, point *Point) (uin return 0, err } - // Convert string-key/values to fieldID-key/values. - rawValues, err := m.mapValues(values) + // Get a field codec. + s.mu.RLock() + codec := NewFieldCodec(m) + s.mu.RUnlock() + if codec == nil { + panic("field codec is nil") + } + + // Convert string-key/values to encoded fields. + encodedFields, err := codec.EncodeFields(values) if err != nil { return 0, err } // Encode point header. data := marshalPointHeader(seriesID, timestamp.UnixNano()) - data = append(data, m.EncodeFields(rawValues)...) + data = append(data, encodedFields...) // Publish "raw write series" message on shard's topic to broker. return s.client.Publish(&messaging.Message{