fix: correctly handle MaxSeriesPerDatabaseExceeded (#23091)
Check for the correctly returned PartialWriteError in (*shard).validateSeriesAndFields, allow partial writes. closes https://github.com/influxdata/influxdb/issues/23090pull/23094/head
parent
1ab50d7557
commit
0c3dca883e
|
@ -194,7 +194,7 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, measu
|
|||
i.mu.RLock()
|
||||
if max := opt.Config.MaxSeriesPerDatabase; max > 0 && len(i.series)+len(keys) > max {
|
||||
i.mu.RUnlock()
|
||||
return errMaxSeriesPerDatabaseExceeded{limit: opt.Config.MaxSeriesPerDatabase}
|
||||
return errMaxSeriesPerDatabaseExceeded{limit: opt.Config.MaxSeriesPerDatabase, series: len(i.series), keys: len(keys)}
|
||||
}
|
||||
i.mu.RUnlock()
|
||||
}
|
||||
|
@ -1362,9 +1362,11 @@ func (itr *seriesIDIterator) nextKeys() error {
|
|||
// errMaxSeriesPerDatabaseExceeded is a marker error returned during series creation
|
||||
// to indicate that a new series would exceed the limits of the database.
|
||||
type errMaxSeriesPerDatabaseExceeded struct {
|
||||
limit int
|
||||
limit int
|
||||
series int
|
||||
keys int
|
||||
}
|
||||
|
||||
func (e errMaxSeriesPerDatabaseExceeded) Error() string {
|
||||
return fmt.Sprintf("max-series-per-database limit exceeded: (%d)", e.limit)
|
||||
return fmt.Sprintf("max-series-per-database exceeded limit=%d series=%d keys=%d", e.limit, e.series, e.keys)
|
||||
}
|
||||
|
|
|
@ -624,9 +624,12 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
|
|||
var droppedKeys [][]byte
|
||||
if err := engine.CreateSeriesListIfNotExists(keys, names, tagsSlice, tracker); err != nil {
|
||||
switch err := err.(type) {
|
||||
// TODO(jmw): why is this a *PartialWriteError when everything else is not a pointer?
|
||||
// Maybe we can just change it to be consistent if we change it also in all
|
||||
// the places that construct it.
|
||||
// (DSB) This was previously *PartialWriteError. Now catch pointer and value types.
|
||||
case PartialWriteError:
|
||||
reason = err.Reason
|
||||
dropped += err.Dropped
|
||||
droppedKeys = err.DroppedKeys
|
||||
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
|
||||
case *PartialWriteError:
|
||||
reason = err.Reason
|
||||
dropped += err.Dropped
|
||||
|
|
|
@ -201,7 +201,7 @@ func TestMaxSeriesLimit(t *testing.T) {
|
|||
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
} else if exp, got := `partial write: max-series-per-database limit exceeded: (1000) dropped=1`, err.Error(); exp != got {
|
||||
} else if exp, got := `partial write: max-series-per-database exceeded limit=1000 series=1000 keys=1 dropped=1`, err.Error(); exp != got {
|
||||
t.Fatalf("unexpected error message:\n\texp = %s\n\tgot = %s", exp, got)
|
||||
}
|
||||
|
||||
|
|
|
@ -1366,7 +1366,7 @@ func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) {
|
|||
to := from + pointsPerShard
|
||||
|
||||
if err := store.Store.WriteToShard(tsdb.WriteContext{}, uint64(shardID), points[from:to]); err != nil {
|
||||
if !strings.Contains(err.Error(), "partial write: max-series-per-database limit exceeded:") {
|
||||
if !strings.Contains(err.Error(), "partial write: max-series-per-database exceeded limit") {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue