feat: Ingress metrics by measurement

Partial implementation of https://github.com/influxdata/influxdb/issues/20612

Implements a per-measurement points-written metric. Next step: also support per-login metrics.
pull/20677/head
Sam Arnold 2021-02-02 15:10:41 -05:00
parent 7a9a0ec1bf
commit eb92c997cd
10 changed files with 255 additions and 61 deletions

View File

@ -107,6 +107,9 @@ type Config struct {
CompactThroughput toml.Size `toml:"compact-throughput"`
CompactThroughputBurst toml.Size `toml:"compact-throughput-burst"`
// Options for ingress metrics
IngressMetricByMeasurement bool `toml:"ingress-metric-by-measurement-enabled"`
// Limits
// MaxSeriesPerDatabase is the maximum number of series a node can hold per database.

View File

@ -1293,6 +1293,7 @@ func (e *Engine) WritePoints(points []models.Point, tracker tsdb.StatsTracker) e
t := p.Time().UnixNano()
npoints++
var nValuesForPoint int64
for iter.Next() {
// Skip fields name "time", they are illegal
if bytes.Equal(iter.FieldKey(), timeBytes) {
@ -1363,9 +1364,13 @@ func (e *Engine) WritePoints(points []models.Point, tracker tsdb.StatsTracker) e
return fmt.Errorf("unknown field type for %s: %s", string(iter.FieldKey()), p.String())
}
nvalues++
nValuesForPoint++
values[string(keyBuf)] = append(values[string(keyBuf)], v)
}
nvalues += nValuesForPoint
if tracker.AddedMeasurementPoints != nil {
tracker.AddedMeasurementPoints(models.ParseName(keyBuf), 1, nValuesForPoint)
}
}
e.mu.RLock()
@ -1382,8 +1387,8 @@ func (e *Engine) WritePoints(points []models.Point, tracker tsdb.StatsTracker) e
}
}
if tracker != nil {
tracker(npoints, nvalues)
if tracker.AddedPoints != nil {
tracker.AddedPoints(npoints, nvalues)
}
return seriesErr
}

View File

@ -50,7 +50,7 @@ func TestEngine_ConcurrentShardSnapshots(t *testing.T) {
time.Unix(int64(i), 0),
))
}
err = sh.WritePoints(points, nil)
err = sh.WritePoints(points, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf(err.Error())
}

View File

