Use cached tags when applying series entries
parent
ab4e619f81
commit
20d429c62b
|
@ -36,13 +36,14 @@ const (
|
||||||
|
|
||||||
// LogFile represents an on-disk write-ahead log file.
|
// LogFile represents an on-disk write-ahead log file.
|
||||||
type LogFile struct {
|
type LogFile struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
wg sync.WaitGroup // ref count
|
wg sync.WaitGroup // ref count
|
||||||
id int // file sequence identifier
|
id int // file sequence identifier
|
||||||
data []byte // mmap
|
data []byte // mmap
|
||||||
file *os.File // writer
|
file *os.File // writer
|
||||||
w *bufio.Writer // buffered writer
|
w *bufio.Writer // buffered writer
|
||||||
buf []byte // marshaling buffer
|
buf []byte // marshaling buffer
|
||||||
|
keyBuf []byte
|
||||||
|
|
||||||
sfile *tsdb.SeriesFile // series lookup
|
sfile *tsdb.SeriesFile // series lookup
|
||||||
size int64 // tracks current file size
|
size int64 // tracks current file size
|
||||||
|
@ -478,7 +479,7 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tag
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
writeRequired = true
|
writeRequired = true
|
||||||
entries = append(entries, LogEntry{SeriesID: seriesIDs[i]})
|
entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true})
|
||||||
}
|
}
|
||||||
seriesSet.RUnlock()
|
seriesSet.RUnlock()
|
||||||
|
|
||||||
|
@ -607,7 +608,17 @@ func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *LogFile) execSeriesEntry(e *LogEntry) {
|
func (f *LogFile) execSeriesEntry(e *LogEntry) {
|
||||||
seriesKey := f.sfile.SeriesKey(e.SeriesID)
|
var seriesKey []byte
|
||||||
|
if e.cached {
|
||||||
|
sz := tsdb.SeriesKeySize(e.name, e.tags)
|
||||||
|
if len(f.keyBuf) < sz {
|
||||||
|
f.keyBuf = make([]byte, 0, sz)
|
||||||
|
}
|
||||||
|
seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags)
|
||||||
|
} else {
|
||||||
|
seriesKey = f.sfile.SeriesKey(e.SeriesID)
|
||||||
|
}
|
||||||
|
|
||||||
assert(seriesKey != nil, fmt.Sprintf("series key for ID: %d not found", e.SeriesID))
|
assert(seriesKey != nil, fmt.Sprintf("series key for ID: %d not found", e.SeriesID))
|
||||||
|
|
||||||
// Check if deleted.
|
// Check if deleted.
|
||||||
|
@ -966,6 +977,10 @@ type LogEntry struct {
|
||||||
Value []byte // tag value
|
Value []byte // tag value
|
||||||
Checksum uint32 // checksum of flag/name/tags.
|
Checksum uint32 // checksum of flag/name/tags.
|
||||||
Size int // total size of record, in bytes.
|
Size int // total size of record, in bytes.
|
||||||
|
|
||||||
|
cached bool // Hint to LogFile that series data is already parsed
|
||||||
|
name []byte // series naem, this is a cached copy of the parsed measurement name
|
||||||
|
tags models.Tags // series tags, this is a cached copied of the parsed tags
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalBinary unmarshals data into e.
|
// UnmarshalBinary unmarshals data into e.
|
||||||
|
|
Loading…
Reference in New Issue