Address PR feedback

pull/10048/head
Edd Robinson 2018-07-06 18:06:40 +01:00
parent 11bea138f8
commit ad388a8fd8
3 changed files with 57 additions and 97 deletions


@@ -71,50 +71,9 @@ func (cmd *Command) Run(args ...string) error {
}
cmd.Logger = logger.New(cmd.Stderr)
// Uncomment for profiling
// finish := startProfiles()
// defer finish()
return cmd.run(*dataDir, *walDir)
}
// func startProfiles() func() {
// runtime.MemProfileRate = 100 // Sample 1% of allocations.
// paths := []string{"/tmp/buildtsi.mem.pprof", "/tmp/buildtsi.cpu.pprof"}
// var files []*os.File
// for _, pth := range paths {
// f, err := os.Create(pth)
// if err != nil {
// log.Fatalf("memprofile: %v", err)
// }
// log.Printf("writing profile to: %s\n", pth)
// files = append(files, f)
// }
// closeFn := func() {
// // Write the memory profile
// if err := pprof.Lookup("heap").WriteTo(files[0], 0); err != nil {
// panic(err)
// }
// // Stop the CPU profile.
// pprof.StopCPUProfile()
// for _, fd := range files {
// if err := fd.Close(); err != nil {
// panic(err)
// }
// }
// }
// if err := pprof.StartCPUProfile(files[1]); err != nil {
// panic(err)
// }
// return closeFn
// }
func (cmd *Command) run(dataDir, walDir string) error {
// Verify the user actually wants to run as root.
if isRoot() {
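
The hunk above drops the commented-out ad-hoc profiling hook. For reference, below is a cleaned-up sketch of that kind of hook built on the standard runtime/pprof API; the startProfiles signature, file paths, and sampling rate here are illustrative, not part of the commit.

package main

import (
	"log"
	"os"
	"runtime"
	"runtime/pprof"
)

// startProfiles begins a CPU profile and arranges for a heap profile to be
// written when the returned stop function is called.
func startProfiles(cpuPath, memPath string) (stop func(), err error) {
	// Illustrative sampling rate: record roughly one allocation per 4 KB.
	runtime.MemProfileRate = 4096

	cpuFile, err := os.Create(cpuPath)
	if err != nil {
		return nil, err
	}
	memFile, err := os.Create(memPath)
	if err != nil {
		cpuFile.Close()
		return nil, err
	}
	if err := pprof.StartCPUProfile(cpuFile); err != nil {
		cpuFile.Close()
		memFile.Close()
		return nil, err
	}
	return func() {
		pprof.StopCPUProfile()
		if err := pprof.Lookup("heap").WriteTo(memFile, 0); err != nil {
			log.Printf("heap profile: %v", err)
		}
		cpuFile.Close()
		memFile.Close()
	}, nil
}

func main() {
	stop, err := startProfiles("/tmp/cpu.pprof", "/tmp/mem.pprof")
	if err != nil {
		log.Fatal(err)
	}
	defer stop()
	// ... do the work being profiled ...
}
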
@@ -191,11 +150,13 @@ func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpNam
return err
}
var shards []struct {
type shard struct {
ID uint64
Path string
}
var shards []shard
for _, fi := range fis {
if !fi.IsDir() {
continue
@@ -208,10 +169,7 @@ func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpNam
continue
}
shards = append(shards, struct {
ID uint64
Path string
}{shardID, fi.Name()})
shards = append(shards, shard{shardID, fi.Name()})
}
errC := make(chan error, len(shards))

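The refactor above swaps a repeated anonymous struct literal for a named local shard type and collects per-shard results through an error channel buffered to len(shards). A minimal sketch of that fan-out pattern follows; processShard and the literal shard values are illustrative only.

package main

import (
	"fmt"
	"sync"
)

type shard struct {
	ID   uint64
	Path string
}

func processShard(s shard) error {
	// Placeholder for the per-shard work (e.g. rebuilding its TSI index).
	fmt.Printf("processing shard %d at %s\n", s.ID, s.Path)
	return nil
}

func main() {
	shards := []shard{{1, "1"}, {2, "2"}, {3, "3"}}

	// Buffered so every worker can report its result without blocking.
	errC := make(chan error, len(shards))
	var wg sync.WaitGroup
	for _, s := range shards {
		wg.Add(1)
		go func(s shard) {
			defer wg.Done()
			errC <- processShard(s)
		}(s)
	}
	wg.Wait()
	close(errC)

	for err := range errC {
		if err != nil {
			fmt.Println("error:", err)
		}
	}
}
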

@@ -1587,7 +1587,6 @@ func parseTags(buf []byte, dst Tags) Tags {
var i int
walkTags(buf, func(key, value []byte) bool {
dst[i].Key, dst[i].Value = key, value
// tags = append(tags, Tag{Key: key, Value: value})
i++
return true
})
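
parseTags now only fills the preallocated dst slice in place, so the stale append-based comment is removed. A small sketch of that fill-via-callback pattern is below; walkPairs and Tag are stand-ins, not the models package API.

package main

import "fmt"

type Tag struct{ Key, Value []byte }

// walkPairs calls fn for each key/value pair until fn returns false.
func walkPairs(pairs [][2][]byte, fn func(key, value []byte) bool) {
	for _, p := range pairs {
		if !fn(p[0], p[1]) {
			return
		}
	}
}

func main() {
	pairs := [][2][]byte{
		{[]byte("host"), []byte("server01")},
		{[]byte("region"), []byte("us-west")},
	}

	dst := make([]Tag, len(pairs)) // preallocated; no append growth needed
	var i int
	walkPairs(pairs, func(key, value []byte) bool {
		dst[i].Key, dst[i].Value = key, value
		i++
		return true
	})
	fmt.Println(len(dst), string(dst[0].Key))
}
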


@@ -41,6 +41,10 @@ const (
// by a bufio.Writer.
const defaultLogFileBufferSize = 4096
// indexFileBufferSize is the buffer size used when compacting the LogFile down
// into a .tsi file.
const indexFileBufferSize = 1 << 17 // 128K
// LogFile represents an on-disk write-ahead log file.
type LogFile struct {
mu sync.RWMutex
@@ -289,8 +293,7 @@ func (f *LogFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) bool {
return false
}
// TODO(edd): if mm is using a bitmap then this could be changed to do a fast
// intersection.
// TODO(edd): if mm is using a seriesSet then this could be changed to do a fast intersection.
for _, id := range mm.seriesIDs() {
if ss.Contains(id) {
return true
@@ -639,7 +642,7 @@ func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) {
mm.deleted = true
mm.tagSet = make(map[string]logTagKey)
mm.series = make(map[uint64]struct{})
mm.bitmap = nil
mm.seriesSet = nil
// Update measurement tombstone sketch.
f.mTSketch.Add(e.Name)
@@ -741,23 +744,23 @@ func (f *LogFile) SeriesIDIterator() tsdb.SeriesIDIterator {
defer f.mu.RUnlock()
ss := tsdb.NewSeriesIDSet()
allBitMaps := make([]*tsdb.SeriesIDSet, 0, len(f.mms))
allSeriesSets := make([]*tsdb.SeriesIDSet, 0, len(f.mms))
for _, mm := range f.mms {
if mm.bitmap != nil {
allBitMaps = append(allBitMaps, mm.bitmap)
if mm.seriesSet != nil {
allSeriesSets = append(allSeriesSets, mm.seriesSet)
continue
}
// measurement is not using bitmap to store series IDs.
// measurement is not using seriesSet to store series IDs.
mm.forEach(func(seriesID uint64) {
ss.AddNoLock(seriesID)
})
}
// Fast merge all bitmaps.
if len(allBitMaps) > 0 {
ss.Merge(allBitMaps...)
// Fast merge all seriesSets.
if len(allSeriesSets) > 0 {
ss.Merge(allSeriesSets...)
}
return tsdb.NewSeriesIDSetIterator(ss)
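
SeriesIDIterator now gathers each measurement's SeriesIDSet and merges them with one variadic Merge call, only falling back to per-ID adds for measurements still backed by a map. A minimal sketch of the bulk merge follows, using only the tsdb.SeriesIDSet methods visible in this diff; the import path assumes the 2018 influxdb tree.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	// Two per-measurement sets, as a logMeasurement would hold in seriesSet.
	a := tsdb.NewSeriesIDSet()
	a.AddNoLock(1)
	a.AddNoLock(2)
	b := tsdb.NewSeriesIDSet()
	b.AddNoLock(2)
	b.AddNoLock(3)

	// Merge both sets into the result in one call rather than adding each ID.
	ss := tsdb.NewSeriesIDSet()
	ss.Merge(a, b)
	fmt.Println(ss.Cardinality()) // 3

	// Wrap the merged set in an iterator, as SeriesIDIterator does.
	itr := tsdb.NewSeriesIDSetIterator(ss)
	_ = itr
}
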
@@ -818,7 +821,7 @@ func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n
}
// Wrap in buffered writer with a buffer equivalent to the LogFile size.
bw := bufio.NewWriterSize(w, 1<<17) // 128K
bw := bufio.NewWriterSize(w, indexFileBufferSize) // 128K
// Setup compaction offset tracking data.
var t IndexFileTrailer
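
CompactTo now takes its buffer size from the indexFileBufferSize constant instead of an inline 1<<17. A short sketch of writing through a bufio.Writer of that size is below; the output path and payload are illustrative, and the final Flush is what pushes any remaining buffered bytes to disk.

package main

import (
	"bufio"
	"log"
	"os"
)

const indexFileBufferSize = 1 << 17 // 128K

func main() {
	f, err := os.Create("/tmp/example.tsi")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	bw := bufio.NewWriterSize(f, indexFileBufferSize)
	if _, err := bw.WriteString("compacted index bytes go here\n"); err != nil {
		log.Fatal(err)
	}
	// Flush is required; bytes are only written when the buffer fills or
	// Flush is called.
	if err := bw.Flush(); err != nil {
		log.Fatal(err)
	}
}
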
@@ -1189,11 +1192,11 @@ func (mms *logMeasurements) bytes() int {
}
type logMeasurement struct {
name []byte
tagSet map[string]logTagKey
deleted bool
series map[uint64]struct{}
bitmap *tsdb.SeriesIDSet
name []byte
tagSet map[string]logTagKey
deleted bool
series map[uint64]struct{}
seriesSet *tsdb.SeriesIDSet
}
// bytes estimates the memory footprint of this logMeasurement, in bytes.
@@ -1210,42 +1213,42 @@ func (m *logMeasurement) bytes() int {
}
func (m *logMeasurement) addSeriesID(x uint64) {
if m.bitmap != nil {
m.bitmap.AddNoLock(x)
if m.seriesSet != nil {
m.seriesSet.AddNoLock(x)
return
}
m.series[x] = struct{}{}
// If the map is getting too big it can be converted into a roaring bitmap.
// If the map is getting too big it can be converted into a roaring seriesSet.
if len(m.series) > 25 {
m.bitmap = tsdb.NewSeriesIDSet()
m.seriesSet = tsdb.NewSeriesIDSet()
for id := range m.series {
m.bitmap.AddNoLock(id)
m.seriesSet.AddNoLock(id)
}
m.series = nil
}
}
func (m *logMeasurement) removeSeriesID(x uint64) {
if m.bitmap != nil {
m.bitmap.RemoveNoLock(x)
if m.seriesSet != nil {
m.seriesSet.RemoveNoLock(x)
return
}
delete(m.series, x)
}
func (m *logMeasurement) cardinality() int64 {
if m.bitmap != nil {
return int64(m.bitmap.Cardinality())
if m.seriesSet != nil {
return int64(m.seriesSet.Cardinality())
}
return int64(len(m.series))
}
// forEach applies fn to every series ID in the logMeasurement.
func (m *logMeasurement) forEach(fn func(uint64)) {
if m.bitmap != nil {
m.bitmap.ForEachNoLock(fn)
if m.seriesSet != nil {
m.seriesSet.ForEachNoLock(fn)
return
}
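
addSeriesID, removeSeriesID, cardinality, and forEach all branch on whether the measurement has been promoted from a plain Go map to a roaring-backed tsdb.SeriesIDSet once it holds more than 25 IDs. A self-contained sketch of that promotion pattern follows; the idSet type, promoteThreshold constant, and import path are assumptions, while the SeriesIDSet methods are the ones used in this diff.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

const promoteThreshold = 25

type idSet struct {
	series    map[uint64]struct{} // small sets stay in a Go map
	seriesSet *tsdb.SeriesIDSet   // large sets move to a roaring-backed set
}

func newIDSet() *idSet {
	return &idSet{series: make(map[uint64]struct{})}
}

func (s *idSet) add(x uint64) {
	if s.seriesSet != nil {
		s.seriesSet.AddNoLock(x)
		return
	}
	s.series[x] = struct{}{}

	// Promote to a SeriesIDSet once the map grows past the threshold.
	if len(s.series) > promoteThreshold {
		s.seriesSet = tsdb.NewSeriesIDSet()
		for id := range s.series {
			s.seriesSet.AddNoLock(id)
		}
		s.series = nil
	}
}

func (s *idSet) cardinality() int64 {
	if s.seriesSet != nil {
		return int64(s.seriesSet.Cardinality())
	}
	return int64(len(s.series))
}

func main() {
	s := newIDSet()
	for i := uint64(1); i <= 100; i++ {
		s.add(i)
	}
	fmt.Println(s.cardinality(), s.seriesSet != nil) // 100 true
}
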
@@ -1257,8 +1260,8 @@ func (m *logMeasurement) forEach(fn func(uint64)) {
// seriesIDs returns a sorted set of seriesIDs.
func (m *logMeasurement) seriesIDs() []uint64 {
a := make([]uint64, 0, m.cardinality())
if m.bitmap != nil {
m.bitmap.ForEachNoLock(func(id uint64) { a = append(a, id) })
if m.seriesSet != nil {
m.seriesSet.ForEachNoLock(func(id uint64) { a = append(a, id) })
return a // IDs are already sorted.
}
@@ -1269,11 +1272,11 @@ func (m *logMeasurement) seriesIDs() []uint64 {
return a
}
// seriesIDSet returns a copy of the logMeasurement's bitmap, or creates a new
// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new
// one
func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet {
if m.bitmap != nil {
return m.bitmap.CloneNoLock()
if m.seriesSet != nil {
return m.seriesSet.CloneNoLock()
}
ss := tsdb.NewSeriesIDSet()
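
seriesIDSet hands back a clone rather than the internal set, so callers get their own copy to work with. A short sketch of that defensive copy, using CloneNoLock as shown in this diff (same assumed import path):

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	internal := tsdb.NewSeriesIDSet()
	internal.AddNoLock(7)

	snapshot := internal.CloneNoLock() // defensive copy
	snapshot.AddNoLock(8)              // does not touch the internal set

	fmt.Println(internal.Cardinality(), snapshot.Cardinality()) // 1 2
}
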
@@ -1370,10 +1373,10 @@ func (a logTagKeySlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a logTagKeySlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
type logTagValue struct {
name []byte
deleted bool
series map[uint64]struct{}
bitmap *tsdb.SeriesIDSet
name []byte
deleted bool
series map[uint64]struct{}
seriesSet *tsdb.SeriesIDSet
}
// bytes estimates the memory footprint of this logTagValue, in bytes.
@@ -1386,34 +1389,34 @@ func (tv *logTagValue) bytes() int {
}
func (tv *logTagValue) addSeriesID(x uint64) {
if tv.bitmap != nil {
tv.bitmap.AddNoLock(x)
if tv.seriesSet != nil {
tv.seriesSet.AddNoLock(x)
return
}
tv.series[x] = struct{}{}
// If the map is getting too big it can be converted into a roaring bitmap.
// If the map is getting too big it can be converted into a roaring seriesSet.
if len(tv.series) > 25 {
tv.bitmap = tsdb.NewSeriesIDSet()
tv.seriesSet = tsdb.NewSeriesIDSet()
for id := range tv.series {
tv.bitmap.AddNoLock(id)
tv.seriesSet.AddNoLock(id)
}
tv.series = nil
}
}
func (tv *logTagValue) removeSeriesID(x uint64) {
if tv.bitmap != nil {
tv.bitmap.RemoveNoLock(x)
if tv.seriesSet != nil {
tv.seriesSet.RemoveNoLock(x)
return
}
delete(tv.series, x)
}
func (tv *logTagValue) cardinality() int64 {
if tv.bitmap != nil {
return int64(tv.bitmap.Cardinality())
if tv.seriesSet != nil {
return int64(tv.seriesSet.Cardinality())
}
return int64(len(tv.series))
}
@@ -1421,8 +1424,8 @@ func (tv *logTagValue) cardinality() int64 {
// seriesIDs returns a sorted set of seriesIDs.
func (tv *logTagValue) seriesIDs() []uint64 {
a := make([]uint64, 0, tv.cardinality())
if tv.bitmap != nil {
tv.bitmap.ForEachNoLock(func(id uint64) { a = append(a, id) })
if tv.seriesSet != nil {
tv.seriesSet.ForEachNoLock(func(id uint64) { a = append(a, id) })
return a // IDs are already sorted.
}
@@ -1433,11 +1436,11 @@ func (tv *logTagValue) seriesIDs() []uint64 {
return a
}
// seriesIDSet returns a copy of the logMeasurement's bitmap, or creates a new
// seriesIDSet returns a copy of the logTagValue's seriesSet, or creates a new
// one
func (tv *logTagValue) seriesIDSet() *tsdb.SeriesIDSet {
if tv.bitmap != nil {
return tv.bitmap.CloneNoLock()
if tv.seriesSet != nil {
return tv.seriesSet.CloneNoLock()
}
ss := tsdb.NewSeriesIDSet()