Add "create fields" broadcast message
parent
a13223466b
commit
32faba7248
14
database.go
14
database.go
|
@ -132,16 +132,20 @@ func NewMeasurement(name string) *Measurement {
|
|||
}
|
||||
|
||||
// createFieldIfNotExists creates a new field with an autoincrementing ID.
|
||||
// Returns an error if 255 fields have already been created on the measurement.
|
||||
func (m *Measurement) createFieldIfNotExists(name string, typ influxql.DataType) (*Field, error) {
|
||||
// Returns an error if 255 fields have already been created on the measurement or
|
||||
// the fields already exists with a different type.
|
||||
func (m *Measurement) createFieldIfNotExists(name string, typ influxql.DataType) error {
|
||||
// Ignore if the field already exists.
|
||||
if f := m.FieldByName(name); f != nil {
|
||||
return f, nil
|
||||
if f.Type != typ {
|
||||
return ErrFieldTypeConflict
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only 255 fields are allowed. If we go over that then return an error.
|
||||
if len(m.Fields)+1 > math.MaxUint8 {
|
||||
return nil, ErrFieldOverflow
|
||||
return ErrFieldOverflow
|
||||
}
|
||||
|
||||
// Create and append a new field.
|
||||
|
@ -152,7 +156,7 @@ func (m *Measurement) createFieldIfNotExists(name string, typ influxql.DataType)
|
|||
}
|
||||
m.Fields = append(m.Fields, f)
|
||||
|
||||
return f, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Field returns a field by id.
|
||||
|
|
|
@ -103,6 +103,9 @@ var (
|
|||
// ErrFieldOverflow is returned when too many fields are created on a measurement.
|
||||
ErrFieldOverflow = errors.New("field overflow")
|
||||
|
||||
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
|
||||
ErrFieldTypeConflict = errors.New("field overflow")
|
||||
|
||||
// ErrSeriesNotFound is returned when looking up a non-existent series by database, name and tags
|
||||
ErrSeriesNotFound = errors.New("series not found")
|
||||
|
||||
|
|
54
server.go
54
server.go
|
@ -71,6 +71,9 @@ const (
|
|||
// Series messages
|
||||
createSeriesIfNotExistsMessageType = messaging.MessageType(0x50)
|
||||
|
||||
// Measurement messages
|
||||
createFieldsIfNotExistMessageType = messaging.MessageType(0x60)
|
||||
|
||||
// Write series data messages (per-topic)
|
||||
writeRawSeriesMessageType = messaging.MessageType(0x80)
|
||||
writeSeriesMessageType = messaging.MessageType(0x81)
|
||||
|
@ -1476,6 +1479,46 @@ type setDefaultRetentionPolicyCommand struct {
|
|||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
type createFieldsIfNotExistCommand struct {
|
||||
Database string `json:"database"`
|
||||
Measurement string `json:"measurement"`
|
||||
Fields map[string]influxql.DataType `json:"fields"`
|
||||
}
|
||||
|
||||
func (s *Server) applyCreateFieldsIfNotExist(m *messaging.Message) error {
|
||||
var c createFieldsIfNotExistCommand
|
||||
mustUnmarshalJSON(m.Data, &c)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Validate command.
|
||||
db := s.databases[c.Database]
|
||||
if db == nil {
|
||||
return ErrDatabaseNotFound
|
||||
}
|
||||
mm := db.measurements[c.Measurement]
|
||||
if mm == nil {
|
||||
return ErrMeasurementNotFound
|
||||
}
|
||||
|
||||
// Create fields in Metastore.
|
||||
for k, v := range c.Fields {
|
||||
if err := mm.createFieldIfNotExists(k, v); err != nil {
|
||||
if err == ErrFieldOverflow {
|
||||
log.Printf("no more fields allowed: %s::%s", mm.Name, k)
|
||||
continue
|
||||
} else if err == ErrFieldTypeConflict {
|
||||
log.Printf("field type conflict: %s::%s", mm.Name, k)
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) applyCreateSeriesIfNotExists(m *messaging.Message) error {
|
||||
var c createSeriesIfNotExistsCommand
|
||||
mustUnmarshalJSON(m.Data, &c)
|
||||
|
@ -1682,16 +1725,19 @@ func (s *Server) applyWriteSeries(m *messaging.Message) error {
|
|||
// TODO: Support non-float types.
|
||||
|
||||
// Find or create fields.
|
||||
// If too many fields are on the measurement then log the issue.
|
||||
// Just skip certain errors.
|
||||
// If any other error occurs then exit.
|
||||
f, err := mm.createFieldIfNotExists(k, influxql.Number)
|
||||
err := mm.createFieldIfNotExists(k, influxql.Number)
|
||||
if err == ErrFieldOverflow {
|
||||
log.Printf("no more fields allowed: %s::%s", mm.Name, k)
|
||||
continue
|
||||
} else if err == ErrFieldTypeConflict {
|
||||
log.Printf("ddd")
|
||||
continue
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
rawValues[f.ID] = v
|
||||
rawValues[1] = v
|
||||
}
|
||||
|
||||
// Update metastore.
|
||||
|
@ -2649,6 +2695,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
|
|||
err = s.applyDeleteShardGroup(m)
|
||||
case setDefaultRetentionPolicyMessageType:
|
||||
err = s.applySetDefaultRetentionPolicy(m)
|
||||
case createFieldsIfNotExistMessageType:
|
||||
err = s.applyCreateFieldsIfNotExist(m)
|
||||
case createSeriesIfNotExistsMessageType:
|
||||
err = s.applyCreateSeriesIfNotExists(m)
|
||||
case setPrivilegeMessageType:
|
||||
|
|
Loading…
Reference in New Issue