package tsi1

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"os"
	"sort"
	"sync"
	"time"
	"unsafe"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/bloom"
	"github.com/influxdata/influxdb/pkg/lifecycle"
	"github.com/influxdata/influxdb/pkg/mmap"
	"github.com/influxdata/influxdb/tsdb"
)

// Log errors.
var (
	ErrLogEntryChecksumMismatch = errors.New("log entry checksum mismatch")
)

// Log entry flag constants.
const (
	LogEntrySeriesTombstoneFlag      = 0x01
	LogEntryMeasurementTombstoneFlag = 0x02
	LogEntryTagKeyTombstoneFlag      = 0x04
	LogEntryTagValueTombstoneFlag    = 0x08
)

// defaultLogFileBufferSize describes the size of the buffer that the LogFile's buffered
// writer uses. If the LogFile does not have an explicit buffer size set then
// this is the size of the buffer; it is equal to the default buffer size used
// by a bufio.Writer.
const defaultLogFileBufferSize = 4096

// indexFileBufferSize is the buffer size used when compacting the LogFile down
// into a .tsi file.
const indexFileBufferSize = 1 << 17 // 128K

// LogFile represents an on-disk write-ahead log file.
type LogFile struct {
	mu  sync.RWMutex
	res lifecycle.Resource

	id         int           // file sequence identifier
	data       []byte        // mmap
	file       *os.File      // writer
	w          *bufio.Writer // buffered writer
	bufferSize int           // The size of the buffer used by the buffered writer
	nosync     bool          // Disables buffer flushing and file syncing. Useful for offline tooling.
	buf        []byte        // marshaling buffer
	keyBuf     []byte

	sfile    *tsdb.SeriesFile // series lookup
	sfileref *lifecycle.Reference

	size    int64     // tracks current file size
	modTime time.Time // tracks last time write occurred

	// In-memory series existence/tombstone sets.
	seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet

	// In-memory index.
	mms logMeasurements

	// In-memory stats
	stats MeasurementCardinalityStats

	// Filepath to the log file.
	path string
}

// NewLogFile returns a new instance of LogFile.
func NewLogFile(sfile *tsdb.SeriesFile, path string) *LogFile {
	return &LogFile{
		sfile: sfile,
		path:  path,
		mms:   make(logMeasurements),
		stats: make(MeasurementCardinalityStats),

		seriesIDSet:          tsdb.NewSeriesIDSet(),
		tombstoneSeriesIDSet: tsdb.NewSeriesIDSet(),
	}
}

// bytes estimates the memory footprint of this LogFile, in bytes.
func (f *LogFile) bytes() int {
	var b int
	b += 24 // mu RWMutex is 24 bytes
	b += int(unsafe.Sizeof(f.res))
	b += int(unsafe.Sizeof(f.id))
	// Do not count f.data contents because it is mmap'd
	b += int(unsafe.Sizeof(f.data))
	b += int(unsafe.Sizeof(f.file))
	b += int(unsafe.Sizeof(f.w))
	b += int(unsafe.Sizeof(f.bufferSize))
	b += int(unsafe.Sizeof(f.nosync))
	// TODO(jacobmarble): Uncomment when we are using go >= 1.10.0
	//b += f.w.Size()
	b += int(unsafe.Sizeof(f.buf)) + len(f.buf)
	b += int(unsafe.Sizeof(f.keyBuf)) + len(f.keyBuf)
	b += int(unsafe.Sizeof(f.sfile))
	b += int(unsafe.Sizeof(f.sfileref))
	b += int(unsafe.Sizeof(f.size))
	b += int(unsafe.Sizeof(f.modTime))
	b += int(unsafe.Sizeof(f.seriesIDSet)) + f.seriesIDSet.Bytes()
	b += int(unsafe.Sizeof(f.tombstoneSeriesIDSet)) + f.tombstoneSeriesIDSet.Bytes()
	b += int(unsafe.Sizeof(f.mms)) + f.mms.bytes()
	b += int(unsafe.Sizeof(f.stats))
	b += int(unsafe.Sizeof(f.path)) + len(f.path)
	return b
}

// Open reads the log from a file and validates all the checksums.
func (f *LogFile) Open() error {
	if err := f.open(); err != nil {
		f.Close()
		return err
	}
	return nil
}
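// Usage sketch (editorial; not part of the original file): a LogFile is
// constructed against an already-open series file, opened, written to, and
// closed. The dir variable and sequence id below are hypothetical.
//
//	lf := NewLogFile(sfile, filepath.Join(dir, FormatLogFileName(1)))
//	if err := lf.Open(); err != nil {
//		// handle error
//	}
//	defer lf.Close()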
func (f *LogFile) open() (err error) {
	// Attempt to acquire a reference to the series file.
	f.sfileref, err = f.sfile.Acquire()
	if err != nil {
		return err
	}

	f.id, _ = ParseFilename(f.path)

	// Open file for appending.
	file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	f.file = file

	if f.bufferSize == 0 {
		f.bufferSize = defaultLogFileBufferSize
	}
	f.w = bufio.NewWriterSize(f.file, f.bufferSize)

	// Finish opening if file is empty.
	fi, err := file.Stat()
	if err != nil {
		return err
	} else if fi.Size() == 0 {
		f.res.Open()
		return nil
	}
	f.size = fi.Size()
	f.modTime = fi.ModTime()

	// Open a read-only memory map of the existing data.
	data, err := mmap.Map(f.Path(), 0)
	if err != nil {
		return err
	}
	f.data = data

	// Read log entries from mmap.
	var n int64
	for buf := f.data; len(buf) > 0; {
		// Read next entry. Truncate partial writes.
		var e LogEntry
		if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch {
			break
		} else if err != nil {
			return err
		}

		// Execute entry against in-memory index.
		f.execEntry(&e)

		// Move buffer forward.
		n += int64(e.Size)
		buf = buf[e.Size:]
	}

	// Move to the end of the file.
	f.size = n
	_, err = file.Seek(n, io.SeekStart)
	if err != nil {
		return err
	}

	// The resource is now open.
	f.res.Open()

	return nil
}

// Close shuts down the file handle and mmap.
func (f *LogFile) Close() error {
	// Wait until the file has no more references.
	f.res.Close()

	if f.sfileref != nil {
		f.sfileref.Release()
		f.sfileref = nil
	}

	if f.w != nil {
		f.w.Flush()
		f.w = nil
	}

	if f.file != nil {
		f.file.Close()
		f.file = nil
	}

	if f.data != nil {
		mmap.Unmap(f.data)
	}

	f.mms = make(logMeasurements)
	return nil
}

// FlushAndSync flushes buffered data to disk and then fsyncs the underlying file.
// If the LogFile has disabled flushing and syncing then FlushAndSync is a no-op.
func (f *LogFile) FlushAndSync() error {
	if f.nosync {
		return nil
	}

	if f.w != nil {
		if err := f.w.Flush(); err != nil {
			return err
		}
	}

	if f.file == nil {
		return nil
	}
	return f.file.Sync()
}

// ID returns the file sequence identifier.
func (f *LogFile) ID() int { return f.id }

// Path returns the file path.
func (f *LogFile) Path() string { return f.path }

// SetPath sets the log file's path.
func (f *LogFile) SetPath(path string) { f.path = path }

// Level returns the log level of the file.
func (f *LogFile) Level() int { return 0 }

// Filter returns the bloom filter for the file.
func (f *LogFile) Filter() *bloom.Filter { return nil }

// Acquire adds a reference count to the file.
func (f *LogFile) Acquire() (*lifecycle.Reference, error) {
	return f.res.Acquire()
}

// Stat returns size and last modification time of the file.
func (f *LogFile) Stat() (int64, time.Time) {
	f.mu.RLock()
	size, modTime := f.size, f.modTime
	f.mu.RUnlock()
	return size, modTime
}

// SeriesIDSet returns the series existence set.
func (f *LogFile) SeriesIDSet() (*tsdb.SeriesIDSet, error) {
	return f.seriesIDSet, nil
}

// TombstoneSeriesIDSet returns the series tombstone set.
func (f *LogFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) {
	return f.tombstoneSeriesIDSet, nil
}

// Size returns the size of the file, in bytes.
func (f *LogFile) Size() int64 {
	f.mu.RLock()
	v := f.size
	f.mu.RUnlock()
	return v
}
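// Read-path sketch (editorial): callers that read from the file while other
// goroutines may close it should pin it via Acquire and release the
// reference when done, mirroring how open() pins the series file.
//
//	ref, err := f.Acquire()
//	if err != nil {
//		return err // file is already closing
//	}
//	defer ref.Release()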
// Measurement returns a measurement element.
func (f *LogFile) Measurement(name []byte) MeasurementElem {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	return mm
}

func (f *LogFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) bool {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return false
	}

	// TODO(edd): if mm is using a seriesSet then this could be changed to do a fast intersection.
	for _, id := range mm.seriesIDs() {
		if ss.Contains(id) {
			return true
		}
	}
	return false
}

// MeasurementNames returns an ordered list of measurement names.
func (f *LogFile) MeasurementNames() []string {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.measurementNames()
}

func (f *LogFile) measurementNames() []string {
	a := make([]string, 0, len(f.mms))
	for name := range f.mms {
		a = append(a, name)
	}
	sort.Strings(a)
	return a
}

// DeleteMeasurement adds a tombstone for a measurement to the log file.
func (f *LogFile) DeleteMeasurement(name []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// TagKeySeriesIDIterator returns a series iterator for a tag key.
func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	// Combine iterators across all tag values for the key.
	itrs := make([]tsdb.SeriesIDIterator, 0, len(tk.tagValues))
	for _, tv := range tk.tagValues {
		if tv.cardinality() == 0 {
			continue
		}
		itrs = append(itrs, tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()))
	}

	return tsdb.MergeSeriesIDIterators(itrs...)
}

// TagKeyIterator returns a tag key iterator for a measurement.
func (f *LogFile) TagKeyIterator(name []byte) TagKeyIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	a := make([]logTagKey, 0, len(mm.tagSet))
	for _, k := range mm.tagSet {
		a = append(a, k)
	}

	return newLogTagKeyIterator(a)
}

// TagKey returns a tag key element.
func (f *LogFile) TagKey(name, key []byte) TagKeyElem {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	return &tk
}

// TagValue returns a tag value element.
func (f *LogFile) TagValue(name, key, value []byte) TagValueElem {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	tv, ok := tk.tagValues[string(value)]
	if !ok {
		return nil
	}

	return &tv
}

// TagValueIterator returns a value iterator for a tag key.
func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}
	return tk.TagValueIterator()
}

// DeleteTagKey adds a tombstone for a tag key to the log file.
func (f *LogFile) DeleteTagKey(name, key []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: key}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}
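// Iteration sketch (editorial): walking the tag keys of a hypothetical "cpu"
// measurement via the iterator returned above. TagKeyElem exposes Key and
// Deleted, as the logTagKey methods later in this file implement.
//
//	if itr := f.TagKeyIterator([]byte("cpu")); itr != nil {
//		for e := itr.Next(); e != nil; e = itr.Next() {
//			fmt.Printf("key=%s deleted=%v\n", e.Key(), e.Deleted())
//		}
//	}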
// TagValueSeriesIDSet returns the set of series ids for a tag value.
func (f *LogFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil, nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil, nil
	}

	tv, ok := tk.tagValues[string(value)]
	if !ok {
		return nil, nil
	} else if tv.cardinality() == 0 {
		return nil, nil
	}

	return tv.seriesIDSet(), nil
}

// MeasurementN returns the total number of measurements.
func (f *LogFile) MeasurementN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return uint64(len(f.mms))
}

// TagKeyN returns the total number of keys.
func (f *LogFile) TagKeyN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	for _, mm := range f.mms {
		n += uint64(len(mm.tagSet))
	}
	return n
}

// TagValueN returns the total number of values.
func (f *LogFile) TagValueN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	for _, mm := range f.mms {
		for _, k := range mm.tagSet {
			n += uint64(len(k.tagValues))
		}
	}
	return n
}

// DeleteTagValue adds a tombstone for a tag value to the log file.
func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: key, Value: value}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// AddSeriesList adds a list of series to the log file in bulk.
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, collection *tsdb.SeriesCollection) ([]tsdb.SeriesID, error) {
	var writeRequired bool
	var entries []LogEntry
	var i int // Track the index of the point in the batch.

	seriesSet.RLock()
	for iter := collection.Iterator(); iter.Next(); {
		seriesID := iter.SeriesID()
		if seriesSet.ContainsNoLock(seriesID) {
			i++
			continue
		}
		writeRequired = true

		// Lazy allocation of entries to avoid the common case of no new series.
		if entries == nil {
			entries = make([]LogEntry, 0, collection.Length())
		}
		entries = append(entries, LogEntry{
			SeriesID: seriesID,
			name:     iter.Name(),
			tags:     iter.Tags(),
			cached:   true,
			batchidx: i,
		})
		i++
	}
	seriesSet.RUnlock()

	// Exit if all series already exist.
	if !writeRequired {
		return nil, nil
	}

	f.mu.Lock()
	defer f.mu.Unlock()

	seriesSet.Lock()
	defer seriesSet.Unlock()

	var seriesIDs []tsdb.SeriesID
	for i := range entries { // NB - this doesn't evaluate all series ids returned from series file.
		entry := &entries[i]
		if seriesSet.ContainsNoLock(entry.SeriesID) {
			// We don't need to allocate anything for this series.
			continue
		}
		if err := f.appendEntry(entry); err != nil {
			return nil, err
		}
		f.execEntry(entry)
		seriesSet.AddNoLock(entry.SeriesID)

		if seriesIDs == nil {
			seriesIDs = make([]tsdb.SeriesID, collection.Length())
		}
		seriesIDs[entry.batchidx] = entry.SeriesID
	}

	// Flush buffer and sync to disk.
	if err := f.FlushAndSync(); err != nil {
		return nil, err
	}

	return seriesIDs, nil
}

// DeleteSeriesID adds a tombstone for a series id.
func (f *LogFile) DeleteSeriesID(id tsdb.SeriesID) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}
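// Editorial note: each DeleteSeriesID call flushes and fsyncs the log, so
// deleting many series one at a time pays one sync per id. The batch form
// below amortizes that cost into a single FlushAndSync. A minimal sketch
// with hypothetical ids:
//
//	ids := []tsdb.SeriesID{id1, id2, id3}
//	if err := f.DeleteSeriesIDList(ids); err != nil {
//		// handle error
//	}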
// DeleteSeriesIDList marks a tombstone for all the series IDs. DeleteSeriesIDList
// should be preferred to repeatedly calling DeleteSeriesID for many series ids.
func (f *LogFile) DeleteSeriesIDList(ids []tsdb.SeriesID) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	for _, id := range ids {
		e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
		if err := f.appendEntry(&e); err != nil {
			return err
		}
		f.execEntry(&e)
	}

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// SeriesN returns the total number of series in the file.
func (f *LogFile) SeriesN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	for _, mm := range f.mms {
		n += uint64(mm.cardinality())
	}
	return n
}

// appendEntry adds a log entry to the end of the file.
func (f *LogFile) appendEntry(e *LogEntry) error {
	// Marshal entry to the local buffer.
	f.buf = appendLogEntry(f.buf[:0], e)

	// Save the size of the record.
	e.Size = len(f.buf)

	// Write record to file.
	n, err := f.w.Write(f.buf)
	if err != nil {
		// Move position backwards over partial entry.
		// Log should be reopened if seeking cannot be completed.
		if n > 0 {
			f.w.Reset(f.file)
			if _, err := f.file.Seek(int64(-n), io.SeekCurrent); err != nil {
				f.Close()
			}
		}
		return err
	}

	// Update in-memory file size & modification time.
	f.size += int64(n)
	f.modTime = time.Now()

	return nil
}

// execEntry executes a log entry against the in-memory index.
// This is done after appending and on replay of the log.
func (f *LogFile) execEntry(e *LogEntry) {
	switch e.Flag {
	case LogEntryMeasurementTombstoneFlag:
		f.execDeleteMeasurementEntry(e)
	case LogEntryTagKeyTombstoneFlag:
		f.execDeleteTagKeyEntry(e)
	case LogEntryTagValueTombstoneFlag:
		f.execDeleteTagValueEntry(e)
	default:
		f.execSeriesEntry(e)
	}
}

func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	mm.deleted = true
	mm.tagSet = make(map[string]logTagKey)
	mm.series = make(map[tsdb.SeriesID]struct{})
	mm.seriesSet = nil
}

func (f *LogFile) execDeleteTagKeyEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)

	ts.deleted = true

	mm.tagSet[string(e.Key)] = ts
}

func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)
	tv := ts.createTagValueIfNotExists(e.Value)

	tv.deleted = true

	ts.tagValues[string(e.Value)] = tv
	mm.tagSet[string(e.Key)] = ts
}

func (f *LogFile) execSeriesEntry(e *LogEntry) {
	var seriesKey []byte
	if e.cached {
		sz := tsdb.SeriesKeySize(e.name, e.tags)
		if len(f.keyBuf) < sz {
			f.keyBuf = make([]byte, 0, sz)
		}
		seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags)
	} else {
		seriesKey = f.sfile.SeriesKey(e.SeriesID)
	}

	// Series keys can be removed if the series has been deleted from
	// the entire database and the server is restarted. This would cause
	// the log to replay its insert but the key cannot be found.
	//
	// https://github.com/influxdata/influxdb/issues/9444
	if seriesKey == nil {
		return
	}

	// Check if deleted.
	deleted := e.Flag == LogEntrySeriesTombstoneFlag

	// Read key size.
	_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)

	// Read measurement name.
	name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
	mm := f.createMeasurementIfNotExists(name)
	mm.deleted = false
	if !deleted {
		mm.addSeriesID(e.SeriesID)
	} else {
		mm.removeSeriesID(e.SeriesID)
	}

	// Read tag count.
	tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)

	// Save tags.
	var k, v []byte
	for i := 0; i < tagN; i++ {
		k, v, remainder = tsdb.ReadSeriesKeyTag(remainder)
		ts := mm.createTagSetIfNotExists(k)
		tv := ts.createTagValueIfNotExists(v)

		// Add/remove a reference to the series on the tag value.
		if !deleted {
			tv.addSeriesID(e.SeriesID)
		} else {
			tv.removeSeriesID(e.SeriesID)
		}

		ts.tagValues[string(v)] = tv
		mm.tagSet[string(k)] = ts
	}

	// Add/remove from appropriate series id sets & stats.
	if !deleted {
		f.seriesIDSet.Add(e.SeriesID)
		f.tombstoneSeriesIDSet.Remove(e.SeriesID)
		f.stats.Inc(name)
	} else {
		f.seriesIDSet.Remove(e.SeriesID)
		f.tombstoneSeriesIDSet.Add(e.SeriesID)
		f.stats.Dec(name)
	}
}
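// Editorial note on the parse in execSeriesEntry above: the series key is
// walked with the tsdb helpers in the order it is laid out — total key
// length (ReadSeriesKeyLen), measurement name (ReadSeriesKeyMeasurement),
// tag count (ReadSeriesKeyTagN), then tagN key/value pairs
// (ReadSeriesKeyTag) — so each helper returns the remainder for the next.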
// SeriesIDIterator returns an iterator over all series in the log file.
func (f *LogFile) SeriesIDIterator() tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	ss := tsdb.NewSeriesIDSet()
	allSeriesSets := make([]*tsdb.SeriesIDSet, 0, len(f.mms))

	for _, mm := range f.mms {
		if mm.seriesSet != nil {
			allSeriesSets = append(allSeriesSets, mm.seriesSet)
			continue
		}

		// Measurement is not using a seriesSet to store series IDs.
		mm.forEach(func(seriesID tsdb.SeriesID) { ss.AddNoLock(seriesID) })
	}

	// Fast merge all seriesSets.
	if len(allSeriesSets) > 0 {
		ss.Merge(allSeriesSets...)
	}

	return tsdb.NewSeriesIDSetIterator(ss)
}

// createMeasurementIfNotExists returns a measurement by name.
func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
	mm := f.mms[string(name)]
	if mm == nil {
		mm = &logMeasurement{
			name:   name,
			tagSet: make(map[string]logTagKey),
			series: make(map[tsdb.SeriesID]struct{}),
		}
		f.mms[string(name)] = mm
	}
	return mm
}

// MeasurementIterator returns an iterator over all the measurements in the file.
func (f *LogFile) MeasurementIterator() MeasurementIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	var itr logMeasurementIterator
	for _, mm := range f.mms {
		itr.mms = append(itr.mms, *mm)
	}
	sort.Sort(logMeasurementSlice(itr.mms))
	return &itr
}

// MeasurementSeriesIDIterator returns an iterator over all series for a measurement.
func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm := f.mms[string(name)]
	if mm == nil || mm.cardinality() == 0 {
		return nil
	}
	return tsdb.NewSeriesIDSetIterator(mm.seriesIDSet())
}

// CompactTo compacts the log file and writes it to w.
func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n int64, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// Check for cancellation.
	select {
	case <-cancel:
		return n, ErrCompactionInterrupted
	default:
	}

	// Wrap in a buffered writer.
	bw := bufio.NewWriterSize(w, indexFileBufferSize) // 128K

	// Setup compaction offset tracking data.
	var t IndexFileTrailer
	info := newLogFileCompactInfo()
	info.cancel = cancel

	// Write magic number.
	if err := writeTo(bw, []byte(FileSignature), &n); err != nil {
		return n, err
	}

	// Retrieve measurement names in order.
	names := f.measurementNames()

	// Flush buffer.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	// Write tagset blocks in measurement order.
	if err := f.writeTagsetsTo(bw, names, info, &n); err != nil {
		return n, err
	}

	// Write measurement block.
	t.MeasurementBlock.Offset = n
	if err := f.writeMeasurementBlockTo(bw, names, info, &n); err != nil {
		return n, err
	}
	t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset

	// Write series set.
	t.SeriesIDSet.Offset = n
	nn, err := f.seriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.SeriesIDSet.Size = n - t.SeriesIDSet.Offset

	// Write tombstone series set.
	t.TombstoneSeriesIDSet.Offset = n
	nn, err = f.tombstoneSeriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset

	// Write trailer.
	nn, err = t.WriteTo(bw)
	n += nn
	if err != nil {
		return n, err
	}

	// Flush buffer.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	return n, nil
}
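// The compacted .tsi file produced by CompactTo is laid out in write order
// (editorial summary of the code above):
//
//	┌──────────────────────┐
//	│ FileSignature        │ magic number
//	├──────────────────────┤
//	│ tagset blocks        │ one per measurement, in name order
//	├──────────────────────┤
//	│ measurement block    │ offsets/sizes into the tagset blocks
//	├──────────────────────┤
//	│ series id set        │ existence set, then tombstone set
//	├──────────────────────┤
//	│ trailer              │ offsets/sizes of the sections above
//	└──────────────────────┘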
func (f *LogFile) writeTagsetsTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
	for _, name := range names {
		if err := f.writeTagsetTo(w, name, info, n); err != nil {
			return err
		}
	}
	return nil
}

// writeTagsetTo writes a single tagset to w and saves the tagset offset.
func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactInfo, n *int64) error {
	mm := f.mms[name]

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	enc := NewTagBlockEncoder(w)
	var valueN int
	for _, k := range mm.keys() {
		tag := mm.tagSet[k]

		// Encode tag. Skip values if tag is deleted.
		if err := enc.EncodeKey(tag.name, tag.deleted); err != nil {
			return err
		} else if tag.deleted {
			continue
		}

		// Sort tag values.
		values := make([]string, 0, len(tag.tagValues))
		for v := range tag.tagValues {
			values = append(values, v)
		}
		sort.Strings(values)

		// Add each value.
		for _, v := range values {
			value := tag.tagValues[v]
			if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDSet()); err != nil {
				return err
			}

			// Check for cancellation periodically.
			if valueN++; valueN%1000 == 0 {
				select {
				case <-info.cancel:
					return ErrCompactionInterrupted
				default:
				}
			}
		}
	}

	// Save the starting offset of this tagset.
	offset := *n

	// Flush tag block.
	err := enc.Close()
	*n += enc.N()
	if err != nil {
		return err
	}

	// Save tagset offset and size to the measurement.
	size := *n - offset
	info.mms[name] = &logFileMeasurementCompactInfo{offset: offset, size: size}

	return nil
}

func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
	mw := NewMeasurementBlockWriter()

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	// Add measurement data.
	for _, name := range names {
		mm := f.mms[name]
		mmInfo := info.mms[name]
		assert(mmInfo != nil, "measurement info not found")
		mw.Add(mm.name, mm.deleted, mmInfo.offset, mmInfo.size, mm.seriesIDs())
	}

	// Flush data to writer.
	nn, err := mw.WriteTo(w)
	*n += nn
	return err
}

// logFileCompactInfo is a context object to track compaction position info.
type logFileCompactInfo struct {
	cancel <-chan struct{}
	mms    map[string]*logFileMeasurementCompactInfo
}

// newLogFileCompactInfo returns a new instance of logFileCompactInfo.
func newLogFileCompactInfo() *logFileCompactInfo {
	return &logFileCompactInfo{
		mms: make(map[string]*logFileMeasurementCompactInfo),
	}
}

type logFileMeasurementCompactInfo struct {
	offset int64
	size   int64
}

// MeasurementCardinalityStats returns cardinality stats for this log file.
func (f *LogFile) MeasurementCardinalityStats() MeasurementCardinalityStats {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.stats.Clone()
}

// LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct {
	Flag     byte          // flag
	SeriesID tsdb.SeriesID // series id
	Name     []byte        // measurement name
	Key      []byte        // tag key
	Value    []byte        // tag value
	Checksum uint32        // checksum of flag/name/tags
	Size     int           // total size of record, in bytes

	cached   bool        // hint to LogFile that series data is already parsed
	name     []byte      // cached copy of the parsed measurement name
	tags     models.Tags // cached copy of the parsed tags
	batchidx int         // position of entry in batch
}
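// On-disk layout of a single log entry, as produced by appendLogEntry and
// consumed by UnmarshalBinary, both below (editorial summary):
//
//	flag       1 byte
//	series id  uvarint
//	name       uvarint length + bytes
//	key        uvarint length + bytes
//	value      uvarint length + bytes
//	checksum   4 bytes, big-endian CRC32 (IEEE) of everything above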
// UnmarshalBinary unmarshals data into e.
func (e *LogEntry) UnmarshalBinary(data []byte) error {
	var sz uint64
	var n int
	var seriesID uint64
	var err error

	orig := data
	start := len(data)

	// Parse flag data.
	if len(data) < 1 {
		return io.ErrShortBuffer
	}
	e.Flag, data = data[0], data[1:]

	// Parse series id.
	if seriesID, n, err = uvarint(data); err != nil {
		return err
	}
	e.SeriesID, data = tsdb.NewSeriesID(seriesID), data[n:]

	// Parse name length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read name data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Name, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse key length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read key data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Key, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse value length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read value data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Value, data = data[n:n+int(sz)], data[n+int(sz):]

	// Compute checksum.
	chk := crc32.ChecksumIEEE(orig[:start-len(data)])

	// Parse checksum.
	if len(data) < 4 {
		return io.ErrShortBuffer
	}
	e.Checksum, data = binary.BigEndian.Uint32(data[:4]), data[4:]

	// Verify checksum.
	if chk != e.Checksum {
		return ErrLogEntryChecksumMismatch
	}

	// Save length of entry.
	e.Size = start - len(data)

	return nil
}

// appendLogEntry appends to dst and returns the new buffer.
// This updates the checksum on the entry.
func appendLogEntry(dst []byte, e *LogEntry) []byte {
	var buf [binary.MaxVarintLen64]byte
	start := len(dst)

	// Append flag.
	dst = append(dst, e.Flag)

	// Append series id.
	n := binary.PutUvarint(buf[:], e.SeriesID.RawID())
	dst = append(dst, buf[:n]...)

	// Append name.
	n = binary.PutUvarint(buf[:], uint64(len(e.Name)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Name...)

	// Append key.
	n = binary.PutUvarint(buf[:], uint64(len(e.Key)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Key...)

	// Append value.
	n = binary.PutUvarint(buf[:], uint64(len(e.Value)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Value...)

	// Calculate checksum.
	e.Checksum = crc32.ChecksumIEEE(dst[start:])

	// Append checksum.
	binary.BigEndian.PutUint32(buf[:4], e.Checksum)
	dst = append(dst, buf[:4]...)

	return dst
}

// logMeasurements represents a map of measurement names to measurements.
type logMeasurements map[string]*logMeasurement

// bytes estimates the memory footprint of this logMeasurements, in bytes.
func (mms *logMeasurements) bytes() int {
	var b int
	for k, v := range *mms {
		b += len(k)
		b += v.bytes()
	}
	b += int(unsafe.Sizeof(*mms))
	return b
}

type logMeasurement struct {
	name      []byte
	tagSet    map[string]logTagKey
	deleted   bool
	series    map[tsdb.SeriesID]struct{}
	seriesSet *tsdb.SeriesIDSet
}

// bytes estimates the memory footprint of this logMeasurement, in bytes.
func (m *logMeasurement) bytes() int {
	var b int
	b += len(m.name)
	for k, v := range m.tagSet {
		b += len(k)
		b += v.bytes()
	}
	b += (int(m.cardinality()) * 8)
	b += int(unsafe.Sizeof(*m))
	return b
}

func (m *logMeasurement) addSeriesID(x tsdb.SeriesID) {
	if m.seriesSet != nil {
		m.seriesSet.AddNoLock(x)
		return
	}

	m.series[x] = struct{}{}

	// If the map is getting too big it can be converted into a roaring seriesSet.
	if len(m.series) > 25 {
		m.seriesSet = tsdb.NewSeriesIDSet()
		for id := range m.series {
			m.seriesSet.AddNoLock(id)
		}
		m.series = nil
	}
}
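// Editorial note: a logMeasurement starts out tracking series in a plain Go
// map and, as addSeriesID above shows, promotes itself to a roaring-backed
// tsdb.SeriesIDSet once it holds more than 25 series; the map is then
// dropped and all later adds and removes go through the set.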
func (m *logMeasurement) removeSeriesID(x tsdb.SeriesID) {
	if m.seriesSet != nil {
		m.seriesSet.RemoveNoLock(x)
		return
	}
	delete(m.series, x)
}

func (m *logMeasurement) cardinality() int64 {
	if m.seriesSet != nil {
		return int64(m.seriesSet.Cardinality())
	}
	return int64(len(m.series))
}

// forEach applies fn to every series ID in the logMeasurement.
func (m *logMeasurement) forEach(fn func(tsdb.SeriesID)) {
	if m.seriesSet != nil {
		m.seriesSet.ForEachNoLock(fn)
		return
	}

	for seriesID := range m.series {
		fn(seriesID)
	}
}

// seriesIDs returns a sorted set of seriesIDs.
func (m *logMeasurement) seriesIDs() []tsdb.SeriesID {
	a := make([]tsdb.SeriesID, 0, m.cardinality())
	if m.seriesSet != nil {
		m.seriesSet.ForEachNoLock(func(id tsdb.SeriesID) { a = append(a, id) })
		return a // IDs are already sorted.
	}

	for seriesID := range m.series {
		a = append(a, seriesID)
	}
	sort.Slice(a, func(i, j int) bool { return a[i].Less(a[j]) })
	return a
}

// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new
// one.
func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet {
	if m.seriesSet != nil {
		return m.seriesSet.CloneNoLock()
	}

	ss := tsdb.NewSeriesIDSet()
	for seriesID := range m.series {
		ss.AddNoLock(seriesID)
	}
	return ss
}

func (m *logMeasurement) Name() []byte  { return m.name }
func (m *logMeasurement) Deleted() bool { return m.deleted }

func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey {
	ts, ok := m.tagSet[string(key)]
	if !ok {
		ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)}
	}
	return ts
}

// keys returns a sorted list of tag keys.
func (m *logMeasurement) keys() []string {
	a := make([]string, 0, len(m.tagSet))
	for k := range m.tagSet {
		a = append(a, k)
	}
	sort.Strings(a)
	return a
}

// logMeasurementSlice is a sortable list of log measurements.
type logMeasurementSlice []logMeasurement

func (a logMeasurementSlice) Len() int      { return len(a) }
func (a logMeasurementSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a logMeasurementSlice) Less(i, j int) bool {
	return bytes.Compare(a[i].name, a[j].name) == -1
}

// logMeasurementIterator represents an iterator over a slice of measurements.
type logMeasurementIterator struct {
	mms []logMeasurement
}

// Next returns the next element in the iterator.
func (itr *logMeasurementIterator) Next() (e MeasurementElem) {
	if len(itr.mms) == 0 {
		return nil
	}
	e, itr.mms = &itr.mms[0], itr.mms[1:]
	return e
}

type logTagKey struct {
	name      []byte
	deleted   bool
	tagValues map[string]logTagValue
}

// bytes estimates the memory footprint of this logTagKey, in bytes.
func (tk *logTagKey) bytes() int {
	var b int
	b += len(tk.name)
	for k, v := range tk.tagValues {
		b += len(k)
		b += v.bytes()
	}
	b += int(unsafe.Sizeof(*tk))
	return b
}

func (tk *logTagKey) Key() []byte   { return tk.name }
func (tk *logTagKey) Deleted() bool { return tk.deleted }

func (tk *logTagKey) TagValueIterator() TagValueIterator {
	a := make([]logTagValue, 0, len(tk.tagValues))
	for _, v := range tk.tagValues {
		a = append(a, v)
	}
	return newLogTagValueIterator(a)
}

func (tk *logTagKey) createTagValueIfNotExists(value []byte) logTagValue {
	tv, ok := tk.tagValues[string(value)]
	if !ok {
		tv = logTagValue{name: value, series: make(map[tsdb.SeriesID]struct{})}
	}
	return tv
}
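// Editorial note: createTagValueIfNotExists above (like
// createTagSetIfNotExists) returns a value, not a pointer, and does not
// insert into the parent map itself, so callers must write the result back,
// as the exec*Entry functions do:
//
//	tv := ts.createTagValueIfNotExists(value)
//	tv.deleted = true
//	ts.tagValues[string(value)] = tv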
// logTagKeySlice is a sortable list of log tag keys.
type logTagKeySlice []logTagKey

func (a logTagKeySlice) Len() int      { return len(a) }
func (a logTagKeySlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a logTagKeySlice) Less(i, j int) bool {
	return bytes.Compare(a[i].name, a[j].name) == -1
}

type logTagValue struct {
	name      []byte
	deleted   bool
	series    map[tsdb.SeriesID]struct{}
	seriesSet *tsdb.SeriesIDSet
}

// bytes estimates the memory footprint of this logTagValue, in bytes.
func (tv *logTagValue) bytes() int {
	var b int
	b += len(tv.name)
	b += int(unsafe.Sizeof(*tv))
	b += (int(tv.cardinality()) * 8)
	return b
}

func (tv *logTagValue) addSeriesID(x tsdb.SeriesID) {
	if tv.seriesSet != nil {
		tv.seriesSet.AddNoLock(x)
		return
	}

	tv.series[x] = struct{}{}

	// If the map is getting too big it can be converted into a roaring seriesSet.
	if len(tv.series) > 25 {
		tv.seriesSet = tsdb.NewSeriesIDSet()
		for id := range tv.series {
			tv.seriesSet.AddNoLock(id)
		}
		tv.series = nil
	}
}

func (tv *logTagValue) removeSeriesID(x tsdb.SeriesID) {
	if tv.seriesSet != nil {
		tv.seriesSet.RemoveNoLock(x)
		return
	}
	delete(tv.series, x)
}

func (tv *logTagValue) cardinality() int64 {
	if tv.seriesSet != nil {
		return int64(tv.seriesSet.Cardinality())
	}
	return int64(len(tv.series))
}

// seriesIDSet returns a copy of the logTagValue's seriesSet, or creates a new
// one.
func (tv *logTagValue) seriesIDSet() *tsdb.SeriesIDSet {
	if tv.seriesSet != nil {
		return tv.seriesSet.CloneNoLock()
	}

	ss := tsdb.NewSeriesIDSet()
	for seriesID := range tv.series {
		ss.AddNoLock(seriesID)
	}
	return ss
}

func (tv *logTagValue) Value() []byte  { return tv.name }
func (tv *logTagValue) Deleted() bool  { return tv.deleted }

// logTagValueSlice is a sortable list of log tag values.
type logTagValueSlice []logTagValue

func (a logTagValueSlice) Len() int      { return len(a) }
func (a logTagValueSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a logTagValueSlice) Less(i, j int) bool {
	return bytes.Compare(a[i].name, a[j].name) == -1
}

// logTagKeyIterator represents an iterator over a slice of tag keys.
type logTagKeyIterator struct {
	a []logTagKey
}

// newLogTagKeyIterator returns a new instance of logTagKeyIterator.
func newLogTagKeyIterator(a []logTagKey) *logTagKeyIterator {
	sort.Sort(logTagKeySlice(a))
	return &logTagKeyIterator{a: a}
}

// Next returns the next element in the iterator.
func (itr *logTagKeyIterator) Next() (e TagKeyElem) {
	if len(itr.a) == 0 {
		return nil
	}
	e, itr.a = &itr.a[0], itr.a[1:]
	return e
}

// logTagValueIterator represents an iterator over a slice of tag values.
type logTagValueIterator struct {
	a []logTagValue
}

// newLogTagValueIterator returns a new instance of logTagValueIterator.
func newLogTagValueIterator(a []logTagValue) *logTagValueIterator {
	sort.Sort(logTagValueSlice(a))
	return &logTagValueIterator{a: a}
}

// Next returns the next element in the iterator.
func (itr *logTagValueIterator) Next() (e TagValueElem) {
	if len(itr.a) == 0 {
		return nil
	}
	e, itr.a = &itr.a[0], itr.a[1:]
	return e
}

// FormatLogFileName generates a log filename for the given index.
func FormatLogFileName(id int) string {
	return fmt.Sprintf("L0-%08d%s", id, LogFileExt)
}
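// Usage sketch (editorial): the sequence id is zero-padded to eight digits,
// so generated filenames sort lexicographically by id.
//
//	name := FormatLogFileName(12) // "L0-00000012" + LogFileExt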