influxdb/tsdb/index/tsi1/index.go

package tsi1
import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/influxdata/influxdb/influxql"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/estimator"
	"github.com/influxdata/influxdb/tsdb"
)

// IndexName is the name of the index.
const IndexName = "tsi1"

// Default compaction thresholds.
const (
	DefaultMaxLogFileSize = 5 * 1024 * 1024
)

func init() {
	tsdb.RegisterIndex(IndexName, func(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index {
		idx := NewIndex()
		idx.ShardID = id
		idx.Path = path
		idx.options = opt
		return idx
	})
}

// File extensions.
const (
	LogFileExt    = ".tsl"
	IndexFileExt  = ".tsi"
	CompactingExt = ".compacting"
)
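
// Log files (LogFileExt) act as the write-ahead log and are later compacted
// into immutable index files (IndexFileExt); see compactLogFile below.
// CompactingExt presumably marks in-progress compaction output; it is not
// referenced in this file.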

// ManifestFileName is the name of the index manifest file.
const ManifestFileName = "MANIFEST"

// Ensure index implements the interface.
var _ tsdb.Index = &Index{}

// Index represents a collection of layered index files and WAL.
type Index struct {
	mu      sync.RWMutex
	opened  bool
	options tsdb.EngineOptions

	activeLogFile *LogFile          // current log file
	fileSet       *FileSet          // current file set
	levels        []CompactionLevel // compaction levels
	seq           int               // file id sequence

	// Close management.
	once    sync.Once
	closing chan struct{}
	wg      sync.WaitGroup

	// Fieldset shared with engine.
	fieldset *tsdb.MeasurementFieldSet

	// Associated shard info.
	ShardID uint64

	// Root directory of the index files.
	Path string

	// Log file compaction thresholds.
	MaxLogFileSize int64

	// Frequency of compaction checks.
	CompactionEnabled         bool
	CompactionMonitorInterval time.Duration
}

// NewIndex returns a new instance of Index.
func NewIndex() *Index {
	return &Index{
		closing: make(chan struct{}),

		// Default compaction thresholds.
		MaxLogFileSize:    DefaultMaxLogFileSize,
		CompactionEnabled: true,
	}
}

// Type returns the index type name.
func (i *Index) Type() string { return IndexName }

// Open opens the index.
func (i *Index) Open() error {
	i.mu.Lock()
	defer i.mu.Unlock()

	if i.opened {
		return errors.New("index already open")
	}

	// Create directory if it doesn't exist.
	if err := os.MkdirAll(i.Path, 0777); err != nil {
		return err
	}

	// Read manifest file.
	m, err := ReadManifestFile(filepath.Join(i.Path, ManifestFileName))
	if os.IsNotExist(err) {
		m = NewManifest()
	} else if err != nil {
		return err
	}

	// Copy compaction levels to the index.
	i.levels = make([]CompactionLevel, len(m.Levels))
	copy(i.levels, m.Levels)

	// Open each file in the manifest.
	var files []File
	for _, filename := range m.Files {
		switch filepath.Ext(filename) {
		case LogFileExt:
			f, err := i.openLogFile(filepath.Join(i.Path, filename))
			if err != nil {
				return err
			}
			files = append(files, f)

			// Make first log file active, if within threshold.
			sz, _ := f.Stat()
			if i.activeLogFile == nil && sz < i.MaxLogFileSize {
				i.activeLogFile = f
			}

		case IndexFileExt:
			f, err := i.openIndexFile(filepath.Join(i.Path, filename))
			if err != nil {
				return err
			}
			files = append(files, f)
		}
	}
	i.fileSet = NewFileSet(i.levels, files)

	// Set initial sequence number.
	i.seq = i.fileSet.MaxID()

	// Delete any files not in the manifest.
	if err := i.deleteNonManifestFiles(m); err != nil {
		return err
	}

	// Ensure a log file exists.
	if i.activeLogFile == nil {
		if err := i.prependActiveLogFile(); err != nil {
			return err
		}
	}

	// Mark opened.
	i.opened = true

	// Send a compaction request on start up.
	i.compact()

	return nil
}

// openLogFile opens a log file and appends it to the index.
func (i *Index) openLogFile(path string) (*LogFile, error) {
	f := NewLogFile(path)
	if err := f.Open(); err != nil {
		return nil, err
	}
	return f, nil
}

// openIndexFile opens an index file and appends it to the index.
func (i *Index) openIndexFile(path string) (*IndexFile, error) {
	f := NewIndexFile()
	f.SetPath(path)
	if err := f.Open(); err != nil {
		return nil, err
	}
	return f, nil
}

// deleteNonManifestFiles removes all files not in the manifest.
func (i *Index) deleteNonManifestFiles(m *Manifest) error {
	dir, err := os.Open(i.Path)
	if err != nil {
		return err
	}
	defer dir.Close()

	fis, err := dir.Readdir(-1)
	if err != nil {
		return err
	}

	// Loop over all files and remove any not in the manifest.
	for _, fi := range fis {
		filename := filepath.Base(fi.Name())
		if filename == ManifestFileName || m.HasFile(filename) {
			continue
		}

		// Remove relative to the index path, not the working directory.
		if err := os.RemoveAll(filepath.Join(i.Path, filename)); err != nil {
			return err
		}
	}

	return nil
}

// Close closes the index.
func (i *Index) Close() error {
	// Wait for goroutines to finish.
	i.once.Do(func() { close(i.closing) })
	i.wg.Wait()

	// Lock index and close remaining files.
	i.mu.Lock()
	defer i.mu.Unlock()

	// Close log files.
	for _, f := range i.fileSet.files {
		f.Close()
	}
	i.fileSet.files = nil

	return nil
}

// NextSequence returns the next file identifier.
func (i *Index) NextSequence() int {
	i.mu.Lock()
	defer i.mu.Unlock()
	return i.nextSequence()
}

func (i *Index) nextSequence() int {
	i.seq++
	return i.seq
}

// ManifestPath returns the path to the index's manifest file.
func (i *Index) ManifestPath() string {
	return filepath.Join(i.Path, ManifestFileName)
}

// Manifest returns a manifest for the index.
func (i *Index) Manifest() *Manifest {
	m := &Manifest{
		Levels: i.levels,
		Files:  make([]string, len(i.fileSet.files)),
	}

	for j, f := range i.fileSet.files {
		m.Files[j] = filepath.Base(f.Path())
	}

	return m
}

// writeManifestFile writes the manifest to the appropriate file path.
func (i *Index) writeManifestFile() error {
	return WriteManifestFile(i.ManifestPath(), i.Manifest())
}

// SetFieldSet sets a shared field set from the engine.
func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
	i.mu.Lock()
	i.fieldset = fs
	i.mu.Unlock()
}

// RetainFileSet returns the current fileset and adds a reference count.
func (i *Index) RetainFileSet() *FileSet {
	i.mu.RLock()
	fs := i.retainFileSet()
	i.mu.RUnlock()
	return fs
}
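
// Note: every caller of RetainFileSet pairs it with a call to Release on the
// returned set; the reference count keeps files from being closed and removed
// out from under readers while compactions retire them.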

func (i *Index) retainFileSet() *FileSet {
	fs := i.fileSet
	fs.Retain()
	return fs
}

// FileN returns the number of active files in the file set.
func (i *Index) FileN() int { return len(i.fileSet.files) }

// prependActiveLogFile adds a new log file so that the current log file can be compacted.
func (i *Index) prependActiveLogFile() error {
	// Open file and insert it into the first position.
	f, err := i.openLogFile(filepath.Join(i.Path, FormatLogFileName(i.nextSequence())))
	if err != nil {
		return err
	}
	i.activeLogFile = f
	i.fileSet.files = append([]File{f}, i.fileSet.files...)

	// Write new manifest.
	if err := i.writeManifestFile(); err != nil {
		// TODO: Close index if write fails.
		return err
	}
	return nil
}

// ForEachMeasurementName iterates over all measurement names in the index.
func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
	fs := i.RetainFileSet()
	defer fs.Release()

	itr := fs.MeasurementIterator()
	if itr == nil {
		return nil
	}

	for e := itr.Next(); e != nil; e = itr.Next() {
		if err := fn(e.Name()); err != nil {
			return err
		}
	}

	return nil
}

// MeasurementExists returns true if a measurement exists.
func (i *Index) MeasurementExists(name []byte) (bool, error) {
	fs := i.RetainFileSet()
	defer fs.Release()
	m := fs.Measurement(name)
	return m != nil && !m.Deleted(), nil
}

// MeasurementNamesByExpr returns measurement names matching the expression.
func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
	fs := i.RetainFileSet()
	defer fs.Release()
	return fs.MeasurementNamesByExpr(expr)
}

// MeasurementNamesByRegex returns measurement names matching the regex.
func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
	fs := i.RetainFileSet()
	defer fs.Release()

	// Guard against an empty index, matching ForEachMeasurementName above.
	itr := fs.MeasurementIterator()
	if itr == nil {
		return nil, nil
	}

	var a [][]byte
	for e := itr.Next(); e != nil; e = itr.Next() {
		if re.Match(e.Name()) {
			a = append(a, e.Name())
		}
	}
	return a, nil
}

// DropMeasurement deletes a measurement from the index.
func (i *Index) DropMeasurement(name []byte) error {
	fs := i.RetainFileSet()
	defer fs.Release()

	// Delete all keys and values.
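	// Each mutation below runs in a closure so the read lock taken around the
	// active log file call is released promptly via defer, even on error.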
	if kitr := fs.TagKeyIterator(name); kitr != nil {
		for k := kitr.Next(); k != nil; k = kitr.Next() {
			// Delete key if not already deleted.
			if !k.Deleted() {
				if err := func() error {
					i.mu.RLock()
					defer i.mu.RUnlock()
					return i.activeLogFile.DeleteTagKey(name, k.Key())
				}(); err != nil {
					return err
				}
			}

			// Delete each value in key.
			if vitr := k.TagValueIterator(); vitr != nil {
				for v := vitr.Next(); v != nil; v = vitr.Next() {
					if !v.Deleted() {
						if err := func() error {
							i.mu.RLock()
							defer i.mu.RUnlock()
							return i.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
						}(); err != nil {
							return err
						}
					}
				}
			}
		}
	}

	// Delete all series in measurement.
	if sitr := fs.MeasurementSeriesIterator(name); sitr != nil {
		for s := sitr.Next(); s != nil; s = sitr.Next() {
			if !s.Deleted() {
				if err := func() error {
					i.mu.RLock()
					defer i.mu.RUnlock()
					return i.activeLogFile.DeleteSeries(s.Name(), s.Tags())
				}(); err != nil {
					return err
				}
			}
		}
	}

	// Mark measurement as deleted.
	if err := func() error {
		i.mu.RLock()
		defer i.mu.RUnlock()
		return i.activeLogFile.DeleteMeasurement(name)
	}(); err != nil {
		return err
	}

	// Check if the log file needs to be swapped.
	if err := i.CheckLogFile(); err != nil {
		return err
	}

	return nil
}

// CreateSeriesListIfNotExists creates a list of series in bulk if they don't already exist.
func (i *Index) CreateSeriesListIfNotExists(_, names [][]byte, tagsSlice []models.Tags) error {
	// All slices must be of equal length.
	if len(names) != len(tagsSlice) {
		return errors.New("names/tags length mismatch")
	}

	// Maintain reference count on files in file set.
	fs := i.RetainFileSet()
	defer fs.Release()

	// Filter out existing series. Exit if no new series exist.
	names, tagsSlice = fs.FilterNamesTags(names, tagsSlice)
	if len(names) == 0 {
		return nil
	}

	// Ensure fileset cannot change during insert.
	i.mu.RLock()

	// Insert series into log file.
	if err := i.activeLogFile.AddSeriesList(names, tagsSlice); err != nil {
		i.mu.RUnlock()
		return err
	}
	i.mu.RUnlock()

	return i.CheckLogFile()
}

// InitializeSeries is a no-op. This only applies to the in-memory index.
func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error {
	return nil
}

// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
	if err := func() error {
		i.mu.RLock()
		defer i.mu.RUnlock()

		fs := i.retainFileSet()
		defer fs.Release()

		if fs.HasSeries(name, tags, nil) {
			return nil
		}

		if err := i.activeLogFile.AddSeries(name, tags); err != nil {
			return err
		}
		return nil
	}(); err != nil {
		return err
	}

	// Swap log file, if necessary.
	if err := i.CheckLogFile(); err != nil {
		return err
	}
	return nil
}

// DropSeries removes the series with the given key from the index.
func (i *Index) DropSeries(key []byte) error {
	if err := func() error {
		i.mu.RLock()
		defer i.mu.RUnlock()

		name, tags, err := models.ParseKey(key)
		if err != nil {
			return err
		}

		mname := []byte(name)
		if err := i.activeLogFile.DeleteSeries(mname, tags); err != nil {
			return err
		}

		// Obtain file set after deletion because that may add a new log file.
		fs := i.retainFileSet()
		defer fs.Release()

		// Check if that was the last series for the measurement in the entire index.
		itr := fs.MeasurementSeriesIterator(mname)
		if itr == nil {
			return nil
		} else if e := itr.Next(); e != nil {
			return nil
		}

		// If no more series exist in the measurement then delete the measurement.
		if err := i.activeLogFile.DeleteMeasurement(mname); err != nil {
			return err
		}
		return nil
	}(); err != nil {
		return err
	}

	// Swap log file, if necessary.
	if err := i.CheckLogFile(); err != nil {
		return err
	}
	return nil
}

// SeriesSketches returns the two series sketches for the index by merging
// the sketches from all TSI files and the WAL.
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
	fs := i.RetainFileSet()
	defer fs.Release()
	return fs.SeriesSketches()
}

// MeasurementsSketches returns the two measurement sketches for the index by
// merging the sketches from all the index files.
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
	fs := i.RetainFileSet()
	defer fs.Release()
	return fs.MeasurementsSketches()
}

// SeriesN returns the number of unique non-tombstoned series in the index.
// Since indexes are not shared across shards, the count returned by SeriesN
// cannot be combined with other shards' results. If you need to count series
// across indexes then use SeriesSketches and merge the results from other
// indexes.
func (i *Index) SeriesN() int64 {
	fs := i.RetainFileSet()
	defer fs.Release()

	var total int64
	for _, f := range fs.files {
		total += int64(f.SeriesN())
	}
	return total
}

// HasTagKey returns true if the tag key exists.
func (i *Index) HasTagKey(name, key []byte) (bool, error) {
	fs := i.RetainFileSet()
	defer fs.Release()
	return fs.HasTagKey(name, key), nil
}

// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
	fs := i.RetainFileSet()
	defer fs.Release()
	return fs.MeasurementTagKeysByExpr(name, expr)
}

// ForEachMeasurementSeriesByExpr iterates over all series in a measurement filtered by an expression.
func (i *Index) ForEachMeasurementSeriesByExpr(name []byte, condition influxql.Expr, fn func(tags models.Tags) error) error {
	fs := i.RetainFileSet()
	defer fs.Release()

	itr, err := fs.MeasurementSeriesByExprIterator(name, condition, i.fieldset)
	if err != nil {
		return err
	} else if itr == nil {
		return nil
	}

	for e := itr.Next(); e != nil; e = itr.Next() {
		if err := fn(e.Tags()); err != nil {
			return err
		}
	}

	return nil
}

// ForEachMeasurementTagKey iterates over all tag keys in a measurement.
func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
	fs := i.RetainFileSet()
	defer fs.Release()

	itr := fs.TagKeyIterator(name)
	if itr == nil {
		return nil
	}

	for e := itr.Next(); e != nil; e = itr.Next() {
		if err := fn(e.Key()); err != nil {
			return err
		}
	}

	return nil
}

// TagKeyCardinality always returns zero.
// It is not possible to determine cardinality of tags across index files.
func (i *Index) TagKeyCardinality(name, key []byte) int {
	return 0
}

// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
func (i *Index) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
	fs := i.RetainFileSet()
	defer fs.Release()
	return fs.MeasurementSeriesKeysByExpr(name, expr, i.fieldset)
}

// TagSets returns an ordered list of tag sets for a measurement by dimension
// and filtered by an optional conditional expression.
func (i *Index) TagSets(name []byte, opt influxql.IteratorOptions) ([]*influxql.TagSet, error) {
	fs := i.RetainFileSet()
	defer fs.Release()

	itr, err := fs.MeasurementSeriesByExprIterator(name, opt.Condition, i.fieldset)
	if err != nil {
		return nil, err
	} else if itr == nil {
		return nil, nil
	}

	// For every series, get the tag values for the requested tag keys i.e.
	// dimensions. This is the TagSet for that series. Series with the same
	// TagSet are then grouped together, because for the purpose of GROUP BY
	// they are part of the same composite series.
	tagSets := make(map[string]*influxql.TagSet, 64)

	for e := itr.Next(); e != nil; e = itr.Next() {
		tags := make(map[string]string, len(opt.Dimensions))

		// Build the TagSet for this series.
		for _, dim := range opt.Dimensions {
			tags[dim] = e.Tags().GetString(dim)
		}

		// Convert the TagSet to a string, so it can be added to a map
		// allowing TagSets to be handled as a set.
		tagsAsKey := tsdb.MarshalTags(tags)
		tagSet, ok := tagSets[string(tagsAsKey)]
		if !ok {
			// This TagSet is new, create a new entry for it.
			tagSet = &influxql.TagSet{
				Tags: tags,
				Key:  tagsAsKey,
			}
		}

		// Associate the series and filter with the Tagset.
		tagSet.AddFilter(string(SeriesElemKey(e)), e.Expr())

		// Ensure it's back in the map.
		tagSets[string(tagsAsKey)] = tagSet
	}

	// Sort the series in each tag set.
	for _, t := range tagSets {
		sort.Sort(t)
	}

	// The TagSets have been created, as a map of TagSets. Just send
	// the values back as a slice, sorting for consistency.
	sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets))
	for _, v := range tagSets {
		sortedTagsSets = append(sortedTagsSets, v)
	}
	sort.Sort(byTagKey(sortedTagsSets))

	return sortedTagsSets, nil
}

// SnapshotTo creates hard links to the file set into path.
func (i *Index) SnapshotTo(path string) error {
	i.mu.Lock()
	defer i.mu.Unlock()

	fs := i.retainFileSet()
	defer fs.Release()

	// Flush active log file, if any.
	if err := i.activeLogFile.Flush(); err != nil {
		return err
	}

	if err := os.Mkdir(filepath.Join(path, "index"), 0777); err != nil {
		return err
	}

	// Link manifest.
	if err := os.Link(i.ManifestPath(), filepath.Join(path, "index", filepath.Base(i.ManifestPath()))); err != nil {
		return fmt.Errorf("error creating tsi manifest hard link: %q", err)
	}

	// Link files in directory.
	for _, f := range fs.files {
		if err := os.Link(f.Path(), filepath.Join(path, "index", filepath.Base(f.Path()))); err != nil {
			return fmt.Errorf("error creating tsi hard link: %q", err)
		}
	}

	return nil
}

// SetFieldName is a no-op for this index.
func (i *Index) SetFieldName(measurement []byte, name string) {}

// RemoveShard is a no-op for this index.
func (i *Index) RemoveShard(shardID uint64) {}

// AssignShard is a no-op for this index.
func (i *Index) AssignShard(k string, shardID uint64) {}

// UnassignShard drops the series with the given key from the index.
func (i *Index) UnassignShard(k string, shardID uint64) error {
	// This can be called directly once inmem is gone.
	return i.DropSeries([]byte(k))
}

// SeriesPointIterator returns an influxql iterator over all series.
func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
	// NOTE: The iterator handles releasing the file set.
	fs := i.RetainFileSet()
	return newSeriesPointIterator(fs, i.fieldset, opt), nil
}

// Compact requests a compaction of log files.
func (i *Index) Compact() {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.compact()
}

// compact compacts contiguous groups of files that are not currently compacting.
func (i *Index) compact() {
	if !i.CompactionEnabled {
		return
	}

	fs := i.retainFileSet()
	defer fs.Release()

	// Find contiguous groups of files that are available for compaction.
	for _, group := range i.compactionGroups(fs) {
		// Mark files in group as compacting.
		for _, f := range group {
			f.Retain()
			f.setCompacting(true)
		}

		// Execute in closure to save reference to the group within the loop.
		func(group []*IndexFile) {
			// Start compacting in a separate goroutine.
			i.wg.Add(1)
			go func() {
				defer i.wg.Done()
				i.compactGroup(group)
				i.Compact() // check for new compactions
			}()
		}(group)
	}
}

// compactionGroups returns contiguous groups of index files that can be compacted.
//
// All groups will have at least two files and the total size is more than the
// largest file times the compaction factor. For example, if the compaction
// factor is 2 then the total size will be at least double the max file size.
func (i *Index) compactionGroups(fileSet *FileSet) [][]*IndexFile {
	log.Printf("%s: checking for compaction groups: n=%d", IndexName, len(fileSet.files))

	var groups [][]*IndexFile

	// Loop over all files to find contiguous groups of compactable files.
	var group []*IndexFile
	for _, f := range fileSet.files {
		indexFile, ok := f.(*IndexFile)

		// Skip over log files. They compact themselves.
		if !ok {
			if isCompactableGroup(group, CompactionFactor) {
				group, groups = nil, append(groups, group)
			} else {
				group = nil
			}
			continue
		}

		// If file is currently compacting then stop current group.
		if indexFile.Compacting() {
			if isCompactableGroup(group, CompactionFactor) {
				group, groups = nil, append(groups, group)
			} else {
				group = nil
			}
			continue
		}

		// Stop current group if adding file will invalidate group.
		// This can happen when appending a large file to a group of small files.
		if isCompactableGroup(group, CompactionFactor) && !isCompactableGroup(append(group, indexFile), CompactionFactor) {
			group, groups = []*IndexFile{indexFile}, append(groups, group)
			continue
		}

		// Otherwise append to the current group.
		group = append(group, indexFile)
	}

	// Append final group, if compactable.
	if isCompactableGroup(group, CompactionFactor) {
		groups = append(groups, group)
	}

	return groups
}

// isCompactableGroup returns true if the group's total file size is greater
// than the largest file's size times the compaction factor.
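//
// For example, files of 8, 2, and 2 MB have max=8 and total=12; with a factor
// of 2 the group qualifies only once total >= 16, so it is not yet compactable.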
func isCompactableGroup(files []*IndexFile, factor float64) bool {
	if len(files) < 2 {
		return false
	}

	var max, total int64
	for _, f := range files {
		sz := f.Size()
		if sz > max {
			max = sz
		}
		total += sz
	}
	return total >= int64(float64(max)*factor)
}

// compactGroup compacts files into a new file. Replaces old files with
// compacted file on successful completion. This runs in a separate goroutine.
func (i *Index) compactGroup(files []*IndexFile) {
	assert(len(files) >= 2, "at least two index files are required for compaction")

	// Files have already been retained by caller.
	// Ensure files are released only once.
	var once sync.Once
	defer once.Do(func() { IndexFiles(files).Release() })

	// Track time to compact.
	start := time.Now()

	// Create new index file.
	path := filepath.Join(i.Path, FormatIndexFileName(i.NextSequence(), 1)) // TODO
	f, err := os.Create(path)
	if err != nil {
		log.Printf("%s: error creating compaction files: %s", IndexName, err)
		return
	}
	defer f.Close()

	srcIDs := joinIntSlice(IndexFiles(files).IDs(), ",")
	log.Printf("%s: performing full compaction: src=%s, path=%s", IndexName, srcIDs, path)

	// Compact all index files to new index file.
	n, err := IndexFiles(files).WriteTo(f)
	if err != nil {
		log.Printf("%s: error compacting index files: src=%s, path=%s, err=%s", IndexName, srcIDs, path, err)
		return
	}

	// Close file.
	if err := f.Close(); err != nil {
		log.Printf("%s: error closing index file: %s", IndexName, err)
		return
	}

	// Reopen as an index file.
	file := NewIndexFile()
	file.SetPath(path)
	if err := file.Open(); err != nil {
		log.Printf("%s: error opening new index file: %s", IndexName, err)
		return
	}

	// Obtain lock to swap in index file and write manifest.
	if err := func() error {
		i.mu.Lock()
		defer i.mu.Unlock()

		// Replace previous files with new index file.
		i.fileSet = i.fileSet.MustReplace(IndexFiles(files).Files(), file)

		// Write new manifest.
		if err := i.writeManifestFile(); err != nil {
			// TODO: Close index if write fails.
			return err
		}
		return nil
	}(); err != nil {
		log.Printf("%s: error writing manifest: %s", IndexName, err)
		return
	}

	log.Printf("%s: full compaction complete: file=%s, t=%s, sz=%d", IndexName, path, time.Since(start), n)

	// Release old files.
	once.Do(func() { IndexFiles(files).Release() })

	// Close and delete all old index files.
	for _, f := range files {
		log.Printf("%s: removing index file: file=%s", IndexName, f.Path())

		if err := f.Close(); err != nil {
			log.Printf("%s: error closing index file: %s", IndexName, err)
			return
		} else if err := os.Remove(f.Path()); err != nil {
			log.Printf("%s: error removing index file: %s", IndexName, err)
			return
		}
	}
}

// CheckLogFile swaps in a new active log file if the current one has exceeded
// MaxLogFileSize, then compacts the old log file in the background.
func (i *Index) CheckLogFile() error {
	// Check log file size under read lock.
	if size := func() int64 {
		i.mu.RLock()
		defer i.mu.RUnlock()
		return i.activeLogFile.Size()
	}(); size < i.MaxLogFileSize {
		return nil
	}

	// If file size exceeded then recheck under write lock and swap files.
	i.mu.Lock()
	defer i.mu.Unlock()
	return i.checkLogFile()
}
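
// checkLogFile assumes the write lock is already held. The size is rechecked
// here because another goroutine may have swapped the log file between the
// read-lock check in CheckLogFile and the acquisition of the write lock.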
func (i *Index) checkLogFile() error {
	if i.activeLogFile.Size() < i.MaxLogFileSize {
		return nil
	}

	// Swap current log file.
	logFile := i.activeLogFile

	// Open new log file and insert it into the first position.
	if err := i.prependActiveLogFile(); err != nil {
		return err
	}

	// Begin compacting in a background goroutine.
	i.wg.Add(1)
	go func() {
		defer i.wg.Done()
		i.compactLogFile(logFile)
		i.Compact() // check for new compactions
	}()

	return nil
}

// compactLogFile compacts f into a tsi file. The new file will share the
// same identifier but will have a ".tsi" extension. Once the log file is
// compacted then the manifest is updated and the log file is discarded.
func (i *Index) compactLogFile(logFile *LogFile) {
	start := time.Now()
	log.Printf("%s: compacting log file: file=%s", IndexName, logFile.Path())

	// Retrieve identifier from current path.
	id := logFile.ID()
	assert(id != 0, "cannot parse log file id: %s", logFile.Path())

	// Create new index file.
	path := filepath.Join(i.Path, FormatIndexFileName(id, 1))
	f, err := os.Create(path)
	if err != nil {
		log.Printf("%s: error creating index file: %s", IndexName, err)
		return
	}
	defer f.Close()

	// Compact log file to new index file.
	n, err := logFile.WriteTo(f)
	if err != nil {
		log.Printf("%s: error compacting log file: path=%s, err=%s", IndexName, logFile.Path(), err)
		return
	}

	// Close file.
	if err := f.Close(); err != nil {
		log.Printf("%s: error closing log file: %s", IndexName, err)
		return
	}

	// Reopen as an index file.
	file := NewIndexFile()
	file.SetPath(path)
	if err := file.Open(); err != nil {
		log.Printf("%s: error opening compacted index file: path=%s, err=%s", IndexName, file.Path(), err)
		return
	}

	// Obtain lock to swap in index file and write manifest.
	if err := func() error {
		i.mu.Lock()
		defer i.mu.Unlock()

		// Replace previous log file with index file.
		i.fileSet = i.fileSet.MustReplace([]File{logFile}, file)

		// Write new manifest.
		if err := i.writeManifestFile(); err != nil {
			// TODO: Close index if write fails.
			return err
		}
		return nil
	}(); err != nil {
		log.Printf("%s: error updating manifest: %s", IndexName, err)
		return
	}

	log.Printf("%s: finished compacting log file: file=%s, t=%v, sz=%d", IndexName, logFile.Path(), time.Since(start), n)

	// Closing the log file will automatically wait until the ref count is zero.
	log.Printf("%s: removing log file: file=%s", IndexName, logFile.Path())
	if err := logFile.Close(); err != nil {
		log.Printf("%s: error closing log file: %s", IndexName, err)
		return
	} else if err := os.Remove(logFile.Path()); err != nil {
		log.Printf("%s: error removing log file: %s", IndexName, err)
		return
	}
}

// seriesPointIterator adapts SeriesIterator to an influxql.Iterator.
type seriesPointIterator struct {
	once     sync.Once
	fs       *FileSet
	fieldset *tsdb.MeasurementFieldSet
	mitr     MeasurementIterator
	sitr     SeriesIterator
	opt      influxql.IteratorOptions

	point influxql.FloatPoint // reusable point
}

// newSeriesPointIterator returns a new instance of seriesPointIterator.
func newSeriesPointIterator(fs *FileSet, fieldset *tsdb.MeasurementFieldSet, opt influxql.IteratorOptions) *seriesPointIterator {
	return &seriesPointIterator{
		fs:       fs,
		fieldset: fieldset,
		mitr:     fs.MeasurementIterator(),
		point: influxql.FloatPoint{
			Aux: make([]interface{}, len(opt.Aux)),
		},
		opt: opt,
	}
}

// Stats returns stats about the points processed.
func (itr *seriesPointIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} }

// Close closes the iterator.
func (itr *seriesPointIterator) Close() error {
	itr.once.Do(func() { itr.fs.Release() })
	return nil
}

// Next emits the next point in the iterator.
func (itr *seriesPointIterator) Next() (*influxql.FloatPoint, error) {
	for {
		// Create new series iterator, if necessary.
		// Exit if there are no measurements remaining.
		if itr.sitr == nil {
			m := itr.mitr.Next()
			if m == nil {
				return nil, nil
			}

			sitr, err := itr.fs.MeasurementSeriesByExprIterator(m.Name(), itr.opt.Condition, itr.fieldset)
			if err != nil {
				return nil, err
			} else if sitr == nil {
				continue
			}
			itr.sitr = sitr
		}

		// Read next series element.
		e := itr.sitr.Next()
		if e == nil {
			itr.sitr = nil
			continue
		}

		// Convert to a key.
		key := string(models.MakeKey(e.Name(), e.Tags()))

		// Write auxiliary fields.
		for i, f := range itr.opt.Aux {
			switch f.Val {
			case "key":
				itr.point.Aux[i] = key
			}
		}
		return &itr.point, nil
	}
}

// unionStringSets returns the union of two sets.
func unionStringSets(a, b map[string]struct{}) map[string]struct{} {
	other := make(map[string]struct{})
	for k := range a {
		other[k] = struct{}{}
	}
	for k := range b {
		other[k] = struct{}{}
	}
	return other
}

// intersectStringSets returns the intersection of two sets.
func intersectStringSets(a, b map[string]struct{}) map[string]struct{} {
	if len(a) < len(b) {
		a, b = b, a
	}

	other := make(map[string]struct{})
	for k := range a {
		if _, ok := b[k]; ok {
			other[k] = struct{}{}
		}
	}
	return other
}

var fileIDRegex = regexp.MustCompile(`^L(\d+)-(\d+)\..+$`)
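
// File names therefore have the form "L<level>-<id>.<ext>", e.g. a name such
// as "L1-00000001.tsi" (the exact digit padding is determined by
// FormatLogFileName and FormatIndexFileName, defined elsewhere in the package).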

// ParseFilename extracts the compaction level and numeric id from a log or
// index file path. Returns zeroes if the name cannot be parsed.
func ParseFilename(name string) (level, id int) {
	a := fileIDRegex.FindStringSubmatch(filepath.Base(name))
	if a == nil {
		return 0, 0
	}

	level, _ = strconv.Atoi(a[1])
	id, _ = strconv.Atoi(a[2])
	return level, id
}

// Manifest represents the list of log & index files that make up the index.
// The files are listed in time order, not necessarily ID order.
type Manifest struct {
	Levels []CompactionLevel `json:"levels,omitempty"`
	Files  []string          `json:"files,omitempty"`
}
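
// A MANIFEST file written by WriteManifestFile below is human-readable JSON
// along these lines (values illustrative only):
//
//	{
//	  "levels": [{"m": 33554432, "k": 6}],
//	  "files": ["L0-00000002.tsl", "L1-00000001.tsi"]
//	}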

// NewManifest returns a new instance of Manifest with default compaction levels.
func NewManifest() *Manifest {
	m := &Manifest{
		Levels: make([]CompactionLevel, len(DefaultCompactionLevels)),
	}
	copy(m.Levels, DefaultCompactionLevels[:])
	return m
}

// HasFile returns true if name is listed in the log files or index files.
func (m *Manifest) HasFile(name string) bool {
	for _, filename := range m.Files {
		if filename == name {
			return true
		}
	}
	return false
}

// ReadManifestFile reads a manifest from a file path.
func ReadManifestFile(path string) (*Manifest, error) {
	buf, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}

	// Decode manifest.
	var m Manifest
	if err := json.Unmarshal(buf, &m); err != nil {
		return nil, err
	}

	return &m, nil
}

// WriteManifestFile writes a manifest to a file path.
func WriteManifestFile(path string, m *Manifest) error {
	buf, err := json.MarshalIndent(m, "", "  ")
	if err != nil {
		return err
	}
	buf = append(buf, '\n')

	if err := ioutil.WriteFile(path, buf, 0666); err != nil {
		return err
	}

	return nil
}

// joinIntSlice formats ints as strings and joins them with sep.
func joinIntSlice(a []int, sep string) string {
	other := make([]string, len(a))
	for i := range a {
		other[i] = strconv.Itoa(a[i])
	}
	return strings.Join(other, sep)
}

// CompactionLevel represents a grouping of index files based on size and
// bloom filter settings. By having the same bloom filter settings, the filters
// can be merged and evaluated at a higher level.
type CompactionLevel struct {
	// Minimum expected index size.
	MinSize int64 `json:"minSize,omitempty"`

	// Bloom filter bit size & hash count.
	M uint64 `json:"m,omitempty"`
	K uint64 `json:"k,omitempty"`
}
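
// M is a bit count, so the in-memory filter size is M/8 bytes: 1<<25 bits is
// roughly 4 MB, 1<<28 roughly 33 MB, and 1<<30 roughly 134 MB, matching the
// filter sizes quoted in the defaults below.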

// DefaultCompactionLevels is the default settings used by the index.
var DefaultCompactionLevels = []CompactionLevel{
	// Log files, no filter.
	{M: 0, K: 0},

	// Initial compaction, 4MB filter.
	{
		MinSize: 0,
		M:       1 << 25,
		K:       6,
	},

	// 200MB min file, 33MB filter.
	{
		MinSize: 200 * (1 << 20),
		M:       1 << 28,
		K:       6,
	},

	// 2GB min file, 134MB filter.
	{
		MinSize: 2 * (1 << 30),
		M:       1 << 30,
		K:       6,
	},
}

// MaxIndexFileSize is the maximum expected size of an index file.
const MaxIndexFileSize = 4 * (1 << 30)

// CompactionFactor is the multiple of a group's largest file size that the
// group's total size must reach before the group is compacted. TEMP
const CompactionFactor = 10