Add option to disable sfile metrics
parent
3ff39cd9dc
commit
6b63a3def7
|
@ -39,6 +39,8 @@ func NewHashMap(opt Options) *HashMap {
|
|||
loadFactor: opt.LoadFactor,
|
||||
tracker: newRHHTracker(opt.Metrics, opt.Labels),
|
||||
}
|
||||
m.tracker.enabled = opt.MetricsEnabled
|
||||
|
||||
m.alloc()
|
||||
return m
|
||||
}
|
||||
|
@ -270,6 +272,7 @@ func (m *HashMap) PrometheusCollectors() []prometheus.Collector {
|
|||
type rhhTracker struct {
|
||||
metrics *Metrics
|
||||
labels prometheus.Labels
|
||||
enabled bool
|
||||
}
|
||||
|
||||
// Labels returns a copy of the default labels used by the tracker's metrics.
|
||||
|
@ -283,43 +286,71 @@ func (t *rhhTracker) Labels() prometheus.Labels {
|
|||
}
|
||||
|
||||
func newRHHTracker(metrics *Metrics, defaultLabels prometheus.Labels) *rhhTracker {
|
||||
return &rhhTracker{metrics: metrics, labels: defaultLabels}
|
||||
return &rhhTracker{metrics: metrics, labels: defaultLabels, enabled: true}
|
||||
}
|
||||
|
||||
func (t *rhhTracker) SetLoadFactor(load float64) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.LoadFactor.With(labels).Set(load)
|
||||
}
|
||||
|
||||
func (t *rhhTracker) SetSize(sz uint64) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.Size.With(labels).Set(float64(sz))
|
||||
}
|
||||
|
||||
func (t *rhhTracker) ObserveGet(d time.Duration) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.GetDuration.With(labels).Observe(float64(d.Nanoseconds()))
|
||||
t.metrics.LastGetDuration.With(labels).Set(float64(d.Nanoseconds()))
|
||||
}
|
||||
|
||||
func (t *rhhTracker) ObservePut(d time.Duration) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.InsertDuration.With(labels).Observe(float64(d.Nanoseconds()))
|
||||
t.metrics.LastInsertDuration.With(labels).Set(float64(d.Nanoseconds()))
|
||||
}
|
||||
|
||||
func (t *rhhTracker) SetGrowDuration(d time.Duration) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.LastGrowDuration.With(labels).Set(d.Seconds())
|
||||
}
|
||||
|
||||
// TODO(edd): currently no safe way to calculate this concurrently.
|
||||
func (t *rhhTracker) SetProbeCount(length float64) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.MeanProbeCount.With(labels).Set(length)
|
||||
}
|
||||
|
||||
func (t *rhhTracker) incGet(status string) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
labels["status"] = status
|
||||
t.metrics.Gets.With(labels).Inc()
|
||||
|
@ -329,6 +360,10 @@ func (t *rhhTracker) IncGetHit() { t.incGet("hit") }
|
|||
func (t *rhhTracker) IncGetMiss() { t.incGet("miss") }
|
||||
|
||||
func (t *rhhTracker) incPut(status string) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
labels["status"] = status
|
||||
t.metrics.Puts.With(labels).Inc()
|
||||
|
@ -357,16 +392,18 @@ func (e *hashElem) setKey(v []byte) {
|
|||
|
||||
// Options represents initialization options that are passed to NewHashMap().
|
||||
type Options struct {
|
||||
Capacity int64
|
||||
LoadFactor int
|
||||
Metrics *Metrics
|
||||
Labels prometheus.Labels
|
||||
Capacity int64
|
||||
LoadFactor int
|
||||
MetricsEnabled bool
|
||||
Metrics *Metrics
|
||||
Labels prometheus.Labels
|
||||
}
|
||||
|
||||
// DefaultOptions represents a default set of options to pass to NewHashMap().
|
||||
var DefaultOptions = Options{
|
||||
Capacity: 256,
|
||||
LoadFactor: 90,
|
||||
Capacity: 256,
|
||||
LoadFactor: 90,
|
||||
MetricsEnabled: true,
|
||||
}
|
||||
|
||||
// HashKey computes a hash of key. Hash is always non-zero.
|
||||
|
|
|
@ -44,6 +44,7 @@ type SeriesFile struct {
|
|||
// each partition decorates the same metric measurements with different
|
||||
// partition id label values.
|
||||
defaultMetricLabels prometheus.Labels
|
||||
metricsEnabled bool
|
||||
|
||||
refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use.
|
||||
|
||||
|
@ -53,10 +54,9 @@ type SeriesFile struct {
|
|||
// NewSeriesFile returns a new instance of SeriesFile.
|
||||
func NewSeriesFile(path string) *SeriesFile {
|
||||
return &SeriesFile{
|
||||
path: path,
|
||||
// partitionMetrics: newSeriesFileMetrics(nil),
|
||||
// indexMetrics: rhh.NewMetrics(namespace, seriesFileSubsystem+"_index", nil),
|
||||
Logger: zap.NewNop(),
|
||||
path: path,
|
||||
metricsEnabled: true,
|
||||
Logger: zap.NewNop(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,6 +74,12 @@ func (f *SeriesFile) SetDefaultMetricLabels(labels prometheus.Labels) {
|
|||
}
|
||||
}
|
||||
|
||||
// DisableMetrics ensures that activity is not collected via the prometheus metrics.
|
||||
// DisableMetrics must be called before Open.
|
||||
func (f *SeriesFile) DisableMetrics() {
|
||||
f.metricsEnabled = false
|
||||
}
|
||||
|
||||
// Open memory maps the data file at the file's path.
|
||||
func (f *SeriesFile) Open() error {
|
||||
_, logEnd := logger.NewOperation(f.Logger, "Opening Series File", "series_file_open", zap.String("path", f.path))
|
||||
|
@ -90,10 +96,10 @@ func (f *SeriesFile) Open() error {
|
|||
|
||||
// Initialise metrics for trackers.
|
||||
mmu.Lock()
|
||||
if sms == nil {
|
||||
if sms == nil && f.metricsEnabled {
|
||||
sms = newSeriesFileMetrics(f.defaultMetricLabels)
|
||||
}
|
||||
if ims == nil {
|
||||
if ims == nil && f.metricsEnabled {
|
||||
// Make a copy of the default labels so that another label can be provided.
|
||||
labels := make(prometheus.Labels, len(f.defaultMetricLabels))
|
||||
for k, v := range f.defaultMetricLabels {
|
||||
|
@ -122,9 +128,11 @@ func (f *SeriesFile) Open() error {
|
|||
|
||||
p.index.rhhMetrics = ims
|
||||
p.index.rhhLabels = labels
|
||||
p.index.rhhMetricsEnabled = f.metricsEnabled
|
||||
|
||||
// Set the metric trackers on the partition with any injected default labels.
|
||||
p.tracker = newSeriesPartitionTracker(sms, labels)
|
||||
p.tracker.enabled = f.metricsEnabled
|
||||
|
||||
if err := p.Open(); err != nil {
|
||||
f.Close()
|
||||
|
|
|
@ -46,8 +46,9 @@ type SeriesIndex struct {
|
|||
|
||||
// metrics stores a shard instance of some Prometheus metrics. metrics
|
||||
// must be set before Open is called.
|
||||
rhhMetrics *rhh.Metrics
|
||||
rhhLabels prometheus.Labels
|
||||
rhhMetrics *rhh.Metrics
|
||||
rhhLabels prometheus.Labels
|
||||
rhhMetricsEnabled bool
|
||||
|
||||
data []byte // mmap data
|
||||
keyIDData []byte // key/id mmap data
|
||||
|
@ -61,7 +62,8 @@ type SeriesIndex struct {
|
|||
|
||||
func NewSeriesIndex(path string) *SeriesIndex {
|
||||
return &SeriesIndex{
|
||||
path: path,
|
||||
path: path,
|
||||
rhhMetricsEnabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -95,6 +97,7 @@ func (idx *SeriesIndex) Open() (err error) {
|
|||
options := rhh.DefaultOptions
|
||||
options.Metrics = idx.rhhMetrics
|
||||
options.Labels = idx.rhhLabels
|
||||
options.MetricsEnabled = idx.rhhMetricsEnabled
|
||||
|
||||
idx.keyIDMap = rhh.NewHashMap(options)
|
||||
idx.idOffsetMap = make(map[SeriesID]int64)
|
||||
|
|
|
@ -561,12 +561,14 @@ func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte {
|
|||
type seriesPartitionTracker struct {
|
||||
metrics *seriesFileMetrics
|
||||
labels prometheus.Labels
|
||||
enabled bool
|
||||
}
|
||||
|
||||
func newSeriesPartitionTracker(metrics *seriesFileMetrics, defaultLabels prometheus.Labels) *seriesPartitionTracker {
|
||||
return &seriesPartitionTracker{
|
||||
metrics: metrics,
|
||||
labels: defaultLabels,
|
||||
enabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -581,36 +583,60 @@ func (t *seriesPartitionTracker) Labels() prometheus.Labels {
|
|||
|
||||
// AddSeriesCreated increases the number of series created in the partition by n.
|
||||
func (t *seriesPartitionTracker) AddSeriesCreated(n uint64) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.SeriesCreated.With(labels).Add(float64(n))
|
||||
}
|
||||
|
||||
// SetSeries sets the number of series in the partition.
|
||||
func (t *seriesPartitionTracker) SetSeries(n uint64) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.Series.With(labels).Set(float64(n))
|
||||
}
|
||||
|
||||
// AddSeries increases the number of series in the partition by n.
|
||||
func (t *seriesPartitionTracker) AddSeries(n uint64) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.Series.With(labels).Add(float64(n))
|
||||
}
|
||||
|
||||
// SubSeries decreases the number of series in the partition by n.
|
||||
func (t *seriesPartitionTracker) SubSeries(n uint64) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.Series.With(labels).Sub(float64(n))
|
||||
}
|
||||
|
||||
// SetDiskSize sets the number of bytes used by files for in partition.
|
||||
func (t *seriesPartitionTracker) SetDiskSize(sz uint64) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.DiskSize.With(labels).Set(float64(sz))
|
||||
}
|
||||
|
||||
// SetSegments sets the number of segments files for the partition.
|
||||
func (t *seriesPartitionTracker) SetSegments(n uint64) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
t.metrics.Segments.With(labels).Set(float64(n))
|
||||
}
|
||||
|
@ -618,6 +644,10 @@ func (t *seriesPartitionTracker) SetSegments(n uint64) {
|
|||
// IncCompactionsActive increments the number of active compactions for the
|
||||
// components of a partition (index and segments).
|
||||
func (t *seriesPartitionTracker) IncCompactionsActive() {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
labels["component"] = "index" // TODO(edd): when we add segment compactions we will add a new label value.
|
||||
t.metrics.CompactionsActive.With(labels).Inc()
|
||||
|
@ -626,6 +656,10 @@ func (t *seriesPartitionTracker) IncCompactionsActive() {
|
|||
// DecCompactionsActive decrements the number of active compactions for the
|
||||
// components of a partition (index and segments).
|
||||
func (t *seriesPartitionTracker) DecCompactionsActive() {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
labels := t.Labels()
|
||||
labels["component"] = "index" // TODO(edd): when we add segment compactions we will add a new label value.
|
||||
t.metrics.CompactionsActive.With(labels).Dec()
|
||||
|
@ -634,6 +668,10 @@ func (t *seriesPartitionTracker) DecCompactionsActive() {
|
|||
// incCompactions increments the number of compactions for the partition.
|
||||
// Callers should use IncCompactionOK and IncCompactionErr.
|
||||
func (t *seriesPartitionTracker) incCompactions(status string, duration time.Duration) {
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
if duration > 0 {
|
||||
labels := t.Labels()
|
||||
labels["component"] = "index"
|
||||
|
|
Loading…
Reference in New Issue