diff --git a/cmd/influx/template.go b/cmd/influx/template.go index d4ed07dce2..40ae5e867c 100644 --- a/cmd/influx/template.go +++ b/cmd/influx/template.go @@ -1299,10 +1299,10 @@ func (b *cmdTemplateBuilder) printTemplateDiff(diff pkger.Diff) error { } if bkts := diff.Buckets; len(bkts) > 0 { - printer := diffPrinterGen("Buckets", []string{"Retention Period", "Description"}) + printer := diffPrinterGen("Buckets", []string{"Retention Period", "Description", "Schema Type", "Num Measurements"}) appendValues := func(id pkger.SafeID, metaName string, v pkger.DiffBucketValues) []string { - return []string{metaName, id.String(), v.Name, v.RetentionRules.RP().String(), v.Description} + return []string{metaName, id.String(), v.Name, v.RetentionRules.RP().String(), v.Description, v.SchemaType, strconv.Itoa(len(v.MeasurementSchemas))} } for _, b := range bkts { @@ -1603,7 +1603,7 @@ func (b *cmdTemplateBuilder) printTemplateSummary(stackID platform.ID, sum pkger } if buckets := sum.Buckets; len(buckets) > 0 { - headers := append(commonHeaders, "Retention", "Description") + headers := append(commonHeaders, "Retention", "Description", "Schema Type") tablePrintFn("BUCKETS", headers, len(buckets), func(i int) []string { bucket := buckets[i] return []string{ @@ -1612,6 +1612,7 @@ func (b *cmdTemplateBuilder) printTemplateSummary(stackID platform.ID, sum pkger bucket.Name, formatDuration(bucket.RetentionPeriod), bucket.Description, + bucket.SchemaType, } }) } diff --git a/measurement_schema.go b/measurement_schema.go new file mode 100644 index 0000000000..e4fcf64fcb --- /dev/null +++ b/measurement_schema.go @@ -0,0 +1,445 @@ +package influxdb + +import ( + "encoding/json" + "errors" + "fmt" + "sort" + "strconv" + "strings" + + influxid "github.com/influxdata/influxdb/v2/kit/platform" + influxerror "github.com/influxdata/influxdb/v2/kit/platform/errors" + "github.com/influxdata/influxdb/v2/models" + "go.uber.org/multierr" +) + +// SchemaType differentiates the supported schema for a 
bucket. +type SchemaType int + +const ( + SchemaTypeImplicit SchemaType = iota // SchemaTypeImplicit specifies the bucket has an implicit schema. + SchemaTypeExplicit // SchemaTypeExplicit specifies the bucket has an explicit schema. +) + +// SchemaTypeFromString returns the SchemaType for s +// or nil if none exists. +func SchemaTypeFromString(s string) *SchemaType { + switch s { + case "implicit": + return SchemaTypeImplicit.Ptr() + case "explicit": + return SchemaTypeExplicit.Ptr() + default: + return nil + } +} + +func (s *SchemaType) String() string { + if s == nil { + return "" + } + + switch s := *s; s { + case SchemaTypeImplicit: + return "implicit" + case SchemaTypeExplicit: + return "explicit" + default: + return "SchemaType(" + strconv.FormatInt(int64(s), 10) + ")" + } +} + +func (s *SchemaType) UnmarshalJSON(d []byte) error { + var val string + if err := json.Unmarshal(d, &val); err != nil { + return err + } + + switch val { + case "implicit": + *s = SchemaTypeImplicit + case "explicit": + *s = SchemaTypeExplicit + default: + return errors.New("unexpected value") + } + + return nil +} + +func (s *SchemaType) Equals(other *SchemaType) bool { + if s == nil && other == nil { + return true + } else if s == nil || other == nil { + return false + } + + return *s == *other +} + +func (s SchemaType) MarshalJSON() ([]byte, error) { + switch s { + case SchemaTypeImplicit: + return []byte(`"implicit"`), nil + case SchemaTypeExplicit: + return []byte(`"explicit"`), nil + default: + return nil, errors.New("unexpected value") + } +} + +// Ptr returns a pointer to s. +func (s SchemaType) Ptr() *SchemaType { return &s } + +// SemanticColumnType specifies the semantics of a measurement column. 
+type SemanticColumnType int + +const ( + SemanticColumnTypeTimestamp SemanticColumnType = iota // SemanticColumnTypeTimestamp identifies the column is used as the timestamp + SemanticColumnTypeTag // SemanticColumnTypeTag identifies the column is used as a tag + SemanticColumnTypeField // SemanticColumnTypeField identifies the column is used as a field +) + +func SemanticColumnTypeFromString(s string) *SemanticColumnType { + switch s { + case "timestamp": + return SemanticColumnTypeTimestamp.Ptr() + case "tag": + return SemanticColumnTypeTag.Ptr() + case "field": + return SemanticColumnTypeField.Ptr() + default: + return nil + } +} + +func (s SemanticColumnType) String() string { + switch s { + case SemanticColumnTypeTimestamp: + return "timestamp" + case SemanticColumnTypeTag: + return "tag" + case SemanticColumnTypeField: + return "field" + default: + return "SemanticColumnType(" + strconv.FormatInt(int64(s), 10) + ")" + } +} + +func (s SemanticColumnType) Ptr() *SemanticColumnType { + return &s +} + +func (s *SemanticColumnType) UnmarshalJSON(d []byte) error { + var val string + if err := json.Unmarshal(d, &val); err != nil { + return err + } + + switch val { + case "timestamp": + *s = SemanticColumnTypeTimestamp + case "tag": + *s = SemanticColumnTypeTag + case "field": + *s = SemanticColumnTypeField + default: + return errors.New("unexpected value") + } + + return nil +} + +func (s SemanticColumnType) MarshalJSON() ([]byte, error) { + switch s { + case SemanticColumnTypeTimestamp: + return []byte(`"timestamp"`), nil + case SemanticColumnTypeTag: + return []byte(`"tag"`), nil + case SemanticColumnTypeField: + return []byte(`"field"`), nil + default: + return nil, errors.New("unexpected value") + } +} + +type SchemaColumnDataType uint + +const ( + SchemaColumnDataTypeFloat SchemaColumnDataType = iota + SchemaColumnDataTypeInteger + SchemaColumnDataTypeUnsigned + SchemaColumnDataTypeString + SchemaColumnDataTypeBoolean +) + +func SchemaColumnDataTypeFromString(s 
string) *SchemaColumnDataType { + switch s { + case "float": + return SchemaColumnDataTypeFloat.Ptr() + case "integer": + return SchemaColumnDataTypeInteger.Ptr() + case "unsigned": + return SchemaColumnDataTypeUnsigned.Ptr() + case "string": + return SchemaColumnDataTypeString.Ptr() + case "boolean": + return SchemaColumnDataTypeBoolean.Ptr() + default: + return nil + } +} + +// Ptr returns a pointer to s. +func (s SchemaColumnDataType) Ptr() *SchemaColumnDataType { return &s } + +func (s *SchemaColumnDataType) String() string { + if s == nil { + return "" + } + + switch *s { + case SchemaColumnDataTypeFloat: + return "float" + case SchemaColumnDataTypeInteger: + return "integer" + case SchemaColumnDataTypeUnsigned: + return "unsigned" + case SchemaColumnDataTypeString: + return "string" + case SchemaColumnDataTypeBoolean: + return "boolean" + default: + return "SchemaColumnDataType(" + strconv.FormatInt(int64(*s), 10) + ")" + } +} + +func (s *SchemaColumnDataType) UnmarshalJSON(d []byte) error { + var val string + if err := json.Unmarshal(d, &val); err != nil { + return err + } + + switch val { + case "float": + *s = SchemaColumnDataTypeFloat + case "integer": + *s = SchemaColumnDataTypeInteger + case "unsigned": + *s = SchemaColumnDataTypeUnsigned + case "string": + *s = SchemaColumnDataTypeString + case "boolean": + *s = SchemaColumnDataTypeBoolean + default: + return errors.New("unexpected value") + } + + return nil +} + +func (s SchemaColumnDataType) MarshalJSON() ([]byte, error) { + switch s { + case SchemaColumnDataTypeFloat: + return []byte(`"float"`), nil + case SchemaColumnDataTypeInteger: + return []byte(`"integer"`), nil + case SchemaColumnDataTypeUnsigned: + return []byte(`"unsigned"`), nil + case SchemaColumnDataTypeString: + return []byte(`"string"`), nil + case SchemaColumnDataTypeBoolean: + return []byte(`"boolean"`), nil + default: + return nil, errors.New("unexpected value") + } +} + +var ( + schemaTypeToFieldTypeMap = [...]models.FieldType{ + 
SchemaColumnDataTypeFloat: models.Float, + SchemaColumnDataTypeInteger: models.Integer, + SchemaColumnDataTypeUnsigned: models.Unsigned, + SchemaColumnDataTypeString: models.String, + SchemaColumnDataTypeBoolean: models.Boolean, + } +) + +// ToFieldType maps SchemaColumnDataType to the equivalent models.FieldType or +// models.Empty if no such mapping exists. +func (s SchemaColumnDataType) ToFieldType() models.FieldType { + // s is used directly as the table index below, so every value at or past the + // table length is unmapped. Comparing as unsigned keeps very large values from + // wrapping negative in an int conversion and slipping past the guard. + if uint(s) >= uint(len(schemaTypeToFieldTypeMap)) { + return models.Empty + } + return schemaTypeToFieldTypeMap[s] +} + +// MeasurementSchema is a named set of columns, carrying its own ID together with +// the IDs of an organization and a bucket. +type MeasurementSchema struct { + ID influxid.ID `json:"id,omitempty"` + OrgID influxid.ID `json:"orgID"` + BucketID influxid.ID `json:"bucketID"` + Name string `json:"name"` + Columns []MeasurementSchemaColumn `json:"columns"` + CRUDLog +} + +// Validate checks the schema name and its columns, accumulating every violation +// found into the returned error. +func (m *MeasurementSchema) Validate() error { + var err error + + err = multierr.Append(err, m.validateName("name", m.Name)) + err = multierr.Append(err, m.validateColumns()) + + return err +} + +// ValidateMeasurementSchemaName determines if name is a valid identifier for +// a measurement schema or column name and if not, returns an error. 
+func ValidateMeasurementSchemaName(name string) error { + if len(name) == 0 { + return ErrMeasurementSchemaNameTooShort + } + + if len(name) > 128 { + return ErrMeasurementSchemaNameTooLong + } + + if err := models.CheckToken([]byte(name)); err != nil { + return &influxerror.Error{ + Code: influxerror.EInvalid, + Err: err, + } + } + + if strings.HasPrefix(name, "_") { + return ErrMeasurementSchemaNameUnderscore + } + + if strings.Contains(name, `"`) || strings.Contains(name, `'`) { + return ErrMeasurementSchemaNameQuotes + } + + return nil +} + +func (m *MeasurementSchema) validateName(prefix, name string) error { + if err := ValidateMeasurementSchemaName(name); err != nil { + return fmt.Errorf("%s %q: %w", prefix, name, err) + } + + return nil +} + +// columns implements sort.Interface to efficiently sort a MeasurementSchemaColumn slice +// by using indices to store the sorted element indices. +type columns struct { + indices []int // indices is a list of indices representing a sorted columns + columns []MeasurementSchemaColumn +} + +// newColumns returns an instance of columns which contains a sorted version of c. 
+func newColumns(c []MeasurementSchemaColumn) columns { + colIndices := make([]int, len(c)) + for i := range c { + colIndices[i] = i + } + res := columns{ + indices: colIndices, + columns: c, + } + sort.Sort(res) + return res +} + +// Len returns the number of columns. +func (c columns) Len() int { + return len(c.columns) +} + +// Less orders positions i and j by a byte-wise comparison of the column names they refer to. +func (c columns) Less(i, j int) bool { + return c.columns[c.indices[i]].Name < c.columns[c.indices[j]].Name +} + +// Swap exchanges two entries of the index list; the column slice itself is never reordered. +func (c columns) Swap(i, j int) { + c.indices[i], c.indices[j] = c.indices[j], c.indices[i] +} + +// Index returns a pointer to the column at position i of the name-sorted order. +func (c columns) Index(i int) *MeasurementSchemaColumn { + return &c.columns[c.indices[i]] +} + +// validateColumns checks m.Columns and accumulates every violation found into the returned error. +func (m *MeasurementSchema) validateColumns() (err error) { + if len(m.Columns) == 0 { + return ErrMeasurementSchemaColumnsMissing + } + + cols := newColumns(m.Columns) + + timeCount := 0 + fieldCount := 0 + for i := range cols.columns { + col := &cols.columns[i] + + err = multierr.Append(err, m.validateName("column name", col.Name)) + + // special handling for time column + if col.Name == "time" { + timeCount++ + if col.Type != SemanticColumnTypeTimestamp { + err = multierr.Append(err, ErrMeasurementSchemaColumnsTimeInvalidSemantic) + } else if col.DataType != nil { + err = multierr.Append(err, ErrMeasurementSchemaColumnsTimestampSemanticDataType) + } + continue + } + + // ensure no other columns have a timestamp semantic + switch col.Type { + case SemanticColumnTypeTimestamp: + // columns named "time" were handled and skipped above, so a column that + // reaches this case always carries the timestamp semantic under another name + err = multierr.Append(err, ErrMeasurementSchemaColumnsTimestampSemanticInvalidName) + + case SemanticColumnTypeTag: + // ensure tag columns don't include a data type value + if col.DataType != nil { + err = multierr.Append(err, ErrMeasurementSchemaColumnsTagSemanticDataType) + } + + case SemanticColumnTypeField: + if col.DataType == nil { + err = multierr.Append(err, 
ErrMeasurementSchemaColumnsFieldSemanticMissingDataType) + } + fieldCount++ + } + } + + if timeCount == 0 { + err = multierr.Append(err, ErrMeasurementSchemaColumnsMissingTime) + } + + // ensure there is at least one field defined + if fieldCount == 0 { + err = multierr.Append(err, ErrMeasurementSchemaColumnsMissingFields) + } + + // check for duplicate columns using general UTF-8 case insensitive comparison. + // Every pair is compared: the sorted order of cols is byte-wise, which does not + // keep case variants next to each other (e.g. "Host", "a", "host"), so checking + // only neighbours in that order would miss duplicates. +dupes: + for i := range cols.columns { + for j := i + 1; j < len(cols.columns); j++ { + if strings.EqualFold(cols.columns[i].Name, cols.columns[j].Name) { + err = multierr.Append(err, ErrMeasurementSchemaColumnsDuplicateNames) + break dupes + } + } + } + + return err +} + +// MeasurementSchemaColumn defines one column of a measurement schema: its name, +// its semantic type and an optional data type. +type MeasurementSchemaColumn struct { + Name string `json:"name"` + Type SemanticColumnType `json:"type"` + DataType *SchemaColumnDataType `json:"dataType,omitempty"` +} diff --git a/measurement_schema_errors.go b/measurement_schema_errors.go new file mode 100644 index 0000000000..351c828df6 --- /dev/null +++ b/measurement_schema_errors.go @@ -0,0 +1,72 @@ +package influxdb + +import ( + influxerror "github.com/influxdata/influxdb/v2/kit/platform/errors" +) + +var ( + ErrMeasurementSchemaNameTooShort = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "too short", + } + + ErrMeasurementSchemaNameTooLong = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "too long", + } + + ErrMeasurementSchemaNameUnderscore = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "must not begin with _", + } + + ErrMeasurementSchemaNameQuotes = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "must not contain single or double quotes", + } + + ErrMeasurementSchemaColumnsMissing = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "measurement schema columns missing", + } + + ErrMeasurementSchemaColumnsMissingTime = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "measurement schema columns missing time column with a timestamp semantic", + } + + ErrMeasurementSchemaColumnsTimeInvalidSemantic = &influxerror.Error{ + Code: 
influxerror.EInvalid, + Msg: "measurement schema contains a time column with an invalid semantic", + } + + ErrMeasurementSchemaColumnsTimestampSemanticInvalidName = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "measurement schema columns contains a timestamp column that is not named time", + } + + ErrMeasurementSchemaColumnsTimestampSemanticDataType = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "measurement schema columns contains a time column with a data type", + } + + ErrMeasurementSchemaColumnsTagSemanticDataType = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "measurement schema columns contains a tag column with a data type", + } + + ErrMeasurementSchemaColumnsFieldSemanticMissingDataType = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "measurement schema columns contains a field column with missing data type", + } + + ErrMeasurementSchemaColumnsMissingFields = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "measurement schema columns requires at least one field type column", + } + + ErrMeasurementSchemaColumnsDuplicateNames = &influxerror.Error{ + Code: influxerror.EInvalid, + Msg: "measurement schema columns contains duplicate column names", + } +) diff --git a/measurement_schema_test.go b/measurement_schema_test.go new file mode 100644 index 0000000000..f07dbceff7 --- /dev/null +++ b/measurement_schema_test.go @@ -0,0 +1,279 @@ +package influxdb_test + +import ( + "fmt" + "strings" + "testing" + + "github.com/influxdata/influxdb/v2" + influxerror "github.com/influxdata/influxdb/v2/kit/platform/errors" + "github.com/stretchr/testify/assert" + "go.uber.org/multierr" +) + +func TestMeasurementSchema_Validate(t *testing.T) { + col1 := func(name string, st influxdb.SemanticColumnType) influxdb.MeasurementSchemaColumn { + return influxdb.MeasurementSchemaColumn{Name: name, Type: st} + } + col2 := func(name string, st influxdb.SemanticColumnType, dt influxdb.SchemaColumnDataType) 
influxdb.MeasurementSchemaColumn { + return influxdb.MeasurementSchemaColumn{Name: name, Type: st, DataType: &dt} + } + + // errp composes a new error from err with prefix p and quoted name + errp := func(p, name string, err error) error { + return fmt.Errorf("%s %q: %w", p, name, err) + } + + type fields struct { + Name string + Columns []influxdb.MeasurementSchemaColumn + } + + okCols := []influxdb.MeasurementSchemaColumn{ + col1("host", influxdb.SemanticColumnTypeTag), + col2("usage_user", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col1("time", influxdb.SemanticColumnTypeTimestamp), + } + + tests := []struct { + name string + fields fields + wantErr bool + errs []error + }{ + { + name: "is valid", + fields: fields{ + Name: "cpu", + Columns: okCols, + }, + }, + { + name: "name too short", + fields: fields{ + Name: "", + Columns: okCols, + }, + errs: []error{errp("name", "", influxdb.ErrMeasurementSchemaNameTooShort)}, + }, + { + name: "name too long", + fields: fields{ + Name: strings.Repeat("f", 129), + Columns: okCols, + }, + errs: []error{errp("name", strings.Repeat("f", 129), influxdb.ErrMeasurementSchemaNameTooLong)}, + }, + { + name: "name starts with underscore", + fields: fields{ + Name: "_cpu", + Columns: okCols, + }, + errs: []error{errp("name", "_cpu", influxdb.ErrMeasurementSchemaNameUnderscore)}, + }, + { + name: "name contains non-printable chars", + fields: fields{ + Name: "cp\x03u", + Columns: okCols, + }, + errs: []error{errp("name", "cp\x03u", &influxerror.Error{ + Code: influxerror.EInvalid, + Err: fmt.Errorf("non-printable character"), + })}, + }, + { + name: "name contains quotes", + fields: fields{ + Name: `"cpu"`, + Columns: okCols, + }, + errs: []error{errp("name", `"cpu"`, influxdb.ErrMeasurementSchemaNameQuotes)}, + }, + + // Columns validation + { + name: "missing columns", + fields: fields{ + Name: "cpu", + Columns: nil, + }, + errs: []error{influxdb.ErrMeasurementSchemaColumnsMissing}, + }, + { + name: 
"time column wrong semantic", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1("host", influxdb.SemanticColumnTypeTag), + col2("usage_user", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col1("time", influxdb.SemanticColumnTypeField), + }, + }, + errs: []error{influxdb.ErrMeasurementSchemaColumnsTimeInvalidSemantic}, + }, + { + name: "time column with data type", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1("host", influxdb.SemanticColumnTypeTag), + col2("usage_user", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col2("time", influxdb.SemanticColumnTypeTimestamp, influxdb.SchemaColumnDataTypeBoolean), + }, + }, + errs: []error{influxdb.ErrMeasurementSchemaColumnsTimestampSemanticDataType}, + }, + { + name: "missing time column", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1("host", influxdb.SemanticColumnTypeTag), + col2("usage_user", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + }, + }, + errs: []error{influxdb.ErrMeasurementSchemaColumnsMissingTime}, + }, + { + name: "timestamp column that is not named time", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1("host", influxdb.SemanticColumnTypeTag), + col2("usage_user", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col1("foo", influxdb.SemanticColumnTypeTimestamp), + col1("time", influxdb.SemanticColumnTypeTimestamp), + }, + }, + errs: []error{influxdb.ErrMeasurementSchemaColumnsTimestampSemanticInvalidName}, + }, + { + name: "tag contains data type", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col2("host", influxdb.SemanticColumnTypeTag, influxdb.SchemaColumnDataTypeString), + col2("usage_user", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col1("time", 
influxdb.SemanticColumnTypeTimestamp), + }, + }, + errs: []error{influxdb.ErrMeasurementSchemaColumnsTagSemanticDataType}, + }, + { + name: "field missing data type", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1("host", influxdb.SemanticColumnTypeTag), + col1("usage_user", influxdb.SemanticColumnTypeField), + col1("time", influxdb.SemanticColumnTypeTimestamp), + }, + }, + errs: []error{influxdb.ErrMeasurementSchemaColumnsFieldSemanticMissingDataType}, + }, + { + name: "missing fields", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1("host", influxdb.SemanticColumnTypeTag), + col1("region", influxdb.SemanticColumnTypeTag), + col1("time", influxdb.SemanticColumnTypeTimestamp), + }, + }, + errs: []error{influxdb.ErrMeasurementSchemaColumnsMissingFields}, + }, + { + name: "duplicate column names", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1("host", influxdb.SemanticColumnTypeTag), + col2("host", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col1("time", influxdb.SemanticColumnTypeTimestamp), + }, + }, + errs: []error{influxdb.ErrMeasurementSchemaColumnsDuplicateNames}, + }, + { + name: "duplicate column case insensitive names", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1("host", influxdb.SemanticColumnTypeTag), + col2("HOST", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col1("time", influxdb.SemanticColumnTypeTimestamp), + }, + }, + errs: []error{influxdb.ErrMeasurementSchemaColumnsDuplicateNames}, + }, + + // column name validation + { + name: "column name too short", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1("", influxdb.SemanticColumnTypeTag), + col2("usage_user", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col1("time", influxdb.SemanticColumnTypeTimestamp), + }, + }, + errs: 
[]error{errp("column name", "", influxdb.ErrMeasurementSchemaNameTooShort)}, + }, + { + name: "column name too long", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1(strings.Repeat("f", 129), influxdb.SemanticColumnTypeTag), + col2("usage_user", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col1("time", influxdb.SemanticColumnTypeTimestamp), + }, + }, + errs: []error{errp("column name", strings.Repeat("f", 129), influxdb.ErrMeasurementSchemaNameTooLong)}, + }, + { + name: "column name starts with underscore", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1("_host", influxdb.SemanticColumnTypeTag), + col2("usage_user", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col1("time", influxdb.SemanticColumnTypeTimestamp), + }, + }, + errs: []error{errp("column name", "_host", influxdb.ErrMeasurementSchemaNameUnderscore)}, + }, + { + name: "column name contains quotes", + fields: fields{ + Name: "cpu", + Columns: []influxdb.MeasurementSchemaColumn{ + col1(`"host"`, influxdb.SemanticColumnTypeTag), + col2("usage_user", influxdb.SemanticColumnTypeField, influxdb.SchemaColumnDataTypeFloat), + col1("time", influxdb.SemanticColumnTypeTimestamp), + }, + }, + errs: []error{errp("column name", `"host"`, influxdb.ErrMeasurementSchemaNameQuotes)}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &influxdb.MeasurementSchema{ + Name: tt.fields.Name, + Columns: tt.fields.Columns, + } + + if gotErr := m.Validate(); len(tt.errs) > 0 { + gotErrs := multierr.Errors(gotErr) + assert.ElementsMatch(t, gotErrs, tt.errs) + } else { + assert.NoError(t, gotErr) + } + + }) + } +} diff --git a/models/points.go b/models/points.go index df81b79e3c..9b44411149 100644 --- a/models/points.go +++ b/models/points.go @@ -2614,3 +2614,27 @@ func ValidKeyTokens(name string, tags Tags) bool { return ValidTagTokens(tags) } + +var ( + errInvalidUTF8 
= errors.New("invalid UTF-8 sequence") + errNonPrintable = errors.New("non-printable character") + errReplacementChar = fmt.Errorf("unicode replacement char %q cannot be used", unicode.ReplacementChar) +) + +// CheckToken returns an error when the given token is invalid +// for use as a tag or value key or measurement name. +func CheckToken(a []byte) error { + if !utf8.Valid(a) { + return errInvalidUTF8 + } + + for _, r := range string(a) { + if !unicode.IsPrint(r) { + return errNonPrintable + } + if r == unicode.ReplacementChar { + return errReplacementChar + } + } + return nil +} diff --git a/pkger/models.go b/pkger/models.go index 2b43159606..f74b82ad1f 100644 --- a/pkger/models.go +++ b/pkger/models.go @@ -203,9 +203,11 @@ type ( // DiffBucketValues are the varying values for a bucket. DiffBucketValues struct { - Name string `json:"name"` - Description string `json:"description"` - RetentionRules retentionRules `json:"retentionRules"` + Name string `json:"name"` + Description string `json:"description"` + RetentionRules retentionRules `json:"retentionRules"` + SchemaType string `json:"schemaType,omitempty"` + MeasurementSchemas measurementSchemas `json:"measurementSchemas,omitempty"` } ) @@ -469,9 +471,23 @@ type SummaryBucket struct { // TODO: return retention rules? RetentionPeriod time.Duration `json:"retentionPeriod"` + SchemaType string `json:"schemaType,omitempty"` + MeasurementSchemas []SummaryMeasurementSchema `json:"measurementSchemas,omitempty"` + LabelAssociations []SummaryLabel `json:"labelAssociations"` } +type SummaryMeasurementSchema struct { + Name string `json:"name"` + Columns []SummaryMeasurementSchemaColumn `json:"columns"` +} + +type SummaryMeasurementSchemaColumn struct { + Name string `json:"name"` + Type string `json:"type"` + DataType string `json:"dataType,omitempty"` +} + // SummaryCheck provides a summary of a pkg check. 
type SummaryCheck struct { SummaryIdentifier diff --git a/pkger/parser.go b/pkger/parser.go index b0d38d258e..f34dc3d7ef 100644 --- a/pkger/parser.go +++ b/pkger/parser.go @@ -813,6 +813,7 @@ func (p *Template) graphBuckets() *parseErr { bkt := &bucket{ identity: ident, Description: o.Spec.stringShort(fieldDescription), + SchemaType: o.Spec.stringShort(fieldBucketSchemaType), } if rules, ok := o.Spec[fieldBucketRetentionRules].(retentionRules); ok { bkt.RetentionRules = rules @@ -824,6 +825,21 @@ func (p *Template) graphBuckets() *parseErr { }) } } + if schemas, ok := o.Spec[fieldMeasurementSchemas].(measurementSchemas); ok { + bkt.MeasurementSchemas = schemas + } else { + for _, sr := range o.Spec.slcResource(fieldMeasurementSchemas) { + ms := measurementSchema{Name: sr.stringShort(fieldMeasurementSchemaName)} + for _, scr := range sr.slcResource(fieldMeasurementSchemaColumns) { + ms.Columns = append(ms.Columns, measurementColumn{ + Name: scr.stringShort(fieldMeasurementColumnName), + Type: scr.stringShort(fieldMeasurementColumnType), + DataType: scr.stringShort(fieldMeasurementColumnDataType), + }) + } + bkt.MeasurementSchemas = append(bkt.MeasurementSchemas, ms) + } + } p.setRefs(bkt.name, bkt.displayName) failures := p.parseNestedLabels(o.Spec, func(l *label) error { diff --git a/pkger/parser_models.go b/pkger/parser_models.go index 37152ab366..e37b195b4d 100644 --- a/pkger/parser_models.go +++ b/pkger/parser_models.go @@ -88,7 +88,11 @@ type bucket struct { Description string RetentionRules retentionRules - labels sortedLabels + + SchemaType string + MeasurementSchemas measurementSchemas + + labels sortedLabels } func (b *bucket) summarize() SummaryBucket { @@ -98,10 +102,12 @@ func (b *bucket) summarize() SummaryBucket { MetaName: b.MetaName(), EnvReferences: summarizeCommonReferences(b.identity, b.labels), }, - Name: b.Name(), - Description: b.Description, - RetentionPeriod: b.RetentionRules.RP(), - LabelAssociations: toSummaryLabels(b.labels...), + Name: 
b.Name(), + Description: b.Description, + SchemaType: b.SchemaType, + MeasurementSchemas: b.MeasurementSchemas.summarize(), + RetentionPeriod: b.RetentionRules.RP(), + LabelAssociations: toSummaryLabels(b.labels...), } } @@ -115,6 +121,7 @@ func (b *bucket) valid() []validationErr { vErrs = append(vErrs, err) } vErrs = append(vErrs, b.RetentionRules.valid()...) + vErrs = append(vErrs, b.MeasurementSchemas.valid()...) if len(vErrs) == 0 { return nil } diff --git a/pkger/parser_models_schema.go b/pkger/parser_models_schema.go new file mode 100644 index 0000000000..e6694641d5 --- /dev/null +++ b/pkger/parser_models_schema.go @@ -0,0 +1,219 @@ +package pkger + +import ( + "fmt" + "sort" + "strings" + + "github.com/influxdata/influxdb/v2" +) + +const ( + fieldBucketSchemaType = "schemaType" + fieldMeasurementSchemas = "measurementSchemas" + + // measurementSchema fields + fieldMeasurementSchemaName = "name" + fieldMeasurementSchemaColumns = "columns" + + // measurementColumn fields + fieldMeasurementColumnName = "name" + fieldMeasurementColumnType = "type" + fieldMeasurementColumnDataType = "dataType" +) + +type measurementSchemas []measurementSchema + +func (s measurementSchemas) valid() []validationErr { + var errs []validationErr + + for idx, ms := range s { + if nestedErrs := ms.valid(); len(nestedErrs) > 0 { + errs = append(errs, validationErr{ + Field: fieldMeasurementSchemas, + Index: intPtr(idx), + Nested: nestedErrs, + }) + } + } + + return errs +} + +func (s measurementSchema) valid() []validationErr { + var errs []validationErr + + if err := influxdb.ValidateMeasurementSchemaName(s.Name); err != nil { + errs = append(errs, validationErr{ + Field: fieldMeasurementSchemaName, + Msg: err.Error(), + }) + } + + // validate columns + timeCount := 0 + fieldCount := 0 + names := make([]string, 0, len(s.Columns)) + + columnErrors := make([]validationErr, len(s.Columns)) + + for idx, col := range s.Columns { + colErr := &columnErrors[idx] + *colErr = validationErr{ + 
Field: fieldMeasurementSchemaColumns, + Index: intPtr(idx), + } + + names = append(names, col.Name) + + if err := influxdb.ValidateMeasurementSchemaName(col.Name); err != nil { + colErr.Nested = append(colErr.Nested, validationErr{ + Field: fieldMeasurementColumnName, + Msg: err.Error(), + }) + } + + colType := influxdb.SemanticColumnTypeFromString(col.Type) + if colType == nil { + colErr.Nested = append(colErr.Nested, validationErr{ + Field: fieldMeasurementColumnType, + Msg: "missing type", + }) + continue + } + + colDataType := influxdb.SchemaColumnDataTypeFromString(col.DataType) + + // all columns require a type field + if col.Name == "time" { + timeCount++ + if *colType != influxdb.SemanticColumnTypeTimestamp { + colErr.Nested = append(colErr.Nested, validationErr{ + Field: fieldMeasurementColumnType, + Msg: "\"time\" column type must be timestamp", + }) + } + + if colDataType != nil { + colErr.Nested = append(colErr.Nested, validationErr{ + Field: fieldMeasurementColumnDataType, + Msg: "unexpected dataType for time column", + }) + } + } + + // ensure no other columns have a timestamp semantic + switch *colType { + case influxdb.SemanticColumnTypeTimestamp: + if col.Name != "time" { + colErr.Nested = append(colErr.Nested, validationErr{ + Field: fieldMeasurementColumnName, + Msg: "timestamp column must be named \"time\"", + }) + } + + case influxdb.SemanticColumnTypeTag: + // ensure tag columns don't include a data type value + if colDataType != nil { + colErr.Nested = append(colErr.Nested, validationErr{ + Field: fieldMeasurementColumnDataType, + Msg: "unexpected dataType for tag column", + }) + } + + case influxdb.SemanticColumnTypeField: + if colDataType == nil { + colErr.Nested = append(colErr.Nested, validationErr{ + Field: fieldMeasurementColumnDataType, + Msg: "missing or invalid data type for field column", + }) + } + fieldCount++ + } + } + + // collect only those column errors with nested errors + for _, colErr := range columnErrors { + if 
len(colErr.Nested) > 0 { + errs = append(errs, colErr) + } + } + + if timeCount == 0 { + errs = append(errs, validationErr{ + Field: fieldMeasurementSchemaColumns, + Msg: "missing \"time\" column", + }) + } + + // ensure there is at least one field defined + if fieldCount == 0 { + errs = append(errs, validationErr{ + Field: fieldMeasurementSchemaColumns, + Msg: "at least one field column is required", + }) + } + + // check for duplicate columns using general UTF-8 case insensitive comparison. + // Sorting keeps the reported order deterministic, but the order is byte-wise and + // does not place case variants side by side (e.g. "Host", "a", "host"), so each + // name is compared with every later name rather than only its neighbour. + sort.Strings(names) + for i := range names { + for j := i + 1; j < len(names); j++ { + if strings.EqualFold(names[i], names[j]) { + errs = append(errs, validationErr{ + Field: fieldMeasurementSchemaColumns, + Msg: fmt.Sprintf("duplicate columns with name %q", names[i]), + }) + break + } + } + } + + return errs +} + +// summarize converts every schema to its summary form, ordered by name; an empty +// list yields nil. +func (s measurementSchemas) summarize() []SummaryMeasurementSchema { + if len(s) == 0 { + return nil + } + + schemas := make([]SummaryMeasurementSchema, 0, len(s)) + for _, schema := range s { + schemas = append(schemas, schema.summarize()) + } + + // Measurements are in Name order for consistent output in summaries + sort.Slice(schemas, func(i, j int) bool { + return schemas[i].Name < schemas[j].Name + }) + + return schemas +} + +type measurementSchema struct { + Name string `json:"name" yaml:"name"` + Columns []measurementColumn `json:"columns" yaml:"columns"` +} + +// summarize converts the schema and its columns to summary form, with columns +// ordered by name. +func (s measurementSchema) summarize() SummaryMeasurementSchema { + var cols []SummaryMeasurementSchemaColumn + if len(s.Columns) > 0 { + cols = make([]SummaryMeasurementSchemaColumn, 0, len(s.Columns)) + for i := range s.Columns { + cols = append(cols, s.Columns[i].summarize()) + } + + // Columns are in Name order for consistent output in summaries + sort.Slice(cols, func(i, j int) bool { + return cols[i].Name < cols[j].Name + }) + } + + return SummaryMeasurementSchema{Name: s.Name, Columns: cols} +} + +type measurementColumn struct { + Name string `json:"name" yaml:"name"` + Type string `json:"type" yaml:"type"` + DataType string 
`json:"dataType,omitempty" yaml:"dataType,omitempty"` +} + +/* summarize converts c directly to SummaryMeasurementSchemaColumn; the conversion carries every field over unchanged. */func (c measurementColumn) summarize() SummaryMeasurementSchemaColumn { + return SummaryMeasurementSchemaColumn(c) +} diff --git a/pkger/parser_test.go b/pkger/parser_test.go index ab485f9b02..f5963a2913 100644 --- a/pkger/parser_test.go +++ b/pkger/parser_test.go @@ -59,6 +59,35 @@ func TestParse(t *testing.T) { }) }) + t.Run("with valid bucket and schema should be valid", func(t *testing.T) { + template := validParsedTemplateFromFile(t, "testdata/bucket_schema.yml", EncodingYAML) + buckets := template.Summary().Buckets + require.Len(t, buckets, 1) + + exp := SummaryBucket{ + SummaryIdentifier: SummaryIdentifier{ + Kind: KindBucket, + MetaName: "explicit-11", + EnvReferences: []SummaryReference{}, + }, + Name: "my_explicit", + SchemaType: "explicit", + LabelAssociations: []SummaryLabel{}, + MeasurementSchemas: []SummaryMeasurementSchema{ + { + Name: "cpu", + Columns: []SummaryMeasurementSchemaColumn{ + {Name: "host", Type: "tag"}, + {Name: "time", Type: "timestamp"}, + {Name: "usage_user", Type: "field", DataType: "float"}, + }, + }, + }, + } + + assert.Equal(t, exp, buckets[0]) + }) + t.Run("with env refs should be valid", func(t *testing.T) { testfileRunner(t, "testdata/bucket_ref.yml", func(t *testing.T, template *Template) { actual := template.Summary().Buckets @@ -182,6 +211,70 @@ metadata: name: invalid-name spec: name: f +`, + }, + { + name: "invalid measurement name", + resourceErrs: 1, + validationErrs: 1, + valFields: []string{strings.Join([]string{fieldSpec, fieldMeasurementSchemas}, ".")}, + templateStr: `apiVersion: influxdata.com/v2alpha1 +kind: Bucket +metadata: + name: foo-1 +spec: + name: foo + schemaType: explicit + measurementSchemas: + - name: _cpu + columns: + - name: time + type: timestamp + - name: usage_user + type: field + dataType: float +`, + }, + { + name: "invalid semantic type", + resourceErrs: 1, + validationErrs: 1, + valFields: []string{strings.Join([]string{fieldSpec, 
fieldMeasurementSchemas, fieldMeasurementSchemaColumns}, ".")}, + templateStr: `apiVersion: influxdata.com/v2alpha1 +kind: Bucket +metadata: + name: foo-1 +spec: + name: foo + schemaType: explicit + measurementSchemas: + - name: _cpu + columns: + - name: time + type: field + - name: usage_user + type: field + dataType: float +`, + }, + { + name: "missing time column", + resourceErrs: 1, + validationErrs: 1, + valFields: []string{strings.Join([]string{fieldSpec, fieldMeasurementSchemas}, ".")}, + templateStr: `apiVersion: influxdata.com/v2alpha1 +kind: Bucket +metadata: + name: foo-1 +spec: + name: foo + schemaType: explicit + measurementSchemas: + - name: cpu + columns: + - name: usage_user + type: field + dataType: float `, }, } diff --git a/pkger/testdata/bucket_schema.yml b/pkger/testdata/bucket_schema.yml new file mode 100644 index 0000000000..624be3f137 --- /dev/null +++ b/pkger/testdata/bucket_schema.yml @@ -0,0 +1,17 @@ +apiVersion: influxdata.com/v2alpha1 +kind: Bucket +metadata: + name: explicit-11 +spec: + measurementSchemas: + - columns: + - name: time + type: timestamp + - name: host + type: tag + - dataType: float + name: usage_user + type: field + name: cpu + name: my_explicit + schemaType: explicit