Add support for rocksdb

pull/641/head
John Shahid 2014-06-17 11:26:33 -04:00
parent 18b3b4f8a2
commit 7ce4b9477d
5 changed files with 241 additions and 5 deletions

View File

@ -76,6 +76,12 @@ leveldb_dir = /tmp/leveldb.influxdb.$(arch)
leveldb_file = leveldb-$(leveldb_version).tar.gz
leveldb_deps = $(leveldb_dir)/libleveldb.a
# rocksdb variables
rocksdb_version = 3.1
rocksdb_dir = /tmp/rocksdb.influxdb.$(arch)
rocksdb_file = rocksdb-$(rocksdb_version).tar.gz
rocksdb_deps = $(rocksdb_dir)/librocksdb.a
profile=off
ifneq ($(profile),off)
CGO_LDFLAGS += -ltcmalloc -lprofiler
@ -85,11 +91,13 @@ endif
# levigo flags
ifeq ($(uname_S),Linux)
CGO_CFLAGS += -I$(leveldb_dir)/include
CGO_LDFLAGS += $(leveldb_dir)/libleveldb.a $(snappy_dir)/.libs/libsnappy.a -lstdc++
CGO_LDFLAGS += $(leveldb_dir)/libleveldb.a $(snappy_dir)/.libs/libsnappy.a
CGO_CFLAGS += -I$(rocksdb_dir)/include
CGO_LDFLAGS += $(rocksdb_dir)/librocksdb.a -lstdc++ -lm -lz -lbz2
export CGO_CFLAGS
export CGO_LDFLAGS
else
CGO_LDFLAGS += -lleveldb -lsnappy -lstdc++
CGO_LDFLAGS += -lleveldb -lrocksdb -lsnappy -lstdc++
export CGO_LDFLAGS
endif
@ -116,10 +124,21 @@ ifeq ($(uname_S),Linux)
bash -c "cd $(leveldb_dir); \
wget https://leveldb.googlecode.com/files/$(leveldb_file); \
tar --strip-components=1 -xvzf $(leveldb_file); \
CXXFLAGS='-I$(snappy_dir) $(cflags)' LDFLAGS='-L$(snappy_dir)/.libs' make"
CXXFLAGS='-I$(snappy_dir) $(cflags)' LDFLAGS='-L$(snappy_dir)/.libs' $(MAKE)"
endif
$(rocksdb_deps):
ifeq ($(uname_S),Linux)
rm -rf $(rocksdb_dir)
mkdir -p $(rocksdb_dir)
bash -c "cd $(rocksdb_dir); \
wget https://github.com/facebook/rocksdb/archive/$(rocksdb_file); \
tar --strip-components=1 -xvzf $(rocksdb_file); \
CXXFLAGS='$(cflags)' $(MAKE) librocksdb.a"
endif
levigo_dependency = github.com/jmhodges/levigo
rocksdb_dependency = github.com/influxdb/rocksdb
proto_dependency = code.google.com/p/goprotobuf/protoc-gen-go
dependencies = code.google.com/p/go.crypto/bcrypt \
@ -143,11 +162,14 @@ src/$(levigo_dependency):
$(GO) get -d $(levigo_dependency)
bash -c "pushd $@; find . -name \*.go | xargs sed -i 's/\/\/ #cgo LDFLAGS: -lleveldb\|#cgo LDFLAGS: -lleveldb//g'; popd"
src/$(rocksdb_dependency):
$(GO) get -d $(rocksdb_dependency)
bash -c "pushd $@; find . -name \*.go | xargs sed -i 's/\/\/ #cgo LDFLAGS: -lrocksdb\|#cgo LDFLAGS: -lrocksdb//g'; popd"
$(dependencies_paths):
for i in $(dependencies); do $(GO) get -d $$i; done
dependencies: src/$(levigo_dependency) $(dependencies_paths) $(leveldb_deps) $(snappy_deps)
dependencies: src/$(levigo_dependency) src/$(rocksdb_dependency) $(dependencies_paths) $(leveldb_deps) $(rocksdb_deps) $(snappy_deps)
test_dependencies: dependencies
$(GO) get launchpad.net/gocheck

View File

@ -105,6 +105,17 @@ max-open-files = 40
# and gigabytes, respectively.
lru-cache-size = "200m"
[storage.engines.rocksdb]
# Maximum number of mmap open files; this affects the virtual memory used
# by the process
max-open-files = 40
# LRU cache size, LRU is used by rocksdb to store contents of the
# uncompressed sstables. You can use `m` or `g` prefix for megabytes
# and gigabytes, respectively.
lru-cache-size = "200m"
[storage.engines.lmdb]
map-size = "100g"

View File

@ -0,0 +1,190 @@
package storage
import (
"bytes"
"fmt"
"runtime"
"sync"
"configuration"
rocksdb "github.com/influxdb/rocksdb"
)
// ROCKSDB_NAME is the engine identifier used in configuration files and by
// the storage engine registry.
const ROCKSDB_NAME = "rocksdb"

