Add TagValueSeriesIDCache.Delete().

pull/10234/head
Ben Johnson 2018-08-17 11:11:47 -06:00 committed by Edd Robinson
parent fcbc03240a
commit e651153f1c
2 changed files with 51 additions and 18 deletions

View File

@ -141,28 +141,51 @@ EVICT:
c.checkEviction()
}
// Delete removes x from the tuple {name, key, value} if it exists.
// This method takes a lock on the underlying SeriesIDSet.
func (c *TagValueSeriesIDCache) Delete(name, key, value []byte, x uint64) {
c.Lock()
c.delete(name, key, value, x)
c.Unlock()
}
// delete removes x from the tuple {name, key, value} if it exists.
func (c *TagValueSeriesIDCache) delete(name, key, value []byte, x uint64) {
if mmap, ok := c.cache[string(name)]; ok {
if tkmap, ok := mmap[string(key)]; ok {
if ele, ok := tkmap[string(value)]; ok {
if ss := ele.Value.(*seriesIDCacheElement).SeriesIDSet; ss != nil {
ele.Value.(*seriesIDCacheElement).SeriesIDSet.Remove(x)
}
}
}
}
}
// checkEviction checks if the cache is too big, and evicts the least recently used
// item if it is.
func (c *TagValueSeriesIDCache) checkEviction() {
if c.evictor.Len() > c.capacity {
e := c.evictor.Back() // Least recently used item.
listElement := e.Value.(*seriesIDCacheElement)
name := listElement.name
key := listElement.key
value := listElement.value
if c.evictor.Len() <= c.capacity {
return
}
c.evictor.Remove(e) // Remove from evictor
delete(c.cache[string(name)][string(key)], string(value)) // Remove from hashmap of items.
e := c.evictor.Back() // Least recently used item.
listElement := e.Value.(*seriesIDCacheElement)
name := listElement.name
key := listElement.key
value := listElement.value
// Check if there are no more tag values for the tag key.
if len(c.cache[string(name)][string(key)]) == 0 {
delete(c.cache[string(name)], string(key))
}
c.evictor.Remove(e) // Remove from evictor
delete(c.cache[string(name)][string(key)], string(value)) // Remove from hashmap of items.
// Check there are no more tag keys for the measurement.
if len(c.cache[string(name)]) == 0 {
delete(c.cache, string(name))
}
// Check if there are no more tag values for the tag key.
if len(c.cache[string(name)][string(key)]) == 0 {
delete(c.cache[string(name)], string(key))
}
// Check there are no more tag keys for the measurement.
if len(c.cache[string(name)]) == 0 {
delete(c.cache, string(name))
}
}

View File

@ -789,8 +789,18 @@ func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
return nil
}
// Extract measurement name.
name, _ := models.ParseKeyBytes(key)
// Extract measurement name & tags.
name, tags := models.ParseKeyBytes(key)
// If there are cached sets for any of the tag pairs, they will need to be
// updated with the series id.
i.tagValueCache.RLock()
if i.tagValueCache.measurementContainsSets(name) {
for _, pair := range tags {
i.tagValueCache.delete(name, pair.Key, pair.Value, seriesID) // Takes a lock on the series id set
}
}
i.tagValueCache.RUnlock()
// Check if that was the last series for the measurement in the entire index.
if ok, err := i.MeasurementHasSeries(name); err != nil {