fix: tsi index should compact old or too-large log files (#21943)

* fix: tsi index should compact old log files that are too large

* chore: run automated formatter

* chore: update changelog

* fix: review comments
pull/21973/head
Sam Arnold 2021-07-26 17:40:15 -04:00 committed by GitHub
parent dec56f9c31
commit b64c2c3dcf
21 changed files with 208 additions and 76 deletions


@@ -19,6 +19,7 @@
- [#21895](https://github.com/influxdata/influxdb/pull/21895): fix: systemd unit should block on startup until http endpoint is ready
- [#21863](https://github.com/influxdata/influxdb/pull/21863): fix: export example and fix adjacent shards
- [#21934](https://github.com/influxdata/influxdb/pull/21934): chore: use community maintained golang-jwt
- [#21943](https://github.com/influxdata/influxdb/pull/21943): fix: tsi index should compact old or too-large log files
v1.9.2 [unreleased]
- [#21631](https://github.com/influxdata/influxdb/pull/21631): fix: group by returns multiple results per group in some circumstances


@@ -457,6 +457,22 @@ func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize i
return err
}
log.Info("Reopening TSI index with max-index-log-file-size=1 to fully compact log files")
compactingIndex := tsi1.NewIndex(sfile, "",
tsi1.WithPath(tmpPath),
tsi1.WithMaximumLogFileSize(1),
)
if err := compactingIndex.Open(); err != nil {
return err
}
compactingIndex.Compact()
compactingIndex.Wait()
// Close TSI index.
log.Info("re-closing tsi index")
if err := compactingIndex.Close(); err != nil {
return err
}
// Rename TSI to standard path.
log.Info("Moving tsi to permanent location")
return os.Rename(tmpPath, indexPath)
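For reference, the reopen-and-compact sequence above reads as one self-contained helper. A minimal sketch assuming the tsi1 API as used in this diff; compactFully is a hypothetical name, not part of the change (assumes imports of tsdb and tsi1):

// compactFully reopens a freshly built TSI index with a 1-byte log-file
// ceiling so every log file immediately qualifies for compaction, waits for
// the compactions to finish, then closes the index again.
func compactFully(sfile *tsdb.SeriesFile, tmpPath string) error {
	idx := tsi1.NewIndex(sfile, "",
		tsi1.WithPath(tmpPath),
		tsi1.WithMaximumLogFileSize(1),
	)
	if err := idx.Open(); err != nil {
		return err
	}
	idx.Compact()
	idx.Wait()
	return idx.Close()
}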


@@ -8,7 +8,6 @@ import (
"time"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist"
"github.com/influxdata/influxdb/cmd/influx_tools/internal/format/binary"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
"github.com/influxdata/influxdb/services/meta"


@@ -16,12 +16,11 @@ import (
"strconv"
"strings"
gzip "github.com/klauspost/pgzip"
"github.com/influxdata/influxdb/cmd/influxd/backup_util"
tarstream "github.com/influxdata/influxdb/pkg/tar"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/services/snapshotter"
gzip "github.com/klauspost/pgzip"
)
// Command represents the program execution for "influxd restore".


@@ -4,7 +4,6 @@ import (
"time"
"github.com/influxdata/influxdb/kit/platform/errors"
"github.com/prometheus/client_golang/prometheus"
)


@@ -9,13 +9,12 @@ import (
"time"
"github.com/go-chi/chi"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber/jaeger-client-go"
"github.com/influxdata/httprouter"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber/jaeger-client-go"
)
// LogError adds a span log for an error.


@@ -10,7 +10,6 @@ import (
"net/http"
"github.com/influxdata/influxdb/kit/platform/errors"
"go.uber.org/zap"
)


@@ -8,10 +8,9 @@ import (
"strings"
"time"
"github.com/go-chi/chi"
"github.com/influxdata/influxdb/kit/platform"
"github.com/influxdata/influxdb/kit/platform/errors"
"github.com/go-chi/chi"
"github.com/influxdata/influxdb/kit/tracing"
ua "github.com/mileusna/useragent"
"github.com/prometheus/client_golang/prometheus"


@@ -6,7 +6,6 @@ import (
"testing"
"github.com/influxdata/influxdb/kit/platform"
"github.com/influxdata/influxdb/pkg/testttp"
"github.com/stretchr/testify/assert"
)


@@ -6,13 +6,12 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux/csv"
"github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/influxdata/influxdb/query"
)
func TestReturnNoContent(t *testing.T) {


@@ -1,9 +1,6 @@
package main
import (
"collectd.org/api"
"collectd.org/network"
"context"
"flag"
"fmt"
@@ -11,6 +8,9 @@ import (
"os"
"strconv"
"time"
"collectd.org/api"
"collectd.org/network"
)
var nMeasurments = flag.Int("m", 1, "Number of measurements")


@@ -11,7 +11,6 @@ import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxql"
)


@@ -2,9 +2,8 @@ package meta
import (
"sort"
"time"
"testing"
"time"
)
func TestShardGroupSort(t *testing.T) {


@@ -9,9 +9,8 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/influxdata/influxql"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxql"
)
func init() {


@@ -1,14 +1,13 @@
package snapshotter
import (
"archive/tar"
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"archive/tar"
"io/ioutil"
"path/filepath"
"strconv"


@@ -4,6 +4,7 @@
package engine // import "github.com/influxdata/influxdb/tsdb/engine"
import (
// Initialize and register tsm1 engine
_ "github.com/influxdata/influxdb/tsdb/engine/tsm1"
)


@@ -11,6 +11,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/cespare/xxhash"
@@ -43,6 +44,7 @@ func init() {
idx := NewIndex(sfile, db,
WithPath(path),
WithMaximumLogFileSize(int64(opt.Config.MaxIndexLogFileSize)),
WithMaximumLogFileAge(time.Duration(opt.Config.CompactFullWriteColdDuration)),
WithSeriesIDCacheSize(opt.Config.SeriesIDSetCacheSize),
)
return idx
@@ -89,6 +91,12 @@ var WithMaximumLogFileSize = func(size int64) IndexOption {
}
}
var WithMaximumLogFileAge = func(dur time.Duration) IndexOption {
return func(i *Index) {
i.maxLogFileAge = dur
}
}
// DisableFsync disables flushing and syncing of underlying files. Primarily this
// impacts the LogFiles. This option can be set when working with the index in
// an offline manner, for cases where a hard failure can be overcome by re-running the tooling.
@@ -131,12 +139,13 @@ type Index struct {
tagValueCacheSize int
// The following may be set when initializing an Index.
path string // Root directory of the index partitions.
disableCompactions bool // Initially disables compactions on the index.
maxLogFileSize int64 // Maximum size of a LogFile before it's compacted.
logfileBufferSize int // The size of the buffer used by the LogFile.
disableFsync bool // Disables flushing buffers and fsyncing files. Used when working with indexes offline.
logger *zap.Logger // Index's logger.
path string // Root directory of the index partitions.
disableCompactions bool // Initially disables compactions on the index.
maxLogFileSize int64 // Maximum size of a LogFile before it's compacted.
maxLogFileAge time.Duration // Maximum age of a LogFile before it's compacted.
logfileBufferSize int // The size of the buffer used by the LogFile.
disableFsync bool // Disables flushing buffers and fsyncing files. Used when working with indexes offline.
logger *zap.Logger // Index's logger.
// The following must be set when initializing an Index.
sfile *tsdb.SeriesFile // series lookup file
@@ -162,6 +171,7 @@ func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *
idx := &Index{
tagValueCacheSize: tsdb.DefaultSeriesIDSetCacheSize,
maxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,
maxLogFileAge: tsdb.DefaultCompactFullWriteColdDuration,
logger: zap.NewNop(),
version: Version,
sfile: sfile,
@@ -194,6 +204,7 @@ func (i *Index) Bytes() int {
b += int(unsafe.Sizeof(i.path)) + len(i.path)
b += int(unsafe.Sizeof(i.disableCompactions))
b += int(unsafe.Sizeof(i.maxLogFileSize))
b += int(unsafe.Sizeof(i.maxLogFileAge))
b += int(unsafe.Sizeof(i.logger))
b += int(unsafe.Sizeof(i.sfile))
// Do not count SeriesFile because it belongs to the code that constructed this Index.
@@ -260,6 +271,7 @@ func (i *Index) Open() error {
for j := 0; j < len(i.partitions); j++ {
p := NewPartition(i.sfile, filepath.Join(i.path, fmt.Sprint(j)))
p.MaxLogFileSize = i.maxLogFileSize
p.MaxLogFileAge = i.maxLogFileAge
p.nosync = i.disableFsync
p.logbufferSize = i.logfileBufferSize
p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1)))
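Taken together, the new option threads from NewIndex down into each partition's MaxLogFileAge. A minimal usage sketch, assuming only the option names shown in this diff; the path and threshold values are made-up illustration values:

idx := tsi1.NewIndex(sfile, "db0",
	tsi1.WithPath("/path/to/index"), // hypothetical location
	tsi1.WithMaximumLogFileSize(1<<20), // compact a log file once it reaches 1 MiB...
	tsi1.WithMaximumLogFileAge(4*time.Hour), // ...or once it has been cold for four hours
)
if err := idx.Open(); err != nil {
	// handle the error
}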


@@ -11,6 +11,7 @@ import (
"sort"
"sync"
"testing"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
@@ -222,6 +223,13 @@ func TestIndex_Open(t *testing.T) {
t.Fatalf("got index version %d, expected %d", got, exp)
}
}
for i := 0; i < int(idx.PartitionN); i++ {
p := idx.PartitionAt(i)
if got, exp := p.NeedsCompaction(), false; got != exp {
t.Fatalf("got needs compaction %v, expected %v", got, exp)
}
}
})
// Reopening an open index should return an error.
@@ -298,14 +306,22 @@ func TestIndex_DiskSizeBytes(t *testing.T) {
t.Fatal(err)
}
// Verify on disk size is the same in each stage.
// Each series stores flag(1) + series(uvarint(2)) + len(name)(1) + len(key)(1) + len(value)(1) + checksum(4).
expSize := int64(4 * 9)
idx.RunStateAware(t, func(t *testing.T, state int) {
// Each MANIFEST file is 419 bytes and there are tsi1.DefaultPartitionN of them
expSize := int64(tsi1.DefaultPartitionN * 419)
switch state {
case Initial:
fallthrough
case Reopen:
// In the log file, each series stores flag(1) + series(uvarint(2)) + len(name)(1) + len(key)(1) + len(value)(1) + checksum(4).
expSize += 4 * 9
case PostCompaction:
fallthrough
case PostCompactionReopen:
// For TSI files after a compaction, instead of 4*9, we have encoded measurement names, tag names, etc., which is larger.
expSize += 2202
}
// Each MANIFEST file is 419 bytes and there are tsi1.DefaultPartitionN of them
expSize += int64(tsi1.DefaultPartitionN * 419)
idx.Run(t, func(t *testing.T) {
if got, exp := idx.DiskSizeBytes(), expSize; got != exp {
t.Fatalf("got %d bytes, expected %d", got, exp)
}
@@ -566,7 +582,7 @@ func (idx *Index) Close() error {
}
// Reopen closes and opens the index.
func (idx *Index) Reopen() error {
func (idx *Index) Reopen(maxLogSize int64) error {
if err := idx.Index.Close(); err != nil {
return err
}
@@ -578,11 +594,24 @@ func (idx *Index) Reopen() error {
}
partitionN := idx.Index.PartitionN // Remember how many partitions to use.
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()))
idx.Index = tsi1.NewIndex(idx.SeriesFile.SeriesFile, "db0", tsi1.WithPath(idx.Index.Path()), tsi1.WithMaximumLogFileSize(maxLogSize))
idx.Index.PartitionN = partitionN
return idx.Open()
}
const (
Initial = iota
Reopen
PostCompaction
PostCompactionReopen
)
func curryState(state int, f func(t *testing.T, state int)) func(t *testing.T) {
return func(t *testing.T) {
f(t, state)
}
}
// Run executes a subtest for each of several different states:
//
// - Immediately
@@ -593,27 +622,42 @@ func (idx *Index) Reopen() error {
// The index should always respond in the same fashion regardless of
// how data is stored. This helper allows the index to be easily tested
// in all major states.
func (idx *Index) Run(t *testing.T, fn func(t *testing.T)) {
func (idx *Index) RunStateAware(t *testing.T, fn func(t *testing.T, state int)) {
// Invoke immediately.
t.Run("state=initial", fn)
t.Run("state=initial", curryState(Initial, fn))
// Reopen and invoke again.
if err := idx.Reopen(); err != nil {
if err := idx.Reopen(tsdb.DefaultMaxIndexLogFileSize); err != nil {
t.Fatalf("reopen error: %s", err)
}
t.Run("state=reopen", fn)
t.Run("state=reopen", curryState(Reopen, fn))
// TODO: Request a compaction.
// if err := idx.Compact(); err != nil {
// t.Fatalf("compact error: %s", err)
// }
// t.Run("state=post-compaction", fn)
// Reopen requiring a full compaction of the TSL files and invoke again
idx.Reopen(1)
for {
needsCompaction := false
for i := 0; i < int(idx.PartitionN); i++ {
needsCompaction = needsCompaction || idx.PartitionAt(i).NeedsCompaction()
}
if !needsCompaction {
break
}
time.Sleep(10 * time.Millisecond)
}
t.Run("state=post-compaction", curryState(PostCompaction, fn))
// Reopen and invoke again.
if err := idx.Reopen(); err != nil {
if err := idx.Reopen(tsdb.DefaultMaxIndexLogFileSize); err != nil {
t.Fatalf("post-compaction reopen error: %s", err)
}
t.Run("state=post-compaction-reopen", fn)
t.Run("state=post-compaction-reopen", curryState(PostCompactionReopen, fn))
}
// Run is the same as RunStateAware but for tests that do not depend on compaction state.
func (idx *Index) Run(t *testing.T, fn func(t *testing.T)) {
idx.RunStateAware(t, func(t *testing.T, _ int) {
fn(t)
})
}
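A test can then branch on the state constant it receives. A hypothetical example; MustOpenIndex is assumed from this test file's existing helpers and is not part of the diff:

func TestIndex_Hypothetical(t *testing.T) {
	idx := MustOpenIndex(1) // assumed helper: open a wrapper Index with one partition
	defer idx.Close()
	idx.RunStateAware(t, func(t *testing.T, state int) {
		if state == PostCompaction || state == PostCompactionReopen {
			// assertions that only hold once log files are compacted into TSI files
		}
	})
}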
// CreateSeriesSliceIfNotExists creates multiple series at a time.


@@ -14,10 +14,9 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/pkg/slices"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bloom"
"github.com/influxdata/influxdb/pkg/slices"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/tsi1"
)


@@ -70,6 +70,7 @@ type Partition struct {
// Log file compaction thresholds.
MaxLogFileSize int64
MaxLogFileAge time.Duration
nosync bool // when true, flushing and syncing of LogFile will be disabled.
logbufferSize int // the LogFile's buffer is set to this value.
@@ -95,6 +96,7 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
seriesIDSet: tsdb.NewSeriesIDSet(),
MaxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,
MaxLogFileAge: tsdb.DefaultCompactFullWriteColdDuration,
// compactionEnabled: true,
compactionInterrupt: make(chan struct{}),
@@ -129,6 +131,7 @@ func (p *Partition) bytes() int {
b += int(unsafe.Sizeof(p.path)) + len(p.path)
b += int(unsafe.Sizeof(p.id)) + len(p.id)
b += int(unsafe.Sizeof(p.MaxLogFileSize))
b += int(unsafe.Sizeof(p.MaxLogFileAge))
b += int(unsafe.Sizeof(p.compactionInterrupt))
b += int(unsafe.Sizeof(p.compactionsDisabled))
b += int(unsafe.Sizeof(p.logger))
@@ -237,7 +240,7 @@ func (p *Partition) Open() error {
p.opened = true
// Send a compaction request on start up.
p.compact()
go p.runPeriodicCompaction()
return nil
}
@@ -897,7 +900,54 @@ func (p *Partition) compactionsEnabled() bool {
return p.compactionsDisabled == 0
}
func (p *Partition) runPeriodicCompaction() {
// kick off an initial compaction at startup without the optimization check
p.Compact()
// Avoid a race when using Reopen in tests
p.mu.RLock()
closing := p.closing
p.mu.RUnlock()
// check for compactions once an hour (usually not necessary but a nice safety check)
t := time.NewTicker(1 * time.Hour)
defer t.Stop()
for {
select {
case <-closing:
return
case <-t.C:
if p.NeedsCompaction() {
p.Compact()
}
}
}
}
// NeedsCompaction only requires a read lock and checks if there are files that could be compacted.
// If compact is updated, we should also update NeedsCompaction.
func (p *Partition) NeedsCompaction() bool {
p.mu.RLock()
defer p.mu.RUnlock()
if p.needsLogCompaction() {
return true
}
levelCount := make(map[int]int)
maxLevel := len(p.levels) - 2
// If we have 2 log files (level 0), or if we have 2 files at the same level, we should do a compaction.
for _, f := range p.fileSet.files {
level := f.Level()
levelCount[level]++
if level <= maxLevel && levelCount[level] > 1 && !p.levelCompacting[level] {
return true
}
}
return false
}
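Because NeedsCompaction takes only a read lock, callers can poll it cheaply; the updated tests do exactly that after forcing a compaction. A sketch with a hypothetical helper name, assuming Index exposes PartitionN and PartitionAt as used in those tests:

// waitForCompactions blocks until no partition reports outstanding
// compaction work.
func waitForCompactions(idx *tsi1.Index) {
	for {
		pending := false
		for i := 0; i < int(idx.PartitionN); i++ {
			pending = pending || idx.PartitionAt(i).NeedsCompaction()
		}
		if !pending {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}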
// compact compacts contiguous groups of files that are not currently compacting.
//
// compact requires that mu is write-locked.
func (p *Partition) compact() {
if p.isClosing() {
return
@@ -909,6 +959,30 @@ func (p *Partition) compact() {
fs := p.retainFileSet()
defer fs.Release()
// compact any non-active log files first
for _, f := range p.fileSet.files {
if f.Level() == 0 {
logFile := f.(*LogFile) // It is an invariant that a file is level 0 iff it is a log file
if logFile == p.activeLogFile {
continue
}
if p.levelCompacting[0] {
break
}
// Mark the level as compacting.
p.levelCompacting[0] = true
p.currentCompactionN++
go func() {
p.compactLogFile(logFile)
p.mu.Lock()
p.currentCompactionN--
p.levelCompacting[0] = false
p.mu.Unlock()
p.Compact()
}()
}
}
// Iterate over each level we are going to compact.
// We skip the first level (0) because it is log files and they are compacted separately.
// We skip the last level because the files have no higher level to compact into.
@@ -961,6 +1035,11 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
assert(len(files) >= 2, "at least two index files are required for compaction")
assert(level > 0, "cannot compact level zero")
// Files have already been retained by caller.
// Ensure files are released only once.
var once sync.Once
defer once.Do(func() { IndexFiles(files).Release() })
// Build a logger for this compaction.
log, logEnd := logger.NewOperation(p.logger, "TSI level compaction", "tsi1_compact_to_level", zap.Int("tsi1_level", level))
defer logEnd()
@@ -973,11 +1052,6 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
default:
}
// Files have already been retained by caller.
// Ensure files are released only once.
var once sync.Once
defer once.Do(func() { IndexFiles(files).Release() })
// Track time to compact.
start := time.Now()
@@ -1065,13 +1139,21 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
func (p *Partition) Rebuild() {}
// needsLogCompaction returns true if the log file is too big or too old.
// The caller must have at least a read lock on the partition.
func (p *Partition) needsLogCompaction() bool {
size := p.activeLogFile.Size()
return size >= p.MaxLogFileSize || (size > 0 && p.activeLogFile.modTime.Before(time.Now().Add(-p.MaxLogFileAge)))
}
func (p *Partition) CheckLogFile() error {
// Check log file size under read lock.
if size := func() int64 {
// Check log file under read lock.
needsCompaction := func() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.Size()
}(); size < p.MaxLogFileSize {
return p.needsLogCompaction()
}()
if !needsCompaction {
return nil
}
@@ -1082,27 +1164,17 @@
}
func (p *Partition) checkLogFile() error {
if p.activeLogFile.Size() < p.MaxLogFileSize {
if !p.needsLogCompaction() {
return nil
}
// Swap current log file.
logFile := p.activeLogFile
// Open new log file and insert it into the first position.
if err := p.prependActiveLogFile(); err != nil {
return err
}
// Begin compacting in a background goroutine.
p.currentCompactionN++
go func() {
p.compactLogFile(logFile)
p.mu.Lock()
p.currentCompactionN-- // compaction is now complete
p.mu.Unlock()
p.Compact() // check for new compactions
}()
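The predicate driving all of the above is small. A standalone restatement of needsLogCompaction for clarity, with the partition's fields passed in as parameters; the free function is illustrative, not part of the change (assumes import of "time"):

// logFileNeedsCompaction reports whether a log file of the given size and
// modification time is too large or has been cold for too long.
func logFileNeedsCompaction(size, maxSize int64, modTime time.Time, maxAge time.Duration) bool {
	tooLarge := size >= maxSize
	tooOld := size > 0 && modTime.Before(time.Now().Add(-maxAge))
	return tooLarge || tooOld
}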


@@ -17,11 +17,10 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/internal"
"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/deep"
"github.com/influxdata/influxdb/query"