fix(tsdb): never use an inmem index (#20313)
And fix the logging setup for the TSDB storage enginepull/20412/head
parent
9b8c81c49c
commit
9aefa6f868
13
CHANGELOG.md
13
CHANGELOG.md
|
@ -1,5 +1,14 @@
|
|||
## unreleased
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
#### inmem index option removed
|
||||
This release fully removes the `inmem` indexing option, along with the associated config options:
|
||||
* `max-series-per-database`
|
||||
* `max-values-per-tag`
|
||||
|
||||
Replacement `tsi1` indexes will be automatically generated on startup for shards that need it.
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
1. [20339](https://github.com/influxdata/influxdb/pull/20339): Include upgrade helper script in goreleaser manifest.
|
||||
|
@ -10,7 +19,9 @@
|
|||
1. [20362](https://github.com/influxdata/influxdb/pull/20362): Don't overwrite stack name/description on `influx stack update`.
|
||||
1. [20355](https://github.com/influxdata/influxdb/pull/20355): Fix timeout setup for `influxd` graceful shutdown.
|
||||
1. [20387](https://github.com/influxdata/influxdb/pull/20387): Improve error message shown when `influx` CLI can't find an org by name.
|
||||
1. [20380](https://github.com/influxdata/influxdb/pull/20380): Remove duplication from task error messages
|
||||
1. [20380](https://github.com/influxdata/influxdb/pull/20380): Remove duplication from task error messages.
|
||||
1. [20313](https://github.com/influxdata/influxdb/pull/20313): Automatically build `tsi1` indexes for shards that need it instead of falling back to `inmem`.
|
||||
1. [20313](https://github.com/influxdata/influxdb/pull/20313): Fix logging initialization for storage engine.
|
||||
|
||||
## v2.0.3 [2020-12-14]
|
||||
|
||||
|
|
|
@ -1,558 +0,0 @@
|
|||
// Package buildtsi reads an in-memory index and exports it as a TSI index.
|
||||
package buildtsi
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/user"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/logger"
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/pkg/file"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const defaultBatchSize = 10000
|
||||
|
||||
// Command represents the program execution for "influx_inspect buildtsi".
|
||||
type Command struct {
|
||||
Stderr io.Writer
|
||||
Stdout io.Writer
|
||||
Verbose bool
|
||||
Logger *zap.Logger
|
||||
|
||||
concurrency int // Number of goroutines to dedicate to shard index building.
|
||||
databaseFilter string
|
||||
retentionFilter string
|
||||
shardFilter string
|
||||
compactSeriesFile bool
|
||||
maxLogFileSize int64
|
||||
maxCacheSize uint64
|
||||
batchSize int
|
||||
}
|
||||
|
||||
// NewCommand returns a new instance of Command.
|
||||
func NewCommand() *Command {
|
||||
return &Command{
|
||||
Stderr: os.Stderr,
|
||||
Stdout: os.Stdout,
|
||||
Logger: zap.NewNop(),
|
||||
batchSize: defaultBatchSize,
|
||||
concurrency: runtime.GOMAXPROCS(0),
|
||||
}
|
||||
}
|
||||
|
||||
// Run executes the command.
|
||||
func (cmd *Command) Run(args ...string) error {
|
||||
fs := flag.NewFlagSet("buildtsi", flag.ExitOnError)
|
||||
dataDir := fs.String("datadir", "", "data directory")
|
||||
walDir := fs.String("waldir", "", "WAL directory")
|
||||
fs.IntVar(&cmd.concurrency, "concurrency", runtime.GOMAXPROCS(0), "Number of workers to dedicate to shard index building. Defaults to GOMAXPROCS")
|
||||
fs.StringVar(&cmd.databaseFilter, "database", "", "optional: database name")
|
||||
fs.StringVar(&cmd.retentionFilter, "retention", "", "optional: retention policy")
|
||||
fs.StringVar(&cmd.shardFilter, "shard", "", "optional: shard id")
|
||||
fs.BoolVar(&cmd.compactSeriesFile, "compact-series-file", false, "optional: compact existing series file. Do not rebuilt index.")
|
||||
fs.Int64Var(&cmd.maxLogFileSize, "max-log-file-size", tsdb.DefaultMaxIndexLogFileSize, "optional: maximum log file size")
|
||||
fs.Uint64Var(&cmd.maxCacheSize, "max-cache-size", tsdb.DefaultCacheMaxMemorySize, "optional: maximum cache size")
|
||||
fs.IntVar(&cmd.batchSize, "batch-size", defaultBatchSize, "optional: set the size of the batches we write to the index. Setting this can have adverse affects on performance and heap requirements")
|
||||
fs.BoolVar(&cmd.Verbose, "v", false, "verbose")
|
||||
fs.SetOutput(cmd.Stdout)
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
} else if fs.NArg() > 0 || *dataDir == "" || *walDir == "" {
|
||||
fs.Usage()
|
||||
return nil
|
||||
}
|
||||
cmd.Logger = logger.New(cmd.Stderr)
|
||||
|
||||
return cmd.run(*dataDir, *walDir)
|
||||
}
|
||||
|
||||
func (cmd *Command) run(dataDir, walDir string) error {
|
||||
// Verify the user actually wants to run as root.
|
||||
if isRoot() {
|
||||
fmt.Fprintln(cmd.Stdout, "You are currently running as root. This will build your")
|
||||
fmt.Fprintln(cmd.Stdout, "index files with root ownership and will be inaccessible")
|
||||
fmt.Fprintln(cmd.Stdout, "if you run influxd as a non-root user. You should run")
|
||||
fmt.Fprintln(cmd.Stdout, "buildtsi as the same user you are running influxd.")
|
||||
fmt.Fprint(cmd.Stdout, "Are you sure you want to continue? (y/N): ")
|
||||
var answer string
|
||||
if fmt.Scanln(&answer); !strings.HasPrefix(strings.TrimSpace(strings.ToLower(answer)), "y") {
|
||||
return fmt.Errorf("operation aborted")
|
||||
}
|
||||
}
|
||||
|
||||
if cmd.compactSeriesFile {
|
||||
if cmd.retentionFilter != "" {
|
||||
return errors.New("cannot specify retention policy when compacting series file")
|
||||
} else if cmd.shardFilter != "" {
|
||||
return errors.New("cannot specify shard ID when compacting series file")
|
||||
}
|
||||
}
|
||||
|
||||
fis, err := ioutil.ReadDir(dataDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, fi := range fis {
|
||||
name := fi.Name()
|
||||
if !fi.IsDir() {
|
||||
continue
|
||||
} else if cmd.databaseFilter != "" && name != cmd.databaseFilter {
|
||||
continue
|
||||
}
|
||||
|
||||
if cmd.compactSeriesFile {
|
||||
if err := cmd.compactDatabaseSeriesFile(name, filepath.Join(dataDir, name)); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if err := cmd.processDatabase(name, filepath.Join(dataDir, name), filepath.Join(walDir, name)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// compactDatabaseSeriesFile compacts the series file segments associated with
|
||||
// the series file for the provided database.
|
||||
func (cmd *Command) compactDatabaseSeriesFile(dbName, path string) error {
|
||||
sfilePath := filepath.Join(path, tsdb.SeriesFileDirectory)
|
||||
paths, err := cmd.seriesFilePartitionPaths(sfilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build input channel.
|
||||
pathCh := make(chan string, len(paths))
|
||||
for _, path := range paths {
|
||||
pathCh <- path
|
||||
}
|
||||
close(pathCh)
|
||||
|
||||
// Concurrently process each partition in the series file
|
||||
var g errgroup.Group
|
||||
for i := 0; i < cmd.concurrency; i++ {
|
||||
g.Go(func() error {
|
||||
for path := range pathCh {
|
||||
if err := cmd.compactSeriesFilePartition(path); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build new series file indexes
|
||||
sfile := tsdb.NewSeriesFile(sfilePath)
|
||||
if err = sfile.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
compactor := tsdb.NewSeriesPartitionCompactor()
|
||||
for _, partition := range sfile.Partitions() {
|
||||
if err = compactor.Compact(partition); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Fprintln(cmd.Stdout, "compacted ", partition.Path())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) compactSeriesFilePartition(path string) error {
|
||||
const tmpExt = ".tmp"
|
||||
|
||||
fmt.Fprintf(cmd.Stdout, "processing partition for %q\n", path)
|
||||
|
||||
// Open partition so index can recover from entries not in the snapshot.
|
||||
partitionID, err := strconv.Atoi(filepath.Base(path))
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse partition id from path: %s", path)
|
||||
}
|
||||
p := tsdb.NewSeriesPartition(partitionID, path, nil)
|
||||
if err := p.Open(); err != nil {
|
||||
return fmt.Errorf("cannot open partition: path=%s err=%s", path, err)
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
// Loop over segments and compact.
|
||||
indexPath := p.IndexPath()
|
||||
var segmentPaths []string
|
||||
for _, segment := range p.Segments() {
|
||||
fmt.Fprintf(cmd.Stdout, "processing segment %q %d\n", segment.Path(), segment.ID())
|
||||
|
||||
if err := segment.CompactToPath(segment.Path()+tmpExt, p.Index()); err != nil {
|
||||
return err
|
||||
}
|
||||
segmentPaths = append(segmentPaths, segment.Path())
|
||||
}
|
||||
|
||||
// Close partition.
|
||||
if err := p.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the old segment files and replace with new ones.
|
||||
for _, dst := range segmentPaths {
|
||||
src := dst + tmpExt
|
||||
|
||||
fmt.Fprintf(cmd.Stdout, "renaming new segment %q to %q\n", src, dst)
|
||||
if err = file.RenameFile(src, dst); err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("serious failure. Please rebuild index and series file: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove index file so it will be rebuilt when reopened.
|
||||
fmt.Fprintln(cmd.Stdout, "removing index file", indexPath)
|
||||
if err = os.Remove(indexPath); err != nil && !os.IsNotExist(err) { // index won't exist for low cardinality
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// seriesFilePartitionPaths returns the paths to each partition in the series file.
|
||||
func (cmd *Command) seriesFilePartitionPaths(path string) ([]string, error) {
|
||||
sfile := tsdb.NewSeriesFile(path)
|
||||
sfile.Logger = cmd.Logger
|
||||
if err := sfile.Open(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var paths []string
|
||||
for _, partition := range sfile.Partitions() {
|
||||
paths = append(paths, partition.Path())
|
||||
}
|
||||
if err := sfile.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return paths, nil
|
||||
}
|
||||
|
||||
func (cmd *Command) processDatabase(dbName, dataDir, walDir string) error {
|
||||
cmd.Logger.Info("Rebuilding database", zap.String("name", dbName))
|
||||
|
||||
sfile := tsdb.NewSeriesFile(filepath.Join(dataDir, tsdb.SeriesFileDirectory))
|
||||
sfile.Logger = cmd.Logger
|
||||
if err := sfile.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer sfile.Close()
|
||||
|
||||
fis, err := ioutil.ReadDir(dataDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, fi := range fis {
|
||||
rpName := fi.Name()
|
||||
if !fi.IsDir() {
|
||||
continue
|
||||
} else if rpName == tsdb.SeriesFileDirectory {
|
||||
continue
|
||||
} else if cmd.retentionFilter != "" && rpName != cmd.retentionFilter {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := cmd.processRetentionPolicy(sfile, dbName, rpName, filepath.Join(dataDir, rpName), filepath.Join(walDir, rpName)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpName, dataDir, walDir string) error {
|
||||
cmd.Logger.Info("Rebuilding retention policy", logger.Database(dbName), logger.RetentionPolicy(rpName))
|
||||
|
||||
fis, err := ioutil.ReadDir(dataDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
type shard struct {
|
||||
ID uint64
|
||||
Path string
|
||||
}
|
||||
|
||||
var shards []shard
|
||||
|
||||
for _, fi := range fis {
|
||||
if !fi.IsDir() {
|
||||
continue
|
||||
} else if cmd.shardFilter != "" && fi.Name() != cmd.shardFilter {
|
||||
continue
|
||||
}
|
||||
|
||||
shardID, err := strconv.ParseUint(fi.Name(), 10, 64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
shards = append(shards, shard{shardID, fi.Name()})
|
||||
}
|
||||
|
||||
errC := make(chan error, len(shards))
|
||||
var maxi uint32 // index of maximum shard being worked on.
|
||||
for k := 0; k < cmd.concurrency; k++ {
|
||||
go func() {
|
||||
for {
|
||||
i := int(atomic.AddUint32(&maxi, 1) - 1) // Get next partition to work on.
|
||||
if i >= len(shards) {
|
||||
return // No more work.
|
||||
}
|
||||
|
||||
id, name := shards[i].ID, shards[i].Path
|
||||
log := cmd.Logger.With(logger.Database(dbName), logger.RetentionPolicy(rpName), logger.Shard(id))
|
||||
errC <- IndexShard(sfile, filepath.Join(dataDir, name), filepath.Join(walDir, name), cmd.maxLogFileSize, cmd.maxCacheSize, cmd.batchSize, log, cmd.Verbose)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Check for error
|
||||
for i := 0; i < cap(errC); i++ {
|
||||
if err := <-errC; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize int64, maxCacheSize uint64, batchSize int, log *zap.Logger, verboseLogging bool) error {
|
||||
log.Info("Rebuilding shard")
|
||||
|
||||
// Check if shard already has a TSI index.
|
||||
indexPath := filepath.Join(dataDir, "index")
|
||||
log.Info("Checking index path", zap.String("path", indexPath))
|
||||
if _, err := os.Stat(indexPath); !os.IsNotExist(err) {
|
||||
log.Info("tsi1 index already exists, skipping", zap.String("path", indexPath))
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Info("Opening shard")
|
||||
|
||||
// Remove temporary index files if this is being re-run.
|
||||
tmpPath := filepath.Join(dataDir, ".index")
|
||||
log.Info("Cleaning up partial index from previous run, if any")
|
||||
if err := os.RemoveAll(tmpPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Open TSI index in temporary path.
|
||||
tsiIndex := tsi1.NewIndex(sfile, "",
|
||||
tsi1.WithPath(tmpPath),
|
||||
tsi1.WithMaximumLogFileSize(maxLogFileSize),
|
||||
tsi1.DisableFsync(),
|
||||
// Each new series entry in a log file is ~12 bytes so this should
|
||||
// roughly equate to one flush to the file for every batch.
|
||||
tsi1.WithLogFileBufferSize(12*batchSize),
|
||||
)
|
||||
|
||||
tsiIndex.WithLogger(log)
|
||||
|
||||
log.Info("Opening tsi index in temporary location", zap.String("path", tmpPath))
|
||||
if err := tsiIndex.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer tsiIndex.Close()
|
||||
|
||||
// Write out tsm1 files.
|
||||
// Find shard files.
|
||||
tsmPaths, err := collectTSMFiles(dataDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Iterating over tsm files")
|
||||
for _, path := range tsmPaths {
|
||||
log.Info("Processing tsm file", zap.String("path", path))
|
||||
if err := IndexTSMFile(tsiIndex, path, batchSize, log, verboseLogging); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Write out wal files.
|
||||
walPaths, err := collectWALFiles(walDir)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
} else {
|
||||
log.Info("Building cache from wal files")
|
||||
cache := tsm1.NewCache(maxCacheSize)
|
||||
loader := tsm1.NewCacheLoader(walPaths)
|
||||
loader.WithLogger(log)
|
||||
if err := loader.Load(cache); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Iterating over cache")
|
||||
keysBatch := make([][]byte, 0, batchSize)
|
||||
namesBatch := make([][]byte, 0, batchSize)
|
||||
tagsBatch := make([]models.Tags, 0, batchSize)
|
||||
|
||||
for _, key := range cache.Keys() {
|
||||
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
|
||||
name, tags := models.ParseKeyBytes(seriesKey)
|
||||
|
||||
if verboseLogging {
|
||||
log.Info("Series", zap.String("name", string(name)), zap.String("tags", tags.String()))
|
||||
}
|
||||
|
||||
keysBatch = append(keysBatch, seriesKey)
|
||||
namesBatch = append(namesBatch, name)
|
||||
tagsBatch = append(tagsBatch, tags)
|
||||
|
||||
// Flush batch?
|
||||
if len(keysBatch) == batchSize {
|
||||
if err := tsiIndex.CreateSeriesListIfNotExists(keysBatch, namesBatch, tagsBatch); err != nil {
|
||||
return fmt.Errorf("problem creating series: (%s)", err)
|
||||
}
|
||||
keysBatch = keysBatch[:0]
|
||||
namesBatch = namesBatch[:0]
|
||||
tagsBatch = tagsBatch[:0]
|
||||
}
|
||||
}
|
||||
|
||||
// Flush any remaining series in the batches
|
||||
if len(keysBatch) > 0 {
|
||||
if err := tsiIndex.CreateSeriesListIfNotExists(keysBatch, namesBatch, tagsBatch); err != nil {
|
||||
return fmt.Errorf("problem creating series: (%s)", err)
|
||||
}
|
||||
keysBatch = nil
|
||||
namesBatch = nil
|
||||
tagsBatch = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt to compact the index & wait for all compactions to complete.
|
||||
log.Info("compacting index")
|
||||
tsiIndex.Compact()
|
||||
tsiIndex.Wait()
|
||||
|
||||
// Close TSI index.
|
||||
log.Info("Closing tsi index")
|
||||
if err := tsiIndex.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Rename TSI to standard path.
|
||||
log.Info("Moving tsi to permanent location")
|
||||
return os.Rename(tmpPath, indexPath)
|
||||
}
|
||||
|
||||
func IndexTSMFile(index *tsi1.Index, path string, batchSize int, log *zap.Logger, verboseLogging bool) error {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
r, err := tsm1.NewTSMReader(f)
|
||||
if err != nil {
|
||||
log.Warn("Unable to read, skipping", zap.String("path", path), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
keysBatch := make([][]byte, 0, batchSize)
|
||||
namesBatch := make([][]byte, 0, batchSize)
|
||||
tagsBatch := make([]models.Tags, batchSize)
|
||||
var ti int
|
||||
for i := 0; i < r.KeyCount(); i++ {
|
||||
key, _ := r.KeyAt(i)
|
||||
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
|
||||
var name []byte
|
||||
name, tagsBatch[ti] = models.ParseKeyBytesWithTags(seriesKey, tagsBatch[ti])
|
||||
|
||||
if verboseLogging {
|
||||
log.Info("Series", zap.String("name", string(name)), zap.String("tags", tagsBatch[ti].String()))
|
||||
}
|
||||
|
||||
keysBatch = append(keysBatch, seriesKey)
|
||||
namesBatch = append(namesBatch, name)
|
||||
ti++
|
||||
|
||||
// Flush batch?
|
||||
if len(keysBatch) == batchSize {
|
||||
if err := index.CreateSeriesListIfNotExists(keysBatch, namesBatch, tagsBatch[:ti]); err != nil {
|
||||
return fmt.Errorf("problem creating series: (%s)", err)
|
||||
}
|
||||
keysBatch = keysBatch[:0]
|
||||
namesBatch = namesBatch[:0]
|
||||
ti = 0 // Reset tags.
|
||||
}
|
||||
}
|
||||
|
||||
// Flush any remaining series in the batches
|
||||
if len(keysBatch) > 0 {
|
||||
if err := index.CreateSeriesListIfNotExists(keysBatch, namesBatch, tagsBatch[:ti]); err != nil {
|
||||
return fmt.Errorf("problem creating series: (%s)", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func collectTSMFiles(path string) ([]string, error) {
|
||||
fis, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var paths []string
|
||||
for _, fi := range fis {
|
||||
if filepath.Ext(fi.Name()) != "."+tsm1.TSMFileExtension {
|
||||
continue
|
||||
}
|
||||
paths = append(paths, filepath.Join(path, fi.Name()))
|
||||
}
|
||||
return paths, nil
|
||||
}
|
||||
|
||||
func collectWALFiles(path string) ([]string, error) {
|
||||
if path == "" {
|
||||
return nil, os.ErrNotExist
|
||||
}
|
||||
if _, err := os.Stat(path); os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
fis, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var paths []string
|
||||
for _, fi := range fis {
|
||||
if filepath.Ext(fi.Name()) != "."+tsm1.WALFileExtension {
|
||||
continue
|
||||
}
|
||||
paths = append(paths, filepath.Join(path, fi.Name()))
|
||||
}
|
||||
return paths, nil
|
||||
}
|
||||
|
||||
func isRoot() bool {
|
||||
user, _ := user.Current()
|
||||
return user != nil && user.Username == "root"
|
||||
}
|
|
@ -214,7 +214,7 @@ bind-address = "127.0.0.1:8088"
|
|||
wal-dir = "/var/lib/influxdb/wal"
|
||||
wal-fsync-delay = "0s"
|
||||
validate-keys = false
|
||||
index-version = "inmem"
|
||||
index-version = "tsi1"
|
||||
query-log-enabled = true
|
||||
cache-max-memory-size = 1073741824
|
||||
cache-snapshot-memory-size = 26214400
|
||||
|
@ -222,8 +222,6 @@ bind-address = "127.0.0.1:8088"
|
|||
compact-full-write-cold-duration = "4h0m0s"
|
||||
compact-throughput = 50331648
|
||||
compact-throughput-burst = 50331648
|
||||
max-series-per-database = 1000000
|
||||
max-values-per-tag = 100000
|
||||
max-concurrent-compactions = 0
|
||||
max-index-log-file-size = 1048576
|
||||
series-id-set-cache-size = 100
|
||||
|
|
|
@ -112,11 +112,6 @@ should be emitted only when a support engineer can take some action to
|
|||
remedy the situation _and_ the system may not continue operating
|
||||
properly in the near future without remedying the situation.**
|
||||
|
||||
An example of what may qualify as a warning is the `max-values-per-tag`
|
||||
setting. If the server starts to approach the maximum number of values,
|
||||
the server may stop being able to function properly when it reaches the
|
||||
maximum number.
|
||||
|
||||
An example of what does not qualify as a warning is the
|
||||
`log-queries-after` setting. While the message is "warning" that a query
|
||||
was running for a long period of time, it is not clearly actionable and
|
||||
|
|
|
@ -16,8 +16,6 @@ bind-address = ":8188"
|
|||
cache-snapshot-memory-size = 26214400
|
||||
cache-snapshot-write-cold-duration = "10m0s"
|
||||
compact-full-write-cold-duration = "4h0m0s"
|
||||
max-series-per-database = 1000000
|
||||
max-values-per-tag = 100000
|
||||
max-concurrent-compactions = 0
|
||||
trace-logging-enabled = false
|
||||
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
_ "github.com/influxdata/influxdb/v2/tsdb/engine"
|
||||
_ "github.com/influxdata/influxdb/v2/tsdb/index/inmem"
|
||||
_ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
|
||||
"github.com/influxdata/influxdb/v2/v1/coordinator"
|
||||
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
||||
|
@ -138,13 +137,11 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
|
|||
|
||||
// WithLogger sets the logger on the Store. It must be called before Open.
|
||||
func (e *Engine) WithLogger(log *zap.Logger) {
|
||||
fields := []zap.Field{}
|
||||
fields = append(fields, zap.String("service", "storage-engine"))
|
||||
e.logger = log.With(fields...)
|
||||
e.logger = log.With(zap.String("service", "storage-engine"))
|
||||
|
||||
e.tsdbStore.Logger = e.logger
|
||||
e.tsdbStore.WithLogger(e.logger)
|
||||
if pw, ok := e.pointsWriter.(*coordinator.PointsWriter); ok {
|
||||
pw.Logger = e.logger
|
||||
pw.WithLogger(e.logger)
|
||||
}
|
||||
|
||||
if e.retentionService != nil {
|
||||
|
|
|
@ -52,10 +52,6 @@ const (
|
|||
// block in a TSM file
|
||||
DefaultMaxPointsPerBlock = 1000
|
||||
|
||||
// DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database.
|
||||
// This limit only applies to the "inmem" index.
|
||||
DefaultMaxSeriesPerDatabase = 1000000
|
||||
|
||||
// DefaultMaxValuesPerTag is the maximum number of values a tag can have within a measurement.
|
||||
DefaultMaxValuesPerTag = 100000
|
||||
|
||||
|
@ -106,16 +102,6 @@ type Config struct {
|
|||
|
||||
// Limits
|
||||
|
||||
// MaxSeriesPerDatabase is the maximum number of series a node can hold per database.
|
||||
// When this limit is exceeded, writes return a 'max series per database exceeded' error.
|
||||
// A value of 0 disables the limit. This limit only applies when using the "inmem" index.
|
||||
MaxSeriesPerDatabase int `toml:"max-series-per-database"`
|
||||
|
||||
// MaxValuesPerTag is the maximum number of tag values a single tag key can have within
|
||||
// a measurement. When the limit is exceeded, writes return an error.
|
||||
// A value of 0 disables the limit.
|
||||
MaxValuesPerTag int `toml:"max-values-per-tag"`
|
||||
|
||||
// MaxConcurrentCompactions is the maximum number of concurrent level and full compactions
|
||||
// that can be running at one time across all shards. Compactions scheduled to run when the
|
||||
// limit is reached are blocked until a running compaction completes. Snapshot compactions are
|
||||
|
@ -164,8 +150,6 @@ func NewConfig() Config {
|
|||
CompactThroughput: toml.Size(DefaultCompactThroughput),
|
||||
CompactThroughputBurst: toml.Size(DefaultCompactThroughputBurst),
|
||||
|
||||
MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
|
||||
MaxValuesPerTag: DefaultMaxValuesPerTag,
|
||||
MaxConcurrentCompactions: DefaultMaxConcurrentCompactions,
|
||||
|
||||
MaxIndexLogFileSize: toml.Size(DefaultMaxIndexLogFileSize),
|
||||
|
@ -233,8 +217,6 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
|
|||
"cache-snapshot-memory-size": c.CacheSnapshotMemorySize,
|
||||
"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,
|
||||
"compact-full-write-cold-duration": c.CompactFullWriteColdDuration,
|
||||
"max-series-per-database": c.MaxSeriesPerDatabase,
|
||||
"max-values-per-tag": c.MaxValuesPerTag,
|
||||
"max-concurrent-compactions": c.MaxConcurrentCompactions,
|
||||
"max-index-log-file-size": c.MaxIndexLogFileSize,
|
||||
"series-id-set-cache-size": c.SeriesIDSetCacheSize,
|
||||
|
|
|
@ -61,11 +61,6 @@ func TestConfig_Validate_Error(t *testing.T) {
|
|||
t.Errorf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
c.Index = tsdb.InmemIndexName
|
||||
if err := c.Validate(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
c.Index = tsdb.TSI1IndexName
|
||||
if err := c.Validate(); err != nil {
|
||||
t.Error(err)
|
||||
|
|
|
@ -81,6 +81,8 @@ type Engine interface {
|
|||
IsIdle() bool
|
||||
Free() error
|
||||
|
||||
Reindex() error
|
||||
|
||||
io.WriterTo
|
||||
}
|
||||
|
||||
|
@ -92,11 +94,6 @@ type SeriesIDSets interface {
|
|||
// EngineFormat represents the format for an engine.
|
||||
type EngineFormat int
|
||||
|
||||
const (
|
||||
// TSM1Format is the format used by the tsm1 engine.
|
||||
TSM1Format EngineFormat = 2
|
||||
)
|
||||
|
||||
// NewEngineFunc creates a new engine.
|
||||
type NewEngineFunc func(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine
|
||||
|
||||
|
@ -161,7 +158,6 @@ type EngineOptions struct {
|
|||
EngineVersion string
|
||||
IndexVersion string
|
||||
ShardID uint64
|
||||
InmemIndex interface{} // shared in-memory index
|
||||
|
||||
// Limits the concurrent number of TSM files that can be loaded at once.
|
||||
OpenLimiter limiter.Fixed
|
||||
|
@ -208,9 +204,6 @@ func NewEngineOptions() EngineOptions {
|
|||
}
|
||||
}
|
||||
|
||||
// NewInmemIndex returns a new "inmem" index type.
|
||||
var NewInmemIndex func(name string, sfile *SeriesFile) (interface{}, error)
|
||||
|
||||
type CompactionPlannerCreator func(cfg Config) interface{}
|
||||
|
||||
// FileStoreObserver is passed notifications before the file store adds or deletes files. In this way, it can
|
||||
|
|
|
@ -33,7 +33,6 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/pkg/tracing"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
_ "github.com/influxdata/influxdb/v2/tsdb/index"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/inmem"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
|
||||
"github.com/influxdata/influxql"
|
||||
"go.uber.org/zap"
|
||||
|
@ -234,15 +233,14 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
|
|||
planner.SetFileStore(fs)
|
||||
}
|
||||
|
||||
logger := zap.NewNop()
|
||||
stats := &EngineStatistics{}
|
||||
e := &Engine{
|
||||
id: id,
|
||||
path: path,
|
||||
index: idx,
|
||||
sfile: sfile,
|
||||
logger: logger,
|
||||
traceLogger: logger,
|
||||
logger: zap.NewNop(),
|
||||
traceLogger: zap.NewNop(),
|
||||
traceLogging: opt.Config.TraceLoggingEnabled,
|
||||
|
||||
WAL: wal,
|
||||
|
@ -570,10 +568,6 @@ func (e *Engine) ScheduleFullCompaction() error {
|
|||
// Path returns the path the engine was opened with.
|
||||
func (e *Engine) Path() string { return e.path }
|
||||
|
||||
func (e *Engine) SetFieldName(measurement []byte, name string) {
|
||||
e.index.SetFieldName(measurement, name)
|
||||
}
|
||||
|
||||
func (e *Engine) MeasurementExists(name []byte) (bool, error) {
|
||||
return e.index.MeasurementExists(name)
|
||||
}
|
||||
|
@ -810,9 +804,8 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
|
|||
// Save reference to index for iterator creation.
|
||||
e.index = index
|
||||
|
||||
// If we have the cached fields index on disk and we're using TSI, we
|
||||
// can skip scanning all the TSM files.
|
||||
if e.index.Type() != inmem.IndexName && !e.fieldset.IsEmpty() {
|
||||
// If we have the cached fields index on disk, we can skip scanning all the TSM files.
|
||||
if !e.fieldset.IsEmpty() {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1255,18 +1248,7 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
|
|||
tags = append(tags, models.ParseTags(keys[i]))
|
||||
}
|
||||
|
||||
// Build in-memory index, if necessary.
|
||||
if e.index.Type() == inmem.IndexName {
|
||||
if err := e.index.InitializeSeries(keys, names, tags); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := e.index.CreateSeriesListIfNotExists(keys, names, tags); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return e.index.CreateSeriesListIfNotExists(keys, names, tags)
|
||||
}
|
||||
|
||||
// WritePoints writes metadata and point data into the engine.
|
||||
|
@ -1489,7 +1471,6 @@ func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, predica
|
|||
}
|
||||
}
|
||||
|
||||
e.index.Rebuild()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1750,20 +1731,10 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
|
|||
// in any shard.
|
||||
var err error
|
||||
ids.ForEach(func(id uint64) {
|
||||
name, tags := e.sfile.Series(id)
|
||||
if err1 := e.sfile.DeleteSeriesID(id); err1 != nil {
|
||||
err = err1
|
||||
return
|
||||
}
|
||||
|
||||
// In the case of the inmem index the series can be removed across
|
||||
// the global index (all shards).
|
||||
if index, ok := e.index.(*inmem.ShardIndex); ok {
|
||||
key := models.MakeKey(name, tags)
|
||||
if e := index.Index.DropSeriesGlobal(key); e != nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -2425,10 +2396,6 @@ func (e *Engine) CreateIterator(ctx context.Context, measurement string, opt que
|
|||
return newMergeFinalizerIterator(ctx, itrs, opt, e.logger)
|
||||
}
|
||||
|
||||
type indexTagSets interface {
|
||||
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
|
||||
}
|
||||
|
||||
func (e *Engine) createCallIterator(ctx context.Context, measurement string, call *influxql.Call, opt query.IteratorOptions) ([]query.Iterator, error) {
|
||||
ref, _ := call.Args[0].(*influxql.VarRef)
|
||||
|
||||
|
@ -2443,13 +2410,8 @@ func (e *Engine) createCallIterator(ctx context.Context, measurement string, cal
|
|||
tagSets []*query.TagSet
|
||||
err error
|
||||
)
|
||||
if e.index.Type() == tsdb.InmemIndexName {
|
||||
ts := e.index.(indexTagSets)
|
||||
tagSets, err = ts.TagSets([]byte(measurement), opt)
|
||||
} else {
|
||||
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
|
||||
tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)
|
||||
}
|
||||
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
|
||||
tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -2523,13 +2485,8 @@ func (e *Engine) createVarRefIterator(ctx context.Context, measurement string, o
|
|||
tagSets []*query.TagSet
|
||||
err error
|
||||
)
|
||||
if e.index.Type() == tsdb.InmemIndexName {
|
||||
ts := e.index.(indexTagSets)
|
||||
tagSets, err = ts.TagSets([]byte(measurement), opt)
|
||||
} else {
|
||||
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
|
||||
tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)
|
||||
}
|
||||
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
|
||||
tagSets, err = indexSet.TagSets(e.sfile, []byte(measurement), opt)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -3140,3 +3097,77 @@ func varRefSliceRemove(a []influxql.VarRef, v string) []influxql.VarRef {
|
|||
}
|
||||
return other
|
||||
}
|
||||
|
||||
const reindexBatchSize = 10000
|
||||
|
||||
func (e *Engine) Reindex() error {
|
||||
keys := make([][]byte, reindexBatchSize)
|
||||
seriesKeys := make([][]byte, reindexBatchSize)
|
||||
names := make([][]byte, reindexBatchSize)
|
||||
tags := make([]models.Tags, reindexBatchSize)
|
||||
|
||||
n := 0
|
||||
|
||||
reindexBatch := func() error {
|
||||
if n == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i, key := range keys[:n] {
|
||||
seriesKeys[i], _ = SeriesAndFieldFromCompositeKey(key)
|
||||
names[i], tags[i] = models.ParseKeyBytes(seriesKeys[i])
|
||||
e.traceLogger.Debug(
|
||||
"Read series during reindexing",
|
||||
logger.Shard(e.id),
|
||||
zap.String("name", string(names[i])),
|
||||
zap.String("tags", tags[i].String()),
|
||||
)
|
||||
}
|
||||
|
||||
e.logger.Debug("Reindexing data batch", logger.Shard(e.id), zap.Int("batch_size", n))
|
||||
if err := e.index.CreateSeriesListIfNotExists(seriesKeys[:n], names[:n], tags[:n]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n = 0
|
||||
return nil
|
||||
}
|
||||
reindexKey := func(key []byte) error {
|
||||
keys[n] = key
|
||||
n++
|
||||
|
||||
if n < reindexBatchSize {
|
||||
return nil
|
||||
}
|
||||
return reindexBatch()
|
||||
}
|
||||
|
||||
// Index data stored in TSM files.
|
||||
e.logger.Info("Reindexing TSM data", logger.Shard(e.id))
|
||||
if err := e.FileStore.WalkKeys(nil, func(key []byte, _ byte) error {
|
||||
return reindexKey(key)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Make sure all TSM data is indexed.
|
||||
if err := reindexBatch(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !e.WALEnabled {
|
||||
// All done.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reindex data stored in the WAL cache.
|
||||
e.logger.Info("Reindexing WAL data", logger.Shard(e.id))
|
||||
for _, key := range e.Cache.Keys() {
|
||||
if err := reindexKey(key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure all WAL data is indexed.
|
||||
return reindexBatch()
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/pkg/deep"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/inmem"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
|
@ -174,7 +174,7 @@ func seriesExist(e *Engine, m string, dims []string) (int, error) {
|
|||
|
||||
// Ensure that the engine can write & read shard digest files.
|
||||
func TestEngine_Digest(t *testing.T) {
|
||||
e := MustOpenEngine(inmem.IndexName)
|
||||
e := MustOpenEngine(tsi1.IndexName)
|
||||
defer e.Close()
|
||||
|
||||
if err := e.Open(); err != nil {
|
||||
|
@ -322,7 +322,7 @@ type span struct {
|
|||
|
||||
// Ensure engine handles concurrent calls to Digest().
|
||||
func TestEngine_Digest_Concurrent(t *testing.T) {
|
||||
e := MustOpenEngine(inmem.IndexName)
|
||||
e := MustOpenEngine(tsi1.IndexName)
|
||||
defer e.Close()
|
||||
|
||||
if err := e.Open(); err != nil {
|
||||
|
@ -392,7 +392,6 @@ func TestEngine_Backup(t *testing.T) {
|
|||
// Write those points to the engine.
|
||||
db := path.Base(f.Name())
|
||||
opt := tsdb.NewEngineOptions()
|
||||
opt.InmemIndex = inmem.NewIndex(db, sfile.SeriesFile)
|
||||
idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
|
||||
defer idx.Close()
|
||||
|
||||
|
@ -499,7 +498,6 @@ func TestEngine_Export(t *testing.T) {
|
|||
// Write those points to the engine.
|
||||
db := path.Base(f.Name())
|
||||
opt := tsdb.NewEngineOptions()
|
||||
opt.InmemIndex = inmem.NewIndex(db, sfile.SeriesFile)
|
||||
idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
|
||||
defer idx.Close()
|
||||
|
||||
|
@ -1046,8 +1044,6 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
|
|||
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("X"), influxql.Float)
|
||||
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("Y"), influxql.Float)
|
||||
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
|
||||
e.SetFieldName([]byte("cpu"), "X")
|
||||
e.SetFieldName([]byte("cpu"), "Y")
|
||||
|
||||
if err := e.WritePointsString(
|
||||
`cpu,host=A value=1.1 1000000000`,
|
||||
|
@ -1842,7 +1838,6 @@ func TestEngine_SnapshotsDisabled(t *testing.T) {
|
|||
// Create a tsm1 engine.
|
||||
db := path.Base(dir)
|
||||
opt := tsdb.NewEngineOptions()
|
||||
opt.InmemIndex = inmem.NewIndex(db, sfile.SeriesFile)
|
||||
idx := tsdb.MustOpenIndex(1, db, filepath.Join(dir, "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt)
|
||||
defer idx.Close()
|
||||
|
||||
|
@ -1870,10 +1865,11 @@ func TestEngine_SnapshotsDisabled(t *testing.T) {
|
|||
func TestEngine_ShouldCompactCache(t *testing.T) {
|
||||
nowTime := time.Now()
|
||||
|
||||
e, err := NewEngine(inmem.IndexName)
|
||||
e, err := NewEngine(tsi1.IndexName)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer e.Close()
|
||||
|
||||
// mock the planner so compactions don't run during the test
|
||||
e.CompactionPlan = &mockPlanner{}
|
||||
|
@ -1881,7 +1877,6 @@ func TestEngine_ShouldCompactCache(t *testing.T) {
|
|||
if err := e.Open(); err != nil {
|
||||
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
|
||||
}
|
||||
defer e.Close()
|
||||
|
||||
e.CacheFlushMemorySizeThreshold = 1024
|
||||
e.CacheFlushWriteColdDuration = time.Minute
|
||||
|
@ -2543,9 +2538,6 @@ func NewEngine(index string) (*Engine, error) {
|
|||
|
||||
opt := tsdb.NewEngineOptions()
|
||||
opt.IndexVersion = index
|
||||
if index == tsdb.InmemIndexName {
|
||||
opt.InmemIndex = inmem.NewIndex(db, sfile)
|
||||
}
|
||||
// Initialise series id sets. Need to do this as it's normally done at the
|
||||
// store level.
|
||||
seriesIDs := tsdb.NewSeriesIDSet()
|
||||
|
@ -2616,7 +2608,6 @@ func (e *Engine) Reopen() error {
|
|||
|
||||
db := path.Base(e.root)
|
||||
opt := tsdb.NewEngineOptions()
|
||||
opt.InmemIndex = inmem.NewIndex(db, e.sfile)
|
||||
|
||||
// Re-initialise the series id set
|
||||
seriesIDSet := tsdb.NewSeriesIDSet()
|
||||
|
|
|
@ -21,8 +21,7 @@ import (
|
|||
|
||||
// Available index types.
|
||||
const (
|
||||
InmemIndexName = "inmem"
|
||||
TSI1IndexName = "tsi1"
|
||||
TSI1IndexName = "tsi1"
|
||||
)
|
||||
|
||||
// ErrIndexClosing can be returned to from an Index method if the index is currently closing.
|
||||
|
@ -39,15 +38,11 @@ type Index interface {
|
|||
DropMeasurement(name []byte) error
|
||||
ForEachMeasurementName(fn func(name []byte) error) error
|
||||
|
||||
InitializeSeries(keys, names [][]byte, tags []models.Tags) error
|
||||
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
|
||||
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
|
||||
DropSeries(seriesID uint64, key []byte, cascade bool) error
|
||||
DropMeasurementIfSeriesNotExist(name []byte) (bool, error)
|
||||
|
||||
// Used to clean up series in inmem index that were dropped with a shard.
|
||||
DropSeriesGlobal(key []byte) error
|
||||
|
||||
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
|
||||
SeriesN() int64
|
||||
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
|
||||
|
@ -78,15 +73,10 @@ type Index interface {
|
|||
// Bytes estimates the memory footprint of this Index, in bytes.
|
||||
Bytes() int
|
||||
|
||||
// To be removed w/ tsi1.
|
||||
SetFieldName(measurement []byte, name string)
|
||||
|
||||
Type() string
|
||||
// Returns a unique reference ID to the index instance.
|
||||
// For inmem, returns a reference to the backing Index, not ShardIndex.
|
||||
UniqueReferenceID() uintptr
|
||||
|
||||
Rebuild()
|
||||
// Returns a unique reference ID to the index instance.
|
||||
UniqueReferenceID() uintptr
|
||||
}
|
||||
|
||||
// SeriesElem represents a generic series element.
|
||||
|
@ -1273,16 +1263,6 @@ type IndexSet struct {
|
|||
fieldSets []*MeasurementFieldSet // field sets for _all_ indexes in this set's DB.
|
||||
}
|
||||
|
||||
// HasInmemIndex returns true if any in-memory index is in use.
|
||||
func (is IndexSet) HasInmemIndex() bool {
|
||||
for _, idx := range is.Indexes {
|
||||
if idx.Type() == InmemIndexName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Database returns the database name of the first index.
|
||||
func (is IndexSet) Database() string {
|
||||
if len(is.Indexes) == 0 {
|
||||
|
@ -1314,28 +1294,6 @@ func (is IndexSet) HasField(measurement []byte, field string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// DedupeInmemIndexes returns an index set which removes duplicate indexes.
|
||||
// Useful because inmem indexes are shared by shards per database.
|
||||
func (is IndexSet) DedupeInmemIndexes() IndexSet {
|
||||
other := IndexSet{
|
||||
Indexes: make([]Index, 0, len(is.Indexes)),
|
||||
SeriesFile: is.SeriesFile,
|
||||
fieldSets: make([]*MeasurementFieldSet, 0, len(is.Indexes)),
|
||||
}
|
||||
|
||||
uniqueIndexes := make(map[uintptr]Index)
|
||||
for _, idx := range is.Indexes {
|
||||
uniqueIndexes[idx.UniqueReferenceID()] = idx
|
||||
}
|
||||
|
||||
for _, idx := range uniqueIndexes {
|
||||
other.Indexes = append(other.Indexes, idx)
|
||||
other.fieldSets = append(other.fieldSets, idx.FieldSet())
|
||||
}
|
||||
|
||||
return other
|
||||
}
|
||||
|
||||
// MeasurementNamesByExpr returns a slice of measurement names matching the
|
||||
// provided condition. If no condition is provided then all names are returned.
|
||||
func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
|
||||
|
@ -2056,7 +2014,7 @@ func (is IndexSet) tagValueIterator(name, key []byte) (TagValueIterator, error)
|
|||
// TagKeyHasAuthorizedSeries determines if there exists an authorized series for
|
||||
// the provided measurement name and tag key.
|
||||
func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey []byte) (bool, error) {
|
||||
if !is.HasInmemIndex() && query.AuthorizerIsOpen(auth) {
|
||||
if query.AuthorizerIsOpen(auth) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
|
@ -3015,17 +2973,6 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
|
|||
return sortedTagsSets, nil
|
||||
}
|
||||
|
||||
// IndexFormat represents the format for an index.
|
||||
type IndexFormat int
|
||||
|
||||
const (
|
||||
// InMemFormat is the format used by the original in-memory shared index.
|
||||
InMemFormat IndexFormat = 1
|
||||
|
||||
// TSI1Format is the format used by the tsi1 index.
|
||||
TSI1Format IndexFormat = 2
|
||||
)
|
||||
|
||||
// NewIndexFunc creates a new index.
|
||||
type NewIndexFunc func(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package index // import "github.com/influxdata/influxdb/v2/tsdb/index"
|
||||
|
||||
import (
|
||||
_ "github.com/influxdata/influxdb/v2/tsdb/index/inmem"
|
||||
_ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
|
||||
)
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,186 +0,0 @@
|
|||
package inmem_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/inmem"
|
||||
)
|
||||
|
||||
func createData(lo, hi int) (keys, names [][]byte, tags []models.Tags) {
|
||||
for i := lo; i < hi; i++ {
|
||||
keys = append(keys, []byte(fmt.Sprintf("m0,tag0=t%d", i)))
|
||||
names = append(names, []byte("m0"))
|
||||
var t models.Tags
|
||||
t.Set([]byte("tag0"), []byte(fmt.Sprintf("%d", i)))
|
||||
tags = append(tags, t)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxValuesExceeded(b *testing.B) {
|
||||
sfile := mustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
||||
opt.Config.MaxValuesPerTag = 10
|
||||
si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
|
||||
si.Open()
|
||||
keys, names, tags := createData(0, 10)
|
||||
si.CreateSeriesListIfNotExists(keys, names, tags)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
keys, names, tags = createData(9, 5010)
|
||||
for i := 0; i < b.N; i++ {
|
||||
si.CreateSeriesListIfNotExists(keys, names, tags)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxValuesNotExceeded(b *testing.B) {
|
||||
sfile := mustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
||||
opt.Config.MaxValuesPerTag = 100000
|
||||
si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
|
||||
si.Open()
|
||||
keys, names, tags := createData(0, 10)
|
||||
si.CreateSeriesListIfNotExists(keys, names, tags)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
keys, names, tags = createData(9, 5010)
|
||||
for i := 0; i < b.N; i++ {
|
||||
si.CreateSeriesListIfNotExists(keys, names, tags)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkShardIndex_CreateSeriesListIfNotExists_NoMaxValues(b *testing.B) {
|
||||
sfile := mustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
||||
si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
|
||||
si.Open()
|
||||
keys, names, tags := createData(0, 10)
|
||||
si.CreateSeriesListIfNotExists(keys, names, tags)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
keys, names, tags = createData(9, 5010)
|
||||
for i := 0; i < b.N; i++ {
|
||||
si.CreateSeriesListIfNotExists(keys, names, tags)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkShardIndex_CreateSeriesListIfNotExists_MaxSeriesExceeded(b *testing.B) {
|
||||
sfile := mustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
||||
opt.Config.MaxValuesPerTag = 0
|
||||
opt.Config.MaxSeriesPerDatabase = 10
|
||||
si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt)
|
||||
si.Open()
|
||||
keys, names, tags := createData(0, 10)
|
||||
si.CreateSeriesListIfNotExists(keys, names, tags)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
keys, names, tags = createData(9, 5010)
|
||||
for i := 0; i < b.N; i++ {
|
||||
si.CreateSeriesListIfNotExists(keys, names, tags)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndex_Bytes(t *testing.T) {
|
||||
sfile := mustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
||||
si := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)
|
||||
|
||||
indexBaseBytes := si.Bytes()
|
||||
|
||||
name := []byte("name")
|
||||
err := si.CreateSeriesIfNotExists(name, name, models.Tags{})
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
indexNewBytes := si.Bytes()
|
||||
if indexBaseBytes >= indexNewBytes {
|
||||
t.Errorf("index Bytes(): want >%d, got %d", indexBaseBytes, indexNewBytes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndex_MeasurementTracking(t *testing.T) {
|
||||
sfile := mustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
|
||||
s1 := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)
|
||||
s2 := inmem.NewShardIndex(2, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex)
|
||||
b := func(s string) []byte { return []byte(s) }
|
||||
mt := func(k, v string) models.Tag { return models.Tag{Key: b(k), Value: b(v)} }
|
||||
|
||||
s1.CreateSeriesIfNotExists(b("m,t=t1"), b("m"), models.Tags{mt("t", "t1")})
|
||||
s1.CreateSeriesIfNotExists(b("m,t=t2"), b("m"), models.Tags{mt("t", "t2")})
|
||||
s2.CreateSeriesIfNotExists(b("m,t=t1"), b("m"), models.Tags{mt("t", "t1")})
|
||||
s2.CreateSeriesIfNotExists(b("m,t=t2"), b("m"), models.Tags{mt("t", "t2")})
|
||||
series1, _ := s1.Series(b("m,t=t1"))
|
||||
series2, _ := s1.Series(b("m,t=t2"))
|
||||
|
||||
if ok, err := s1.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
|
||||
t.Fatal("invalid drop")
|
||||
}
|
||||
if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
|
||||
t.Fatal("invalid drop")
|
||||
}
|
||||
|
||||
s1.DropSeries(series1.ID, b(series1.Key), false)
|
||||
s1.DropSeries(series2.ID, b(series2.Key), false)
|
||||
|
||||
if ok, err := s1.DropMeasurementIfSeriesNotExist(b("m")); err != nil || !ok {
|
||||
t.Fatal("invalid drop")
|
||||
}
|
||||
if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok {
|
||||
t.Fatal("invalid drop")
|
||||
}
|
||||
|
||||
s2.DropSeries(series1.ID, b(series1.Key), false)
|
||||
s2.DropSeries(series2.ID, b(series2.Key), false)
|
||||
|
||||
if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || !ok {
|
||||
t.Fatal("invalid drop")
|
||||
}
|
||||
}
|
||||
|
||||
// seriesFileWrapper is a test wrapper for tsdb.seriesFileWrapper.
|
||||
type seriesFileWrapper struct {
|
||||
*tsdb.SeriesFile
|
||||
}
|
||||
|
||||
// newSeriesFileWrapper returns a new instance of seriesFileWrapper with a temporary file path.
|
||||
func newSeriesFileWrapper() *seriesFileWrapper {
|
||||
dir, err := ioutil.TempDir("", "tsdb-series-file-")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &seriesFileWrapper{SeriesFile: tsdb.NewSeriesFile(dir)}
|
||||
}
|
||||
|
||||
// mustOpenSeriesFile returns a new, open instance of seriesFileWrapper. Panic on error.
|
||||
func mustOpenSeriesFile() *seriesFileWrapper {
|
||||
f := newSeriesFileWrapper()
|
||||
if err := f.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
||||
// Close closes the log file and removes it from disk.
|
||||
func (f *seriesFileWrapper) Close() error {
|
||||
defer os.RemoveAll(f.Path())
|
||||
return f.SeriesFile.Close()
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -1,292 +0,0 @@
|
|||
package inmem
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/influxql/query"
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
// Test comparing SeriesIDs for equality.
|
||||
func TestSeriesIDs_Equals(t *testing.T) {
|
||||
ids1 := seriesIDs([]uint64{1, 2, 3})
|
||||
ids2 := seriesIDs([]uint64{1, 2, 3})
|
||||
ids3 := seriesIDs([]uint64{4, 5, 6})
|
||||
|
||||
if !ids1.Equals(ids2) {
|
||||
t.Fatal("expected ids1 == ids2")
|
||||
} else if ids1.Equals(ids3) {
|
||||
t.Fatal("expected ids1 != ids3")
|
||||
}
|
||||
}
|
||||
|
||||
// Test intersecting sets of SeriesIDs.
|
||||
func TestSeriesIDs_Intersect(t *testing.T) {
|
||||
// Test swapping l & r, all branches of if-else, and exit loop when 'j < len(r)'
|
||||
ids1 := seriesIDs([]uint64{1, 3, 4, 5, 6})
|
||||
ids2 := seriesIDs([]uint64{1, 2, 3, 7})
|
||||
exp := seriesIDs([]uint64{1, 3})
|
||||
got := ids1.Intersect(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
|
||||
// Test exit for loop when 'i < len(l)'
|
||||
ids1 = seriesIDs([]uint64{1})
|
||||
ids2 = seriesIDs([]uint64{1, 2})
|
||||
exp = seriesIDs([]uint64{1})
|
||||
got = ids1.Intersect(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
// Test union sets of SeriesIDs.
|
||||
func TestSeriesIDs_Union(t *testing.T) {
|
||||
// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
|
||||
ids1 := seriesIDs([]uint64{1, 2, 3, 7})
|
||||
ids2 := seriesIDs([]uint64{1, 3, 4, 5, 6})
|
||||
exp := seriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7})
|
||||
got := ids1.Union(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
|
||||
// Test exit because of 'i < len(l)' and append remainder from right.
|
||||
ids1 = seriesIDs([]uint64{1})
|
||||
ids2 = seriesIDs([]uint64{1, 2})
|
||||
exp = seriesIDs([]uint64{1, 2})
|
||||
got = ids1.Union(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
// Test removing one set of SeriesIDs from another.
|
||||
func TestSeriesIDs_Reject(t *testing.T) {
|
||||
// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
|
||||
ids1 := seriesIDs([]uint64{1, 2, 3, 7})
|
||||
ids2 := seriesIDs([]uint64{1, 3, 4, 5, 6})
|
||||
exp := seriesIDs([]uint64{2, 7})
|
||||
got := ids1.Reject(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
|
||||
// Test exit because of 'i < len(l)'.
|
||||
ids1 = seriesIDs([]uint64{1})
|
||||
ids2 = seriesIDs([]uint64{1, 2})
|
||||
exp = seriesIDs{}
|
||||
got = ids1.Reject(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_AddSeries_Nil(t *testing.T) {
|
||||
m := newMeasurement("foo", "cpu")
|
||||
if m.AddSeries(nil) {
|
||||
t.Fatalf("AddSeries mismatch: exp false, got true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) {
|
||||
m := newMeasurement("foo", "cpu")
|
||||
var dst []string
|
||||
dst = m.AppendSeriesKeysByID(dst, []uint64{1})
|
||||
if exp, got := 0, len(dst); exp != got {
|
||||
t.Fatalf("series len mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) {
|
||||
m := newMeasurement("foo", "cpu")
|
||||
s := newSeries(1, m, "cpu,host=foo", models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
|
||||
m.AddSeries(s)
|
||||
|
||||
var dst []string
|
||||
dst = m.AppendSeriesKeysByID(dst, []uint64{1})
|
||||
if exp, got := 1, len(dst); exp != got {
|
||||
t.Fatalf("series len mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
if exp, got := "cpu,host=foo", dst[0]; exp != got {
|
||||
t.Fatalf("series mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_TagsSet_Deadlock(t *testing.T) {
|
||||
m := newMeasurement("foo", "cpu")
|
||||
s1 := newSeries(1, m, "cpu,host=foo", models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
|
||||
m.AddSeries(s1)
|
||||
|
||||
s2 := newSeries(2, m, "cpu,host=bar", models.Tags{models.NewTag([]byte("host"), []byte("bar"))})
|
||||
m.AddSeries(s2)
|
||||
|
||||
m.DropSeries(s1)
|
||||
|
||||
// This was deadlocking
|
||||
s := tsdb.NewSeriesIDSet()
|
||||
s.Add(1)
|
||||
m.TagSets(s, query.IteratorOptions{})
|
||||
if got, exp := len(m.SeriesIDs()), 1; got != exp {
|
||||
t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensures the tagKeyValue API contains no deadlocks or sync issues.
|
||||
func TestTagKeyValue_Concurrent(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
done := make(chan struct{})
|
||||
time.AfterFunc(2*time.Second, func() { close(done) })
|
||||
|
||||
v := newTagKeyValue()
|
||||
for i := 0; i < 4; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
||||
rand := rand.New(rand.NewSource(int64(i)))
|
||||
for {
|
||||
// Continue running until time limit.
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Randomly choose next API.
|
||||
switch rand.Intn(7) {
|
||||
case 0:
|
||||
v.bytes()
|
||||
case 1:
|
||||
v.Cardinality()
|
||||
case 2:
|
||||
v.Contains(fmt.Sprintf("%d", rand.Intn(52)+65))
|
||||
case 3:
|
||||
v.InsertSeriesIDByte([]byte(fmt.Sprintf("%d", rand.Intn(52)+65)), rand.Uint64()%1000)
|
||||
case 4:
|
||||
v.Load(fmt.Sprintf("%d", rand.Intn(52)+65))
|
||||
case 5:
|
||||
v.Range(func(tagValue string, a seriesIDs) bool {
|
||||
return rand.Intn(10) == 0
|
||||
})
|
||||
case 6:
|
||||
v.RangeAll(func(k string, a seriesIDs) {})
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) {
|
||||
m := newMeasurement("foo", "cpu")
|
||||
for i := 0; i < 100000; i++ {
|
||||
s := newSeries(uint64(i), m, "cpu", models.Tags{models.NewTag(
|
||||
[]byte("host"),
|
||||
[]byte(fmt.Sprintf("host%d", i)))})
|
||||
m.AddSeries(s)
|
||||
}
|
||||
|
||||
if exp, got := 100000, len(m.SeriesKeys()); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host =~ /host\d+/`)).ParseStatement()
|
||||
if err != nil {
|
||||
b.Fatalf("invalid statement: %s", err)
|
||||
}
|
||||
|
||||
selectStmt := stmt.(*influxql.SelectStatement)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr))
|
||||
if exp, got := 100000, len(ids); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
|
||||
m := newMeasurement("foo", "cpu")
|
||||
for i := 0; i < 100000; i++ {
|
||||
s := newSeries(uint64(i), m, "cpu", models.Tags{models.Tag{
|
||||
Key: []byte("host"),
|
||||
Value: []byte(fmt.Sprintf("host%d", i))}})
|
||||
m.AddSeries(s)
|
||||
}
|
||||
|
||||
if exp, got := 100000, len(m.SeriesKeys()); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host !~ /foo\d+/`)).ParseStatement()
|
||||
if err != nil {
|
||||
b.Fatalf("invalid statement: %s", err)
|
||||
}
|
||||
|
||||
selectStmt := stmt.(*influxql.SelectStatement)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr))
|
||||
if exp, got := 100000, len(ids); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func benchmarkTagSets(b *testing.B, n int, opt query.IteratorOptions) {
|
||||
m := newMeasurement("foo", "m")
|
||||
ss := tsdb.NewSeriesIDSet()
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
|
||||
s := newSeries(uint64(i), m, "m,tag1=value1,tag2=value2", models.NewTags(tags))
|
||||
ss.Add(uint64(i))
|
||||
m.AddSeries(s)
|
||||
}
|
||||
|
||||
// warm caches
|
||||
m.TagSets(ss, opt)
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
m.TagSets(ss, opt)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_TagSetsNoDimensions_1000(b *testing.B) {
|
||||
benchmarkTagSets(b, 1000, query.IteratorOptions{})
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_TagSetsDimensions_1000(b *testing.B) {
|
||||
benchmarkTagSets(b, 1000, query.IteratorOptions{Dimensions: []string{"tag1", "tag2"}})
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_TagSetsNoDimensions_100000(b *testing.B) {
|
||||
benchmarkTagSets(b, 100000, query.IteratorOptions{})
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_TagSetsDimensions_100000(b *testing.B) {
|
||||
benchmarkTagSets(b, 100000, query.IteratorOptions{Dimensions: []string{"tag1", "tag2"}})
|
||||
}
|
|
@ -3,7 +3,6 @@ package tsi1
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
|
@ -775,11 +774,6 @@ func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) erro
|
|||
return nil
|
||||
}
|
||||
|
||||
// InitializeSeries is a no-op. This only applies to the in-memory index.
|
||||
func (i *Index) InitializeSeries(keys, names [][]byte, tags []models.Tags) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DropSeries drops the provided series from the index. If cascade is true
|
||||
// and this is the last series to the measurement, the measurement will also be dropped.
|
||||
func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
|
||||
|
@ -824,9 +818,6 @@ func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// DropSeriesGlobal is a no-op on the tsi1 index.
|
||||
func (i *Index) DropSeriesGlobal(key []byte) error { return nil }
|
||||
|
||||
// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
|
||||
// series for the measurement.
|
||||
func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) {
|
||||
|
@ -1109,27 +1100,3 @@ func (i *Index) RetainFileSet() (*FileSet, error) {
|
|||
}
|
||||
return fs, nil
|
||||
}
|
||||
|
||||
// SetFieldName is a no-op on this index.
|
||||
func (i *Index) SetFieldName(measurement []byte, name string) {}
|
||||
|
||||
// Rebuild rebuilds an index. It's a no-op for this index.
|
||||
func (i *Index) Rebuild() {}
|
||||
|
||||
// IsIndexDir returns true if directory contains at least one partition directory.
|
||||
func IsIndexDir(path string) (bool, error) {
|
||||
fis, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, fi := range fis {
|
||||
if !fi.IsDir() {
|
||||
continue
|
||||
} else if ok, err := IsPartitionDir(filepath.Join(path, fi.Name())); err != nil {
|
||||
return false, err
|
||||
} else if ok {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/pkg/slices"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/inmem"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
@ -227,59 +226,6 @@ func TestIndexSet_MeasurementNamesByPredicate(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIndexSet_DedupeInmemIndexes(t *testing.T) {
|
||||
testCases := []struct {
|
||||
tsiN int // Quantity of TSI indexes
|
||||
inmem1N int // Quantity of ShardIndexes proxying the first inmem Index
|
||||
inmem2N int // Quantity of ShardIndexes proxying the second inmem Index
|
||||
uniqueN int // Quantity of total, deduplicated indexes
|
||||
}{
|
||||
{tsiN: 1, inmem1N: 0, uniqueN: 1},
|
||||
{tsiN: 2, inmem1N: 0, uniqueN: 2},
|
||||
{tsiN: 0, inmem1N: 1, uniqueN: 1},
|
||||
{tsiN: 0, inmem1N: 2, uniqueN: 1},
|
||||
{tsiN: 0, inmem1N: 1, inmem2N: 1, uniqueN: 2},
|
||||
{tsiN: 0, inmem1N: 2, inmem2N: 2, uniqueN: 2},
|
||||
{tsiN: 2, inmem1N: 2, inmem2N: 2, uniqueN: 4},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
name := fmt.Sprintf("%d/%d/%d -> %d", testCase.tsiN, testCase.inmem1N, testCase.inmem2N, testCase.uniqueN)
|
||||
t.Run(name, func(t *testing.T) {
|
||||
|
||||
var indexes []tsdb.Index
|
||||
for i := 0; i < testCase.tsiN; i++ {
|
||||
indexes = append(indexes, MustOpenNewIndex(tsi1.IndexName))
|
||||
}
|
||||
if testCase.inmem1N > 0 {
|
||||
sfile := MustOpenSeriesFile()
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.IndexVersion = inmem.IndexName
|
||||
opts.InmemIndex = inmem.NewIndex("db", sfile.SeriesFile)
|
||||
|
||||
for i := 0; i < testCase.inmem1N; i++ {
|
||||
indexes = append(indexes, inmem.NewShardIndex(uint64(i), tsdb.NewSeriesIDSet(), opts))
|
||||
}
|
||||
}
|
||||
if testCase.inmem2N > 0 {
|
||||
sfile := MustOpenSeriesFile()
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.IndexVersion = inmem.IndexName
|
||||
opts.InmemIndex = inmem.NewIndex("db", sfile.SeriesFile)
|
||||
|
||||
for i := 0; i < testCase.inmem2N; i++ {
|
||||
indexes = append(indexes, inmem.NewShardIndex(uint64(i), tsdb.NewSeriesIDSet(), opts))
|
||||
}
|
||||
}
|
||||
|
||||
is := tsdb.IndexSet{Indexes: indexes}.DedupeInmemIndexes()
|
||||
if len(is.Indexes) != testCase.uniqueN {
|
||||
t.Errorf("expected %d indexes, got %d", testCase.uniqueN, len(is.Indexes))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndex_Sketches(t *testing.T) {
|
||||
checkCardinalities := func(t *testing.T, index *Index, state string, series, tseries, measurements, tmeasurements int) {
|
||||
t.Helper()
|
||||
|
@ -436,10 +382,6 @@ func MustNewIndex(index string, eopts ...EngineOption) *Index {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
if index == inmem.IndexName {
|
||||
opts.InmemIndex = inmem.NewIndex("db0", sfile)
|
||||
}
|
||||
|
||||
i, err := tsdb.NewIndex(0, "db0", filepath.Join(rootPath, "index"), tsdb.NewSeriesIDSet(), sfile, opts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -500,9 +442,6 @@ func (i *Index) Reopen() error {
|
|||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.IndexVersion = i.indexType
|
||||
if i.indexType == inmem.IndexName {
|
||||
opts.InmemIndex = inmem.NewIndex("db0", i.sfile)
|
||||
}
|
||||
|
||||
idx, err := tsdb.NewIndex(0, "db0", filepath.Join(i.rootPath, "index"), tsdb.NewSeriesIDSet(), i.sfile, opts)
|
||||
if err != nil {
|
||||
|
@ -532,7 +471,6 @@ func (i *Index) Close() error {
|
|||
//
|
||||
// Typical results on an i7 laptop.
|
||||
//
|
||||
// BenchmarkIndexSet_TagSets/1M_series/inmem-8 100 10430732 ns/op 3556728 B/op 51 allocs/op
|
||||
// BenchmarkIndexSet_TagSets/1M_series/tsi1-8 100 18995530 ns/op 5221180 B/op 20379 allocs/op
|
||||
func BenchmarkIndexSet_TagSets(b *testing.B) {
|
||||
// Read line-protocol and coerce into tsdb format.
|
||||
|
@ -588,11 +526,6 @@ func BenchmarkIndexSet_TagSets(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO(edd): refactor how we call into tag sets in the tsdb package.
|
||||
type indexTagSets interface {
|
||||
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
|
||||
}
|
||||
|
||||
var errResult error
|
||||
|
||||
// This benchmark will merge eight bitsets each containing ~10,000 series IDs.
|
||||
|
@ -609,18 +542,8 @@ func BenchmarkIndexSet_TagSets(b *testing.B) {
|
|||
Indexes: []tsdb.Index{idx.Index},
|
||||
} // For TSI implementation
|
||||
|
||||
var ts func() ([]*query.TagSet, error)
|
||||
// TODO(edd): this is somewhat awkward. We should unify this difference somewhere higher
|
||||
// up than the engine. I don't want to open an engine do a benchmark on
|
||||
// different index implementations.
|
||||
if indexType == tsdb.InmemIndexName {
|
||||
ts = func() ([]*query.TagSet, error) {
|
||||
return idx.Index.(indexTagSets).TagSets(name, opt)
|
||||
}
|
||||
} else {
|
||||
ts = func() ([]*query.TagSet, error) {
|
||||
return indexSet.TagSets(idx.sfile, name, opt)
|
||||
}
|
||||
ts := func() ([]*query.TagSet, error) {
|
||||
return indexSet.TagSets(idx.sfile, name, opt)
|
||||
}
|
||||
|
||||
b.Run(indexType, func(b *testing.B) {
|
||||
|
@ -645,8 +568,6 @@ func BenchmarkIndexSet_TagSets(b *testing.B) {
|
|||
//
|
||||
// Typical results for an i7 laptop
|
||||
//
|
||||
// BenchmarkIndex_ConcurrentWriteQuery/inmem/queries_100000/cache-8 1 5963346204 ns/op 2499655768 B/op 23964183 allocs/op
|
||||
// BenchmarkIndex_ConcurrentWriteQuery/inmem/queries_100000/no_cache-8 1 5314841090 ns/op 2499495280 B/op 23963322 allocs/op
|
||||
// BenchmarkIndex_ConcurrentWriteQuery/tsi1/queries_100000/cache-8 1 1645048376 ns/op 2215402840 B/op 23048978 allocs/op
|
||||
// BenchmarkIndex_ConcurrentWriteQuery/tsi1/queries_100000/no_cache-8 1 22242155616 ns/op 28277544136 B/op 79620463 allocs/op
|
||||
func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
|
||||
|
|
|
@ -315,9 +315,15 @@ func (s *Shard) Open() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
idx.WithLogger(s.baseLogger)
|
||||
|
||||
// Check if the index needs to be rebuilt before Open() initializes
|
||||
// its file system layout.
|
||||
var shouldReindex bool
|
||||
if _, err := os.Stat(ipath); os.IsNotExist(err) {
|
||||
shouldReindex = true
|
||||
}
|
||||
|
||||
// Open index.
|
||||
if err := idx.Open(); err != nil {
|
||||
return err
|
||||
|
@ -340,8 +346,12 @@ func (s *Shard) Open() error {
|
|||
if err := e.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
if shouldReindex {
|
||||
if err := e.Reindex(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Load metadata index for the inmem index only.
|
||||
if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -704,8 +714,6 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
|
|||
if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.index.SetFieldName(f.Measurement, f.Field.Name)
|
||||
}
|
||||
|
||||
if len(fieldsToCreate) > 0 {
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"sort"
|
||||
|
@ -231,9 +230,6 @@ func NewTempShard(index string) *TempShard {
|
|||
opt := NewEngineOptions()
|
||||
opt.IndexVersion = index
|
||||
opt.Config.WALDir = filepath.Join(dir, "wal")
|
||||
if index == InmemIndexName {
|
||||
opt.InmemIndex, _ = NewInmemIndex(path.Base(dir), sfile)
|
||||
}
|
||||
|
||||
return &TempShard{
|
||||
Shard: NewShard(0,
|
||||
|
|
|
@ -28,7 +28,6 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
_ "github.com/influxdata/influxdb/v2/tsdb/engine"
|
||||
_ "github.com/influxdata/influxdb/v2/tsdb/index"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/inmem"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
|
@ -43,7 +42,6 @@ func TestShardWriteAndIndex(t *testing.T) {
|
|||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
|
||||
|
@ -102,6 +100,84 @@ func TestShardWriteAndIndex(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestShardRebuildIndex(t *testing.T) {
|
||||
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
tmpShard := filepath.Join(tmpDir, "shard")
|
||||
tmpWal := filepath.Join(tmpDir, "wal")
|
||||
|
||||
sfile := MustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatalf("error opening shard: %s", err.Error())
|
||||
}
|
||||
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
models.Tags{{Key: []byte("host"), Value: []byte("server")}},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
||||
err := sh.WritePoints([]models.Point{pt})
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
pt.SetTime(time.Unix(2, 3))
|
||||
err = sh.WritePoints([]models.Point{pt})
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
indexPath := filepath.Join(tmpShard, "index")
|
||||
validateIndex := func() {
|
||||
cnt := sh.SeriesN()
|
||||
if got, exp := cnt, int64(1); got != exp {
|
||||
t.Fatalf("got %v series, exp %v series in index", got, exp)
|
||||
}
|
||||
fi, err := os.Stat(indexPath)
|
||||
|
||||
// Make sure index data is being persisted to disk.
|
||||
if os.IsNotExist(err) {
|
||||
t.Fatalf("index path %q does not exist", indexPath)
|
||||
}
|
||||
if !fi.IsDir() {
|
||||
t.Fatalf("index path %q is not a directory", indexPath)
|
||||
}
|
||||
}
|
||||
|
||||
validateIndex()
|
||||
|
||||
// ensure the index gets rebuilt after its directory is deleted and
|
||||
// the shard is reopened.
|
||||
if err := sh.Close(); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
if err := os.RemoveAll(indexPath); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
sh = tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatalf("error opening shard: %s", err.Error())
|
||||
}
|
||||
|
||||
validateIndex()
|
||||
|
||||
// and ensure that we can still write data
|
||||
pt.SetTime(time.Unix(2, 6))
|
||||
err = sh.WritePoints([]models.Point{pt})
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestShard_Open_CorruptFieldsIndex(t *testing.T) {
|
||||
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
@ -113,7 +189,6 @@ func TestShard_Open_CorruptFieldsIndex(t *testing.T) {
|
|||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
|
||||
|
@ -153,120 +228,6 @@ func TestShard_Open_CorruptFieldsIndex(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMaxSeriesLimit(t *testing.T) {
|
||||
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
tmpShard := filepath.Join(tmpDir, "db", "rp", "1")
|
||||
tmpWal := filepath.Join(tmpDir, "wal")
|
||||
|
||||
sfile := MustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
opts.Config.MaxSeriesPerDatabase = 1000
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
opts.IndexVersion = tsdb.InmemIndexName
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatalf("error opening shard: %s", err.Error())
|
||||
}
|
||||
|
||||
// Writing 1K series should succeed.
|
||||
points := []models.Point{}
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
models.Tags{{Key: []byte("host"), Value: []byte(fmt.Sprintf("server%d", i))}},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
points = append(points, pt)
|
||||
}
|
||||
|
||||
err := sh.WritePoints(points)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
// Writing one more series should exceed the series limit.
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
models.Tags{{Key: []byte("host"), Value: []byte("server9999")}},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
||||
err = sh.WritePoints([]models.Point{pt})
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
} else if exp, got := `partial write: max-series-per-database limit exceeded: (1000) dropped=1`, err.Error(); exp != got {
|
||||
t.Fatalf("unexpected error message:\n\texp = %s\n\tgot = %s", exp, got)
|
||||
}
|
||||
|
||||
sh.Close()
|
||||
}
|
||||
|
||||
func TestShard_MaxTagValuesLimit(t *testing.T) {
|
||||
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
tmpShard := filepath.Join(tmpDir, "db", "rp", "1")
|
||||
tmpWal := filepath.Join(tmpDir, "wal")
|
||||
|
||||
sfile := MustOpenSeriesFile()
|
||||
defer sfile.Close()
|
||||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
opts.Config.MaxValuesPerTag = 1000
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
opts.IndexVersion = tsdb.InmemIndexName
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatalf("error opening shard: %s", err.Error())
|
||||
}
|
||||
|
||||
// Writing 1K series should succeed.
|
||||
points := []models.Point{}
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
models.Tags{{Key: []byte("host"), Value: []byte(fmt.Sprintf("server%d", i))}},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
points = append(points, pt)
|
||||
}
|
||||
|
||||
err := sh.WritePoints(points)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
// Writing one more series should exceed the series limit.
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
models.Tags{{Key: []byte("host"), Value: []byte("server9999")}},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
||||
err = sh.WritePoints([]models.Point{pt})
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
} else if exp, got := `partial write: max-values-per-tag limit exceeded (1000/1000): measurement="cpu" tag="host" value="server9999" dropped=1`, err.Error(); exp != got {
|
||||
t.Fatalf("unexpected error message:\n\texp = %s\n\tgot = %s", exp, got)
|
||||
}
|
||||
|
||||
sh.Close()
|
||||
}
|
||||
|
||||
func TestWriteTimeTag(t *testing.T) {
|
||||
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
@ -278,7 +239,6 @@ func TestWriteTimeTag(t *testing.T) {
|
|||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
if err := sh.Open(); err != nil {
|
||||
|
@ -329,7 +289,6 @@ func TestWriteTimeField(t *testing.T) {
|
|||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
if err := sh.Open(); err != nil {
|
||||
|
@ -365,7 +324,6 @@ func TestShardWriteAddNewField(t *testing.T) {
|
|||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
if err := sh.Open(); err != nil {
|
||||
|
@ -418,7 +376,6 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
|
|||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{})
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
|
@ -507,7 +464,6 @@ func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) {
|
|||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{})
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
|
@ -658,7 +614,6 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
|
|||
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
|
||||
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
if err := sh.Open(); err != nil {
|
||||
|
@ -2054,7 +2009,6 @@ func openShard(sfile *SeriesFile) (*tsdb.Shard, string, error) {
|
|||
tmpWal := filepath.Join(tmpDir, "wal")
|
||||
opts := tsdb.NewEngineOptions()
|
||||
opts.Config.WALDir = tmpWal
|
||||
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
|
||||
shard := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
|
||||
err := shard.Open()
|
||||
return shard, tmpDir, err
|
||||
|
@ -2202,9 +2156,6 @@ func NewShards(index string, n int) Shards {
|
|||
opt := tsdb.NewEngineOptions()
|
||||
opt.IndexVersion = index
|
||||
opt.Config.WALDir = filepath.Join(dir, "wal")
|
||||
if index == tsdb.InmemIndexName {
|
||||
opt.InmemIndex = inmem.NewIndex(filepath.Base(dir), sfile.SeriesFile)
|
||||
}
|
||||
|
||||
// Initialise series id sets. Need to do this as it's normally done at the
|
||||
// store level.
|
||||
|
|
174
tsdb/store.go
174
tsdb/store.go
|
@ -38,7 +38,7 @@ var (
|
|||
ErrShardDeletion = errors.New("shard is being deleted")
|
||||
// ErrMultipleIndexTypes is returned when trying to do deletes on a database with
|
||||
// multiple index types.
|
||||
ErrMultipleIndexTypes = errors.New("cannot delete data. DB contains shards using both inmem and tsi1 indexes. Please convert all shards to use the same index type to delete data.")
|
||||
ErrMultipleIndexTypes = errors.New("cannot delete data. DB contains shards using multiple indexes. Please convert all shards to use the same index type to delete data")
|
||||
)
|
||||
|
||||
// Statistics gathered by the store.
|
||||
|
@ -84,9 +84,6 @@ type Store struct {
|
|||
SeriesFileMaxSize int64 // Determines size of series file mmap. Can be altered in tests.
|
||||
path string
|
||||
|
||||
// shared per-database indexes, only if using "inmem".
|
||||
indexes map[string]interface{}
|
||||
|
||||
// Maintains a set of shards that are in the process of deletion.
|
||||
// This prevents new shards from being created while old ones are being deleted.
|
||||
pendingShardDeletes map[uint64]struct{}
|
||||
|
@ -108,17 +105,15 @@ type Store struct {
|
|||
// NewStore returns a new store with the given path and a default configuration.
|
||||
// The returned store must be initialized by calling Open before using it.
|
||||
func NewStore(path string) *Store {
|
||||
logger := zap.NewNop()
|
||||
return &Store{
|
||||
databases: make(map[string]*databaseState),
|
||||
path: path,
|
||||
sfiles: make(map[string]*SeriesFile),
|
||||
indexes: make(map[string]interface{}),
|
||||
pendingShardDeletes: make(map[uint64]struct{}),
|
||||
epochs: make(map[uint64]*epochTracker),
|
||||
EngineOptions: NewEngineOptions(),
|
||||
Logger: logger,
|
||||
baseLogger: logger,
|
||||
Logger: zap.NewNop(),
|
||||
baseLogger: zap.NewNop(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -187,7 +182,6 @@ func (s *Store) IndexBytes() int {
|
|||
is.Indexes = append(is.Indexes, shard.index)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
is = is.DedupeInmemIndexes()
|
||||
|
||||
var b int
|
||||
for _, idx := range is.Indexes {
|
||||
|
@ -320,12 +314,6 @@ func (s *Store) loadShards() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Retrieve database index.
|
||||
idx, err := s.createIndexIfNotExists(db.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Load each retention policy within the database directory.
|
||||
rpDirs, err := ioutil.ReadDir(dbPath)
|
||||
if err != nil {
|
||||
|
@ -386,16 +374,10 @@ func (s *Store) loadShards() error {
|
|||
|
||||
// Copy options and assign shared index.
|
||||
opt := s.EngineOptions
|
||||
opt.InmemIndex = idx
|
||||
|
||||
// Provide an implementation of the ShardIDSets
|
||||
opt.SeriesIDSets = shardSet{store: s, db: db}
|
||||
|
||||
// Existing shards should continue to use inmem index.
|
||||
if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
|
||||
opt.IndexVersion = InmemIndexName
|
||||
}
|
||||
|
||||
// Open engine.
|
||||
shard := NewShard(shardID, path, walPath, sfile, opt)
|
||||
|
||||
|
@ -406,8 +388,8 @@ func (s *Store) loadShards() error {
|
|||
|
||||
err = shard.Open()
|
||||
if err != nil {
|
||||
log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err))
|
||||
resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)}
|
||||
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
|
||||
resC <- &res{err: fmt.Errorf("failed to open shard: %d: %s", shardID, err)}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -488,7 +470,6 @@ func (s *Store) Close() error {
|
|||
|
||||
s.databases = make(map[string]*databaseState)
|
||||
s.sfiles = map[string]*SeriesFile{}
|
||||
s.indexes = make(map[string]interface{})
|
||||
s.pendingShardDeletes = make(map[uint64]struct{})
|
||||
s.shards = nil
|
||||
s.opened = false // Store may now be opened again.
|
||||
|
@ -529,28 +510,6 @@ func (s *Store) seriesFile(database string) *SeriesFile {
|
|||
return s.sfiles[database]
|
||||
}
|
||||
|
||||
// createIndexIfNotExists returns a shared index for a database, if the inmem
|
||||
// index is being used. If the TSI index is being used, then this method is
|
||||
// basically a no-op.
|
||||
func (s *Store) createIndexIfNotExists(name string) (interface{}, error) {
|
||||
if idx := s.indexes[name]; idx != nil {
|
||||
return idx, nil
|
||||
}
|
||||
|
||||
sfile, err := s.openSeriesFile(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
idx, err := NewInmemIndex(name, sfile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.indexes[name] = idx
|
||||
return idx, nil
|
||||
}
|
||||
|
||||
// Shard returns a shard by id.
|
||||
func (s *Store) Shard(id uint64) *Shard {
|
||||
s.mu.RLock()
|
||||
|
@ -638,15 +597,8 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
|
|||
return err
|
||||
}
|
||||
|
||||
// Retrieve shared index, if needed.
|
||||
idx, err := s.createIndexIfNotExists(database)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Copy index options and pass in shared index.
|
||||
opt := s.EngineOptions
|
||||
opt.InmemIndex = idx
|
||||
opt.SeriesIDSets = shardSet{store: s, db: database}
|
||||
|
||||
path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
|
||||
|
@ -767,32 +719,6 @@ func (s *Store) DeleteShard(shardID uint64) error {
|
|||
if ss.Cardinality() > 0 {
|
||||
sfile := s.seriesFile(db)
|
||||
if sfile != nil {
|
||||
// If the inmem index is in use, then the series being removed from the
|
||||
// series file will also need to be removed from the index.
|
||||
if index.Type() == InmemIndexName {
|
||||
var keyBuf []byte // Series key buffer.
|
||||
var name []byte
|
||||
var tagsBuf models.Tags // Buffer for tags container.
|
||||
var err error
|
||||
|
||||
ss.ForEach(func(id uint64) {
|
||||
skey := sfile.SeriesKey(id) // Series File series key
|
||||
if skey == nil {
|
||||
return
|
||||
}
|
||||
|
||||
name, tagsBuf = ParseSeriesKeyInto(skey, tagsBuf)
|
||||
keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
|
||||
if err = index.DropSeriesGlobal(keyBuf); err != nil {
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ss.ForEach(func(id uint64) {
|
||||
sfile.DeleteSeriesID(id)
|
||||
})
|
||||
|
@ -872,9 +798,6 @@ func (s *Store) DeleteDatabase(name string) error {
|
|||
// Remove database from store list of databases
|
||||
delete(s.databases, name)
|
||||
|
||||
// Remove shared index for database if using inmem index.
|
||||
delete(s.indexes, name)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1524,7 +1447,6 @@ func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond in
|
|||
}
|
||||
is.Indexes = append(is.Indexes, index)
|
||||
}
|
||||
is = is.DedupeInmemIndexes()
|
||||
return is.MeasurementNamesByExpr(auth, cond)
|
||||
}
|
||||
|
||||
|
@ -1610,7 +1532,6 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
|
|||
s.mu.RUnlock()
|
||||
|
||||
// Determine list of measurements.
|
||||
is = is.DedupeInmemIndexes()
|
||||
names, err := is.MeasurementNamesByExpr(nil, measurementExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1775,7 +1696,6 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
|
|||
is.Indexes = append(is.Indexes, index)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
is = is.DedupeInmemIndexes()
|
||||
|
||||
// Stores each list of TagValues for each measurement.
|
||||
var allResults []tagValues
|
||||
|
@ -1996,8 +1916,6 @@ func mergeTagValues(valueIdxs [][2]int, tvs ...tagValues) TagValues {
|
|||
func (s *Store) monitorShards() {
|
||||
t := time.NewTicker(10 * time.Second)
|
||||
defer t.Stop()
|
||||
t2 := time.NewTicker(time.Minute)
|
||||
defer t2.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-s.closing:
|
||||
|
@ -2016,88 +1934,6 @@ func (s *Store) monitorShards() {
|
|||
}
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
case <-t2.C:
|
||||
if s.EngineOptions.Config.MaxValuesPerTag == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
shards := s.filterShards(func(sh *Shard) bool {
|
||||
return sh.IndexType() == InmemIndexName
|
||||
})
|
||||
s.mu.RUnlock()
|
||||
|
||||
// No inmem shards...
|
||||
if len(shards) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var dbLock sync.Mutex
|
||||
databases := make(map[string]struct{}, len(shards))
|
||||
|
||||
s.walkShards(shards, func(sh *Shard) error {
|
||||
db := sh.database
|
||||
|
||||
// Only process 1 shard from each database
|
||||
dbLock.Lock()
|
||||
if _, ok := databases[db]; ok {
|
||||
dbLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
databases[db] = struct{}{}
|
||||
dbLock.Unlock()
|
||||
|
||||
sfile := s.seriesFile(sh.database)
|
||||
if sfile == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
firstShardIndex, err := sh.Index()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
index, err := sh.Index()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// inmem shards share the same index instance so just use the first one to avoid
|
||||
// allocating the same measurements repeatedly
|
||||
indexSet := IndexSet{Indexes: []Index{firstShardIndex}, SeriesFile: sfile}
|
||||
names, err := indexSet.MeasurementNamesByExpr(nil, nil)
|
||||
if err != nil {
|
||||
s.Logger.Warn("Cannot retrieve measurement names",
|
||||
zap.Error(err),
|
||||
logger.Shard(sh.ID()),
|
||||
logger.Database(db))
|
||||
return nil
|
||||
}
|
||||
|
||||
indexSet.Indexes = []Index{index}
|
||||
for _, name := range names {
|
||||
indexSet.ForEachMeasurementTagKey(name, func(k []byte) error {
|
||||
n := sh.TagKeyCardinality(name, k)
|
||||
perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100)
|
||||
if perc > 100 {
|
||||
perc = 100
|
||||
}
|
||||
|
||||
// Log at 80, 85, 90-100% levels
|
||||
if perc == 80 || perc == 85 || perc >= 90 {
|
||||
s.Logger.Warn("max-values-per-tag limit may be exceeded soon",
|
||||
zap.String("perc", fmt.Sprintf("%d%%", perc)),
|
||||
zap.Int("n", n),
|
||||
zap.Int("max", s.EngineOptions.Config.MaxValuesPerTag),
|
||||
logger.Database(db),
|
||||
zap.ByteString("measurement", name),
|
||||
zap.ByteString("tag", k))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/pkg/deep"
|
||||
"github.com/influxdata/influxdb/v2/pkg/slices"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/index/inmem"
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
|
@ -198,50 +197,6 @@ func TestStore_CreateMixedShards(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStore_DropMeasurementMixedShards(t *testing.T) {
|
||||
|
||||
test := func(index1 string, index2 string) {
|
||||
s := MustOpenStore(index1)
|
||||
defer s.Close()
|
||||
|
||||
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s.MustWriteToShardString(1, "mem,server=a v=1 10")
|
||||
|
||||
s.EngineOptions.IndexVersion = index2
|
||||
s.index = index2
|
||||
if err := s.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s.MustWriteToShardString(2, "mem,server=b v=1 20")
|
||||
|
||||
s.MustWriteToShardString(1, "cpu,server=a v=1 10")
|
||||
s.MustWriteToShardString(2, "cpu,server=b v=1 20")
|
||||
|
||||
err := s.DeleteMeasurement("db0", "cpu")
|
||||
if err != tsdb.ErrMultipleIndexTypes {
|
||||
t.Fatal(err)
|
||||
} else if err == nil {
|
||||
t.Fatal("expect failure deleting measurement on multiple index types")
|
||||
}
|
||||
}
|
||||
|
||||
indexes := tsdb.RegisteredIndexes()
|
||||
for i := range indexes {
|
||||
j := (i + 1) % len(indexes)
|
||||
index1 := indexes[i]
|
||||
index2 := indexes[j]
|
||||
t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) })
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) {
|
||||
|
||||
test := func(index string) {
|
||||
|
@ -786,10 +741,6 @@ func TestStore_BackupRestoreShard(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
if index == tsdb.TSI1IndexName {
|
||||
t.Skip("Skipping failing test for tsi1")
|
||||
}
|
||||
|
||||
t.Run(index, func(t *testing.T) {
|
||||
test(index)
|
||||
})
|
||||
|
@ -998,7 +949,6 @@ func TestStore_Cardinality_Unique(t *testing.T) {
|
|||
|
||||
test := func(index string) {
|
||||
store := NewStore(index)
|
||||
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -1079,7 +1029,6 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
|
|||
|
||||
test := func(index string) {
|
||||
store := NewStore(index)
|
||||
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -1147,7 +1096,6 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
|
|||
|
||||
test := func(index string) error {
|
||||
store := NewStore(index)
|
||||
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -1164,65 +1112,6 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) {
|
||||
|
||||
if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" || os.Getenv("CIRCLECI") != "" {
|
||||
t.Skip("Skipping test in short, race, circleci and appveyor mode.")
|
||||
}
|
||||
|
||||
store := NewStore("inmem")
|
||||
store.EngineOptions.Config.MaxSeriesPerDatabase = 100000
|
||||
if err := store.Open(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// Generate 200,000 series to write.
|
||||
series := genTestSeries(64, 5, 5)
|
||||
|
||||
// Add 1 point to each series.
|
||||
points := make([]models.Point, 0, len(series))
|
||||
for _, s := range series {
|
||||
points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
|
||||
}
|
||||
|
||||
// Create shards to write points into.
|
||||
numShards := 10
|
||||
for shardID := 0; shardID < numShards; shardID++ {
|
||||
if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
|
||||
t.Fatalf("create shard: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Write series / points to the shards.
|
||||
pointsPerShard := len(points) / numShards
|
||||
|
||||
for shardID := 0; shardID < numShards; shardID++ {
|
||||
from := shardID * pointsPerShard
|
||||
to := from + pointsPerShard
|
||||
|
||||
if err := store.Store.WriteToShard(uint64(shardID), points[from:to]); err != nil {
|
||||
if !strings.Contains(err.Error(), "partial write: max-series-per-database limit exceeded:") {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get updated series cardinality from store after writing data.
|
||||
cardinality, err := store.Store.SeriesCardinality("db")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expCardinality := store.EngineOptions.Config.MaxSeriesPerDatabase
|
||||
|
||||
// Estimated cardinality should be well within 1.5% of the actual cardinality.
|
||||
got := math.Abs(float64(cardinality)-float64(expCardinality)) / float64(expCardinality)
|
||||
exp := 0.015
|
||||
if got > exp {
|
||||
t.Errorf("got epsilon of %v for series cardinality %d (expected %d), which is larger than expected %v", got, cardinality, expCardinality, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_Sketches(t *testing.T) {
|
||||
|
||||
checkCardinalities := func(store *tsdb.Store, series, tseries, measurements, tmeasurements int) error {
|
||||
|
@ -1318,11 +1207,8 @@ func TestStore_Sketches(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Check cardinalities. In this case, the indexes behave differently.
|
||||
// Check cardinalities.
|
||||
expS, expTS, expM, expTM := 160, 80, 10, 5
|
||||
if index == inmem.IndexName {
|
||||
expS, expTS, expM, expTM = 160, 80, 10, 5
|
||||
}
|
||||
|
||||
// Check cardinalities - tombstones should be in
|
||||
if err := checkCardinalities(store.Store, expS, expTS, expM, expTM); err != nil {
|
||||
|
@ -1334,11 +1220,8 @@ func TestStore_Sketches(t *testing.T) {
|
|||
return err
|
||||
}
|
||||
|
||||
// Check cardinalities. In this case, the indexes behave differently.
|
||||
// Check cardinalities.
|
||||
expS, expTS, expM, expTM = 80, 80, 5, 5
|
||||
if index == inmem.IndexName {
|
||||
expS, expTS, expM, expTM = 80, 0, 5, 0
|
||||
}
|
||||
|
||||
if err := checkCardinalities(store.Store, expS, expTS, expM, expTM); err != nil {
|
||||
return fmt.Errorf("[initial|re-open|delete|re-open] %v", err)
|
||||
|
|
|
@ -13,9 +13,6 @@ your database.
|
|||
options to exclude unneeded tags.
|
||||
- Write to a database with an appropriate
|
||||
[retention policy](https://docs.influxdata.com/influxdb/latest/guides/downsampling_and_retention/).
|
||||
- Limit series cardinality in your database using the
|
||||
[`max-series-per-database`](https://docs.influxdata.com/influxdb/latest/administration/config/#max-series-per-database-1000000) and
|
||||
[`max-values-per-tag`](https://docs.influxdata.com/influxdb/latest/administration/config/#max-values-per-tag-100000) settings.
|
||||
- Consider using the
|
||||
[Time Series Index](https://docs.influxdata.com/influxdb/latest/concepts/time-series-index/).
|
||||
- Monitor your databases
|
||||
|
|
|
@ -27,8 +27,6 @@ avoid cardinality issues:
|
|||
|
||||
- Use [metric filtering][] options to exclude unneeded measurements and tags.
|
||||
- Write to a database with an appropriate [retention policy][].
|
||||
- Limit series cardinality in your database using the
|
||||
[max-series-per-database][] and [max-values-per-tag][] settings.
|
||||
- Consider using the [Time Series Index][tsi].
|
||||
- Monitor your databases [series cardinality][].
|
||||
- Consult the [InfluxDB documentation][influx-docs] for the most up-to-date techniques.
|
||||
|
@ -305,8 +303,6 @@ kubernetes_statefulset,namespace=default,selector_select1=s1,statefulset_name=et
|
|||
|
||||
[metric filtering]: https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#metric-filtering
|
||||
[retention policy]: https://docs.influxdata.com/influxdb/latest/guides/downsampling_and_retention/
|
||||
[max-series-per-database]: https://docs.influxdata.com/influxdb/latest/administration/config/#max-series-per-database-1000000
|
||||
[max-values-per-tag]: https://docs.influxdata.com/influxdb/latest/administration/config/#max-values-per-tag-100000
|
||||
[tsi]: https://docs.influxdata.com/influxdb/latest/concepts/time-series-index/
|
||||
[series cardinality]: https://docs.influxdata.com/influxdb/latest/query_language/spec/#show-cardinality
|
||||
[influx-docs]: https://docs.influxdata.com/influxdb/latest/
|
||||
|
|
|
@ -28,8 +28,6 @@ avoid cardinality issues:
|
|||
|
||||
- Use [metric filtering][] options to exclude unneeded measurements and tags.
|
||||
- Write to a database with an appropriate [retention policy][].
|
||||
- Limit series cardinality in your database using the
|
||||
[max-series-per-database][] and [max-values-per-tag][] settings.
|
||||
- Consider using the [Time Series Index][tsi].
|
||||
- Monitor your databases [series cardinality][].
|
||||
- Consult the [InfluxDB documentation][influx-docs] for the most up-to-date techniques.
|
||||
|
@ -154,8 +152,6 @@ kubernetes_system_container
|
|||
|
||||
[metric filtering]: https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#metric-filtering
|
||||
[retention policy]: https://docs.influxdata.com/influxdb/latest/guides/downsampling_and_retention/
|
||||
[max-series-per-database]: https://docs.influxdata.com/influxdb/latest/administration/config/#max-series-per-database-1000000
|
||||
[max-values-per-tag]: https://docs.influxdata.com/influxdb/latest/administration/config/#max-values-per-tag-100000
|
||||
[tsi]: https://docs.influxdata.com/influxdb/latest/concepts/time-series-index/
|
||||
[series cardinality]: https://docs.influxdata.com/influxdb/latest/query_language/spec/#show-cardinality
|
||||
[influx-docs]: https://docs.influxdata.com/influxdb/latest/
|
||||
|
|
|
@ -15,9 +15,6 @@ manage your series cardinality:
|
|||
`tagexclude` to remove the `pid` and `process_group_id` tags.
|
||||
- Write to a database with an appropriate
|
||||
[retention policy](https://docs.influxdata.com/influxdb/latest/guides/downsampling_and_retention/).
|
||||
- Limit series cardinality in your database using the
|
||||
[`max-series-per-database`](https://docs.influxdata.com/influxdb/latest/administration/config/#max-series-per-database-1000000) and
|
||||
[`max-values-per-tag`](https://docs.influxdata.com/influxdb/latest/administration/config/#max-values-per-tag-100000) settings.
|
||||
- Consider using the
|
||||
[Time Series Index](https://docs.influxdata.com/influxdb/latest/concepts/time-series-index/).
|
||||
- Monitor your databases
|
||||
|
|
|
@ -14,8 +14,6 @@ avoid cardinality issues:
|
|||
|
||||
- Use [metric filtering][] options to exclude unneeded measurements and tags.
|
||||
- Write to a database with an appropriate [retention policy][].
|
||||
- Limit series cardinality in your database using the
|
||||
[max-series-per-database][] and [max-values-per-tag][] settings.
|
||||
- Consider using the [Time Series Index][tsi].
|
||||
- Monitor your databases [series cardinality][].
|
||||
- Consult the [InfluxDB documentation][influx-docs] for the most up-to-date techniques.
|
||||
|
@ -113,8 +111,6 @@ This sflow implementation was built from the reference document
|
|||
|
||||
[metric filtering]: https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#metric-filtering
|
||||
[retention policy]: https://docs.influxdata.com/influxdb/latest/guides/downsampling_and_retention/
|
||||
[max-series-per-database]: https://docs.influxdata.com/influxdb/latest/administration/config/#max-series-per-database-1000000
|
||||
[max-values-per-tag]: https://docs.influxdata.com/influxdb/latest/administration/config/#max-values-per-tag-100000
|
||||
[tsi]: https://docs.influxdata.com/influxdb/latest/concepts/time-series-index/
|
||||
[series cardinality]: https://docs.influxdata.com/influxdb/latest/query_language/spec/#show-cardinality
|
||||
[influx-docs]: https://docs.influxdata.com/influxdb/latest/
|
||||
|
|
Loading…
Reference in New Issue