Drop time when used as a tag or field key
The "time" field and tags are unqueryable so we prevent those from being written so we don't have unreadable data.pull/7132/head
parent
3c124036f0
commit
9621bee195
|
@ -1,3 +1,9 @@
|
|||
## v1.1.0 [unreleased]
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- [#1834](https://github.com/influxdata/influxdb/issues/1834): Drop time when used as a tag or field key.
|
||||
|
||||
## v1.0.0 [unreleased]
|
||||
|
||||
### Breaking changes
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
# TODO
|
||||
|
||||
## v2
|
||||
|
||||
TODO list for v2. Here is a list of things we want to add to v1, but can't because they would be a breaking change.
|
||||
|
||||
- [#1834](https://github.com/influxdata/influxdb/issues/1834): Disallow using time as a tag key or field key.
|
|
@ -447,6 +447,24 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
|
|||
|
||||
// get the shard mutex for locally defined fields
|
||||
for _, p := range points {
|
||||
// verify the tags and fields
|
||||
tags := p.Tags()
|
||||
if _, ok := tags["time"]; ok {
|
||||
s.logger.Printf("dropping tag 'time' from '%s'\n", p.PrecisionString(""))
|
||||
delete(tags, "time")
|
||||
p.SetTags(tags)
|
||||
}
|
||||
|
||||
fields := p.Fields()
|
||||
if _, ok := fields["time"]; ok {
|
||||
s.logger.Printf("dropping field 'time' from '%s'\n", p.PrecisionString(""))
|
||||
delete(fields, "time")
|
||||
|
||||
if len(fields) == 0 {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// see if the series should be added to the index
|
||||
key := string(p.Key())
|
||||
ss := s.index.Series(key)
|
||||
|
@ -455,7 +473,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
|
|||
return nil, fmt.Errorf("max series per database exceeded: %s", key)
|
||||
}
|
||||
|
||||
ss = NewSeries(key, p.Tags())
|
||||
ss = NewSeries(key, tags)
|
||||
atomic.AddInt64(&s.stats.SeriesCreated, 1)
|
||||
}
|
||||
|
||||
|
@ -466,14 +484,14 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
|
|||
mf := s.engine.MeasurementFields(p.Name())
|
||||
|
||||
if mf == nil {
|
||||
for name, value := range p.Fields() {
|
||||
for name, value := range fields {
|
||||
fieldsToCreate = append(fieldsToCreate, &FieldCreate{p.Name(), &Field{Name: name, Type: influxql.InspectDataType(value)}})
|
||||
}
|
||||
continue // skip validation since all fields are new
|
||||
}
|
||||
|
||||
// validate field types and encode data
|
||||
for name, value := range p.Fields() {
|
||||
for name, value := range fields {
|
||||
if f := mf.Field(name); f != nil {
|
||||
// Field present in shard metadata, make sure there is no type conflict.
|
||||
if f.Type != influxql.InspectDataType(value) {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package tsdb_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
@ -144,12 +145,113 @@ func TestMaxSeriesLimit(t *testing.T) {
|
|||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
} else if err.Error() != "max series per database exceeded: cpu,host=server9999" {
|
||||
t.Fatalf("unexpected error messag:\n\texp = max series per database exceeded: cpu,host=server9999\n\tgot = %s", err.Error())
|
||||
t.Fatalf("unexpected error message:\n\texp = max series per database exceeded: cpu,host=server9999\n\tgot = %s", err.Error())
|
||||
}
|
||||
|
||||
sh.Close()
|
||||
}
|
||||
|
||||
func TestWriteTimeTag(t *testing.T) {
|
||||
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
tmpShard := path.Join(tmpDir, "shard")
|
||||
tmpWal := path.Join(tmpDir, "wal")
|
||||
|
||||
index := tsdb.NewDatabaseIndex("db")
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
|
||||
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatalf("error opening shard: %s", err.Error())
|
||||
}
|
||||
defer sh.Close()
|
||||
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{"time": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
sh.SetLogOutput(buf)
|
||||
if err := sh.WritePoints([]models.Point{pt}); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else if got, exp := buf.String(), "dropping field 'time'"; !strings.Contains(got, exp) {
|
||||
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
|
||||
}
|
||||
|
||||
m := index.Measurement("cpu")
|
||||
if m != nil {
|
||||
t.Fatal("unexpected cpu measurement")
|
||||
}
|
||||
|
||||
pt = models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{"value": 1.0, "time": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
||||
buf = bytes.NewBuffer(nil)
|
||||
sh.SetLogOutput(buf)
|
||||
if err := sh.WritePoints([]models.Point{pt}); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else if got, exp := buf.String(), "dropping field 'time'"; !strings.Contains(got, exp) {
|
||||
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
|
||||
}
|
||||
|
||||
m = index.Measurement("cpu")
|
||||
if m == nil {
|
||||
t.Fatal("expected cpu measurement")
|
||||
}
|
||||
|
||||
if got, exp := len(m.FieldNames()), 1; got != exp {
|
||||
t.Fatalf("invalid number of field names: got=%v exp=%v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteTimeField(t *testing.T) {
|
||||
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
tmpShard := path.Join(tmpDir, "shard")
|
||||
tmpWal := path.Join(tmpDir, "wal")
|
||||
|
||||
index := tsdb.NewDatabaseIndex("db")
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
|
||||
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatalf("error opening shard: %s", err.Error())
|
||||
}
|
||||
defer sh.Close()
|
||||
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{"time": "now"},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
sh.SetLogOutput(buf)
|
||||
if err := sh.WritePoints([]models.Point{pt}); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else if got, exp := buf.String(), "dropping tag 'time'"; !strings.Contains(got, exp) {
|
||||
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
|
||||
}
|
||||
|
||||
key := models.MakeKey([]byte("cpu"), nil)
|
||||
series := index.Series(string(key))
|
||||
if series == nil {
|
||||
t.Fatal("expected series")
|
||||
} else if len(series.Tags) != 0 {
|
||||
t.Fatalf("unexpected number of tags: got=%v exp=%v", len(series.Tags), 0)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShardWriteAddNewField(t *testing.T) {
|
||||
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
|
Loading…
Reference in New Issue