Merge pull request #7538 from influxdata/jw-limits
Add config option to messages when limits exceededpull/7549/head
commit
97ba0bbb98
|
@ -953,7 +953,7 @@ func TestServer_Query_MaxSelectSeriesN(t *testing.T) {
|
|||
&Query{
|
||||
name: "exceeed max series",
|
||||
command: `SELECT COUNT(value) FROM db0.rp0.cpu`,
|
||||
exp: `{"results":[{"error":"max select series count exceeded: 4 series"}]}`,
|
||||
exp: `{"results":[{"error":"max-select-series limit exceeded: (4/3)"}]}`,
|
||||
},
|
||||
}...)
|
||||
|
||||
|
|
|
@ -587,7 +587,7 @@ func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx
|
|||
// Determine the number of buckets by finding the time span and dividing by the interval.
|
||||
buckets := int64(max.Sub(min)) / int64(interval)
|
||||
if int(buckets) > e.MaxSelectBucketsN {
|
||||
return nil, stmt, fmt.Errorf("max select bucket count exceeded: %d buckets", buckets)
|
||||
return nil, stmt, fmt.Errorf("max-select-buckets limit exceeded: (%d/%d)", buckets, e.MaxSelectBucketsN)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,7 +101,7 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
|
|||
if a := ReadAllResults(e.ExecuteQuery(`SELECT count(value) FROM cpu WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:35Z' GROUP BY time(10s)`, "db0", 0)); !reflect.DeepEqual(a, []*influxql.Result{
|
||||
{
|
||||
StatementID: 0,
|
||||
Err: errors.New("max select bucket count exceeded: 4 buckets"),
|
||||
Err: errors.New("max-select-buckets limit exceeded: (4/3)"),
|
||||
},
|
||||
}) {
|
||||
t.Fatalf("unexpected results: %s", spew.Sdump(a))
|
||||
|
|
|
@ -13,7 +13,7 @@ func PointLimitMonitor(itrs Iterators, interval time.Duration, limit int) QueryM
|
|||
case <-ticker.C:
|
||||
stats := itrs.Stats()
|
||||
if stats.PointN >= limit {
|
||||
return ErrMaxPointsReached
|
||||
return ErrMaxSelectPointsLimitExceeded(stats.PointN, limit)
|
||||
}
|
||||
case <-closing:
|
||||
return nil
|
||||
|
|
|
@ -25,20 +25,12 @@ var (
|
|||
// ErrQueryInterrupted is an error returned when the query is interrupted.
|
||||
ErrQueryInterrupted = errors.New("query interrupted")
|
||||
|
||||
// ErrMaxConcurrentQueriesReached is an error when a query cannot be run
|
||||
// because the maximum number of queries has been reached.
|
||||
ErrMaxConcurrentQueriesReached = errors.New("max concurrent queries reached")
|
||||
|
||||
// ErrQueryEngineShutdown is an error sent when the query cannot be
|
||||
// created because the query engine was shutdown.
|
||||
ErrQueryEngineShutdown = errors.New("query engine shutdown")
|
||||
|
||||
// ErrMaxPointsReached is an error when a query hits the maximum number of
|
||||
// points.
|
||||
ErrMaxPointsReached = errors.New("max number of points reached")
|
||||
|
||||
// ErrQueryTimeoutReached is an error when a query hits the timeout.
|
||||
ErrQueryTimeoutReached = errors.New("query timeout reached")
|
||||
// ErrQueryTimeoutLimitExceeded is an error when a query hits the max time allowed to run.
|
||||
ErrQueryTimeoutLimitExceeded = errors.New("query-timeout limit exceeded")
|
||||
)
|
||||
|
||||
// Statistics for the QueryExecutor
|
||||
|
@ -55,6 +47,17 @@ func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not fo
|
|||
// ErrMeasurementNotFound returns a measurement not found error for the given measurement name.
|
||||
func ErrMeasurementNotFound(name string) error { return fmt.Errorf("measurement not found: %s", name) }
|
||||
|
||||
// ErrMaxSelectPointsLimitExceeded is an error when a query hits the maximum number of points.
|
||||
func ErrMaxSelectPointsLimitExceeded(n, limit int) error {
|
||||
return fmt.Errorf("max-select-point limit exceeed: (%d/%d)", n, limit)
|
||||
}
|
||||
|
||||
// ErrMaxConcurrentQueriesLimitExceeded is an error when a query cannot be run
|
||||
// because the maximum number of queries has been reached.
|
||||
func ErrMaxConcurrentQueriesLimitExceeded(n, limit int) error {
|
||||
return fmt.Errorf("max-concurrent-queries limit exceeded(%d, %d)", n, limit)
|
||||
}
|
||||
|
||||
// ExecutionOptions contains the options for executing a query.
|
||||
type ExecutionOptions struct {
|
||||
// The database the query is running against.
|
||||
|
|
|
@ -3,6 +3,7 @@ package influxql_test
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -166,7 +167,7 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) {
|
|||
|
||||
results := e.ExecuteQuery(q, influxql.ExecutionOptions{}, nil)
|
||||
result := <-results
|
||||
if result.Err != influxql.ErrQueryTimeoutReached {
|
||||
if result.Err == nil || !strings.Contains(result.Err.Error(), "query-timeout") {
|
||||
t.Errorf("unexpected error: %s", result.Err)
|
||||
}
|
||||
}
|
||||
|
@ -202,7 +203,7 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) {
|
|||
if len(result.Series) != 0 {
|
||||
t.Errorf("expected %d rows, got %d", 0, len(result.Series))
|
||||
}
|
||||
if result.Err != influxql.ErrMaxConcurrentQueriesReached {
|
||||
if result.Err == nil || !strings.Contains(result.Err.Error(), "max-concurrent-queries") {
|
||||
t.Errorf("unexpected error: %s", result.Err)
|
||||
}
|
||||
case <-qid:
|
||||
|
|
|
@ -135,7 +135,7 @@ func (t *TaskManager) AttachQuery(q *Query, database string, interrupt <-chan st
|
|||
}
|
||||
|
||||
if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries {
|
||||
return 0, nil, ErrMaxConcurrentQueriesReached
|
||||
return 0, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries)
|
||||
}
|
||||
|
||||
qid := t.nextID
|
||||
|
@ -239,7 +239,7 @@ func (t *TaskManager) waitForQuery(qid uint64, interrupt <-chan struct{}, closin
|
|||
if !ok {
|
||||
break
|
||||
}
|
||||
query.setError(ErrQueryTimeoutReached)
|
||||
query.setError(ErrQueryTimeoutLimitExceeded)
|
||||
case <-interrupt:
|
||||
// Query was manually closed so exit the select.
|
||||
return
|
||||
|
|
|
@ -16,11 +16,14 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded")
|
||||
ErrCacheInvalidCheckpoint = fmt.Errorf("invalid checkpoint")
|
||||
ErrSnapshotInProgress = fmt.Errorf("snapshot in progress")
|
||||
)
|
||||
|
||||
func ErrCacheMemorySizeLimitExceeded(n, limit uint64) error {
|
||||
return fmt.Errorf("cache-max-memory-size exceeded: (%d/%d)", n, limit)
|
||||
}
|
||||
|
||||
// entry is a set of values and some metadata.
|
||||
type entry struct {
|
||||
mu sync.RWMutex
|
||||
|
@ -239,10 +242,12 @@ func (c *Cache) Write(key string, values []Value) error {
|
|||
|
||||
// Enough room in the cache?
|
||||
c.mu.Lock()
|
||||
if c.maxSize > 0 && c.size+c.snapshotSize+addedSize > c.maxSize {
|
||||
limit := c.maxSize
|
||||
n := c.size + c.snapshotSize + addedSize
|
||||
if limit > 0 && n > limit {
|
||||
c.mu.Unlock()
|
||||
atomic.AddInt64(&c.stats.WriteErr, 1)
|
||||
return ErrCacheMemoryExceeded
|
||||
return ErrCacheMemorySizeLimitExceeded(n, limit)
|
||||
}
|
||||
|
||||
if err := c.write(key, values); err != nil {
|
||||
|
@ -272,10 +277,12 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
|
|||
|
||||
// Enough room in the cache?
|
||||
c.mu.Lock()
|
||||
if c.maxSize > 0 && c.snapshotSize+c.size+totalSz > c.maxSize {
|
||||
limit := c.maxSize
|
||||
n := c.size + c.snapshotSize + totalSz
|
||||
if limit > 0 && n > limit {
|
||||
c.mu.Unlock()
|
||||
atomic.AddInt64(&c.stats.WriteErr, 1)
|
||||
return ErrCacheMemoryExceeded
|
||||
return ErrCacheMemorySizeLimitExceeded(n, limit)
|
||||
}
|
||||
|
||||
var werr error
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"os"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
@ -423,7 +424,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
|
|||
if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
|
||||
t.Fatalf("cache keys incorrect after writes, exp %v, got %v", exp, keys)
|
||||
}
|
||||
if err := c.Write("bar", Values{v1}); err != ErrCacheMemoryExceeded {
|
||||
if err := c.Write("bar", Values{v1}); err == nil || !strings.Contains(err.Error(), "cache-max-memory-size") {
|
||||
t.Fatalf("wrong error writing key bar to cache")
|
||||
}
|
||||
|
||||
|
@ -432,7 +433,7 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("failed to snapshot cache: %v", err)
|
||||
}
|
||||
if err := c.Write("bar", Values{v1}); err != ErrCacheMemoryExceeded {
|
||||
if err := c.Write("bar", Values{v1}); err == nil || !strings.Contains(err.Error(), "cache-max-memory-size") {
|
||||
t.Fatalf("wrong error writing key bar to cache")
|
||||
}
|
||||
|
||||
|
|
|
@ -513,7 +513,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
n := m.CardinalityBytes(tag.Key)
|
||||
if n >= s.options.Config.MaxValuesPerTag {
|
||||
dropPoint = true
|
||||
reason = fmt.Sprintf("max tag value limit exceeded (%d/%d): measurement=%q tag=%q value=%q",
|
||||
reason = fmt.Sprintf("max-values-per-tag limit exceeded (%d/%d): measurement=%q tag=%q value=%q",
|
||||
n, s.options.Config.MaxValuesPerTag, m.Name, string(tag.Key), string(tag.Key))
|
||||
break
|
||||
}
|
||||
|
@ -566,7 +566,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
if s.options.Config.MaxSeriesPerDatabase > 0 && s.index.SeriesN()+1 > s.options.Config.MaxSeriesPerDatabase {
|
||||
atomic.AddInt64(&s.stats.WritePointsDropped, 1)
|
||||
dropped += 1
|
||||
reason = fmt.Sprintf("db %s max series limit reached: (%d/%d)",
|
||||
reason = fmt.Sprintf("max-series-per-database limit exceeded: db=%s (%d/%d)",
|
||||
s.database, s.index.SeriesN(), s.options.Config.MaxSeriesPerDatabase)
|
||||
continue
|
||||
}
|
||||
|
@ -862,7 +862,7 @@ func (s *Shard) monitor() {
|
|||
|
||||
// Log at 80, 85, 90-100% levels
|
||||
if perc == 80 || perc == 85 || perc >= 90 {
|
||||
s.logger.Printf("WARN: %d%% of tag values limit reached: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
|
||||
s.logger.Printf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
|
||||
perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, m.Name, k)
|
||||
}
|
||||
}
|
||||
|
@ -1008,7 +1008,7 @@ func (ic *shardIteratorCreator) CreateIterator(opt influxql.IteratorOptions) (in
|
|||
stats := itr.Stats()
|
||||
if stats.SeriesN > ic.maxSeriesN {
|
||||
itr.Close()
|
||||
return nil, fmt.Errorf("max select series count exceeded: %d series", stats.SeriesN)
|
||||
return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", stats.SeriesN, ic.maxSeriesN)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -145,7 +145,7 @@ func TestMaxSeriesLimit(t *testing.T) {
|
|||
err = sh.WritePoints([]models.Point{pt})
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
} else if exp, got := `db db max series limit reached: (1000/1000) dropped=1`, err.Error(); exp != got {
|
||||
} else if exp, got := `max-series-per-database limit exceeded: db=db (1000/1000) dropped=1`, err.Error(); exp != got {
|
||||
t.Fatalf("unexpected error message:\n\texp = %s\n\tgot = %s", exp, got)
|
||||
}
|
||||
|
||||
|
@ -198,7 +198,7 @@ func TestShard_MaxTagValuesLimit(t *testing.T) {
|
|||
err = sh.WritePoints([]models.Point{pt})
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
} else if exp, got := `max tag value limit exceeded (1000/1000): measurement="cpu" tag="host" value="host" dropped=1`, err.Error(); exp != got {
|
||||
} else if exp, got := `max-values-per-tag limit exceeded (1000/1000): measurement="cpu" tag="host" value="host" dropped=1`, err.Error(); exp != got {
|
||||
t.Fatalf("unexpected error message:\n\texp = %s\n\tgot = %s", exp, got)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue