Ensure shard-level cardinality is correct
parent
ef5e3a09cd
commit
b19edd55ac
|
@ -104,16 +104,6 @@ func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
|
|||
return i.seriesSketch.Clone(), i.seriesTSSketch.Clone(), nil
|
||||
}
|
||||
|
||||
// SeriesN returns the number of unique non-tombstoned series in the index.
|
||||
// Since indexes are not shared across shards, the count returned by SeriesN
|
||||
// cannot be combined with other shards' counts.
|
||||
func (i *Index) SeriesN() int64 {
|
||||
i.mu.RLock()
|
||||
n := int64(len(i.series))
|
||||
i.mu.RUnlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// Measurement returns the measurement object from the index by the name
|
||||
func (i *Index) Measurement(name []byte) (*measurement, error) {
|
||||
i.mu.RLock()
|
||||
|
@ -1182,24 +1172,33 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli
|
|||
return nil
|
||||
}
|
||||
|
||||
// InitializeSeries is called during start-up.
|
||||
// This works the same as CreateSeriesIfNotExists except it ignore limit errors.
|
||||
func (i *ShardIndex) InitializeSeries(key, name []byte, tags models.Tags) error {
|
||||
return i.Index.CreateSeriesListIfNotExists(i.id, i.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &i.opt, true)
|
||||
// SeriesN returns the number of unique non-tombstoned series local to this shard.
|
||||
func (idx *ShardIndex) SeriesN() int64 {
|
||||
idx.mu.RLock()
|
||||
defer idx.mu.RUnlock()
|
||||
return int64(idx.seriesIDSet.Cardinality())
|
||||
}
|
||||
|
||||
func (i *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
|
||||
return i.Index.CreateSeriesListIfNotExists(i.id, i.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &i.opt, false)
|
||||
// InitializeSeries is called during start-up.
|
||||
// This works the same as CreateSeriesIfNotExists except it ignore limit errors.
|
||||
func (idx *ShardIndex) InitializeSeries(key, name []byte, tags models.Tags) error {
|
||||
return idx.Index.CreateSeriesListIfNotExists(idx.id, idx.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &idx.opt, true)
|
||||
}
|
||||
|
||||
// CreateSeriesIfNotExists creates the provided series on the index if it is not
|
||||
// already present.
|
||||
func (idx *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
|
||||
return idx.Index.CreateSeriesListIfNotExists(idx.id, idx.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &idx.opt, false)
|
||||
}
|
||||
|
||||
// TagSets returns a list of tag sets based on series filtering.
|
||||
func (i *ShardIndex) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
|
||||
return i.Index.TagSets(i.id, name, opt)
|
||||
func (idx *ShardIndex) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
|
||||
return idx.Index.TagSets(idx.id, name, opt)
|
||||
}
|
||||
|
||||
// SeriesIDSet returns the bitset associated with the series ids.
|
||||
func (i *ShardIndex) SeriesIDSet() *tsdb.SeriesIDSet {
|
||||
return i.seriesIDSet
|
||||
func (idx *ShardIndex) SeriesIDSet() *tsdb.SeriesIDSet {
|
||||
return idx.seriesIDSet
|
||||
}
|
||||
|
||||
// NewShardIndex returns a new index for a shard.
|
||||
|
|
|
@ -610,13 +610,14 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
|
|||
return s, ts, nil
|
||||
}
|
||||
|
||||
// SeriesN returns the number of unique non-tombstoned series in the index.
|
||||
// SeriesN returns the number of unique non-tombstoned series in this index.
|
||||
//
|
||||
// Since indexes are not shared across shards, the count returned by SeriesN
|
||||
// cannot be combined with other shard's results. If you need to count series
|
||||
// across indexes then use SeriesSketches and merge the results from other
|
||||
// indexes.
|
||||
// across indexes then use either the database-wide series file, or merge the
|
||||
// index-level bitsets or sketches.
|
||||
func (i *Index) SeriesN() int64 {
|
||||
return int64(i.sfile.SeriesCount())
|
||||
return int64(i.SeriesIDSet().Cardinality())
|
||||
}
|
||||
|
||||
// HasTagKey returns true if tag key exists. It returns the first error
|
||||
|
|
|
@ -571,6 +571,41 @@ func TestStore_BackupRestoreShard(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
func TestStore_Shard_SeriesN(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
test := func(index string) error {
|
||||
s := MustOpenStore(index)
|
||||
defer s.Close()
|
||||
|
||||
// Create shard with data.
|
||||
s.MustCreateShardWithData("db0", "rp0", 1,
|
||||
`cpu value=1 0`,
|
||||
`cpu,host=serverA value=2 10`,
|
||||
)
|
||||
|
||||
// Create 2nd shard w/ same measurements.
|
||||
s.MustCreateShardWithData("db0", "rp0", 2,
|
||||
`cpu value=1 0`,
|
||||
`cpu value=2 10`,
|
||||
)
|
||||
|
||||
if got, exp := s.Shard(1).SeriesN(), int64(2); got != exp {
|
||||
return fmt.Errorf("[shard %d] got series count of %d, but expected %d", 1, got, exp)
|
||||
} else if got, exp := s.Shard(2).SeriesN(), int64(1); got != exp {
|
||||
return fmt.Errorf("[shard %d] got series count of %d, but expected %d", 2, got, exp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) {
|
||||
if err := test(index); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
|
Loading…
Reference in New Issue