diff --git a/Makefile.in b/Makefile.in index 4f4347827e..263bb35495 100644 --- a/Makefile.in +++ b/Makefile.in @@ -76,6 +76,12 @@ leveldb_dir = /tmp/leveldb.influxdb.$(arch) leveldb_file = leveldb-$(leveldb_version).tar.gz leveldb_deps = $(leveldb_dir)/libleveldb.a +# rocksdb variables +rocksdb_version = 3.1 +rocksdb_dir = /tmp/rocksdb.influxdb.$(arch) +rocksdb_file = rocksdb-$(rocksdb_version).tar.gz +rocksdb_deps = $(rocksdb_dir)/librocksdb.a + profile=off ifneq ($(profile),off) CGO_LDFLAGS += -ltcmalloc -lprofiler @@ -85,11 +91,13 @@ endif # levigo flags ifeq ($(uname_S),Linux) CGO_CFLAGS += -I$(leveldb_dir)/include -CGO_LDFLAGS += $(leveldb_dir)/libleveldb.a $(snappy_dir)/.libs/libsnappy.a -lstdc++ +CGO_LDFLAGS += $(leveldb_dir)/libleveldb.a $(snappy_dir)/.libs/libsnappy.a +CGO_CFLAGS += -I$(rocksdb_dir)/include +CGO_LDFLAGS += $(rocksdb_dir)/librocksdb.a -lstdc++ -lm -lz -lbz2 export CGO_CFLAGS export CGO_LDFLAGS else -CGO_LDFLAGS += -lleveldb -lsnappy -lstdc++ +CGO_LDFLAGS += -lleveldb -lrocksdb -lsnappy -lstdc++ export CGO_LDFLAGS endif @@ -116,10 +124,21 @@ ifeq ($(uname_S),Linux) bash -c "cd $(leveldb_dir); \ wget https://leveldb.googlecode.com/files/$(leveldb_file); \ tar --strip-components=1 -xvzf $(leveldb_file); \ - CXXFLAGS='-I$(snappy_dir) $(cflags)' LDFLAGS='-L$(snappy_dir)/.libs' make" + CXXFLAGS='-I$(snappy_dir) $(cflags)' LDFLAGS='-L$(snappy_dir)/.libs' $(MAKE)" +endif + +$(rocksdb_deps): +ifeq ($(uname_S),Linux) + rm -rf $(rocksdb_dir) + mkdir -p $(rocksdb_dir) + bash -c "cd $(rocksdb_dir); \ + wget https://github.com/facebook/rocksdb/archive/$(rocksdb_file); \ + tar --strip-components=1 -xvzf $(rocksdb_file); \ + CXXFLAGS='$(cflags)' $(MAKE) librocksdb.a" endif levigo_dependency = github.com/jmhodges/levigo +rocksdb_dependency = github.com/influxdb/rocksdb proto_dependency = code.google.com/p/goprotobuf/protoc-gen-go dependencies = code.google.com/p/go.crypto/bcrypt \ @@ -143,11 +162,14 @@ src/$(levigo_dependency): $(GO) get -d $(levigo_dependency) bash -c "pushd $@; find . -name \*.go | xargs sed -i 's/\/\/ #cgo LDFLAGS: -lleveldb\|#cgo LDFLAGS: -lleveldb//g'; popd" +src/$(rocksdb_dependency): + $(GO) get -d $(rocksdb_dependency) + bash -c "pushd $@; find . -name \*.go | xargs sed -i 's/\/\/ #cgo LDFLAGS: -lrocksdb\|#cgo LDFLAGS: -lrocksdb//g'; popd" $(dependencies_paths): for i in $(dependencies); do $(GO) get -d $$i; done -dependencies: src/$(levigo_dependency) $(dependencies_paths) $(leveldb_deps) $(snappy_deps) +dependencies: src/$(levigo_dependency) src/$(rocksdb_dependency) $(dependencies_paths) $(leveldb_deps) $(rocksdb_deps) $(snappy_deps) test_dependencies: dependencies $(GO) get launchpad.net/gocheck diff --git a/config.sample.toml b/config.sample.toml index f4eb7cdca9..4d333e0dc3 100644 --- a/config.sample.toml +++ b/config.sample.toml @@ -105,6 +105,17 @@ max-open-files = 40 # and gigabytes, respectively. lru-cache-size = "200m" +[storage.engines.rocksdb] + +# Maximum mmap open files, this will affect the virtual memory used by +# the process +max-open-files = 40 + +# LRU cache size, LRU is used by rocksdb to store contents of the +# uncompressed sstables. You can use `m` or `g` prefix for megabytes +# and gigabytes, respectively. +lru-cache-size = "200m" + [storage.engines.lmdb] map-size = "100g" diff --git a/src/datastore/storage/rocksdb.go b/src/datastore/storage/rocksdb.go new file mode 100644 index 0000000000..75a7367af7 --- /dev/null +++ b/src/datastore/storage/rocksdb.go @@ -0,0 +1,190 @@ +package storage + +import ( + "bytes" + "fmt" + "runtime" + "sync" + + "configuration" + + rocksdb "github.com/influxdb/rocksdb" +) + +const ROCKSDB_NAME = "rocksdb" + +func init() { + registerEngine(ROCKSDB_NAME, Initializer{ + NewRocksDBConfig, + NewRocksDB, + }) +} + +type RocksDBConfiguration struct { + MaxOpenFiles int `toml:"max-open-files"` + LruCacheSize configuration.Size `toml:"lru-cache-size"` +} + +type RocksDB struct { + db *rocksdb.DB + opts *rocksdb.Options + wopts *rocksdb.WriteOptions + ropts *rocksdb.ReadOptions + cache *rocksdb.Cache + path string +} + +func NewRocksDBConfig() interface{} { + return &RocksDBConfiguration{} +} + +var rocksDBCache *rocksdb.Cache +var rocksDBCacheLock sync.Mutex + +func NewRocksDB(path string, config interface{}) (Engine, error) { + c, ok := config.(*RocksDBConfiguration) + if !ok { + return nil, fmt.Errorf("Config is of type %T instead of %T", config, RocksDBConfiguration{}) + } + + // if it wasn't set, set it to 100 + if c.MaxOpenFiles == 0 { + c.MaxOpenFiles = 100 + } + + // if it wasn't set, set it to 200 MB + if c.LruCacheSize == 0 { + c.LruCacheSize = 200 * 1024 * 1024 + } + + // initialize the global cache + if rocksDBCache == nil { + rocksDBCacheLock.Lock() + if rocksDBCache == nil { + rocksDBCache = rocksdb.NewLRUCache(int(c.LruCacheSize)) + } + rocksDBCacheLock.Unlock() + } + + opts := rocksdb.NewOptions() + env := rocksdb.NewDefaultEnv() + env.SetBackgroundThreads(runtime.NumCPU() * 2) + env.SetHighPriorityBackgroundThreads(1) + opts.SetMaxBackgroundCompactions(runtime.NumCPU()*2 - 1) + opts.SetMaxBackgroundFlushes(1) + opts.SetEnv(env) + opts.SetCache(rocksDBCache) + opts.SetCreateIfMissing(true) + opts.SetMaxOpenFiles(c.MaxOpenFiles) + db, err := rocksdb.Open(path, opts) + wopts := rocksdb.NewWriteOptions() + ropts := rocksdb.NewReadOptions() + return RocksDB{db, opts, wopts, ropts, rocksDBCache, path}, err +} + +func (db RocksDB) Compact() { + db.db.CompactRange(rocksdb.Range{}) +} + +func (db RocksDB) Close() { + db.cache.Close() + db.ropts.Close() + db.wopts.Close() + db.opts.Close() + db.db.Close() +} + +func (db RocksDB) Put(key, value []byte) error { + return db.BatchPut([]Write{Write{key, value}}) +} + +func (db RocksDB) Get(key []byte) ([]byte, error) { + return db.db.Get(db.ropts, key) +} + +func (_ RocksDB) Name() string { + return ROCKSDB_NAME +} + +func (db RocksDB) Path() string { + return db.path +} + +func (db RocksDB) BatchPut(writes []Write) error { + wb := rocksdb.NewWriteBatch() + defer wb.Close() + for _, w := range writes { + if w.Value == nil { + wb.Delete(w.Key) + continue + } + wb.Put(w.Key, w.Value) + } + return db.db.Write(db.wopts, wb) +} + +func (db RocksDB) Del(start, finish []byte) error { + wb := rocksdb.NewWriteBatch() + defer wb.Close() + + itr := db.Iterator() + defer itr.Close() + + for itr.Seek(start); itr.Valid(); itr.Next() { + k := itr.Key() + if bytes.Compare(k, finish) > 0 { + break + } + wb.Delete(k) + } + + if err := itr.Error(); err != nil { + return err + } + + return db.db.Write(db.wopts, wb) +} + +type RocksDBIterator struct { + _itr *rocksdb.Iterator + err error +} + +func (itr *RocksDBIterator) Seek(key []byte) { + itr._itr.Seek(key) +} + +func (itr *RocksDBIterator) Next() { + itr._itr.Next() +} + +func (itr *RocksDBIterator) Prev() { + itr._itr.Prev() +} + +func (itr *RocksDBIterator) Valid() bool { + return itr._itr.Valid() +} + +func (itr *RocksDBIterator) Error() error { + return itr.err +} + +func (itr *RocksDBIterator) Key() []byte { + return itr._itr.Key() +} + +func (itr *RocksDBIterator) Value() []byte { + return itr._itr.Value() +} + +func (itr *RocksDBIterator) Close() error { + itr._itr.Close() + return nil +} + +func (db RocksDB) Iterator() Iterator { + itr := db.db.NewIterator(db.ropts) + + return &RocksDBIterator{itr, nil} +} diff --git a/src/integration/test_config_single.toml b/src/integration/test_config_single.toml index 91eb4d84f2..b3c0fb4663 100644 --- a/src/integration/test_config_single.toml +++ b/src/integration/test_config_single.toml @@ -80,7 +80,7 @@ dir = "/tmp/influxdb/development/db" write-buffer-size = 10000 # the engine to use for new shards, old shards will continue to use the same engine -default-engine = "lmdb" +default-engine = "rocksdb" # The default setting on this is 0, which means unlimited. Set this to something if you want to # limit the max number of open files. max-open-files is per shard so this * that will be max. @@ -105,6 +105,17 @@ max-open-files = 40 # and gigabytes, respectively. lru-cache-size = "200m" +[storage.rocksdb] + +# Maximum mmap open files, this will affect the virtual memory used by +# the process +max-open-files = 40 + +# LRU cache size, LRU is used by rocksdb to store contents of the +# uncompressed sstables. You can use `m` or `g` prefix for megabytes +# and gigabytes, respectively. +lru-cache-size = "200m" + [storage.lmdb] map-size = "10g" diff --git a/src/tools/benchmark-storage/main.go b/src/tools/benchmark-storage/main.go index 9c1ff13d71..96ded5bcc5 100644 --- a/src/tools/benchmark-storage/main.go +++ b/src/tools/benchmark-storage/main.go @@ -23,9 +23,11 @@ func main() { os.RemoveAll("/tmp/test-ldb") os.RemoveAll("/tmp/test-lmdb") + os.RemoveAll("/tmp/test-rocksdb") benchmark("lmdb", Config{*points, *batchSize, *series, 0, 0, time.Now()}) benchmark("leveldb", Config{*points, *batchSize, *series, 0, 0, time.Now()}) + benchmark("rocksdb", Config{*points, *batchSize, *series, 0, 0, time.Now()}) } func benchmark(name string, c Config) {