diff --git a/tsdb/engine.go b/tsdb/engine.go index fa6ad12604..95fb5a5c10 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -51,7 +51,7 @@ type Engine interface { MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) MeasurementFields(measurement string) *MeasurementFields - ForEachMeasurement(fn func(name []byte) error) error + ForEachMeasurementName(fn func(name []byte) error) error DeleteMeasurement(name []byte) error // TagKeys(name []byte) ([][]byte, error) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 43abf3dc7c..c1ea5cc236 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -457,11 +457,6 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { e.index = index e.FileStore.dereferencer = index - // Open the index if it's not already open. - if err := index.Open(); err != nil { - return err - } - if err := e.FileStore.WalkKeys(func(key []byte, typ byte) error { fieldType, err := tsmFieldTypeToInfluxQLDataType(typ) if err != nil { @@ -830,9 +825,9 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error { return nil } -// ForEachMeasurement iterates over each measurement name in the engine. -func (e *Engine) ForEachMeasurement(fn func(name []byte) error) error { - return e.index.ForEachMeasurement(fn) +// ForEachMeasurementName iterates over each measurement name in the engine. +func (e *Engine) ForEachMeasurementName(fn func(name []byte) error) error { + return e.index.ForEachMeasurementName(fn) } // DeleteMeasurement deletes a measurement and all related series. diff --git a/tsdb/index.go b/tsdb/index.go index cb2dc47b60..7e06b0aed0 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -18,7 +18,7 @@ type Index interface { MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) DropMeasurement(name []byte) error - ForEachMeasurement(fn func(name []byte) error) error + ForEachMeasurementName(fn func(name []byte) error) error CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error DropSeries(keys [][]byte) error diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 6d32b60c4a..465c361829 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -551,8 +551,8 @@ func (i *Index) SetFieldName(measurement, name string) { m.SetFieldName(name) } -// ForEachMeasurement iterates over each measurement -func (i *Index) ForEachMeasurement(fn func(name []byte) error) error { +// ForEachMeasurementName iterates over each measurement name. +func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error { i.mu.RLock() defer i.mu.RUnlock() diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index e66df0182a..a67a5235e1 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -2,13 +2,18 @@ package tsi1 import ( "bytes" + "encoding/json" "errors" "fmt" + "io/ioutil" + "log" "os" "path/filepath" "regexp" "sort" + "strconv" "sync" + "time" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" @@ -17,32 +22,80 @@ import ( "github.com/influxdata/influxdb/tsdb" ) +// Default compaction thresholds. +const ( + DefaultMaxLogFileSize = 1 * 1024 * 1024 // 10MB + + DefaultCompactionMonitorInterval = 30 * time.Second +) + func init() { tsdb.RegisterIndex("tsi1", func(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index { - return &Index{Path: path} + idx := NewIndex() + idx.ShardID = id + idx.Path = path + return idx }) } // File extensions. const ( - LogFileExt = ".tsi.log" + LogFileExt = ".tsl" IndexFileExt = ".tsi" + + CompactingExt = ".compacting" ) +// ManifestFileName is the name of the index manifest file. +const ManifestFileName = "MANIFEST" + // Ensure index implements the interface. var _ tsdb.Index = &Index{} // Index represents a collection of layered index files and WAL. type Index struct { - Path string - mu sync.RWMutex + opened bool logFiles []*LogFile indexFiles IndexFiles + // Compaction management. + manualCompactNotify chan compactNotify + fastCompactNotify chan struct{} + + // Close management. + closing chan struct{} + wg sync.WaitGroup + // Fieldset shared with engine. - // TODO: Move field management into index. fieldset *tsdb.MeasurementFieldSet + + // Associated shard info. + ShardID uint64 + + // Root directory of the index files. + Path string + + // Log file compaction thresholds. + MaxLogFileSize int64 + + // Frequency of compaction checks. + CompactionMonitorInterval time.Duration +} + +// NewIndex returns a new instance of Index. +func NewIndex() *Index { + return &Index{ + manualCompactNotify: make(chan compactNotify), + fastCompactNotify: make(chan struct{}), + + closing: make(chan struct{}), + + // Default compaction thresholds. + MaxLogFileSize: DefaultMaxLogFileSize, + + CompactionMonitorInterval: DefaultCompactionMonitorInterval, + } } // Open opens the index. @@ -50,63 +103,110 @@ func (i *Index) Open() error { i.mu.Lock() defer i.mu.Unlock() - // Open root index directory. - f, err := os.Open(i.Path) - if err != nil { - return err + if i.opened { + return errors.New("index already open") } - defer f.Close() - // Open all log & index files. - names, err := f.Readdirnames(-1) - if err != nil { + // Create directory if it doesn't exist. + if err := os.MkdirAll(i.Path, 0777); err != nil { return err } - for _, name := range names { - switch filepath.Ext(name) { - case LogFileExt: - if err := i.openLogFile(name); err != nil { - return err - } - case IndexFileExt: - if err := i.openIndexFile(name); err != nil { - return err - } - } + + // Read manifest file. + m, err := ReadManifestFile(filepath.Join(i.Path, ManifestFileName)) + if os.IsNotExist(err) { + m = &Manifest{} + } else if err != nil { + return err } // Ensure at least one log file exists. - if len(i.logFiles) == 0 { - path := filepath.Join(i.Path, fmt.Sprintf("%08x%s", 0, LogFileExt)) - if err := i.openLogFile(path); err != nil { + if len(m.LogFiles) == 0 { + m.LogFiles = []string{FormatLogFileName(1)} + + if err := i.writeManifestFile(); err != nil { return err } } + // Open each log file in the manifest. + for _, filename := range m.LogFiles { + f, err := i.openLogFile(filepath.Join(i.Path, filename)) + if err != nil { + return err + } + i.logFiles = append(i.logFiles, f) + } + + // Open each index file in the manifest. + for _, filename := range m.IndexFiles { + f, err := i.openIndexFile(filepath.Join(i.Path, filename)) + if err != nil { + return err + } + i.indexFiles = append(i.indexFiles, f) + } + + // Delete any files not in the manifest. + if err := i.deleteNonManifestFiles(m); err != nil { + return err + } + + // Start compaction monitor. + i.wg.Add(1) + go func() { defer i.wg.Done(); i.monitorCompaction() }() + + // Mark opened. + i.opened = true + return nil } // openLogFile opens a log file and appends it to the index. -func (i *Index) openLogFile(path string) error { +func (i *Index) openLogFile(path string) (*LogFile, error) { f := NewLogFile() f.Path = path if err := f.Open(); err != nil { - return err + return nil, err } - - i.logFiles = append(i.logFiles, f) - return nil + return f, nil } // openIndexFile opens a log file and appends it to the index. -func (i *Index) openIndexFile(path string) error { +func (i *Index) openIndexFile(path string) (*IndexFile, error) { f := NewIndexFile() f.Path = path if err := f.Open(); err != nil { + return nil, err + } + return f, nil +} + +// deleteNonManifestFiles removes all files not in the manifest. +func (i *Index) deleteNonManifestFiles(m *Manifest) error { + dir, err := os.Open(i.Path) + if err != nil { + return err + } + defer dir.Close() + + fis, err := dir.Readdir(-1) + if err != nil { return err } - i.indexFiles = append(i.indexFiles, f) + // Loop over all files and remove any not in the manifest. + for _, fi := range fis { + filename := filepath.Base(fi.Name()) + if filename == ManifestFileName || m.HasFile(filename) { + continue + } + + if err := os.RemoveAll(filename); err != nil { + return err + } + } + return nil } @@ -130,6 +230,49 @@ func (i *Index) Close() error { return nil } +// ManifestPath returns the path to the index's manifest file. +func (i *Index) ManifestPath() string { + return filepath.Join(i.Path, ManifestFileName) +} + +// Manifest returns a manifest for the index. +func (i *Index) Manifest() *Manifest { + m := &Manifest{ + LogFiles: make([]string, len(i.logFiles)), + IndexFiles: make([]string, len(i.indexFiles)), + } + + for j, f := range i.logFiles { + m.LogFiles[j] = filepath.Base(f.Path) + } + for j, f := range i.indexFiles { + m.IndexFiles[j] = filepath.Base(f.Path) + } + + return m +} + +// writeManifestFile writes the manifest to the appropriate file path. +func (i *Index) writeManifestFile() error { + return WriteManifestFile(i.ManifestPath(), i.Manifest()) +} + +// maxFileID returns the highest file id from the index. +func (i *Index) maxFileID() int { + var max int + for _, f := range i.logFiles { + if i := ParseFileID(f.Path); i > max { + max = i + } + } + for _, f := range i.indexFiles { + if i := ParseFileID(f.Path); i > max { + max = i + } + } + return max +} + // SetFieldSet sets a shared field set from the engine. func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) { i.mu.Lock() @@ -137,14 +280,6 @@ func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) { i.mu.Unlock() } -// SetLogFiles explicitly sets log files. -// TEMPORARY: For testing only. -func (i *Index) SetLogFiles(a ...*LogFile) { i.logFiles = a } - -// SetIndexFiles explicitly sets index files -// TEMPORARY: For testing only. -func (i *Index) SetIndexFiles(a ...*IndexFile) { i.indexFiles = IndexFiles(a) } - // FileN returns the number of log and index files within the index. func (i *Index) FileN() int { return len(i.logFiles) + len(i.indexFiles) } @@ -175,41 +310,8 @@ func (i *Index) SeriesIterator() SeriesIterator { return FilterUndeletedSeriesIterator(MergeSeriesIterators(a...)) } -// Measurement retrieves a measurement by name. -func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) { - return i.measurement(name), nil -} - -func (i *Index) measurement(name []byte) *tsdb.Measurement { - m := tsdb.NewMeasurement(string(name)) - - // Iterate over measurement series. - itr := i.MeasurementSeriesIterator(name) - - var id uint64 // TEMPORARY - for e := itr.Next(); e != nil; e = itr.Next() { - // TODO: Handle deleted series. - - // Append series to to measurement. - // TODO: Remove concept of series ids. - m.AddSeries(&tsdb.Series{ - ID: id, - Key: string(e.Name()), - Tags: models.CopyTags(e.Tags()), - }) - - // TEMPORARY: Increment ID. - id++ - } - - if !m.HasSeries() { - return nil - } - return m -} - -// ForEachMeasurement iterates over all measurements in the index. -func (i *Index) ForEachMeasurement(fn func(name []byte) error) error { +// ForEachMeasurementName iterates over all measurement names in the index. +func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error { itr := i.MeasurementIterator() if itr == nil { return nil @@ -248,16 +350,6 @@ func (i *Index) TagKeyIterator(name []byte) TagKeyIterator { return MergeTagKeyIterators(a...) } -// Measurements returns a list of all measurements. -func (i *Index) Measurements() (tsdb.Measurements, error) { - var mms tsdb.Measurements - itr := i.MeasurementIterator() - for e := itr.Next(); e != nil; e = itr.Next() { - mms = append(mms, i.measurement(e.Name())) - } - return mms, nil -} - // MeasurementIterator returns an iterator over all measurements in the index. func (i *Index) MeasurementIterator() MeasurementIterator { a := make([]MeasurementIterator, 0, i.FileN()) @@ -466,19 +558,33 @@ func (i *Index) DropMeasurement(name []byte) error { } // Mark measurement as deleted. - return i.logFiles[0].DeleteMeasurement(name) + if err := i.logFiles[0].DeleteMeasurement(name); err != nil { + return err + } + + i.CheckFastCompaction() + return nil } // CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted. func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error { - if e := i.Series(name, tags); e != nil { + i.mu.RLock() + defer i.mu.RUnlock() + + if e := i.series(name, tags); e != nil { return nil } - return i.logFiles[0].AddSeries(name, tags) + + if err := i.logFiles[0].AddSeries(name, tags); err != nil { + return err + } + + i.checkFastCompaction() + return nil } -// Series returns a series by name/tags. -func (i *Index) Series(name []byte, tags models.Tags) SeriesElem { +// series returns a series by name/tags. +func (i *Index) series(name []byte, tags models.Tags) SeriesElem { for _, f := range i.files() { if e := f.Series(name, tags); e != nil && !e.Deleted() { return e @@ -498,6 +604,8 @@ func (i *Index) DropSeries(keys [][]byte) error { return err } } + + i.CheckFastCompaction() return nil } @@ -1026,6 +1134,280 @@ func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iter return newSeriesPointIterator(i, opt), nil } +// Compact runs a compaction check. Returns once the check is complete. +// If force is true then all files are compacted into a single index file regardless of size. +func (i *Index) Compact(force bool) error { + info := compactNotify{force: force, ch: make(chan error)} + i.manualCompactNotify <- info + + select { + case err := <-info.ch: + return err + case <-i.closing: + return nil + } +} + +// monitorCompaction periodically checks for files that need to be compacted. +func (i *Index) monitorCompaction() { + // Ignore full compaction if interval is unset. + var c <-chan time.Time + if i.CompactionMonitorInterval > 0 { + ticker := time.NewTicker(i.CompactionMonitorInterval) + c = ticker.C + defer ticker.Stop() + } + + // Wait for compaction checks or for the index to close. + for { + select { + case <-i.closing: + return + case <-i.fastCompactNotify: + if err := i.compactLogFile(); err != nil { + log.Printf("fast compaction error: %s", err) + } + case <-c: + if err := i.checkFullCompaction(false); err != nil { + log.Printf("full compaction error: %s", err) + } + case info := <-i.manualCompactNotify: + if err := i.compactLogFile(); err != nil { + info.ch <- err + continue + } else if err := i.checkFullCompaction(info.force); err != nil { + info.ch <- err + continue + } + info.ch <- nil + } + } +} + +// compactLogFile starts a new log file and compacts the previous one. +func (i *Index) compactLogFile() error { + if err := i.prependNewLogFile(); err != nil { + return err + } + if err := i.compactSecondaryLogFile(); err != nil { + return err + } + return nil +} + +// prependNewLogFile adds a new log file so that the current log file can be compacted. +// This function is a no-op if there is currently more than one log file. +func (i *Index) prependNewLogFile() error { + i.mu.Lock() + defer i.mu.Unlock() + + // Ignore if there is already a secondary log file that needs compacting. + if len(i.logFiles) == 2 { + return nil + } else if len(i.logFiles) > 2 { + panic("should not have more than two log files at a time") + } + + // Generate new file identifier. + id := i.maxFileID() + 1 + + // Open file and insert it into the first position. + f, err := i.openLogFile(filepath.Join(i.Path, FormatLogFileName(id))) + if err != nil { + return err + } + i.logFiles = append([]*LogFile{f}, i.logFiles...) + + // Write new manifest. + if err := i.writeManifestFile(); err != nil { + // TODO: Close index if write fails. + return err + } + + return nil +} + +// compactSecondaryLogFile compacts the secondary log file into an index file. +func (i *Index) compactSecondaryLogFile() error { + id, logFile := func() (int, *LogFile) { + i.mu.Lock() + defer i.mu.Unlock() + + if len(i.logFiles) < 2 { + return 0, nil + } + return i.maxFileID() + 1, i.logFiles[1] + }() + + // Exit if there is no secondary log file. + if logFile == nil { + return nil + } + + // Create new index file. + path := filepath.Join(i.Path, FormatIndexFileName(id)) + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + // Compact log file to new index file. + if _, err := logFile.WriteTo(f); err != nil { + return err + } + + // Close file. + if err := f.Close(); err != nil { + return err + } + + // Reopen as an index file. + file := NewIndexFile() + file.Path = path + if err := file.Open(); err != nil { + return err + } + + // Obtain lock to swap in index file and write manifest. + i.mu.Lock() + defer i.mu.Unlock() + + // Remove old log file and prepend new index file. + i.logFiles = []*LogFile{i.logFiles[0]} + i.indexFiles = append(IndexFiles{file}, i.indexFiles...) + + // TODO: Close old log file. + + // Write new manifest. + if err := i.writeManifestFile(); err != nil { + // TODO: Close index if write fails. + return err + } + + return nil +} + +// checkFullCompaction compacts all index files if the total size of index files +// is double the size of the largest index file. If force is true then all files +// are compacted regardless of size. +func (i *Index) checkFullCompaction(force bool) error { + // Only perform size check if compaction check is not forced. + if !force { + // Calculate total & max file sizes. + maxN, totalN, err := i.indexFileStats() + if err != nil { + return err + } + + // Ignore if total is not twice the size of the largest index file. + if maxN*2 < totalN { + return nil + } + } + + // Retrieve list of index files under lock. + i.mu.Lock() + indexFiles := i.indexFiles + id := i.maxFileID() + 1 + i.mu.Unlock() + + // Ignore if there are not at least two index files. + if len(indexFiles) < 2 { + return nil + } + + // Create new index file. + path := filepath.Join(i.Path, FormatIndexFileName(id)) + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + // Compact all index files to new index file. + if _, err := indexFiles.WriteTo(f); err != nil { + return err + } + + // Close file. + if err := f.Close(); err != nil { + return err + } + + // Reopen as an index file. + file := NewIndexFile() + file.Path = path + if err := file.Open(); err != nil { + return err + } + + // Obtain lock to swap in index file and write manifest. + i.mu.Lock() + defer i.mu.Unlock() + + // Replace index files with new index file. + i.indexFiles = IndexFiles{file} + + // TODO: Close old index files. + + // Write new manifest. + if err := i.writeManifestFile(); err != nil { + // TODO: Close index if write fails. + return err + } + + return nil +} + +// indexFileStats returns the max index file size and the total file size for all index files. +func (i *Index) indexFileStats() (maxN, totalN int64, err error) { + // Retrieve index file list under lock. + i.mu.Lock() + indexFiles := i.indexFiles + i.mu.Unlock() + + // Iterate over each file and determine size. + for _, f := range indexFiles { + fi, err := os.Stat(f.Path) + if os.IsNotExist(err) { + continue + } else if err != nil { + return 0, 0, err + } else if fi.Size() > maxN { + maxN = fi.Size() + } + totalN += fi.Size() + } + return maxN, totalN, nil +} + +// CheckFastCompaction notifies the index to begin compacting log file if the +// log file is above the max log file size. +func (i *Index) CheckFastCompaction() { + i.mu.Lock() + defer i.mu.Unlock() + i.checkFastCompaction() +} + +func (i *Index) checkFastCompaction() { + if i.logFiles[0].Size() < i.MaxLogFileSize { + return + } + + // Send signal to begin compaction of current log file. + select { + case i.fastCompactNotify <- struct{}{}: + default: + } +} + +// compactNotify represents a manual compaction notification. +type compactNotify struct { + force bool + ch chan error +} + // File represents a log or index file. type File interface { Measurement(name []byte) MeasurementElem @@ -1158,3 +1540,69 @@ func intersectStringSets(a, b map[string]struct{}) map[string]struct{} { } return other } + +var fileIDRegex = regexp.MustCompile(`^(\d+)\..+$`) + +// ParseFileID extracts the numeric id from a log or index file path. +// Returns 0 if it cannot be parsed. +func ParseFileID(name string) int { + a := fileIDRegex.FindStringSubmatch(filepath.Base(name)) + if a == nil { + return 0 + } + + i, _ := strconv.Atoi(a[1]) + return i +} + +// Manifest represents the list of log & index files that make up the index. +// The files are listed in time order, not necessarily ID order. +type Manifest struct { + LogFiles []string `json:"logs,omitempty"` + IndexFiles []string `json:"indexes,omitempty"` +} + +// HasFile returns true if name is listed in the log files or index files. +func (m *Manifest) HasFile(name string) bool { + for _, filename := range m.LogFiles { + if filename == name { + return true + } + } + for _, filename := range m.IndexFiles { + if filename == name { + return true + } + } + return false +} + +// ReadManifestFile reads a manifest from a file path. +func ReadManifestFile(path string) (*Manifest, error) { + buf, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + + // Decode manifest. + var m Manifest + if err := json.Unmarshal(buf, &m); err != nil { + return nil, err + } + return &m, nil +} + +// WriteManifestFile writes a manifest to a file path. +func WriteManifestFile(path string, m *Manifest) error { + buf, err := json.MarshalIndent(m, "", " ") + if err != nil { + return err + } + buf = append(buf, '\n') + + if err := ioutil.WriteFile(path, buf, 0666); err != nil { + return err + } + + return nil +} diff --git a/tsdb/index/tsi1/index_file.go b/tsdb/index/tsi1/index_file.go index 2b11631024..1380d5bd06 100644 --- a/tsdb/index/tsi1/index_file.go +++ b/tsdb/index/tsi1/index_file.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" "github.com/influxdata/influxdb/models" @@ -47,7 +48,8 @@ type IndexFile struct { tblks map[string]*TagBlock // tag blocks by measurement name mblk MeasurementBlock - // Path to data file. + // Sortable identifier & filepath to the log file. + ID int Path string } @@ -58,10 +60,16 @@ func NewIndexFile() *IndexFile { // Open memory maps the data file at the file's path. func (f *IndexFile) Open() error { + // Extract identifier from path name, if possible. + if id := ParseFileID(f.Path); id > 0 { + f.ID = id + } + data, err := mmap.Map(f.Path) if err != nil { return err } + return f.UnmarshalBinary(data) } @@ -330,3 +338,8 @@ func (t *IndexFileTrailer) WriteTo(w io.Writer) (n int64, err error) { return n, nil } + +// FormatIndexFileName generates an index filename for the given index. +func FormatIndexFileName(i int) string { + return fmt.Sprintf("%08d%s", i, IndexFileExt) +} diff --git a/tsdb/index/tsi1/index_file_test.go b/tsdb/index/tsi1/index_file_test.go index dabac5fff5..ca043522d2 100644 --- a/tsdb/index/tsi1/index_file_test.go +++ b/tsdb/index/tsi1/index_file_test.go @@ -76,7 +76,7 @@ func CreateIndexFile(series []Series) (*tsi1.IndexFile, error) { // Write index file to buffer. var buf bytes.Buffer - if _, err := lf.CompactTo(&buf); err != nil { + if _, err := lf.WriteTo(&buf); err != nil { return nil, err } @@ -99,7 +99,7 @@ func GenerateIndexFile(measurementN, tagN, valueN int) (*tsi1.IndexFile, error) // Compact log file to buffer. var buf bytes.Buffer - if _, err := lf.CompactTo(&buf); err != nil { + if _, err := lf.WriteTo(&buf); err != nil { return nil, err } diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index 6a7c13000c..11f1a048eb 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -1,67 +1,417 @@ package tsi1_test import ( + "fmt" + "os" + "reflect" "testing" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb/index/tsi1" ) -// Ensure index can return a single measurement by name. -func TestIndex_Measurement(t *testing.T) { - // Build an index file. - f, err := CreateIndexFile([]Series{ +// Ensure index can return an iterator over all series in the index. +func TestIndex_SeriesIterator(t *testing.T) { + idx := MustOpenIndex() + defer idx.Close() + + // Create initial set of series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, - }) - if err != nil { + }); err != nil { t.Fatal(err) } - // Create an index from the single file. - var idx tsi1.Index - idx.SetIndexFiles(f) + // Verify initial set of series. + if err := idx.MultiInvoke(func(state string) { + itr := idx.SeriesIterator() + if itr == nil { + t.Fatalf("expected iterator(%s)", state) + } - // Verify measurement is correct. - if mm, err := idx.Measurement([]byte("cpu")); err != nil { + if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region west}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); string(e.Name()) != `mem` || e.Tags().String() != `[{region east}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } + }); err != nil { t.Fatal(err) - } else if mm == nil { - t.Fatal("expected measurement") } - // Verify non-existent measurement doesn't exist. - if mm, err := idx.Measurement([]byte("no_such_measurement")); err != nil { + // Add more series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("disk")}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + }); err != nil { + t.Fatal(err) + } + + // Verify additional series. + if err := idx.MultiInvoke(func(state string) { + itr := idx.SeriesIterator() + if itr == nil { + t.Fatalf("expected iterator(%s)", state) + } + + if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region north}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region west}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); string(e.Name()) != `disk` || len(e.Tags()) != 0 { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); string(e.Name()) != `mem` || e.Tags().String() != `[{region east}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } + }); err != nil { t.Fatal(err) - } else if mm != nil { - t.Fatal("expected nil measurement") } } -// Ensure index can return a list of all measurements. -func TestIndex_Measurements(t *testing.T) { - // Build an index file. - f, err := CreateIndexFile([]Series{ +// Ensure index can iterate over all measurement names. +func TestIndex_ForEachMeasurementName(t *testing.T) { + idx := MustOpenIndex() + defer idx.Close() + + // Add series to index. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, - }) - if err != nil { + }); err != nil { t.Fatal(err) } - // Create an index from the single file. - var idx tsi1.Index - idx.SetIndexFiles(f) + // Verify measurements are returned. + if err := idx.MultiInvoke(func(state string) { + var names []string + if err := idx.ForEachMeasurementName(func(name []byte) error { + names = append(names, string(name)) + return nil + }); err != nil { + t.Fatal(err) + } - // Retrieve measurements and verify. - if mms, err := idx.Measurements(); err != nil { + if !reflect.DeepEqual(names, []string{"cpu", "mem"}) { + t.Fatalf("unexpected names: %#v", names) + } + }); err != nil { + t.Fatal(err) + } + + // Add more series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("disk")}, + {Name: []byte("mem")}, + }); err != nil { + t.Fatal(err) + } + + // Verify new measurements. + if err := idx.MultiInvoke(func(state string) { + var names []string + if err := idx.ForEachMeasurementName(func(name []byte) error { + names = append(names, string(name)) + return nil + }); err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(names, []string{"cpu", "disk", "mem"}) { + t.Fatalf("unexpected names: %#v", names) + } + }); err != nil { t.Fatal(err) - } else if len(mms) != 2 { - t.Fatalf("expected measurement count: %d", len(mms)) - } else if mms[0].Name != "cpu" { - t.Fatalf("unexpected measurement(0): %s", mms[0].Name) - } else if mms[1].Name != "mem" { - t.Fatalf("unexpected measurement(1): %s", mms[1].Name) } } + +// Ensure index can return an iterator over all series for one measurement. +func TestIndex_MeasurementSeriesIterator(t *testing.T) { + idx := MustOpenIndex() + defer idx.Close() + + // Create initial set of series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, + {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, + }); err != nil { + t.Fatal(err) + } + + // Verify initial set of series. + if err := idx.MultiInvoke(func(state string) { + itr := idx.MeasurementSeriesIterator([]byte("cpu")) + if itr == nil { + t.Fatalf("expected iterator(%s)", state) + } + + if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region west}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } + }); err != nil { + t.Fatal(err) + } + + // Add more series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("disk")}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})}, + }); err != nil { + t.Fatal(err) + } + + // Verify additional series. + if err := idx.MultiInvoke(func(state string) { + itr := idx.MeasurementSeriesIterator([]byte("cpu")) + if itr == nil { + t.Fatalf("expected iterator(%s)", state) + } + + if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region north}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region west}]` { + t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil series(%s): %s/%s", state, e.Name(), e.Tags().String()) + } + }); err != nil { + t.Fatal(err) + } +} + +// Ensure index can return an iterator over all measurements for the index. +func TestIndex_MeasurementIterator(t *testing.T) { + idx := MustOpenIndex() + defer idx.Close() + + // Create initial set of series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("cpu")}, + {Name: []byte("mem")}, + }); err != nil { + t.Fatal(err) + } + + // Verify initial set of series. + if err := idx.MultiInvoke(func(state string) { + itr := idx.MeasurementIterator() + if itr == nil { + t.Fatalf("expected iterator(%s)", state) + } + + if e := itr.Next(); string(e.Name()) != `cpu` { + t.Fatalf("unexpected measurement(%s): %s", state, e.Name()) + } else if e := itr.Next(); string(e.Name()) != `mem` { + t.Fatalf("unexpected measurement(%s): %s", state, e.Name()) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil measurement(%s): %s", state, e.Name()) + } + }); err != nil { + t.Fatal(err) + } + + // Add more series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("disk"), Tags: models.NewTags(map[string]string{"foo": "bar"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north", "x": "y"})}, + }); err != nil { + t.Fatal(err) + } + + // Verify additional series. + if err := idx.MultiInvoke(func(state string) { + itr := idx.MeasurementIterator() + if itr == nil { + t.Fatalf("expected iterator(%s)", state) + } + + if e := itr.Next(); string(e.Name()) != `cpu` { + t.Fatalf("unexpected measurement(%s): %s", state, e.Name()) + } else if e := itr.Next(); string(e.Name()) != `disk` { + t.Fatalf("unexpected measurement(%s): %s", state, e.Name()) + } else if e := itr.Next(); string(e.Name()) != `mem` { + t.Fatalf("unexpected measurement(%s): %s", state, e.Name()) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil measurement(%s): %s", state, e.Name()) + } + }); err != nil { + t.Fatal(err) + } +} + +// Ensure index can return an iterator over all keys for one measurement. +func TestIndex_TagKeyIterator(t *testing.T) { + idx := MustOpenIndex() + defer idx.Close() + + // Create initial set of series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west", "type": "gpu"})}, + {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east", "misc": "other"})}, + }); err != nil { + t.Fatal(err) + } + + // Verify initial set of series. + if err := idx.MultiInvoke(func(state string) { + itr := idx.TagKeyIterator([]byte("cpu")) + if itr == nil { + t.Fatalf("expected iterator(%s)", state) + } + + if e := itr.Next(); string(e.Key()) != `region` { + t.Fatalf("unexpected key(%s): %s", state, e.Key()) + } else if e := itr.Next(); string(e.Key()) != `type` { + t.Fatalf("unexpected key(%s): %s", state, e.Key()) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil key(%s): %s/%s", state, e.Key()) + } + }); err != nil { + t.Fatal(err) + } + + // Add more series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("disk"), Tags: models.NewTags(map[string]string{"foo": "bar"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north", "x": "y"})}, + }); err != nil { + t.Fatal(err) + } + + // Verify additional series. + if err := idx.MultiInvoke(func(state string) { + itr := idx.TagKeyIterator([]byte("cpu")) + if itr == nil { + t.Fatalf("expected iterator(%s)", state) + } + + if e := itr.Next(); string(e.Key()) != `region` { + t.Fatalf("unexpected key(%s): %s", state, e.Key()) + } else if e := itr.Next(); string(e.Key()) != `type` { + t.Fatalf("unexpected key(%s): %s", state, e.Key()) + } else if e := itr.Next(); string(e.Key()) != `x` { + t.Fatalf("unexpected key(%s): %s", state, e.Key()) + } else if e := itr.Next(); e != nil { + t.Fatalf("expected nil key(%s): %s", state, e.Key()) + } + }); err != nil { + t.Fatal(err) + } +} + +// Index is a test wrapper for tsi1.Index. +type Index struct { + *tsi1.Index +} + +// NewIndex returns a new instance of Index at a temporary path. +func NewIndex() *Index { + idx := &Index{Index: tsi1.NewIndex()} + idx.Path = MustTempDir() + return idx +} + +// MustOpenIndex returns a new, open index. Panic on error. +func MustOpenIndex() *Index { + idx := NewIndex() + if err := idx.Open(); err != nil { + panic(err) + } + return idx +} + +// Close closes and removes the index directory. +func (idx *Index) Close() error { + defer os.RemoveAll(idx.Path) + return idx.Index.Close() +} + +// Reopen closes and opens the index. +func (idx *Index) Reopen() error { + if err := idx.Index.Close(); err != nil { + return err + } + + path := idx.Path + idx.Index = tsi1.NewIndex() + idx.Path = path + if err := idx.Open(); err != nil { + return err + } + return nil +} + +// MultiInvoke executes fn in several different states: +// +// - Immediately +// - After reopen +// - After compaction +// - After reopen again +// +// The index should always respond in the same fashion regardless of +// how data is stored. This helper allows the index to be easily tested +// in all major states. +func (idx *Index) MultiInvoke(fn func(state string)) error { + // Invoke immediately. + fn("initial") + + if testing.Verbose() { + println("[index] reopening") + } + + // Reopen and invoke again. + if err := idx.Reopen(); err != nil { + return fmt.Errorf("reopen error: %s", err) + } + fn("reopen") + + if testing.Verbose() { + println("[index] forcing compaction") + } + + // Force a compaction + if err := idx.Compact(true); err != nil { + return err + } + fn("post-compaction") + + if testing.Verbose() { + println("[index] reopening after compaction") + } + + // Reopen and invoke again. + if err := idx.Reopen(); err != nil { + return fmt.Errorf("post-compaction reopen error: %s", err) + } + fn("post-compaction-reopen") + + return nil +} + +// CreateSeriesSliceIfNotExists creates multiple series at a time. +func (idx *Index) CreateSeriesSliceIfNotExists(a []Series) error { + for i, s := range a { + if err := idx.CreateSeriesIfNotExists(nil, s.Name, s.Tags); err != nil { + return fmt.Errorf("i=%d, name=%s, tags=%s, err=%s", i, s.Name, s.Tags, err) + } + } + return nil +} diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 1528be6963..4297828a1a 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "hash/crc32" "io" "os" @@ -30,11 +31,11 @@ const ( // LogFile represents an on-disk write-ahead log file. type LogFile struct { - mu sync.RWMutex - data []byte // mmap - file *os.File // writer - buf []byte // marshaling buffer - entries []LogEntry // parsed entries + mu sync.RWMutex + data []byte // mmap + file *os.File // writer + buf []byte // marshaling buffer + size int64 // tracks current file size // In-memory index. mms logMeasurements @@ -74,6 +75,7 @@ func (f *LogFile) open() error { } else if fi.Size() == 0 { return nil } + f.size = fi.Size() // Open a read-only memory map of the existing data. data, err := mmap.Map(f.Path) @@ -83,14 +85,12 @@ func (f *LogFile) open() error { f.data = data // Read log entries from mmap. - f.entries = nil for buf := f.data; len(buf) > 0; { // Read next entry. var e LogEntry if err := e.UnmarshalBinary(buf); err != nil { return err } - f.entries = append(f.entries, e) // Execute entry against in-memory index. f.execEntry(&e) @@ -112,12 +112,19 @@ func (f *LogFile) Close() error { mmap.Unmap(f.data) } - f.entries = nil f.mms = make(logMeasurements) return nil } +// Size returns the tracked in-memory file size of the log file. +func (f *LogFile) Size() int64 { + f.mu.Lock() + n := f.size + f.mu.Unlock() + return n +} + // Measurement returns a measurement element. func (f *LogFile) Measurement(name []byte) MeasurementElem { f.mu.RLock() @@ -134,7 +141,10 @@ func (f *LogFile) Measurement(name []byte) MeasurementElem { func (f *LogFile) MeasurementNames() []string { f.mu.RLock() defer f.mu.RUnlock() + return f.measurementNames() +} +func (f *LogFile) measurementNames() []string { a := make([]string, 0, len(f.mms)) for name := range f.mms { a = append(a, name) @@ -173,8 +183,12 @@ func (f *LogFile) TagKeySeriesIterator(name, key []byte) SeriesIterator { // Combine iterators across all tag keys. itrs := make([]SeriesIterator, 0, len(tk.tagValues)) for _, tv := range tk.tagValues { + if len(tv.series) == 0 { + continue + } itrs = append(itrs, newLogSeriesIterator(tv.series)) } + return MergeSeriesIterators(itrs...) } @@ -266,7 +280,10 @@ func (f *LogFile) TagValueSeriesIterator(name, key, value []byte) SeriesIterator tv, ok := tk.tagValues[string(value)] if !ok { return nil + } else if len(tv.series) == 0 { + return nil } + return newLogSeriesIterator(tv.series) } @@ -351,7 +368,8 @@ func (f *LogFile) appendEntry(e *LogEntry) error { e.Size = len(f.buf) // Write record to file. - if n, err := f.file.Write(f.buf); err != nil { + n, err := f.file.Write(f.buf) + if err != nil { // Move position backwards over partial entry. // Log should be reopened if seeking cannot be completed. if n > 0 { @@ -362,8 +380,8 @@ func (f *LogFile) appendEntry(e *LogEntry) error { return err } - // Save entry to in-memory list. - f.entries = append(f.entries, *e) + // Update in-memory file size. + f.size += int64(n) return nil } @@ -441,7 +459,6 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) { } // Insert series to list. - // TODO: Remove global series list. mm.series.insert(e.Name, e.Tags, deleted) // Save measurement. @@ -468,6 +485,9 @@ func (f *LogFile) SeriesIterator() SeriesIterator { series = append(series, f.mms[string(name)].series...) } + if len(series) == 0 { + return nil + } return newLogSeriesIterator(series) } @@ -499,11 +519,17 @@ func (f *LogFile) MeasurementSeriesIterator(name []byte) SeriesIterator { defer f.mu.RUnlock() mm := f.mms[string(name)] + if len(mm.series) == 0 { + return nil + } return newLogSeriesIterator(mm.series) } -// CompactTo compacts the log file and writes it to w. -func (f *LogFile) CompactTo(w io.Writer) (n int64, err error) { +// WriteTo compacts the log file and writes it to w. +func (f *LogFile) WriteTo(w io.Writer) (n int64, err error) { + f.mu.Lock() + defer f.mu.Unlock() + var t IndexFileTrailer // Reset compaction fields. @@ -551,7 +577,7 @@ func (f *LogFile) writeSeriesBlockTo(w io.Writer, n *int64) error { sw := NewSeriesBlockWriter() // Retreve measurement names in order. - names := f.MeasurementNames() + names := f.measurementNames() // Add series from measurements in order. for _, name := range names { @@ -584,7 +610,7 @@ func (f *LogFile) writeSeriesBlockTo(w io.Writer, n *int64) error { // Lookup series offset. serie.offset = sw.Offset(serie.name, serie.tags) if serie.offset == 0 { - panic("series not found") + panic("series not found: " + string(serie.name) + " " + serie.tags.String()) } // Add series id to measurement, tag key, and tag value. @@ -941,30 +967,6 @@ func (a logTagValueSlice) Len() int { return len(a) } func (a logTagValueSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } -/* -// insertEntry inserts an entry into the tag value in sorted order. -// If another entry matches the name/tags then it is overrwritten. -func (tv *logTagValue) insertEntry(e *LogEntry) { - i := sort.Search(len(tv.entries), func(i int) bool { - if cmp := bytes.Compare(tv.entries[i].Name, e.Name); cmp != 0 { - return cmp != -1 - } - return models.CompareTags(tv.entries[i].Tags, e.Tags) != -1 - }) - - // Update entry if it already exists. - if i < len(tv.entries) && bytes.Equal(tv.entries[i].Name, e.Name) && tv.entries[i].Tags.Equal(e.Tags) { - tv.entries[i] = *e - return - } - - // Otherwise insert new entry. - tv.entries = append(tv.entries, LogEntry{}) - copy(tv.entries[i+1:], tv.entries[i:]) - tv.entries[i] = *e -} -*/ - // logTagKeyIterator represents an iterator over a slice of tag keys. type logTagKeyIterator struct { a []logTagKey @@ -1013,6 +1015,10 @@ type logSeriesIterator struct { // newLogSeriesIterator returns a new instance of logSeriesIterator. // All series are copied to the iterator. func newLogSeriesIterator(a logSeries) *logSeriesIterator { + if len(a) == 0 { + return nil + } + itr := logSeriesIterator{series: make(logSeries, len(a))} copy(itr.series, a) return &itr @@ -1026,3 +1032,8 @@ func (itr *logSeriesIterator) Next() (e SeriesElem) { e, itr.series = &itr.series[0], itr.series[1:] return e } + +// FormatLogFileName generates a log filename for the given index. +func FormatLogFileName(i int) string { + return fmt.Sprintf("%08d%s", i, LogFileExt) +} diff --git a/tsdb/index/tsi1/measurement_block.go b/tsdb/index/tsi1/measurement_block.go index 68d5a11844..bddc1fc0a1 100644 --- a/tsdb/index/tsi1/measurement_block.go +++ b/tsdb/index/tsi1/measurement_block.go @@ -87,7 +87,7 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) d++ if uint32(d) > n { - panic("empty hash data block") + return MeasurementBlockElem{}, false } } } diff --git a/tsdb/index/tsi1/series_block.go b/tsdb/index/tsi1/series_block.go index ac7d816d9c..9192abbad7 100644 --- a/tsdb/index/tsi1/series_block.go +++ b/tsdb/index/tsi1/series_block.go @@ -568,7 +568,7 @@ func (sw *SeriesBlockWriter) writeSeriesTo(w io.Writer, terms *TermList, offset seriesBuf = terms.AppendEncodedSeries(seriesBuf[:0], s.name, s.tags) // Join flag, varint(length), & dictionary-encoded series in buffer. - buf[0] = 0 // TODO(benbjohnson): series tombstone + buf[0] = s.flag() sz := binary.PutUvarint(buf[1:], uint64(len(seriesBuf))) buf = append(buf[:1+sz], seriesBuf...) @@ -750,6 +750,14 @@ type serie struct { offset uint32 } +func (s *serie) flag() uint8 { + var flag byte + if s.deleted { + flag |= SeriesTombstoneFlag + } + return flag +} + type series []serie func (a series) Len() int { return len(a) } diff --git a/tsdb/index/tsi1/tag_block.go b/tsdb/index/tsi1/tag_block.go index 5b45ac0ac0..b56e33a7c9 100644 --- a/tsdb/index/tsi1/tag_block.go +++ b/tsdb/index/tsi1/tag_block.go @@ -120,6 +120,10 @@ func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem { // Move position forward. pos = (pos + 1) % int(keyN) d++ + + if uint32(d) > keyN { + return nil + } } } @@ -165,6 +169,10 @@ func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem { // Move position forward. pos = (pos + 1) % int(valueN) d++ + + if uint32(d) > valueN { + return nil + } } } diff --git a/tsdb/index/tsi1/tsi1_test.go b/tsdb/index/tsi1/tsi1_test.go index 5e249fa201..c0e81fc24c 100644 --- a/tsdb/index/tsi1/tsi1_test.go +++ b/tsdb/index/tsi1/tsi1_test.go @@ -2,6 +2,7 @@ package tsi1_test import ( "bytes" + "io/ioutil" "reflect" "testing" @@ -296,3 +297,12 @@ func (itr *SeriesIterator) Next() (e tsi1.SeriesElem) { e, itr.Elems = &itr.Elems[0], itr.Elems[1:] return e } + +// MustTempDir returns a temporary directory. Panic on error. +func MustTempDir() string { + path, err := ioutil.TempDir("", "tsi-") + if err != nil { + panic(err) + } + return path +} diff --git a/tsdb/store.go b/tsdb/store.go index 4bda5a84c1..091c4698c8 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -733,7 +733,7 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi names = append(names, source.(*influxql.Measurement).Name) } } else { - if err := sh.engine.ForEachMeasurement(func(name []byte) error { + if err := sh.engine.ForEachMeasurementName(func(name []byte) error { names = append(names, string(name)) return nil }); err != nil {