allow partial writes on field conflicts
parent
3722fa383d
commit
0103e44896
|
@ -48,6 +48,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
|
|||
- [#7822](https://github.com/influxdata/influxdb/issues/7822): Drop database will delete /influxdb/data directory
|
||||
- [#7838](https://github.com/influxdata/influxdb/issues/7838): Ensure Subscriber service can be disabled.
|
||||
- [#7845](https://github.com/influxdata/influxdb/issues/7845): Fix race in storage engine.
|
||||
- [#7814](https://github.com/influxdata/influxdb/issues/7814): InfluxDB should do a partial write on mismatched type errors.
|
||||
|
||||
## v1.1.1 [2016-12-06]
|
||||
|
||||
|
|
|
@ -193,6 +193,23 @@ func (s *Server) HTTPPost(url string, content []byte) (results string, err error
|
|||
}
|
||||
}
|
||||
|
||||
type WriteError struct {
|
||||
body string
|
||||
statusCode int
|
||||
}
|
||||
|
||||
func (wr WriteError) StatusCode() int {
|
||||
return wr.statusCode
|
||||
}
|
||||
|
||||
func (wr WriteError) Body() string {
|
||||
return wr.body
|
||||
}
|
||||
|
||||
func (wr WriteError) Error() string {
|
||||
return fmt.Sprintf("invalid status code: code=%d, body=%s", wr.statusCode, wr.body)
|
||||
}
|
||||
|
||||
// Write executes a write against the server and returns the results.
|
||||
func (s *Server) Write(db, rp, body string, params url.Values) (results string, err error) {
|
||||
if params == nil {
|
||||
|
@ -208,7 +225,7 @@ func (s *Server) Write(db, rp, body string, params url.Values) (results string,
|
|||
if err != nil {
|
||||
return "", err
|
||||
} else if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
|
||||
return "", fmt.Errorf("invalid status code: code=%d, body=%s", resp.StatusCode, MustReadAll(resp.Body))
|
||||
return "", WriteError{statusCode: resp.StatusCode, body: string(MustReadAll(resp.Body))}
|
||||
}
|
||||
return string(MustReadAll(resp.Body)), nil
|
||||
}
|
||||
|
|
|
@ -387,6 +387,58 @@ func TestServer_UserCommands(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure the server will write all points possible with exception to the field type conflict.
|
||||
// This should return a partial write and a status of 400
|
||||
func TestServer_Write_FieldTypeConflict(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := OpenServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if res, err := s.Write("db0", "rp0", fmt.Sprintf("cpu value=1i %d", mustParseTime(time.RFC3339Nano, "2015-01-01T00:00:01Z").UnixNano()), nil); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if exp := ``; exp != res {
|
||||
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
|
||||
}
|
||||
|
||||
// Verify the data was written.
|
||||
if res, err := s.Query(`SELECT * FROM db0.rp0.cpu`); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if exp := `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-01-01T00:00:01Z",1]]}]}]}`; exp != res {
|
||||
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
|
||||
}
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf("cpu value=2i %d", mustParseTime(time.RFC3339Nano, "2015-01-01T00:00:02Z").UnixNano()),
|
||||
fmt.Sprintf("cpu value=3 %d", mustParseTime(time.RFC3339Nano, "2015-01-01T00:00:03Z").UnixNano()),
|
||||
fmt.Sprintf("cpu value=4i %d", mustParseTime(time.RFC3339Nano, "2015-01-01T00:00:04Z").UnixNano()),
|
||||
}
|
||||
res, err := s.Write("db0", "rp0", strings.Join(writes, "\n"), nil)
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
wr, ok := err.(WriteError)
|
||||
if !ok {
|
||||
t.Fatalf("wrong error type %v", err)
|
||||
}
|
||||
if exp, got := http.StatusBadRequest, wr.StatusCode(); exp != got {
|
||||
t.Fatalf("unexpected status code\nexp: %d\ngot: %d\n", exp, got)
|
||||
}
|
||||
if exp := ``; exp != res {
|
||||
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
|
||||
}
|
||||
|
||||
// Verify the data was written.
|
||||
if res, err := s.Query(`SELECT * FROM db0.rp0.cpu`); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if exp := `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-01-01T00:00:01Z",1],["2015-01-01T00:00:02Z",2],["2015-01-01T00:00:04Z",4]]}]}]}`; exp != res {
|
||||
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the server can create a single point via line protocol with float type and read it back.
|
||||
func TestServer_Write_LineProtocol_Float(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
|
|
@ -353,6 +353,12 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
|
|||
return nil
|
||||
}
|
||||
|
||||
// If this is a partial write error, that is also ok.
|
||||
if _, ok := err.(tsdb.PartialWriteError); ok {
|
||||
atomic.AddInt64(&w.stats.WriteErr, 1)
|
||||
return err
|
||||
}
|
||||
|
||||
// If we've written to shard that should exist on the current node, but the store has
|
||||
// not actually created this shard, tell it to create it and retry the write
|
||||
if err == tsdb.ErrShardNotFound {
|
||||
|
|
|
@ -484,6 +484,9 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
|
|||
|
||||
// Add the field to the in memory index
|
||||
if err := m.CreateFieldIfNotExists(f.Field.Name, f.Field.Type, false); err != nil {
|
||||
if err == ErrFieldTypeConflict {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -543,7 +546,9 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
|
||||
// get the shard mutex for locally defined fields
|
||||
n = 0
|
||||
var skip bool
|
||||
for i, p := range points {
|
||||
skip = false
|
||||
// verify the tags and fields
|
||||
tags := p.Tags()
|
||||
if v := tags.Get(timeBytes); v != nil {
|
||||
|
@ -631,16 +636,24 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
if f := mf.FieldBytes(iter.FieldKey()); f != nil {
|
||||
// Field present in shard metadata, make sure there is no type conflict.
|
||||
if f.Type != fieldType {
|
||||
return points, nil, fmt.Errorf("%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s", ErrFieldTypeConflict, iter.FieldKey(), p.Name(), fieldType, f.Type)
|
||||
atomic.AddInt64(&s.stats.WritePointsDropped, 1)
|
||||
dropped++
|
||||
reason = fmt.Sprintf("%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s", ErrFieldTypeConflict, iter.FieldKey(), p.Name(), fieldType, f.Type)
|
||||
skip = true
|
||||
} else {
|
||||
continue // Field is present, and it's of the same type. Nothing more to do.
|
||||
}
|
||||
|
||||
continue // Field is present, and it's of the same type. Nothing more to do.
|
||||
}
|
||||
|
||||
fieldsToCreate = append(fieldsToCreate, &FieldCreate{p.Name(), &Field{Name: string(iter.FieldKey()), Type: fieldType}})
|
||||
if !skip {
|
||||
fieldsToCreate = append(fieldsToCreate, &FieldCreate{p.Name(), &Field{Name: string(iter.FieldKey()), Type: fieldType}})
|
||||
}
|
||||
}
|
||||
|
||||
if !skip {
|
||||
points[n] = points[i]
|
||||
n++
|
||||
}
|
||||
points[n] = points[i]
|
||||
n++
|
||||
}
|
||||
points = points[:n]
|
||||
|
||||
|
|
Loading…
Reference in New Issue