tsi compaction

pull/7913/head
Ben Johnson 2016-12-15 08:31:18 -07:00
parent 83e80f6d0b
commit 745b1973a8
14 changed files with 1027 additions and 184 deletions

View File

@@ -51,7 +51,7 @@ type Engine interface {
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
MeasurementFields(measurement string) *MeasurementFields
- ForEachMeasurement(fn func(name []byte) error) error
+ ForEachMeasurementName(fn func(name []byte) error) error
DeleteMeasurement(name []byte) error
// TagKeys(name []byte) ([][]byte, error)
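The rename makes clear that the callback receives raw measurement names rather than measurement objects. A minimal sketch of a caller collecting names through the renamed method (the eng variable and the error handling are illustrative, not part of this commit):

	// Collect every measurement name from an engine into a slice.
	var names []string
	if err := eng.ForEachMeasurementName(func(name []byte) error {
		names = append(names, string(name)) // copy: the byte slice may be reused
		return nil
	}); err != nil {
		// handle iteration error
	}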

View File

@@ -457,11 +457,6 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
e.index = index
e.FileStore.dereferencer = index

- // Open the index if it's not already open.
- if err := index.Open(); err != nil {
- return err
- }

if err := e.FileStore.WalkKeys(func(key []byte, typ byte) error {
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil {

@@ -830,9 +825,9 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
return nil
}

- // ForEachMeasurement iterates over each measurement name in the engine.
- func (e *Engine) ForEachMeasurement(fn func(name []byte) error) error {
- return e.index.ForEachMeasurement(fn)
+ // ForEachMeasurementName iterates over each measurement name in the engine.
+ func (e *Engine) ForEachMeasurementName(fn func(name []byte) error) error {
+ return e.index.ForEachMeasurementName(fn)
}

// DeleteMeasurement deletes a measurement and all related series.

View File

@@ -18,7 +18,7 @@ type Index interface {
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
DropMeasurement(name []byte) error
- ForEachMeasurement(fn func(name []byte) error) error
+ ForEachMeasurementName(fn func(name []byte) error) error

CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
DropSeries(keys [][]byte) error

View File

@@ -551,8 +551,8 @@ func (i *Index) SetFieldName(measurement, name string) {
m.SetFieldName(name)
}

- // ForEachMeasurement iterates over each measurement
- func (i *Index) ForEachMeasurement(fn func(name []byte) error) error {
+ // ForEachMeasurementName iterates over each measurement name.
+ func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
i.mu.RLock()
defer i.mu.RUnlock()

View File

@@ -2,13 +2,18 @@ package tsi1

import (
"bytes"
+ "encoding/json"
"errors"
"fmt"
+ "io/ioutil"
+ "log"
"os"
"path/filepath"
"regexp"
"sort"
+ "strconv"
"sync"
+ "time"

"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"

@@ -17,32 +22,80 @@ import (
"github.com/influxdata/influxdb/tsdb"
)

+ // Default compaction thresholds.
+ const (
+ DefaultMaxLogFileSize = 1 * 1024 * 1024 // 1MB
+ DefaultCompactionMonitorInterval = 30 * time.Second
+ )

func init() {
tsdb.RegisterIndex("tsi1", func(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index {
- return &Index{Path: path}
+ idx := NewIndex()
+ idx.ShardID = id
+ idx.Path = path
+ return idx
})
}

// File extensions.
const (
- LogFileExt = ".tsi.log"
+ LogFileExt = ".tsl"
IndexFileExt = ".tsi"
+ CompactingExt = ".compacting"
)

+ // ManifestFileName is the name of the index manifest file.
+ const ManifestFileName = "MANIFEST"

// Ensure index implements the interface.
var _ tsdb.Index = &Index{}

// Index represents a collection of layered index files and WAL.
type Index struct {
- Path string
mu sync.RWMutex
+ opened bool

logFiles []*LogFile
indexFiles IndexFiles

+ // Compaction management.
+ manualCompactNotify chan compactNotify
+ fastCompactNotify chan struct{}
+
+ // Close management.
+ closing chan struct{}
+ wg sync.WaitGroup

// Fieldset shared with engine.
+ // TODO: Move field management into index.
fieldset *tsdb.MeasurementFieldSet

+ // Associated shard info.
+ ShardID uint64
+
+ // Root directory of the index files.
+ Path string
+
+ // Log file compaction thresholds.
+ MaxLogFileSize int64
+
+ // Frequency of compaction checks.
+ CompactionMonitorInterval time.Duration
}

+ // NewIndex returns a new instance of Index.
+ func NewIndex() *Index {
+ return &Index{
+ manualCompactNotify: make(chan compactNotify),
+ fastCompactNotify: make(chan struct{}),
+ closing: make(chan struct{}),
+
+ // Default compaction thresholds.
+ MaxLogFileSize: DefaultMaxLogFileSize,
+ CompactionMonitorInterval: DefaultCompactionMonitorInterval,
+ }
+ }

// Open opens the index.
@@ -50,63 +103,110 @@ func (i *Index) Open() error {
i.mu.Lock()
defer i.mu.Unlock()

- // Open root index directory.
- f, err := os.Open(i.Path)
- if err != nil {
- return err
- }
- defer f.Close()
-
- // Open all log & index files.
- names, err := f.Readdirnames(-1)
- if err != nil {
- return err
- }
- for _, name := range names {
- switch filepath.Ext(name) {
- case LogFileExt:
- if err := i.openLogFile(name); err != nil {
- return err
- }
- case IndexFileExt:
- if err := i.openIndexFile(name); err != nil {
- return err
- }
- }
- }
+ if i.opened {
+ return errors.New("index already open")
+ }
+
+ // Create directory if it doesn't exist.
+ if err := os.MkdirAll(i.Path, 0777); err != nil {
+ return err
+ }
+
+ // Read manifest file.
+ m, err := ReadManifestFile(filepath.Join(i.Path, ManifestFileName))
+ if os.IsNotExist(err) {
+ m = &Manifest{}
+ } else if err != nil {
+ return err
+ }

// Ensure at least one log file exists.
- if len(i.logFiles) == 0 {
- path := filepath.Join(i.Path, fmt.Sprintf("%08x%s", 0, LogFileExt))
- if err := i.openLogFile(path); err != nil {
+ if len(m.LogFiles) == 0 {
+ m.LogFiles = []string{FormatLogFileName(1)}
+
+ if err := i.writeManifestFile(); err != nil {
return err
}
}

+ // Open each log file in the manifest.
+ for _, filename := range m.LogFiles {
+ f, err := i.openLogFile(filepath.Join(i.Path, filename))
+ if err != nil {
+ return err
+ }
+ i.logFiles = append(i.logFiles, f)
+ }
+
+ // Open each index file in the manifest.
+ for _, filename := range m.IndexFiles {
+ f, err := i.openIndexFile(filepath.Join(i.Path, filename))
+ if err != nil {
+ return err
+ }
+ i.indexFiles = append(i.indexFiles, f)
+ }
+
+ // Delete any files not in the manifest.
+ if err := i.deleteNonManifestFiles(m); err != nil {
+ return err
+ }
+
+ // Start compaction monitor.
+ i.wg.Add(1)
+ go func() { defer i.wg.Done(); i.monitorCompaction() }()
+
+ // Mark opened.
+ i.opened = true

return nil
}

- // openLogFile opens a log file and appends it to the index.
- func (i *Index) openLogFile(path string) error {
+ // openLogFile opens a log file and returns it.
+ func (i *Index) openLogFile(path string) (*LogFile, error) {
f := NewLogFile()
f.Path = path
if err := f.Open(); err != nil {
- return err
+ return nil, err
}
- i.logFiles = append(i.logFiles, f)
- return nil
+ return f, nil
}

- // openIndexFile opens a log file and appends it to the index.
- func (i *Index) openIndexFile(path string) error {
+ // openIndexFile opens an index file and returns it.
+ func (i *Index) openIndexFile(path string) (*IndexFile, error) {
f := NewIndexFile()
f.Path = path
if err := f.Open(); err != nil {
- return err
+ return nil, err
}
- i.indexFiles = append(i.indexFiles, f)
- return nil
+ return f, nil
}

+ // deleteNonManifestFiles removes all files not in the manifest.
+ func (i *Index) deleteNonManifestFiles(m *Manifest) error {
+ dir, err := os.Open(i.Path)
+ if err != nil {
+ return err
+ }
+ defer dir.Close()
+
+ fis, err := dir.Readdir(-1)
+ if err != nil {
+ return err
+ }
+
+ // Loop over all files and remove any not in the manifest.
+ for _, fi := range fis {
+ filename := filepath.Base(fi.Name())
+ if filename == ManifestFileName || m.HasFile(filename) {
+ continue
+ }
+ if err := os.RemoveAll(filepath.Join(i.Path, filename)); err != nil {
+ return err
+ }
+ }
+
+ return nil
+ }
@@ -130,6 +230,49 @@ func (i *Index) Close() error {
return nil
}

// ManifestPath returns the path to the index's manifest file.
func (i *Index) ManifestPath() string {
return filepath.Join(i.Path, ManifestFileName)
}
// Manifest returns a manifest for the index.
func (i *Index) Manifest() *Manifest {
m := &Manifest{
LogFiles: make([]string, len(i.logFiles)),
IndexFiles: make([]string, len(i.indexFiles)),
}
for j, f := range i.logFiles {
m.LogFiles[j] = filepath.Base(f.Path)
}
for j, f := range i.indexFiles {
m.IndexFiles[j] = filepath.Base(f.Path)
}
return m
}
// writeManifestFile writes the manifest to the appropriate file path.
func (i *Index) writeManifestFile() error {
return WriteManifestFile(i.ManifestPath(), i.Manifest())
}
// maxFileID returns the highest file id from the index.
func (i *Index) maxFileID() int {
var max int
for _, f := range i.logFiles {
if i := ParseFileID(f.Path); i > max {
max = i
}
}
for _, f := range i.indexFiles {
if i := ParseFileID(f.Path); i > max {
max = i
}
}
return max
}
// SetFieldSet sets a shared field set from the engine.
func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
i.mu.Lock()
@@ -137,14 +280,6 @@ func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
i.mu.Unlock()
}

- // SetLogFiles explicitly sets log files.
- // TEMPORARY: For testing only.
- func (i *Index) SetLogFiles(a ...*LogFile) { i.logFiles = a }
-
- // SetIndexFiles explicitly sets index files
- // TEMPORARY: For testing only.
- func (i *Index) SetIndexFiles(a ...*IndexFile) { i.indexFiles = IndexFiles(a) }

// FileN returns the number of log and index files within the index.
func (i *Index) FileN() int { return len(i.logFiles) + len(i.indexFiles) }
@@ -175,41 +310,8 @@ func (i *Index) SeriesIterator() SeriesIterator {
return FilterUndeletedSeriesIterator(MergeSeriesIterators(a...))
}

- // Measurement retrieves a measurement by name.
- func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) {
- return i.measurement(name), nil
- }
-
- func (i *Index) measurement(name []byte) *tsdb.Measurement {
- m := tsdb.NewMeasurement(string(name))
-
- // Iterate over measurement series.
- itr := i.MeasurementSeriesIterator(name)
-
- var id uint64 // TEMPORARY
- for e := itr.Next(); e != nil; e = itr.Next() {
- // TODO: Handle deleted series.
-
- // Append series to measurement.
- // TODO: Remove concept of series ids.
- m.AddSeries(&tsdb.Series{
- ID: id,
- Key: string(e.Name()),
- Tags: models.CopyTags(e.Tags()),
- })
-
- // TEMPORARY: Increment ID.
- id++
- }
-
- if !m.HasSeries() {
- return nil
- }
- return m
- }
-
- // ForEachMeasurement iterates over all measurements in the index.
- func (i *Index) ForEachMeasurement(fn func(name []byte) error) error {
+ // ForEachMeasurementName iterates over all measurement names in the index.
+ func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
itr := i.MeasurementIterator()
if itr == nil {
return nil
@@ -248,16 +350,6 @@ func (i *Index) TagKeyIterator(name []byte) TagKeyIterator {
return MergeTagKeyIterators(a...)
}

- // Measurements returns a list of all measurements.
- func (i *Index) Measurements() (tsdb.Measurements, error) {
- var mms tsdb.Measurements
- itr := i.MeasurementIterator()
- for e := itr.Next(); e != nil; e = itr.Next() {
- mms = append(mms, i.measurement(e.Name()))
- }
- return mms, nil
- }

// MeasurementIterator returns an iterator over all measurements in the index.
func (i *Index) MeasurementIterator() MeasurementIterator {
a := make([]MeasurementIterator, 0, i.FileN())
@@ -466,19 +558,33 @@ func (i *Index) DropMeasurement(name []byte) error {
}

// Mark measurement as deleted.
- return i.logFiles[0].DeleteMeasurement(name)
+ if err := i.logFiles[0].DeleteMeasurement(name); err != nil {
+ return err
+ }
+
+ i.CheckFastCompaction()
+ return nil
}

// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
- if e := i.Series(name, tags); e != nil {
+ i.mu.RLock()
+ defer i.mu.RUnlock()
+
+ if e := i.series(name, tags); e != nil {
return nil
}
- return i.logFiles[0].AddSeries(name, tags)
+
+ if err := i.logFiles[0].AddSeries(name, tags); err != nil {
+ return err
+ }
+
+ i.checkFastCompaction()
+ return nil
}

- // Series returns a series by name/tags.
- func (i *Index) Series(name []byte, tags models.Tags) SeriesElem {
+ // series returns a series by name/tags.
+ func (i *Index) series(name []byte, tags models.Tags) SeriesElem {
for _, f := range i.files() {
if e := f.Series(name, tags); e != nil && !e.Deleted() {
return e

@@ -498,6 +604,8 @@ func (i *Index) DropSeries(keys [][]byte) error {
return err
}
}

+ i.CheckFastCompaction()
return nil
}
@@ -1026,6 +1134,280 @@ func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return newSeriesPointIterator(i, opt), nil
}

// Compact runs a compaction check. Returns once the check is complete.
// If force is true then all files are compacted into a single index file regardless of size.
func (i *Index) Compact(force bool) error {
info := compactNotify{force: force, ch: make(chan error)}
i.manualCompactNotify <- info
select {
case err := <-info.ch:
return err
case <-i.closing:
return nil
}
}
// monitorCompaction periodically checks for files that need to be compacted.
func (i *Index) monitorCompaction() {
// Ignore full compaction if interval is unset.
var c <-chan time.Time
if i.CompactionMonitorInterval > 0 {
ticker := time.NewTicker(i.CompactionMonitorInterval)
c = ticker.C
defer ticker.Stop()
}
// Wait for compaction checks or for the index to close.
for {
select {
case <-i.closing:
return
case <-i.fastCompactNotify:
if err := i.compactLogFile(); err != nil {
log.Printf("fast compaction error: %s", err)
}
case <-c:
if err := i.checkFullCompaction(false); err != nil {
log.Printf("full compaction error: %s", err)
}
case info := <-i.manualCompactNotify:
if err := i.compactLogFile(); err != nil {
info.ch <- err
continue
} else if err := i.checkFullCompaction(info.force); err != nil {
info.ch <- err
continue
}
info.ch <- nil
}
}
}
// compactLogFile starts a new log file and compacts the previous one.
func (i *Index) compactLogFile() error {
if err := i.prependNewLogFile(); err != nil {
return err
}
if err := i.compactSecondaryLogFile(); err != nil {
return err
}
return nil
}
// prependNewLogFile adds a new log file so that the current log file can be compacted.
// This function is a no-op if there is currently more than one log file.
func (i *Index) prependNewLogFile() error {
i.mu.Lock()
defer i.mu.Unlock()
// Ignore if there is already a secondary log file that needs compacting.
if len(i.logFiles) == 2 {
return nil
} else if len(i.logFiles) > 2 {
panic("should not have more than two log files at a time")
}
// Generate new file identifier.
id := i.maxFileID() + 1
// Open file and insert it into the first position.
f, err := i.openLogFile(filepath.Join(i.Path, FormatLogFileName(id)))
if err != nil {
return err
}
i.logFiles = append([]*LogFile{f}, i.logFiles...)
// Write new manifest.
if err := i.writeManifestFile(); err != nil {
// TODO: Close index if write fails.
return err
}
return nil
}
// compactSecondaryLogFile compacts the secondary log file into an index file.
func (i *Index) compactSecondaryLogFile() error {
id, logFile := func() (int, *LogFile) {
i.mu.Lock()
defer i.mu.Unlock()
if len(i.logFiles) < 2 {
return 0, nil
}
return i.maxFileID() + 1, i.logFiles[1]
}()
// Exit if there is no secondary log file.
if logFile == nil {
return nil
}
// Create new index file.
path := filepath.Join(i.Path, FormatIndexFileName(id))
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()
// Compact log file to new index file.
if _, err := logFile.WriteTo(f); err != nil {
return err
}
// Close file.
if err := f.Close(); err != nil {
return err
}
// Reopen as an index file.
file := NewIndexFile()
file.Path = path
if err := file.Open(); err != nil {
return err
}
// Obtain lock to swap in index file and write manifest.
i.mu.Lock()
defer i.mu.Unlock()
// Remove old log file and prepend new index file.
i.logFiles = []*LogFile{i.logFiles[0]}
i.indexFiles = append(IndexFiles{file}, i.indexFiles...)
// TODO: Close old log file.
// Write new manifest.
if err := i.writeManifestFile(); err != nil {
// TODO: Close index if write fails.
return err
}
return nil
}
// checkFullCompaction compacts all index files if the total size of index files
// is double the size of the largest index file. If force is true then all files
// are compacted regardless of size.
func (i *Index) checkFullCompaction(force bool) error {
// Only perform size check if compaction check is not forced.
if !force {
// Calculate total & max file sizes.
maxN, totalN, err := i.indexFileStats()
if err != nil {
return err
}
// Ignore unless the total size is at least double the size of the largest index file.
if totalN < maxN*2 {
return nil
}
}
// Retrieve list of index files under lock.
i.mu.Lock()
indexFiles := i.indexFiles
id := i.maxFileID() + 1
i.mu.Unlock()
// Ignore if there are not at least two index files.
if len(indexFiles) < 2 {
return nil
}
// Create new index file.
path := filepath.Join(i.Path, FormatIndexFileName(id))
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()
// Compact all index files to new index file.
if _, err := indexFiles.WriteTo(f); err != nil {
return err
}
// Close file.
if err := f.Close(); err != nil {
return err
}
// Reopen as an index file.
file := NewIndexFile()
file.Path = path
if err := file.Open(); err != nil {
return err
}
// Obtain lock to swap in index file and write manifest.
i.mu.Lock()
defer i.mu.Unlock()
// Replace index files with new index file.
i.indexFiles = IndexFiles{file}
// TODO: Close old index files.
// Write new manifest.
if err := i.writeManifestFile(); err != nil {
// TODO: Close index if write fails.
return err
}
return nil
}
// indexFileStats returns the max index file size and the total file size for all index files.
func (i *Index) indexFileStats() (maxN, totalN int64, err error) {
// Retrieve index file list under lock.
i.mu.Lock()
indexFiles := i.indexFiles
i.mu.Unlock()
// Iterate over each file and determine size.
for _, f := range indexFiles {
fi, err := os.Stat(f.Path)
if os.IsNotExist(err) {
continue
} else if err != nil {
return 0, 0, err
} else if fi.Size() > maxN {
maxN = fi.Size()
}
totalN += fi.Size()
}
return maxN, totalN, nil
}
// CheckFastCompaction notifies the index to begin compacting the log file if
// it is above the maximum log file size.
func (i *Index) CheckFastCompaction() {
i.mu.Lock()
defer i.mu.Unlock()
i.checkFastCompaction()
}
func (i *Index) checkFastCompaction() {
if i.logFiles[0].Size() < i.MaxLogFileSize {
return
}
// Send signal to begin compaction of current log file.
select {
case i.fastCompactNotify <- struct{}{}:
default:
}
}
// compactNotify represents a manual compaction notification.
type compactNotify struct {
force bool
ch chan error
}
// File represents a log or index file.
type File interface {
Measurement(name []byte) MeasurementElem

@@ -1158,3 +1540,69 @@ func intersectStringSets(a, b map[string]struct{}) map[string]struct{} {
}
return other
}
var fileIDRegex = regexp.MustCompile(`^(\d+)\..+$`)
// ParseFileID extracts the numeric id from a log or index file path.
// Returns 0 if it cannot be parsed.
func ParseFileID(name string) int {
a := fileIDRegex.FindStringSubmatch(filepath.Base(name))
if a == nil {
return 0
}
i, _ := strconv.Atoi(a[1])
return i
}
// Manifest represents the list of log & index files that make up the index.
// The files are listed in time order, not necessarily ID order.
type Manifest struct {
LogFiles []string `json:"logs,omitempty"`
IndexFiles []string `json:"indexes,omitempty"`
}
// HasFile returns true if name is listed in the log files or index files.
func (m *Manifest) HasFile(name string) bool {
for _, filename := range m.LogFiles {
if filename == name {
return true
}
}
for _, filename := range m.IndexFiles {
if filename == name {
return true
}
}
return false
}
// ReadManifestFile reads a manifest from a file path.
func ReadManifestFile(path string) (*Manifest, error) {
buf, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
// Decode manifest.
var m Manifest
if err := json.Unmarshal(buf, &m); err != nil {
return nil, err
}
return &m, nil
}
// WriteManifestFile writes a manifest to a file path.
func WriteManifestFile(path string, m *Manifest) error {
buf, err := json.MarshalIndent(m, "", " ")
if err != nil {
return err
}
buf = append(buf, '\n')
if err := ioutil.WriteFile(path, buf, 0666); err != nil {
return err
}
return nil
}
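
The manifest is a small JSON document, so its round trip is easy to see end to end. A minimal sketch using the helpers above (the directory and file names are illustrative):

	// Write a manifest listing the current log and index files, then read it back.
	m := &Manifest{
		LogFiles:   []string{"00000003.tsl"},
		IndexFiles: []string{"00000002.tsi", "00000001.tsi"},
	}
	if err := WriteManifestFile("/tmp/index/MANIFEST", m); err != nil {
		// handle write error
	}
	m2, err := ReadManifestFile("/tmp/index/MANIFEST")
	if err != nil {
		// handle read error
	}
	_ = m2.HasFile("00000002.tsi") // true

The resulting MANIFEST file then contains something like:

	{
		"logs": ["00000003.tsl"],
		"indexes": ["00000002.tsi", "00000001.tsi"]
	}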

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
+ "fmt"
"io"

"github.com/influxdata/influxdb/models"

@@ -47,7 +48,8 @@ type IndexFile struct {
tblks map[string]*TagBlock // tag blocks by measurement name
mblk MeasurementBlock

- // Path to data file.
+ // Sortable identifier & filepath to the index file.
+ ID int
Path string
}
@@ -58,10 +60,16 @@ func NewIndexFile() *IndexFile {

// Open memory maps the data file at the file's path.
func (f *IndexFile) Open() error {
+ // Extract identifier from path name, if possible.
+ if id := ParseFileID(f.Path); id > 0 {
+ f.ID = id
+ }
+
data, err := mmap.Map(f.Path)
if err != nil {
return err
}

return f.UnmarshalBinary(data)
}

@@ -330,3 +338,8 @@ func (t *IndexFileTrailer) WriteTo(w io.Writer) (n int64, err error) {
return n, nil
}

+ // FormatIndexFileName generates an index filename for the given file identifier.
+ func FormatIndexFileName(i int) string {
+ return fmt.Sprintf("%08d%s", i, IndexFileExt)
+ }
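
Because the file identifier is zero-padded to eight digits, file names sort lexicographically in ID order. An illustrative round trip through the naming helpers (values are examples):

	fmt.Println(FormatIndexFileName(12))     // "00000012.tsi"
	fmt.Println(FormatLogFileName(13))       // "00000013.tsl"
	fmt.Println(ParseFileID("00000012.tsi")) // 12
	fmt.Println(ParseFileID("MANIFEST"))     // 0: names without a numeric prefix parse to 0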

View File

@@ -76,7 +76,7 @@ func CreateIndexFile(series []Series) (*tsi1.IndexFile, error) {
// Write index file to buffer.
var buf bytes.Buffer
- if _, err := lf.CompactTo(&buf); err != nil {
+ if _, err := lf.WriteTo(&buf); err != nil {
return nil, err
}

@@ -99,7 +99,7 @@ func GenerateIndexFile(measurementN, tagN, valueN int) (*tsi1.IndexFile, error) {
// Compact log file to buffer.
var buf bytes.Buffer
- if _, err := lf.CompactTo(&buf); err != nil {
+ if _, err := lf.WriteTo(&buf); err != nil {
return nil, err
}
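
A side effect of renaming CompactTo to WriteTo is that *tsi1.LogFile now satisfies the standard io.WriterTo interface, so helpers can hand a log file to anything that accepts one. A small illustrative check (lf stands for any open log file):

	var lf *tsi1.LogFile // an open log file
	var _ io.WriterTo = lf // compiles: *LogFile implements io.WriterTo

	var buf bytes.Buffer
	if _, err := lf.WriteTo(&buf); err != nil {
		// handle compaction error
	}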

View File

@@ -1,67 +1,417 @@
package tsi1_test

import (
+ "fmt"
+ "os"
+ "reflect"
"testing"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
)

- // Ensure index can return a single measurement by name.
- func TestIndex_Measurement(t *testing.T) {
- // Build an index file.
- f, err := CreateIndexFile([]Series{
- {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
- {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
- {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- // Create an index from the single file.
- var idx tsi1.Index
- idx.SetIndexFiles(f)
-
- // Verify measurement is correct.
- if mm, err := idx.Measurement([]byte("cpu")); err != nil {
- t.Fatal(err)
- } else if mm == nil {
- t.Fatal("expected measurement")
- }
-
- // Verify non-existent measurement doesn't exist.
- if mm, err := idx.Measurement([]byte("no_such_measurement")); err != nil {
- t.Fatal(err)
- } else if mm != nil {
- t.Fatal("expected nil measurement")
- }
- }
-
- // Ensure index can return a list of all measurements.
- func TestIndex_Measurements(t *testing.T) {
- // Build an index file.
- f, err := CreateIndexFile([]Series{
- {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
- {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
- {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- // Create an index from the single file.
- var idx tsi1.Index
- idx.SetIndexFiles(f)
-
- // Retrieve measurements and verify.
- if mms, err := idx.Measurements(); err != nil {
- t.Fatal(err)
- } else if len(mms) != 2 {
- t.Fatalf("expected measurement count: %d", len(mms))
- } else if mms[0].Name != "cpu" {
- t.Fatalf("unexpected measurement(0): %s", mms[0].Name)
- } else if mms[1].Name != "mem" {
- t.Fatalf("unexpected measurement(1): %s", mms[1].Name)
- }
- }

// Ensure index can return an iterator over all series in the index.
func TestIndex_SeriesIterator(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()

// Create initial set of series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
}); err != nil {
t.Fatal(err)
}

// Verify initial set of series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.SeriesIterator()
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}

if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region west}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); string(e.Name()) != `mem` || e.Tags().String() != `[{region east}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil series(%s): %s/%s", state, e.Name(), e.Tags().String())
}
}); err != nil {
t.Fatal(err)
}

// Add more series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("disk")},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
}); err != nil {
t.Fatal(err)
}

// Verify additional series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.SeriesIterator()
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}

if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region north}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region west}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); string(e.Name()) != `disk` || len(e.Tags()) != 0 {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); string(e.Name()) != `mem` || e.Tags().String() != `[{region east}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil series(%s): %s/%s", state, e.Name(), e.Tags().String())
}
}); err != nil {
t.Fatal(err)
}
}

// Ensure index can iterate over all measurement names.
func TestIndex_ForEachMeasurementName(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()

// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
}); err != nil {
t.Fatal(err)
}

// Verify measurements are returned.
if err := idx.MultiInvoke(func(state string) {
var names []string
if err := idx.ForEachMeasurementName(func(name []byte) error {
names = append(names, string(name))
return nil
}); err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(names, []string{"cpu", "mem"}) {
t.Fatalf("unexpected names: %#v", names)
}
}); err != nil {
t.Fatal(err)
}

// Add more series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("disk")},
{Name: []byte("mem")},
}); err != nil {
t.Fatal(err)
}

// Verify new measurements.
if err := idx.MultiInvoke(func(state string) {
var names []string
if err := idx.ForEachMeasurementName(func(name []byte) error {
names = append(names, string(name))
return nil
}); err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(names, []string{"cpu", "disk", "mem"}) {
t.Fatalf("unexpected names: %#v", names)
}
}); err != nil {
t.Fatal(err)
}
}
// Ensure index can return an iterator over all series for one measurement.
func TestIndex_MeasurementSeriesIterator(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Create initial set of series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
}); err != nil {
t.Fatal(err)
}
// Verify initial set of series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.MeasurementSeriesIterator([]byte("cpu"))
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region west}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil series(%s): %s/%s", state, e.Name(), e.Tags().String())
}
}); err != nil {
t.Fatal(err)
}
// Add more series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("disk")},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})},
}); err != nil {
t.Fatal(err)
}
// Verify additional series.
if err := idx.MultiInvoke(func(state string) {
itr := idx.MeasurementSeriesIterator([]byte("cpu"))
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region east}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region north}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); string(e.Name()) != `cpu` || e.Tags().String() != `[{region west}]` {
t.Fatalf("unexpected series(%s): %s/%s", state, e.Name(), e.Tags().String())
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil series(%s): %s/%s", state, e.Name(), e.Tags().String())
}
}); err != nil {
t.Fatal(err)
}
}
// Ensure index can return an iterator over all measurements for the index.
func TestIndex_MeasurementIterator(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Create initial set of series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu")},
{Name: []byte("mem")},
}); err != nil {
t.Fatal(err)
}
// Verify initial set of measurements.
if err := idx.MultiInvoke(func(state string) {
itr := idx.MeasurementIterator()
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
if e := itr.Next(); string(e.Name()) != `cpu` {
t.Fatalf("unexpected measurement(%s): %s", state, e.Name())
} else if e := itr.Next(); string(e.Name()) != `mem` {
t.Fatalf("unexpected measurement(%s): %s", state, e.Name())
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil measurement(%s): %s", state, e.Name())
}
}); err != nil {
t.Fatal(err)
}
// Add more series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"foo": "bar"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north", "x": "y"})},
}); err != nil {
t.Fatal(err)
}
// Verify additional measurements.
if err := idx.MultiInvoke(func(state string) {
itr := idx.MeasurementIterator()
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
if e := itr.Next(); string(e.Name()) != `cpu` {
t.Fatalf("unexpected measurement(%s): %s", state, e.Name())
} else if e := itr.Next(); string(e.Name()) != `disk` {
t.Fatalf("unexpected measurement(%s): %s", state, e.Name())
} else if e := itr.Next(); string(e.Name()) != `mem` {
t.Fatalf("unexpected measurement(%s): %s", state, e.Name())
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil measurement(%s): %s", state, e.Name())
}
}); err != nil {
t.Fatal(err)
}
}
// Ensure index can return an iterator over all keys for one measurement.
func TestIndex_TagKeyIterator(t *testing.T) {
idx := MustOpenIndex()
defer idx.Close()
// Create initial set of series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west", "type": "gpu"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east", "misc": "other"})},
}); err != nil {
t.Fatal(err)
}
// Verify initial set of tag keys.
if err := idx.MultiInvoke(func(state string) {
itr := idx.TagKeyIterator([]byte("cpu"))
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
if e := itr.Next(); string(e.Key()) != `region` {
t.Fatalf("unexpected key(%s): %s", state, e.Key())
} else if e := itr.Next(); string(e.Key()) != `type` {
t.Fatalf("unexpected key(%s): %s", state, e.Key())
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil key(%s): %s/%s", state, e.Key())
}
}); err != nil {
t.Fatal(err)
}
// Add more series.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
{Name: []byte("disk"), Tags: models.NewTags(map[string]string{"foo": "bar"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north", "x": "y"})},
}); err != nil {
t.Fatal(err)
}
// Verify additional tag keys.
if err := idx.MultiInvoke(func(state string) {
itr := idx.TagKeyIterator([]byte("cpu"))
if itr == nil {
t.Fatalf("expected iterator(%s)", state)
}
if e := itr.Next(); string(e.Key()) != `region` {
t.Fatalf("unexpected key(%s): %s", state, e.Key())
} else if e := itr.Next(); string(e.Key()) != `type` {
t.Fatalf("unexpected key(%s): %s", state, e.Key())
} else if e := itr.Next(); string(e.Key()) != `x` {
t.Fatalf("unexpected key(%s): %s", state, e.Key())
} else if e := itr.Next(); e != nil {
t.Fatalf("expected nil key(%s): %s", state, e.Key())
}
}); err != nil {
t.Fatal(err)
}
}
// Index is a test wrapper for tsi1.Index.
type Index struct {
*tsi1.Index
}
// NewIndex returns a new instance of Index at a temporary path.
func NewIndex() *Index {
idx := &Index{Index: tsi1.NewIndex()}
idx.Path = MustTempDir()
return idx
}
// MustOpenIndex returns a new, open index. Panic on error.
func MustOpenIndex() *Index {
idx := NewIndex()
if err := idx.Open(); err != nil {
panic(err)
}
return idx
}
// Close closes and removes the index directory.
func (idx *Index) Close() error {
defer os.RemoveAll(idx.Path)
return idx.Index.Close()
}
// Reopen closes and opens the index.
func (idx *Index) Reopen() error {
if err := idx.Index.Close(); err != nil {
return err
}
path := idx.Path
idx.Index = tsi1.NewIndex()
idx.Path = path
if err := idx.Open(); err != nil {
return err
}
return nil
}
// MultiInvoke executes fn in several different states:
//
// - Immediately
// - After reopen
// - After compaction
// - After reopen again
//
// The index should always respond in the same fashion regardless of
// how data is stored. This helper allows the index to be easily tested
// in all major states.
func (idx *Index) MultiInvoke(fn func(state string)) error {
// Invoke immediately.
fn("initial")
if testing.Verbose() {
println("[index] reopening")
}
// Reopen and invoke again.
if err := idx.Reopen(); err != nil {
return fmt.Errorf("reopen error: %s", err)
}
fn("reopen")
if testing.Verbose() {
println("[index] forcing compaction")
}
// Force a compaction.
if err := idx.Compact(true); err != nil {
return err
}
fn("post-compaction")
if testing.Verbose() {
println("[index] reopening after compaction")
}
// Reopen and invoke again.
if err := idx.Reopen(); err != nil {
return fmt.Errorf("post-compaction reopen error: %s", err)
}
fn("post-compaction-reopen")
return nil
}
// CreateSeriesSliceIfNotExists creates multiple series at a time.
func (idx *Index) CreateSeriesSliceIfNotExists(a []Series) error {
for i, s := range a {
if err := idx.CreateSeriesIfNotExists(nil, s.Name, s.Tags); err != nil {
return fmt.Errorf("i=%d, name=%s, tags=%s, err=%s", i, s.Name, s.Tags, err)
}
}
return nil
}
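
New index behavior can reuse these helpers so it is exercised in every storage state. A minimal sketch of such a test (the series data and the assertion are illustrative):

	func TestIndex_Example(t *testing.T) {
		idx := MustOpenIndex()
		defer idx.Close()

		if err := idx.CreateSeriesSliceIfNotExists([]Series{
			{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
		}); err != nil {
			t.Fatal(err)
		}

		// Runs once against the log file, again after reopen, after a forced
		// compaction, and after a post-compaction reopen.
		if err := idx.MultiInvoke(func(state string) {
			if itr := idx.MeasurementIterator(); itr == nil {
				t.Fatalf("expected iterator(%s)", state)
			}
		}); err != nil {
			t.Fatal(err)
		}
	}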

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
+ "fmt"
"hash/crc32"
"io"
"os"
@@ -30,11 +31,11 @@ const (
// LogFile represents an on-disk write-ahead log file.
type LogFile struct {
mu sync.RWMutex
data []byte // mmap
file *os.File // writer
buf []byte // marshaling buffer
- entries []LogEntry // parsed entries
+ size int64 // tracks current file size

// In-memory index.
mms logMeasurements
@@ -74,6 +75,7 @@ func (f *LogFile) open() error {
} else if fi.Size() == 0 {
return nil
}
+ f.size = fi.Size()

// Open a read-only memory map of the existing data.
data, err := mmap.Map(f.Path)
@@ -83,14 +85,12 @@ func (f *LogFile) open() error {
f.data = data

// Read log entries from mmap.
- f.entries = nil
for buf := f.data; len(buf) > 0; {
// Read next entry.
var e LogEntry
if err := e.UnmarshalBinary(buf); err != nil {
return err
}
- f.entries = append(f.entries, e)

// Execute entry against in-memory index.
f.execEntry(&e)
@@ -112,12 +112,19 @@ func (f *LogFile) Close() error {
mmap.Unmap(f.data)
}

- f.entries = nil
f.mms = make(logMeasurements)
return nil
}

+ // Size returns the tracked in-memory file size of the log file.
+ func (f *LogFile) Size() int64 {
+ f.mu.Lock()
+ n := f.size
+ f.mu.Unlock()
+ return n
+ }

// Measurement returns a measurement element.
func (f *LogFile) Measurement(name []byte) MeasurementElem {
f.mu.RLock()

@@ -134,7 +141,10 @@ func (f *LogFile) Measurement(name []byte) MeasurementElem {
func (f *LogFile) MeasurementNames() []string {
f.mu.RLock()
defer f.mu.RUnlock()
+ return f.measurementNames()
+ }

+ func (f *LogFile) measurementNames() []string {
a := make([]string, 0, len(f.mms))
for name := range f.mms {
a = append(a, name)
@@ -173,8 +183,12 @@ func (f *LogFile) TagKeySeriesIterator(name, key []byte) SeriesIterator {
// Combine iterators across all tag keys.
itrs := make([]SeriesIterator, 0, len(tk.tagValues))
for _, tv := range tk.tagValues {
+ if len(tv.series) == 0 {
+ continue
+ }
itrs = append(itrs, newLogSeriesIterator(tv.series))
}

return MergeSeriesIterators(itrs...)
}
@@ -266,7 +280,10 @@ func (f *LogFile) TagValueSeriesIterator(name, key, value []byte) SeriesIterator {
tv, ok := tk.tagValues[string(value)]
if !ok {
return nil
+ } else if len(tv.series) == 0 {
+ return nil
}

return newLogSeriesIterator(tv.series)
}
@@ -351,7 +368,8 @@ func (f *LogFile) appendEntry(e *LogEntry) error {
e.Size = len(f.buf)

// Write record to file.
- if n, err := f.file.Write(f.buf); err != nil {
+ n, err := f.file.Write(f.buf)
+ if err != nil {
// Move position backwards over partial entry.
// Log should be reopened if seeking cannot be completed.
if n > 0 {

@@ -362,8 +380,8 @@ func (f *LogFile) appendEntry(e *LogEntry) error {
return err
}

- // Save entry to in-memory list.
- f.entries = append(f.entries, *e)
+ // Update in-memory file size.
+ f.size += int64(n)

return nil
}
@@ -441,7 +459,6 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {
}

// Insert series to list.
- // TODO: Remove global series list.
mm.series.insert(e.Name, e.Tags, deleted)

// Save measurement.
@@ -468,6 +485,9 @@ func (f *LogFile) SeriesIterator() SeriesIterator {
series = append(series, f.mms[string(name)].series...)
}

+ if len(series) == 0 {
+ return nil
+ }
return newLogSeriesIterator(series)
}
@@ -499,11 +519,17 @@ func (f *LogFile) MeasurementSeriesIterator(name []byte) SeriesIterator {
defer f.mu.RUnlock()

mm := f.mms[string(name)]
+ if len(mm.series) == 0 {
+ return nil
+ }
return newLogSeriesIterator(mm.series)
}

- // CompactTo compacts the log file and writes it to w.
- func (f *LogFile) CompactTo(w io.Writer) (n int64, err error) {
+ // WriteTo compacts the log file and writes it to w.
+ func (f *LogFile) WriteTo(w io.Writer) (n int64, err error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()

var t IndexFileTrailer

// Reset compaction fields.
@@ -551,7 +577,7 @@ func (f *LogFile) writeSeriesBlockTo(w io.Writer, n *int64) error {
sw := NewSeriesBlockWriter()

// Retrieve measurement names in order.
- names := f.MeasurementNames()
+ names := f.measurementNames()

// Add series from measurements in order.
for _, name := range names {
@@ -584,7 +610,7 @@ func (f *LogFile) writeSeriesBlockTo(w io.Writer, n *int64) error {
// Lookup series offset.
serie.offset = sw.Offset(serie.name, serie.tags)
if serie.offset == 0 {
- panic("series not found")
+ panic("series not found: " + string(serie.name) + " " + serie.tags.String())
}

// Add series id to measurement, tag key, and tag value.
@@ -941,30 +967,6 @@ func (a logTagValueSlice) Len() int { return len(a) }
func (a logTagValueSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }

- /*
- // insertEntry inserts an entry into the tag value in sorted order.
- // If another entry matches the name/tags then it is overwritten.
- func (tv *logTagValue) insertEntry(e *LogEntry) {
- i := sort.Search(len(tv.entries), func(i int) bool {
- if cmp := bytes.Compare(tv.entries[i].Name, e.Name); cmp != 0 {
- return cmp != -1
- }
- return models.CompareTags(tv.entries[i].Tags, e.Tags) != -1
- })
-
- // Update entry if it already exists.
- if i < len(tv.entries) && bytes.Equal(tv.entries[i].Name, e.Name) && tv.entries[i].Tags.Equal(e.Tags) {
- tv.entries[i] = *e
- return
- }
-
- // Otherwise insert new entry.
- tv.entries = append(tv.entries, LogEntry{})
- copy(tv.entries[i+1:], tv.entries[i:])
- tv.entries[i] = *e
- }
- */

// logTagKeyIterator represents an iterator over a slice of tag keys.
type logTagKeyIterator struct {
a []logTagKey

@@ -1013,6 +1015,10 @@ type logSeriesIterator struct {
// newLogSeriesIterator returns a new instance of logSeriesIterator.
// All series are copied to the iterator.
func newLogSeriesIterator(a logSeries) *logSeriesIterator {
+ if len(a) == 0 {
+ return nil
+ }
+
itr := logSeriesIterator{series: make(logSeries, len(a))}
copy(itr.series, a)
return &itr
@@ -1026,3 +1032,8 @@ func (itr *logSeriesIterator) Next() (e SeriesElem) {
e, itr.series = &itr.series[0], itr.series[1:]
return e
}

+ // FormatLogFileName generates a log filename for the given file identifier.
+ func FormatLogFileName(i int) string {
+ return fmt.Sprintf("%08d%s", i, LogFileExt)
+ }
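
WriteTo plus the index-file reader make up the core compaction step: serialize the in-memory log to disk, then reopen the output as an immutable index file. A condensed sketch of the sequence used by compactSecondaryLogFile earlier in this commit (dir, id, and the abbreviated error handling are illustrative; lf is an open *LogFile):

	// Compact an open log file into a new index file on disk.
	path := filepath.Join(dir, FormatIndexFileName(id))
	w, err := os.Create(path)
	if err != nil { /* handle */ }
	if _, err := lf.WriteTo(w); err != nil { /* handle */ }
	if err := w.Close(); err != nil { /* handle */ }

	// Reopen the compacted output as an immutable, memory-mapped index file.
	f := NewIndexFile()
	f.Path = path
	if err := f.Open(); err != nil { /* handle */ }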

View File

@@ -87,7 +87,7 @@ func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool)
d++

if uint32(d) > n {
- panic("empty hash data block")
+ return MeasurementBlockElem{}, false
}
}
}

View File

@@ -568,7 +568,7 @@ func (sw *SeriesBlockWriter) writeSeriesTo(w io.Writer, terms *TermList, offset
seriesBuf = terms.AppendEncodedSeries(seriesBuf[:0], s.name, s.tags)

// Join flag, varint(length), & dictionary-encoded series in buffer.
- buf[0] = 0 // TODO(benbjohnson): series tombstone
+ buf[0] = s.flag()
sz := binary.PutUvarint(buf[1:], uint64(len(seriesBuf)))
buf = append(buf[:1+sz], seriesBuf...)

@@ -750,6 +750,14 @@ type serie struct {
offset uint32
}

+ func (s *serie) flag() uint8 {
+ var flag byte
+ if s.deleted {
+ flag |= SeriesTombstoneFlag
+ }
+ return flag
+ }

type series []serie

func (a series) Len() int { return len(a) }
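
The flag byte lets a compacted series entry carry its deletion state into the index file. A round-trip sketch of the bit manipulation (SeriesTombstoneFlag's real value is defined elsewhere in the package; 0x01 here is an assumption for illustration):

	const SeriesTombstoneFlag = 0x01 // assumed bit value, for illustration only

	var flag byte
	deleted := true
	if deleted {
		flag |= SeriesTombstoneFlag // writer side: record the tombstone
	}
	isDeleted := flag&SeriesTombstoneFlag != 0 // reader side: recover the state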

View File

@@ -120,6 +120,10 @@ func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem {
// Move position forward.
pos = (pos + 1) % int(keyN)
d++
+
+ if uint32(d) > keyN {
+ return nil
+ }
}
}

@@ -165,6 +169,10 @@ func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem {
// Move position forward.
pos = (pos + 1) % int(valueN)
d++
+
+ if uint32(d) > valueN {
+ return nil
+ }
}
}
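
Both lookups walk an open-addressed hash block with linear probing; the new guards count probes and report a miss once every slot has been visited, instead of looping forever or panicking on a missing key. A self-contained sketch of the pattern (the slice-backed table and hash parameter are hypothetical, not the tsi1 block layout):

	// lookup probes an open-addressed table linearly, giving up after
	// visiting every slot once so a missing key returns -1.
	func lookup(slots []string, key string, hash func(string) uint32) int {
		n := uint32(len(slots))
		if n == 0 {
			return -1
		}
		pos := int(hash(key) % n)
		for d := uint32(0); d < n; d++ {
			if slots[pos] == key {
				return pos
			}
			pos = (pos + 1) % int(n) // move position forward
		}
		return -1 // probed every slot; key is absent
	}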

View File

@@ -2,6 +2,7 @@ package tsi1_test

import (
"bytes"
+ "io/ioutil"
"reflect"
"testing"

@@ -296,3 +297,12 @@ func (itr *SeriesIterator) Next() (e tsi1.SeriesElem) {
e, itr.Elems = &itr.Elems[0], itr.Elems[1:]
return e
}
// MustTempDir returns a temporary directory. Panic on error.
func MustTempDir() string {
path, err := ioutil.TempDir("", "tsi-")
if err != nil {
panic(err)
}
return path
}

View File

@@ -733,7 +733,7 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
names = append(names, source.(*influxql.Measurement).Name)
}
} else {
- if err := sh.engine.ForEachMeasurement(func(name []byte) error {
+ if err := sh.engine.ForEachMeasurementName(func(name []byte) error {
names = append(names, string(name))
return nil
}); err != nil {