2016-09-02 14:52:11 +00:00
|
|
|
package tsi1
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
import (
|
2016-12-22 18:50:56 +00:00
|
|
|
"bufio"
|
2016-10-21 15:31:40 +00:00
|
|
|
"bytes"
|
|
|
|
"encoding/binary"
|
2016-11-11 16:25:53 +00:00
|
|
|
"errors"
|
2016-12-15 15:31:18 +00:00
|
|
|
"fmt"
|
2016-10-21 15:31:40 +00:00
|
|
|
"hash/crc32"
|
2016-10-25 14:36:58 +00:00
|
|
|
"io"
|
2016-10-21 15:31:40 +00:00
|
|
|
"os"
|
|
|
|
"sort"
|
2016-11-21 16:59:23 +00:00
|
|
|
"sync"
|
2017-01-03 17:27:25 +00:00
|
|
|
"time"
|
2016-10-21 15:31:40 +00:00
|
|
|
|
2016-11-28 22:12:22 +00:00
|
|
|
"github.com/influxdata/influxdb/pkg/estimator/hll"
|
|
|
|
|
2016-11-10 15:45:27 +00:00
|
|
|
"github.com/influxdata/influxdb/influxql"
|
2016-10-21 15:31:40 +00:00
|
|
|
"github.com/influxdata/influxdb/models"
|
2016-11-28 22:12:22 +00:00
|
|
|
"github.com/influxdata/influxdb/pkg/estimator"
|
2016-10-21 15:31:40 +00:00
|
|
|
"github.com/influxdata/influxdb/pkg/mmap"
|
2016-11-11 16:25:53 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Log errors.
var (
	// ErrLogEntryChecksumMismatch is returned when a log entry's stored
	// checksum does not match the checksum computed over its contents.
	ErrLogEntryChecksumMismatch = errors.New("log entry checksum mismatch")
)
|
|
|
|
|
|
|
|
// Log entry flag constants.
const (
	LogEntrySeriesTombstoneFlag      = 0x01 // entry deletes a series
	LogEntryMeasurementTombstoneFlag = 0x02 // entry deletes a measurement
	LogEntryTagKeyTombstoneFlag      = 0x04 // entry deletes a tag key
	LogEntryTagValueTombstoneFlag    = 0x08 // entry deletes a tag value
)
|
|
|
|
|
|
|
|
// LogFile represents an on-disk write-ahead log file.
type LogFile struct {
	mu sync.RWMutex
	wg sync.WaitGroup // ref count

	data []byte        // mmap
	file *os.File      // writer
	w    *bufio.Writer // buffered writer
	buf  []byte        // marshaling buffer

	size    int64     // tracks current file size
	modTime time.Time // tracks last time write occurred

	mSketch, mTSketch estimator.Sketch // Measurement sketches
	sSketch, sTSketch estimator.Sketch // Series sketches

	// In-memory index.
	mms logMeasurements

	// Filepath to the log file.
	Path string
}
|
|
|
|
|
|
|
|
// NewLogFile returns a new instance of LogFile.
|
|
|
|
func NewLogFile() *LogFile {
|
|
|
|
return &LogFile{
|
2016-11-28 22:12:22 +00:00
|
|
|
mms: make(logMeasurements),
|
|
|
|
mSketch: hll.NewDefaultPlus(),
|
|
|
|
mTSketch: hll.NewDefaultPlus(),
|
|
|
|
sSketch: hll.NewDefaultPlus(),
|
|
|
|
sTSketch: hll.NewDefaultPlus(),
|
2016-10-21 15:31:40 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Open reads the log from a file and validates all the checksums.
|
|
|
|
func (f *LogFile) Open() error {
|
|
|
|
if err := f.open(); err != nil {
|
|
|
|
f.Close()
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// open opens the underlying file for appending, then replays any existing
// log entries from a read-only mmap into the in-memory index.
func (f *LogFile) open() error {
	// Open file for appending.
	file, err := os.OpenFile(f.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	f.file = file
	f.w = bufio.NewWriter(f.file)

	// Finish opening if file is empty.
	fi, err := file.Stat()
	if err != nil {
		return err
	} else if fi.Size() == 0 {
		return nil
	}
	f.size = fi.Size()
	f.modTime = fi.ModTime()

	// Open a read-only memory map of the existing data.
	data, err := mmap.Map(f.Path)
	if err != nil {
		return err
	}
	f.data = data

	// Read log entries from mmap.
	for buf := f.data; len(buf) > 0; {
		// Read next entry.
		var e LogEntry
		if err := e.UnmarshalBinary(buf); err != nil {
			return err
		}

		// Execute entry against in-memory index.
		f.execEntry(&e)

		// Move buffer forward by the size of the decoded entry.
		buf = buf[e.Size:]
	}

	return nil
}
|
|
|
|
|
|
|
|
// Close shuts down the file handle and mmap.
|
|
|
|
func (f *LogFile) Close() error {
|
2017-01-02 16:29:18 +00:00
|
|
|
// Wait until the file has no more references.
|
|
|
|
f.wg.Wait()
|
|
|
|
|
2016-12-22 18:50:56 +00:00
|
|
|
if f.w != nil {
|
|
|
|
f.w.Flush()
|
|
|
|
f.w = nil
|
|
|
|
}
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
if f.file != nil {
|
|
|
|
f.file.Close()
|
2016-10-25 14:36:58 +00:00
|
|
|
f.file = nil
|
2016-10-21 15:31:40 +00:00
|
|
|
}
|
2016-12-22 18:50:56 +00:00
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
if f.data != nil {
|
|
|
|
mmap.Unmap(f.data)
|
|
|
|
}
|
2016-11-11 16:25:53 +00:00
|
|
|
|
|
|
|
f.mms = make(logMeasurements)
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-12-28 19:59:09 +00:00
|
|
|
// Retain adds a reference count to the file.
|
|
|
|
func (f *LogFile) Retain() { f.wg.Add(1) }
|
|
|
|
|
|
|
|
// Release removes a reference count from the file.
|
|
|
|
func (f *LogFile) Release() { f.wg.Done() }
|
|
|
|
|
2017-01-03 17:27:25 +00:00
|
|
|
// Stat returns size and last modification time of the file.
|
|
|
|
func (f *LogFile) Stat() (int64, time.Time) {
|
2016-12-15 15:31:18 +00:00
|
|
|
f.mu.Lock()
|
2017-01-03 17:27:25 +00:00
|
|
|
size, modTime := f.size, f.modTime
|
2016-12-15 15:31:18 +00:00
|
|
|
f.mu.Unlock()
|
2017-01-03 17:27:25 +00:00
|
|
|
return size, modTime
|
2016-12-15 15:31:18 +00:00
|
|
|
}
|
|
|
|
|
2016-11-27 16:34:03 +00:00
|
|
|
// Measurement returns a measurement element.
|
|
|
|
func (f *LogFile) Measurement(name []byte) MeasurementElem {
|
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
|
|
|
|
|
|
|
mm, ok := f.mms[string(name)]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
2017-01-02 16:29:18 +00:00
|
|
|
return mm
|
2016-11-27 16:34:03 +00:00
|
|
|
}
|
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
// MeasurementNames returns an ordered list of measurement names.
|
|
|
|
func (f *LogFile) MeasurementNames() []string {
|
2016-11-21 16:59:23 +00:00
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
2016-12-15 15:31:18 +00:00
|
|
|
return f.measurementNames()
|
|
|
|
}
|
2016-11-21 16:59:23 +00:00
|
|
|
|
2016-12-15 15:31:18 +00:00
|
|
|
func (f *LogFile) measurementNames() []string {
|
2016-11-11 16:25:53 +00:00
|
|
|
a := make([]string, 0, len(f.mms))
|
|
|
|
for name := range f.mms {
|
|
|
|
a = append(a, name)
|
|
|
|
}
|
|
|
|
return a
|
2016-11-08 21:07:01 +00:00
|
|
|
}
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
// DeleteMeasurement adds a tombstone for a measurement to the log file.
|
|
|
|
func (f *LogFile) DeleteMeasurement(name []byte) error {
|
2016-11-21 16:59:23 +00:00
|
|
|
f.mu.Lock()
|
|
|
|
defer f.mu.Unlock()
|
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
e := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}
|
|
|
|
if err := f.appendEntry(&e); err != nil {
|
2016-10-21 15:31:40 +00:00
|
|
|
return err
|
|
|
|
}
|
2016-11-11 16:25:53 +00:00
|
|
|
f.execEntry(&e)
|
2016-10-21 15:31:40 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// TagKeySeriesIterator returns a series iterator for a tag key.
// Returns nil if the measurement or key does not exist.
func (f *LogFile) TagKeySeriesIterator(name, key []byte) SeriesIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	// Combine iterators across all tag values.
	itrs := make([]SeriesIterator, 0, len(tk.tagValues))
	for _, tv := range tk.tagValues {
		// Skip values that have no series.
		if len(tv.series) == 0 {
			continue
		}
		itrs = append(itrs, newLogSeriesIterator(tv.series))
	}

	return MergeSeriesIterators(itrs...)
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// TagKeyIterator returns a value iterator for a measurement.
|
|
|
|
func (f *LogFile) TagKeyIterator(name []byte) TagKeyIterator {
|
2016-11-27 20:15:32 +00:00
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
|
|
|
|
|
|
|
mm, ok := f.mms[string(name)]
|
|
|
|
if !ok {
|
2016-11-29 18:09:33 +00:00
|
|
|
return nil
|
2016-11-27 20:15:32 +00:00
|
|
|
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
a := make([]logTagKey, 0, len(mm.tagSet))
|
|
|
|
for _, k := range mm.tagSet {
|
|
|
|
a = append(a, k)
|
|
|
|
}
|
|
|
|
return newLogTagKeyIterator(a)
|
|
|
|
}
|
|
|
|
|
2016-12-05 17:51:06 +00:00
|
|
|
// TagValue returns a tag value element.
|
|
|
|
func (f *LogFile) TagValue(name, key, value []byte) TagValueElem {
|
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
|
|
|
|
|
|
|
mm, ok := f.mms[string(name)]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
tk, ok := mm.tagSet[string(key)]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
tv, ok := tk.tagValues[string(value)]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return &tv
|
|
|
|
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// TagValueIterator returns a value iterator for a tag key.
|
|
|
|
func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator {
|
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
|
|
|
|
|
|
|
mm, ok := f.mms[string(name)]
|
2016-11-27 20:15:32 +00:00
|
|
|
if !ok {
|
2016-11-29 18:09:33 +00:00
|
|
|
return nil
|
2016-11-27 20:15:32 +00:00
|
|
|
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
tk, ok := mm.tagSet[string(key)]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
2016-11-27 20:15:32 +00:00
|
|
|
}
|
2016-11-29 18:09:33 +00:00
|
|
|
return tk.TagValueIterator()
|
2016-11-27 20:15:32 +00:00
|
|
|
}
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
// DeleteTagKey adds a tombstone for a tag key to the log file.
|
|
|
|
func (f *LogFile) DeleteTagKey(name, key []byte) error {
|
2016-11-21 16:59:23 +00:00
|
|
|
f.mu.Lock()
|
|
|
|
defer f.mu.Unlock()
|
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Tags: models.Tags{{Key: key}}}
|
|
|
|
if err := f.appendEntry(&e); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
f.execEntry(&e)
|
|
|
|
return nil
|
2016-10-21 15:31:40 +00:00
|
|
|
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// TagValueSeriesIterator returns a series iterator for a tag value.
|
|
|
|
func (f *LogFile) TagValueSeriesIterator(name, key, value []byte) SeriesIterator {
|
2016-11-27 16:34:03 +00:00
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
|
|
|
|
|
|
|
mm, ok := f.mms[string(name)]
|
|
|
|
if !ok {
|
2016-11-29 18:09:33 +00:00
|
|
|
return nil
|
2016-11-27 16:34:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
tk, ok := mm.tagSet[string(key)]
|
|
|
|
if !ok {
|
2016-11-29 18:09:33 +00:00
|
|
|
return nil
|
2016-11-27 16:34:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
tv, ok := tk.tagValues[string(value)]
|
|
|
|
if !ok {
|
2016-11-29 18:09:33 +00:00
|
|
|
return nil
|
2016-12-15 15:31:18 +00:00
|
|
|
} else if len(tv.series) == 0 {
|
|
|
|
return nil
|
2016-11-27 16:34:03 +00:00
|
|
|
}
|
2016-12-15 15:31:18 +00:00
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
return newLogSeriesIterator(tv.series)
|
2016-11-27 16:34:03 +00:00
|
|
|
}
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
// DeleteTagValue adds a tombstone for a tag value to the log file.
|
|
|
|
func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
|
2016-11-21 16:59:23 +00:00
|
|
|
f.mu.Lock()
|
|
|
|
defer f.mu.Unlock()
|
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Tags: models.Tags{{Key: key, Value: value}}}
|
|
|
|
if err := f.appendEntry(&e); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
f.execEntry(&e)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-12-22 18:50:56 +00:00
|
|
|
// AddSeriesList adds a list of series to the log file in bulk.
|
|
|
|
func (f *LogFile) AddSeriesList(names [][]byte, tagsSlice []models.Tags) error {
|
|
|
|
f.mu.Lock()
|
|
|
|
defer f.mu.Unlock()
|
|
|
|
|
|
|
|
for i := range names {
|
|
|
|
e := LogEntry{Name: names[i], Tags: tagsSlice[i]}
|
|
|
|
if err := f.appendEntry(&e); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
f.execEntry(&e)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
// AddSeries adds a series to the log file.
|
|
|
|
func (f *LogFile) AddSeries(name []byte, tags models.Tags) error {
|
2016-11-21 16:59:23 +00:00
|
|
|
f.mu.Lock()
|
|
|
|
defer f.mu.Unlock()
|
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
e := LogEntry{Name: name, Tags: tags}
|
|
|
|
if err := f.appendEntry(&e); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
f.execEntry(&e)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// DeleteSeries adds a tombstone for a series to the log file.
|
|
|
|
func (f *LogFile) DeleteSeries(name []byte, tags models.Tags) error {
|
2016-11-21 16:59:23 +00:00
|
|
|
f.mu.Lock()
|
|
|
|
defer f.mu.Unlock()
|
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, Name: name, Tags: tags}
|
|
|
|
if err := f.appendEntry(&e); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
f.execEntry(&e)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SeriesN returns the total number of series in the file.
|
|
|
|
func (f *LogFile) SeriesN() (n uint64) {
|
2016-11-21 16:59:23 +00:00
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
for _, mm := range f.mms {
|
|
|
|
n += uint64(len(mm.series))
|
|
|
|
}
|
|
|
|
return n
|
2016-10-21 15:31:40 +00:00
|
|
|
}
|
|
|
|
|
2016-12-22 18:50:56 +00:00
|
|
|
// HasSeries returns flags indicating if the series exists and if it is tombstoned.
|
|
|
|
func (f *LogFile) HasSeries(name []byte, tags models.Tags) (exists, tombstoned bool) {
|
|
|
|
e := f.Series(name, tags)
|
|
|
|
if e == nil {
|
|
|
|
return false, false
|
|
|
|
}
|
|
|
|
return true, e.Deleted()
|
|
|
|
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// Series returns a series by name/tags.
|
|
|
|
func (f *LogFile) Series(name []byte, tags models.Tags) SeriesElem {
|
2016-11-21 16:59:23 +00:00
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
|
|
|
|
2016-11-10 15:45:27 +00:00
|
|
|
mm, ok := f.mms[string(name)]
|
|
|
|
if !ok {
|
2016-11-29 18:09:33 +00:00
|
|
|
return nil
|
2016-11-10 15:45:27 +00:00
|
|
|
}
|
|
|
|
|
2016-12-27 16:22:15 +00:00
|
|
|
serie := mm.series[string(AppendSeriesKey(make([]byte, 0, 256), name, tags))]
|
2016-12-26 17:17:14 +00:00
|
|
|
if serie == nil {
|
|
|
|
return nil
|
2016-11-10 15:45:27 +00:00
|
|
|
}
|
2016-12-26 17:17:14 +00:00
|
|
|
return serie
|
2016-11-10 15:45:27 +00:00
|
|
|
}
|
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
// appendEntry adds a log entry to the end of the file.
|
|
|
|
func (f *LogFile) appendEntry(e *LogEntry) error {
|
|
|
|
// Marshal entry to the local buffer.
|
2016-11-21 16:59:58 +00:00
|
|
|
f.buf = appendLogEntry(f.buf[:0], e)
|
2016-10-21 15:31:40 +00:00
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
// Save the size of the record.
|
|
|
|
e.Size = len(f.buf)
|
2016-10-21 15:31:40 +00:00
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
// Write record to file.
|
2016-12-22 18:50:56 +00:00
|
|
|
n, err := f.w.Write(f.buf)
|
2016-12-15 15:31:18 +00:00
|
|
|
if err != nil {
|
2016-11-11 16:25:53 +00:00
|
|
|
// Move position backwards over partial entry.
|
|
|
|
// Log should be reopened if seeking cannot be completed.
|
|
|
|
if n > 0 {
|
2016-12-22 18:50:56 +00:00
|
|
|
f.w.Reset(f.file)
|
2016-11-11 16:25:53 +00:00
|
|
|
if _, err := f.file.Seek(int64(-n), os.SEEK_CUR); err != nil {
|
|
|
|
f.Close()
|
|
|
|
}
|
|
|
|
}
|
2016-10-21 15:31:40 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-01-03 17:27:25 +00:00
|
|
|
// Update in-memory file size & modification time.
|
2016-12-15 15:31:18 +00:00
|
|
|
f.size += int64(n)
|
2017-01-03 17:27:25 +00:00
|
|
|
f.modTime = time.Now()
|
2016-11-11 16:25:53 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// execEntry executes a log entry against the in-memory index.
// This is done after appending and on replay of the log.
func (f *LogFile) execEntry(e *LogEntry) {
	switch e.Flag {
	case LogEntryMeasurementTombstoneFlag:
		f.execDeleteMeasurementEntry(e)
	case LogEntryTagKeyTombstoneFlag:
		f.execDeleteTagKeyEntry(e)
	case LogEntryTagValueTombstoneFlag:
		f.execDeleteTagValueEntry(e)
	default:
		// No tombstone flag handled above: treat as a series entry
		// (add, or series tombstone).
		f.execSeriesEntry(e)
	}
}
|
|
|
|
|
|
|
|
// execDeleteMeasurementEntry marks a measurement as deleted in the
// in-memory index and drops its tag set and series.
func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) {
	mm := f.measurement(e.Name)
	mm.deleted = true
	mm.tagSet = make(map[string]logTagKey)
	mm.series = make(map[string]*logSerie)

	// Update measurement tombstone sketch.
	f.mTSketch.Add(e.Name)
}
|
|
|
|
|
|
|
|
// execDeleteTagKeyEntry marks a tag key as deleted in the in-memory index.
// The tag key is taken from the entry's first tag.
func (f *LogFile) execDeleteTagKeyEntry(e *LogEntry) {
	key := e.Tags[0].Key

	mm := f.measurement(e.Name)
	ts := mm.createTagSetIfNotExists(key)

	ts.deleted = true

	// ts is a value copy; write it back into the measurement's tag set.
	mm.tagSet[string(key)] = ts
}
|
|
|
|
|
|
|
|
// execDeleteTagValueEntry marks a tag value as deleted in the in-memory
// index. The key/value pair is taken from the entry's first tag.
func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) {
	key, value := e.Tags[0].Key, e.Tags[0].Value

	mm := f.measurement(e.Name)
	ts := mm.createTagSetIfNotExists(key)
	tv := ts.createTagValueIfNotExists(value)

	tv.deleted = true

	// tv and ts are value copies; write both back into their maps.
	ts.tagValues[string(value)] = tv
	mm.tagSet[string(key)] = ts
}
|
|
|
|
|
|
|
|
// execSeriesEntry adds (or tombstones) a single series in the in-memory
// index and updates the series/measurement sketches.
func (f *LogFile) execSeriesEntry(e *LogEntry) {
	// Check if series is deleted.
	deleted := (e.Flag & LogEntrySeriesTombstoneFlag) != 0

	// Fetch measurement.
	mm := f.measurement(e.Name)

	// Undelete measurement if it's been tombstoned previously.
	if !deleted && mm.deleted {
		mm.deleted = false
	}

	// Generate key & series, if not exists.
	key := AppendSeriesKey(make([]byte, 0, 256), e.Name, e.Tags)
	serie := mm.series[string(key)]
	if serie == nil {
		serie = &logSerie{name: e.Name, tags: e.Tags, deleted: deleted}
		mm.series[string(key)] = serie
	} else if deleted {
		serie.deleted = true
		mm.series[string(key)] = serie
	}

	// Save tags.
	for _, t := range e.Tags {
		ts := mm.createTagSetIfNotExists(t.Key)
		tv := ts.createTagValueIfNotExists(t.Value)

		// Index the series under this tag value.
		tv.series[string(key)] = serie

		// ts and tv are value copies; write both back into their maps.
		ts.tagValues[string(t.Value)] = tv
		mm.tagSet[string(t.Key)] = ts
	}

	// Update the sketches...
	if deleted {
		// TODO(edd) decrement series count...
		f.sTSketch.Add(key) // Deleting series so update tombstone sketch.
		return
	}

	// TODO(edd) increment series count....
	f.sSketch.Add(key)    // Add series to sketch.
	f.mSketch.Add(e.Name) // Add measurement to sketch as this may be the first series for the measurement.
}
|
|
|
|
|
2016-11-28 16:59:36 +00:00
|
|
|
// SeriesIterator returns an iterator over all series in the log file,
// sorted by series key. Returns nil if the file contains no series.
func (f *LogFile) SeriesIterator() SeriesIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// Sort measurement names and determine total series count.
	var n int
	names := make([][]byte, 0, len(f.mms))
	for _, mm := range f.mms {
		names = append(names, mm.name)
		n += len(mm.series)
	}
	sort.Sort(byteSlices(names))

	// Combine series across all measurements.
	series := make(logSeries, 0, n)
	for _, name := range names {
		for _, serie := range f.mms[string(name)].series {
			series = append(series, *serie)
		}
	}
	sort.Sort(series)

	if len(series) == 0 {
		return nil
	}
	return &logSeriesIterator{series: series}
}
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
// measurement returns a measurement by name.
|
2017-01-02 16:29:18 +00:00
|
|
|
func (f *LogFile) measurement(name []byte) *logMeasurement {
|
|
|
|
mm := f.mms[string(name)]
|
|
|
|
if mm == nil {
|
|
|
|
mm = &logMeasurement{
|
2016-12-26 17:17:14 +00:00
|
|
|
name: name,
|
|
|
|
tagSet: make(map[string]logTagKey),
|
|
|
|
series: make(map[string]*logSerie),
|
|
|
|
}
|
2017-01-02 16:29:18 +00:00
|
|
|
f.mms[string(name)] = mm
|
2016-10-21 15:31:40 +00:00
|
|
|
}
|
2017-01-06 16:31:25 +00:00
|
|
|
if mm.series == nil {
|
|
|
|
panic("NO SERIES? " + string(mm.name))
|
|
|
|
}
|
2016-10-21 15:31:40 +00:00
|
|
|
return mm
|
|
|
|
}
|
|
|
|
|
|
|
|
// MeasurementIterator returns an iterator over all the measurements in the file.
|
|
|
|
func (f *LogFile) MeasurementIterator() MeasurementIterator {
|
2016-11-21 16:59:23 +00:00
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
|
|
|
|
2016-10-31 14:46:07 +00:00
|
|
|
var itr logMeasurementIterator
|
2016-10-21 15:31:40 +00:00
|
|
|
for _, mm := range f.mms {
|
2017-01-02 16:29:18 +00:00
|
|
|
itr.mms = append(itr.mms, *mm)
|
2016-10-21 15:31:40 +00:00
|
|
|
}
|
2016-10-31 14:46:07 +00:00
|
|
|
sort.Sort(logMeasurementSlice(itr.mms))
|
2016-10-21 15:31:40 +00:00
|
|
|
return &itr
|
|
|
|
}
|
|
|
|
|
2016-11-28 16:59:36 +00:00
|
|
|
// MeasurementSeriesIterator returns an iterator over all series for a measurement.
|
2016-11-08 21:07:01 +00:00
|
|
|
func (f *LogFile) MeasurementSeriesIterator(name []byte) SeriesIterator {
|
2016-11-21 16:59:23 +00:00
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
|
|
|
|
2016-11-10 15:45:27 +00:00
|
|
|
mm := f.mms[string(name)]
|
2017-01-02 16:29:18 +00:00
|
|
|
if mm == nil || len(mm.series) == 0 {
|
2016-12-15 15:31:18 +00:00
|
|
|
return nil
|
|
|
|
}
|
2016-11-27 16:34:03 +00:00
|
|
|
return newLogSeriesIterator(mm.series)
|
2016-11-08 21:07:01 +00:00
|
|
|
}
|
|
|
|
|
2016-12-15 15:31:18 +00:00
|
|
|
// WriteTo compacts the log file and writes it to w as an index file:
// magic number, series block, tagset blocks, measurement block, trailer.
// Returns the total number of bytes written.
func (f *LogFile) WriteTo(w io.Writer) (n int64, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// Wrap in buffered writer.
	bw := bufio.NewWriter(w)

	// Reset compaction fields.
	var t IndexFileTrailer
	f.reset()

	// Write magic number.
	if err := writeTo(bw, []byte(FileSignature), &n); err != nil {
		return n, err
	}

	// Write series list.
	t.SeriesBlock.Offset = n
	if err := f.writeSeriesBlockTo(bw, &n); err != nil {
		return n, err
	}
	t.SeriesBlock.Size = n - t.SeriesBlock.Offset

	// Sort measurement names.
	names := f.mms.names()

	// Write tagset blocks in measurement order.
	if err := f.writeTagsetsTo(bw, names, &n); err != nil {
		return n, err
	}

	// Write measurement block.
	t.MeasurementBlock.Offset = n
	if err := f.writeMeasurementBlockTo(bw, names, &n); err != nil {
		return n, err
	}
	t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset

	// Write trailer.
	nn, err := t.WriteTo(bw)
	n += nn
	if err != nil {
		return n, err
	}

	// Flush buffer.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	return n, nil
}
|
|
|
|
|
2016-11-02 16:09:49 +00:00
|
|
|
// writeSeriesBlockTo writes the series block for all series in the file
// to w, adding the bytes written to n. As a side effect it records each
// series' offset in the in-memory index and rebuilds the series sketches.
func (f *LogFile) writeSeriesBlockTo(w io.Writer, n *int64) error {
	// Write all series.
	sw := NewSeriesBlockWriter()

	// Retrieve measurement names in order.
	names := f.measurementNames()

	// Add series from measurements.
	for _, name := range names {
		mm := f.mms[name]
		for _, serie := range mm.series {
			if err := sw.Add(serie.name, serie.tags); err != nil {
				return err
			}
		}
	}

	// As the log file is created it's possible that series were added, removed
	// and then added again. Since sketches cannot have values removed from them
	// the series would be in both the series and tombstoned series sketches. So
	// that a series only appears in one of the sketches we rebuild some fresh
	// sketches for the compaction to a TSI file.
	//
	// We update these sketches below as we iterate through the series in this
	// log file.
	sw.Sketch, sw.TSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()

	// Flush series list.
	nn, err := sw.WriteTo(w)
	*n += nn
	if err != nil {
		return err
	}

	// Add series to each measurement and key/value.
	buf := make([]byte, 0, 1024)
	for _, name := range names {
		mm := f.mms[name]

		for i := range mm.series {
			serie := mm.series[i]

			// Lookup series offset.
			serie.offset = sw.Offset(serie.name, serie.tags)
			if serie.offset == 0 {
				// Invariant: every series was added to sw above.
				panic("series not found: " + string(serie.name) + " " + serie.tags.String())
			}

			// Add series id to measurement, tag key, and tag value.
			mm.seriesIDs = append(mm.seriesIDs, serie.offset)

			// Add series id to each tag value.
			for _, tag := range serie.tags {
				t := mm.tagSet[string(tag.Key)]

				v := t.tagValues[string(tag.Value)]
				v.seriesIDs = append(v.seriesIDs, serie.offset)
				t.tagValues[string(tag.Value)] = v
			}

			// Fold the series key into the appropriate fresh sketch.
			key := AppendSeriesKey(buf[:0], serie.name, serie.tags)
			if serie.Deleted() {
				sw.TSketch.Add(key)
			} else {
				sw.Sketch.Add(key)
			}
		}
	}

	// Set log file sketches to updated versions.
	f.sSketch, f.sTSketch = sw.Sketch, sw.TSketch
	return nil
}
|
|
|
|
|
|
|
|
// writeTagsetsTo writes a tagset block for each measurement name, in the
// given order, adding the bytes written to n.
func (f *LogFile) writeTagsetsTo(w io.Writer, names []string, n *int64) error {
	for _, name := range names {
		if err := f.writeTagsetTo(w, name, n); err != nil {
			return err
		}
	}
	return nil
}
|
|
|
|
|
|
|
|
// writeTagsetTo writes a single tagset to w and saves the tagset offset
// and size on the measurement, adding the bytes written to n.
func (f *LogFile) writeTagsetTo(w io.Writer, name string, n *int64) error {
	mm := f.mms[name]

	tw := NewTagBlockWriter()
	for _, tag := range mm.tagSet {
		// Mark tag deleted.
		if tag.deleted {
			tw.DeleteTag(tag.name)
			continue
		}

		// Add each value.
		for _, value := range tag.tagValues {
			// Series IDs must be sorted before writing.
			sort.Sort(uint64Slice(value.seriesIDs))
			tw.AddTagValue(tag.name, value.name, value.deleted, value.seriesIDs)
		}
	}

	// Save tagset offset to measurement.
	mm.offset = *n

	// Write tagset to writer.
	nn, err := tw.WriteTo(w)
	*n += nn
	if err != nil {
		return err
	}

	// Save tagset size to measurement.
	mm.size = *n - mm.offset

	return nil
}
|
|
|
|
|
|
|
|
// writeMeasurementBlockTo writes the measurement block for the given
// measurement names to w, adding the bytes written to n and rebuilding
// the measurement sketches.
func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, n *int64) error {
	mw := NewMeasurementBlockWriter()

	// As the log file is created it's possible that measurements were added,
	// removed and then added again. Since sketches cannot have values removed
	// from them, the measurement would be in both the measurement and
	// tombstoned measurement sketches. So that a measurement only appears in
	// one of the sketches, we rebuild some fresh sketches for the compaction to
	// a TSI file.
	//
	// We update these sketches below as we iterate through the measurements in
	// this log file.
	mw.Sketch, mw.TSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()

	// Add measurement data.
	for _, name := range names {
		mm := f.mms[name]

		// Series IDs must be sorted before writing.
		sort.Sort(uint64Slice(mm.seriesIDs))
		mw.Add(mm.name, mm.offset, mm.size, mm.seriesIDs)
		if mm.Deleted() {
			mw.TSketch.Add(mm.Name())
		} else {
			mw.Sketch.Add(mm.Name())
		}
	}

	// Write data to writer.
	nn, err := mw.WriteTo(w)
	*n += nn
	if err != nil {
		return err
	}

	// Set the updated sketches
	f.mSketch, f.mTSketch = mw.Sketch, mw.TSketch
	return nil
}
|
|
|
|
|
|
|
|
// reset clears all the compaction fields on the in-memory index.
|
|
|
|
func (f *LogFile) reset() {
|
2017-01-02 16:29:18 +00:00
|
|
|
for _, mm := range f.mms {
|
2016-11-11 16:25:53 +00:00
|
|
|
for i := range mm.series {
|
|
|
|
mm.series[i].offset = 0
|
|
|
|
}
|
|
|
|
|
2016-10-25 14:36:58 +00:00
|
|
|
mm.offset, mm.size, mm.seriesIDs = 0, 0, nil
|
|
|
|
for key, tagSet := range mm.tagSet {
|
|
|
|
for value, tagValue := range tagSet.tagValues {
|
|
|
|
tagValue.seriesIDs = nil
|
|
|
|
tagSet.tagValues[value] = tagValue
|
|
|
|
}
|
|
|
|
mm.tagSet[key] = tagSet
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-20 15:58:06 +00:00
|
|
|
// MergeSeriesSketches merges the series sketches within a mutex.
|
|
|
|
func (f *LogFile) MergeSeriesSketches(sketch, tsketch estimator.Sketch) error {
|
|
|
|
f.mu.RLock()
|
|
|
|
defer f.mu.RUnlock()
|
|
|
|
|
|
|
|
if err := sketch.Merge(f.sSketch); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := tsketch.Merge(f.sTSketch); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
// LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct {
	Flag     byte        // flag (see the LogEntry*TombstoneFlag constants)
	Name     []byte      // measurement name
	Tags     models.Tags // tagset
	Checksum uint32      // CRC32 (IEEE) checksum of flag/name/tags.
	Size     int         // total size of record, in bytes, including checksum.
}
|
|
|
|
|
|
|
|
// UnmarshalBinary unmarshals data into e.
|
|
|
|
func (e *LogEntry) UnmarshalBinary(data []byte) error {
|
2016-11-11 16:25:53 +00:00
|
|
|
orig := data
|
2016-10-21 15:31:40 +00:00
|
|
|
start := len(data)
|
|
|
|
|
|
|
|
// Parse flag data.
|
|
|
|
e.Flag, data = data[0], data[1:]
|
|
|
|
|
|
|
|
// Parse name.
|
|
|
|
sz, n := binary.Uvarint(data)
|
|
|
|
e.Name, data = data[n:n+int(sz)], data[n+int(sz):]
|
|
|
|
|
|
|
|
// Parse tag count.
|
|
|
|
tagN, n := binary.Uvarint(data)
|
|
|
|
data = data[n:]
|
|
|
|
|
|
|
|
// Parse tags.
|
|
|
|
tags := make(models.Tags, tagN)
|
|
|
|
for i := range tags {
|
|
|
|
tag := &tags[i]
|
|
|
|
|
|
|
|
// Parse key.
|
|
|
|
sz, n := binary.Uvarint(data)
|
|
|
|
tag.Key, data = data[n:n+int(sz)], data[n+int(sz):]
|
|
|
|
|
|
|
|
// Parse value.
|
|
|
|
sz, n = binary.Uvarint(data)
|
|
|
|
tag.Value, data = data[n:n+int(sz)], data[n+int(sz):]
|
|
|
|
}
|
2016-11-21 17:00:41 +00:00
|
|
|
e.Tags = tags
|
2016-10-21 15:31:40 +00:00
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
// Compute checksum.
|
|
|
|
chk := crc32.ChecksumIEEE(orig[:start-len(data)])
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
// Parse checksum.
|
|
|
|
e.Checksum, data = binary.BigEndian.Uint32(data[:4]), data[4:]
|
|
|
|
|
2016-11-11 16:25:53 +00:00
|
|
|
// Verify checksum.
|
|
|
|
if chk != e.Checksum {
|
|
|
|
return ErrLogEntryChecksumMismatch
|
|
|
|
}
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
// Save length of elem.
|
|
|
|
e.Size = start - len(data)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// appendLogEntry appends to dst and returns the new buffer.
|
|
|
|
// This updates the checksum on the entry.
|
|
|
|
func appendLogEntry(dst []byte, e *LogEntry) []byte {
|
|
|
|
var buf [binary.MaxVarintLen64]byte
|
|
|
|
start := len(dst)
|
|
|
|
|
|
|
|
// Append flag.
|
|
|
|
dst = append(dst, e.Flag)
|
|
|
|
|
|
|
|
// Append name.
|
|
|
|
n := binary.PutUvarint(buf[:], uint64(len(e.Name)))
|
|
|
|
dst = append(dst, buf[:n]...)
|
|
|
|
dst = append(dst, e.Name...)
|
|
|
|
|
|
|
|
// Append tag count.
|
|
|
|
n = binary.PutUvarint(buf[:], uint64(len(e.Tags)))
|
|
|
|
dst = append(dst, buf[:n]...)
|
|
|
|
|
|
|
|
// Append key/value pairs.
|
|
|
|
for i := range e.Tags {
|
|
|
|
t := &e.Tags[i]
|
|
|
|
|
|
|
|
// Append key.
|
|
|
|
n := binary.PutUvarint(buf[:], uint64(len(t.Key)))
|
|
|
|
dst = append(dst, buf[:n]...)
|
|
|
|
dst = append(dst, t.Key...)
|
|
|
|
|
|
|
|
// Append value.
|
|
|
|
n = binary.PutUvarint(buf[:], uint64(len(t.Value)))
|
|
|
|
dst = append(dst, buf[:n]...)
|
|
|
|
dst = append(dst, t.Value...)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate checksum.
|
|
|
|
e.Checksum = crc32.ChecksumIEEE(dst[start:])
|
|
|
|
|
|
|
|
// Append checksum.
|
|
|
|
binary.BigEndian.PutUint32(buf[:4], e.Checksum)
|
|
|
|
dst = append(dst, buf[:4]...)
|
|
|
|
|
|
|
|
return dst
|
|
|
|
}
|
|
|
|
|
2016-10-25 14:36:58 +00:00
|
|
|
// logSerie represents a single series (measurement name + tagset) tracked by
// the log file.
type logSerie struct {
	name    []byte      // measurement name
	tags    models.Tags // series tagset
	deleted bool        // true if the series has been tombstoned
	// offset is a compaction field: the series' offset in the compacted
	// output. It is cleared by LogFile.reset.
	offset uint64
}
|
|
|
|
|
2016-11-10 15:45:27 +00:00
|
|
|
// Name returns the series' measurement name.
func (s *logSerie) Name() []byte { return s.name }

// Tags returns the series' tagset.
func (s *logSerie) Tags() models.Tags { return s.tags }

// Deleted reports whether the series has been tombstoned.
func (s *logSerie) Deleted() bool { return s.deleted }

// Expr always returns nil; log series carry no filter expression.
func (s *logSerie) Expr() influxql.Expr { return nil }
|
|
|
|
|
2016-10-25 14:36:58 +00:00
|
|
|
type logSeries []logSerie
|
|
|
|
|
|
|
|
func (a logSeries) Len() int { return len(a) }
|
|
|
|
func (a logSeries) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
|
|
func (a logSeries) Less(i, j int) bool {
|
|
|
|
if cmp := bytes.Compare(a[i].name, a[j].name); cmp != 0 {
|
|
|
|
return cmp == -1
|
|
|
|
}
|
|
|
|
return models.CompareTags(a[i].tags, a[j].tags) == -1
|
|
|
|
}
|
|
|
|
|
|
|
|
// logMeasurements represents a map of measurement names to measurements.
|
2017-01-02 16:29:18 +00:00
|
|
|
type logMeasurements map[string]*logMeasurement
|
2016-10-25 14:36:58 +00:00
|
|
|
|
|
|
|
// names returns a sorted list of measurement names.
|
|
|
|
func (m logMeasurements) names() []string {
|
|
|
|
a := make([]string, 0, len(m))
|
|
|
|
for name := range m {
|
|
|
|
a = append(a, name)
|
|
|
|
}
|
|
|
|
sort.Strings(a)
|
|
|
|
return a
|
|
|
|
}
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
// logMeasurement represents a single measurement tracked by the log file.
type logMeasurement struct {
	name    []byte               // measurement name
	tagSet  map[string]logTagKey // tag keys, indexed by key name
	deleted bool                 // true if the measurement has been tombstoned
	series  map[string]*logSerie // series under this measurement — presumably keyed by series key; confirm against callers

	// Compaction fields.
	offset    int64    // tagset offset
	size      int64    // tagset size
	seriesIDs []uint64 // series offsets
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// Name returns the measurement name.
func (m *logMeasurement) Name() []byte { return m.name }

// Deleted reports whether the measurement has been tombstoned.
func (m *logMeasurement) Deleted() bool { return m.deleted }
|
2016-10-31 14:46:07 +00:00
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey {
|
2016-11-11 16:25:53 +00:00
|
|
|
ts, ok := m.tagSet[string(key)]
|
|
|
|
if !ok {
|
2016-11-29 18:09:33 +00:00
|
|
|
ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)}
|
2016-11-11 16:25:53 +00:00
|
|
|
}
|
|
|
|
return ts
|
|
|
|
}
|
|
|
|
|
2016-10-31 14:46:07 +00:00
|
|
|
// logMeasurementSlice is a sortable list of log measurements.
|
|
|
|
type logMeasurementSlice []logMeasurement
|
|
|
|
|
|
|
|
func (a logMeasurementSlice) Len() int { return len(a) }
|
|
|
|
func (a logMeasurementSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
|
|
func (a logMeasurementSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
|
|
|
|
|
|
|
|
// logMeasurementIterator represents an iterator over a slice of measurements.
|
|
|
|
type logMeasurementIterator struct {
|
|
|
|
mms []logMeasurement
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next returns the next element in the iterator.
|
|
|
|
func (itr *logMeasurementIterator) Next() (e MeasurementElem) {
|
|
|
|
if len(itr.mms) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
e, itr.mms = &itr.mms[0], itr.mms[1:]
|
|
|
|
return e
|
|
|
|
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// logTagKey represents a single tag key within a measurement in the log file.
type logTagKey struct {
	name      []byte                 // tag key name
	deleted   bool                   // true if the tag key has been tombstoned
	tagValues map[string]logTagValue // values under this key, indexed by value
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// Key returns the tag key name.
func (tk *logTagKey) Key() []byte { return tk.name }

// Deleted reports whether the tag key has been tombstoned.
func (tk *logTagKey) Deleted() bool { return tk.deleted }
|
|
|
|
|
|
|
|
func (tk *logTagKey) TagValueIterator() TagValueIterator {
|
|
|
|
a := make([]logTagValue, 0, len(tk.tagValues))
|
|
|
|
for _, v := range tk.tagValues {
|
|
|
|
a = append(a, v)
|
|
|
|
}
|
|
|
|
return newLogTagValueIterator(a)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (tk *logTagKey) createTagValueIfNotExists(value []byte) logTagValue {
|
|
|
|
tv, ok := tk.tagValues[string(value)]
|
2016-11-11 16:25:53 +00:00
|
|
|
if !ok {
|
2016-12-26 17:17:14 +00:00
|
|
|
tv = logTagValue{name: value, series: make(map[string]*logSerie)}
|
2016-11-11 16:25:53 +00:00
|
|
|
}
|
|
|
|
return tv
|
2016-10-21 15:31:40 +00:00
|
|
|
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// logTagKeySlice is a sortable list of log tag keys, ordered by key name.
type logTagKeySlice []logTagKey

func (a logTagKeySlice) Len() int           { return len(a) }
func (a logTagKeySlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a logTagKeySlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
|
|
|
|
|
2016-10-21 15:31:40 +00:00
|
|
|
// logTagValue represents a single tag value within a tag key in the log file.
type logTagValue struct {
	name    []byte // tag value
	deleted bool   // true if the tag value has been tombstoned
	series  map[string]*logSerie // series containing this value — presumably keyed by series key; confirm against callers

	// Compaction fields.
	seriesIDs []uint64 // offsets of series with this value; cleared by LogFile.reset
}
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// Value returns the tag value.
func (tv *logTagValue) Value() []byte { return tv.name }

// Deleted reports whether the tag value has been tombstoned.
func (tv *logTagValue) Deleted() bool { return tv.deleted }
|
2016-11-27 20:15:32 +00:00
|
|
|
|
|
|
|
// logTagValueSlice is a sortable list of log tag values, ordered by value.
type logTagValueSlice []logTagValue

func (a logTagValueSlice) Len() int           { return len(a) }
func (a logTagValueSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
|
|
|
|
|
2016-11-29 18:09:33 +00:00
|
|
|
// logTagKeyIterator represents an iterator over a slice of tag keys.
|
|
|
|
type logTagKeyIterator struct {
|
|
|
|
a []logTagKey
|
|
|
|
}
|
|
|
|
|
|
|
|
// newLogTagKeyIterator returns a new instance of logTagKeyIterator.
|
|
|
|
func newLogTagKeyIterator(a []logTagKey) *logTagKeyIterator {
|
|
|
|
sort.Sort(logTagKeySlice(a))
|
|
|
|
return &logTagKeyIterator{a: a}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next returns the next element in the iterator.
|
|
|
|
func (itr *logTagKeyIterator) Next() (e TagKeyElem) {
|
|
|
|
if len(itr.a) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
e, itr.a = &itr.a[0], itr.a[1:]
|
|
|
|
return e
|
|
|
|
}
|
|
|
|
|
2016-11-27 20:15:32 +00:00
|
|
|
// logTagValueIterator represents an iterator over a slice of tag values.
|
|
|
|
type logTagValueIterator struct {
|
|
|
|
a []logTagValue
|
|
|
|
}
|
|
|
|
|
|
|
|
// newLogTagValueIterator returns a new instance of logTagValueIterator.
|
|
|
|
func newLogTagValueIterator(a []logTagValue) *logTagValueIterator {
|
|
|
|
sort.Sort(logTagValueSlice(a))
|
|
|
|
return &logTagValueIterator{a: a}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next returns the next element in the iterator.
|
|
|
|
func (itr *logTagValueIterator) Next() (e TagValueElem) {
|
|
|
|
if len(itr.a) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
e, itr.a = &itr.a[0], itr.a[1:]
|
|
|
|
return e
|
|
|
|
}
|
|
|
|
|
2016-11-10 15:45:27 +00:00
|
|
|
// logSeriesIterator represents an iterator over a slice of series.
type logSeriesIterator struct {
	series logSeries // remaining series; consumed from the front by Next
}
|
|
|
|
|
2016-11-27 16:34:03 +00:00
|
|
|
// newLogSeriesIterator returns a new instance of logSeriesIterator.
|
|
|
|
// All series are copied to the iterator.
|
2016-12-26 17:17:14 +00:00
|
|
|
func newLogSeriesIterator(m map[string]*logSerie) *logSeriesIterator {
|
|
|
|
if len(m) == 0 {
|
2016-12-15 15:31:18 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-12-26 17:17:14 +00:00
|
|
|
itr := logSeriesIterator{series: make(logSeries, 0, len(m))}
|
|
|
|
for _, serie := range m {
|
|
|
|
itr.series = append(itr.series, *serie)
|
|
|
|
}
|
|
|
|
sort.Sort(itr.series)
|
2016-11-27 16:34:03 +00:00
|
|
|
return &itr
|
|
|
|
}
|
|
|
|
|
2016-11-10 15:45:27 +00:00
|
|
|
// Next returns the next element in the iterator.
|
|
|
|
func (itr *logSeriesIterator) Next() (e SeriesElem) {
|
|
|
|
if len(itr.series) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
e, itr.series = &itr.series[0], itr.series[1:]
|
|
|
|
return e
|
|
|
|
}
|
2016-12-15 15:31:18 +00:00
|
|
|
|
|
|
|
// FormatLogFileName generates a log filename for the given index.
|
|
|
|
func FormatLogFileName(i int) string {
|
|
|
|
return fmt.Sprintf("%08d%s", i, LogFileExt)
|
|
|
|
}
|