diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ddce5f840..01a3de1baa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ - [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality. - [#8491](https://github.com/influxdata/influxdb/pull/8491): Add further tsi support for streaming/copying shards. - [#9181](https://github.com/influxdata/influxdb/pull/9181): Schedule a full compaction after a successful import +- [#9218](https://github.com/influxdata/influxdb/pull/9218): Add Prometheus `/metrics` endpoint. +- [#9213](https://github.com/influxdata/influxdb/pull/9213): Add ability to generate shard digests. ### Bugfixes @@ -22,6 +24,8 @@ - [#9163](https://github.com/influxdata/influxdb/pull/9163): Fix race condition in the merge iterator close method. - [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls is allowable - [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in use statement +- [#9208](https://github.com/influxdata/influxdb/pull/9208): Updated client 4xx error message when response body length is zero. +- [#9230](https://github.com/influxdata/influxdb/pull/9230): Remove extraneous newlines from the log. ## v1.4.3 [unreleased] diff --git a/Godeps b/Godeps index bc72df5f0d..33f3e2f83a 100644 --- a/Godeps +++ b/Godeps @@ -1,5 +1,6 @@ collectd.org e84e8af5356e7f47485bbc95c96da6dd7984a67e github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895 +github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 github.com/bmizerany/pat c068ca2f0aacee5ac3681d68e4d0a003b7d1fd2c github.com/boltdb/bolt 4b1ebc1869ad66568b313d0dc410e2be72670dda github.com/cespare/xxhash 1b6d2e40c16ba0dfce5c8eac2480ad6e7394819b @@ -8,6 +9,7 @@ github.com/dgrijalva/jwt-go 24c63f56522a87ec5339cc3567883f1039378fdb github.com/dgryski/go-bits 2ad8d707cc05b1815ce6ff2543bb5e8d8f9298ef github.com/dgryski/go-bitstream 7d46cd22db7004f0cceb6f7975824b560cf0e486 github.com/gogo/protobuf 1c2b16bc280d6635de6c52fc1471ab962dc36ec9 +github.com/golang/protobuf 1e59b77b52bf8e4b449a57e6f79f21226d571845 github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380 github.com/google/go-cmp 18107e6c56edb2d51f965f7d68e59404f0daee54 github.com/influxdata/influxql c108c5fb9a432242754d18371795aa8099e73fe7 @@ -15,10 +17,15 @@ github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967 github.com/influxdata/yamux 1f58ded512de5feabbe30b60c7d33a7a896c5f16 github.com/influxdata/yarpc 036268cdec22b7074cd6d50cc6d7315c667063c7 github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815 +github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c github.com/opentracing/opentracing-go 1361b9cd60be79c4c3a7fa9841b3c132e40066a7 github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447 github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac github.com/philhofer/fwd 1612a298117663d7bc9a760ae20d383413859798 +github.com/prometheus/client_golang 661e31bf844dfca9aeba15f27ea8aa0d485ad212 +github.com/prometheus/client_model 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c +github.com/prometheus/common 2e54d0b93cba2fd133edc32211dcc32c06ef72ca +github.com/prometheus/procfs a6e9df898b1336106c743392c48ee0b71f5c4efa github.com/retailnext/hllpp 38a7bb71b483e855d35010808143beaf05b67f9d github.com/spaolacci/murmur3 0d12bf811670bf6a1a63828dfbd003eded177fce github.com/tinylib/msgp 
ad0ff2e232ad2e37faf67087fb24bf8d04a8ce20 @@ -30,3 +37,4 @@ golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd golang.org/x/net 9dfe39835686865bff950a07b394c12a98ddc811 golang.org/x/sys 062cd7e4e68206d8bab9b18396626e855c992658 golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34 +golang.org/x/time 6dc17368e09b0e8634d71cac8168d853e869a0c7 diff --git a/client/v2/client.go b/client/v2/client.go index 2870cf8e50..77d44f2b34 100644 --- a/client/v2/client.go +++ b/client/v2/client.go @@ -534,7 +534,7 @@ func (c *client) Query(q Query) (*Response, error) { // like downstream serving a large file body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) if err != nil || len(body) == 0 { - return nil, fmt.Errorf("expected json response, got %q, with status: %v", cType, resp.StatusCode) + return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode) } return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body) diff --git a/client/v2/client_test.go b/client/v2/client_test.go index 5edc866ef9..3094bc4cfc 100644 --- a/client/v2/client_test.go +++ b/client/v2/client_test.go @@ -240,7 +240,7 @@ func TestClientDownstream400_Query(t *testing.T) { query := Query{} _, err := c.Query(query) - expected := fmt.Sprintf(`expected json response, got "text/plain", with status: %v`, http.StatusForbidden) + expected := fmt.Sprintf(`expected json response, got empty body, with status: %v`, http.StatusForbidden) if err.Error() != expected { t.Errorf("unexpected error. expected %v, actual %v", expected, err) } @@ -407,7 +407,7 @@ func TestClientDownstream400_ChunkedQuery(t *testing.T) { query := Query{Chunked: true} _, err := c.Query(query) - expected := fmt.Sprintf(`expected json response, got "text/plain", with status: %v`, http.StatusForbidden) + expected := fmt.Sprintf(`expected json response, got empty body, with status: %v`, http.StatusForbidden) if err.Error() != expected { t.Errorf("unexpected error. expected %v, actual %v", expected, err) } diff --git a/cmd/influx_inspect/dumptsi/dumptsi.go b/cmd/influx_inspect/dumptsi/dumptsi.go index 7a10018493..04eda19d11 100644 --- a/cmd/influx_inspect/dumptsi/dumptsi.go +++ b/cmd/influx_inspect/dumptsi/dumptsi.go @@ -136,7 +136,10 @@ func (cmd *Command) run() error { defer idx.Close() for i := 0; i < int(idx.PartitionN); i++ { if err := func() error { - fs := idx.PartitionAt(i).RetainFileSet() + fs, err := idx.PartitionAt(i).RetainFileSet() + if err != nil { + return err + } defer fs.Release() return cmd.printFileSet(sfile, fs) }(); err != nil { diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 3c77ed23aa..8de99be6f0 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -330,8 +330,8 @@ func (e *StatementExecutor) executeDeleteSeriesStatement(stmt *influxql.DeleteSe // Convert "now()" to current time. stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: time.Now().UTC()}) - // Locally delete the series. - return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition) + // Locally delete the series. The series will not be removed from the index. 
+ return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition, false) } func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) error { @@ -375,7 +375,7 @@ func (e *StatementExecutor) executeDropSeriesStatement(stmt *influxql.DropSeries } // Locally drop the series. - return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition) + return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition, true) } func (e *StatementExecutor) executeDropShardStatement(stmt *influxql.DropShardStatement) error { @@ -1375,7 +1375,7 @@ type TSDBStore interface { DeleteDatabase(name string) error DeleteMeasurement(database, name string) error DeleteRetentionPolicy(database, name string) error - DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error + DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error DeleteShard(id uint64) error MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 5f9a4e8f78..c56eb1ca63 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -79,7 +79,7 @@ # snapshot the cache and write it to a TSM file, freeing up memory # Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k). # Values without a size suffix are in bytes. - # cache-snapshot-memory-size = "256m" + # cache-snapshot-memory-size = "25m" # CacheSnapshotWriteColdDuration is the length of time at # which the engine will snapshot the cache and write it to diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go index e2f27ca1a8..95a79a5b59 100644 --- a/internal/tsdb_store.go +++ b/internal/tsdb_store.go @@ -23,7 +23,7 @@ type TSDBStoreMock struct { DeleteDatabaseFn func(name string) error DeleteMeasurementFn func(database, name string) error DeleteRetentionPolicyFn func(database, name string) error - DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error + DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error DeleteShardFn func(id uint64) error DiskSizeFn func() (int64, error) ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error) @@ -77,8 +77,8 @@ func (s *TSDBStoreMock) DeleteMeasurement(database string, name string) error { func (s *TSDBStoreMock) DeleteRetentionPolicy(database string, name string) error { return s.DeleteRetentionPolicyFn(database, name) } -func (s *TSDBStoreMock) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error { - return s.DeleteSeriesFn(database, sources, condition) +func (s *TSDBStoreMock) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error { + return s.DeleteSeriesFn(database, sources, condition, removeIndex) } func (s *TSDBStoreMock) DeleteShard(shardID uint64) error { return s.DeleteShardFn(shardID) diff --git a/pkg/bloom/bloom_test.go b/pkg/bloom/bloom_test.go index 528ee87ffa..a93165b81a 100644 --- a/pkg/bloom/bloom_test.go +++ b/pkg/bloom/bloom_test.go @@ -3,6 +3,7 @@ package bloom_test import ( "encoding/binary" "fmt" + "os" "testing" "github.com/influxdata/influxdb/pkg/bloom" @@ -10,6 +11,10 @@ import ( // Ensure filter can insert values and verify they exist. 
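A note on the DeleteSeries change threading through coordinator/statement_executor.go and internal/tsdb_store.go above: the new trailing boolean is what now separates DELETE (points removed, series kept in the index) from DROP SERIES (points and index entries removed). A minimal sketch against the updated mock — the nil sources and condition are illustrative placeholders, not realistic query inputs, and the internal package is only importable from within the influxdb repo:

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/internal"
	"github.com/influxdata/influxql"
)

func main() {
	store := &internal.TSDBStoreMock{
		DeleteSeriesFn: func(db string, _ []influxql.Source, _ influxql.Expr, removeIndex bool) error {
			fmt.Printf("db=%s removeIndex=%v\n", db, removeIndex)
			return nil
		},
	}

	// DELETE path: drop the points but leave the series in the index.
	store.DeleteSeries("db0", nil, nil, false)

	// DROP SERIES path: drop the points and remove the series from the index.
	store.DeleteSeries("db0", nil, nil, true)
}
```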
func TestFilter_InsertContains(t *testing.T) { + if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" { + t.Skip("Skipping test in short, race and appveyor mode.") + } + // Short, less comprehensive test. testShortFilter_InsertContains(t) diff --git a/pkg/limiter/write_test.go b/pkg/limiter/write_test.go new file mode 100644 index 0000000000..094e15d067 --- /dev/null +++ b/pkg/limiter/write_test.go @@ -0,0 +1,34 @@ +package limiter_test + +import ( + "bytes" + "io" + "testing" + "time" + + "github.com/influxdata/influxdb/pkg/limiter" +) + +func TestWriter_Limited(t *testing.T) { + r := bytes.NewReader(bytes.Repeat([]byte{0}, 1024*1024)) + + limit := 512 * 1024 + w := limiter.NewWriter(discardCloser{}, limit, 10*1024*1024) + + start := time.Now() + n, err := io.Copy(w, r) + elapsed := time.Since(start) + if err != nil { + t.Error("copy error: ", err) + } + + rate := float64(n) / elapsed.Seconds() + if rate > float64(limit) { + t.Errorf("rate limit mismatch: exp %f, got %f", float64(limit), rate) + } +} + +type discardCloser struct{} + +func (d discardCloser) Write(b []byte) (int, error) { return len(b), nil } +func (d discardCloser) Close() error { return nil } diff --git a/pkg/limiter/writer.go b/pkg/limiter/writer.go new file mode 100644 index 0000000000..f14981a5cc --- /dev/null +++ b/pkg/limiter/writer.go @@ -0,0 +1,83 @@ +package limiter + +import ( + "context" + "io" + "os" + "time" + + "golang.org/x/time/rate" +) + +type Writer struct { + w io.WriteCloser + limiter Rate + ctx context.Context +} + +type Rate interface { + WaitN(ctx context.Context, n int) error +} + +func NewRate(bytesPerSec, burstLimit int) Rate { + limiter := rate.NewLimiter(rate.Limit(bytesPerSec), burstLimit) + limiter.AllowN(time.Now(), burstLimit) // spend initial burst + return limiter +} + +// NewWriter returns a writer that implements io.Writer with rate limiting. +// The limiter uses a token bucket approach and limits the rate to bytesPerSec +// with a maximum burst of burstLimit. +func NewWriter(w io.WriteCloser, bytesPerSec, burstLimit int) *Writer { + limiter := NewRate(bytesPerSec, burstLimit) + + return &Writer{ + w: w, + ctx: context.Background(), + limiter: limiter, + } +} + +// NewWriterWithRate returns a Writer with the specified rate limiter. +func NewWriterWithRate(w io.WriteCloser, limiter Rate) *Writer { + return &Writer{ + w: w, + ctx: context.Background(), + limiter: limiter, + } +} + +// Write writes bytes from b. +func (s *Writer) Write(b []byte) (int, error) { + if s.limiter == nil { + return s.w.Write(b) + } + + n, err := s.w.Write(b) + if err != nil { + return n, err + } + + if err := s.limiter.WaitN(s.ctx, n); err != nil { + return n, err + } + return n, err +} + +func (s *Writer) Sync() error { + if f, ok := s.w.(*os.File); ok { + return f.Sync() + } + return nil +} + +func (s *Writer) Name() string { + if f, ok := s.w.(*os.File); ok { + return f.Name() + } + return "" +} + +func (s *Writer) Close() error { + return s.w.Close() +} diff --git a/pkg/mmap/mmap_windows.go b/pkg/mmap/mmap_windows.go index 97a6f5103a..8efe48daf5 100644 --- a/pkg/mmap/mmap_windows.go +++ b/pkg/mmap/mmap_windows.go @@ -20,9 +20,10 @@ func Map(path string, sz int64) ([]byte, error) { } // Use file size if map size is not passed in. - if sz == 0 { - sz = fi.Size() - } + // TODO(edd): test.
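For reference, usage of the new pkg/limiter writer introduced above — a minimal sketch (the file name and limits are arbitrary): wrap any io.WriteCloser, and writes are paced by the token bucket once the initial burst is spent.

```go
package main

import (
	"bytes"
	"io"
	"log"
	"os"

	"github.com/influxdata/influxdb/pkg/limiter"
)

func main() {
	f, err := os.Create("limited.out")
	if err != nil {
		log.Fatal(err)
	}

	// Cap sustained throughput at 1 MB/s with a 4 MB burst. NewRate spends
	// the burst allowance immediately, so pacing applies from the first write.
	w := limiter.NewWriter(f, 1<<20, 4<<20)
	defer w.Close()

	// This copy settles to roughly bytesPerSec once the token bucket drains.
	if _, err := io.Copy(w, bytes.NewReader(make([]byte, 8<<20))); err != nil {
		log.Fatal(err)
	}
}
```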
+ // if sz == 0 { + // } + sz = fi.Size() if fi.Size() == 0 { return nil, nil } diff --git a/services/collectd/service.go b/services/collectd/service.go index 85feefc0f7..7d15ef710f 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -123,7 +123,7 @@ func (s *Service) Open() error { readdir = func(path string) { files, err := ioutil.ReadDir(path) if err != nil { - s.Logger.Info(fmt.Sprintf("Unable to read directory %s: %s\n", path, err)) + s.Logger.Info(fmt.Sprintf("Unable to read directory %s: %s", path, err)) return } @@ -134,10 +134,10 @@ func (s *Service) Open() error { continue } - s.Logger.Info(fmt.Sprintf("Loading %s\n", fullpath)) + s.Logger.Info(fmt.Sprintf("Loading %s", fullpath)) types, err := TypesDBFile(fullpath) if err != nil { - s.Logger.Info(fmt.Sprintf("Unable to parse collectd types file: %s\n", f.Name())) + s.Logger.Info(fmt.Sprintf("Unable to parse collectd types file: %s", f.Name())) continue } @@ -147,7 +147,7 @@ func (s *Service) Open() error { readdir(s.Config.TypesDB) s.popts.TypesDB = alltypesdb } else { - s.Logger.Info(fmt.Sprintf("Loading %s\n", s.Config.TypesDB)) + s.Logger.Info(fmt.Sprintf("Loading %s", s.Config.TypesDB)) types, err := TypesDBFile(s.Config.TypesDB) if err != nil { return fmt.Errorf("Open(): %s", err) diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index e7d7cead75..14500ab3f4 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -361,7 +361,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } if err := cq.q.SetTimeRange(startTime, endTime); err != nil { - s.Logger.Info(fmt.Sprintf("error setting time range: %s\n", err)) + s.Logger.Info(fmt.Sprintf("error setting time range: %s", err)) return false, err } @@ -377,7 +377,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti // Do the actual processing of the query & writing of results. res := s.runContinuousQueryAndWriteResult(cq) if res.Err != nil { - s.Logger.Info(fmt.Sprintf("error: %s. running: %s\n", res.Err, cq.q.String())) + s.Logger.Info(fmt.Sprintf("error: %s. running: %s", res.Err, cq.q.String())) return false, res.Err } diff --git a/services/httpd/handler.go b/services/httpd/handler.go index a842643d0a..cb5ca367ed 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -34,6 +34,7 @@ import ( "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/uuid" "github.com/influxdata/influxql" + "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" ) @@ -170,6 +171,10 @@ func NewHandler(c Config) *Handler { "status-head", "HEAD", "/status", false, true, h.serveStatus, }, + Route{ + "prometheus-metrics", + "GET", "/metrics", false, true, promhttp.Handler().ServeHTTP, + }, }...) return h diff --git a/test.sh b/test.sh index 2da83eb265..0da63a96b6 100755 --- a/test.sh +++ b/test.sh @@ -26,7 +26,7 @@ OUTPUT_DIR=${OUTPUT_DIR-./test-logs} # Set default parallelism PARALLELISM=${PARALLELISM-1} # Set default timeout -TIMEOUT=${TIMEOUT-1200s} +TIMEOUT=${TIMEOUT-1500s} # Default to deleteing the container DOCKER_RM=${DOCKER_RM-true} diff --git a/tests/server_helpers.go b/tests/server_helpers.go index 90891fb122..4def909e73 100644 --- a/tests/server_helpers.go +++ b/tests/server_helpers.go @@ -11,6 +11,7 @@ import ( "net/url" "os" "regexp" + "runtime" "strings" "sync" "time" @@ -253,6 +254,15 @@ type LocalServer struct { Config *run.Config } +// Open opens the server. 
If running this test on a 32-bit platform, it reduces +// the size of series files so that they all remain addressable in the process. +func (s *LocalServer) Open() error { + if runtime.GOARCH == "386" { + s.Server.TSDBStore.SeriesFileMaxSize = 1 << 27 // 128MB + } + return s.Server.Open() +} + // Close shuts down the server and removes all temporary paths. func (s *LocalServer) Close() { s.mu.Lock() diff --git a/tests/server_suite.go b/tests/server_suite.go index 4db7b982c2..0cad2142f2 100644 --- a/tests/server_suite.go +++ b/tests/server_suite.go @@ -295,6 +295,12 @@ func init() { exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverB","uswest",23.2],["2000-01-03T00:00:00Z","serverA","uswest",200]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, + &Query{ + name: "Make sure other points are deleted", + command: `SELECT COUNT(val) FROM cpu WHERE "host" = 'serverA'`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, &Query{ name: "Make sure data wasn't deleted from other database.", command: `SELECT * FROM cpu`, diff --git a/tsdb/config.go b/tsdb/config.go index 5041a44cae..6ab91feaad 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -26,7 +26,7 @@ const ( // DefaultCacheSnapshotMemorySize is the size at which the engine will // snapshot the cache and write it to a TSM file, freeing up memory - DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB + DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB // DefaultCacheSnapshotWriteColdDuration is the length of time at which // the engine will snapshot the cache and write it to a new TSM file if diff --git a/tsdb/engine.go b/tsdb/engine.go index e0fe2d02fc..87e9408ab2 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -45,6 +45,7 @@ type Engine interface { Export(w io.Writer, basePath string, start time.Time, end time.Time) error Restore(r io.Reader, basePath string) error Import(r io.Reader, basePath string) error + Digest() (io.ReadCloser, error) CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error) CreateCursor(ctx context.Context, r *CursorRequest) (Cursor, error) @@ -53,7 +54,7 @@ type Engine interface { CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error - DeleteSeriesRange(itr SeriesIterator, min, max int64) error + DeleteSeriesRange(itr SeriesIterator, min, max int64, removeIndex bool) error MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) SeriesN() int64 @@ -146,7 +147,8 @@ type EngineOptions struct { ShardID uint64 InmemIndex interface{} // shared in-memory index - CompactionLimiter limiter.Fixed + CompactionLimiter limiter.Fixed + CompactionThroughputLimiter limiter.Rate Config Config } diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 57d938a993..2e58058c53 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -15,6 +15,7 @@ package tsm1 import ( "bytes" "fmt" + "io" "math" "os" "path/filepath" @@ -24,6 +25,7 @@ import ( "sync/atomic" "time" + "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/tsdb" ) @@ -251,21 +253,14 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { } } - // Determine the minimum number of files required for the level.
Higher levels are more - // CPU intensive so we only want to include them when we have enough data to make them - // worthwhile. - // minGenerations 1 -> 2 - // minGenerations 2 -> 2 - // minGenerations 3 -> 4 - // minGenerations 4 -> 4 - minGenerations := level - if minGenerations%2 != 0 { - minGenerations = level + 1 + minGenerations := 4 + if level == 1 { + minGenerations = 8 } var cGroups []CompactionGroup for _, group := range levelGroups { - for _, chunk := range group.chunk(4) { + for _, chunk := range group.chunk(minGenerations) { var cGroup CompactionGroup var hasTombstones bool for _, gen := range chunk { @@ -323,6 +318,11 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup { for i := 0; i < len(generations); i++ { cur := generations[i] + // Skip the file if it's over the max size, contains a full block, and has no tombstones. + if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() { + continue + } + // See if this generation is orphan'd which would prevent it from being further // compacted until a final full compactin runs. if i < len(generations)-1 { @@ -551,7 +551,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { compactable := []tsmGenerations{} for _, group := range groups { //if we don't have enough generations to compact, skip it - if len(group) < 2 && !group.hasTombstones() { + if len(group) < 4 && !group.hasTombstones() { continue } compactable = append(compactable, group) @@ -672,10 +672,18 @@ type Compactor struct { TSMReader(path string) *TSMReader } + // RateLimit is the limit for disk writes for all concurrent compactions. + RateLimit limiter.Rate + mu sync.RWMutex snapshotsEnabled bool compactionsEnabled bool + // lastSnapshotDuration is the amount of time the last snapshot took to complete. + lastSnapshotDuration time.Duration + + snapshotLatencies *latencies + // The channel to signal that any in progress snapshots should be aborted. snapshotsInterrupt chan struct{} // The channel to signal that any in progress level compactions should be aborted. @@ -696,6 +704,7 @@ func (c *Compactor) Open() { c.compactionsEnabled = true c.snapshotsInterrupt = make(chan struct{}) c.compactionsInterrupt = make(chan struct{}) + c.snapshotLatencies = &latencies{values: make([]time.Duration, 4)} c.files = make(map[string]struct{}) } @@ -770,25 +779,22 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { return nil, errSnapshotsDisabled } + start := time.Now() card := cache.Count() - concurrency, maxConcurrency := 1, runtime.GOMAXPROCS(0)/2 - if maxConcurrency < 1 { - maxConcurrency = 1 - } - if maxConcurrency > 4 { - maxConcurrency = 4 + // Enable throttling if we have low cardinality or snapshots are completing quickly. + throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second + + // Write snapshots concurrently if cardinality is relatively high. + concurrency := card / 2e6 + if concurrency < 1 { + concurrency = 1 } - concurrency = 1 - if card >= 3*1024*1024 { + // Special case very high cardinality, use max concurrency and don't throttle writes.
+ if card >= 3e6 { concurrency = 4 - } else if card >= 1024*1024 { - concurrency = 2 - } - - if concurrency > maxConcurrency { - concurrency = maxConcurrency + throttle = false } splits := cache.Split(concurrency) @@ -802,7 +808,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { for i := 0; i < concurrency; i++ { go func(sp *Cache) { iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC) - files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter) + files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter, throttle) resC <- res{files: files, err: err} }(splits[i]) @@ -818,10 +824,15 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { files = append(files, result.files...) } + dur := time.Since(start).Truncate(time.Second) + + c.mu.Lock() + // See if we were disabled while writing a snapshot - c.mu.RLock() enabled = c.snapshotsEnabled - c.mu.RUnlock() + c.lastSnapshotDuration = dur + c.snapshotLatencies.add(time.Since(start)) + c.mu.Unlock() if !enabled { return nil, errSnapshotsDisabled @@ -889,7 +900,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) { return nil, err } - return c.writeNewFiles(maxGeneration, maxSequence, tsm) + return c.writeNewFiles(maxGeneration, maxSequence, tsm, true) } // CompactFull writes multiple smaller TSM files into 1 or more larger files. @@ -970,7 +981,7 @@ func (c *Compactor) removeTmpFiles(files []string) error { // writeNewFiles writes from the iterator into new TSM files, rotating // to a new file once it has reached the max TSM file size. -func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([]string, error) { +func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator, throttle bool) ([]string, error) { // These are the new TSM files written var files []string @@ -980,7 +991,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, TSMFileExtension, TmpTSMFileExtension)) // Write as much as possible to this file - err := c.write(fileName, iter) + err := c.write(fileName, iter, throttle) // We've hit the max file limit and there is more to write. Create a new file // and continue. @@ -1019,24 +1030,31 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ return files, nil } -func (c *Compactor) write(path string, iter KeyIterator) (err error) { +func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err error) { fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return errCompactionInProgress{err: err} } // Create the write for the new TSM file. - var w TSMWriter + var ( + w TSMWriter + limitWriter io.Writer = fd + ) + + if c.RateLimit != nil && throttle { + limitWriter = limiter.NewWriterWithRate(fd, c.RateLimit) + } // Use a disk based TSM buffer if it looks like we might create a big index // in memory. 
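To make the WriteSnapshot changes above easier to follow, here is the same heuristic restated as a standalone function (a paraphrase for clarity, not engine code; the rolling average comes from the new latencies ring buffer defined later in this file):

```go
package tsm1sketch

import "time"

// snapshotPlan mirrors the WriteSnapshot heuristic: throttle small or fast
// snapshots, scale writer goroutines with cache cardinality, and let very
// high cardinality run at maximum concurrency without throttling.
func snapshotPlan(card int, avgSnapshot time.Duration) (concurrency int, throttle bool) {
	// Throttle when cardinality is low or recent snapshots completed quickly.
	throttle = card < 3e6 && avgSnapshot < 15*time.Second

	// Roughly one writer goroutine per 2M cached series, at least one.
	if concurrency = card / 2e6; concurrency < 1 {
		concurrency = 1
	}

	// Very high cardinality: max concurrency, no write throttling.
	if card >= 3e6 {
		concurrency = 4
		throttle = false
	}
	return concurrency, throttle
}
```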
if iter.EstimatedIndexSize() > 64*1024*1024 { - w, err = NewTSMWriterWithDiskBuffer(fd) + w, err = NewTSMWriterWithDiskBuffer(limitWriter) if err != nil { return err } } else { - w, err = NewTSMWriter(fd) + w, err = NewTSMWriter(limitWriter) if err != nil { return err } @@ -1534,8 +1552,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte } func (c *cacheKeyIterator) EstimatedIndexSize() int { - // We return 0 here since we already have all the entries in memory to write an index. - return 0 + var n int + for _, v := range c.order { + n += len(v) + } + return n } func (c *cacheKeyIterator) encode() { @@ -1709,3 +1730,30 @@ func (a tsmGenerations) IsSorted() bool { } return true } + +type latencies struct { + i int + values []time.Duration +} + +func (l *latencies) add(t time.Duration) { + l.values[l.i%len(l.values)] = t + l.i++ +} + +func (l *latencies) avg() time.Duration { + var n int64 + var sum time.Duration + for _, v := range l.values { + if v == 0 { + continue + } + sum += v + n++ + } + + if n > 0 { + return time.Duration(int64(sum) / n) + } + return time.Duration(0) +} diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 69e5b0c392..617b7567a6 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -1461,6 +1461,30 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { Path: "06-01.tsm1", Size: 1 * 1024 * 1024, }, + tsm1.FileStat{ + Path: "07-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "08-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "09-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "10-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "11-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "12-01.tsm1", + Size: 1 * 1024 * 1024, + }, } cp := tsm1.NewDefaultPlanner( @@ -1471,7 +1495,7 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles := []tsm1.FileStat{data[4], data[5]} + expFiles := []tsm1.FileStat{data[4], data[5], data[6], data[7], data[8], data[9], data[10], data[11]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) @@ -1537,55 +1561,6 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) { } } -func TestDefaultPlanner_PlanLevel_IsolatedLowLevel(t *testing.T) { - data := []tsm1.FileStat{ - tsm1.FileStat{ - Path: "01-03.tsm1", - Size: 251 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "02-03.tsm1", - Size: 1 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "03-01.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "04-01.tsm1", - Size: 10 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "05-02.tsm1", - Size: 1 * 1024 * 1024, - }, - tsm1.FileStat{ - Path: "06-01.tsm1", - Size: 1 * 1024 * 1024, - }, - } - - cp := tsm1.NewDefaultPlanner( - &fakeFileStore{ - PathsFn: func() []tsm1.FileStat { - return data - }, - }, tsdb.DefaultCompactFullWriteColdDuration, - ) - - expFiles := []tsm1.FileStat{data[2], data[3]} - tsm := cp.PlanLevel(1) - if exp, got := len(expFiles), len(tsm[0]); got != exp { - t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) - } - - for i, p := range expFiles { - if got, exp := tsm[0][i], p.Path; got != exp { - t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) - } - } -} - func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) { data := 
[]tsm1.FileStat{ tsm1.FileStat{ @@ -1802,8 +1777,7 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} - expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]} + expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]} tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { @@ -1815,16 +1789,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) } } - - if exp, got := len(expFiles2), len(tsm[1]); got != exp { - t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) - } - - for i, p := range expFiles2 { - if got, exp := tsm[1][i], p.Path; got != exp { - t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) - } - } } func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { @@ -1869,6 +1833,30 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { Path: "10-01.tsm1", Size: 1 * 1024 * 1024, }, + tsm1.FileStat{ + Path: "11-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "12-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "13-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "14-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "15-01.tsm1", + Size: 1 * 1024 * 1024, + }, + tsm1.FileStat{ + Path: "16-01.tsm1", + Size: 1 * 1024 * 1024, + }, } cp := tsm1.NewDefaultPlanner( @@ -1879,8 +1867,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { }, tsdb.DefaultCompactFullWriteColdDuration, ) - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} - expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]} + expFiles1 := data[0:8] + expFiles2 := data[8:16] tsm := cp.PlanLevel(1) if exp, got := len(expFiles1), len(tsm[0]); got != exp { @@ -2559,25 +2547,41 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) { Size: 2148728539, }, tsm1.FileStat{ - Path: "000000005-000000002.tsm", - Size: 701863692, + Path: "000000005-000000001.tsm", + Size: 2148340232, }, tsm1.FileStat{ - Path: "000000006-000000002.tsm", - Size: 701863692, + Path: "000000006-000000001.tsm", + Size: 2148356556, }, tsm1.FileStat{ - Path: "000000007-000000002.tsm", - Size: 701863692, + Path: "000000007-000000001.tsm", + Size: 167780181, }, tsm1.FileStat{ - Path: "000000008-000000002.tsm", - Size: 701863692, + Path: "000000008-000000001.tsm", + Size: 2148728539, }, tsm1.FileStat{ Path: "000000009-000000002.tsm", Size: 701863692, }, + tsm1.FileStat{ + Path: "000000010-000000002.tsm", + Size: 701863692, + }, + tsm1.FileStat{ + Path: "000000011-000000002.tsm", + Size: 701863692, + }, + tsm1.FileStat{ + Path: "000000012-000000002.tsm", + Size: 701863692, + }, + tsm1.FileStat{ + Path: "000000013-000000002.tsm", + Size: 701863692, + }, } }, }, tsdb.DefaultCompactFullWriteColdDuration, @@ -2615,7 +2619,7 @@ func TestDefaultPlanner_Plan_ForceFull(t *testing.T) { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } - if got, exp := len(tsm[0]), 9; got != exp { + if got, exp := len(tsm[0]), 13; got != exp { t.Fatalf("plan length mismatch: got %v, exp %v", got, exp) } cp.Release(tsm) diff --git a/tsdb/engine/tsm1/digest.go b/tsdb/engine/tsm1/digest.go new file mode 100644 index 0000000000..7b3d9142a7 --- /dev/null +++ b/tsdb/engine/tsm1/digest.go @@ -0,0 +1,136 @@ +package tsm1 + +import ( + "bytes" + "fmt" + "io" + "math" + "os" + "path/filepath" + "sort" +) + +type 
DigestOptions struct { + MinTime, MaxTime int64 + MinKey, MaxKey []byte +} + +// DigestWithOptions writes a digest of dir to w using options to filter by +// time and key range. +func DigestWithOptions(dir string, opts DigestOptions, w io.WriteCloser) error { + if dir == "" { + return fmt.Errorf("dir is required") + } + + files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", TSMFileExtension))) + if err != nil { + return err + } + + readers := make([]*TSMReader, 0, len(files)) + + for _, fi := range files { + f, err := os.Open(fi) + if err != nil { + return err + } + + r, err := NewTSMReader(f) + if err != nil { + return err + } + readers = append(readers, r) + } + + ch := make([]chan seriesKey, 0, len(files)) + for _, fi := range files { + f, err := os.Open(fi) + if err != nil { + return err + } + + r, err := NewTSMReader(f) + if err != nil { + return err + } + defer r.Close() + + s := make(chan seriesKey) + ch = append(ch, s) + go func() { + for i := 0; i < r.KeyCount(); i++ { + key, typ := r.KeyAt(i) + if len(opts.MinKey) > 0 && bytes.Compare(key, opts.MinKey) < 0 { + continue + } + + if len(opts.MaxKey) > 0 && bytes.Compare(key, opts.MaxKey) > 0 { + continue + } + + s <- seriesKey{key: key, typ: typ} + } + close(s) + }() + + } + + dw, err := NewDigestWriter(w) + if err != nil { + return err + } + defer dw.Close() + + var n int + for key := range merge(ch...) { + + ts := &DigestTimeSpan{} + n++ + kstr := string(key.key) + + for _, r := range readers { + entries := r.Entries(key.key) + for _, entry := range entries { + crc, b, err := r.ReadBytes(&entry, nil) + if err != nil { + return err + } + + // Filter blocks that are outside the time filter. If they overlap, we + // still include them. + if entry.MaxTime < opts.MinTime || entry.MinTime > opts.MaxTime { + continue + } + + cnt := BlockCount(b) + ts.Add(entry.MinTime, entry.MaxTime, cnt, crc) + } + } + + sort.Sort(ts) + if err := dw.WriteTimeSpan(kstr, ts); err != nil { + return err + } + } + return dw.Close() +} + +// Digest writes a digest of the full shard dir to w, covering all times and keys.
+func Digest(dir string, w io.WriteCloser) error { + return DigestWithOptions(dir, DigestOptions{ + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + }, w) +} + +type rwPair struct { + r *TSMReader + w TSMWriter + outf *os.File +} + +func (rw *rwPair) close() { + rw.r.Close() + rw.w.Close() + rw.outf.Close() +} diff --git a/tsdb/engine/tsm1/digest_reader.go b/tsdb/engine/tsm1/digest_reader.go new file mode 100644 index 0000000000..4efe73db2b --- /dev/null +++ b/tsdb/engine/tsm1/digest_reader.go @@ -0,0 +1,70 @@ +package tsm1 + +import ( + "bufio" + "compress/gzip" + "encoding/binary" + "io" +) + +type DigestReader struct { + r io.ReadCloser + gr *gzip.Reader +} + +func NewDigestReader(r io.ReadCloser) (*DigestReader, error) { + gr, err := gzip.NewReader(bufio.NewReader(r)) + if err != nil { + return nil, err + } + return &DigestReader{r: r, gr: gr}, nil +} + +func (w *DigestReader) ReadTimeSpan() (string, *DigestTimeSpan, error) { + var n uint16 + if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil { + return "", nil, err + } + + b := make([]byte, n) + if _, err := io.ReadFull(w.gr, b); err != nil { + return "", nil, err + } + + var cnt uint32 + if err := binary.Read(w.gr, binary.BigEndian, &cnt); err != nil { + return "", nil, err + } + + ts := &DigestTimeSpan{} + for i := 0; i < int(cnt); i++ { + var min, max int64 + var crc uint32 + + if err := binary.Read(w.gr, binary.BigEndian, &min); err != nil { + return "", nil, err + } + + if err := binary.Read(w.gr, binary.BigEndian, &max); err != nil { + return "", nil, err + } + + if err := binary.Read(w.gr, binary.BigEndian, &crc); err != nil { + return "", nil, err + } + + if err := binary.Read(w.gr, binary.BigEndian, &n); err != nil { + return "", nil, err + } + ts.Add(min, max, int(n), crc) + } + + return string(b), ts, nil +} + +func (w *DigestReader) Close() error { + if err := w.gr.Close(); err != nil { + return err + } + return w.r.Close() +} diff --git a/tsdb/engine/tsm1/digest_test.go b/tsdb/engine/tsm1/digest_test.go new file mode 100644 index 0000000000..fe90858a63 --- /dev/null +++ b/tsdb/engine/tsm1/digest_test.go @@ -0,0 +1,228 @@ +package tsm1_test + +import ( + "io" + "os" + "path/filepath" + "testing" + + "github.com/influxdata/influxdb/tsdb/engine/tsm1" +) + +func TestDigest_None(t *testing.T) { + dir := MustTempDir() + dataDir := filepath.Join(dir, "data") + if err := os.Mkdir(dataDir, 0755); err != nil { + t.Fatalf("create data dir: %v", err) + } + + df := MustTempFile(dir) + + if err := tsm1.Digest(dir, df); err != nil { + t.Fatalf("digest error: %v", err) + } + + df, err := os.Open(df.Name()) + if err != nil { + t.Fatalf("open error: %v", err) + } + + r, err := tsm1.NewDigestReader(df) + if err != nil { + t.Fatalf("NewDigestReader error: %v", err) + } + defer r.Close() + + var count int + for { + _, _, err := r.ReadTimeSpan() + if err == io.EOF { + break + } + + count++ + } + + if got, exp := count, 0; got != exp { + t.Fatalf("count mismatch: got %v, exp %v", got, exp) + } +} + +func TestDigest_One(t *testing.T) { + dir := MustTempDir() + dataDir := filepath.Join(dir, "data") + if err := os.Mkdir(dataDir, 0755); err != nil { + t.Fatalf("create data dir: %v", err) + } + + a1 := tsm1.NewValue(1, 1.1) + writes := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{a1}, + } + MustWriteTSM(dir, 1, writes) + + df := MustTempFile(dir) + + if err := tsm1.Digest(dir, df); err != nil { + t.Fatalf("digest error: %v", err) + } + + df, err := os.Open(df.Name()) + if err != nil { + t.Fatalf("open error: %v", err) 
+ } + + r, err := tsm1.NewDigestReader(df) + if err != nil { + t.Fatalf("NewDigestReader error: %v", err) + } + defer r.Close() + + var count int + for { + key, _, err := r.ReadTimeSpan() + if err == io.EOF { + break + } + + if got, exp := key, "cpu,host=A#!~#value"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + count++ + } + + if got, exp := count, 1; got != exp { + t.Fatalf("count mismatch: got %v, exp %v", got, exp) + } +} + +func TestDigest_TimeFilter(t *testing.T) { + dir := MustTempDir() + dataDir := filepath.Join(dir, "data") + if err := os.Mkdir(dataDir, 0755); err != nil { + t.Fatalf("create data dir: %v", err) + } + + a1 := tsm1.NewValue(1, 1.1) + writes := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{a1}, + } + MustWriteTSM(dir, 1, writes) + + a2 := tsm1.NewValue(2, 2.1) + writes = map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{a2}, + } + MustWriteTSM(dir, 2, writes) + + a3 := tsm1.NewValue(3, 3.1) + writes = map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{a3}, + } + MustWriteTSM(dir, 3, writes) + + df := MustTempFile(dir) + + if err := tsm1.DigestWithOptions(dir, tsm1.DigestOptions{MinTime: 2, MaxTime: 2}, df); err != nil { + t.Fatalf("digest error: %v", err) + } + + df, err := os.Open(df.Name()) + if err != nil { + t.Fatalf("open error: %v", err) + } + + r, err := tsm1.NewDigestReader(df) + if err != nil { + t.Fatalf("NewDigestReader error: %v", err) + } + defer r.Close() + + var count int + for { + key, ts, err := r.ReadTimeSpan() + if err == io.EOF { + break + } + + if got, exp := key, "cpu,host=A#!~#value"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + for _, tr := range ts.Ranges { + if got, exp := tr.Max, int64(2); got != exp { + t.Fatalf("min time not filtered: got %v, exp %v", got, exp) + } + } + + count++ + } + + if got, exp := count, 1; got != exp { + t.Fatalf("count mismatch: got %v, exp %v", got, exp) + } +} + +func TestDigest_KeyFilter(t *testing.T) { + dir := MustTempDir() + dataDir := filepath.Join(dir, "data") + if err := os.Mkdir(dataDir, 0755); err != nil { + t.Fatalf("create data dir: %v", err) + } + + a1 := tsm1.NewValue(1, 1.1) + writes := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{a1}, + } + MustWriteTSM(dir, 1, writes) + + a2 := tsm1.NewValue(2, 2.1) + writes = map[string][]tsm1.Value{ + "cpu,host=B#!~#value": []tsm1.Value{a2}, + } + MustWriteTSM(dir, 2, writes) + + a3 := tsm1.NewValue(3, 3.1) + writes = map[string][]tsm1.Value{ + "cpu,host=C#!~#value": []tsm1.Value{a3}, + } + MustWriteTSM(dir, 3, writes) + + df := MustTempFile(dir) + + if err := tsm1.DigestWithOptions(dir, tsm1.DigestOptions{ + MinKey: []byte("cpu,host=B#!~#value"), + MaxKey: []byte("cpu,host=B#!~#value")}, df); err != nil { + t.Fatalf("digest error: %v", err) + } + + df, err := os.Open(df.Name()) + if err != nil { + t.Fatalf("open error: %v", err) + } + + r, err := tsm1.NewDigestReader(df) + if err != nil { + t.Fatalf("NewDigestReader error: %v", err) + } + defer r.Close() + + var count int + for { + key, _, err := r.ReadTimeSpan() + if err == io.EOF { + break + } + + if got, exp := key, "cpu,host=B#!~#value"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + count++ + } + + if got, exp := count, 1; got != exp { + t.Fatalf("count mismatch: got %v, exp %v", got, exp) + } +} diff --git a/tsdb/engine/tsm1/digest_writer.go b/tsdb/engine/tsm1/digest_writer.go new file mode 100644 index 0000000000..212d5b7549 --- /dev/null +++ 
b/tsdb/engine/tsm1/digest_writer.go @@ -0,0 +1,101 @@ +package tsm1 + +import ( + "compress/gzip" + "encoding/binary" + "io" +) + +type writeFlushCloser interface { + Close() error + Write(b []byte) (int, error) + Flush() error +} + +// DigestWriter allows for writing a digest of a shard. A digest is a condensed +// representation of the contents of a shard. It can be scoped to one or more series +// keys, ranges of times or sets of files. +type DigestWriter struct { + w io.WriteCloser + F writeFlushCloser +} + +func NewDigestWriter(w io.WriteCloser) (*DigestWriter, error) { + gw := gzip.NewWriter(w) + return &DigestWriter{w: w, F: gw}, nil +} + +func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error { + if err := binary.Write(w.F, binary.BigEndian, uint16(len(key))); err != nil { + return err + } + + if _, err := w.F.Write([]byte(key)); err != nil { + return err + } + + if err := binary.Write(w.F, binary.BigEndian, uint32(t.Len())); err != nil { + return err + } + + for _, tr := range t.Ranges { + if err := binary.Write(w.F, binary.BigEndian, tr.Min); err != nil { + return err + } + + if err := binary.Write(w.F, binary.BigEndian, tr.Max); err != nil { + return err + } + + if err := binary.Write(w.F, binary.BigEndian, tr.CRC); err != nil { + return err + } + + if err := binary.Write(w.F, binary.BigEndian, uint16(tr.N)); err != nil { + return err + } + } + + return nil +} + +func (w *DigestWriter) Flush() error { + return w.F.Flush() +} + +func (w *DigestWriter) Close() error { + if err := w.Flush(); err != nil { + return err + } + + if err := w.F.Close(); err != nil { + return err + } + + return w.w.Close() +} + +type DigestTimeSpan struct { + Ranges []DigestTimeRange +} + +func (a DigestTimeSpan) Len() int { return len(a.Ranges) } +func (a DigestTimeSpan) Swap(i, j int) { a.Ranges[i], a.Ranges[j] = a.Ranges[j], a.Ranges[i] } +func (a DigestTimeSpan) Less(i, j int) bool { + return a.Ranges[i].Min < a.Ranges[j].Min +} + +func (t *DigestTimeSpan) Add(min, max int64, n int, crc uint32) { + for _, v := range t.Ranges { + if v.Min == min && v.Max == max && v.N == n && v.CRC == crc { + return + } + } + t.Ranges = append(t.Ranges, DigestTimeRange{Min: min, Max: max, N: n, CRC: crc}) +} + +type DigestTimeRange struct { + Min, Max int64 + N int + CRC uint32 +} diff --git a/tsdb/engine/tsm1/digest_writer_test.go b/tsdb/engine/tsm1/digest_writer_test.go new file mode 100644 index 0000000000..6315fd1e86 --- /dev/null +++ b/tsdb/engine/tsm1/digest_writer_test.go @@ -0,0 +1,61 @@ +package tsm1_test + +import ( + "io" + "os" + "reflect" + "testing" + + "github.com/influxdata/influxdb/tsdb/engine/tsm1" +) + +func TestEngine_DigestWriterReader(t *testing.T) { + f := MustTempFile("") + w, err := tsm1.NewDigestWriter(f) + if err != nil { + t.Fatalf("NewDigestWriter: %v", err) + } + + ts := &tsm1.DigestTimeSpan{} + ts.Add(1, 2, 3, 4) + + if err := w.WriteTimeSpan("cpu", ts); err != nil { + t.Fatalf("WriteTimeSpan: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + f, err = os.Open(f.Name()) + if err != nil { + t.Fatalf("Open: %v", err) + } + + r, err := tsm1.NewDigestReader(f) + if err != nil { + t.Fatalf("NewDigestReader: %v", err) + } + for { + + key, ts, err := r.ReadTimeSpan() + if err == io.EOF { + break + } else if err != nil { + t.Fatalf("ReadTimeSpan: %v", err) + } + + if exp, got := "cpu", key; exp != got { + t.Fatalf("key mismatch: exp %v, got %v", exp, got) + } + + if exp, got := 1, len(ts.Ranges); exp != got { + t.Fatalf("range len mismatch: 
exp %v, got %v", exp, got) + } + + exp := tsm1.DigestTimeRange{Min: 1, Max: 2, N: 3, CRC: 4} + if got := ts.Ranges[0]; !reflect.DeepEqual(exp, got) { + t.Fatalf("time range mismatch: exp %v, got %v", exp, got) + } + } +} diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 512ae84442..2c92ac36fe 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -190,6 +190,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string, c := &Compactor{ Dir: path, FileStore: fs, + RateLimit: opt.CompactionThroughputLimiter, } logger := zap.NewNop() @@ -227,6 +228,55 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string, return e } +// Digest returns a reader for the shard's digest. +func (e *Engine) Digest() (io.ReadCloser, error) { + digestPath := filepath.Join(e.path, "digest.tsd") + + // See if there's an existing digest file on disk. + f, err := os.Open(digestPath) + if err == nil { + // There is an existing digest file. Now see if it is still fresh. + fi, err := f.Stat() + if err != nil { + f.Close() + return nil, err + } + + if !e.LastModified().After(fi.ModTime()) { + // Existing digest is still fresh so return a reader for it. + return f, nil + } + + if err := f.Close(); err != nil { + return nil, err + } + } + + // Either no digest existed or the existing one was stale + // so generate a new digest. + + // Create a tmp file to write the digest to. + tf, err := os.Create(digestPath + ".tmp") + if err != nil { + return nil, err + } + + // Write the new digest to the tmp file. + if err := Digest(e.path, tf); err != nil { + tf.Close() + os.Remove(tf.Name()) + return nil, err + } + + // Rename the temporary digest file to the actual digest file. + if err := renameFile(tf.Name(), digestPath); err != nil { + return nil, err + } + + // Create and return a reader for the new digest file. + return os.Open(digestPath) +} + // SetEnabled sets whether the engine is enabled. func (e *Engine) SetEnabled(enabled bool) { e.enableCompactionsOnOpen = enabled @@ -1152,7 +1202,7 @@ func (e *Engine) WritePoints(points []models.Point) error { } // DeleteSeriesRange removes the values between min and max (inclusive) from all series -func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) error { +func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64, removeIndex bool) error { var disableOnce bool var sz int @@ -1164,6 +1214,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro } else if elem == nil { break } + if elem.Expr() != nil { if v, ok := elem.Expr().(*influxql.BooleanLiteral); !ok || !v.Val { return errors.New("fields not supported in WHERE clause during deletion") @@ -1188,7 +1239,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro if sz >= deleteFlushThreshold { // Delete all matching batch. - if err := e.deleteSeriesRange(batch, min, max); err != nil { + if err := e.deleteSeriesRange(batch, min, max, removeIndex); err != nil { return err } batch = batch[:0] @@ -1198,20 +1249,24 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro if len(batch) > 0 { // Delete all matching batch. 
- if err := e.deleteSeriesRange(batch, min, max); err != nil { + if err := e.deleteSeriesRange(batch, min, max, removeIndex); err != nil { return err } batch = batch[:0] } - e.index.Rebuild() + if removeIndex { + e.index.Rebuild() + } return nil } -// deleteSeriesRange removes the values between min and max (inclusive) from all series. This -// does not update the index or disable compactions. This should mainly be called by DeleteSeriesRange -// and not directly. -func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { +// deleteSeriesRange removes the values between min and max (inclusive) from all +// series in the TSM engine. If removeIndex is true, then series will also be +// removed from the index. +// +// This should mainly be called by DeleteSeriesRange and not directly. +func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64, removeIndex bool) error { ts := time.Now().UTC().UnixNano() if len(seriesKeys) == 0 { return nil @@ -1307,7 +1362,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { // exists now. To reconcile the index, we walk the series keys that still exists // on disk and cross out any keys that match the passed in series. Any series // left in the slice at the end do not exist and can be deleted from the index. - // Note: this is inherently racy if writes are occuring to the same measurement/series are + // Note: this is inherently racy if writes are occurring to the same measurement/series are // being removed. A write could occur and exist in the cache at this point, but we // would delete it from the index. minKey := seriesKeys[0] @@ -1371,12 +1426,11 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { i++ } - // Some cache values still exists, leave the series in the index. - if hasCacheValues { + if hasCacheValues || !removeIndex { continue } - // Remove the series from the index for this shard + // Remove the series from the index. if err := e.index.UnassignShard(string(k), e.id, ts); err != nil { return err } @@ -1439,7 +1493,8 @@ func (e *Engine) deleteMeasurement(name []byte) error { return nil } defer itr.Close() - return e.DeleteSeriesRange(tsdb.NewSeriesIteratorAdapter(e.sfile, itr), math.MinInt64, math.MaxInt64) + // Delete all associated series and remove them from the index. + return e.DeleteSeriesRange(tsdb.NewSeriesIteratorAdapter(e.sfile, itr), math.MinInt64, math.MaxInt64, true) } // ForEachMeasurementName iterates over each measurement name in the engine. @@ -1608,7 +1663,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { } func (e *Engine) compact(quit <-chan struct{}) { - t := time.NewTicker(5 * time.Second) + t := time.NewTicker(time.Second) defer t.Stop() for { @@ -1670,15 +1725,15 @@ func (e *Engine) compact(quit <-chan struct{}) { switch level { case 1: - if e.compactHiPriorityLevel(level1Groups[0], 1) { + if e.compactHiPriorityLevel(level1Groups[0], 1, false) { level1Groups = level1Groups[1:] } case 2: - if e.compactHiPriorityLevel(level2Groups[0], 2) { + if e.compactHiPriorityLevel(level2Groups[0], 2, false) { level2Groups = level2Groups[1:] } case 3: - if e.compactLoPriorityLevel(level3Groups[0], 3) { + if e.compactLoPriorityLevel(level3Groups[0], 3, true) { level3Groups = level3Groups[1:] } case 4: @@ -1699,8 +1754,8 @@ func (e *Engine) compact(quit <-chan struct{}) { // compactHiPriorityLevel kicks off compactions using the high priority policy. 
It returns // true if the compaction was started -func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int) bool { - s := e.levelCompactionStrategy(grp, true, level) +func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool) bool { + s := e.levelCompactionStrategy(grp, fast, level) if s == nil { return false } @@ -1728,8 +1783,8 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int) bool { // compactLoPriorityLevel kicks off compactions using the lo priority policy. It returns // the plans that were not able to be started -func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int) bool { - s := e.levelCompactionStrategy(grp, true, level) +func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool) bool { + s := e.levelCompactionStrategy(grp, fast, level) if s == nil { return false } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 019e86a1d7..5ecee30c93 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -45,7 +45,7 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) { // Remove series. itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} - if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { + if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64, false); err != nil { t.Fatalf("failed to delete series: %s", err.Error()) } @@ -65,6 +65,145 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) { } } +// Ensure that the engine can write & read shard digest files. +func TestEngine_Digest(t *testing.T) { + e := MustOpenEngine(inmem.IndexName) + defer e.Close() + + if err := e.Open(); err != nil { + t.Fatalf("failed to open tsm1 engine: %s", err.Error()) + } + + // Create a few points. + points := []models.Point{ + MustParsePointString("cpu,host=A value=1.1 1000000000"), + MustParsePointString("cpu,host=B value=1.2 2000000000"), + } + + if err := e.WritePoints(points); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + // Force a compaction. + e.ScheduleFullCompaction() + + digest := func() ([]span, error) { + // Get a reader for the shard's digest. + r, err := e.Digest() + if err != nil { + return nil, err + } + + // Make sure the digest can be read. + dr, err := tsm1.NewDigestReader(r) + if err != nil { + r.Close() + return nil, err + } + defer dr.Close() + + got := []span{} + + for { + k, s, err := dr.ReadTimeSpan() + if err == io.EOF { + break + } else if err != nil { + return nil, err + } + + got = append(got, span{ + key: k, + tspan: s, + }) + } + + return got, nil + } + + exp := []span{ + span{ + key: "cpu,host=A#!~#value", + tspan: &tsm1.DigestTimeSpan{ + Ranges: []tsm1.DigestTimeRange{ + tsm1.DigestTimeRange{ + Min: 1000000000, + Max: 1000000000, + N: 1, + CRC: 1048747083, + }, + }, + }, + }, + span{ + key: "cpu,host=B#!~#value", + tspan: &tsm1.DigestTimeSpan{ + Ranges: []tsm1.DigestTimeRange{ + tsm1.DigestTimeRange{ + Min: 2000000000, + Max: 2000000000, + N: 1, + CRC: 734984746, + }, + }, + }, + }, + } + + for n := 0; n < 2; n++ { + got, err := digest() + if err != nil { + t.Fatalf("n = %d: %s", n, err) + } + + // Make sure the data in the digest was valid. + if !reflect.DeepEqual(exp, got) { + t.Fatalf("n = %d\nexp = %v\ngot = %v\n", n, exp, got) + } + } + + // Test that writing more points causes the digest to be updated. 
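The digest() helper in the test above is the canonical way to consume a digest stream: iterate ReadTimeSpan until io.EOF. Extracted as a reusable sketch (the function name is illustrative):

```go
package tsm1sketch

import (
	"io"

	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)

// readDigest drains a digest stream into a key -> time-span map.
func readDigest(rc io.ReadCloser) (map[string]*tsm1.DigestTimeSpan, error) {
	dr, err := tsm1.NewDigestReader(rc)
	if err != nil {
		rc.Close()
		return nil, err
	}
	defer dr.Close()

	spans := make(map[string]*tsm1.DigestTimeSpan)
	for {
		key, ts, err := dr.ReadTimeSpan()
		if err == io.EOF {
			return spans, nil
		} else if err != nil {
			return nil, err
		}
		spans[key] = ts
	}
}
```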
+ points = []models.Point{ + MustParsePointString("cpu,host=C value=1.1 3000000000"), + } + + if err := e.WritePoints(points); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + // Force a compaction. + e.ScheduleFullCompaction() + + // Get new digest. + got, err := digest() + if err != nil { + t.Fatal(err) + } + + exp = append(exp, span{ + key: "cpu,host=C#!~#value", + tspan: &tsm1.DigestTimeSpan{ + Ranges: []tsm1.DigestTimeRange{ + tsm1.DigestTimeRange{ + Min: 3000000000, + Max: 3000000000, + N: 1, + CRC: 2553233514, + }, + }, + }, + }) + + if !reflect.DeepEqual(exp, got) { + t.Fatalf("\nexp = %v\ngot = %v\n", exp, got) + } +} + +type span struct { + key string + tspan *tsm1.DigestTimeSpan +} + // Ensure that the engine will backup any TSM files created since the passed in time func TestEngine_Backup(t *testing.T) { sfile := MustOpenSeriesFile() @@ -788,7 +927,7 @@ func TestEngine_CreateIterator_Condition(t *testing.T) { } // Ensures that deleting series from TSM files with multiple fields removes all the -/// series +// series from the TSM files but leaves the series in the index intact. func TestEngine_DeleteSeries(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { @@ -797,11 +936,23 @@ func TestEngine_DeleteSeries(t *testing.T) { p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000") - e := MustOpenEngine(index) - defer e.Close() + e, err := NewEngine(index) + if err != nil { + t.Fatal(err) + } // mock the planner so compactions don't run during the test e.CompactionPlan = &mockPlanner{} + if err := e.Open(); err != nil { + t.Fatal(err) + } + defer e.Close() + + for _, p := range []models.Point{p1, p2, p3} { + if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil { + t.Fatalf("create series index error: %v", err) + } + } if err := e.WritePoints([]models.Point{p1, p2, p3}); err != nil { t.Fatalf("failed to write points: %s", err.Error()) @@ -816,7 +967,7 @@ func TestEngine_DeleteSeries(t *testing.T) { } itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} - if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { + if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64, false); err != nil { t.Fatalf("failed to delete series: %v", err) } @@ -829,27 +980,68 @@ func TestEngine_DeleteSeries(t *testing.T) { if _, ok := keys[exp]; !ok { t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys) } + + // Deleting all the TSM values for a single series should still leave + // the series in the index intact. + indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} + iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) + if err != nil { + t.Fatalf("iterator error: %v", err) + } else if iter == nil { + t.Fatal("nil iterator") + } + defer iter.Close() + + var gotKeys []string + expKeys := []string{"cpu,host=A", "cpu,host=B"} + + for { + elem, err := iter.Next() + if err != nil { + t.Fatal(err) + } + if elem.SeriesID == 0 { + break + } + + // Lookup series. + name, tags := e.sfile.Series(elem.SeriesID) + gotKeys = append(gotKeys, string(models.MakeKey(name, tags))) + } + + if !reflect.DeepEqual(gotKeys, expKeys) { + t.Fatalf("got keys %v, expected %v", gotKeys, expKeys) + } }) } } +// Ensures that deleting series from TSM files over a range of time deleted the +// series from the TSM files but leaves the series in the index. 
func TestEngine_DeleteSeriesRange(t *testing.T) { for _, index := range tsdb.RegisteredIndexes() { t.Run(index, func(t *testing.T) { // Create a few points. - p1 := MustParsePointString("cpu,host=0 value=1.1 6000000000") // Should not be deleted + p1 := MustParsePointString("cpu,host=0 value=1.1 6000000000") p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") p3 := MustParsePointString("cpu,host=A value=1.3 3000000000") - p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") // Should not be deleted - p5 := MustParsePointString("cpu,host=B value=1.3 5000000000") // Should not be deleted + p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") + p5 := MustParsePointString("cpu,host=B value=1.3 5000000000") p6 := MustParsePointString("cpu,host=C value=1.3 1000000000") - p7 := MustParsePointString("mem,host=C value=1.3 1000000000") // Should not be deleted - p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted + p7 := MustParsePointString("mem,host=C value=1.3 1000000000") + p8 := MustParsePointString("disk,host=C value=1.3 1000000000") + + e, err := NewEngine(index) + if err != nil { + t.Fatal(err) + } - e := MustOpenEngine(index) - defer e.Close() // mock the planner so compactions don't run during the test e.CompactionPlan = &mockPlanner{} + if err := e.Open(); err != nil { + t.Fatal(err) + } + defer e.Close() for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} { if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil { @@ -870,7 +1062,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) { } itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C")}} - if err := e.DeleteSeriesRange(itr, 0, 3000000000); err != nil { + if err := e.DeleteSeriesRange(itr, 0, 3000000000, false); err != nil { t.Fatalf("failed to delete series: %v", err) } @@ -884,46 +1076,36 @@ func TestEngine_DeleteSeriesRange(t *testing.T) { t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys) } - // Check that the series still exists in the index - iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) + // Deleting all the TSM values for a single series should still leave + // the series in the index intact. + indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} + iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) if err != nil { t.Fatalf("iterator error: %v", err) + } else if iter == nil { + t.Fatal("nil iterator") } defer iter.Close() - elem, err := iter.Next() - if err != nil { - t.Fatal(err) - } - if elem.SeriesID == 0 { - t.Fatalf("series index mismatch: EOF, exp 2 series") + var gotKeys []string + expKeys := []string{"cpu,host=0", "cpu,host=A", "cpu,host=B", "cpu,host=C"} + + for { + elem, err := iter.Next() + if err != nil { + t.Fatal(err) + } + if elem.SeriesID == 0 { + break + } + + // Lookup series. + name, tags := e.sfile.Series(elem.SeriesID) + gotKeys = append(gotKeys, string(models.MakeKey(name, tags))) } - // Lookup series. 
- name, tags := e.sfile.Series(elem.SeriesID) - if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { - t.Fatalf("series mismatch: got %s, exp %s", got, exp) - } - - if got, exp := tags, models.NewTags(map[string]string{"host": "0"}); !got.Equal(exp) { - t.Fatalf("series mismatch: got %s, exp %s", got, exp) - } - - if elem, err = iter.Next(); err != nil { - t.Fatal(err) - } - if elem.SeriesID == 0 { - t.Fatalf("series index mismatch: EOF, exp 2 series") - } - - // Lookup series. - name, tags = e.sfile.Series(elem.SeriesID) - if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { - t.Fatalf("series mismatch: got %s, exp %s", got, exp) - } - - if got, exp := tags, models.NewTags(map[string]string{"host": "B"}); !got.Equal(exp) { - t.Fatalf("series mismatch: got %s, exp %s", got, exp) + if !reflect.DeepEqual(gotKeys, expKeys) { + t.Fatalf("got keys %v, expected %v", gotKeys, expKeys) } }) @@ -936,10 +1118,17 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) { // Create a few points. p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") // Should not be deleted - e := MustOpenEngine(index) - defer e.Close() + e, err := NewEngine(index) + if err != nil { + t.Fatal(err) + } + // mock the planner so compactions don't run during the test e.CompactionPlan = &mockPlanner{} + if err := e.Open(); err != nil { + t.Fatal(err) + } + defer e.Close() for _, p := range []models.Point{p1} { if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil { @@ -960,7 +1149,7 @@ func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) { } itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} - if err := e.DeleteSeriesRange(itr, 0, 0); err != nil { + if err := e.DeleteSeriesRange(itr, 0, 0, false); err != nil { t.Fatalf("failed to delete series: %v", err) } @@ -1010,12 +1199,18 @@ func TestEngine_LastModified(t *testing.T) { p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000") - e := MustOpenEngine(index) - defer e.Close() + e, err := NewEngine(index) + if err != nil { + t.Fatal(err) + } // mock the planner so compactions don't run during the test e.CompactionPlan = &mockPlanner{} e.SetEnabled(false) + if err := e.Open(); err != nil { + t.Fatal(err) + } + defer e.Close() if err := e.WritePoints([]models.Point{p1, p2, p3}); err != nil { t.Fatalf("failed to write points: %s", err.Error()) @@ -1038,7 +1233,7 @@ func TestEngine_LastModified(t *testing.T) { } itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} - if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { + if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64, false); err != nil { t.Fatalf("failed to delete series: %v", err) } @@ -1494,8 +1689,14 @@ func NewEngine(index string) (*Engine, error) { return nil, err } f.Close() - sfile := tsdb.NewSeriesFile(f.Name()) + + sfile := tsdb.NewSeriesFile(f.Name()) + // If we're running on a 32-bit system then reduce the SeriesFile size, so we + // can address it in memory. + if runtime.GOARCH == "386" { + sfile.MaxSize = 1 << 27 // 128MB + } + if err = sfile.Open(); err != nil { return nil, err } @@ -1534,7 +1735,13 @@ func NewSeriesFile() *SeriesFile { } file.Close() - return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())} + s := &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())} + // If we're running on a 32-bit system then reduce the SeriesFile size, so we + // can address it in memory.
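// (1 << 27 = 128MB. A 32-bit process has at most 4GB of address space, so the
// series-file mmap has to stay far smaller than the 64-bit default.)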
+ if runtime.GOARCH == "386" { + s.SeriesFile.MaxSize = 1 << 27 // 128MB + } + return s } // MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index fae6a3b328..4db738c295 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -968,7 +968,8 @@ func (f *FileStore) CreateSnapshot() (string, error) { defer f.mu.RUnlock() // get a tmp directory name - tmpPath := fmt.Sprintf("%s/%d.%s", f.dir, f.currentTempDirID, TmpTSMFileExtension) + tmpPath := fmt.Sprintf("%d.%s", f.currentTempDirID, TmpTSMFileExtension) + tmpPath = filepath.Join(f.dir, tmpPath) err := os.Mkdir(tmpPath, 0777) if err != nil { return "", err diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index eda8985f2e..a82f22079e 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -100,7 +100,7 @@ const ( // The threshold amount of data written before we periodically fsync a TSM file. This helps avoid // long pauses due to very large fsyncs at the end of writing a TSM file. - fsyncEvery = 512 * 1024 * 1024 + fsyncEvery = 25 * 1024 * 1024 ) var ( @@ -252,6 +252,11 @@ type indexBlock struct { entries *indexEntries } +type syncer interface { + Name() string + Sync() error +} + // directIndex is a simple in-memory index implementation for a TSM file. The full index // must fit in memory. type directIndex struct { @@ -263,6 +268,8 @@ type directIndex struct { fd *os.File buf *bytes.Buffer + f syncer + w *bufio.Writer key []byte @@ -367,6 +374,48 @@ func (d *directIndex) KeyCount() int { return d.keyCount } +// copyBuffer is the actual implementation of Copy and CopyBuffer. +// If buf is nil, one is allocated. This is copied from the Go stdlib +// in order to remove the fast path WriteTo calls which circumvent any +// IO throttling as well as to add periodic fsyncs to avoid long stalls. +func copyBuffer(f syncer, dst io.Writer, src io.Reader, buf []byte) (written int64, err error) { + if buf == nil { + buf = make([]byte, 32*1024) + } + var lastSync int64 + for { + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if nw > 0 { + written += int64(nw) + } + + if written-lastSync > fsyncEvery { + if err := f.Sync(); err != nil { + return 0, err + } + lastSync = written + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er != nil { + if er != io.EOF { + err = er + } + break + } + } + return written, err +} + func (d *directIndex) WriteTo(w io.Writer) (int64, error) { if _, err := d.flush(d.w); err != nil { return 0, err } @@ -377,7 +426,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) { } if d.fd == nil { - return io.Copy(w, d.buf) + return copyBuffer(d.f, w, d.buf, nil) } if _, err := d.fd.Seek(0, io.SeekStart); err != nil { @@ -518,7 +567,7 @@ func NewTSMWriter(w io.Writer) (TSMWriter, error) { func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) { var index IndexWriter // Make sure w is a File so we can write the temp index alongside it. - if fw, ok := w.(*os.File); ok { + if fw, ok := w.(syncer); ok { f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return nil, err @@ -664,6 +713,12 @@ func (t *tsmWriter) WriteIndex() error { return ErrNoValues } + // Set the destination file on the index so we can periodically + // fsync while writing the index.
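// Only a writer that exposes Name() and Sync() (in practice an *os.File)
// satisfies the syncer interface defined above; for any other writer the
// assertion below fails and the index is written without periodic fsyncs.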
+ if f, ok := t.wrapped.(syncer); ok { + t.index.(*directIndex).f = f + } + // Write the index if _, err := t.index.WriteTo(t.w); err != nil { return err diff --git a/tsdb/index.go b/tsdb/index.go index 864d43b1ce..c93a147808 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -1368,9 +1368,12 @@ func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byt return true } - // TODO(edd) there isn't a need to return an error when instantiating the iterator. - sitr, _ := is.MeasurementSeriesIDIterator(name) + sitr, err := is.MeasurementSeriesIDIterator(name) + if err != nil || sitr == nil { + return false + } defer sitr.Close() + for { series, err := sitr.Next() if err != nil { @@ -1504,7 +1507,7 @@ func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, e a = append(a, itr) } } - return MergeSeriesIDIterators(a...), nil + return FilterUndeletedSeriesIDIterator(is.SeriesFile, MergeSeriesIDIterators(a...)), nil } // ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies @@ -1559,7 +1562,7 @@ func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, e a = append(a, itr) } } - return MergeSeriesIDIterators(a...), nil + return FilterUndeletedSeriesIDIterator(is.SeriesFile, MergeSeriesIDIterators(a...)), nil } // TagValueSeriesIDIterator returns a series iterator for a single tag value. @@ -1574,7 +1577,7 @@ func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIt a = append(a, itr) } } - return MergeSeriesIDIterators(a...), nil + return FilterUndeletedSeriesIDIterator(is.SeriesFile, MergeSeriesIDIterators(a...)), nil } // MeasurementSeriesByExprIterator returns a series iterator for a measurement @@ -1586,7 +1589,12 @@ func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Ex return is.MeasurementSeriesIDIterator(name) } fieldset := is.FieldSet() - return is.seriesByExprIterator(name, expr, fieldset.CreateFieldsIfNotExists(name)) + + itr, err := is.seriesByExprIterator(name, expr, fieldset.CreateFieldsIfNotExists(name)) + if err != nil { + return nil, err + } + return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil } // MeasurementSeriesKeysByExpr returns a list of series keys matching expr. @@ -1997,6 +2005,7 @@ func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key } else if itr == nil { return nil, nil } + itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr) defer itr.Close() keyIdxs := make(map[string]int, len(keys)) diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 823f580a0e..010038a1a0 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -457,6 +457,9 @@ func (i *Index) TagsForSeries(key string) (models.Tags, error) { // MeasurementNamesByExpr takes an expression containing only tags and returns a // list of matching measurement names. +// +// TODO(edd): Remove authorisation from these methods. There shouldn't need to +// be any auth passed down into the index. func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { i.mu.RLock() defer i.mu.RUnlock() @@ -603,7 +606,14 @@ func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagF // Is there a series with this matching tag value that is // authorized to be read? 
for _, sid := range seriesIDs { - if s := m.SeriesByID(sid); s != nil && auth.AuthorizeSeriesRead(i.database, m.name, s.Tags()) { + s := m.SeriesByID(sid) + + // If the series is deleted then it can't be used to authorise against. + if s != nil && s.Deleted() { + continue + } + + if s != nil && auth.AuthorizeSeriesRead(i.database, m.name, s.Tags()) { // The Range call can return early as a matching // tag value with an authorized series has been found. authorized = true @@ -705,7 +715,6 @@ func (i *Index) DropSeries(key []byte, ts int64) error { // Remove the measurement's reference. series.Measurement().DropSeries(series) - // Mark the series as deleted. series.Delete(ts) @@ -748,11 +757,15 @@ func (i *Index) SeriesKeys() []string { // SetFieldSet sets a shared field set from the engine. func (i *Index) SetFieldSet(fieldset *tsdb.MeasurementFieldSet) { + i.mu.Lock() + defer i.mu.Unlock() i.fieldset = fieldset } // FieldSet returns the assigned fieldset. func (i *Index) FieldSet() *tsdb.MeasurementFieldSet { + i.mu.RLock() + defer i.mu.RUnlock() return i.fieldset } diff --git a/tsdb/index/inmem/meta.go b/tsdb/index/inmem/meta.go index 0bf9d04f42..7481097a82 100644 --- a/tsdb/index/inmem/meta.go +++ b/tsdb/index/inmem/meta.go @@ -55,12 +55,8 @@ func NewMeasurement(database, name string) *Measurement { // Authorized determines if this Measurement is authorized to be read, according // to the provided Authorizer. A measurement is authorized to be read if at -// least one series from the measurement is authorized to be read. +// least one undeleted series from the measurement is authorized to be read. func (m *Measurement) Authorized(auth query.Authorizer) bool { - if auth == nil { - return true - } - // Note(edd): the cost of this check scales linearly with the number of series // belonging to a measurement, which means it may become expensive when there // are large numbers of series on a measurement. @@ -68,7 +64,11 @@ func (m *Measurement) Authorized(auth query.Authorizer) bool { // In the future we might want to push the set of series down into the // authorizer, but that will require an API change. for _, s := range m.SeriesByIDMap() { - if auth.AuthorizeSeriesRead(m.database, m.name, s.tags) { + if s != nil && s.Deleted() { + continue + } + + if auth == nil || auth.AuthorizeSeriesRead(m.database, m.name, s.tags) { return true } } diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 4f62ab4cc1..1987c76c88 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -472,7 +472,7 @@ func (itr *fileSetSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) { func (itr *fileSetSeriesIDIterator) Close() error { itr.once.Do(func() { itr.fs.Release() }) - return nil + return itr.itr.Close() } // fileSetMeasurementIterator attaches a fileset to an iterator that is released on close. @@ -492,7 +492,7 @@ func (itr *fileSetMeasurementIterator) Next() ([]byte, error) { func (itr *fileSetMeasurementIterator) Close() error { itr.once.Do(func() { itr.fs.Release() }) - return nil + return itr.itr.Close() } // fileSetTagKeyIterator attaches a fileset to an iterator that is released on close. @@ -512,7 +512,7 @@ func (itr *fileSetTagKeyIterator) Next() ([]byte, error) { func (itr *fileSetTagKeyIterator) Close() error { itr.once.Do(func() { itr.fs.Release() }) - return nil + return itr.itr.Close() } // fileSetTagValueIterator attaches a fileset to an iterator that is released on close. 
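// All four fileSet iterator wrappers in this file now share the same Close
// contract, roughly (fileSetXIterator is a stand-in name for this sketch):
//
//	func (itr *fileSetXIterator) Close() error {
//		itr.once.Do(func() { itr.fs.Release() }) // release the fileset refcount exactly once
//		return itr.itr.Close()                   // then close the wrapped iterator as well
//	}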
@@ -532,5 +532,5 @@ func (itr *fileSetTagValueIterator) Next() ([]byte, error) { func (itr *fileSetTagValueIterator) Close() error { itr.once.Do(func() { itr.fs.Release() }) - return nil + return itr.itr.Close() } diff --git a/tsdb/index/tsi1/file_set_test.go b/tsdb/index/tsi1/file_set_test.go index 31a43bb2b5..be396ebb42 100644 --- a/tsdb/index/tsi1/file_set_test.go +++ b/tsdb/index/tsi1/file_set_test.go @@ -25,7 +25,10 @@ func TestFileSet_SeriesIDIterator(t *testing.T) { // Verify initial set of series. idx.Run(t, func(t *testing.T) { - fs := idx.PartitionAt(0).RetainFileSet() + fs, err := idx.PartitionAt(0).RetainFileSet() + if err != nil { + t.Fatal(err) + } defer fs.Release() itr := fs.SeriesFile().SeriesIDIterator() @@ -66,7 +69,10 @@ func TestFileSet_SeriesIDIterator(t *testing.T) { // Verify additional series. idx.Run(t, func(t *testing.T) { - fs := idx.PartitionAt(0).RetainFileSet() + fs, err := idx.PartitionAt(0).RetainFileSet() + if err != nil { + t.Fatal(err) + } defer fs.Release() itr := fs.SeriesFile().SeriesIDIterator() @@ -128,7 +134,10 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) { // Verify initial set of series. idx.Run(t, func(t *testing.T) { - fs := idx.PartitionAt(0).RetainFileSet() + fs, err := idx.PartitionAt(0).RetainFileSet() + if err != nil { + t.Fatal(err) + } defer fs.Release() itr := fs.MeasurementSeriesIDIterator([]byte("cpu")) @@ -163,7 +172,10 @@ func TestFileSet_MeasurementSeriesIDIterator(t *testing.T) { // Verify additional series. idx.Run(t, func(t *testing.T) { - fs := idx.PartitionAt(0).RetainFileSet() + fs, err := idx.PartitionAt(0).RetainFileSet() + if err != nil { + t.Fatal(err) + } defer fs.Release() itr := fs.MeasurementSeriesIDIterator([]byte("cpu")) @@ -212,7 +224,10 @@ func TestFileSet_MeasurementIterator(t *testing.T) { // Verify initial set of series. idx.Run(t, func(t *testing.T) { - fs := idx.PartitionAt(0).RetainFileSet() + fs, err := idx.PartitionAt(0).RetainFileSet() + if err != nil { + t.Fatal(err) + } defer fs.Release() itr := fs.MeasurementIterator() @@ -239,7 +254,10 @@ func TestFileSet_MeasurementIterator(t *testing.T) { // Verify additional series. idx.Run(t, func(t *testing.T) { - fs := idx.PartitionAt(0).RetainFileSet() + fs, err := idx.PartitionAt(0).RetainFileSet() + if err != nil { + t.Fatal(err) + } defer fs.Release() itr := fs.MeasurementIterator() @@ -278,7 +296,10 @@ func TestFileSet_TagKeyIterator(t *testing.T) { // Verify initial set of series. idx.Run(t, func(t *testing.T) { - fs := idx.PartitionAt(0).RetainFileSet() + fs, err := idx.PartitionAt(0).RetainFileSet() + if err != nil { + t.Fatal(err) + } defer fs.Release() itr := fs.TagKeyIterator([]byte("cpu")) @@ -305,7 +326,10 @@ func TestFileSet_TagKeyIterator(t *testing.T) { // Verify additional series. 
idx.Run(t, func(t *testing.T) { - fs := idx.PartitionAt(0).RetainFileSet() + fs, err := idx.PartitionAt(0).RetainFileSet() + if err != nil { + t.Fatal(err) + } defer fs.Release() itr := fs.TagKeyIterator([]byte("cpu")) @@ -324,66 +348,3 @@ func TestFileSet_TagKeyIterator(t *testing.T) { } }) } - -/* -var ( - byteSliceResult [][]byte - tagsSliceResult []models.Tags -) - -func BenchmarkFileset_FilterNamesTags(b *testing.B) { - sfile := MustOpenSeriesFile() - defer sfile.Close() - - idx := MustOpenIndex(sfile.SeriesFile, 1) - defer idx.Close() - - allNames := make([][]byte, 0, 2000*1000) - allTags := make([]models.Tags, 0, 2000*1000) - - for i := 0; i < 2000; i++ { - for j := 0; j < 1000; j++ { - name := []byte(fmt.Sprintf("measurement-%d", i)) - tags := models.NewTags(map[string]string{"host": fmt.Sprintf("server-%d", j)}) - allNames = append(allNames, name) - allTags = append(allTags, tags) - } - } - - if err := idx.CreateSeriesListIfNotExists(nil, allNames, allTags); err != nil { - b.Fatal(err) - } - // idx.CheckFastCompaction() - - fs := idx.PartitionAt(0).RetainFileSet() - defer fs.Release() - - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - b.StopTimer() - names := [][]byte{ - []byte("foo"), - []byte("measurement-222"), // filtered - []byte("measurement-222"), // kept (tags won't match) - []byte("measurements-1"), - []byte("measurement-900"), // filtered - []byte("measurement-44444"), - []byte("bar"), - } - - tags := []models.Tags{ - nil, - models.NewTags(map[string]string{"host": "server-297"}), // filtered - models.NewTags(map[string]string{"host": "wrong"}), - nil, - models.NewTags(map[string]string{"host": "server-1026"}), // filtered - models.NewTags(map[string]string{"host": "server-23"}), // kept (measurement won't match) - models.NewTags(map[string]string{"host": "zoo"}), - } - b.StartTimer() - byteSliceResult, tagsSliceResult = fs.FilterNamesTags(names, tags) - } -} -*/ diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 3698a74d30..cb1041e98a 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -46,7 +46,7 @@ func init() { // NOTE: Currently, this must not be change once a database is created. Further, // it must also be a power of 2. // -var DefaultPartitionN uint64 = 16 +var DefaultPartitionN uint64 = 8 // An IndexOption is a functional option for changing the configuration of // an Index. @@ -271,33 +271,13 @@ func (i *Index) FieldSet() *tsdb.MeasurementFieldSet { } // ForEachMeasurementName iterates over all measurement names in the index, -// applying fn. Note, the provided function may be called concurrently, and it -// must be safe to do so. +// applying fn. It returns the first error encountered, if any. // -// It returns the first error encountered, if any. +// ForEachMeasurementName does not call fn on each partition concurrently so the +// call may provide a non-goroutine safe fn. func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error { - n := i.availableThreads() - - // Store results. - errC := make(chan error, i.PartitionN) - - // Run fn on each partition using a fixed number of goroutines. - var pidx uint32 // Index of maximum Partition being worked on. - for k := 0; k < n; k++ { - go func() { - for { - idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on. - if idx >= len(i.partitions) { - return // No more work. 
- } - errC <- i.partitions[idx].ForEachMeasurementName(fn) - } - }() - } - - // Check for error - for i := 0; i < cap(errC); i++ { - if err := <-errC; err != nil { + for _, p := range i.partitions { + if err := p.ForEachMeasurementName(fn); err != nil { return err } } @@ -730,7 +710,6 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s errC := make(chan error, i.PartitionN) var pidx uint32 // Index of maximum Partition being worked on. - var err error for k := 0; k < n; k++ { go func() { for { @@ -741,7 +720,8 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s // This is safe since there are no readers on keys until all // the writers are done. - keys[idx], err = i.partitions[idx].MeasurementTagKeysByExpr(name, expr) + tagKeys, err := i.partitions[idx].MeasurementTagKeysByExpr(name, expr) + keys[idx] = tagKeys errC <- err } }() @@ -766,7 +746,11 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s // DiskSizeBytes returns the size of the index on disk. func (i *Index) DiskSizeBytes() int64 { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + i.logger.Warn("Index is closing down") + return 0 + } defer fs.Release() var manifestSize int64 @@ -810,16 +794,20 @@ func (i *Index) SnapshotTo(path string) error { // RetainFileSet returns the set of all files across all partitions. // This is only needed when all files need to be retained for an operation. -func (i *Index) RetainFileSet() *FileSet { +func (i *Index) RetainFileSet() (*FileSet, error) { i.mu.RLock() defer i.mu.RUnlock() fs, _ := NewFileSet(i.database, nil, i.sfile, nil) for _, p := range i.partitions { - pfs := p.RetainFileSet() + pfs, err := p.RetainFileSet() + if err != nil { + fs.Close() + return nil, err + } fs.files = append(fs.files, pfs.files...) } - return fs + return fs, nil } func (i *Index) SetFieldName(measurement []byte, name string) {} diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index bc1da96bf4..4f92e1a71b 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -188,7 +188,10 @@ func TestIndex_DropMeasurement(t *testing.T) { } // Obtain file set to perform lower level checks. - fs := idx.PartitionAt(0).RetainFileSet() + fs, err := idx.PartitionAt(0).RetainFileSet() + if err != nil { + t.Fatal(err) + } defer fs.Release() // Verify tags & values are gone. @@ -288,9 +291,9 @@ func TestIndex_Manifest(t *testing.T) { func TestIndex_DiskSizeBytes(t *testing.T) { sfile := MustOpenSeriesFile() - // defer sfile.Close() + defer sfile.Close() idx := MustOpenIndex(sfile.SeriesFile, tsi1.DefaultPartitionN) - // defer idx.Close() + defer idx.Close() // Add series to index. if err := idx.CreateSeriesSliceIfNotExists([]Series{ @@ -301,9 +304,14 @@ func TestIndex_DiskSizeBytes(t *testing.T) { }); err != nil { t.Fatal(err) } - fmt.Println(idx.Path()) + // Verify on disk size is the same in each stage. 
- expSize := int64(520) // 419 bytes for MANIFEST and 101 bytes for index file + // There are four series, and each series id is 8 bytes plus one byte for the tombstone header + expSize := int64(4 * 9) + + // Each MANIFEST file is 419 bytes and there are tsi1.DefaultPartitionN of them + expSize += int64(tsi1.DefaultPartitionN * 419) + idx.Run(t, func(t *testing.T) { if got, exp := idx.DiskSizeBytes(), expSize; got != exp { t.Fatalf("got %d bytes, expected %d", got, exp) diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index e50ee8736b..6b03c627dd 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -55,7 +55,7 @@ type Partition struct { // Close management. once sync.Once - closing chan struct{} + closing chan struct{} // closing is used to inform iterators the partition is closing. wg sync.WaitGroup // Fieldset shared with engine. @@ -88,9 +88,8 @@ type Partition struct { func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition { return &Partition{ closing: make(chan struct{}), - - path: path, - sfile: sfile, + path: path, + sfile: sfile, // Default compaction thresholds. MaxLogFileSize: DefaultMaxLogFileSize, @@ -110,6 +109,8 @@ func (i *Partition) Open() error { i.mu.Lock() defer i.mu.Unlock() + i.closing = make(chan struct{}) + if i.opened { return errors.New("index partition already open") } @@ -258,13 +259,14 @@ func (i *Partition) Wait() { // Close closes the index. func (i *Partition) Close() error { // Wait for goroutines to finish outstanding compactions. - i.once.Do(func() { close(i.closing) }) i.wg.Wait() // Lock index and close remaining i.mu.Lock() defer i.mu.Unlock() + i.once.Do(func() { close(i.closing) }) + // Close log files. for _, f := range i.fileSet.files { f.Close() @@ -274,6 +276,17 @@ func (i *Partition) Close() error { return nil } +// closing returns true if the partition is currently closing. It does not require +// a lock so will always return to callers. +// func (i *Partition) closing() bool { +// select { +// case <-i.closing: +// return true +// default: +// return false +// } +// } + // Path returns the path to the partition. func (i *Partition) Path() string { return i.path } @@ -334,11 +347,15 @@ func (i *Partition) FieldSet() *tsdb.MeasurementFieldSet { } // RetainFileSet returns the current fileset and adds a reference count. -func (i *Partition) RetainFileSet() *FileSet { - i.mu.RLock() - fs := i.retainFileSet() - i.mu.RUnlock() - return fs +func (i *Partition) RetainFileSet() (*FileSet, error) { + select { + case <-i.closing: + return nil, errors.New("index is closing") + default: + i.mu.RLock() + defer i.mu.RUnlock() + return i.retainFileSet(), nil + } } func (i *Partition) retainFileSet() *FileSet { @@ -374,7 +391,10 @@ func (i *Partition) prependActiveLogFile() error { // ForEachMeasurementName iterates over all measurement names in the index. func (i *Partition) ForEachMeasurementName(fn func(name []byte) error) error { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return err + } defer fs.Release() itr := fs.MeasurementIterator() @@ -393,7 +413,10 @@ func (i *Partition) ForEachMeasurementName(fn func(name []byte) error) error { // MeasurementIterator returns an iterator over all measurement names. 
func (i *Partition) MeasurementIterator() (tsdb.MeasurementIterator, error) { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return nil, err + } itr := fs.MeasurementIterator() if itr == nil { fs.Release() @@ -404,14 +427,20 @@ func (i *Partition) MeasurementIterator() (tsdb.MeasurementIterator, error) { // MeasurementExists returns true if a measurement exists. func (i *Partition) MeasurementExists(name []byte) (bool, error) { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return false, err + } defer fs.Release() m := fs.Measurement(name) return m != nil && !m.Deleted(), nil } func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return nil, err + } defer fs.Release() itr := fs.MeasurementIterator() @@ -430,13 +459,19 @@ func (i *Partition) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) } func (i *Partition) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return nil, err + } return newFileSetSeriesIDIterator(fs, fs.MeasurementSeriesIDIterator(name)), nil } // DropMeasurement deletes a measurement from the index. func (i *Partition) DropMeasurement(name []byte) error { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return err + } defer fs.Release() // Delete all keys and values. @@ -514,7 +549,10 @@ func (i *Partition) DropMeasurement(name []byte) error { // bulk. func (i *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) error { // Maintain reference count on files in file set. - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return err + } defer fs.Release() // Ensure fileset cannot change during insert. @@ -540,7 +578,6 @@ func (i *Partition) DropSeries(key []byte, ts int64) error { mname := []byte(name) seriesID := i.sfile.SeriesID(mname, tags, nil) - if err := i.sfile.DeleteSeriesID(seriesID); err != nil { return err } @@ -550,7 +587,7 @@ func (i *Partition) DropSeries(key []byte, ts int64) error { return err } - // Swap log file, if necesssary. + // Swap log file, if necessary. if err := i.CheckLogFile(); err != nil { return err } @@ -560,28 +597,41 @@ func (i *Partition) DropSeries(key []byte, ts int64) error { // MeasurementsSketches returns the two sketches for the index by merging all // instances of the type sketch types in all the index files. func (i *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return nil, nil, err + } defer fs.Release() return fs.MeasurementsSketches() } // HasTagKey returns true if tag key exists. func (i *Partition) HasTagKey(name, key []byte) (bool, error) { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return false, err + } defer fs.Release() return fs.HasTagKey(name, key), nil } // HasTagValue returns true if tag value exists. func (i *Partition) HasTagValue(name, key, value []byte) (bool, error) { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return false, err + } defer fs.Release() return fs.HasTagValue(name, key, value), nil } // TagKeyIterator returns an iterator for all keys across a single measurement. 
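// Note: the iterator constructors below have no error return, so a failed
// RetainFileSet (for example, a closing partition) surfaces as a nil
// iterator; callers must nil-check, as the TODOs below acknowledge.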
func (i *Partition) TagKeyIterator(name []byte) tsdb.TagKeyIterator { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return nil // TODO(edd): this should probably return an error. + } + itr := fs.TagKeyIterator(name) if itr == nil { fs.Release() @@ -592,7 +642,11 @@ func (i *Partition) TagKeyIterator(name []byte) tsdb.TagKeyIterator { // TagValueIterator returns an iterator for all values across a single key. func (i *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return nil // TODO(edd): this should probably return an error. + } + itr := fs.TagValueIterator(name, key) if itr == nil { fs.Release() @@ -603,7 +657,11 @@ func (i *Partition) TagValueIterator(name, key []byte) tsdb.TagValueIterator { // TagKeySeriesIDIterator returns a series iterator for all values across a single key. func (i *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return nil // TODO(edd): this should probably return an error. + } + itr := fs.TagKeySeriesIDIterator(name, key) if itr == nil { fs.Release() @@ -614,7 +672,11 @@ func (i *Partition) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterat // TagValueSeriesIDIterator returns a series iterator for a single key value. func (i *Partition) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return nil // TODO(edd): this should probably return an error. + } + itr := fs.TagValueSeriesIDIterator(name, key, value) if itr == nil { fs.Release() @@ -625,14 +687,21 @@ func (i *Partition) TagValueSeriesIDIterator(name, key, value []byte) tsdb.Serie // MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. func (i *Partition) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return nil, err + } defer fs.Release() + return fs.MeasurementTagKeysByExpr(name, expr) } // ForEachMeasurementTagKey iterates over all tag keys in a measurement. func (i *Partition) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { - fs := i.RetainFileSet() + fs, err := i.RetainFileSet() + if err != nil { + return err + } defer fs.Release() itr := fs.TagKeyIterator(name) @@ -655,34 +724,6 @@ func (i *Partition) TagKeyCardinality(name, key []byte) int { return 0 } -/* -func (i *Partition) MeasurementSeriesKeysByExprIterator(name []byte, condition influxql.Expr) (tsdb.SeriesIDIterator, error) { - fs := i.RetainFileSet() - defer fs.Release() - - itr, err := fs.MeasurementSeriesByExprIterator(name, condition, i.fieldset) - if err != nil { - return nil, err - } else if itr == nil { - return nil, nil - } - return itr, err -} -*/ - -/* -// MeasurementSeriesKeysByExpr returns a list of series keys matching expr. -func (i *Partition) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { - fs := i.RetainFileSet() - defer fs.Release() - - keys, err := fs.MeasurementSeriesKeysByExpr(name, expr, i.fieldset) - - // Clone byte slices since they will be used after the fileset is released. - return bytesutil.CloneSlice(keys), err -} -*/ - // SnapshotTo creates hard links to the file set into path. 
func (i *Partition) SnapshotTo(path string) error { i.mu.Lock() @@ -720,11 +761,6 @@ func (i *Partition) SetFieldName(measurement []byte, name string) {} func (i *Partition) RemoveShard(shardID uint64) {} func (i *Partition) AssignShard(k string, shardID uint64) {} -func (i *Partition) UnassignShard(k string, shardID uint64, ts int64) error { - // This can be called directly once inmem is gone. - return i.DropSeries([]byte(k), ts) -} - // Compact requests a compaction of log files. func (i *Partition) Compact() { i.mu.Lock() diff --git a/tsdb/index/tsi1/tsi1_test.go b/tsdb/index/tsi1/tsi1_test.go index 234c4a1f5d..3c25abb288 100644 --- a/tsdb/index/tsi1/tsi1_test.go +++ b/tsdb/index/tsi1/tsi1_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "reflect" + "runtime" "testing" "github.com/influxdata/influxdb/models" @@ -295,7 +296,13 @@ func NewSeriesFile() *SeriesFile { } file.Close() - return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())} + s := &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())} + // If we're running on a 32-bit system then reduce the SeriesFile size, so we + // can address it in memory. + if runtime.GOARCH == "386" { + s.SeriesFile.MaxSize = 1 << 27 // 128MB + } + return s } // MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. diff --git a/tsdb/index_test.go b/tsdb/index_test.go index b9bcddee3b..fcb2625dec 100644 --- a/tsdb/index_test.go +++ b/tsdb/index_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "reflect" + "runtime" "testing" "github.com/influxdata/influxdb/internal" @@ -153,6 +154,12 @@ func MustNewIndex(index string) *Index { file.Close() sfile := tsdb.NewSeriesFile(file.Name()) + // If we're running on a 32-bit system then reduce the SeriesFile size, so we + // can address it in memory. + if runtime.GOARCH == "386" { + sfile.MaxSize = 1 << 27 // 128MB + } + if err := sfile.Open(); err != nil { panic(err) } diff --git a/tsdb/series_file.go b/tsdb/series_file.go index a25491c2fa..0cda71a2f5 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -31,9 +31,12 @@ const ( SeriesFileTombstoneFlag = 0x01 ) +// MaxSeriesFileHashSize is the maximum number of series in a single hash map. +const MaxSeriesFileHashSize = (1 << 20 * SeriesMapLoadFactor) / 100 // (1M * 90) / 100 == ~943K + // SeriesMapThreshold is the number of series IDs to hold in the in-memory // series map before compacting and rebuilding the on-disk representation. -const SeriesMapThreshold = 1 << 22 // ~4M ids * 8 bytes per id == ~32MB +const SeriesMapThreshold = 1 << 25 // ~33M ids * 8 bytes per id == 256MB const ( // DefaultMaxSeriesFileSize is the maximum series file size. Assuming that each diff --git a/tsdb/series_file_386.go b/tsdb/series_file_386.go new file mode 100644 index 0000000000..cc5cd53df7 --- /dev/null +++ b/tsdb/series_file_386.go @@ -0,0 +1,5 @@ +package tsdb + +// DefaultMaxSeriesFileSize is the maximum series file size. Assuming that each +// series key takes, for example, 150 bytes, the limit would support ~3.5M series. +const DefaultMaxSeriesFileSize = (1 << 29) // 512MB diff --git a/tsdb/series_file_amd64.go b/tsdb/series_file_amd64.go new file mode 100644 index 0000000000..5c4770e496 --- /dev/null +++ b/tsdb/series_file_amd64.go @@ -0,0 +1,5 @@ +package tsdb + +// DefaultMaxSeriesFileSize is the maximum series file size. Assuming that each +// series key takes, for example, 150 bytes, the limit would support ~229M series.
+const DefaultMaxSeriesFileSize = 32 * (1 << 30) // 32GB diff --git a/tsdb/series_file_test.go b/tsdb/series_file_test.go index 7cc464d805..824b3bd806 100644 --- a/tsdb/series_file_test.go +++ b/tsdb/series_file_test.go @@ -3,6 +3,7 @@ package tsdb_test import ( "io/ioutil" "os" + "runtime" "testing" "github.com/influxdata/influxdb/models" @@ -63,7 +64,13 @@ func NewSeriesFile() *SeriesFile { } file.Close() - return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())} + s := &SeriesFile{SeriesFile: tsdb.NewSeriesFile(file.Name())} + // If we're running on a 32-bit system then reduce the SeriesFile size, so we + // can address it in memory. + if runtime.GOARCH == "386" { + s.SeriesFile.MaxSize = 1 << 27 // 128MB + } + return s } // MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. diff --git a/tsdb/shard.go b/tsdb/shard.go index 396f456792..4b1c8cc0c2 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -71,6 +71,10 @@ var ( // the file's magic number. ErrUnknownFieldsFormat = errors.New("unknown field index format") + // ErrShardNotIdle is returned when an operation requiring the shard to be idle/cold is + // attempted on a hot shard. + ErrShardNotIdle = errors.New("shard not idle") + // fieldsIndexMagicNumber is the file magic number for the fields index file. fieldsIndexMagicNumber = []byte{0, 6, 1, 3} ) @@ -179,7 +183,7 @@ func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt Eng return s } -// WithLogger sets the logger on the shard. +// WithLogger sets the logger on the shard. It must be called before Open. func (s *Shard) WithLogger(log *zap.Logger) { s.baseLogger = log engine, err := s.engine() @@ -430,12 +434,15 @@ func (s *Shard) UnloadIndex() { s.index.RemoveShard(s.id) } -// Index returns a reference to the underlying index. -// This should only be used by utilities and not directly accessed by the database. -func (s *Shard) Index() Index { +// Index returns a reference to the underlying index. It returns an error if +// the index is nil. +func (s *Shard) Index() (Index, error) { s.mu.RLock() defer s.mu.RUnlock() - return s.index + if err := s.ready(); err != nil { + return nil, err + } + return s.index, nil } // IsIdle returns true if the shard is not receiving writes and is fully compacted. @@ -711,12 +718,12 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error } // DeleteSeriesRange deletes all values for seriesKeys between min and max (inclusive) -func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64) error { +func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64, removeIndex bool) error { engine, err := s.engine() if err != nil { return err } - return engine.DeleteSeriesRange(itr, min, max) + return engine.DeleteSeriesRange(itr, min, max, removeIndex) } // DeleteMeasurement deletes a measurement and all underlying series. @@ -767,7 +774,11 @@ func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s // MeasurementTagKeyValuesByExpr returns all the tag key values for the // provided expression.
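// Now that Shard.Index() returns an error, the shard methods below all follow
// the same pattern: fetch the index, propagate the error, then build a
// single-shard IndexSet. A minimal sketch, assembled from the hunks below:
//
//	index, err := s.Index()
//	if err != nil {
//		return nil, err // shard is closed or disabled
//	}
//	indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}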
func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { - indexSet := IndexSet{Indexes: []Index{s.index}, SeriesFile: s.sfile} + index, err := s.Index() + if err != nil { + return nil, err + } + indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile} return indexSet.MeasurementTagKeyValuesByExpr(auth, name, key, expr, keysSorted) } @@ -816,7 +827,11 @@ func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt return NewFieldKeysIterator(s, opt) case "_series": // TODO(benbjohnson): Move up to the Shards.CreateIterator(). - indexSet := IndexSet{Indexes: []Index{s.index}, SeriesFile: s.sfile} + index, err := s.Index() + if err != nil { + return nil, err + } + indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile} return NewSeriesPointIterator(indexSet, engine.MeasurementFieldSet(), opt) case "_tagKeys": return NewTagKeysIterator(s, opt) @@ -842,6 +857,10 @@ func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influx fields = make(map[string]influxql.DataType) dimensions = make(map[string]struct{}) + index, err := s.Index() + if err != nil { + return nil, nil, err + } for _, name := range measurements { // Handle system sources. if strings.HasPrefix(name, "_") { @@ -883,7 +902,7 @@ func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influx } } - indexSet := IndexSet{Indexes: []Index{s.index}, SeriesFile: s.sfile} + indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile} if err := indexSet.ForEachMeasurementTagKey([]byte(name), func(key []byte) error { dimensions[string(key)] = struct{}{} return nil @@ -1093,6 +1112,22 @@ func (s *Shard) TagKeyCardinality(name, key []byte) int { return engine.TagKeyCardinality(name, key) } +// Digest returns a digest of the shard. +func (s *Shard) Digest() (io.ReadCloser, error) { + engine, err := s.engine() + if err != nil { + return nil, err + } + + // Make sure the shard is idle/cold. (No use creating a digest of a + // hot shard that is rapidly changing.) + if !engine.IsIdle() { + return nil, ErrShardNotIdle + } + + return engine.Digest() +} + // engine safely (under an RLock) returns a reference to the shard's Engine, or // an error if the Engine is closed, or the shard is currently disabled. // @@ -1647,10 +1682,15 @@ type Field struct { func NewFieldKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) { itr := &fieldKeysIterator{shard: sh} + index, err := sh.Index() + if err != nil { + return nil, err + } + // Retrieve measurements from shard. Filter if condition specified. // // FGA is currently not supported when retrieving field keys. - indexSet := IndexSet{Indexes: []Index{sh.index}, SeriesFile: sh.sfile} + indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} names, err := indexSet.MeasurementNamesByExpr(query.OpenAuthorizer, opt.Condition) if err != nil { return nil, err @@ -1724,7 +1764,12 @@ func (itr *fieldKeysIterator) Next() (*query.FloatPoint, error) { // NewTagKeysIterator returns a new instance of TagKeysIterator. 
func NewTagKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) { fn := func(name []byte) ([][]byte, error) { - indexSet := IndexSet{Indexes: []Index{sh.index}, SeriesFile: sh.sfile} + index, err := sh.Index() + if err != nil { + return nil, err + } + + indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} var keys [][]byte if err := indexSet.ForEachMeasurementTagKey(name, func(key []byte) error { keys = append(keys, key) @@ -1741,7 +1786,12 @@ func NewTagKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, e type measurementKeyFunc func(name []byte) ([][]byte, error) func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) { - indexSet := IndexSet{Indexes: []Index{sh.index}, SeriesFile: sh.sfile} + index, err := sh.Index() + if err != nil { + return nil, err + } + + indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} itr := &measurementKeysIterator{fn: fn} names, err := indexSet.MeasurementNamesByExpr(opt.Authorizer, opt.Condition) if err != nil { diff --git a/tsdb/shard_internal_test.go b/tsdb/shard_internal_test.go index 1da93603e4..085f68cd6d 100644 --- a/tsdb/shard_internal_test.go +++ b/tsdb/shard_internal_test.go @@ -7,6 +7,7 @@ import ( "path" "path/filepath" "regexp" + "runtime" "sort" "strings" "testing" @@ -221,6 +222,12 @@ func NewTempShard(index string) *TempShard { // Create series file. sfile := NewSeriesFile(filepath.Join(dir, "db0", SeriesFileName)) + // If we're running on a 32-bit system then reduce the SeriesFile size, so we + // can address it in memory. + if runtime.GOARCH == "386" { + sfile.MaxSize = 1 << 27 // 128MB + } + if err := sfile.Open(); err != nil { panic(err) } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 9d986b5e59..69e5ae5a8e 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -890,6 +890,58 @@ cpu,secret=foo value=100 0 if gotCount != expCount { return fmt.Errorf("got %d series, expected %d", gotCount, expCount) } + + // Delete series cpu,host=serverA,region=uswest + idx, err := sh.Index() + if err != nil { + return err + } + + if err := idx.DropSeries([]byte("cpu,host=serverA,region=uswest"), time.Now().UnixNano()); err != nil { + return err + } + + if itr, err = sh.CreateIterator(context.Background(), v.m, query.IteratorOptions{ + Aux: v.aux, + Ascending: true, + StartTime: influxql.MinTime, + EndTime: influxql.MaxTime, + Authorizer: seriesAuthorizer, + }); err != nil { + return err + } + + if itr == nil { + return fmt.Errorf("iterator is nil") + } + defer itr.Close() + + fitr = itr.(query.FloatIterator) + defer fitr.Close() + expCount = 1 + gotCount = 0 + for { + f, err := fitr.Next() + if err != nil { + return err + } + + if f == nil { + break + } + + if got := f.Aux[0].(string); strings.Contains(got, "secret") { + return fmt.Errorf("got a series %q that should be filtered", got) + } else if got := f.Aux[0].(string); strings.Contains(got, "serverA") { + return fmt.Errorf("got a series %q that should be filtered", got) + } + gotCount++ + } + + if gotCount != expCount { + return fmt.Errorf("got %d series, expected %d", gotCount, expCount) + } + return nil } @@ -1845,3 +1897,30 @@ func MustTempDir() (string, func()) { } return dir, func() { os.RemoveAll(dir) } } + +type seriesIterator struct { + keys [][]byte +} + +type series struct { + name []byte + tags models.Tags + deleted bool +} + +func (s series) Name() []byte { return s.name } +func (s series) Tags() models.Tags { return s.tags } +func
(s series) Deleted() bool { return s.deleted } +func (s series) Expr() influxql.Expr { return nil } + +func (itr *seriesIterator) Close() error { return nil } + +func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) { + if len(itr.keys) == 0 { + return nil, nil + } + name, tags := models.ParseKeyBytes(itr.keys[0]) + s := series{name: name, tags: tags} + itr.keys = itr.keys[1:] + return s, nil +} diff --git a/tsdb/store.go b/tsdb/store.go index 269850fd43..59fd169187 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -41,12 +41,12 @@ const SeriesFileName = "series" // Store manages shards and indexes for databases. type Store struct { - mu sync.RWMutex - shards map[uint64]*Shard - databases map[string]struct{} - sfiles map[string]*SeriesFile - - path string + mu sync.RWMutex + shards map[uint64]*Shard + databases map[string]struct{} + sfiles map[string]*SeriesFile + SeriesFileMaxSize int64 // Determines size of series file mmap. Can be altered in tests. + path string // shared per-database indexes, only if using "inmem". indexes map[string]interface{} @@ -188,6 +188,13 @@ func (s *Store) loadShards() error { s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim) + // Env var to disable throughput limiter. This will be moved to a config option in 1.5. + if os.Getenv("INFLUXDB_DATA_COMPACTION_THROUGHPUT") == "" { + s.EngineOptions.CompactionThroughputLimiter = limiter.NewRate(48*1024*1024, 48*1024*1024) + } else { + s.Logger.Info("Compaction throughput limit disabled") + } + t := limiter.NewFixed(runtime.GOMAXPROCS(0)) resC := make(chan *res) var n int @@ -325,18 +332,34 @@ func (s *Store) Close() error { } s.mu.Lock() + for _, sfile := range s.sfiles { + // Close out the series files. + if err := sfile.Close(); err != nil { + return err + } + } + s.shards = nil + s.sfiles = map[string]*SeriesFile{} s.opened = false // Store may now be opened again. s.mu.Unlock() return nil } +// openSeriesFile either returns or creates a series file for the provided +// database. It must be called under a full lock. func (s *Store) openSeriesFile(database string) (*SeriesFile, error) { if sfile := s.sfiles[database]; sfile != nil { return sfile, nil } sfile := NewSeriesFile(filepath.Join(s.path, database, SeriesFileName)) + // Set a custom mmap size if one has been specified, otherwise the default + // will be used. + if s.SeriesFileMaxSize > 0 { + sfile.MaxSize = s.SeriesFileMaxSize + } + if err := sfile.Open(); err != nil { return nil, err } @@ -344,6 +367,16 @@ func (s *Store) openSeriesFile(database string) (*SeriesFile, error) { return sfile, nil } +func (s *Store) seriesFile(database string) (*SeriesFile, error) { + s.mu.RLock() + defer s.mu.RUnlock() + sfile, ok := s.sfiles[database] + if !ok { + return nil, fmt.Errorf("no series file present for database %q", database) + } + return sfile, nil +} + // createIndexIfNotExists returns a shared index for a database, if the inmem // index is being used. If the TSI index is being used, then this method is // basically a no-op. @@ -404,6 +437,16 @@ func (s *Store) ShardN() int { return len(s.shards) } +// ShardDigest returns a digest of the shard with the specified ID. +func (s *Store) ShardDigest(id uint64) (io.ReadCloser, error) { + sh := s.Shard(id) + if sh == nil { + return nil, ErrShardNotFound + } + + return sh.Digest() +} + // CreateShard creates a shard with the given id and retention policy on a database. 
func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error { s.mu.Lock() @@ -799,7 +842,12 @@ func (s *Store) SeriesCardinality(database string) (int64, error) { // TODO(benbjohnson): Series file will be shared by the DB. var max int64 for _, shard := range shards { - if n := shard.Index().SeriesN(); n > max { + index, err := shard.Index() + if err != nil { + return 0, err + } + + if n := index.SeriesN(); n > max { max = n } } @@ -893,7 +941,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) { // DeleteSeries loops through the local shards and deletes the series data for // the passed in series keys. -func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error { +func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, removeIndex bool) error { // Expand regex expressions in the FROM clause. a, err := s.ExpandSources(sources) if err != nil { @@ -925,6 +973,9 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi defer s.mu.RUnlock() sfile := s.sfiles[database] + if sfile == nil { + return fmt.Errorf("unable to locate series file for database: %q", database) + } shards := s.filterShards(byDatabase(database)) // Limit to 1 delete for each shard since expanding the measurement into the list @@ -952,7 +1003,12 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi limit.Take() defer limit.Release() - indexSet := IndexSet{Indexes: []Index{sh.index}, SeriesFile: sfile} + index, err := sh.Index() + if err != nil { + return err + } + + indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sfile} // Find matching series keys for each measurement. for _, name := range names { itr, err := indexSet.MeasurementSeriesByExprIterator([]byte(name), condition) @@ -962,8 +1018,7 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi continue } defer itr.Close() - - if err := sh.DeleteSeriesRange(NewSeriesIteratorAdapter(sfile, itr), min, max); err != nil { + if err := sh.DeleteSeriesRange(NewSeriesIteratorAdapter(sfile, itr), min, max, removeIndex); err != nil { return err } @@ -1018,12 +1073,19 @@ func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond in shards := s.filterShards(byDatabase(database)) s.mu.RUnlock() + sfile, err := s.seriesFile(database) + if err != nil { + return nil, err + } + // Build indexset. 
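// (The DedupeInmemIndexes call below collapses duplicate entries: with the
// inmem index, every shard of a database shares a single index instance.)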
- is := IndexSet{Indexes: make([]Index, 0, len(shards)), SeriesFile: s.sfiles[database]} + is := IndexSet{Indexes: make([]Index, 0, len(shards)), SeriesFile: sfile} for _, sh := range shards { - if sh.index != nil { - is.Indexes = append(is.Indexes, sh.index) + index, err := sh.Index() + if err != nil { + return nil, err } + is.Indexes = append(is.Indexes, index) } is = is.DedupeInmemIndexes() return is.MeasurementNamesByExpr(auth, cond) @@ -1539,16 +1601,31 @@ func (s *Store) monitorShards() { databases[db] = struct{}{} dbLock.Unlock() + sfile, err := s.seriesFile(sh.database) + if err != nil { + return err + } + + firstShardIndex, err := shards[0].Index() + if err != nil { + return err + } + + index, err := sh.Index() + if err != nil { + return err + } + // inmem shards share the same index instance so just use the first one to avoid // allocating the same measurements repeatedly - indexSet := IndexSet{Indexes: []Index{shards[0].index}, SeriesFile: s.sfiles[db]} + indexSet := IndexSet{Indexes: []Index{firstShardIndex}, SeriesFile: sfile} names, err := indexSet.MeasurementNamesByExpr(nil, nil) if err != nil { s.Logger.Warn("cannot retrieve measurement names", zap.Error(err)) return nil } - indexSet.Indexes = []Index{sh.Index()} + indexSet.Indexes = []Index{index} for _, name := range names { indexSet.ForEachMeasurementTagKey(name, func(k []byte) error { n := sh.TagKeyCardinality(name, k) diff --git a/tsdb/store_test.go b/tsdb/store_test.go index c004583923..58202b969f 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "reflect" + "runtime" "sort" "strings" "testing" @@ -561,7 +562,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) { } for _, name := range mnames { - if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil { + if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil, true); err != nil { t.Fatal(err) } } @@ -769,7 +770,7 @@ func TestStore_Cardinality_Duplicates(t *testing.T) { // Creates a large number of series in multiple shards, which will force // compactions to occur. -func testStoreCardinalityCompactions(t *testing.T, store *Store) { +func testStoreCardinalityCompactions(store *Store) error { // Generate point data to write to the shards. series := genTestSeries(300, 5, 5) // 937,500 series @@ -784,44 +785,43 @@ func testStoreCardinalityCompactions(t *testing.T, store *Store) { // shards such that we never write the same series to multiple shards. for shardID := 0; shardID < 2; shardID++ { if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { - t.Fatalf("create shard: %s", err) + return fmt.Errorf("create shard: %s", err) } if err := store.BatchWrite(shardID, points[shardID*468750:(shardID+1)*468750]); err != nil { - t.Fatalf("batch write: %s", err) + return fmt.Errorf("batch write: %s", err) } } // Estimate the series cardinality... cardinality, err := store.Store.SeriesCardinality("db") if err != nil { - t.Fatal(err) + return err } // Estimated cardinality should be well within 1.5% of the actual cardinality.
diff --git a/tsdb/store_test.go b/tsdb/store_test.go
index c004583923..58202b969f 100644
--- a/tsdb/store_test.go
+++ b/tsdb/store_test.go
@@ -11,6 +11,7 @@ import (
 	"path/filepath"
 	"reflect"
 	"regexp"
+	"runtime"
 	"sort"
 	"strings"
 	"testing"
@@ -561,7 +562,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
 	}
 
 	for _, name := range mnames {
-		if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil {
+		if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil, true); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -769,7 +770,7 @@ func TestStore_Cardinality_Duplicates(t *testing.T) {
 
 // Creates a large number of series in multiple shards, which will force
 // compactions to occur.
-func testStoreCardinalityCompactions(t *testing.T, store *Store) {
+func testStoreCardinalityCompactions(store *Store) error {
 
 	// Generate point data to write to the shards.
 	series := genTestSeries(300, 5, 5) // 937,500 series
@@ -784,44 +785,43 @@ func testStoreCardinalityCompactions(t *testing.T, store *Store) {
 	// shards such that we never write the same series to multiple shards.
 	for shardID := 0; shardID < 2; shardID++ {
 		if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
-			t.Fatalf("create shard: %s", err)
+			return fmt.Errorf("create shard: %s", err)
 		}
 
 		if err := store.BatchWrite(shardID, points[shardID*468750:(shardID+1)*468750]); err != nil {
-			t.Fatalf("batch write: %s", err)
+			return fmt.Errorf("batch write: %s", err)
 		}
 	}
 
 	// Estimate the series cardinality...
 	cardinality, err := store.Store.SeriesCardinality("db")
 	if err != nil {
-		t.Fatal(err)
+		return err
 	}
 
 	// Estimated cardinality should be well within 1.5% of the actual cardinality.
 	if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp {
-		t.Errorf("got epsilon of %v for series cardinality %v (expected %v), which is larger than expected %v", got, cardinality, expCardinality, exp)
+		return fmt.Errorf("got epsilon of %v for series cardinality %v (expected %v), which is larger than expected %v", got, cardinality, expCardinality, exp)
	}
 
 	// Estimate the measurement cardinality...
 	if cardinality, err = store.Store.MeasurementsCardinality("db"); err != nil {
-		t.Fatal(err)
+		return err
 	}
 
 	// Estimated cardinality should be well within 2 of the actual cardinality. (Arbitrary...)
 	expCardinality = 300
 	if got, exp := math.Abs(float64(cardinality)-float64(expCardinality)), 2.0; got > exp {
-		t.Errorf("got measurement cardinality %v, expected upto %v; difference is larger than expected %v", cardinality, expCardinality, exp)
+		return fmt.Errorf("got measurement cardinality %v, expected up to %v; difference is larger than expected %v", cardinality, expCardinality, exp)
 	}
+	return nil
 }
 
 func TestStore_Cardinality_Compactions(t *testing.T) {
-	t.Parallel()
-
 	if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" {
 		t.Skip("Skipping test in short, race and appveyor mode.")
 	}
 
-	test := func(index string) {
+	test := func(index string) error {
 		store := NewStore()
 		store.EngineOptions.Config.Index = "inmem"
 		store.EngineOptions.Config.MaxSeriesPerDatabase = 0
@@ -829,11 +829,15 @@ func TestStore_Cardinality_Compactions(t *testing.T) {
 			panic(err)
 		}
 		defer store.Close()
-		testStoreCardinalityCompactions(t, store)
+		return testStoreCardinalityCompactions(store)
 	}
 
 	for _, index := range tsdb.RegisteredIndexes() {
-		t.Run(index, func(t *testing.T) { test(index) })
+		t.Run(index, func(t *testing.T) {
+			if err := test(index); err != nil {
+				t.Fatal(err)
+			}
+		})
 	}
 }
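The test refactoring above follows a standard Go pattern: the helper returns an error instead of capturing *testing.T, so a single body can run under t.Run for every registered index and each failure is attributed to the right subtest. A self-contained sketch of the pattern; TestExample_PerIndex and its body are hypothetical:

```go
package tsdb_test

import (
	"testing"

	"github.com/influxdata/influxdb/tsdb"
)

// TestExample_PerIndex runs the same error-returning helper once per
// registered index implementation (e.g. inmem and tsi1).
func TestExample_PerIndex(t *testing.T) {
	test := func(index string) error {
		// ... build a store with this index and exercise it ...
		return nil
	}

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			// The subtest, not the helper, decides how to fail.
			if err := test(index); err != nil {
				t.Fatal(err)
			}
		})
	}
}
```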
@@ -1010,6 +1014,36 @@ func TestStore_Measurements_Auth(t *testing.T) {
 			if gotNames != expNames {
 				return fmt.Errorf("got %d measurements, but expected %d", gotNames, expNames)
 			}
+
+			// Now delete all of the cpu series.
+			cond, err := influxql.ParseExpr("host = 'serverA' OR region = 'west'")
+			if err != nil {
+				return err
+			}
+
+			if err := s.DeleteSeries("db0", nil, cond, true); err != nil {
+				return err
+			}
+
+			if names, err = s.MeasurementNames(authorizer, "db0", nil); err != nil {
+				return err
+			}
+
+			// names should not contain any measurements where none of the associated
+			// series are authorised for reads.
+			expNames = 1
+			gotNames = 0
+			for _, name := range names {
+				if string(name) == "mem" || string(name) == "cpu" {
+					return fmt.Errorf("after delete got measurement %q but it should be filtered", name)
+				}
+				gotNames++
+			}
+
+			if gotNames != expNames {
+				return fmt.Errorf("after delete got %d measurements, but expected %d", gotNames, expNames)
+			}
+			return nil
 		}
 
@@ -1020,6 +1054,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
 			}
 		})
 	}
+
 }
 
 func TestStore_TagKeys_Auth(t *testing.T) {
@@ -1072,6 +1107,41 @@ func TestStore_TagKeys_Auth(t *testing.T) {
 			if gotKeys != expKeys {
 				return fmt.Errorf("got %d keys, but expected %d", gotKeys, expKeys)
 			}
+
+			// Delete the series with region = west
+			cond, err := influxql.ParseExpr("region = 'west'")
+			if err != nil {
+				return err
+			}
+			if err := s.DeleteSeries("db0", nil, cond, true); err != nil {
+				return err
+			}
+
+			if keys, err = s.TagKeys(authorizer, []uint64{0}, nil); err != nil {
+				return err
+			}
+
+			// keys should not contain any tag keys associated with a series containing
+			// a secret tag or the deleted series
+			expKeys = 2
+			gotKeys = 0
+			for _, tk := range keys {
+				if got, exp := tk.Measurement, "cpu"; got != exp {
+					return fmt.Errorf("got measurement %q, expected %q", got, exp)
+				}
+
+				for _, key := range tk.Keys {
+					if key == "secret" || key == "machine" || key == "region" {
+						return fmt.Errorf("got tag key %q but it should be filtered", key)
+					}
+					gotKeys++
+				}
+			}
+
+			if gotKeys != expKeys {
+				return fmt.Errorf("got %d keys, but expected %d", gotKeys, expKeys)
+			}
+			return nil
 		}
 
@@ -1082,6 +1152,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
 			}
 		})
 	}
+
 }
 
 func TestStore_TagValues_Auth(t *testing.T) {
@@ -1136,6 +1207,48 @@ func TestStore_TagValues_Auth(t *testing.T) {
 				}
 			}
 
+			if gotValues != expValues {
+				return fmt.Errorf("got %d tags, but expected %d", gotValues, expValues)
+			}
+
+			// Delete the series with value serverA
+			cond, err := influxql.ParseExpr("host = 'serverA'")
+			if err != nil {
+				return err
+			}
+			if err := s.DeleteSeries("db0", nil, cond, true); err != nil {
+				return err
+			}
+
+			values, err = s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{
+				Op:  influxql.EQ,
+				LHS: &influxql.VarRef{Val: "_tagKey"},
+				RHS: &influxql.StringLiteral{Val: "host"},
+			})
+
+			if err != nil {
+				return err
+			}
+
+			// values should not contain any tag values associated with a series
+			// containing a secret tag, nor any values from the deleted series.
+			expValues = 1
+			gotValues = 0
+			for _, tv := range values {
+				if got, exp := tv.Measurement, "cpu"; got != exp {
+					return fmt.Errorf("got measurement %q, expected %q", got, exp)
+				}
+
+				for _, v := range tv.Values {
+					if got, exp := v.Value, "serverD"; got == exp {
+						return fmt.Errorf("got tag value %q but it should be filtered", got)
+					} else if got, exp := v.Value, "serverA"; got == exp {
+						return fmt.Errorf("got tag value %q but it should be filtered", got)
+					}
+					gotValues++
+				}
+			}
+
 			if gotValues != expValues {
 				return fmt.Errorf("got %d tags, but expected %d", gotValues, expValues)
 			}
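The TagValues call above filters on the internal _tagKey reference to restrict results to a single tag key. A small sketch of that condition builder, assuming only the influxql AST types used above; tagKeyCond is a hypothetical helper:

```go
package main

import "github.com/influxdata/influxql"

// tagKeyCond builds the condition `_tagKey = '<key>'`, the form the
// test above passes to Store.TagValues to select one tag key's values.
func tagKeyCond(key string) influxql.Expr {
	return &influxql.BinaryExpr{
		Op:  influxql.EQ,
		LHS: &influxql.VarRef{Val: "_tagKey"},
		RHS: &influxql.StringLiteral{Val: key},
	}
}
```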
@@ -1400,6 +1513,12 @@ func NewStore() *Store {
 	if testing.Verbose() {
 		s.WithLogger(logger.New(os.Stdout))
 	}
+
+	if runtime.GOARCH == "386" {
+		// Set the mmap size to something addressable in the process.
+		s.SeriesFileMaxSize = 1 << 27 // 128MB
+	}
+
 	return s
 }
 
@@ -1408,6 +1527,7 @@ func MustOpenStore(index string) *Store {
 	s := NewStore()
 	s.EngineOptions.IndexVersion = index
+
 	if err := s.Open(); err != nil {
 		panic(err)
 	}
@@ -1419,9 +1539,14 @@ func (s *Store) Reopen() error {
 	if err := s.Store.Close(); err != nil {
 		return err
 	}
+
+	// Keep old max series file size.
+	seriesMapSize := s.Store.SeriesFileMaxSize
+
 	s.Store = tsdb.NewStore(s.Path())
 	s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
-	return s.Open()
+	s.SeriesFileMaxSize = seriesMapSize
+	return s.Store.Open()
 }
 
 // Close closes the store and removes the underlying data.
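NewStore and Reopen both treat SeriesFileMaxSize as a plain field that must be set before Open, which is why Reopen snapshots it across the tsdb.NewStore call. A sketch of a constructor applying the same 32-bit cap, assuming only the store wiring shown in the hunks above; newTestStore is hypothetical:

```go
package main

import (
	"path/filepath"
	"runtime"

	"github.com/influxdata/influxdb/tsdb"
)

// newTestStore mirrors the test helper: on 32-bit builds the series-file
// mmap is capped so it stays inside the process's addressable space.
func newTestStore(path string) (*tsdb.Store, error) {
	s := tsdb.NewStore(path)
	s.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
	if runtime.GOARCH == "386" {
		s.SeriesFileMaxSize = 1 << 27 // 128MB, the same cap the tests use
	}
	if err := s.Open(); err != nil {
		return nil, err
	}
	return s, nil
}
```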