// init registers the rocksdb engine so it can be selected by name
// (e.g. default-engine = "rocksdb") at runtime.
func init() {
	registerEngine(ROCKSDB_NAME, Initializer{
		NewRocksDBConfig,
		NewRocksDB,
	})
}
// RocksDBConfiguration holds the toml-configurable settings for the rocksdb
// storage engine (the [storage.engines.rocksdb] section of the config file).
type RocksDBConfiguration struct {
	// Maximum number of open files (max-open-files); 0 means "use default".
	MaxOpenFiles int `toml:"max-open-files"`
	// Size of the shared LRU cache (lru-cache-size); 0 means "use default".
	LruCacheSize configuration.Size `toml:"lru-cache-size"`
}
// RocksDB is a storage Engine implementation backed by a rocksdb database.
// It bundles the handles released by Close.
type RocksDB struct {
	db    *rocksdb.DB           // the open database
	opts  *rocksdb.Options      // options the database was opened with
	wopts *rocksdb.WriteOptions // default write options
	ropts *rocksdb.ReadOptions  // default read options
	cache *rocksdb.Cache        // LRU cache; shared globally across instances (see rocksDBCache)
	path  string                // filesystem path of the database
}
// NewRocksDBConfig returns a zero-valued RocksDBConfiguration ready to be
// populated by the toml configuration loader.
func NewRocksDBConfig() interface{} {
	config := &RocksDBConfiguration{}
	return config
}
// rocksDBCache is a single LRU cache shared by every RocksDB instance; it is
// created lazily by the first call to NewRocksDB with that call's configured
// size. rocksDBCacheLock guards its initialization.
var rocksDBCache *rocksdb.Cache
var rocksDBCacheLock sync.Mutex
// NewRocksDB opens (creating if necessary) a rocksdb database at path and
// returns it wrapped as a storage Engine.
//
// config must be a *RocksDBConfiguration; unset fields default to 100 max
// open files and a 200 MB LRU cache. The LRU cache is created once, sized by
// the first engine opened, and shared by all subsequent RocksDB instances.
func NewRocksDB(path string, config interface{}) (Engine, error) {
	c, ok := config.(*RocksDBConfiguration)
	if !ok {
		return nil, fmt.Errorf("Config is of type %T instead of %T", config, RocksDBConfiguration{})
	}

	// if it wasn't set, set it to 100
	if c.MaxOpenFiles == 0 {
		c.MaxOpenFiles = 100
	}
	// if it wasn't set, set it to 200 MB
	if c.LruCacheSize == 0 {
		c.LruCacheSize = 200 * 1024 * 1024
	}

	// Lazily initialize the global cache. The nil check must happen while
	// holding the mutex: the previous double-checked pattern read
	// rocksDBCache without synchronization, a data race when several shards
	// are opened concurrently.
	rocksDBCacheLock.Lock()
	if rocksDBCache == nil {
		rocksDBCache = rocksdb.NewLRUCache(int(c.LruCacheSize))
	}
	rocksDBCacheLock.Unlock()

	opts := rocksdb.NewOptions()
	env := rocksdb.NewDefaultEnv()
	// Background thread counts mirror the compaction settings below.
	env.SetBackgroundThreads(runtime.NumCPU() * 2)
	env.SetHighPriorityBackgroundThreads(1)
	opts.SetMaxBackgroundCompactions(runtime.NumCPU()*2 - 1)
	opts.SetMaxBackgroundFlushes(1)
	opts.SetEnv(env)
	opts.SetCache(rocksDBCache)
	opts.SetCreateIfMissing(true)
	opts.SetMaxOpenFiles(c.MaxOpenFiles)

	db, err := rocksdb.Open(path, opts)
	if err != nil {
		// Check the error before allocating further handles; the previous
		// code leaked opts and returned a half-built engine on failure.
		opts.Close()
		return nil, err
	}
	wopts := rocksdb.NewWriteOptions()
	ropts := rocksdb.NewReadOptions()
	return RocksDB{db, opts, wopts, ropts, rocksDBCache, path}, nil
}
// Compact runs a compaction over the entire key range of the database.
func (db RocksDB) Compact() {
	fullRange := rocksdb.Range{}
	db.db.CompactRange(fullRange)
}
// Close releases this database's rocksdb handles in reverse order of
// acquisition.
//
// NOTE: the LRU cache (db.cache) is deliberately NOT closed here. It is the
// process-wide cache shared by every RocksDB instance (see rocksDBCache);
// closing it when any single shard closes — as the previous code did —
// would invalidate the cache still in use by all other open shards.
func (db RocksDB) Close() {
	db.ropts.Close()
	db.wopts.Close()
	db.opts.Close()
	db.db.Close()
}
// Put stores a single key/value pair, implemented as a one-element batch so
// it shares BatchPut's delete-on-nil semantics.
func (db RocksDB) Put(key, value []byte) error {
	single := []Write{{key, value}}
	return db.BatchPut(single)
}
// Get returns the value stored under key, read with the default read options.
func (db RocksDB) Get(key []byte) ([]byte, error) {
	value, err := db.db.Get(db.ropts, key)
	return value, err
}
// Name returns the engine identifier this implementation registered under.
func (_ RocksDB) Name() string {
	name := ROCKSDB_NAME
	return name
}
// Path returns the filesystem location this database was opened at.
func (db RocksDB) Path() string {
	location := db.path
	return location
}
// BatchPut applies a set of writes in a single atomic write batch: entries
// with a nil Value are deletions, all others are upserts.
func (db RocksDB) BatchPut(writes []Write) error {
	batch := rocksdb.NewWriteBatch()
	defer batch.Close()

	for _, write := range writes {
		if write.Value != nil {
			batch.Put(write.Key, write.Value)
		} else {
			batch.Delete(write.Key)
		}
	}

	return db.db.Write(db.wopts, batch)
}
// Del removes every key in the inclusive range [start, finish] using one
// atomic write batch; the batch is only committed if iteration succeeded.
func (db RocksDB) Del(start, finish []byte) error {
	batch := rocksdb.NewWriteBatch()
	defer batch.Close()

	iterator := db.Iterator()
	defer iterator.Close()

	iterator.Seek(start)
	for ; iterator.Valid(); iterator.Next() {
		key := iterator.Key()
		if bytes.Compare(key, finish) > 0 {
			break
		}
		batch.Delete(key)
	}

	if err := iterator.Error(); err != nil {
		return err
	}
	return db.db.Write(db.wopts, batch)
}
// RocksDBIterator adapts a raw *rocksdb.Iterator to the storage Iterator
// interface. All methods delegate directly to the wrapped iterator. The err
// field is never assigned in this file, so Error always returns nil here —
// presumably reserved for future iterator-level errors.
type RocksDBIterator struct {
	_itr *rocksdb.Iterator
	err  error
}

