feat: optionally dump queries to log on SIGTERM (#22638)

Dump all active queries to the log when a SIGTERM
is received and the termination-query-log flag is
true in the coordinator section of the config. The
default is false.
pull/22662/head
davidby-influx 2021-10-07 14:02:38 -07:00 committed by GitHub
parent 611a4370a2
commit fd1d51690d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 46 additions and 10 deletions

View File

@ -87,8 +87,11 @@ func (m *Main) Run(args ...string) error {
cmd.Logger.Info("Listening for signals")
// Block until one of the signals above is received
<-signalCh
sig := <-signalCh
cmd.Logger.Info("Signal received, initializing clean shutdown...")
if sig == syscall.SIGTERM && cmd.Server.LogQueriesOnTermination() {
cmd.Server.QueryExecutor.TaskManager.LogCurrentQueries(cmd.Logger.Info)
}
go cmd.Close()
// Block again until another signal is received, a shutdown timeout elapses,

View File

@ -671,6 +671,14 @@ func (s *Server) stopProfile() error {
return nil
}
// LogQueriesOnTermination reports whether the coordinator configuration
// requests that running queries be logged on termination.
//
// It returns false when the receiver or its config is nil, so it may be
// called on a nil *Server.
func (s *Server) LogQueriesOnTermination() bool {
	// Guard clause: without a server or a config there is no setting to read.
	if s == nil || s.config == nil {
		return false
	}
	return s.config.Coordinator.TerminationQueryLog
}
// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps
// to prevent a circular dependency between the `coordinator` and `monitor` packages.
// NOTE(review): the earlier wording named a `cluster` package; the wrapped type
// now lives in `coordinator` — confirm the dependency rationale still holds.
type monitorPointsWriter coordinator.PointsWriter

View File

@ -36,6 +36,7 @@ type Config struct {
MaxSelectPointN int `toml:"max-select-point"`
MaxSelectSeriesN int `toml:"max-select-series"`
MaxSelectBucketsN int `toml:"max-select-buckets"`
TerminationQueryLog bool `toml:"termination-query-log"`
}
// NewConfig returns an instance of Config with defaults.
@ -46,6 +47,7 @@ func NewConfig() Config {
MaxConcurrentQueries: DefaultMaxConcurrentQueries,
MaxSelectPointN: DefaultMaxSelectPointN,
MaxSelectSeriesN: DefaultMaxSelectSeriesN,
TerminationQueryLog: false,
}
}

View File

@ -194,6 +194,10 @@
# number of buckets unlimited.
# max-select-buckets = 0
# Whether to print a list of running queries when a data node receives a SIGTERM (sent when a process
# exceeds a container memory limit, or by the kill command).
# termination-query-log = false
###
### [retention]
###

View File

@ -30,6 +30,10 @@ const (
KilledTask
)
// queryFieldNames lists the names of the per-query fields. The order matters:
// it is used as the column list for the show-queries result and is indexed
// positionally when building log fields in LogCurrentQueries.
var queryFieldNames = []string{"qid", "query", "database", "duration", "status"}
func (t TaskStatus) String() string {
switch t {
case RunningTask:
@ -136,24 +140,39 @@ func (t *TaskManager) executeShowQueriesStatement(q *influxql.ShowQueriesStateme
for id, qi := range t.queries {
d := now.Sub(qi.startTime)
switch {
case d >= time.Second:
d = d - (d % time.Second)
case d >= time.Millisecond:
d = d - (d % time.Millisecond)
case d >= time.Microsecond:
d = d - (d % time.Microsecond)
}
d = prettyTime(d)
values = append(values, []interface{}{id, qi.query, qi.database, d.String(), qi.status.String()})
}
return []*models.Row{{
Columns: []string{"qid", "query", "database", "duration", "status"},
Columns: queryFieldNames,
Values: values,
}}, nil
}
// prettyTime trims d to the coarsest unit it reaches so that it prints
// compactly: durations of at least a second keep whole seconds, at least a
// millisecond keep whole milliseconds, and at least a microsecond keep whole
// microseconds. Anything smaller (including zero and negative values) is
// returned unchanged.
func prettyTime(d time.Duration) time.Duration {
	// Units are tried from coarsest to finest; the first one d reaches wins.
	for _, unit := range [...]time.Duration{time.Second, time.Millisecond, time.Microsecond} {
		if d >= unit {
			// d is positive here, so Truncate is the same as d - d%unit.
			return d.Truncate(unit)
		}
	}
	return d
}
// LogCurrentQueries calls logFunc once for every entry returned by
// t.Queries(), using the message "Current Queries" and attaching the query's
// ID, text, database, duration (trimmed by prettyTime) and status as zap
// fields keyed by the corresponding queryFieldNames entries.
func (t *TaskManager) LogCurrentQueries(logFunc func(string, ...zap.Field)) {
	for _, qi := range t.Queries() {
		// Field order mirrors queryFieldNames: qid, query, database, duration, status.
		fields := []zap.Field{
			zap.Uint64(queryFieldNames[0], qi.ID),
			zap.String(queryFieldNames[1], qi.Query),
			zap.String(queryFieldNames[2], qi.Database),
			zap.String(queryFieldNames[3], prettyTime(qi.Duration).String()),
			zap.String(queryFieldNames[4], qi.Status.String()),
		}
		logFunc("Current Queries", fields...)
	}
}
func (t *TaskManager) queryError(qid uint64, err error) {
t.mu.RLock()
query := t.queries[qid]