feat(tsdb): Add basic tsdb read metrics
Adds a total cursor counter and seek location counter to a new `readMetrics` that is added to each `Engine`. Default labels group by `engine_id` and `node_id`.pull/14211/head
parent
350f72c296
commit
12549c859e
|
@ -19,6 +19,9 @@ func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name []
|
|||
key := q.seriesFieldKeyBytes(name, tags, field)
|
||||
cacheValues := q.e.Cache.Values(key)
|
||||
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
|
||||
|
||||
q.e.readTracker.AddSeeks(uint64(keyCursor.seekN()))
|
||||
|
||||
if opt.Ascending {
|
||||
if q.asc.Float == nil {
|
||||
q.asc.Float = newFloatArrayAscendingCursor()
|
||||
|
@ -39,6 +42,9 @@ func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name
|
|||
key := q.seriesFieldKeyBytes(name, tags, field)
|
||||
cacheValues := q.e.Cache.Values(key)
|
||||
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
|
||||
|
||||
q.e.readTracker.AddSeeks(uint64(keyCursor.seekN()))
|
||||
|
||||
if opt.Ascending {
|
||||
if q.asc.Integer == nil {
|
||||
q.asc.Integer = newIntegerArrayAscendingCursor()
|
||||
|
@ -59,6 +65,9 @@ func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name
|
|||
key := q.seriesFieldKeyBytes(name, tags, field)
|
||||
cacheValues := q.e.Cache.Values(key)
|
||||
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
|
||||
|
||||
q.e.readTracker.AddSeeks(uint64(keyCursor.seekN()))
|
||||
|
||||
if opt.Ascending {
|
||||
if q.asc.Unsigned == nil {
|
||||
q.asc.Unsigned = newUnsignedArrayAscendingCursor()
|
||||
|
@ -79,6 +88,9 @@ func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name [
|
|||
key := q.seriesFieldKeyBytes(name, tags, field)
|
||||
cacheValues := q.e.Cache.Values(key)
|
||||
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
|
||||
|
||||
q.e.readTracker.AddSeeks(uint64(keyCursor.seekN()))
|
||||
|
||||
if opt.Ascending {
|
||||
if q.asc.String == nil {
|
||||
q.asc.String = newStringArrayAscendingCursor()
|
||||
|
@ -99,6 +111,9 @@ func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name
|
|||
key := q.seriesFieldKeyBytes(name, tags, field)
|
||||
cacheValues := q.e.Cache.Values(key)
|
||||
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
|
||||
|
||||
q.e.readTracker.AddSeeks(uint64(keyCursor.seekN()))
|
||||
|
||||
if opt.Ascending {
|
||||
if q.asc.Boolean == nil {
|
||||
q.asc.Boolean = newBooleanArrayAscendingCursor()
|
||||
|
|
|
@ -15,6 +15,9 @@ func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, nam
|
|||
key := q.seriesFieldKeyBytes(name, tags, field)
|
||||
cacheValues := q.e.Cache.Values(key)
|
||||
keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
|
||||
|
||||
q.e.readTracker.AddSeeks(uint64(keyCursor.seekN()))
|
||||
|
||||
if opt.Ascending {
|
||||
if q.asc.{{.Name}} == nil {
|
||||
q.asc.{{.Name}} = new{{.Name}}ArrayAscendingCursor()
|
||||
|
|
|
@ -39,6 +39,8 @@ func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) (
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
q.e.readTracker.AddCursors(1)
|
||||
|
||||
if grp := metrics.GroupFromContext(ctx); grp != nil {
|
||||
grp.GetCounter(numberOfRefCursorsCounter).Add(1)
|
||||
}
|
||||
|
|
|
@ -155,6 +155,7 @@ type Engine struct {
|
|||
enableCompactionsOnOpen bool
|
||||
|
||||
compactionTracker *compactionTracker // Used to track state of compactions.
|
||||
readTracker *readTracker // Used to track number of reads.
|
||||
defaultMetricLabels prometheus.Labels // N.B this must not be mutated after Open is called.
|
||||
|
||||
// Limiter for concurrent compactions.
|
||||
|
@ -494,6 +495,7 @@ func (e *Engine) initTrackers() {
|
|||
e.compactionTracker = newCompactionTracker(bms.compactionMetrics, e.defaultMetricLabels)
|
||||
e.FileStore.tracker = newFileTracker(bms.fileMetrics, e.defaultMetricLabels)
|
||||
e.Cache.tracker = newCacheTracker(bms.cacheMetrics, e.defaultMetricLabels)
|
||||
e.readTracker = newReadTracker(bms.readMetrics, e.defaultMetricLabels)
|
||||
|
||||
e.scheduler.setCompactionTracker(e.compactionTracker)
|
||||
}
|
||||
|
@ -1374,3 +1376,37 @@ func SeriesAndFieldFromCompositeKey(key []byte) ([]byte, []byte) {
|
|||
}
|
||||
return key[:sep], key[sep+len(keyFieldSeparator):]
|
||||
}
|
||||
|
||||
// readTracker tracks reads from the engine.
|
||||
type readTracker struct {
|
||||
metrics *readMetrics
|
||||
labels prometheus.Labels
|
||||
cursors uint64
|
||||
seeks uint64
|
||||
}
|
||||
|
||||
func newReadTracker(metrics *readMetrics, defaultLabels prometheus.Labels) *readTracker {
|
||||
return &readTracker{metrics: metrics, labels: defaultLabels}
|
||||
}
|
||||
|
||||
// Labels returns a copy of the default labels used by the tracker's metrics.
|
||||
// The returned map is safe for modification.
|
||||
func (t *readTracker) Labels() prometheus.Labels {
|
||||
labels := make(prometheus.Labels, len(t.labels))
|
||||
for k, v := range t.labels {
|
||||
labels[k] = v
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
// AddCursors increases the number of cursors.
|
||||
func (t *readTracker) AddCursors(n uint64) {
|
||||
atomic.AddUint64(&t.cursors, n)
|
||||
t.metrics.Cursors.With(t.labels).Add(float64(n))
|
||||
}
|
||||
|
||||
// AddSeeks increases the number of location seeks.
|
||||
func (t *readTracker) AddSeeks(n uint64) {
|
||||
atomic.AddUint64(&t.seeks, n)
|
||||
t.metrics.Seeks.With(t.labels).Add(float64(n))
|
||||
}
|
||||
|
|
|
@ -1438,6 +1438,11 @@ func (c *KeyCursor) seekDescending(t int64) {
|
|||
}
|
||||
}
|
||||
|
||||
// seekN returns the number of seek locations.
|
||||
func (c *KeyCursor) seekN() int {
|
||||
return len(c.seeks)
|
||||
}
|
||||
|
||||
// Next moves the cursor to the next position.
|
||||
// Data should be read by the ReadBlock functions.
|
||||
func (c *KeyCursor) Next() {
|
||||
|
|
|
@ -35,6 +35,7 @@ const namespace = "storage"
|
|||
const compactionSubsystem = "compactions" // sub-system associated with metrics for compactions.
|
||||
const fileStoreSubsystem = "tsm_files" // sub-system associated with metrics for TSM files.
|
||||
const cacheSubsystem = "cache" // sub-system associated with metrics for the cache.
|
||||
const readSubsystem = "reads" // sub-system associated with metrics for reads.
|
||||
|
||||
// blockMetrics are a set of metrics concerned with tracking data about block storage.
|
||||
type blockMetrics struct {
|
||||
|
@ -42,6 +43,7 @@ type blockMetrics struct {
|
|||
*compactionMetrics
|
||||
*fileMetrics
|
||||
*cacheMetrics
|
||||
*readMetrics
|
||||
}
|
||||
|
||||
// newBlockMetrics initialises the prometheus metrics for the block subsystem.
|
||||
|
@ -51,6 +53,7 @@ func newBlockMetrics(labels prometheus.Labels) *blockMetrics {
|
|||
compactionMetrics: newCompactionMetrics(labels),
|
||||
fileMetrics: newFileMetrics(labels),
|
||||
cacheMetrics: newCacheMetrics(labels),
|
||||
readMetrics: newReadMetrics(labels),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,6 +63,7 @@ func (m *blockMetrics) PrometheusCollectors() []prometheus.Collector {
|
|||
metrics = append(metrics, m.compactionMetrics.PrometheusCollectors()...)
|
||||
metrics = append(metrics, m.fileMetrics.PrometheusCollectors()...)
|
||||
metrics = append(metrics, m.cacheMetrics.PrometheusCollectors()...)
|
||||
metrics = append(metrics, m.readMetrics.PrometheusCollectors()...)
|
||||
return metrics
|
||||
}
|
||||
|
||||
|
@ -245,3 +249,41 @@ func (m *cacheMetrics) PrometheusCollectors() []prometheus.Collector {
|
|||
m.Writes,
|
||||
}
|
||||
}
|
||||
|
||||
// readMetrics are a set of metrics concerned with tracking data engine reads.
|
||||
type readMetrics struct {
|
||||
Cursors *prometheus.CounterVec
|
||||
Seeks *prometheus.CounterVec
|
||||
}
|
||||
|
||||
// newReadMetrics initialises the prometheus metrics for tracking reads.
|
||||
func newReadMetrics(labels prometheus.Labels) *readMetrics {
|
||||
var names []string
|
||||
for k := range labels {
|
||||
names = append(names, k)
|
||||
}
|
||||
sort.Strings(names)
|
||||
|
||||
return &readMetrics{
|
||||
Cursors: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: readSubsystem,
|
||||
Name: "cursors",
|
||||
Help: "Number of cursors created.",
|
||||
}, names),
|
||||
Seeks: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: readSubsystem,
|
||||
Name: "seek_locations",
|
||||
Help: "Number of tsm locations searched.",
|
||||
}, names),
|
||||
}
|
||||
}
|
||||
|
||||
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
|
||||
func (m *readMetrics) PrometheusCollectors() []prometheus.Collector {
|
||||
return []prometheus.Collector{
|
||||
m.Cursors,
|
||||
m.Seeks,
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue