Merge branch 'er-tsi-index-part' of https://github.com/influxdata/influxdb into er-tsi-index-part

pull/9150/head
Ben Johnson 2017-12-19 10:33:02 -07:00
commit 8b2dbf4d83
No known key found for this signature in database
GPG Key ID: 81741CD251883081
50 changed files with 1962 additions and 448 deletions

View File

@ -13,6 +13,8 @@
- [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality.
- [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards.
- [#9181](https://github.com/influxdata/influxdb/pull/9181): Schedule a full compaction after a successful import.
- [#9218](https://github.com/influxdata/influxdb/pull/9218): Add Prometheus `/metrics` endpoint.
- [#9213](https://github.com/influxdata/influxdb/pull/9213): Add ability to generate shard digests.
### Bugfixes
@ -22,6 +24,8 @@
- [#9163](https://github.com/influxdata/influxdb/pull/9163): Fix race condition in the merge iterator close method.
- [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls are allowed.
- [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in the use statement.
- [#9208](https://github.com/influxdata/influxdb/pull/9208): Updated client 4xx error message when response body length is zero.
- [#9230](https://github.com/influxdata/influxdb/pull/9230): Remove extraneous newlines from the log.
## v1.4.3 [unreleased]

Godeps
View File

@ -1,5 +1,6 @@
collectd.org e84e8af5356e7f47485bbc95c96da6dd7984a67e
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
github.com/bmizerany/pat c068ca2f0aacee5ac3681d68e4d0a003b7d1fd2c
github.com/boltdb/bolt 4b1ebc1869ad66568b313d0dc410e2be72670dda
github.com/cespare/xxhash 1b6d2e40c16ba0dfce5c8eac2480ad6e7394819b
@ -8,6 +9,7 @@ github.com/dgrijalva/jwt-go 24c63f56522a87ec5339cc3567883f1039378fdb
github.com/dgryski/go-bits 2ad8d707cc05b1815ce6ff2543bb5e8d8f9298ef
github.com/dgryski/go-bitstream 7d46cd22db7004f0cceb6f7975824b560cf0e486
github.com/gogo/protobuf 1c2b16bc280d6635de6c52fc1471ab962dc36ec9
github.com/golang/protobuf 1e59b77b52bf8e4b449a57e6f79f21226d571845
github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380
github.com/google/go-cmp 18107e6c56edb2d51f965f7d68e59404f0daee54
github.com/influxdata/influxql c108c5fb9a432242754d18371795aa8099e73fe7
@ -15,10 +17,15 @@ github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967
github.com/influxdata/yamux 1f58ded512de5feabbe30b60c7d33a7a896c5f16
github.com/influxdata/yarpc 036268cdec22b7074cd6d50cc6d7315c667063c7
github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815
github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c
github.com/opentracing/opentracing-go 1361b9cd60be79c4c3a7fa9841b3c132e40066a7
github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447
github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac
github.com/philhofer/fwd 1612a298117663d7bc9a760ae20d383413859798
github.com/prometheus/client_golang 661e31bf844dfca9aeba15f27ea8aa0d485ad212
github.com/prometheus/client_model 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c
github.com/prometheus/common 2e54d0b93cba2fd133edc32211dcc32c06ef72ca
github.com/prometheus/procfs a6e9df898b1336106c743392c48ee0b71f5c4efa
github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d
github.com/spaolacci/murmur3 0d12bf811670bf6a1a63828dfbd003eded177fce
github.com/tinylib/msgp ad0ff2e232ad2e37faf67087fb24bf8d04a8ce20
@ -30,3 +37,4 @@ golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
golang.org/x/net 9dfe39835686865bff950a07b394c12a98ddc811
golang.org/x/sys 062cd7e4e68206d8bab9b18396626e855c992658
golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34
golang.org/x/time 6dc17368e09b0e8634d71cac8168d853e869a0c7

View File

@ -534,7 +534,7 @@ func (c *client) Query(q Query) (*Response, error) {
// like downstream serving a large file
body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
if err != nil || len(body) == 0 {
return nil, fmt.Errorf("expected json response, got %q, with status: %v", cType, resp.StatusCode)
return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
}
return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)

View File

@ -240,7 +240,7 @@ func TestClientDownstream400_Query(t *testing.T) {
query := Query{}
_, err := c.Query(query)
expected := fmt.Sprintf(`expected json response, got "text/plain", with status: %v`, http.StatusForbidden)
expected := fmt.Sprintf(`expected json response, got empty body, with status: %v`, http.StatusForbidden)
if err.Error() != expected {
t.Errorf("unexpected error. expected %v, actual %v", expected, err)
}
@ -407,7 +407,7 @@ func TestClientDownstream400_ChunkedQuery(t *testing.T) {
query := Query{Chunked: true}
_, err := c.Query(query)
expected := fmt.Sprintf(`expected json response, got "text/plain", with status: %v`, http.StatusForbidden)
expected := fmt.Sprintf(`expected json response, got empty body, with status: %v`, http.StatusForbidden)
if err.Error() != expected {
t.Errorf("unexpected error. expected %v, actual %v", expected, err)
}

View File

@ -136,7 +136,10 @@ func (cmd *Command) run() error {
defer idx.Close()
for i := 0; i < int(idx.PartitionN); i++ {
if err := func() error {
fs := idx.PartitionAt(i).RetainFileSet()
fs, err := idx.PartitionAt(i).RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
return cmd.printFileSet(sfile, fs)
}(); err != nil {

View File

@ -330,8 +330,8 @@ func (e *StatementExecutor) executeDeleteSeriesStatement(stmt *influxql.DeleteSe
// Convert "now()" to current time.
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: time.Now().UTC()})
// Locally delete the series.
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
// Locally delete the series. The series will not be removed from the index.
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition, false)
}
func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) error {
@ -375,7 +375,7 @@ func (e *StatementExecutor) executeDropSeriesStatement(stmt *influxql.DropSeries
}
// Locally drop the series.
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition, true)
}
func (e *StatementExecutor) executeDropShardStatement(stmt *influxql.DropShardStatement) error {
@ -1375,7 +1375,7 @@ type TSDBStore interface {
DeleteDatabase(name string) error
DeleteMeasurement(database, name string) error
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error
DeleteShard(id uint64) error
MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
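Note the removeIndex flag now threaded through TSDBStore.DeleteSeries: the DELETE statement above passes false, so deleted series remain in the index, while DROP SERIES passes true and removes them from the index as well.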

View File

@ -79,7 +79,7 @@
# snapshot the cache and write it to a TSM file, freeing up memory
# Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
# Values without a size suffix are in bytes.
# cache-snapshot-memory-size = "256m"
# cache-snapshot-memory-size = "25m"
# CacheSnapshotWriteColdDuration is the length of time at
# which the engine will snapshot the cache and write it to

View File

@ -23,7 +23,7 @@ type TSDBStoreMock struct {
DeleteDatabaseFn func(name string) error
DeleteMeasurementFn func(database, name string) error
DeleteRetentionPolicyFn func(database, name string) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error
DeleteShardFn func(id uint64) error
DiskSizeFn func() (int64, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
@ -77,8 +77,8 @@ func (s *TSDBStoreMock) DeleteMeasurement(database string, name string) error {
func (s *TSDBStoreMock) DeleteRetentionPolicy(database string, name string) error {
return s.DeleteRetentionPolicyFn(database, name)
}
func (s *TSDBStoreMock) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
return s.DeleteSeriesFn(database, sources, condition)
func (s *TSDBStoreMock) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error {
return s.DeleteSeriesFn(database, sources, condition, removeIndex)
}
func (s *TSDBStoreMock) DeleteShard(shardID uint64) error {
return s.DeleteShardFn(shardID)

View File

@ -3,6 +3,7 @@ package bloom_test
import (
"encoding/binary"
"fmt"
"os"
"testing"
"github.com/influxdata/influxdb/pkg/bloom"
@ -10,6 +11,10 @@ import (
// Ensure filter can insert values and verify they exist.
func TestFilter_InsertContains(t *testing.T) {
if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" {
t.Skip("Skipping test in short, race and appveyor mode.")
}
// Short, less comprehensive test.
testShortFilter_InsertContains(t)

pkg/limiter/write_test.go Normal file
View File

@ -0,0 +1,34 @@
package limiter_test
import (
"bytes"
"io"
"testing"
"time"
"github.com/influxdata/influxdb/pkg/limiter"
)
func TestWriter_Limited(t *testing.T) {
r := bytes.NewReader(bytes.Repeat([]byte{0}, 1024*1024))
limit := 512 * 1024
w := limiter.NewWriter(discardCloser{}, limit, 10*1024*1024)
start := time.Now()
n, err := io.Copy(w, r)
elapsed := time.Since(start)
if err != nil {
t.Error("copy error: ", err)
}
rate := float64(n) / elapsed.Seconds()
if rate > float64(limit) {
t.Errorf("rate limit mismath: exp %f, got %f", float64(limit), rate)
}
}
type discardCloser struct{}
func (d discardCloser) Write(b []byte) (int, error) { return len(b), nil }
func (d discardCloser) Close() error { return nil }

pkg/limiter/writer.go Normal file
View File

@ -0,0 +1,83 @@
package limiter
import (
"context"
"io"
"os"
"time"
"golang.org/x/time/rate"
)
type Writer struct {
w io.WriteCloser
limiter Rate
ctx context.Context
}
type Rate interface {
WaitN(ctx context.Context, n int) error
}
func NewRate(bytesPerSec, burstLimit int) Rate {
limiter := rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit)
limiter.AllowN(time.Now(), burstLimit) // spend initial burst
return limiter
}
// NewWriter returns a writer that implements io.Writer with rate limiting.
// The limiter uses a token bucket approach and limits the rate to bytesPerSec
// with a maximum burst of burstLimit.
func NewWriter(w io.WriteCloser, bytesPerSec, burstLimit int) *Writer {
limiter := NewRate(bytesPerSec, burstLimit)
return &Writer{
w: w,
ctx: context.Background(),
limiter: limiter,
}
}
// NewWriterWithRate returns a Writer with the specified rate limiter.
func NewWriterWithRate(w io.WriteCloser, limiter Rate) *Writer {
return &Writer{
w: w,
ctx: context.Background(),
limiter: limiter,
}
}
// Write writes bytes from b.
func (s *Writer) Write(b []byte) (int, error) {
if s.limiter == nil {
return s.w.Write(b)
}
n, err := s.w.Write(b)
if err != nil {
return n, err
}
if err := s.limiter.WaitN(s.ctx, n); err != nil {
return n, err
}
return n, err
}
func (s *Writer) Sync() error {
if f, ok := s.w.(*os.File); ok {
return f.Sync()
}
return nil
}
func (s *Writer) Name() string {
if f, ok := s.w.(*os.File); ok {
return f.Name()
}
return ""
}
func (s *Writer) Close() error {
return s.w.Close()
}
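As a usage sketch (not part of this change; the file path and limit values below are illustrative), any io.WriteCloser can be wrapped to cap its sustained write throughput:

package main

import (
	"os"

	"github.com/influxdata/influxdb/pkg/limiter"
)

func main() {
	f, err := os.Create("/tmp/example.out") // hypothetical output file
	if err != nil {
		panic(err)
	}
	// Allow roughly 48MB/s sustained with a 64MB burst (illustrative values).
	w := limiter.NewWriter(f, 48*1024*1024, 64*1024*1024)
	defer w.Close()
	if _, err := w.Write(make([]byte, 1024*1024)); err != nil {
		panic(err)
	}
}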

View File

@ -20,9 +20,10 @@ func Map(path string, sz int64) ([]byte, error) {
}
// Use file size if map size is not passed in.
if sz == 0 {
sz = fi.Size()
}
// TODO(edd): test.
// if sz == 0 {
// }
sz = fi.Size()
if fi.Size() == 0 {
return nil, nil
}

View File

@ -123,7 +123,7 @@ func (s *Service) Open() error {
readdir = func(path string) {
files, err := ioutil.ReadDir(path)
if err != nil {
s.Logger.Info(fmt.Sprintf("Unable to read directory %s: %s\n", path, err))
s.Logger.Info(fmt.Sprintf("Unable to read directory %s: %s", path, err))
return
}
@ -134,10 +134,10 @@ func (s *Service) Open() error {
continue
}
s.Logger.Info(fmt.Sprintf("Loading %s\n", fullpath))
s.Logger.Info(fmt.Sprintf("Loading %s", fullpath))
types, err := TypesDBFile(fullpath)
if err != nil {
s.Logger.Info(fmt.Sprintf("Unable to parse collectd types file: %s\n", f.Name()))
s.Logger.Info(fmt.Sprintf("Unable to parse collectd types file: %s", f.Name()))
continue
}
@ -147,7 +147,7 @@ func (s *Service) Open() error {
readdir(s.Config.TypesDB)
s.popts.TypesDB = alltypesdb
} else {
s.Logger.Info(fmt.Sprintf("Loading %s\n", s.Config.TypesDB))
s.Logger.Info(fmt.Sprintf("Loading %s", s.Config.TypesDB))
types, err := TypesDBFile(s.Config.TypesDB)
if err != nil {
return fmt.Errorf("Open(): %s", err)

View File

@ -361,7 +361,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
}
if err := cq.q.SetTimeRange(startTime, endTime); err != nil {
s.Logger.Info(fmt.Sprintf("error setting time range: %s\n", err))
s.Logger.Info(fmt.Sprintf("error setting time range: %s", err))
return false, err
}
@ -377,7 +377,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
// Do the actual processing of the query & writing of results.
res := s.runContinuousQueryAndWriteResult(cq)
if res.Err != nil {
s.Logger.Info(fmt.Sprintf("error: %s. running: %s\n", res.Err, cq.q.String()))
s.Logger.Info(fmt.Sprintf("error: %s. running: %s", res.Err, cq.q.String()))
return false, res.Err
}

View File

@ -34,6 +34,7 @@ import (
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/uuid"
"github.com/influxdata/influxql"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
@ -170,6 +171,10 @@ func NewHandler(c Config) *Handler {
"status-head",
"HEAD", "/status", false, true, h.serveStatus,
},
Route{
"prometheus-metrics",
"GET", "/metrics", false, true, promhttp.Handler().ServeHTTP,
},
}...)
return h
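For context, promhttp.Handler() serves whatever has been registered with client_golang's default registry, so the new /metrics route exposes the standard Go and process collectors without further wiring. A minimal standalone sketch (the metric name is hypothetical):

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var queries = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "example_queries_total", // hypothetical metric
	Help: "Example counter exposed via /metrics.",
})

func main() {
	prometheus.MustRegister(queries)
	queries.Inc()
	http.Handle("/metrics", promhttp.Handler())
	http.ListenAndServe(":8086", nil)
}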

View File

@ -26,7 +26,7 @@ OUTPUT_DIR=${OUTPUT_DIR-./test-logs}
# Set default parallelism
PARALLELISM=${PARALLELISM-1}
# Set default timeout
TIMEOUT=${TIMEOUT-1200s}
TIMEOUT=${TIMEOUT-1500s}
# Default to deleting the container
DOCKER_RM=${DOCKER_RM-true}

View File

@ -11,6 +11,7 @@ import (
"net/url"
"os"
"regexp"
"runtime"
"strings"
"sync"
"time"
@ -253,6 +254,15 @@ type LocalServer struct {
Config *run.Config
}
// Open opens the server. When running on a 32-bit platform it reduces the
// size of series files so that they can all be addressed in the process.
func (s *LocalServer) Open() error {
if runtime.GOARCH == "386" {
s.Server.TSDBStore.SeriesFileMaxSize = 1 << 27 // 128MB
}
return s.Server.Open()
}
// Close shuts down the server and removes all temporary paths.
func (s *LocalServer) Close() {
s.mu.Lock()

View File

@ -295,6 +295,12 @@ func init() {
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverB","uswest",23.2],["2000-01-03T00:00:00Z","serverA","uswest",200]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Make sure other points are deleted",
command: `SELECT COUNT(val) FROM cpu WHERE "host" = 'serverA'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Make sure data wasn't deleted from other database.",
command: `SELECT * FROM cpu`,

View File

@ -26,7 +26,7 @@ const (
// DefaultCacheSnapshotMemorySize is the size at which the engine will
// snapshot the cache and write it to a TSM file, freeing up memory
DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB
DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB
// DefaultCacheSnapshotWriteColdDuration is the length of time at which
// the engine will snapshot the cache and write it to a new TSM file if

View File

@ -45,6 +45,7 @@ type Engine interface {
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
Restore(r io.Reader, basePath string) error
Import(r io.Reader, basePath string) error
Digest() (io.ReadCloser, error)
CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
CreateCursor(ctx context.Context, r *CursorRequest) (Cursor, error)
@ -53,7 +54,7 @@ type Engine interface {
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DeleteSeriesRange(itr SeriesIterator, min, max int64) error
DeleteSeriesRange(itr SeriesIterator, min, max int64, removeIndex bool) error
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
SeriesN() int64
@ -146,7 +147,8 @@ type EngineOptions struct {
ShardID uint64
InmemIndex interface{} // shared in-memory index
CompactionLimiter limiter.Fixed
CompactionLimiter limiter.Fixed
CompactionThroughputLimiter limiter.Rate
Config Config
}

View File

@ -15,6 +15,7 @@ package tsm1
import (
"bytes"
"fmt"
"io"
"math"
"os"
"path/filepath"
@ -24,6 +25,7 @@ import (
"sync/atomic"
"time"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/tsdb"
)
@ -251,21 +253,14 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
}
}
// Determine the minimum number of files required for the level. Higher levels are more
// CPU intensive so we only want to include them when we have enough data to make them
// worthwhile.
// minGenerations 1 -> 2
// minGenerations 2 -> 2
// minGenerations 3 -> 4
// minGenerations 4 -> 4
minGenerations := level
if minGenerations%2 != 0 {
minGenerations = level + 1
minGenerations := 4
if level == 1 {
minGenerations = 8
}
var cGroups []CompactionGroup
for _, group := range levelGroups {
for _, chunk := range group.chunk(4) {
for _, chunk := range group.chunk(minGenerations) {
var cGroup CompactionGroup
var hasTombstones bool
for _, gen := range chunk {
@ -323,6 +318,11 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
for i := 0; i < len(generations); i++ {
cur := generations[i]
// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() {
continue
}
// See if this generation is orphaned, which would prevent it from being further
// compacted until a final full compaction runs.
if i < len(generations)-1 {
@ -551,7 +551,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
compactable := []tsmGenerations{}
for _, group := range groups {
// if we don't have enough generations to compact, skip it
if len(group) < 2 && !group.hasTombstones() {
if len(group) < 4 && !group.hasTombstones() {
continue
}
compactable = append(compactable, group)
@ -672,10 +672,18 @@ type Compactor struct {
TSMReader(path string) *TSMReader
}
// RateLimit is the limit for disk writes for all concurrent compactions.
RateLimit limiter.Rate
mu sync.RWMutex
snapshotsEnabled bool
compactionsEnabled bool
// lastSnapshotDuration is the amount of time the last snapshot took to complete.
lastSnapshotDuration time.Duration
snapshotLatencies *latencies
// The channel to signal that any in progress snapshots should be aborted.
snapshotsInterrupt chan struct{}
// The channel to signal that any in progress level compactions should be aborted.
@ -696,6 +704,7 @@ func (c *Compactor) Open() {
c.compactionsEnabled = true
c.snapshotsInterrupt = make(chan struct{})
c.compactionsInterrupt = make(chan struct{})
c.snapshotLatencies = &latencies{values: make([]time.Duration, 4)}
c.files = make(map[string]struct{})
}
@ -770,25 +779,22 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
return nil, errSnapshotsDisabled
}
start := time.Now()
card := cache.Count()
concurrency, maxConcurrency := 1, runtime.GOMAXPROCS(0)/2
if maxConcurrency < 1 {
maxConcurrency = 1
}
if maxConcurrency > 4 {
maxConcurrency = 4
// Enable throttling if we have lower cardinality or snapshots are going fast.
throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second
// Write snapshots concurrently if cardinality is relatively high.
concurrency := card / 2e6
if concurrency < 1 {
concurrency = 1
}
concurrency = 1
if card >= 3*1024*1024 {
// Special case very high cardinality, use max concurrency and don't throttle writes.
if card >= 3e6 {
concurrency = 4
} else if card >= 1024*1024 {
concurrency = 2
}
if concurrency > maxConcurrency {
concurrency = maxConcurrency
throttle = false
}
splits := cache.Split(concurrency)
@ -802,7 +808,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
for i := 0; i < concurrency; i++ {
go func(sp *Cache) {
iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter, throttle)
resC <- res{files: files, err: err}
}(splits[i])
@ -818,10 +824,15 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
files = append(files, result.files...)
}
dur := time.Since(start).Truncate(time.Second)
c.mu.Lock()
// See if we were disabled while writing a snapshot
c.mu.RLock()
enabled = c.snapshotsEnabled
c.mu.RUnlock()
c.lastSnapshotDuration = dur
c.snapshotLatencies.add(time.Since(start))
c.mu.Unlock()
if !enabled {
return nil, errSnapshotsDisabled
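The new snapshot policy above can be summarized with a small sketch (a hypothetical helper, not in this diff; it needs the time package):

// decide returns the snapshot concurrency and whether disk writes should be
// throttled, given the cache cardinality and recent average snapshot time.
func decide(card int, avgLatency time.Duration) (concurrency int, throttle bool) {
	throttle = card < 3e6 && avgLatency < 15*time.Second
	if concurrency = card / 2e6; concurrency < 1 {
		concurrency = 1
	}
	if card >= 3e6 {
		// Very high cardinality: max concurrency, no throttling.
		concurrency, throttle = 4, false
	}
	return concurrency, throttle
}

// decide(1e6, 5*time.Second)  → (1, true):  one writer, rate limited.
// decide(4e6, 20*time.Second) → (4, false): four writers, unthrottled.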
@ -889,7 +900,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return nil, err
}
return c.writeNewFiles(maxGeneration, maxSequence, tsm)
return c.writeNewFiles(maxGeneration, maxSequence, tsm, true)
}
// CompactFull writes multiple smaller TSM files into 1 or more larger files.
@ -970,7 +981,7 @@ func (c *Compactor) removeTmpFiles(files []string) error {
// writeNewFiles writes from the iterator into new TSM files, rotating
// to a new file once it has reached the max TSM file size.
func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([]string, error) {
func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator, throttle bool) ([]string, error) {
// These are the new TSM files written
var files []string
@ -980,7 +991,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, TSMFileExtension, TmpTSMFileExtension))
// Write as much as possible to this file
err := c.write(fileName, iter)
err := c.write(fileName, iter, throttle)
// We've hit the max file limit and there is more to write. Create a new file
// and continue.
@ -1019,24 +1030,31 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
return files, nil
}
func (c *Compactor) write(path string, iter KeyIterator) (err error) {
func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return errCompactionInProgress{err: err}
}
// Create the write for the new TSM file.
var w TSMWriter
var (
w TSMWriter
limitWriter io.Writer = fd
)
if c.RateLimit != nil && throttle {
limitWriter = limiter.NewWriterWithRate(fd, c.RateLimit)
}
// Use a disk based TSM buffer if it looks like we might create a big index
// in memory.
if iter.EstimatedIndexSize() > 64*1024*1024 {
w, err = NewTSMWriterWithDiskBuffer(fd)
w, err = NewTSMWriterWithDiskBuffer(limitWriter)
if err != nil {
return err
}
} else {
w, err = NewTSMWriter(fd)
w, err = NewTSMWriter(limitWriter)
if err != nil {
return err
}
@ -1534,8 +1552,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte
}
func (c *cacheKeyIterator) EstimatedIndexSize() int {
// We return 0 here since we already have all the entries in memory to write an index.
return 0
var n int
for _, v := range c.order {
n += len(v)
}
return n
}
func (c *cacheKeyIterator) encode() {
@ -1709,3 +1730,30 @@ func (a tsmGenerations) IsSorted() bool {
}
return true
}
type latencies struct {
i int
values []time.Duration
}
func (l *latencies) add(t time.Duration) {
l.values[l.i%len(l.values)] = t
l.i++
}
func (l *latencies) avg() time.Duration {
var n int64
var sum time.Duration
for _, v := range l.values {
if v == 0 {
continue
}
sum += v
n++
}
if n > 0 {
return time.Duration(int64(sum) / n)
}
return time.Duration(0)
}
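A quick worked example of the window above (durations made up): after add(5*time.Second) and add(15*time.Second), avg() returns 10s because zero-valued slots are skipped; a fifth sample overwrites the oldest, keeping the average over the last four snapshots.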

View File

@ -1461,6 +1461,30 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) {
Path: "06-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "07-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "08-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "09-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "10-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "11-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "12-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
@ -1471,7 +1495,7 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[4], data[5]}
expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11]}
tsm := cp.PlanLevel(1)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
@ -1537,55 +1561,6 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) {
}
}
func TestDefaultPlanner_PlanLevel_IsolatedLowLevel(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-03.tsm1",
Size: 251 * 1024 * 1024,
},
tsm1.FileStat{
Path: "02-03.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "03-01.tsm1",
Size: 2 * 1024 * 1024 * 1024,
},
tsm1.FileStat{
Path: "04-01.tsm1",
Size: 10 * 1024 * 1024,
},
tsm1.FileStat{
Path: "05-02.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "06-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
PathsFn: func() []tsm1.FileStat {
return data
},
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles := []tsm1.FileStat{data[2], data[3]}
tsm := cp.PlanLevel(1)
if exp, got := len(expFiles), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
for i, p := range expFiles {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) {
data := []tsm1.FileStat{
tsm1.FileStat{
@ -1802,8 +1777,7 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
tsm := cp.PlanLevel(1)
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@ -1815,16 +1789,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
if exp, got := len(expFiles2), len(tsm[1]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
for i, p := range expFiles2 {
if got, exp := tsm[1][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}
func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
@ -1869,6 +1833,30 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
Path: "10-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "11-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "12-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "13-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "14-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "15-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "16-01.tsm1",
Size: 1 * 1024 * 1024,
},
}
cp := tsm1.NewDefaultPlanner(
@ -1879,8 +1867,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
expFiles1 := data[0:8]
expFiles2 := data[8:16]
tsm := cp.PlanLevel(1)
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@ -2559,25 +2547,41 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) {
Size: 2148728539,
},
tsm1.FileStat{
Path: "000000005-000000002.tsm",
Size: 701863692,
Path: "000000005-000000001.tsm",
Size: 2148340232,
},
tsm1.FileStat{
Path: "000000006-000000002.tsm",
Size: 701863692,
Path: "000000006-000000001.tsm",
Size: 2148356556,
},
tsm1.FileStat{
Path: "000000007-000000002.tsm",
Size: 701863692,
Path: "000000007-000000001.tsm",
Size: 167780181,
},
tsm1.FileStat{
Path: "000000008-000000002.tsm",
Size: 701863692,
Path: "000000008-000000001.tsm",
Size: 2148728539,
},
tsm1.FileStat{
Path: "000000009-000000002.tsm",
Size: 701863692,
},
tsm1.FileStat{
Path: "000000010-000000002.tsm",
Size: 701863692,
},
tsm1.FileStat{
Path: "000000011-000000002.tsm",
Size: 701863692,
},
tsm1.FileStat{
Path: "000000012-000000002.tsm",
Size: 701863692,
},
tsm1.FileStat{
Path: "000000013-000000002.tsm",
Size: 701863692,
},
}
},
}, tsdb.DefaultCompactFullWriteColdDuration,
@ -2615,7 +2619,7 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
if got, exp := len(tsm[0]), 9; got != exp {
if got, exp := len(tsm[0]), 13; got != exp {
t.Fatalf("plan length mismatch: got %v, exp %v", got, exp)
}
cp.Release(tsm)

tsdb/engine/tsm1/digest.go Normal file
View File

@ -0,0 +1,136 @@
package tsm1
import (
"bytes"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sort"
)
type DigestOptions struct {
MinTime, MaxTime int64
MinKey, MaxKey []byte
}
// DigestWithOptions writes a digest of dir to w using options to filter by
// time and key range.
func DigestWithOptions(dir string, opts DigestOptions, w io.WriteCloser) error {
if dir == "" {
return fmt.Errorf("dir is required")
}
files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", TSMFileExtension)))
if err != nil {
return err
}
readers := make([]*TSMReader, 0, len(files))
for _, fi := range files {
f, err := os.Open(fi)
if err != nil {
return err
}
r, err := NewTSMReader(f)
if err != nil {
return err
}
readers = append(readers, r)
}
ch := make([]chan seriesKey, 0, len(files))
for _, fi := range files {
f, err := os.Open(fi)
if err != nil {
return err
}
r, err := NewTSMReader(f)
if err != nil {
return err
}
defer r.Close()
s := make(chan seriesKey)
ch = append(ch, s)
go func() {
for i := 0; i < r.KeyCount(); i++ {
key, typ := r.KeyAt(i)
if len(opts.MinKey) > 0 && bytes.Compare(key, opts.MinKey) < 0 {
continue
}
if len(opts.MaxKey) > 0 && bytes.Compare(key, opts.MaxKey) > 0 {
continue
}
s <- seriesKey{key: key, typ: typ}
}
close(s)
}()
}
dw, err := NewDigestWriter(w)
if err != nil {
return err
}
defer dw.Close()
var n int
for key := range merge(ch...) {
ts := &DigestTimeSpan{}
n++
kstr := string(key.key)
for _, r := range readers {
entries := r.Entries(key.key)
for _, entry := range entries {
crc, b, err := r.ReadBytes(&entry, nil)
if err != nil {
return err
}
// Filter blocks that are outside the time filter. If they overlap, we
// still include them.
if entry.MaxTime < opts.MinTime || entry.MinTime > opts.MaxTime {
continue
}
cnt := BlockCount(b)
ts.Add(entry.MinTime, entry.MaxTime, cnt, crc)
}
}
sort.Sort(ts)
if err := dw.WriteTimeSpan(kstr, ts); err != nil {
return err
}
}
return dw.Close()
}
// Digest writes a digest of the full shard directory dir to w.
func Digest(dir string, w io.WriteCloser) error {
return DigestWithOptions(dir, DigestOptions{
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
}, w)
}
type rwPair struct {
r *TSMReader
w TSMWriter
outf *os.File
}
func (rw *rwPair) close() {
rw.r.Close()
rw.w.Close()
rw.outf.Close()
}
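The loop above ranges over merge(ch...), a helper that is not part of this diff. A minimal sketch of such a k-way merge, assuming the seriesKey type carries the key and typ fields sent on the per-file channels (requires the bytes package): it repeatedly emits the smallest pending key and collapses duplicates seen across files.

func merge(ch ...chan seriesKey) chan seriesKey {
	out := make(chan seriesKey)
	go func() {
		defer close(out)
		heads := make([]*seriesKey, len(ch))
		var last []byte
		for {
			// Refill any empty slot from its source channel.
			for i, c := range ch {
				if heads[i] == nil {
					if v, ok := <-c; ok {
						heads[i] = &v
					}
				}
			}
			// Pick the channel holding the smallest key.
			min := -1
			for i, h := range heads {
				if h != nil && (min == -1 || bytes.Compare(h.key, heads[min].key) < 0) {
					min = i
				}
			}
			if min == -1 {
				return // all inputs exhausted
			}
			v := *heads[min]
			heads[min] = nil
			if last != nil && bytes.Equal(v.key, last) {
				continue // duplicate key from another file
			}
			last = append([]byte(nil), v.key...)
			out <- v
		}
	}()
	return out
}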

View File

@ -0,0 +1,70 @@
package tsm1
import (
"bufio"
"compress/gzip"
"encoding/binary"
"io"
)
type DigestReader struct {
r io.ReadCloser
gr *gzip.Reader
}
func NewDigestReader(r io.ReadCloser) (*DigestReader, error) {
gr, err := gzip.NewReader(bufio.NewReader(r))
if err != nil {
return nil, err
}
return &DigestReader{r: r, gr: gr}, nil
}
func (w *DigestReader) ReadTimeSpan() (string, *DigestTimeSpan, error) {
var n uint16
if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil {
return "", nil, err
}
b := make([]byte, n)
if _, err := io.ReadFull(w.gr, b); err != nil {
return "", nil, err
}
var cnt uint32
if err := binary.Read(w.gr, binary.BigEndian, &cnt); err != nil {
return "", nil, err
}
ts := &DigestTimeSpan{}
for i := 0; i < int(cnt); i++ {
var min, max int64
var crc uint32
if err := binary.Read(w.gr, binary.BigEndian, &min); err != nil {
return "", nil, err
}
if err := binary.Read(w.gr, binary.BigEndian, &max); err != nil {
return "", nil, err
}
if err := binary.Read(w.gr, binary.BigEndian, &crc); err != nil {
return "", nil, err
}
if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil {
return "", nil, err
}
ts.Add(min, max, int(n), crc)
}
return string(b), ts, nil
}
func (w *DigestReader) Close() error {
if err := w.gr.Close(); err != nil {
return err
}
return w.r.Close()
}

View File

@ -0,0 +1,228 @@
package tsm1_test
import (
"io"
"os"
"path/filepath"
"testing"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
func TestDigest_None(t *testing.T) {
dir := MustTempDir()
dataDir := filepath.Join(dir, "data")
if err := os.Mkdir(dataDir, 0755); err != nil {
t.Fatalf("create data dir: %v", err)
}
df := MustTempFile(dir)
if err := tsm1.Digest(dir, df); err != nil {
t.Fatalf("digest error: %v", err)
}
df, err := os.Open(df.Name())
if err != nil {
t.Fatalf("open error: %v", err)
}
r, err := tsm1.NewDigestReader(df)
if err != nil {
t.Fatalf("NewDigestReader error: %v", err)
}
defer r.Close()
var count int
for {
_, _, err := r.ReadTimeSpan()
if err == io.EOF {
break
}
count++
}
if got, exp := count, 0; got != exp {
t.Fatalf("count mismatch: got %v, exp %v", got, exp)
}
}
func TestDigest_One(t *testing.T) {
dir := MustTempDir()
dataDir := filepath.Join(dir, "data")
if err := os.Mkdir(dataDir, 0755); err != nil {
t.Fatalf("create data dir: %v", err)
}
a1 := tsm1.NewValue(1, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a1},
}
MustWriteTSM(dir, 1, writes)
df := MustTempFile(dir)
if err := tsm1.Digest(dir, df); err != nil {
t.Fatalf("digest error: %v", err)
}
df, err := os.Open(df.Name())
if err != nil {
t.Fatalf("open error: %v", err)
}
r, err := tsm1.NewDigestReader(df)
if err != nil {
t.Fatalf("NewDigestReader error: %v", err)
}
defer r.Close()
var count int
for {
key, _, err := r.ReadTimeSpan()
if err == io.EOF {
break
}
if got, exp := key, "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
count++
}
if got, exp := count, 1; got != exp {
t.Fatalf("count mismatch: got %v, exp %v", got, exp)
}
}
func TestDigest_TimeFilter(t *testing.T) {
dir := MustTempDir()
dataDir := filepath.Join(dir, "data")
if err := os.Mkdir(dataDir, 0755); err != nil {
t.Fatalf("create data dir: %v", err)
}
a1 := tsm1.NewValue(1, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a1},
}
MustWriteTSM(dir, 1, writes)
a2 := tsm1.NewValue(2, 2.1)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a2},
}
MustWriteTSM(dir, 2, writes)
a3 := tsm1.NewValue(3, 3.1)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a3},
}
MustWriteTSM(dir, 3, writes)
df := MustTempFile(dir)
if err := tsm1.DigestWithOptions(dir, tsm1.DigestOptions{MinTime: 2, MaxTime: 2}, df); err != nil {
t.Fatalf("digest error: %v", err)
}
df, err := os.Open(df.Name())
if err != nil {
t.Fatalf("open error: %v", err)
}
r, err := tsm1.NewDigestReader(df)
if err != nil {
t.Fatalf("NewDigestReader error: %v", err)
}
defer r.Close()
var count int
for {
key, ts, err := r.ReadTimeSpan()
if err == io.EOF {
break
}
if got, exp := key, "cpu,host=A#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
for _, tr := range ts.Ranges {
if got, exp := tr.Max, int64(2); got != exp {
t.Fatalf("min time not filtered: got %v, exp %v", got, exp)
}
}
count++
}
if got, exp := count, 1; got != exp {
t.Fatalf("count mismatch: got %v, exp %v", got, exp)
}
}
func TestDigest_KeyFilter(t *testing.T) {
dir := MustTempDir()
dataDir := filepath.Join(dir, "data")
if err := os.Mkdir(dataDir, 0755); err != nil {
t.Fatalf("create data dir: %v", err)
}
a1 := tsm1.NewValue(1, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a1},
}
MustWriteTSM(dir, 1, writes)
a2 := tsm1.NewValue(2, 2.1)
writes = map[string][]tsm1.Value{
"cpu,host=B#!~#value": []tsm1.Value{a2},
}
MustWriteTSM(dir, 2, writes)
a3 := tsm1.NewValue(3, 3.1)
writes = map[string][]tsm1.Value{
"cpu,host=C#!~#value": []tsm1.Value{a3},
}
MustWriteTSM(dir, 3, writes)
df := MustTempFile(dir)
if err := tsm1.DigestWithOptions(dir, tsm1.DigestOptions{
MinKey: []byte("cpu,host=B#!~#value"),
MaxKey: []byte("cpu,host=B#!~#value")}, df); err != nil {
t.Fatalf("digest error: %v", err)
}
df, err := os.Open(df.Name())
if err != nil {
t.Fatalf("open error: %v", err)
}
r, err := tsm1.NewDigestReader(df)
if err != nil {
t.Fatalf("NewDigestReader error: %v", err)
}
defer r.Close()
var count int
for {
key, _, err := r.ReadTimeSpan()
if err == io.EOF {
break
}
if got, exp := key, "cpu,host=B#!~#value"; got != exp {
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
}
count++
}
if got, exp := count, 1; got != exp {
t.Fatalf("count mismatch: got %v, exp %v", got, exp)
}
}

View File

@ -0,0 +1,101 @@
package tsm1
import (
"compress/gzip"
"encoding/binary"
"io"
)
type writeFlushCloser interface {
Close() error
Write(b []byte) (int, error)
Flush() error
}
// DigestWriter allows for writing a digest of a shard. A digest is a condensed
// representation of the contents of a shard. It can be scoped to one or more series
// keys, ranges of times or sets of files.
type DigestWriter struct {
w io.WriteCloser
F writeFlushCloser
}
func NewDigestWriter(w io.WriteCloser) (*DigestWriter, error) {
gw := gzip.NewWriter(w)
return &DigestWriter{w: w, F: gw}, nil
}
func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error {
if err := binary.Write(w.F, binary.BigEndian, uint16(len(key))); err != nil {
return err
}
if _, err := w.F.Write([]byte(key)); err != nil {
return err
}
if err := binary.Write(w.F, binary.BigEndian, uint32(t.Len())); err != nil {
return err
}
for _, tr := range t.Ranges {
if err := binary.Write(w.F, binary.BigEndian, tr.Min); err != nil {
return err
}
if err := binary.Write(w.F, binary.BigEndian, tr.Max); err != nil {
return err
}
if err := binary.Write(w.F, binary.BigEndian, tr.CRC); err != nil {
return err
}
if err := binary.Write(w.F, binary.BigEndian, uint16(tr.N)); err != nil {
return err
}
}
return nil
}
func (w *DigestWriter) Flush() error {
return w.F.Flush()
}
func (w *DigestWriter) Close() error {
if err := w.Flush(); err != nil {
return err
}
if err := w.F.Close(); err != nil {
return err
}
return w.w.Close()
}
type DigestTimeSpan struct {
Ranges []DigestTimeRange
}
func (a DigestTimeSpan) Len() int { return len(a.Ranges) }
func (a DigestTimeSpan) Swap(i, j int) { a.Ranges[i], a.Ranges[j] = a.Ranges[j], a.Ranges[i] }
func (a DigestTimeSpan) Less(i, j int) bool {
return a.Ranges[i].Min < a.Ranges[j].Min
}
func (t *DigestTimeSpan) Add(min, max int64, n int, crc uint32) {
for _, v := range t.Ranges {
if v.Min == min && v.Max == max && v.N == n && v.CRC == crc {
return
}
}
t.Ranges = append(t.Ranges, DigestTimeRange{Min: min, Max: max, N: n, CRC: crc})
}
type DigestTimeRange struct {
Min, Max int64
N int
CRC uint32
}
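Taken together with the reader earlier in this change, each time-span record inside the gzip stream has the following layout (all integers big endian), as written by WriteTimeSpan above:

uint16  key length
bytes   series key
uint32  number of time ranges
per range:
  int64   min time
  int64   max time
  uint32  crc
  uint16  n (block count)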

View File

@ -0,0 +1,61 @@
package tsm1_test
import (
"io"
"os"
"reflect"
"testing"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
func TestEngine_DigestWriterReader(t *testing.T) {
f := MustTempFile("")
w, err := tsm1.NewDigestWriter(f)
if err != nil {
t.Fatalf("NewDigestWriter: %v", err)
}
ts := &tsm1.DigestTimeSpan{}
ts.Add(1, 2, 3, 4)
if err := w.WriteTimeSpan("cpu", ts); err != nil {
t.Fatalf("WriteTimeSpan: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
f, err = os.Open(f.Name())
if err != nil {
t.Fatalf("Open: %v", err)
}
r, err := tsm1.NewDigestReader(f)
if err != nil {
t.Fatalf("NewDigestReader: %v", err)
}
for {
key, ts, err := r.ReadTimeSpan()
if err == io.EOF {
break
} else if err != nil {
t.Fatalf("ReadTimeSpan: %v", err)
}
if exp, got := "cpu", key; exp != got {
t.Fatalf("key mismatch: exp %v, got %v", exp, got)
}
if exp, got := 1, len(ts.Ranges); exp != got {
t.Fatalf("range len mismatch: exp %v, got %v", exp, got)
}
exp := tsm1.DigestTimeRange{Min: 1, Max: 2, N: 3, CRC: 4}
if got := ts.Ranges[0]; !reflect.DeepEqual(exp, got) {
t.Fatalf("time range mismatch: exp %v, got %v", exp, got)
}
}
}

View File

@ -190,6 +190,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
c := &Compactor{
Dir: path,
FileStore: fs,
RateLimit: opt.CompactionThroughputLimiter,
}
logger := zap.NewNop()
@ -227,6 +228,55 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
return e
}
// Digest returns a reader for the shard's digest.
func (e *Engine) Digest() (io.ReadCloser, error) {
digestPath := filepath.Join(e.path, "digest.tsd")
// See if there's an existing digest file on disk.
f, err := os.Open(digestPath)
if err == nil {
// There is an existing digest file. Now see if it is still fresh.
fi, err := f.Stat()
if err != nil {
f.Close()
return nil, err
}
if !e.LastModified().After(fi.ModTime()) {
// Existing digest is still fresh so return a reader for it.
return f, nil
}
if err := f.Close(); err != nil {
return nil, err
}
}
// Either no digest existed or the existing one was stale
// so generate a new digest.
// Create a tmp file to write the digest to.
tf, err := os.Create(digestPath + ".tmp")
if err != nil {
return nil, err
}
// Write the new digest to the tmp file.
if err := Digest(e.path, tf); err != nil {
tf.Close()
os.Remove(tf.Name())
return nil, err
}
// Rename the temporary digest file to the actual digest file.
if err := renameFile(tf.Name(), digestPath); err != nil {
return nil, err
}
// Create and return a reader for the new digest file.
return os.Open(digestPath)
}
// SetEnabled sets whether the engine is enabled.
func (e *Engine) SetEnabled(enabled bool) {
e.enableCompactionsOnOpen = enabled
@ -1152,7 +1202,7 @@ func (e *Engine) WritePoints(points []models.Point) error {
}
// DeleteSeriesRange removes the values between min and max (inclusive) from all series
func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) error {
func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64, removeIndex bool) error {
var disableOnce bool
var sz int
@ -1164,6 +1214,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro
} else if elem == nil {
break
}
if elem.Expr() != nil {
if v, ok := elem.Expr().(*influxql.BooleanLiteral); !ok || !v.Val {
return errors.New("fields not supported in WHERE clause during deletion")
@ -1188,7 +1239,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro
if sz >= deleteFlushThreshold {
// Delete all matching batch.
if err := e.deleteSeriesRange(batch, min, max); err != nil {
if err := e.deleteSeriesRange(batch, min, max, removeIndex); err != nil {
return err
}
batch = batch[:0]
@ -1198,20 +1249,24 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro
if len(batch) > 0 {
// Delete all matching batch.
if err := e.deleteSeriesRange(batch, min, max); err != nil {
if err := e.deleteSeriesRange(batch, min, max, removeIndex); err != nil {
return err
}
batch = batch[:0]
}
e.index.Rebuild()
if removeIndex {
e.index.Rebuild()
}
return nil
}
// deleteSeriesRange removes the values between min and max (inclusive) from all series. This
// does not update the index or disable compactions. This should mainly be called by DeleteSeriesRange
// and not directly.
func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// deleteSeriesRange removes the values between min and max (inclusive) from all
// series in the TSM engine. If removeIndex is true, then series will also be
// removed from the index.
//
// This should mainly be called by DeleteSeriesRange and not directly.
func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64, removeIndex bool) error {
ts := time.Now().UTC().UnixNano()
if len(seriesKeys) == 0 {
return nil
@ -1307,7 +1362,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// exists now. To reconcile the index, we walk the series keys that still exists
// on disk and cross out any keys that match the passed in series. Any series
// left in the slice at the end do not exist and can be deleted from the index.
// Note: this is inherently racy if writes are occuring to the same measurement/series are
// Note: this is inherently racy if writes are occurring to the same measurement/series are
// being removed. A write could occur and exist in the cache at this point, but we
// would delete it from the index.
minKey := seriesKeys[0]
@ -1371,12 +1426,11 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
i++
}
// Some cache values still exist, leave the series in the index.
if hasCacheValues {
if hasCacheValues || !removeIndex {
continue
}
// Remove the series from the index for this shard
// Remove the series from the index.
if err := e.index.UnassignShard(string(k), e.id, ts); err != nil {
return err
}
@ -1439,7 +1493,8 @@ func (e *Engine) deleteMeasurement(name []byte) error {
return nil
}
defer itr.Close()
return e.DeleteSeriesRange(tsdb.NewSeriesIteratorAdapter(e.sfile, itr), math.MinInt64, math.MaxInt64)
// Delete all associated series and remove them from the index.
return e.DeleteSeriesRange(tsdb.NewSeriesIteratorAdapter(e.sfile, itr), math.MinInt64, math.MaxInt64, true)
}
// ForEachMeasurementName iterates over each measurement name in the engine.
@ -1608,7 +1663,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
}
func (e *Engine) compact(quit <-chan struct{}) {
t := time.NewTicker(5 * time.Second)
t := time.NewTicker(time.Second)
defer t.Stop()
for {
@ -1670,15 +1725,15 @@ func (e *Engine) compact(quit <-chan struct{}) {
switch level {
case 1:
if e.compactHiPriorityLevel(level1Groups[0], 1) {
if e.compactHiPriorityLevel(level1Groups[0], 1, false) {
level1Groups = level1Groups[1:]
}
case 2:
if e.compactHiPriorityLevel(level2Groups[0], 2) {
if e.compactHiPriorityLevel(level2Groups[0], 2, false) {
level2Groups = level2Groups[1:]
}
case 3:
if e.compactLoPriorityLevel(level3Groups[0], 3) {
if e.compactLoPriorityLevel(level3Groups[0], 3, true) {
level3Groups = level3Groups[1:]
}
case 4:
@ -1699,8 +1754,8 @@ func (e *Engine) compact(quit <-chan struct{}) {
// compactHiPriorityLevel kicks off compactions using the high priority policy. It returns
// true if the compaction was started
func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int) bool {
s := e.levelCompactionStrategy(grp, true, level)
func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool) bool {
s := e.levelCompactionStrategy(grp, fast, level)
if s == nil {
return false
}
@ -1728,8 +1783,8 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int) bool {
// compactLoPriorityLevel kicks off compactions using the lo priority policy. It returns
// the plans that were not able to be started
func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int) bool {
s := e.levelCompactionStrategy(grp, true, level)
func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool) bool {
s := e.levelCompactionStrategy(grp, fast, level)
if s == nil {
return false
}

View File

@ -45,7 +45,7 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
// Remove series.
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64, false); err != nil {
t.Fatalf("failed to delete series: %s", err.Error())
}
@ -65,6 +65,145 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
}
}
// Ensure that the engine can write & read shard digest files.
func TestEngine_Digest(t *testing.T) {
e := MustOpenEngine(inmem.IndexName)
defer e.Close()
if err := e.Open(); err != nil {
t.Fatalf("failed to open tsm1 engine: %s", err.Error())
}
// Create a few points.
points := []models.Point{
MustParsePointString("cpu,host=A value=1.1 1000000000"),
MustParsePointString("cpu,host=B value=1.2 2000000000"),
}
if err := e.WritePoints(points); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// Force a compaction.
e.ScheduleFullCompaction()
digest := func() ([]span, error) {
// Get a reader for the shard's digest.
r, err := e.Digest()
if err != nil {
return nil, err
}
// Make sure the digest can be read.
dr, err := tsm1.NewDigestReader(r)
if err != nil {
r.Close()
return nil, err
}
defer dr.Close()
got := []span{}
for {
k, s, err := dr.ReadTimeSpan()
if err == io.EOF {
break
} else if err != nil {
return nil, err
}
got = append(got, span{
key: k,
tspan: s,
})
}
return got, nil
}
exp := []span{
span{
key: "cpu,host=A#!~#value",
tspan: &tsm1.DigestTimeSpan{
Ranges: []tsm1.DigestTimeRange{
tsm1.DigestTimeRange{
Min: 1000000000,
Max: 1000000000,
N: 1,
CRC: 1048747083,
},
},
},
},
span{
key: "cpu,host=B#!~#value",
tspan: &tsm1.DigestTimeSpan{
Ranges: []tsm1.DigestTimeRange{
tsm1.DigestTimeRange{
Min: 2000000000,
Max: 2000000000,
N: 1,
CRC: 734984746,
},
},
},
},
}
for n := 0; n < 2; n++ {
got, err := digest()
if err != nil {
t.Fatalf("n = %d: %s", n, err)
}
// Make sure the data in the digest was valid.
if !reflect.DeepEqual(exp, got) {
t.Fatalf("n = %d\nexp = %v\ngot = %v\n", n, exp, got)
}
}
// Test that writing more points causes the digest to be updated.
points = []models.Point{
MustParsePointString("cpu,host=C value=1.1 3000000000"),
}
if err := e.WritePoints(points); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
// Force a compaction.
e.ScheduleFullCompaction()
// Get new digest.
got, err := digest()
if err != nil {
t.Fatal(err)
}
exp = append(exp, span{
key: "cpu,host=C#!~#value",
tspan: &tsm1.DigestTimeSpan{
Ranges: []tsm1.DigestTimeRange{
tsm1.DigestTimeRange{
Min: 3000000000,
Max: 3000000000,
N: 1,
CRC: 2553233514,
},
},
},
})
if !reflect.DeepEqual(exp, got) {
t.Fatalf("\nexp = %v\ngot = %v\n", exp, got)
}
}
type span struct {
key string
tspan *tsm1.DigestTimeSpan
}
// Ensure that the engine will backup any TSM files created since the passed in time
func TestEngine_Backup(t *testing.T) {
sfile := MustOpenSeriesFile()
@ -788,7 +927,7 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
}
// Ensures that deleting series from TSM files with multiple fields removes all the
/// series
// series from the TSM files but leaves the series in the index intact.
func TestEngine_DeleteSeries(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
@ -797,11 +936,23 @@ func TestEngine_DeleteSeries(t *testing.T) {
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
e := MustOpenEngine(index)
defer e.Close()
e, err := NewEngine(index)
if err != nil {
t.Fatal(err)
}
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
if err := e.Open(); err != nil {
t.Fatal(err)
}
defer e.Close()
for _, p := range []models.Point{p1, p2, p3} {
if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
t.Fatalf("create series index error: %v", err)
}
}
if err := e.WritePoints([]models.Point{p1, p2, p3}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
@ -816,7 +967,7 @@ func TestEngine_DeleteSeries(t *testing.T) {
}
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64, false); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
@ -829,27 +980,68 @@ func TestEngine_DeleteSeries(t *testing.T) {
if _, ok := keys[exp]; !ok {
t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
}
// Deleting all the TSM values for a single series should still leave
// the series in the index intact.
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
if err != nil {
t.Fatalf("iterator error: %v", err)
} else if iter == nil {
t.Fatal("nil iterator")
}
defer iter.Close()
var gotKeys []string
expKeys := []string{"cpu,host=A", "cpu,host=B"}
for {
elem, err := iter.Next()
if err != nil {
t.Fatal(err)
}
if elem.SeriesID == 0 {
break
}
// Lookup series.
name, tags := e.sfile.Series(elem.SeriesID)
gotKeys = append(gotKeys, string(models.MakeKey(name, tags)))
}
if !reflect.DeepEqual(gotKeys, expKeys) {
t.Fatalf("got keys %v, expected %v", gotKeys, expKeys)
}
})
}
}
// Ensures that deleting series from TSM files over a range of time deletes the
// series from the TSM files but leaves the series in the index.
func TestEngine_DeleteSeriesRange(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
// Create a few points.
p1 := MustParsePointString("cpu,host=0 value=1.1 6000000000") // Should not be deleted
p1 := MustParsePointString("cpu,host=0 value=1.1 6000000000")
p2 := MustParsePointString("cpu,host=A value=1.2 2000000000")
p3 := MustParsePointString("cpu,host=A value=1.3 3000000000")
p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") // Should not be deleted
p5 := MustParsePointString("cpu,host=B value=1.3 5000000000") // Should not be deleted
p4 := MustParsePointString("cpu,host=B value=1.3 4000000000")
p5 := MustParsePointString("cpu,host=B value=1.3 5000000000")
p6 := MustParsePointString("cpu,host=C value=1.3 1000000000")
p7 := MustParsePointString("mem,host=C value=1.3 1000000000") // Should not be deleted
p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted
p7 := MustParsePointString("mem,host=C value=1.3 1000000000")
p8 := MustParsePointString("disk,host=C value=1.3 1000000000")
e, err := NewEngine(index)
if err != nil {
t.Fatal(err)
}
e := MustOpenEngine(index)
defer e.Close()
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
if err := e.Open(); err != nil {
t.Fatal(err)
}
defer e.Close()
for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
@ -870,7 +1062,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
}
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C")}}
if err := e.DeleteSeriesRange(itr, 0, 3000000000); err != nil {
if err := e.DeleteSeriesRange(itr, 0, 3000000000, false); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
@ -884,46 +1076,36 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys)
}
// Check that the series still exists in the index
iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
// Deleting all the TSM values for a single series should still leave
// the series in the index intact.
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
if err != nil {
t.Fatalf("iterator error: %v", err)
} else if iter == nil {
t.Fatal("nil iterator")
}
defer iter.Close()
elem, err := iter.Next()
if err != nil {
t.Fatal(err)
}
if elem.SeriesID == 0 {
t.Fatalf("series index mismatch: EOF, exp 2 series")
var gotKeys []string
expKeys := []string{"cpu,host=0", "cpu,host=A", "cpu,host=B", "cpu,host=C"}
for {
elem, err := iter.Next()
if err != nil {
t.Fatal(err)
}
if elem.SeriesID == 0 {
break
}
// Lookup series.
name, tags := e.sfile.Series(elem.SeriesID)
gotKeys = append(gotKeys, string(models.MakeKey(name, tags)))
}
// Lookup series.
name, tags := e.sfile.Series(elem.SeriesID)
if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
}
if got, exp := tags, models.NewTags(map[string]string{"host": "0"}); !got.Equal(exp) {
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
}
if elem, err = iter.Next(); err != nil {
t.Fatal(err)
}
if elem.SeriesID == 0 {
t.Fatalf("series index mismatch: EOF, exp 2 series")
}
// Lookup series.
name, tags = e.sfile.Series(elem.SeriesID)
if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) {
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
}
if got, exp := tags, models.NewTags(map[string]string{"host": "B"}); !got.Equal(exp) {
t.Fatalf("series mismatch: got %s, exp %s", got, exp)
if !reflect.DeepEqual(gotKeys, expKeys) {
t.Fatalf("got keys %v, expected %v", gotKeys, expKeys)
}
})
@ -936,10 +1118,17 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
// Create a few points.
p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") // Should not be deleted
e := MustOpenEngine(index)
defer e.Close()
e, err := NewEngine(index)
if err != nil {
t.Fatal(err)
}
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
if err := e.Open(); err != nil {
t.Fatal(err)
}
defer e.Close()
for _, p := range []models.Point{p1} {
if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil {
@ -960,7 +1149,7 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) {
}
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
if err := e.DeleteSeriesRange(itr, 0, 0); err != nil {
if err := e.DeleteSeriesRange(itr, 0, 0, false); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
@ -1010,12 +1199,18 @@ func TestEngine_LastModified(t *testing.T) {
p2 := MustParsePointString("cpu,host=B value=1.2 2000000000")
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
e := MustOpenEngine(index)
defer e.Close()
e, err := NewEngine(index)
if err != nil {
t.Fatal(err)
}
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
e.SetEnabled(false)
if err := e.Open(); err != nil {
t.Fatal(err)
}
defer e.Close()
if err := e.WritePoints([]models.Point{p1, p2, p3}); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
@ -1038,7 +1233,7 @@ func TestEngine_LastModified(t *testing.T) {
}
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64, false); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
@ -1494,8 +1689,14 @@ func NewEngine(index string) (*Engine, error) {
return nil, err
}
f.Close()
sfile := tsdb.NewSeriesFile(f.Name())
// If we're running on a 32-bit system then reduce the SeriesFile size, so we
// can address it in memory.
if runtime.GOARCH == "386" {
sfile.MaxSize = 1 << 27 // 128MB
}
if err = sfile.Open(); err != nil {
return nil, err
}
@ -1534,7 +1735,13 @@ func NewSeriesFile() *SeriesFile {
}
file.Close()
return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())}
s := &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())}
// If we're running on a 32-bit system then reduce the SeriesFile size, so we
// can address it in memory.
if runtime.GOARCH == "386" {
s.SeriesFile.MaxSize = 1 << 27 // 128MB
}
return s
}
// MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error.

View File

@ -968,7 +968,8 @@ func (f *FileStore) CreateSnapshot() (string, error) {
defer f.mu.RUnlock()
// get a tmp directory name
tmpPath := fmt.Sprintf("%s/%d.%s", f.dir, f.currentTempDirID, TmpTSMFileExtension)
tmpPath := fmt.Sprintf("%d.%s", f.currentTempDirID, TmpTSMFileExtension)
tmpPath = filepath.Join(f.dir, tmpPath)
err := os.Mkdir(tmpPath, 0777)
if err != nil {
return "", err

View File

@ -100,7 +100,7 @@ const (
// The threshold amount of data written before we periodically fsync a TSM file. This helps avoid
// long pauses due to very large fsyncs at the end of writing a TSM file.
fsyncEvery = 512 * 1024 * 1024
fsyncEvery = 25 * 1024 * 1024
)
var (
@ -252,6 +252,11 @@ type indexBlock struct {
entries *indexEntries
}
type syncer interface {
Name() string
Sync() error
}
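For context, a brief sketch (not part of the diff): *os.File already provides both Name() and Sync(), so it satisfies syncer directly, while tests can substitute a lightweight fake. A compile-time assertion of that assumption:
var _ syncer = (*os.File)(nil) // *os.File implements syncer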
// directIndex is a simple in-memory index implementation for a TSM file. The full index
// must fit in memory.
type directIndex struct {
@ -263,6 +268,8 @@ type directIndex struct {
fd *os.File
buf *bytes.Buffer
f syncer
w *bufio.Writer
key []byte
@ -367,6 +374,48 @@ func (d *directIndex) KeyCount() int {
return d.keyCount
}
// copyBuffer is the actual implementation of Copy and CopyBuffer.
// If buf is nil, one is allocated. This is copied from the Go stdlib
// in order to remove the fast-path WriteTo calls, which circumvent any
// I/O throttling, and to add periodic fsyncs to avoid long stalls.
func copyBuffer(f syncer, dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
if buf == nil {
buf = make([]byte, 32*1024)
}
var lastSync int64
for {
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {
written += int64(nw)
}
if written-lastSync > fsyncEvery {
if err := f.Sync(); err != nil {
return 0, err
}
lastSync = written
}
if ew != nil {
err = ew
break
}
if nr != nw {
err = io.ErrShortWrite
break
}
}
if er != nil {
if er != io.EOF {
err = er
}
break
}
}
return written, err
}
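A hedged usage sketch of the helper above (flushIndex is an illustrative name and assumes the file's existing os/io imports): the destination file doubles as the syncer, so the copy fsyncs after every fsyncEvery bytes rather than once at the end.
func flushIndex(dst *os.File, src io.Reader) (int64, error) {
	// dst is both the write target and the syncer; copyBuffer calls
	// dst.Sync() each time a further fsyncEvery bytes have been written.
	return copyBuffer(dst, dst, src, nil)
}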
func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
if _, err := d.flush(d.w); err != nil {
return 0, err
@ -377,7 +426,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
}
if d.fd == nil {
return io.Copy(w, d.buf)
return copyBuffer(d.f, w, d.buf, nil)
}
if _, err := d.fd.Seek(0, io.SeekStart); err != nil {
@ -518,7 +567,7 @@ func NewTSMWriter(w io.Writer) (TSMWriter, error) {
func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) {
var index IndexWriter
// Make sure w is a file so we can write the temp index alongside it.
if fw, ok := w.(*os.File); ok {
if fw, ok := w.(syncer); ok {
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return nil, err
@ -664,6 +713,12 @@ func (t *tsmWriter) WriteIndex() error {
return ErrNoValues
}
// Set the destination file on the index so we can periodically
// fsync while writing the index.
if f, ok := t.wrapped.(syncer); ok {
t.index.(*directIndex).f = f
}
// Write the index
if _, err := t.index.WriteTo(t.w); err != nil {
return err

View File

@ -1368,9 +1368,12 @@ func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byt
return true
}
// TODO(edd) there isn't a need to return an error when instantiating the iterator.
sitr, _ := is.MeasurementSeriesIDIterator(name)
sitr, err := is.MeasurementSeriesIDIterator(name)
if err != nil || sitr == nil {
return false
}
defer sitr.Close()
for {
series, err := sitr.Next()
if err != nil {
@ -1504,7 +1507,7 @@ func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, e
a = append(a, itr)
}
}
return MergeSeriesIDIterators(a...), nil
return FilterUndeletedSeriesIDIterator(is.SeriesFile, MergeSeriesIDIterators(a...)), nil
}
// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies
@ -1559,7 +1562,7 @@ func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, e
a = append(a, itr)
}
}
return MergeSeriesIDIterators(a...), nil
return FilterUndeletedSeriesIDIterator(is.SeriesFile, MergeSeriesIDIterators(a...)), nil
}
// TagValueSeriesIDIterator returns a series iterator for a single tag value.
@ -1574,7 +1577,7 @@ func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIt
a = append(a, itr)
}
}
return MergeSeriesIDIterators(a...), nil
return FilterUndeletedSeriesIDIterator(is.SeriesFile, MergeSeriesIDIterators(a...)), nil
}
// MeasurementSeriesByExprIterator returns a series iterator for a measurement
@ -1586,7 +1589,12 @@ func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Ex
return is.MeasurementSeriesIDIterator(name)
}
fieldset := is.FieldSet()
return is.seriesByExprIterator(name, expr, fieldset.CreateFieldsIfNotExists(name))
itr, err := is.seriesByExprIterator(name, expr, fieldset.CreateFieldsIfNotExists(name))
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
@ -1997,6 +2005,7 @@ func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key
} else if itr == nil {
return nil, nil
}
itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr)
defer itr.Close()
keyIdxs := make(map[string]int, len(keys))

View File

@ -457,6 +457,9 @@ func (i *Index) TagsForSeries(key string) (models.Tags, error) {
// MeasurementNamesByExpr takes an expression containing only tags and returns a
// list of matching measurement names.
//
// TODO(edd): Remove authorisation from these methods. There shouldn't need to
// be any auth passed down into the index.
func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
i.mu.RLock()
defer i.mu.RUnlock()
@ -603,7 +606,14 @@ func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagF
// Is there a series with this matching tag value that is
// authorized to be read?
for _, sid := range seriesIDs {
if s := m.SeriesByID(sid); s != nil && auth.AuthorizeSeriesRead(i.database, m.name, s.Tags()) {
s := m.SeriesByID(sid)
// If the series is deleted then it can't be used to authorise against.
if s != nil && s.Deleted() {
continue
}
if s != nil && auth.AuthorizeSeriesRead(i.database, m.name, s.Tags()) {
// The Range call can return early as a matching
// tag value with an authorized series has been found.
authorized = true
@ -705,7 +715,6 @@ func (i *Index) DropSeries(key []byte, ts int64) error {
// Remove the measurement's reference.
series.Measurement().DropSeries(series)
// Mark the series as deleted.
series.Delete(ts)
@ -748,11 +757,15 @@ func (i *Index) SeriesKeys() []string {
// SetFieldSet sets a shared field set from the engine.
func (i *Index) SetFieldSet(fieldset *tsdb.MeasurementFieldSet) {
i.mu.Lock()
defer i.mu.Unlock()
i.fieldset = fieldset
}
// FieldSet returns the assigned fieldset.
func (i *Index) FieldSet() *tsdb.MeasurementFieldSet {
i.mu.RLock()
defer i.mu.RUnlock()
return i.fieldset
}

View File

@ -55,12 +55,8 @@ func NewMeasurement(database, name string) *Measurement {
// Authorized determines if this Measurement is authorized to be read, according
// to the provided Authorizer. A measurement is authorized to be read if at
// least one series from the measurement is authorized to be read.
// least one undeleted series from the measurement is authorized to be read.
func (m *Measurement) Authorized(auth query.Authorizer) bool {
if auth == nil {
return true
}
// Note(edd): the cost of this check scales linearly with the number of series
// belonging to a measurement, which means it may become expensive when there
// are large numbers of series on a measurement.
@ -68,7 +64,11 @@ func (m *Measurement) Authorized(auth query.Authorizer) bool {
// In the future we might want to push the set of series down into the
// authorizer, but that will require an API change.
for _, s := range m.SeriesByIDMap() {
if auth.AuthorizeSeriesRead(m.database, m.name, s.tags) {
if s != nil && s.Deleted() {
continue
}
if auth == nil || auth.AuthorizeSeriesRead(m.database, m.name, s.tags) {
return true
}
}

View File

@ -472,7 +472,7 @@ func (itr *fileSetSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) {
func (itr *fileSetSeriesIDIterator) Close() error {
itr.once.Do(func() { itr.fs.Release() })
return nil
return itr.itr.Close()
}
// fileSetMeasurementIterator attaches a fileset to an iterator that is released on close.
@ -492,7 +492,7 @@ func (itr *fileSetMeasurementIterator) Next() ([]byte, error) {
func (itr *fileSetMeasurementIterator) Close() error {
itr.once.Do(func() { itr.fs.Release() })
return nil
return itr.itr.Close()
}
// fileSetTagKeyIterator attaches a fileset to an iterator that is released on close.
@ -512,7 +512,7 @@ func (itr *fileSetTagKeyIterator) Next() ([]byte, error) {
func (itr *fileSetTagKeyIterator) Close() error {
itr.once.Do(func() { itr.fs.Release() })
return nil
return itr.itr.Close()
}
// fileSetTagValueIterator attaches a fileset to an iterator that is released on close.
@ -532,5 +532,5 @@ func (itr *fileSetTagValueIterator) Next() ([]byte, error) {
func (itr *fileSetTagValueIterator) Close() error {
itr.once.Do(func() { itr.fs.Release() })
return nil
return itr.itr.Close()
}

View File

@ -25,7 +25,10 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
// Verify initial set of series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.SeriesFile().SeriesIDIterator()
@ -66,7 +69,10 @@ func TestFileSet_SeriesIDIterator(t *testing.T) {
// Verify additional series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.SeriesFile().SeriesIDIterator()
@ -128,7 +134,10 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
// Verify initial set of series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.MeasurementSeriesIDIterator([]byte("cpu"))
@ -163,7 +172,10 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) {
// Verify additional series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.MeasurementSeriesIDIterator([]byte("cpu"))
@ -212,7 +224,10 @@ func TestFileSet_MeasurementIterator(t *testing.T) {
// Verify initial set of series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.MeasurementIterator()
@ -239,7 +254,10 @@ func TestFileSet_MeasurementIterator(t *testing.T) {
// Verify additional series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.MeasurementIterator()
@ -278,7 +296,10 @@ func TestFileSet_TagKeyIterator(t *testing.T) {
// Verify initial set of series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.TagKeyIterator([]byte("cpu"))
@ -305,7 +326,10 @@ func TestFileSet_TagKeyIterator(t *testing.T) {
// Verify additional series.
idx.Run(t, func(t *testing.T) {
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
itr := fs.TagKeyIterator([]byte("cpu"))
@ -324,66 +348,3 @@ func TestFileSet_TagKeyIterator(t *testing.T) {
}
})
}
/*
var (
byteSliceResult [][]byte
tagsSliceResult []models.Tags
)
func BenchmarkFileset_FilterNamesTags(b *testing.B) {
sfile := MustOpenSeriesFile()
defer sfile.Close()
idx := MustOpenIndex(sfile.SeriesFile, 1)
defer idx.Close()
allNames := make([][]byte, 0, 2000*1000)
allTags := make([]models.Tags, 0, 2000*1000)
for i := 0; i < 2000; i++ {
for j := 0; j < 1000; j++ {
name := []byte(fmt.Sprintf("measurement-%d", i))
tags := models.NewTags(map[string]string{"host": fmt.Sprintf("server-%d", j)})
allNames = append(allNames, name)
allTags = append(allTags, tags)
}
}
if err := idx.CreateSeriesListIfNotExists(nil, allNames, allTags); err != nil {
b.Fatal(err)
}
// idx.CheckFastCompaction()
fs := idx.PartitionAt(0).RetainFileSet()
defer fs.Release()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
names := [][]byte{
[]byte("foo"),
[]byte("measurement-222"), // filtered
[]byte("measurement-222"), // kept (tags won't match)
[]byte("measurements-1"),
[]byte("measurement-900"), // filtered
[]byte("measurement-44444"),
[]byte("bar"),
}
tags := []models.Tags{
nil,
models.NewTags(map[string]string{"host": "server-297"}), // filtered
models.NewTags(map[string]string{"host": "wrong"}),
nil,
models.NewTags(map[string]string{"host": "server-1026"}), // filtered
models.NewTags(map[string]string{"host": "server-23"}), // kept (measurement won't match)
models.NewTags(map[string]string{"host": "zoo"}),
}
b.StartTimer()
byteSliceResult, tagsSliceResult = fs.FilterNamesTags(names, tags)
}
}
*/

View File

@ -46,7 +46,7 @@ func init() {
// NOTE: Currently, this must not be changed once a database is created. Further,
// it must also be a power of 2.
//
var DefaultPartitionN uint64 = 16
var DefaultPartitionN uint64 = 8
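The power-of-two requirement presumably exists because a series key is mapped to a partition by masking a hash, which only distributes evenly when n-1 is a contiguous bit mask; a sketch of that mapping using the vendored xxhash dependency (partitionIDFor is an illustrative name):
import "github.com/cespare/xxhash"
// partitionIDFor maps a series key onto one of n partitions.
// n must be a power of two so that n-1 forms a valid bit mask.
func partitionIDFor(key []byte, n uint64) uint64 {
	return xxhash.Sum64(key) & (n - 1)
}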
// An IndexOption is a functional option for changing the configuration of
// an Index.
@ -271,33 +271,13 @@ func (i *Index) FieldSet() *tsdb.MeasurementFieldSet {
}
// ForEachMeasurementName iterates over all measurement names in the index,
// applying fn. Note, the provided function may be called concurrently, and it
// must be safe to do so.
// applying fn. It returns the first error encountered, if any.
//
// It returns the first error encountered, if any.
// ForEachMeasurementName does not call fn on each partition concurrently, so the
// caller may provide a fn that is not goroutine safe.
func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
n := i.availableThreads()
// Store results.
errC := make(chan error, i.PartitionN)
// Run fn on each partition using a fixed number of goroutines.
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
go func() {
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
if idx >= len(i.partitions) {
return // No more work.
}
errC <- i.partitions[idx].ForEachMeasurementName(fn)
}
}()
}
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
for _, p := range i.partitions {
if err := p.ForEachMeasurementName(fn); err != nil {
return err
}
}
@ -730,7 +710,6 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
errC := make(chan error, i.PartitionN)
var pidx uint32 // Index of maximum Partition being worked on.
var err error
for k := 0; k < n; k++ {
go func() {
for {
@ -741,7 +720,8 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
// This is safe since there are no readers on keys until all
// the writers are done.
keys[idx], err = i.partitions[idx].MeasurementTagKeysByExpr(name, expr)
tagKeys, err := i.partitions[idx].MeasurementTagKeysByExpr(name, expr)
keys[idx] = tagKeys
errC <- err
}
}()
@ -766,7 +746,11 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
// DiskSizeBytes returns the size of the index on disk.
func (i *Index) DiskSizeBytes() int64 {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
i.logger.Warn("Index is closing down")
return 0
}
defer fs.Release()
var manifestSize int64
@ -810,16 +794,20 @@ func (i *Index) SnapshotTo(path string) error {
// RetainFileSet returns the set of all files across all partitions.
// This is only needed when all files need to be retained for an operation.
func (i *Index) RetainFileSet() *FileSet {
func (i *Index) RetainFileSet() (*FileSet, error) {
i.mu.RLock()
defer i.mu.RUnlock()
fs, _ := NewFileSet(i.database, nil, i.sfile, nil)
for _, p := range i.partitions {
pfs := p.RetainFileSet()
pfs, err := p.RetainFileSet()
if err != nil {
fs.Close()
return nil, err
}
fs.files = append(fs.files, pfs.files...)
}
return fs
return fs, nil
}
func (i *Index) SetFieldName(measurement []byte, name string) {}

View File

@ -188,7 +188,10 @@ func TestIndex_DropMeasurement(t *testing.T) {
}
// Obtain file set to perform lower level checks.
fs := idx.PartitionAt(0).RetainFileSet()
fs, err := idx.PartitionAt(0).RetainFileSet()
if err != nil {
t.Fatal(err)
}
defer fs.Release()
// Verify tags & values are gone.
@ -288,9 +291,9 @@ func TestIndex_Manifest(t *testing.T) {
func TestIndex_DiskSizeBytes(t *testing.T) {
sfile := MustOpenSeriesFile()
// defer sfile.Close()
defer sfile.Close()
idx := MustOpenIndex(sfile.SeriesFile, tsi1.DefaultPartitionN)
// defer idx.Close()
defer idx.Close()
// Add series to index.
if err := idx.CreateSeriesSliceIfNotExists([]Series{
@ -301,9 +304,14 @@ func TestIndex_DiskSizeBytes(t *testing.T) {
}); err != nil {
t.Fatal(err)
}
fmt.Println(idx.Path())
// Verify on disk size is the same in each stage.
expSize := int64(520) // 419 bytes for MANIFEST and 101 bytes for index file
// There are four series, and each series id is 8 bytes plus one byte for the tombstone header
expSize := int64(4 * 9)
// Each MANIFEST file is 419 bytes and there are tsi1.DefaultPartitionN of them
expSize += int64(tsi1.DefaultPartitionN * 419)
idx.Run(t, func(t *testing.T) {
if got, exp := idx.DiskSizeBytes(), expSize; got != exp {
t.Fatalf("got %d bytes, expected %d", got, exp)

View File

@ -55,7 +55,7 @@ type Partition struct {
// Close management.
once sync.Once
closing chan struct{}
closing chan struct{} // closing is used to inform iterators the partition is closing.
wg sync.WaitGroup
// Fieldset shared with engine.
@ -88,9 +88,8 @@ type Partition struct {
func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
return &Partition{
closing: make(chan struct{}),
path: path,
sfile: sfile,
path: path,
sfile: sfile,
// Default compaction thresholds.
MaxLogFileSize: DefaultMaxLogFileSize,
@ -110,6 +109,8 @@ func (i *Partition) Open() error {
i.mu.Lock()
defer i.mu.Unlock()
i.closing = make(chan struct{})
if i.opened {
return errors.New("index partition already open")
}
@ -258,13 +259,14 @@ func (i *Partition) Wait() {
// Close closes the index.
func (i *Partition) Close() error {
// Wait for goroutines to finish outstanding compactions.
i.once.Do(func() { close(i.closing) })
i.wg.Wait()
// Lock index and close remaining
i.mu.Lock()
defer i.mu.Unlock()
i.once.Do(func() { close(i.closing) })
// Close log files.
for _, f := range i.fileSet.files {
f.Close()
@ -274,6 +276,17 @@ func (i *Partition) Close() error {
return nil
}
// closing returns true if the partition is currently closing. It does not require
// a lock, so it will always return to callers without blocking.
// func (i *Partition) closing() bool {
// select {
// case <-i.closing:
// return true
// default:
// return false
// }
// }
// Path returns the path to the partition.
func (i *Partition) Path() string { return i.path }
@ -334,11 +347,15 @@ func (i *Partition) FieldSet() *tsdb.MeasurementFieldSet {
}
// RetainFileSet returns the current fileset and adds a reference count.
func (i *Partition) RetainFileSet() *FileSet {
i.mu.RLock()
fs := i.retainFileSet()
i.mu.RUnlock()
return fs
func (i *Partition) RetainFileSet() (*FileSet, error) {
select {
case <-i.closing:
return nil, errors.New("index is closing")
default:
i.mu.RLock()
defer i.mu.RUnlock()
return i.retainFileSet(), nil
}
}
func (i *Partition) retainFileSet() *FileSet {
@ -374,7 +391,10 @@ func (i *Partition) prependActiveLogFile() error {
// ForEachMeasurementName iterates over all measurement names in the index.
func (i *Partition) ForEachMeasurementName(fn func(name []byte) error) error {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
itr := fs.MeasurementIterator()
@ -393,7 +413,10 @@ func (i *Partition) ForEachMeasurementName(fn func(name []byte) error) error {
// MeasurementIterator returns an iterator over all measurement names.
func (i *Partition) MeasurementIterator() (tsdb.MeasurementIterator, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil, err
}
itr := fs.MeasurementIterator()
if itr == nil {
fs.Release()
@ -404,14 +427,20 @@ func (i *Partition) MeasurementIterator() (tsdb.MeasurementIterator, error) {
// MeasurementExists returns true if a measurement exists.
func (i *Partition) MeasurementExists(name []byte) (bool, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return false, err
}
defer fs.Release()
m := fs.Measurement(name)
return m != nil && !m.Deleted(), nil
}
func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil, err
}
defer fs.Release()
itr := fs.MeasurementIterator()
@ -430,13 +459,19 @@ func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
}
func (i *Partition) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil, err
}
return newFileSetSeriesIDIterator(fs, fs.MeasurementSeriesIDIterator(name)), nil
}
// DropMeasurement deletes a measurement from the index.
func (i *Partition) DropMeasurement(name []byte) error {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
// Delete all keys and values.
@ -514,7 +549,10 @@ func (i *Partition) DropMeasurement(name []byte) error {
// bulk.
func (i *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) error {
// Maintain reference count on files in file set.
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
// Ensure fileset cannot change during insert.
@ -540,7 +578,6 @@ func (i *Partition) DropSeries(key []byte, ts int64) error {
mname := []byte(name)
seriesID := i.sfile.SeriesID(mname, tags, nil)
if err := i.sfile.DeleteSeriesID(seriesID); err != nil {
return err
}
@ -550,7 +587,7 @@ func (i *Partition) DropSeries(key []byte, ts int64) error {
return err
}
// Swap log file, if necesssary.
// Swap log file, if necessary.
if err := i.CheckLogFile(); err != nil {
return err
}
@ -560,28 +597,41 @@ func (i *Partition) DropSeries(key []byte, ts int64) error {
// MeasurementsSketches returns the two sketches for the index by merging all
// instances of the sketch types in all the index files.
func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil, nil, err
}
defer fs.Release()
return fs.MeasurementsSketches()
}
// HasTagKey returns true if tag key exists.
func (i *Partition) HasTagKey(name, key []byte) (bool, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return false, err
}
defer fs.Release()
return fs.HasTagKey(name, key), nil
}
// HasTagValue returns true if tag value exists.
func (i *Partition) HasTagValue(name, key, value []byte) (bool, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return false, err
}
defer fs.Release()
return fs.HasTagValue(name, key, value), nil
}
// TagKeyIterator returns an iterator for all keys across a single measurement.
func (i *Partition) TagKeyIterator(name []byte) tsdb.TagKeyIterator {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil // TODO(edd): this should probably return an error.
}
itr := fs.TagKeyIterator(name)
if itr == nil {
fs.Release()
@ -592,7 +642,11 @@ func (i *Partition) TagKeyIterator(name []byte) tsdb.TagKeyIterator {
// TagValueIterator returns an iterator for all values across a single key.
func (i *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil // TODO(edd): this should probably return an error.
}
itr := fs.TagValueIterator(name, key)
if itr == nil {
fs.Release()
@ -603,7 +657,11 @@ func (i *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator {
// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
func (i *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil // TODO(edd): this should probably return an error.
}
itr := fs.TagKeySeriesIDIterator(name, key)
if itr == nil {
fs.Release()
@ -614,7 +672,11 @@ func (i *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterat
// TagValueSeriesIDIterator returns a series iterator for a single key value.
func (i *Partition) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil // TODO(edd): this should probably return an error.
}
itr := fs.TagValueSeriesIDIterator(name, key, value)
if itr == nil {
fs.Release()
@ -625,14 +687,21 @@ func (i *Partition) TagValueSeriesIDIterator(name, key, value []byte) tsdb.Serie
// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
func (i *Partition) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return nil, err
}
defer fs.Release()
return fs.MeasurementTagKeysByExpr(name, expr)
}
// ForEachMeasurementTagKey iterates over all tag keys in a measurement.
func (i *Partition) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
fs := i.RetainFileSet()
fs, err := i.RetainFileSet()
if err != nil {
return err
}
defer fs.Release()
itr := fs.TagKeyIterator(name)
@ -655,34 +724,6 @@ func (i *Partition) TagKeyCardinality(name, key []byte) int {
return 0
}
/*
func (i *Partition) MeasurementSeriesKeysByExprIterator(name []byte, condition influxql.Expr) (tsdb.SeriesIDIterator, error) {
fs := i.RetainFileSet()
defer fs.Release()
itr, err := fs.MeasurementSeriesByExprIterator(name, condition, i.fieldset)
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
return itr, err
}
*/
/*
// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
func (i *Partition) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
fs := i.RetainFileSet()
defer fs.Release()
keys, err := fs.MeasurementSeriesKeysByExpr(name, expr, i.fieldset)
// Clone byte slices since they will be used after the fileset is released.
return bytesutil.CloneSlice(keys), err
}
*/
// SnapshotTo creates hard links to the file set into path.
func (i *Partition) SnapshotTo(path string) error {
i.mu.Lock()
@ -720,11 +761,6 @@ func (i *Partition) SetFieldName(measurement []byte, name string) {}
func (i *Partition) RemoveShard(shardID uint64) {}
func (i *Partition) AssignShard(k string, shardID uint64) {}
func (i *Partition) UnassignShard(k string, shardID uint64, ts int64) error {
// This can be called directly once inmem is gone.
return i.DropSeries([]byte(k), ts)
}
// Compact requests a compaction of log files.
func (i *Partition) Compact() {
i.mu.Lock()

View File

@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"reflect"
"runtime"
"testing"
"github.com/influxdata/influxdb/models"
@ -295,7 +296,13 @@ func NewSeriesFile() *SeriesFile {
}
file.Close()
return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())}
s := &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())}
// If we're running on a 32-bit system then reduce the SeriesFile size, so we
// can address it in memory.
if runtime.GOARCH == "386" {
s.SeriesFile.MaxSize = 1 << 27 // 128MB
}
return s
}
// MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error.

View File

@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"reflect"
"runtime"
"testing"
"github.com/influxdata/influxdb/internal"
@ -153,6 +154,12 @@ func MustNewIndex(index string) *Index {
file.Close()
sfile := tsdb.NewSeriesFile(file.Name())
// If we're running on a 32-bit system then reduce the SeriesFile size, so we
// can address it in memory.
if runtime.GOARCH == "386" {
sfile.MaxSize = 1 << 27 // 128MB
}
if err := sfile.Open(); err != nil {
panic(err)
}

View File

@ -31,9 +31,12 @@ const (
SeriesFileTombstoneFlag = 0x01
)
// MaxSeriesFileHashSize is the maximum number of series in a single hash map.
const MaxSeriesFileHashSize = (1 << 20 * SeriesMapLoadFactor) / 100 // (1MB * 90) / 100 == ~943K
// SeriesMapThreshold is the number of series IDs to hold in the in-memory
// series map before compacting and rebuilding the on-disk representation.
const SeriesMapThreshold = 1 << 22 // ~4M ids * 8 bytes per id == ~32MB
const SeriesMapThreshold = 1 << 25 // ~33M ids * 8 bytes per id == 256MB
const (
// DefaultMaxSeriesFileSize is the maximum series file size. Assuming that each

5
tsdb/series_file_386.go Normal file
View File

@ -0,0 +1,5 @@
package tsdb
// DefaultMaxSeriesFileSize is the maximum series file size. Assuming that each
// series key takes, for example, 150 bytes, the limit would support ~3.5M series.
const DefaultMaxSeriesFileSize = (1 << 29) // 512MB

View File

@ -0,0 +1,5 @@
package tsdb
// DefaultMaxSeriesFileSize is the maximum series file size. Assuming that each
// series key takes, for example, 150 bytes, the limit would support ~229M series.
const DefaultMaxSeriesFileSize = 32 * (1 << 30) // 32GB
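These two values are presumably selected per architecture: the 512MB constant via the _386 filename suffix on tsdb/series_file_386.go, and the 32GB constant via a build constraint on its companion file, sketched here under that assumption:
// +build !386
package tsdb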

View File

@ -3,6 +3,7 @@ package tsdb_test
import (
"io/ioutil"
"os"
"runtime"
"testing"
"github.com/influxdata/influxdb/models"
@ -63,7 +64,13 @@ func NewSeriesFile() *SeriesFile {
}
file.Close()
return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())}
s := &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())}
// If we're running on a 32-bit system then reduce the SeriesFile size, so we
// can address it in memory.
if runtime.GOARCH == "386" {
s.SeriesFile.MaxSize = 1 << 27 // 128MB
}
return s
}
// MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error.

View File

@ -71,6 +71,10 @@ var (
// the file's magic number.
ErrUnknownFieldsFormat = errors.New("unknown field index format")
// ErrShardNotIdle is returned when an operation requiring the shard to be idle/cold is
// attempted on a hot shard.
ErrShardNotIdle = errors.New("shard not idle")
// fieldsIndexMagicNumber is the file magic number for the fields index file.
fieldsIndexMagicNumber = []byte{0, 6, 1, 3}
)
@ -179,7 +183,7 @@ func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt Eng
return s
}
// WithLogger sets the logger on the shard.
// WithLogger sets the logger on the shard. It must be called before Open.
func (s *Shard) WithLogger(log *zap.Logger) {
s.baseLogger = log
engine, err := s.engine()
@ -430,12 +434,15 @@ func (s *Shard) UnloadIndex() {
s.index.RemoveShard(s.id)
}
// Index returns a reference to the underlying index.
// This should only be used by utilities and not directly accessed by the database.
func (s *Shard) Index() Index {
// Index returns a reference to the underlying index. It returns an error if
// the index is nil.
func (s *Shard) Index() (Index, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.index
if err := s.ready(); err != nil {
return nil, err
}
return s.index, nil
}
// IsIdle return true if the shard is not receiving writes and is fully compacted.
@ -711,12 +718,12 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
}
// DeleteSeriesRange deletes all values for seriesKeys between min and max (inclusive).
func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64) error {
func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64, removeIndex bool) error {
engine, err := s.engine()
if err != nil {
return err
}
return engine.DeleteSeriesRange(itr, min, max)
return engine.DeleteSeriesRange(itr, min, max, removeIndex)
}
// DeleteMeasurement deletes a measurement and all underlying series.
@ -767,7 +774,11 @@ func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
// MeasurementTagKeyValuesByExpr returns all the tag keys values for the
// provided expression.
func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
indexSet := IndexSet{Indexes: []Index{s.index}, SeriesFile: s.sfile}
index, err := s.Index()
if err != nil {
return nil, err
}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
return indexSet.MeasurementTagKeyValuesByExpr(auth, name, key, expr, keysSorted)
}
@ -816,7 +827,11 @@ func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt
return NewFieldKeysIterator(s, opt)
case "_series":
// TODO(benbjohnson): Move up to the Shards.CreateIterator().
indexSet := IndexSet{Indexes: []Index{s.index}, SeriesFile: s.sfile}
index, err := s.Index()
if err != nil {
return nil, err
}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
return NewSeriesPointIterator(indexSet, engine.MeasurementFieldSet(), opt)
case "_tagKeys":
return NewTagKeysIterator(s, opt)
@ -842,6 +857,10 @@ func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influx
fields = make(map[string]influxql.DataType)
dimensions = make(map[string]struct{})
index, err := s.Index()
if err != nil {
return nil, nil, err
}
for _, name := range measurements {
// Handle system sources.
if strings.HasPrefix(name, "_") {
@ -883,7 +902,7 @@ func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influx
}
}
indexSet := IndexSet{Indexes: []Index{s.index}, SeriesFile: s.sfile}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
if err := indexSet.ForEachMeasurementTagKey([]byte(name), func(key []byte) error {
dimensions[string(key)] = struct{}{}
return nil
@ -1093,6 +1112,22 @@ func (s *Shard) TagKeyCardinality(name, key []byte) int {
return engine.TagKeyCardinality(name, key)
}
// Digest returns a digest of the shard.
func (s *Shard) Digest() (io.ReadCloser, error) {
engine, err := s.engine()
if err != nil {
return nil, err
}
// Make sure the shard is idle/cold. (No use creating a digest of a
// hot shard that is rapidly changing.)
if !engine.IsIdle() {
return nil, ErrShardNotIdle
}
return engine.Digest()
}
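A hedged usage sketch (digestWhenIdle and the retry interval are illustrative; requires the io and time imports): since a digest is refused while the shard is hot, callers can treat ErrShardNotIdle as retryable and poll until the shard goes cold.
func digestWhenIdle(sh *Shard) (io.ReadCloser, error) {
	for {
		r, err := sh.Digest()
		if err != ErrShardNotIdle {
			return r, err // a digest, or a non-retryable failure
		}
		time.Sleep(time.Second) // wait for the shard to go cold
	}
}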
// engine safely (under an RLock) returns a reference to the shard's Engine, or
// an error if the Engine is closed, or the shard is currently disabled.
//
@ -1647,10 +1682,15 @@ type Field struct {
func NewFieldKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) {
itr := &fieldKeysIterator{shard: sh}
index, err := sh.Index()
if err != nil {
return nil, err
}
// Retrieve measurements from shard. Filter if condition specified.
//
// FGA is currently not supported when retrieving field keys.
indexSet := IndexSet{Indexes: []Index{sh.index}, SeriesFile: sh.sfile}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile}
names, err := indexSet.MeasurementNamesByExpr(query.OpenAuthorizer, opt.Condition)
if err != nil {
return nil, err
@ -1724,7 +1764,12 @@ func (itr *fieldKeysIterator) Next() (*query.FloatPoint, error) {
// NewTagKeysIterator returns a new instance of TagKeysIterator.
func NewTagKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) {
fn := func(name []byte) ([][]byte, error) {
indexSet := IndexSet{Indexes: []Index{sh.index}, SeriesFile: sh.sfile}
index, err := sh.Index()
if err != nil {
return nil, err
}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile}
var keys [][]byte
if err := indexSet.ForEachMeasurementTagKey(name, func(key []byte) error {
keys = append(keys, key)
@ -1741,7 +1786,12 @@ func NewTagKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, e
type measurementKeyFunc func(name []byte) ([][]byte, error)
func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) {
indexSet := IndexSet{Indexes: []Index{sh.index}, SeriesFile: sh.sfile}
index, err := sh.Index()
if err != nil {
return nil, err
}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile}
itr := &measurementKeysIterator{fn: fn}
names, err := indexSet.MeasurementNamesByExpr(opt.Authorizer, opt.Condition)
if err != nil {

View File

@ -7,6 +7,7 @@ import (
"path"
"path/filepath"
"regexp"
"runtime"
"sort"
"strings"
"testing"
@ -221,6 +222,12 @@ func NewTempShard(index string) *TempShard {
// Create series file.
sfile := NewSeriesFile(filepath.Join(dir, "db0", SeriesFileName))
// If we're running on a 32-bit system then reduce the SeriesFile size, so we
// can address it in memory.
if runtime.GOARCH == "386" {
sfile.MaxSize = 1 << 27 // 128MB
}
if err := sfile.Open(); err != nil {
panic(err)
}

View File

@ -890,6 +890,58 @@ cpu,secret=foo value=100 0
if gotCount != expCount {
return fmt.Errorf("got %d series, expected %d", gotCount, expCount)
}
// Delete series cpu,host=serverA,region=uswest
idx, err := sh.Index()
if err != nil {
return err
}
if err := idx.DropSeries([]byte("cpu,host=serverA,region=uswest"), time.Now().UnixNano()); err != nil {
return err
}
if itr, err = sh.CreateIterator(context.Background(), v.m, query.IteratorOptions{
Aux: v.aux,
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Authorizer: seriesAuthorizer,
}); err != nil {
return err
}
if itr == nil {
return fmt.Errorf("iterator is nil")
}
defer itr.Close()
fitr = itr.(query.FloatIterator)
defer fitr.Close()
expCount = 1
gotCount = 0
for {
f, err := fitr.Next()
if err != nil {
return err
}
if f == nil {
break
}
if got := f.Aux[0].(string); strings.Contains(got, "secret") {
return fmt.Errorf("got a series %q that should be filtered", got)
} else if got := f.Aux[0].(string); strings.Contains(got, "serverA") {
return fmt.Errorf("got a series %q that should be filtered", got)
}
gotCount++
}
if gotCount != expCount {
return fmt.Errorf("got %d series, expected %d", gotCount, expCount)
}
return nil
}
@ -1845,3 +1897,30 @@ func MustTempDir() (string, func()) {
}
return dir, func() { os.RemoveAll(dir) }
}
type seriesIterator struct {
keys [][]byte
}
type series struct {
name []byte
tags models.Tags
deleted bool
}
func (s series) Name() []byte { return s.name }
func (s series) Tags() models.Tags { return s.tags }
func (s series) Deleted() bool { return s.deleted }
func (s series) Expr() influxql.Expr { return nil }
func (itr *seriesIterator) Close() error { return nil }
func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) {
if len(itr.keys) == 0 {
return nil, nil
}
name, tags := models.ParseKeyBytes(itr.keys[0])
s := series{name: name, tags: tags}
itr.keys = itr.keys[1:]
return s, nil
}

View File

@ -41,12 +41,12 @@ const SeriesFileName = "series"
// Store manages shards and indexes for databases.
type Store struct {
mu sync.RWMutex
shards map[uint64]*Shard
databases map[string]struct{}
sfiles map[string]*SeriesFile
path string
mu sync.RWMutex
shards map[uint64]*Shard
databases map[string]struct{}
sfiles map[string]*SeriesFile
SeriesFileMaxSize int64 // Determines size of series file mmap. Can be altered in tests.
path string
// shared per-database indexes, only if using "inmem".
indexes map[string]interface{}
@ -188,6 +188,13 @@ func (s *Store) loadShards() error {
s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)
// Env var to disable throughput limiter. This will be moved to a config option in 1.5.
if os.Getenv("INFLUXDB_DATA_COMPACTION_THROUGHPUT") == "" {
s.EngineOptions.CompactionThroughputLimiter = limiter.NewRate(48*1024*1024, 48*1024*1024)
} else {
s.Logger.Info("Compaction throughput limit disabled")
}
t := limiter.NewFixed(runtime.GOMAXPROCS(0))
resC := make(chan *res)
var n int
@ -325,18 +332,34 @@ func (s *Store) Close() error {
}
s.mu.Lock()
for _, sfile := range s.sfiles {
// Close out the series files.
if err := sfile.Close(); err != nil {
return err
}
}
s.shards = nil
s.sfiles = map[string]*SeriesFile{}
s.opened = false // Store may now be opened again.
s.mu.Unlock()
return nil
}
// openSeriesFile either returns or creates a series file for the provided
// database. It must be called under a full lock.
func (s *Store) openSeriesFile(database string) (*SeriesFile, error) {
if sfile := s.sfiles[database]; sfile != nil {
return sfile, nil
}
sfile := NewSeriesFile(filepath.Join(s.path, database, SeriesFileName))
// Set a custom mmap size if one has been specified, otherwise the default
// will be used.
if s.SeriesFileMaxSize > 0 {
sfile.MaxSize = s.SeriesFileMaxSize
}
if err := sfile.Open(); err != nil {
return nil, err
}
@ -344,6 +367,16 @@ func (s *Store) openSeriesFile(database string) (*SeriesFile, error) {
return sfile, nil
}
func (s *Store) seriesFile(database string) (*SeriesFile, error) {
s.mu.RLock()
defer s.mu.RUnlock()
sfile, ok := s.sfiles[database]
if !ok {
return nil, fmt.Errorf("no series file present for database %q", database)
}
return sfile, nil
}
// createIndexIfNotExists returns a shared index for a database, if the inmem
// index is being used. If the TSI index is being used, then this method is
// basically a no-op.
@ -404,6 +437,16 @@ func (s *Store) ShardN() int {
return len(s.shards)
}
// ShardDigest returns a digest of the shard with the specified ID.
func (s *Store) ShardDigest(id uint64) (io.ReadCloser, error) {
sh := s.Shard(id)
if sh == nil {
return nil, ErrShardNotFound
}
return sh.Digest()
}
// CreateShard creates a shard with the given id and retention policy on a database.
func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error {
s.mu.Lock()
@ -799,7 +842,12 @@ func (s *Store) SeriesCardinality(database string) (int64, error) {
// TODO(benbjohnson): Series file will be shared by the DB.
var max int64
for _, shard := range shards {
if n := shard.Index().SeriesN(); n > max {
index, err := shard.Index()
if err != nil {
return 0, err
}
if n := index.SeriesN(); n > max {
max = n
}
}
@ -893,7 +941,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
// DeleteSeries loops through the local shards and deletes the series data for
// the passed in series keys.
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error {
// Expand regex expressions in the FROM clause.
a, err := s.ExpandSources(sources)
if err != nil {
@ -925,6 +973,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
defer s.mu.RUnlock()
sfile := s.sfiles[database]
if sfile == nil {
return fmt.Errorf("unable to locate series file for database: %q", database)
}
shards := s.filterShards(byDatabase(database))
// Limit to 1 delete for each shard since expanding the measurement into the list
@ -952,7 +1003,12 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
limit.Take()
defer limit.Release()
indexSet := IndexSet{Indexes: []Index{sh.index}, SeriesFile: sfile}
index, err := sh.Index()
if err != nil {
return err
}
indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sfile}
// Find matching series keys for each measurement.
for _, name := range names {
itr, err := indexSet.MeasurementSeriesByExprIterator([]byte(name), condition)
@ -962,8 +1018,7 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
continue
}
defer itr.Close()
if err := sh.DeleteSeriesRange(NewSeriesIteratorAdapter(sfile, itr), min, max); err != nil {
if err := sh.DeleteSeriesRange(NewSeriesIteratorAdapter(sfile, itr), min, max, removeIndex); err != nil {
return err
}
@ -1018,12 +1073,19 @@ func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond in
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
sfile, err := s.seriesFile(database)
if err != nil {
return nil, err
}
// Build indexset.
is := IndexSet{Indexes: make([]Index, 0, len(shards)), SeriesFile: s.sfiles[database]}
is := IndexSet{Indexes: make([]Index, 0, len(shards)), SeriesFile: sfile}
for _, sh := range shards {
if sh.index != nil {
is.Indexes = append(is.Indexes, sh.index)
index, err := sh.Index()
if err != nil {
return nil, err
}
is.Indexes = append(is.Indexes, index)
}
is = is.DedupeInmemIndexes()
return is.MeasurementNamesByExpr(auth, cond)
@ -1539,16 +1601,31 @@ func (s *Store) monitorShards() {
databases[db] = struct{}{}
dbLock.Unlock()
sfile, err := s.seriesFile(sh.database)
if err != nil {
return err
}
firstShardIndex, err := sh.Index()
if err != nil {
return err
}
index, err := sh.Index()
if err != nil {
return err
}
// inmem shards share the same index instance so just use the first one to avoid
// allocating the same measurements repeatedly
indexSet := IndexSet{Indexes: []Index{shards[0].index}, SeriesFile: s.sfiles[db]}
indexSet := IndexSet{Indexes: []Index{firstShardIndex}, SeriesFile: sfile}
names, err := indexSet.MeasurementNamesByExpr(nil, nil)
if err != nil {
s.Logger.Warn("cannot retrieve measurement names", zap.Error(err))
return nil
}
indexSet.Indexes = []Index{sh.Index()}
indexSet.Indexes = []Index{index}
for _, name := range names {
indexSet.ForEachMeasurementTagKey(name, func(k []byte) error {
n := sh.TagKeyCardinality(name, k)

View File

@ -11,6 +11,7 @@ import (
"path/filepath"
"reflect"
"regexp"
"runtime"
"sort"
"strings"
"testing"
@ -561,7 +562,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
}
for _, name := range mnames {
if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil {
if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil, true); err != nil {
t.Fatal(err)
}
}
@ -769,7 +770,7 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
// Creates a large number of series in multiple shards, which will force
// compactions to occur.
func testStoreCardinalityCompactions(t *testing.T, store *Store) {
func testStoreCardinalityCompactions(store *Store) error {
// Generate point data to write to the shards.
series := genTestSeries(300, 5, 5) // 937,500 series
@ -784,44 +785,43 @@ func testStoreCardinalityCompactions(t *testing.T, store *Store) {
// shards such that we never write the same series to multiple shards.
for shardID := 0; shardID < 2; shardID++ {
if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
t.Fatalf("create shard: %s", err)
return fmt.Errorf("create shard: %s", err)
}
if err := store.BatchWrite(shardID, points[shardID*468750:(shardID+1)*468750]); err != nil {
t.Fatalf("batch write: %s", err)
return fmt.Errorf("batch write: %s", err)
}
}
// Estimate the series cardinality...
cardinality, err := store.Store.SeriesCardinality("db")
if err != nil {
t.Fatal(err)
return err
}
// Estimated cardinality should be well within 1.5% of the actual cardinality.
if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp {
t.Errorf("got epsilon of %v for series cardinality %v (expected %v), which is larger than expected %v", got, cardinality, expCardinality, exp)
return fmt.Errorf("got epsilon of %v for series cardinality %v (expected %v), which is larger than expected %v", got, cardinality, expCardinality, exp)
}
// Estimate the measurement cardinality...
if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil {
t.Fatal(err)
return err
}
// Estimated cardinality should be well within 2 of the actual cardinality. (Arbitrary...)
expCardinality = 300
if got, exp := math.Abs(float64(cardinality)-float64(expCardinality)), 2.0; got > exp {
t.Errorf("got measurement cardinality %v, expected upto %v; difference is larger than expected %v", cardinality, expCardinality, exp)
return fmt.Errorf("got measurement cardinality %v, expected upto %v; difference is larger than expected %v", cardinality, expCardinality, exp)
}
return nil
}
func TestStore_Cardinality_Compactions(t *testing.T) {
t.Parallel()
if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" {
t.Skip("Skipping test in short, race and appveyor mode.")
}
test := func(index string) {
test := func(index string) error {
store := NewStore()
store.EngineOptions.Config.Index = "inmem"
store.EngineOptions.Config.MaxSeriesPerDatabase = 0
@ -829,11 +829,15 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
panic(err)
}
defer store.Close()
testStoreCardinalityCompactions(t, store)
return testStoreCardinalityCompactions(store)
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
t.Fatal(err)
}
})
}
}
@ -1010,6 +1014,36 @@ func TestStore_Measurements_Auth(t *testing.T) {
if gotNames != expNames {
return fmt.Errorf("got %d measurements, but expected %d", gotNames, expNames)
}
// Now delete all of the cpu series.
cond, err := influxql.ParseExpr("host = 'serverA' OR region = 'west'")
if err != nil {
return err
}
if err := s.DeleteSeries("db0", nil, cond, true); err != nil {
return err
}
if names, err = s.MeasurementNames(authorizer, "db0", nil); err != nil {
return err
}
// names should not contain any measurements where none of the associated
// series are authorised for reads.
expNames = 1
gotNames = 0
for _, name := range names {
if string(name) == "mem" || string(name) == "cpu" {
return fmt.Errorf("after delete got measurement %q but it should be filtered.", name)
}
gotNames++
}
if gotNames != expNames {
return fmt.Errorf("after delete got %d measurements, but expected %d", gotNames, expNames)
}
return nil
}
@ -1020,6 +1054,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
}
})
}
}
func TestStore_TagKeys_Auth(t *testing.T) {
@ -1072,6 +1107,41 @@ func TestStore_TagKeys_Auth(t *testing.T) {
if gotKeys != expKeys {
return fmt.Errorf("got %d keys, but expected %d", gotKeys, expKeys)
}
// Delete the series with region = west
cond, err := influxql.ParseExpr("region = 'west'")
if err != nil {
return err
}
if err := s.DeleteSeries("db0", nil, cond, true); err != nil {
return err
}
if keys, err = s.TagKeys(authorizer, []uint64{0}, nil); err != nil {
return err
}
// keys should not contain any tag keys associated with a series containing
// a secret tag or the deleted series
expKeys = 2
gotKeys = 0
for _, tk := range keys {
if got, exp := tk.Measurement, "cpu"; got != exp {
return fmt.Errorf("got measurement %q, expected %q", got, exp)
}
for _, key := range tk.Keys {
if key == "secret" || key == "machine" || key == "region" {
return fmt.Errorf("got tag key %q but it should be filtered.", key)
}
gotKeys++
}
}
if gotKeys != expKeys {
return fmt.Errorf("got %d keys, but expected %d", gotKeys, expKeys)
}
return nil
}
@ -1082,6 +1152,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
}
})
}
}
func TestStore_TagValues_Auth(t *testing.T) {
@ -1136,6 +1207,48 @@ func TestStore_TagValues_Auth(t *testing.T) {
}
}
if gotValues != expValues {
return fmt.Errorf("got %d tags, but expected %d", gotValues, expValues)
}
// Delete the series with values serverA
cond, err := influxql.ParseExpr("host = 'serverA'")
if err != nil {
return err
}
if err := s.DeleteSeries("db0", nil, cond, true); err != nil {
return err
}
values, err = s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "_tagKey"},
RHS: &influxql.StringLiteral{Val: "host"},
})
if err != nil {
return err
}
// values should not contain any tag values associated with a series containing
// a secret tag.
expValues = 1
gotValues = 0
for _, tv := range values {
if got, exp := tv.Measurement, "cpu"; got != exp {
return fmt.Errorf("got measurement %q, expected %q", got, exp)
}
for _, v := range tv.Values {
if got, exp := v.Value, "serverD"; got == exp {
return fmt.Errorf("got tag value %q but it should be filtered.", got)
} else if got, exp := v.Value, "serverA"; got == exp {
return fmt.Errorf("got tag value %q but it should be filtered.", got)
}
gotValues++
}
}
if gotValues != expValues {
return fmt.Errorf("got %d tags, but expected %d", gotValues, expValues)
}
@ -1400,6 +1513,12 @@ func NewStore() *Store {
if testing.Verbose() {
s.WithLogger(logger.New(os.Stdout))
}
if runtime.GOARCH == "386" {
// Set the mmap size to something addressable in the process.
s.SeriesFileMaxSize = 1 << 27 // 128MB
}
return s
}
@ -1408,6 +1527,7 @@ func NewStore() *Store {
func MustOpenStore(index string) *Store {
s := NewStore()
s.EngineOptions.IndexVersion = index
if err := s.Open(); err != nil {
panic(err)
}
@ -1419,9 +1539,14 @@ func (s *Store) Reopen() error {
if err := s.Store.Close(); err != nil {
return err
}
// Keep old max series file size.
seriesMapSize := s.Store.SeriesFileMaxSize
s.Store = tsdb.NewStore(s.Path())
s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
return s.Open()
s.SeriesFileMaxSize = seriesMapSize
return s.Store.Open()
}
// Close closes the store and removes the underlying data.