// Seek delegates to the underlying iterator's Seek for key.
func (itr *RocksDBIterator) Seek(key []byte) {
	itr._itr.Seek(key)
}

// Next advances the underlying iterator one entry forward.
func (itr *RocksDBIterator) Next() {
	itr._itr.Next()
}

// Prev moves the underlying iterator one entry backward.
func (itr *RocksDBIterator) Prev() {
	itr._itr.Prev()
}

// Valid reports whether the underlying iterator is positioned on an entry.
func (itr *RocksDBIterator) Valid() bool {
	return itr._itr.Valid()
}

// Error returns the stored iterator error (always nil in this implementation;
// see the struct comment).
func (itr *RocksDBIterator) Error() error {
	return itr.err
}

// Key returns the key at the iterator's current position.
func (itr *RocksDBIterator) Key() []byte {
	return itr._itr.Key()
}

// Value returns the value at the iterator's current position.
func (itr *RocksDBIterator) Value() []byte {
	return itr._itr.Value()
}

// Close releases the underlying iterator; it never fails.
func (itr *RocksDBIterator) Close() error {
	itr._itr.Close()
	return nil
}
// Iterator returns a new iterator over the database, created with the
// default read options.
func (db RocksDB) Iterator() Iterator {
	raw := db.db.NewIterator(db.ropts)
	return &RocksDBIterator{_itr: raw, err: nil}
}

View File

@ -80,7 +80,7 @@ dir = "/tmp/influxdb/development/db"
write-buffer-size = 10000
# the engine to use for new shards, old shards will continue to use the same engine
default-engine = "lmdb"
default-engine = "rocksdb"
# The default setting on this is 0, which means unlimited. Set this to something if you want to
# limit the max number of open files. max-open-files is per shard so this * that will be max.
@ -105,6 +105,17 @@ max-open-files = 40
# and gigabytes, respectively.
lru-cache-size = "200m"
[storage.rocksdb]
# Maximum number of mmap open files; this affects the virtual memory used
# by the process
max-open-files = 40
# LRU cache size, LRU is used by rocksdb to store contents of the
# uncompressed sstables. You can use `m` or `g` prefix for megabytes
# and gigabytes, respectively.
lru-cache-size = "200m"
[storage.lmdb]
map-size = "10g"

View File

@ -23,9 +23,11 @@ func main() {
os.RemoveAll("/tmp/test-ldb")
os.RemoveAll("/tmp/test-lmdb")
os.RemoveAll("/tmp/test-rocksdb")
benchmark("lmdb", Config{*points, *batchSize, *series, 0, 0, time.Now()})
benchmark("leveldb", Config{*points, *batchSize, *series, 0, 0, time.Now()})
benchmark("rocksdb", Config{*points, *batchSize, *series, 0, 0, time.Now()})
}
func benchmark(name string, c Config) {