@ -188,7 +188,7 @@ func TestEngine_Digest(t *testing.T) {
MustParsePointString("cpu,host=B value=1.2 2000000000"),
}
if err := e.WritePoints(points, nil); err != nil {
if err := e.WritePoints(points, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -284,7 +284,7 @@ func TestEngine_Digest(t *testing.T) {
MustParsePointString("cpu,host=C value=1.1 3000000000"),
}
if err := e.WritePoints(points, nil); err != nil {
if err := e.WritePoints(points, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -336,7 +336,7 @@ func TestEngine_Digest_Concurrent(t *testing.T) {
MustParsePointString("cpu,host=B value=1.2 2000000000"),
}
if err := e.WritePoints(points, nil); err != nil {
if err := e.WritePoints(points, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -406,14 +406,14 @@ func TestEngine_Backup(t *testing.T) {
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p1}, nil); err != nil {
if err := e.WritePoints([]models.Point{p1}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p2}, nil); err != nil {
if err := e.WritePoints([]models.Point{p2}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -459,7 +459,7 @@ func TestEngine_Backup(t *testing.T) {
// so this test won't work properly unless the file is at least a second past the last one
time.Sleep(time.Second)
if err := e.WritePoints([]models.Point{p3}, nil); err != nil {
if err := e.WritePoints([]models.Point{p3}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -513,21 +513,21 @@ func TestEngine_Export(t *testing.T) {
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p1}, nil); err != nil {
if err := e.WritePoints([]models.Point{p1}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p2}, nil); err != nil {
if err := e.WritePoints([]models.Point{p2}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p3}, nil); err != nil {
if err := e.WritePoints([]models.Point{p3}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -1275,7 +1275,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
}
}
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}, nil); err != nil {
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
@ -1385,7 +1385,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) {
}
}
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}, nil); err != nil {
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
@ -1511,7 +1511,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) {
}
}
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}, nil); err != nil {
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
@ -1597,7 +1597,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) {
}
}
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}, nil); err != nil {
if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
@ -1716,7 +1716,7 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
}
}
if err := e.WritePoints([]models.Point{p1}, nil); err != nil {
if err := e.WritePoints([]models.Point{p1}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WriteSnapshot(); err != nil {
@ -2185,15 +2185,31 @@ func TestEngine_WritePointsWithStats(t *testing.T) {
e := MustOpenEngine(index)
var numPoints, numValues int64
tracker := func(points, values int64) {
var tracker tsdb.StatsTracker
tracker.AddedPoints = func(points, values int64) {
numPoints += points
numValues += values
}
var mPoints, mValues int64
var wrongMeasurement *string
tracker.AddedMeasurementPoints = func(measurement []byte, points, values int64) {
if string(measurement) != "cpu" {
wrongMeasurement = new(string)
*wrongMeasurement = string(measurement)
}
mPoints += points
mValues += values
}
if err := e.WritePoints(points, tracker); err != nil {
t.Fatalf("failed to write points: %v", err)
}
if wrongMeasurement != nil {
t.Fatalf("Expected only to have cpu measurements, got %s", string(*wrongMeasurement))
}
if got, expected := numPoints, expectedPoints; got != expected {
t.Fatalf("Expected stats to return %d points; got %d", expected, got)
}
@ -2201,6 +2217,14 @@ func TestEngine_WritePointsWithStats(t *testing.T) {
if got, expected := numValues, expectedValues; got != expected {
t.Fatalf("Expected stats to return %d points; got %d", expected, got)
}
if got, expected := mPoints, expectedPoints; got != expected {
t.Fatalf("Expected stats to return %d points; got %d", expected, got)
}
if got, expected := mValues, expectedValues; got != expected {
t.Fatalf("Expected stats to return %d points; got %d", expected, got)
}
})
}
}
@ -2307,7 +2331,7 @@ func TestEngine_Invalid_UTF8(t *testing.T) {
t.Fatalf("create series index error: %v", err)
}
if err := e.WritePoints([]models.Point{p}, nil); err != nil {
if err := e.WritePoints([]models.Point{p}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
@ -2333,7 +2357,7 @@ func BenchmarkEngine_WritePoints(b *testing.B) {
b.Run(fmt.Sprintf("%s_%d", index, sz), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
err := e.WritePoints(pp, nil)
err := e.WritePoints(pp, tsdb.NoopStatsTracker())
if err != nil {
b.Fatal(err)
}
@ -2368,7 +2392,7 @@ func BenchmarkEngine_WritePoints_Parallel(b *testing.B) {
go func(i int) {
defer wg.Done()
from, to := i*sz, (i+1)*sz
err := e.WritePoints(pp[from:to], nil)
err := e.WritePoints(pp[from:to], tsdb.NoopStatsTracker())
if err != nil {
errC <- err
return
@ -2784,7 +2808,7 @@ func (e *Engine) writePoints(points ...models.Point) error {
}
}
// Write the points into the cache/wal.
return e.WritePoints(points, nil)
return e.WritePoints(points, tsdb.NoopStatsTracker())
}
// MustAddSeries calls AddSeries, panicking if there is an error.

64
tsdb/ingress_metrics.go Normal file
View File

@ -0,0 +1,64 @@
package tsdb
import (
"sort"
"sync"
"sync/atomic"
)
// MetricKey identifies one ingress metric series: the measurement written
// to, plus the database and retention policy it belongs to.
type MetricKey struct {
	measurement string
	db          string
	rp          string
	// TODO: add login
}

// MetricValue holds the running point and value counts for one MetricKey.
// Both counters are read and written with sync/atomic operations.
type MetricValue struct {
	points int64
	values int64
}

// IngressMetrics is a concurrency-safe collection of per-key write
// counters. The zero value is ready to use.
type IngressMetrics struct {
	m      sync.Map // MetricKey -> *MetricValue
	length int64    // distinct keys stored in m (atomic); keys are never deleted
}

// AddMetric adds points and values to the counters for the given
// measurement/db/rp combination, creating the entry on first use.
func (im *IngressMetrics) AddMetric(measurement, db, rp string, points, values int64) {
	key := MetricKey{measurement, db, rp}
	entry, ok := im.m.Load(key)
	if !ok {
		// First sighting of this key: insert a fresh counter. LoadOrStore
		// tells us whether another writer raced us to the insert, so the
		// length counter is bumped exactly once per key.
		var raced bool
		if entry, raced = im.m.LoadOrStore(key, &MetricValue{}); !raced {
			atomic.AddInt64(&im.length, 1)
		}
	}
	mv := entry.(*MetricValue)
	atomic.AddInt64(&mv.points, points)
	atomic.AddInt64(&mv.values, values)
}

// ForEach invokes f once per stored key, ordered by db, then rp, then
// measurement. Counters are snapshotted per key at call time, so writes
// made concurrently with the iteration may be partially reflected.
func (im *IngressMetrics) ForEach(f func(m MetricKey, points, values int64)) {
	keys := make([]MetricKey, 0, atomic.LoadInt64(&im.length))
	im.m.Range(func(key, _ interface{}) bool {
		keys = append(keys, key.(MetricKey))
		return true
	})
	sort.Slice(keys, func(a, b int) bool {
		ka, kb := keys[a], keys[b]
		if ka.db != kb.db {
			return ka.db < kb.db
		}
		if ka.rp != kb.rp {
			return ka.rp < kb.rp
		}
		return ka.measurement < kb.measurement
		//TODO: sort based on login
	})
	for _, key := range keys {
		// ok should always be true - we don't delete keys. But if we did we
		// would ignore keys concurrently deleted.
		if val, ok := im.m.Load(key); ok {
			mv := val.(*MetricValue)
			// Read atomically: AddMetric may be updating these counters
			// concurrently (plain field reads here would be a data race).
			f(key, atomic.LoadInt64(&mv.points), atomic.LoadInt64(&mv.values))
		}
	}
}

View File

@ -0,0 +1,52 @@
package tsdb
import (
"sync"
"testing"
assert2 "github.com/stretchr/testify/assert"
)
// TestIngressMetrics verifies that concurrent AddMetric calls are aggregated
// per key and that ForEach visits keys in sorted order.
func TestIngressMetrics(t *testing.T) {
	assert := assert2.New(t)
	ingress := &IngressMetrics{}

	// Hammer two keys from ten goroutines each to exercise the concurrent
	// create/update path.
	var wg sync.WaitGroup
	add := func(measurement string, points, values int64) {
		defer wg.Done()
		ingress.AddMetric(measurement, "telegraf", "autogen", points, values)
	}
	for i := 0; i < 10; i++ {
		wg.Add(2)
		go add("cpu", 1, 10)
		go add("mem", 2, 20)
	}
	wg.Wait()

	// Collect the iteration results, then assert on the snapshot.
	type result struct {
		key            MetricKey
		points, values int64
	}
	var got []result
	ingress.ForEach(func(m MetricKey, points, values int64) {
		got = append(got, result{m, points, values})
	})

	assert.Equal(2, len(got))
	if len(got) == 2 {
		assert.Equal(result{
			key:    MetricKey{measurement: "cpu", db: "telegraf", rp: "autogen"},
			points: 10,
			values: 100,
		}, got[0])
		assert.Equal(result{
			key:    MetricKey{measurement: "mem", db: "telegraf", rp: "autogen"},
			points: 20,
			values: 200,
		}, got[1])
	}
}

View File

@ -496,7 +496,14 @@ type FieldCreate struct {
Field *Field
}
type StatsTracker func(points, values int64)
// StatsTracker bundles the callbacks a caller can register to observe the
// outcome of a write. Either field may be nil, in which case that
// notification is simply skipped by the engine.
type StatsTracker struct {
	// AddedPoints is invoked once at the end of a write with the total
	// number of points and values written.
	AddedPoints func(points, values int64)
	// AddedMeasurementPoints is invoked once per point with the measurement
	// name it belongs to, enabling per-measurement ingress metrics.
	AddedMeasurementPoints func(measurement []byte, points, values int64)
}

// NoopStatsTracker returns a StatsTracker with no callbacks registered; use
// it wherever write statistics are not needed (e.g. tests and benchmarks).
func NoopStatsTracker() StatsTracker {
	return StatsTracker{}
}
// WritePoints() will write the raw data points and any new metadata
// to the index in the shard.
@ -528,10 +535,11 @@ func (s *Shard) WritePoints(points []models.Point, tracker StatsTracker) error {
return err
}
engineTracker := func(points, values int64) {
if tracker != nil {
engineTracker := tracker
engineTracker.AddedPoints = func(points, values int64) {
if tracker.AddedPoints != nil {
// notify outer tracker (e.g. http service)
tracker(points, values)
tracker.AddedPoints(points, values)
}
atomic.AddInt64(&s.stats.WritePointsOK, points)
atomic.AddInt64(&s.stats.WriteValuesOK, values)

View File

@ -262,7 +262,7 @@ func (sh *TempShard) MustWritePointsString(s string) {
panic(err)
}
if err := sh.WritePoints(a, nil); err != nil {
if err := sh.WritePoints(a, NoopStatsTracker()); err != nil {
panic(err)
}
}

View File

@ -49,7 +49,7 @@ func TestShardWriteAndIndex(t *testing.T) {
// Calling WritePoints when the engine is not open will return
// ErrEngineClosed.
if got, exp := sh.WritePoints(nil, nil), tsdb.ErrEngineClosed; got != exp {
if got, exp := sh.WritePoints(nil, tsdb.NoopStatsTracker()), tsdb.ErrEngineClosed; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
@ -64,13 +64,13 @@ func TestShardWriteAndIndex(t *testing.T) {
time.Unix(1, 2),
)
err := sh.WritePoints([]models.Point{pt}, nil)
err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf(err.Error())
}
pt.SetTime(time.Unix(2, 3))
err = sh.WritePoints([]models.Point{pt}, nil)
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf(err.Error())
}
@ -96,7 +96,7 @@ func TestShardWriteAndIndex(t *testing.T) {
// and ensure that we can still write data
pt.SetTime(time.Unix(2, 6))
err = sh.WritePoints([]models.Point{pt}, nil)
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf(err.Error())
}
@ -119,7 +119,7 @@ func TestShard_Open_CorruptFieldsIndex(t *testing.T) {
// Calling WritePoints when the engine is not open will return
// ErrEngineClosed.
if got, exp := sh.WritePoints(nil, nil), tsdb.ErrEngineClosed; got != exp {
if got, exp := sh.WritePoints(nil, tsdb.NoopStatsTracker()), tsdb.ErrEngineClosed; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
@ -134,7 +134,7 @@ func TestShard_Open_CorruptFieldsIndex(t *testing.T) {
time.Unix(1, 2),
)
err := sh.WritePoints([]models.Point{pt}, nil)
err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf(err.Error())
}
@ -186,7 +186,7 @@ func TestMaxSeriesLimit(t *testing.T) {
points = append(points, pt)
}
err := sh.WritePoints(points, nil)
err := sh.WritePoints(points, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf(err.Error())
}
@ -199,7 +199,7 @@ func TestMaxSeriesLimit(t *testing.T) {
time.Unix(1, 2),
)
err = sh.WritePoints([]models.Point{pt}, nil)
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err == nil {
t.Fatal("expected error")
} else if exp, got := `partial write: max-series-per-database limit exceeded: (1000) dropped=1`, err.Error(); exp != got {
@ -242,7 +242,7 @@ func TestShard_MaxTagValuesLimit(t *testing.T) {
points = append(points, pt)
}
err := sh.WritePoints(points, nil)
err := sh.WritePoints(points, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf(err.Error())
}
@ -255,7 +255,7 @@ func TestShard_MaxTagValuesLimit(t *testing.T) {
time.Unix(1, 2),
)
err = sh.WritePoints([]models.Point{pt}, nil)
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err == nil {
t.Fatal("expected error")
} else if exp, got := `partial write: max-values-per-tag limit exceeded (1000/1000): measurement="cpu" tag="host" value="server9999" dropped=1`, err.Error(); exp != got {
@ -291,7 +291,7 @@ func TestWriteTimeTag(t *testing.T) {
time.Unix(1, 2),
)
if err := sh.WritePoints([]models.Point{pt}, nil); err == nil {
if err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker()); err == nil {
t.Fatal("expected error: got nil")
}
@ -302,7 +302,7 @@ func TestWriteTimeTag(t *testing.T) {
time.Unix(1, 2),
)
if err := sh.WritePoints([]models.Point{pt}, nil); err != nil {
if err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -342,7 +342,7 @@ func TestWriteTimeField(t *testing.T) {
time.Unix(1, 2),
)
if err := sh.WritePoints([]models.Point{pt}, nil); err == nil {
if err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker()); err == nil {
t.Fatal("expected error: got nil")
}
@ -378,7 +378,7 @@ func TestShardWriteAddNewField(t *testing.T) {
time.Unix(1, 2),
)
err := sh.WritePoints([]models.Point{pt}, nil)
err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf(err.Error())
}
@ -390,7 +390,7 @@ func TestShardWriteAddNewField(t *testing.T) {
time.Unix(1, 2),
)
err = sh.WritePoints([]models.Point{pt}, nil)
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf(err.Error())
}
@ -455,7 +455,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
return
}
_ = sh.WritePoints(points[:500], nil)
_ = sh.WritePoints(points[:500], tsdb.NoopStatsTracker())
if f, err := sh.CreateSnapshot(false); err == nil {
os.RemoveAll(f)
}
@ -471,7 +471,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
return
}
_ = sh.WritePoints(points[500:], nil)
_ = sh.WritePoints(points[500:], tsdb.NoopStatsTracker())
if f, err := sh.CreateSnapshot(false); err == nil {
os.RemoveAll(f)
}
@ -543,7 +543,7 @@ func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) {
errC <- err
}
sh.WritePoints(points, nil)
sh.WritePoints(points, tsdb.NoopStatsTracker())
m := &influxql.Measurement{Name: "cpu"}
iter, err := sh.CreateIterator(context.Background(), m, query.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
@ -603,7 +603,7 @@ func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) {
errC <- err
}
sh.WritePoints(points, nil)
sh.WritePoints(points, tsdb.NoopStatsTracker())
m := &influxql.Measurement{Name: "cpu"}
iter, err := sh.CreateIterator(context.Background(), m, query.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
@ -670,7 +670,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
time.Unix(1, 2),
)
err := sh.WritePoints([]models.Point{pt}, nil)
err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf(err.Error())
}
@ -1021,7 +1021,7 @@ func TestShard_Disabled_WriteQuery(t *testing.T) {
time.Unix(1, 2),
)
err := sh.WritePoints([]models.Point{pt}, nil)
err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err == nil {
t.Fatalf("expected shard disabled error")
}
@ -1039,7 +1039,7 @@ func TestShard_Disabled_WriteQuery(t *testing.T) {
sh.SetEnabled(true)
err = sh.WritePoints([]models.Point{pt}, nil)
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -1070,7 +1070,7 @@ func TestShard_Closed_Functions(t *testing.T) {
time.Unix(1, 2),
)
if err := sh.WritePoints([]models.Point{pt}, nil); err != nil {
if err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -2094,7 +2094,7 @@ func benchmarkWritePointsExistingSeriesEqualBatches(b *testing.B, mCnt, tkCnt, t
}
b.StartTimer()
shard.WritePoints(points[start:end], nil)
shard.WritePoints(points[start:end], tsdb.NoopStatsTracker())
b.StopTimer()
start = end
@ -2128,7 +2128,7 @@ func BenchmarkCreateIterator(b *testing.B) {
setup := func(index string, shards Shards) {
// Write all the points to all the shards.
for _, sh := range shards {
if err := sh.WritePoints(points, nil); err != nil {
if err := sh.WritePoints(points, tsdb.NoopStatsTracker()); err != nil {
b.Fatal(err)
}
}
@ -2196,7 +2196,7 @@ func chunkedWrite(shard *tsdb.Shard, points []models.Point) {
break
}
shard.WritePoints(points[start:end], nil)
shard.WritePoints(points[start:end], tsdb.NoopStatsTracker())
start = end
end += chunkSz
}
@ -2335,7 +2335,7 @@ func (sh *Shard) MustWritePointsString(s string) {
panic(err)
}
if err := sh.WritePoints(a, nil); err != nil {
if err := sh.WritePoints(a, tsdb.NoopStatsTracker()); err != nil {
panic(err)
}
}

View File

@ -101,7 +101,8 @@ type Store struct {
epochs map[uint64]*epochTracker
// Statistics for the store
stats StoreStatistics
stats StoreStatistics
ingressMetrics IngressMetrics
EngineOptions EngineOptions
@ -185,6 +186,29 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
statValuesWritten: atomic.LoadInt64(&s.stats.ValuesWritten),
},
})
ingressTags := func(key MetricKey, tags map[string]string) map[string]string {
newTags := make(map[string]string, 2)
if s.EngineOptions.Config.IngressMetricByMeasurement {
newTags["measurement"] = key.measurement
newTags["db"] = key.db
newTags["rp"] = key.rp
}
// TODO: add login
return models.StatisticTags(newTags).Merge(tags)
}
s.ingressMetrics.ForEach(func(key MetricKey, points, values int64) {
statistics = append(statistics, models.Statistic{
Name: "ingress",
Tags: ingressTags(key, tags),
Values: map[string]interface{}{
statPointsWritten: points,
statValuesWritten: values,
},
})
})
return statistics
}
@ -1409,6 +1433,24 @@ func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error
return shards.ExpandSources(sources)
}
// statsTracker builds the store-level StatsTracker for writes to the given
// database and retention policy. With per-measurement ingress metrics
// enabled, counts are attributed per measurement and AddedPoints is left
// nil so the totals are not counted twice; otherwise only the aggregate
// counters are maintained.
func (s *Store) statsTracker(db, rp string) StatsTracker {
	// TODO: add login
	if !s.EngineOptions.Config.IngressMetricByMeasurement {
		return StatsTracker{
			AddedPoints: func(points, values int64) {
				atomic.AddInt64(&s.stats.ValuesWritten, values)
				atomic.AddInt64(&s.stats.PointsWritten, points)
			},
		}
	}
	return StatsTracker{
		AddedMeasurementPoints: func(measurement []byte, points, values int64) {
			atomic.AddInt64(&s.stats.ValuesWritten, values)
			atomic.AddInt64(&s.stats.PointsWritten, points)
			s.ingressMetrics.AddMetric(string(measurement), db, rp, points, values)
		},
	}
}
// WriteToShard writes a list of points to a shard identified by its ID.
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
s.mu.RLock()
@ -1447,11 +1489,7 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
sh.SetCompactionsEnabled(true)
}
tracker := func(points, values int64) {
atomic.AddInt64(&s.stats.ValuesWritten, values)
atomic.AddInt64(&s.stats.PointsWritten, points)
}
return sh.WritePoints(points, tracker)
return sh.WritePoints(points, s.statsTracker(sh.database, sh.retentionPolicy))
}
// MeasurementNames returns a slice of all measurements. Measurements accepts an