Improve bulk series file writes.
parent e88477097f
commit 307bb6af9c
@@ -55,6 +55,8 @@ func (m *HashMap) Reset() {
 	m.tracker.SetSize(0)
 }
 
+func (m *HashMap) LoadFactor() int { return m.loadFactor }
+
 // Get returns the value for a key from the Hashmap, or nil if no key exists.
 func (m *HashMap) Get(key []byte) interface{} {
 	var now time.Time
@@ -90,7 +92,7 @@ func (m *HashMap) put(key []byte, val interface{}, instrument bool) {
 	// Grow the map if we've run out of slots.
 	m.n++
 	if m.n > m.threshold {
-		m.grow()
+		m.Grow(m.capacity * 2)
 	}
 
 	// If the key was overwritten then decrement the size.
@@ -181,14 +183,20 @@ func (m *HashMap) alloc() {
 	m.mask = int64(m.capacity - 1)
 }
 
-// grow doubles the capacity and reinserts all existing hashes & elements.
-func (m *HashMap) grow() {
+// Grow increases the capacity and reinserts all existing hashes & elements.
+func (m *HashMap) Grow(sz int64) {
+	// Ensure new capacity is a power of two and greater than current capacity.
+	sz = pow2(sz)
+	if sz <= m.capacity {
+		return
+	}
+
 	// Copy old elements and hashes.
 	elems, hashes := m.elems, m.hashes
 	capacity := m.capacity
 
-	// Double capacity & reallocate.
-	m.capacity *= 2
+	// Increase capacity & reallocate.
+	m.capacity = sz
 	m.alloc()
 
 	// Copy old elements to new hash/elem list.
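Grow relies on a pow2 helper that does not appear in this diff. A minimal sketch of such a helper, assuming the conventional round-up-to-power-of-two behaviour (not necessarily the package's actual implementation):

```go
// pow2 returns the smallest power of two >= v.
// Sketch only; the real helper in the rhh package may differ.
func pow2(v int64) int64 {
	for i := int64(2); i < 1<<62; i *= 2 {
		if i >= v {
			return i
		}
	}
	panic("unreachable")
}
```

With rounding like this, the internal call `Grow(m.capacity * 2)` in put() preserves the old doubling behaviour, while external callers can now request an arbitrary target size in one step.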
@@ -5,6 +5,7 @@ import (
 	"time"
 
 	"github.com/influxdata/influxdb/toml"
+	"github.com/influxdata/influxdb/tsdb"
 	"github.com/influxdata/influxdb/tsdb/tsi1"
 	"github.com/influxdata/influxdb/tsdb/tsm1"
 )
@@ -26,6 +27,9 @@ type Config struct {
 	// Series file config.
 	SeriesFilePath string `toml:"series-file-path"` // Overrides the default path.
 
+	// TSDB config.
+	TSDB tsdb.Config `toml:"tsdb"`
+
 	// WAL config.
 	WAL     tsm1.WALConfig `toml:"wal"`
 	WALPath string         `toml:"wal-path"` // Overrides the default path.
@@ -43,6 +47,7 @@ type Config struct {
 func NewConfig() Config {
 	return Config{
 		RetentionInterval: toml.Duration(DefaultRetentionInterval),
+		TSDB:              tsdb.NewConfig(),
 		WAL:               tsm1.NewWALConfig(),
 		Engine:            tsm1.NewConfig(),
 		Index:             tsi1.NewConfig(),
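The storage Config now nests a tsdb.Config under the `tsdb` TOML key, so the new threshold is tunable alongside the existing engine settings. A hedged sketch of overriding it in code (the `storage` import path is an assumption, not confirmed by this diff):

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/storage" // assumed path for the Config shown above
)

func main() {
	c := storage.NewConfig()
	// Batches creating at least this many new series pre-grow the series index.
	c.TSDB.LargeSeriesWriteThreshold = 50000
	fmt.Println(c.TSDB.LargeSeriesWriteThreshold)
}
```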
@@ -116,6 +116,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
 
 	// Initialize series file.
 	e.sfile = tsdb.NewSeriesFile(c.GetSeriesFilePath(path))
+	e.sfile.LargeWriteThreshold = c.TSDB.LargeSeriesWriteThreshold
 
 	// Initialise index.
 	e.index = tsi1.NewIndex(e.sfile, c.Index,
@@ -6,3 +6,23 @@ import (
 
 // EOF represents a "not found" key returned by a Cursor.
 const EOF = query.ZeroTime
+
+const (
+	// DefaultLargeSeriesWriteThreshold is the number of series per write
+	// that requires the series index be pregrown before insert.
+	DefaultLargeSeriesWriteThreshold = 10000
+)
+
+// Config contains all of the configuration related to tsdb.
+type Config struct {
+	// LargeSeriesWriteThreshold is the threshold before a write requires
+	// preallocation to improve throughput. Currently used in the series file.
+	LargeSeriesWriteThreshold int `toml:"large-series-write-threshold"`
+}
+
+// NewConfig return a new instance of config with default settings.
+func NewConfig() Config {
+	return Config{
+		LargeSeriesWriteThreshold: DefaultLargeSeriesWriteThreshold,
+	}
+}
@@ -48,6 +48,8 @@ type SeriesFile struct {
 	defaultMetricLabels prometheus.Labels
 	metricsEnabled      bool
 
+	LargeWriteThreshold int
+
 	Logger *zap.Logger
 }
 
@@ -57,6 +59,8 @@ func NewSeriesFile(path string) *SeriesFile {
 		path:           path,
 		metricsEnabled: true,
 		Logger:         zap.NewNop(),
+
+		LargeWriteThreshold: DefaultLargeSeriesWriteThreshold,
 	}
 }
 
@@ -121,6 +125,7 @@ func (f *SeriesFile) Open(ctx context.Context) error {
 	for i := 0; i < SeriesFilePartitionN; i++ {
 		// TODO(edd): These partition initialisation should be moved up to NewSeriesFile.
 		p := NewSeriesPartition(i, f.SeriesPartitionPath(i))
+		p.LargeWriteThreshold = f.LargeWriteThreshold
 		p.Logger = f.Logger.With(zap.Int("partition", p.ID()))
 
 		// For each series file index, rhh trackers are used to track the RHH Hashmap.
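Open copies LargeWriteThreshold into each partition, so an override only takes effect if it happens before Open is called. A usage sketch (path and error handling are illustrative):

```go
package main

import (
	"context"
	"log"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	sfile := tsdb.NewSeriesFile("/tmp/series-demo") // illustrative path
	// Set before Open: Open propagates the value to every partition.
	sfile.LargeWriteThreshold = 50000
	if err := sfile.Open(context.Background()); err != nil {
		log.Fatal(err)
	}
	defer sfile.Close()
}
```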
@@ -155,6 +155,14 @@ func (idx *SeriesIndex) Recover(segments []*SeriesSegment) error {
 	return nil
 }
 
+// GrowBy preallocates the in-memory hashmap to a larger size.
+func (idx *SeriesIndex) GrowBy(delta int) {
+	if delta < 0 {
+		return
+	}
+	idx.keyIDMap.Grow(((idx.keyIDMap.Len() + int64(delta)) * 100) / int64(idx.keyIDMap.LoadFactor()))
+}
+
 // Count returns the number of series in the index.
 func (idx *SeriesIndex) Count() uint64 {
 	return idx.OnDiskCount() + idx.InMemCount()
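The sizing arithmetic in GrowBy inverts the load factor: to hold Len()+delta keys at or below loadFactor percent occupancy, the map needs at least (Len()+delta) * 100 / loadFactor slots, which Grow then rounds up to a power of two. For example, with 150,000 existing keys, a 50,000-key batch, and a load factor of 90 (the actual value depends on the map's configuration), the target is 200,000 * 100 / 90 ≈ 222,222 slots, which pow2 rounds up to 262,144.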
@@ -44,7 +44,8 @@ type SeriesPartition struct {
 	compacting          bool
 	compactionsDisabled int
 
-	CompactThreshold int
+	CompactThreshold    int
+	LargeWriteThreshold int
 
 	tracker *seriesPartitionTracker
 	Logger  *zap.Logger
@@ -53,13 +54,14 @@ type SeriesPartition struct {
 // NewSeriesPartition returns a new instance of SeriesPartition.
 func NewSeriesPartition(id int, path string) *SeriesPartition {
 	p := &SeriesPartition{
-		id:               id,
-		path:             path,
-		closing:          make(chan struct{}),
-		CompactThreshold: DefaultSeriesPartitionCompactThreshold,
-		tracker:          newSeriesPartitionTracker(newSeriesFileMetrics(nil), nil),
-		Logger:           zap.NewNop(),
-		seq:              uint64(id) + 1,
+		id:                  id,
+		path:                path,
+		closing:             make(chan struct{}),
+		CompactThreshold:    DefaultSeriesPartitionCompactThreshold,
+		LargeWriteThreshold: DefaultLargeSeriesWriteThreshold,
+		tracker:             newSeriesPartitionTracker(newSeriesFileMetrics(nil), nil),
+		Logger:              zap.NewNop(),
+		seq:                 uint64(id) + 1,
 	}
 	p.index = NewSeriesIndex(p.IndexPath())
 	return p
@@ -182,9 +184,7 @@ func (p *SeriesPartition) IndexPath() string { return filepath.Join(p.path, "ind
 // CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
 // The ids parameter is modified to contain series IDs for all keys belonging to this partition.
 // If the type does not match the existing type for the key, a zero id is stored.
-func (p *SeriesPartition) CreateSeriesListIfNotExists(collection *SeriesCollection,
-	keyPartitionIDs []int) error {
-
+func (p *SeriesPartition) CreateSeriesListIfNotExists(collection *SeriesCollection, keyPartitionIDs []int) error {
 	p.mu.RLock()
 	if p.closed {
 		p.mu.RUnlock()
@@ -226,6 +226,13 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(collection *SeriesCollecti
 	newKeyRanges := make([]keyRange, 0, writeRequired)
 	newIDs := make(map[string]SeriesIDTyped, writeRequired)
 
+	// Pre-grow index for large writes.
+	if writeRequired >= p.LargeWriteThreshold {
+		p.mu.Lock()
+		p.index.GrowBy(writeRequired)
+		p.mu.Unlock()
+	}
+
 	// Obtain write lock to create new series.
 	p.mu.Lock()
 	defer p.mu.Unlock()
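Pre-growing matters because a bulk insert into the robin-hood map otherwise triggers a cascade of doublings, each reinserting every existing element; note that the index is grown under its own brief write lock before the main write lock is taken for the insert itself. A rough illustration of the pattern using the rhh API from this diff (the import path, constructor, and Options values are assumptions):

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/influxdb/pkg/rhh" // assumed import path for the HashMap above
)

func main() {
	const n = 1 << 20
	m := rhh.NewHashMap(rhh.DefaultOptions) // assumed constructor and default options
	m.Grow(n * 2)                           // one reallocation up front instead of ~log2(n) rehash passes

	start := time.Now()
	for i := 0; i < n; i++ {
		m.Put([]byte(fmt.Sprintf("key%08d", i)), i)
	}
	fmt.Println("insert into pre-grown map:", time.Since(start))
}
```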
@@ -0,0 +1,75 @@
+package tsdb_test
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"strconv"
+	"testing"
+
+	"github.com/influxdata/influxdb/logger"
+	"github.com/influxdata/influxdb/models"
+	"github.com/influxdata/influxdb/tsdb"
+)
+
+func BenchmarkSeriesPartition_CreateSeriesListIfNotExists(b *testing.B) {
+	for _, n := range []int{1000, 10000, 100000, 1000000} {
+		b.Run(strconv.Itoa(n), func(b *testing.B) {
+			var collection tsdb.SeriesCollection
+			for i := 0; i < n; i++ {
+				collection.Names = append(collection.Names, []byte("cpu"))
+				collection.Tags = append(collection.Tags, models.Tags{
+					{Key: []byte("tag0"), Value: []byte("value0")},
+					{Key: []byte("tag1"), Value: []byte("value1")},
+					{Key: []byte("tag2"), Value: []byte("value2")},
+					{Key: []byte("tag3"), Value: []byte("value3")},
+					{Key: []byte("tag4"), Value: []byte(fmt.Sprintf("value%d", i))},
+				})
+				collection.Types = append(collection.Types, models.Integer)
+			}
+			collection.SeriesKeys = tsdb.GenerateSeriesKeys(collection.Names, collection.Tags)
+			collection.SeriesIDs = make([]tsdb.SeriesID, len(collection.SeriesKeys))
+			keyPartitionIDs := make([]int, n)
+
+			b.ResetTimer()
+			for j := 0; j < b.N; j++ {
+				p := MustOpenSeriesPartition()
+				if err := p.CreateSeriesListIfNotExists(&collection, keyPartitionIDs); err != nil {
+					b.Fatal(err)
+				} else if err := p.Close(); err != nil {
+					b.Fatal(err)
+				}
+			}
+		})
+	}
+}
+
+// SeriesPartition is a test wrapper for tsdb.SeriesPartition.
+type SeriesPartition struct {
+	*tsdb.SeriesPartition
+}
+
+// NewSeriesPartition returns a new instance of SeriesPartition with a temporary file path.
+func NewSeriesPartition() *SeriesPartition {
+	dir, err := ioutil.TempDir("", "tsdb-series-partition-")
+	if err != nil {
+		panic(err)
+	}
+	return &SeriesPartition{SeriesPartition: tsdb.NewSeriesPartition(0, dir)}
+}
+
+// MustOpenSeriesPartition returns a new, open instance of SeriesPartition. Panic on error.
+func MustOpenSeriesPartition() *SeriesPartition {
+	f := NewSeriesPartition()
+	f.Logger = logger.New(os.Stdout)
+	if err := f.Open(); err != nil {
+		panic(err)
+	}
+	return f
+}
+
+// Close closes the partition and removes it from disk.
+func (f *SeriesPartition) Close() error {
+	defer os.RemoveAll(f.Path())
+	return f.SeriesPartition.Close()
+}
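To exercise just this benchmark with standard Go tooling (package directory assumed): `go test -run '^$' -bench BenchmarkSeriesPartition_CreateSeriesListIfNotExists ./tsdb`. Each sub-benchmark opens a fresh partition per iteration, so timings include partition open/close overhead as well as the bulk insert itself.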
@@ -11,6 +11,10 @@ var DefaultMaxConcurrentOpens = runtime.GOMAXPROCS(0)
 
 const (
 	DefaultMADVWillNeed = false
+
+	// DefaultLargeSeriesWriteThreshold is the number of series per write
+	// that requires the series index be pregrown before insert.
+	DefaultLargeSeriesWriteThreshold = 10000
 )
 
 // Config contains all of the configuration necessary to run a tsm1 engine.
@@ -25,6 +29,10 @@ type Config struct {
 	// slow disks.
 	MADVWillNeed bool `toml:"use-madv-willneed"`
 
+	// LargeSeriesWriteThreshold is the threshold before a write requires
+	// preallocation to improve throughput. Currently used in the series file.
+	LargeSeriesWriteThreshold int `toml:"large-series-write-threshold"`
+
 	Compaction CompactionConfig `toml:"compaction"`
 	Cache      CacheConfig      `toml:"cache"`
 }
@@ -32,8 +40,9 @@ type Config struct {
 // NewConfig constructs a Config with the default values.
 func NewConfig() Config {
 	return Config{
-		MaxConcurrentOpens: DefaultMaxConcurrentOpens,
-		MADVWillNeed:       DefaultMADVWillNeed,
+		MaxConcurrentOpens:        DefaultMaxConcurrentOpens,
+		MADVWillNeed:              DefaultMADVWillNeed,
+		LargeSeriesWriteThreshold: DefaultLargeSeriesWriteThreshold,
 
 		Cache: NewCacheConfig(),
 		Compaction: CompactionConfig{