Abstract storage engines

This commit creates a storage engine interface and modifies the shards to
use the interface instead. It also adds a B-Tree implementation and a
benchmark tool to compare the performance of the different storage engines.
pull/641/head
John Shahid 2014-06-04 14:31:39 -04:00
parent c89f03d751
commit 319e6a4735
11 changed files with 821 additions and 126 deletions
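The shape of the change, condensed from the shard hunks below into a before/after sketch (error handling elided):

    // before this commit: shards talk to levigo directly
    data, err := self.db.Get(self.readOptions, pointKey)

    // after: shards hold a storage.Engine, which owns its own read/write options
    data, err := self.db.Get(pointKey)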

.gitignore

@@ -29,6 +29,7 @@ build/
/daemon
/benchmark
/main
/benchmark-storage
godef
gosym
gocode


@@ -134,6 +134,7 @@ github.com/influxdb/go-cache \
github.com/BurntSushi/toml \
github.com/influxdb/influxdb-go \
code.google.com/p/gogoprotobuf/proto \
github.com/szferi/gomdb \
$(proto_dependency)
dependencies_paths := $(addprefix src/,$(dependencies))
@@ -160,6 +161,7 @@ build: | dependencies protobuf parser build_version_string
# TODO: build all packages, otherwise we won't know
# if there's an error
$(GO) build $(GO_BUILD_OPTIONS) daemon
$(GO) build $(GO_BUILD_OPTIONS) tools/benchmark-storage
$(GO) build benchmark
clean:


@@ -4,6 +4,7 @@ import (
"bytes"
"cluster"
"common"
"datastore/storage"
"encoding/binary"
"errors"
"fmt"
@@ -17,13 +18,10 @@ import (
"code.google.com/p/goprotobuf/proto"
log "code.google.com/p/log4go"
"github.com/jmhodges/levigo"
)
type LevelDbShard struct {
db *levigo.DB
readOptions *levigo.ReadOptions
writeOptions *levigo.WriteOptions
db storage.Engine
lastIdUsed uint64
columnIdMutex sync.Mutex
closed bool
@@ -31,9 +29,8 @@ type LevelDbShard struct {
writeBatchSize int
}
func NewLevelDbShard(db *levigo.DB, pointBatchSize, writeBatchSize int) (*LevelDbShard, error) {
ro := levigo.NewReadOptions()
lastIdBytes, err2 := db.Get(ro, NEXT_ID_KEY)
func NewLevelDbShard(db storage.Engine, pointBatchSize, writeBatchSize int) (*LevelDbShard, error) {
lastIdBytes, err2 := db.Get(NEXT_ID_KEY)
if err2 != nil {
return nil, err2
}
@@ -48,8 +45,6 @@ func NewLevelDbShard(db *levigo.DB, pointBatchSize, writeBatchSize int) (*LevelD
return &LevelDbShard{
db: db,
writeOptions: levigo.NewWriteOptions(),
readOptions: ro,
lastIdUsed: lastId,
pointBatchSize: pointBatchSize,
writeBatchSize: writeBatchSize,
@@ -57,8 +52,7 @@ func NewLevelDbShard(db *levigo.DB, pointBatchSize, writeBatchSize int) (*LevelD
}
func (self *LevelDbShard) Write(database string, series []*protocol.Series) error {
wb := levigo.NewWriteBatch()
defer wb.Close()
wb := make([]storage.Write, 0, self.writeBatchSize)
for _, s := range series {
if len(s.Points) == 0 {
@@ -72,9 +66,9 @@ func (self *LevelDbShard) Write(database string, series []*protocol.Series) erro
if err != nil {
return err
}
keyBuffer := bytes.NewBuffer(make([]byte, 0, 24))
dataBuffer := proto.NewBuffer(nil)
for _, point := range s.Points {
keyBuffer := bytes.NewBuffer(make([]byte, 0, 24))
dataBuffer := proto.NewBuffer(nil)
keyBuffer.Reset()
dataBuffer.Reset()
@@ -87,7 +81,7 @@ func (self *LevelDbShard) Write(database string, series []*protocol.Series) erro
pointKey := keyBuffer.Bytes()
if point.Values[fieldIndex].GetIsNull() {
wb.Delete(pointKey)
wb = append(wb, storage.Write{pointKey, nil})
goto check
}
@@ -95,22 +89,22 @@ func (self *LevelDbShard) Write(database string, series []*protocol.Series) erro
if err != nil {
return err
}
wb.Put(pointKey, dataBuffer.Bytes())
wb = append(wb, storage.Write{pointKey, dataBuffer.Bytes()})
check:
count++
if count >= self.writeBatchSize {
err = self.db.Write(self.writeOptions, wb)
err = self.db.BatchPut(wb)
if err != nil {
return err
}
count = 0
wb.Clear()
wb = make([]storage.Write, 0, self.writeBatchSize)
}
}
}
}
return self.db.Write(self.writeOptions, wb)
return self.db.BatchPut(wb)
}
func (self *LevelDbShard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
@@ -158,7 +152,7 @@ func (self *LevelDbShard) DropDatabase(database string) error {
return err
}
}
self.compact()
self.db.Compact()
return nil
}
@@ -219,6 +213,9 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
for i, it := range iterators {
if rawColumnValues[i].value != nil || !it.Valid() {
if err := it.Error(); err != nil {
return err
}
continue
}
@@ -342,35 +339,47 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
return nil
}
func (self *LevelDbShard) executeListSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
it := self.db.NewIterator(self.readOptions)
defer it.Close()
database := querySpec.Database()
seekKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(querySpec.Database()+"~")...)
it.Seek(seekKey)
func (self *LevelDbShard) yieldSeriesNamesForDb(db string, yield func(string) bool) error {
dbNameStart := len(DATABASE_SERIES_INDEX_PREFIX)
for it = it; it.Valid(); it.Next() {
key := it.Key()
if len(key) < dbNameStart || !bytes.Equal(key[:dbNameStart], DATABASE_SERIES_INDEX_PREFIX) {
pred := func(key []byte) bool {
return len(key) >= dbNameStart && bytes.Equal(key[:dbNameStart], DATABASE_SERIES_INDEX_PREFIX)
}
firstKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(db+"~")...)
itr := self.db.Iterator()
defer itr.Close()
for itr.Seek(firstKey); itr.Valid(); itr.Next() {
key := itr.Key()
if !pred(key) {
break
}
dbSeries := string(key[dbNameStart:])
parts := strings.Split(dbSeries, "~")
if len(parts) > 1 {
if parts[0] != database {
if parts[0] != db {
break
}
name := parts[1]
shouldContinue := processor.YieldPoint(&name, nil, nil)
shouldContinue := yield(name)
if !shouldContinue {
return nil
}
}
}
if err := itr.Error(); err != nil {
return err
}
return nil
}
func (self *LevelDbShard) executeListSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
return self.yieldSeriesNamesForDb(querySpec.Database(), func(_name string) bool {
name := _name
return processor.YieldPoint(&name, nil, nil)
})
}
func (self *LevelDbShard) executeDeleteQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
query := querySpec.DeleteQuery()
series := query.GetFromClause()
@@ -391,7 +400,7 @@ func (self *LevelDbShard) executeDeleteQuery(querySpec *parser.QuerySpec, proces
return err
}
}
self.compact()
self.db.Compact()
return nil
}
@@ -402,7 +411,7 @@ func (self *LevelDbShard) executeDropSeriesQuery(querySpec *parser.QuerySpec, pr
if err != nil {
return err
}
self.compact()
self.db.Compact()
return nil
}
@@ -410,22 +419,22 @@ func (self *LevelDbShard) dropSeries(database, series string) error {
startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
wb := levigo.NewWriteBatch()
defer wb.Close()
if err := self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes); err != nil {
return err
}
wb := []storage.Write{}
for _, name := range self.getColumnNamesForSeries(database, series) {
indexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(database+"~"+series+"~"+name)...)
wb.Delete(indexKey)
wb = append(wb, storage.Write{indexKey, nil})
}
wb.Delete(append(DATABASE_SERIES_INDEX_PREFIX, []byte(database+"~"+series)...))
key := append(DATABASE_SERIES_INDEX_PREFIX, []byte(database+"~"+series)...)
wb = append(wb, storage.Write{key, nil})
// remove the column indices for this time series
return self.db.Write(self.writeOptions, wb)
return self.db.BatchPut(wb)
}
func (self *LevelDbShard) byteArrayForTimeInt(time int64) []byte {
@@ -451,51 +460,31 @@ func (self *LevelDbShard) deleteRangeOfSeriesCommon(database, series string, sta
return err
}
}
ro := levigo.NewReadOptions()
defer ro.Close()
ro.SetFillCache(false)
wb := levigo.NewWriteBatch()
count := 0
defer wb.Close()
startKey := bytes.NewBuffer(nil)
endKey := bytes.NewBuffer(nil)
for _, field := range fields {
it := self.db.NewIterator(ro)
defer it.Close()
startKey.Reset()
startKey.Write(field.Id)
startKey.Write(startTimeBytes)
startKey.Write([]byte{0, 0, 0, 0, 0, 0, 0, 0})
endKey.Reset()
endKey.Write(field.Id)
endKey.Write(endTimeBytes)
endKey.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
startKey := append(field.Id, startTimeBytes...)
it.Seek(startKey)
if it.Valid() {
if !bytes.Equal(it.Key()[:8], field.Id) {
it.Next()
if it.Valid() {
startKey = it.Key()
}
}
}
for it = it; it.Valid(); it.Next() {
k := it.Key()
if len(k) < 16 || !bytes.Equal(k[:8], field.Id) || bytes.Compare(k[8:16], endTimeBytes) == 1 {
break
}
wb.Delete(k)
count++
if count >= self.writeBatchSize {
err = self.db.Write(self.writeOptions, wb)
if err != nil {
return err
}
count = 0
wb.Clear()
}
err := self.db.Del(startKey.Bytes(), endKey.Bytes())
if err != nil {
return err
}
}
return self.db.Write(self.writeOptions, wb)
return nil
}
func (self *LevelDbShard) compact() {
log.Info("Compacting shard")
self.db.CompactRange(levigo.Range{})
log.Info("Shard compaction is done")
}
// func (self *LevelDbShard) compact() {
// log.Info("Compacting shard")
// self.db.CompactRange(levigo.Range{})
// log.Info("Shard compaction is done")
// }
func (self *LevelDbShard) deleteRangeOfSeries(database, series string, startTime, endTime time.Time) error {
startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(startTime), common.TimeToMicroseconds(endTime))
@@ -547,17 +536,20 @@ func (self *LevelDbShard) getFieldsForSeries(db, series string, columns []string
return fields, nil
}
// TODO: why doesn't this return an error?
func (self *LevelDbShard) getColumnNamesForSeries(db, series string) []string {
it := self.db.NewIterator(self.readOptions)
dbNameStart := len(SERIES_COLUMN_INDEX_PREFIX)
seekKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(db+"~"+series+"~")...)
pred := func(key []byte) bool {
return len(key) >= dbNameStart && bytes.Equal(key[:dbNameStart], SERIES_COLUMN_INDEX_PREFIX)
}
it := self.db.Iterator()
defer it.Close()
seekKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(db+"~"+series+"~")...)
it.Seek(seekKey)
names := make([]string, 0)
dbNameStart := len(SERIES_COLUMN_INDEX_PREFIX)
for it = it; it.Valid(); it.Next() {
for it.Seek(seekKey); it.Valid(); it.Next() {
key := it.Key()
if len(key) < dbNameStart || !bytes.Equal(key[:dbNameStart], SERIES_COLUMN_INDEX_PREFIX) {
if !pred(key) {
break
}
dbSeriesColumn := string(key[dbNameStart:])
@@ -569,6 +561,10 @@ func (self *LevelDbShard) getColumnNamesForSeries(db, series string) []string {
names = append(names, parts[2])
}
}
if err := it.Error(); err != nil {
log.Error("Error while getting columns for series %s: %s", series, err)
return nil
}
return names
}
@@ -601,30 +597,16 @@ func (self *LevelDbShard) getSeriesForDbAndRegex(database string, regex *regexp.
return names
}
func (self *LevelDbShard) getSeriesForDatabase(database string) []string {
it := self.db.NewIterator(self.readOptions)
defer it.Close()
seekKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(database+"~")...)
it.Seek(seekKey)
dbNameStart := len(DATABASE_SERIES_INDEX_PREFIX)
names := make([]string, 0)
for it = it; it.Valid(); it.Next() {
key := it.Key()
if len(key) < dbNameStart || !bytes.Equal(key[:dbNameStart], DATABASE_SERIES_INDEX_PREFIX) {
break
}
dbSeries := string(key[dbNameStart:])
parts := strings.Split(dbSeries, "~")
if len(parts) > 1 {
if parts[0] != database {
break
}
name := parts[1]
names = append(names, name)
}
func (self *LevelDbShard) getSeriesForDatabase(database string) (series []string) {
err := self.yieldSeriesNamesForDb(database, func(name string) bool {
series = append(series, name)
return true
})
if err != nil {
log.Error("Cannot get series names for db %s: %s", database, err)
return nil
}
return names
return series
}
func (self *LevelDbShard) createIdForDbSeriesColumn(db, series, column *string) (ret []byte, err error) {
@@ -655,7 +637,7 @@ func (self *LevelDbShard) createIdForDbSeriesColumn(db, series, column *string)
s := fmt.Sprintf("%s~%s~%s", *db, *series, *column)
b := []byte(s)
key := append(SERIES_COLUMN_INDEX_PREFIX, b...)
err = self.db.Put(self.writeOptions, key, ret)
err = self.db.Put(key, ret)
return
}
@@ -663,7 +645,7 @@ func (self *LevelDbShard) getIdForDbSeriesColumn(db, series, column *string) (re
s := fmt.Sprintf("%s~%s~%s", *db, *series, *column)
b := []byte(s)
key := append(SERIES_COLUMN_INDEX_PREFIX, b...)
if ret, err = self.db.Get(self.readOptions, key); err != nil {
if ret, err = self.db.Get(key); err != nil {
return nil, err
}
return ret, nil
@@ -674,14 +656,13 @@ func (self *LevelDbShard) getNextIdForColumn(db, series, column *string) (ret []
self.lastIdUsed += 1
idBytes := make([]byte, 8, 8)
binary.PutUvarint(idBytes, id)
wb := levigo.NewWriteBatch()
defer wb.Close()
wb.Put(NEXT_ID_KEY, idBytes)
wb := make([]storage.Write, 0, 3)
wb = append(wb, storage.Write{NEXT_ID_KEY, idBytes})
databaseSeriesIndexKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(*db+"~"+*series)...)
wb.Put(databaseSeriesIndexKey, []byte{})
wb = append(wb, storage.Write{databaseSeriesIndexKey, []byte{}})
seriesColumnIndexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(*db+"~"+*series+"~"+*column)...)
wb.Put(seriesColumnIndexKey, idBytes)
if err = self.db.Write(self.writeOptions, wb); err != nil {
wb = append(wb, storage.Write{seriesColumnIndexKey, idBytes})
if err = self.db.BatchPut(wb); err != nil {
return nil, err
}
return idBytes, nil
@@ -689,9 +670,8 @@ func (self *LevelDbShard) getNextIdForColumn(db, series, column *string) (ret []
func (self *LevelDbShard) close() {
self.closed = true
self.readOptions.Close()
self.writeOptions.Close()
self.db.Close()
self.db = nil
}
func (self *LevelDbShard) convertTimestampToUint(t *int64) uint64 {
@@ -723,7 +703,7 @@ func (self *LevelDbShard) fetchSinglePoint(querySpec *parser.QuerySpec, series s
for _, field := range fields {
pointKey := append(field.Id, timeAndSequenceBytes...)
if data, err := self.db.Get(self.readOptions, pointKey); err != nil {
if data, err := self.db.Get(pointKey); err != nil {
return nil, err
} else {
fieldValue := &protocol.FieldValue{}
@@ -743,22 +723,30 @@ func (self *LevelDbShard) fetchSinglePoint(querySpec *parser.QuerySpec, series s
return result, nil
}
func (self *LevelDbShard) getIterators(fields []*Field, start, end []byte, isAscendingQuery bool) (fieldNames []string, iterators []*levigo.Iterator) {
iterators = make([]*levigo.Iterator, len(fields))
func (self *LevelDbShard) getIterators(fields []*Field, start, end []byte, isAscendingQuery bool) (fieldNames []string, iterators []storage.Iterator) {
iterators = make([]storage.Iterator, len(fields))
fieldNames = make([]string, len(fields))
// start the iterators to go through the series data
for i, field := range fields {
fieldNames[i] = field.Name
iterators[i] = self.db.NewIterator(self.readOptions)
iterators[i] = self.db.Iterator()
if isAscendingQuery {
iterators[i].Seek(append(field.Id, start...))
firstKey := append(field.Id, start...)
iterators[i].Seek(firstKey)
} else {
iterators[i].Seek(append(append(field.Id, end...), MAX_SEQUENCE...))
firstKey := append(append(field.Id, end...), MAX_SEQUENCE...)
iterators[i].Seek(firstKey)
if iterators[i].Valid() {
iterators[i].Prev()
}
}
if err := iterators[i].Error(); err != nil {
log.Error("Error while getting iterators: %s", err)
return nil, nil
}
}
return
}


@@ -12,6 +12,8 @@ import (
"sync"
"time"
"datastore/storage"
log "code.google.com/p/log4go"
"github.com/jmhodges/levigo"
)
@@ -117,17 +119,20 @@ func (self *LevelDbShardDatastore) GetOrCreateShard(id uint32) (cluster.LocalSha
dbDir := self.shardDir(id)
var se storage.Engine
var err error
log.Info("DATASTORE: opening or creating shard %s", dbDir)
ldb, err := levigo.Open(dbDir, self.levelDbOptions)
// TODO: pass options to the leveldb constructor
se, err = storage.GetEngine("leveldb", dbDir)
if err != nil {
log.Error("Error opening shard: ", err)
return nil, err
}
db, err = NewLevelDbShard(ldb, self.pointBatchSize, self.writeBatchSize)
db, err = NewLevelDbShard(se, self.pointBatchSize, self.writeBatchSize)
if err != nil {
log.Error("Error creating shard: ", err)
ldb.Close()
se.Close()
return nil, err
}
self.shards[id] = db


@@ -0,0 +1,50 @@
package storage
// A write/delete used by BatchPut. To delete data, set the Value to nil
type Write struct {
Key []byte
Value []byte
}
// Iterator is used to scan a range of keys
type Iterator interface {
// Seek to the given key, or a larger key if this one doesn't exist
Seek(key []byte)
// Get the key at the current position
Key() []byte
// Get the value at the current position
Value() []byte
// Move to the next element
Next()
// Move to the previous element
Prev()
// True if no error occurred during scanning and if the predicate
// used when calling GetRangePredicate is true. If GetRange is
// used, returns GetKey() <= last where <= is the lexicographic
// order.
Valid() bool
// Returns any error that occurred during scanning
Error() error
// Close the iterator and free the resources used
Close() error
}
// Interface to all storage engine backends
type Engine interface {
Name() string
Path() string
// Add the given key/value pair, duplicates aren't allowed
Put(key, value []byte) error
// Get the value associated with the given key
Get(key []byte) ([]byte, error)
// Put many key/value pairs in one call; a Write with a nil Value deletes its key
BatchPut(writes []Write) error
// Delete the given range of keys [first, last]
Del(first, last []byte) error
// Get an iterator for the db
Iterator() Iterator
// Compact the db and reclaim unused space
Compact()
// Close the database
Close()
}
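A minimal usage sketch of the two interfaces above (not part of the commit; the keys are illustrative). BatchPut mixes inserts and deletes in a single call, and scans follow a Seek/Valid/Next loop with one Error check at the end:

    func example(db Engine) error {
        // a Write with a nil Value deletes the key instead of setting it
        writes := []Write{
            {Key: []byte("series~a"), Value: []byte("1")},
            {Key: []byte("series~b"), Value: nil},
        }
        if err := db.BatchPut(writes); err != nil {
            return err
        }
        itr := db.Iterator()
        defer itr.Close()
        for itr.Seek([]byte("series~")); itr.Valid(); itr.Next() {
            _, _ = itr.Key(), itr.Value() // consume the current entry
        }
        return itr.Error()
    }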


@@ -0,0 +1,141 @@
package storage
import (
"bytes"
levigo "github.com/jmhodges/levigo"
)
const LEVELDB_NAME = "leveldb"
func init() {
registerEngine(LEVELDB_NAME, NewLevelDb)
}
type LevelDB struct {
db *levigo.DB
opts *levigo.Options
wopts *levigo.WriteOptions
ropts *levigo.ReadOptions
cache *levigo.Cache
path string
}
func NewLevelDb(path string) (Engine, error) {
opts := levigo.NewOptions()
cache := levigo.NewLRUCache(100 * 1024 * 1024)
opts.SetCompression(levigo.NoCompression)
opts.SetCache(cache)
opts.SetCreateIfMissing(true)
db, err := levigo.Open(path, opts)
wopts := levigo.NewWriteOptions()
ropts := levigo.NewReadOptions()
return LevelDB{db, opts, wopts, ropts, cache, path}, err
}
func (db LevelDB) Compact() {
db.db.CompactRange(levigo.Range{})
}
func (db LevelDB) Close() {
db.cache.Close()
db.ropts.Close()
db.wopts.Close()
db.opts.Close()
db.db.Close()
}
func (db LevelDB) Put(key, value []byte) error {
return db.BatchPut([]Write{Write{key, value}})
}
func (db LevelDB) Get(key []byte) ([]byte, error) {
return db.db.Get(db.ropts, key)
}
func (_ LevelDB) Name() string {
return LEVELDB_NAME
}
func (db LevelDB) Path() string {
return db.path
}
func (db LevelDB) BatchPut(writes []Write) error {
wb := levigo.NewWriteBatch()
defer wb.Close()
for _, w := range writes {
if w.Value == nil {
wb.Delete(w.Key)
continue
}
wb.Put(w.Key, w.Value)
}
return db.db.Write(db.wopts, wb)
}
func (db LevelDB) Del(start, finish []byte) error {
wb := levigo.NewWriteBatch()
defer wb.Close()
itr := db.Iterator()
defer itr.Close()
for itr.Seek(start); itr.Valid(); itr.Next() {
k := itr.Key()
if bytes.Compare(k, finish) > 0 {
break
}
wb.Delete(k)
}
if err := itr.Error(); err != nil {
return err
}
return db.db.Write(db.wopts, wb)
}
type LevelDbIterator struct {
_itr *levigo.Iterator
err error
}
func (itr *LevelDbIterator) Seek(key []byte) {
itr._itr.Seek(key)
}
func (itr *LevelDbIterator) Next() {
itr._itr.Next()
}
func (itr *LevelDbIterator) Prev() {
itr._itr.Prev()
}
func (itr *LevelDbIterator) Valid() bool {
return itr._itr.Valid()
}
func (itr *LevelDbIterator) Error() error {
return itr.err
}
func (itr *LevelDbIterator) Key() []byte {
return itr._itr.Key()
}
func (itr *LevelDbIterator) Value() []byte {
return itr._itr.Value()
}
func (itr *LevelDbIterator) Close() error {
itr._itr.Close()
return nil
}
func (db LevelDB) Iterator() Iterator {
itr := db.db.NewIterator(db.ropts)
return &LevelDbIterator{itr, nil}
}


@@ -0,0 +1,240 @@
package storage
import (
"bytes"
"os"
mdb "github.com/szferi/gomdb"
)
const MDB_NAME = "lmdb"
func init() {
registerEngine(MDB_NAME, NewMDB)
}
type Mdb struct {
env *mdb.Env
db mdb.DBI
path string
}
func NewMDB(path string) (Engine, error) {
env, err := mdb.NewEnv()
if err != nil {
return Mdb{}, err
}
// TODO: max dbs should be configurable
if err := env.SetMaxDBs(10); err != nil {
return Mdb{}, err
}
if err := env.SetMapSize(10 << 30); err != nil {
return Mdb{}, err
}
if _, err := os.Stat(path); err != nil {
err = os.MkdirAll(path, 0755)
if err != nil {
return Mdb{}, err
}
}
err = env.Open(path, mdb.WRITEMAP|mdb.MAPASYNC|mdb.CREATE, 0755)
if err != nil {
return Mdb{}, err
}
tx, err := env.BeginTxn(nil, 0)
if err != nil {
return Mdb{}, err
}
dbi, err := tx.DBIOpen(nil, mdb.CREATE)
if err != nil {
return Mdb{}, err
}
if err := tx.Commit(); err != nil {
return Mdb{}, err
}
db := Mdb{
env: env,
db: dbi,
path: path,
}
return db, nil
}
func (db Mdb) Put(key, value []byte) error {
return db.BatchPut([]Write{Write{key, value}})
}
func (db Mdb) BatchPut(writes []Write) error {
tx, err := db.env.BeginTxn(nil, 0)
if err != nil {
return err
}
for _, w := range writes {
var err error
if w.Value == nil {
err = tx.Del(db.db, w.Key, nil)
} else {
err = tx.Put(db.db, w.Key, w.Value, 0)
}
if err != nil && err != mdb.NotFound {
tx.Abort()
return err
}
}
return tx.Commit()
}
func (db Mdb) Get(key []byte) ([]byte, error) {
tx, err := db.env.BeginTxn(nil, mdb.RDONLY)
if err != nil {
return nil, err
}
defer tx.Commit()
v, err := tx.Get(db.db, key)
if err == mdb.NotFound {
return nil, nil
}
return v, err
}
func (db Mdb) Del(start, finish []byte) error {
tx, err := db.env.BeginTxn(nil, 0)
if err != nil {
return err
}
defer tx.Commit()
itr := db.iterator(true)
defer itr.Close()
count := 0
for itr.Seek(start); itr.Valid(); itr.Next() {
key := itr.Key()
if bytes.Compare(key, finish) > 0 {
break
}
// TODO: We should be using one cursor instead of two
// transactions, but deleting through a cursor crashes
err = tx.Del(db.db, itr.key, nil)
if err != nil {
return err
}
count++
}
return nil
}
type MdbIterator struct {
key []byte
value []byte
c *mdb.Cursor
tx *mdb.Txn
valid bool
err error
}
func (itr *MdbIterator) Key() []byte {
return itr.key
}
func (itr *MdbIterator) Value() []byte {
return itr.value
}
func (itr *MdbIterator) Valid() bool {
return itr.valid
}
func (itr *MdbIterator) Error() error {
return itr.err
}
func (itr *MdbIterator) getCurrent() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.GET_CURRENT)
itr.setState()
}
func (itr *MdbIterator) Seek(key []byte) {
itr.key, itr.value, itr.err = itr.c.Get(key, mdb.SET_RANGE)
itr.setState()
}
func (itr *MdbIterator) Next() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.NEXT)
itr.setState()
}
func (itr *MdbIterator) Prev() {
itr.key, itr.value, itr.err = itr.c.Get(nil, mdb.PREV)
itr.setState()
}
func (itr *MdbIterator) setState() {
if itr.err != nil {
if itr.err == mdb.NotFound {
itr.err = nil
}
itr.valid = false
}
}
func (itr *MdbIterator) Close() error {
if err := itr.c.Close(); err != nil {
itr.tx.Commit()
return err
}
return itr.tx.Commit()
}
func (_ Mdb) Name() string {
return MDB_NAME
}
func (db Mdb) Path() string {
return db.path
}
func (db Mdb) Iterator() Iterator {
return db.iterator(true)
}
func (db Mdb) Compact() {
// no-op: LMDB reuses free pages in place and has no explicit compaction step
}
func (db Mdb) iterator(rdonly bool) *MdbIterator {
flags := uint(0)
if rdonly {
flags = mdb.RDONLY
}
tx, err := db.env.BeginTxn(nil, flags)
if err != nil {
return &MdbIterator{nil, nil, nil, nil, false, err}
}
c, err := tx.CursorOpen(db.db)
if err != nil {
tx.Abort()
return &MdbIterator{nil, nil, nil, nil, false, err}
}
return &MdbIterator{nil, nil, c, tx, true, nil}
}
func (db Mdb) Close() {
db.env.DBIClose(db.db)
if err := db.env.Close(); err != nil {
panic(err)
}
}


@@ -0,0 +1,9 @@
package storage
import "bytes"
func DefaultPredicate(last []byte) func(key []byte) bool {
return func(key []byte) bool {
return bytes.Compare(key, last) < 1
}
}
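A sketch of how the predicate pairs with an engine iterator to bound a scan by its last key (names illustrative, in the spirit of the pred closures in the shard code above):

    func scanRange(db Engine, first, last []byte) error {
        pred := DefaultPredicate(last) // true while key <= last
        itr := db.Iterator()
        defer itr.Close()
        for itr.Seek(first); itr.Valid() && pred(itr.Key()); itr.Next() {
            // every key visited here lies in [first, last]
        }
        return itr.Error()
    }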


@@ -0,0 +1,23 @@
package storage
import "fmt"
type Init func(path string) (Engine, error)
var engineRegistry = make(map[string]Init)
func registerEngine(name string, initializer Init) {
if _, ok := engineRegistry[name]; ok {
panic(fmt.Errorf("Engine '%s' already exists", name))
}
engineRegistry[name] = initializer
}
func GetEngine(name, path string) (Engine, error) {
initializer := engineRegistry[name]
if initializer == nil {
return nil, fmt.Errorf("Engine '%s' not found", name)
}
return initializer(path)
}
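Backends register themselves from an init function, so importing the package makes them reachable by name through GetEngine (see leveldb.go and mdb.go above). A hypothetical third engine would plug in the same way:

    const BOLT_NAME = "bolt" // hypothetical engine, not part of this commit

    func init() {
        registerEngine(BOLT_NAME, NewBolt)
    }

    // NewBolt matches the Init signature: func(path string) (Engine, error)
    func NewBolt(path string) (Engine, error) {
        return nil, fmt.Errorf("bolt: not implemented")
    }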


@@ -0,0 +1,53 @@
package main
import (
"bytes"
"datastore/storage"
"encoding/binary"
"math/rand"
"protocol"
"time"
"code.google.com/p/goprotobuf/proto"
)
type Config struct {
points int
batch int
series int
nextSeriesId int
nextSequence int64
now time.Time
}
func (c *Config) MakeBatch() []storage.Write {
ws := make([]storage.Write, 0, c.batch)
for b := c.batch; b > 0; b-- {
key := bytes.NewBuffer(nil)
binary.Write(key, binary.BigEndian, int64(c.nextSeriesId))
binary.Write(key, binary.BigEndian, c.now.UnixNano()/1000)
binary.Write(key, binary.BigEndian, c.nextSequence)
v := rand.Int63()
fv := &protocol.FieldValue{
Int64Value: &v,
}
data, err := proto.Marshal(fv)
if err != nil {
panic(err)
}
ws = append(ws, storage.Write{
Key: key.Bytes(),
Value: data,
})
c.nextSeriesId++
if c.nextSeriesId >= c.series {
c.nextSeriesId = 0
}
c.nextSequence++
c.now = c.now.Add(time.Microsecond)
}
return ws
}
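Each key built above is 24 bytes, the same layout the shard code sizes its key buffers for: three big-endian 64-bit fields.

    // key layout produced by MakeBatch (24 bytes, big-endian):
    //   bytes  0..7   series id
    //   bytes  8..15  timestamp in microseconds (now.UnixNano() / 1000)
    //   bytes 16..23  sequence number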


@@ -0,0 +1,183 @@
package main
import (
"bytes"
"datastore/storage"
"encoding/binary"
"flag"
"fmt"
"math/rand"
"os"
"os/exec"
"strings"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
points := flag.Int("points", 200000000, "Number of points")
batchSize := flag.Int("batch", 1000, "Batch size")
series := flag.Int("series", 1, "Number of series")
flag.Parse()
os.RemoveAll("/tmp/test-ldb")
os.RemoveAll("/tmp/test-lmdb")
benchmarkLevelDb(Config{*points, *batchSize, *series, 0, 0, time.Now()})
benchmarkMdb(Config{*points, *batchSize, *series, 0, 0, time.Now()})
}
func benchmarkMdb(c Config) {
db, err := storage.GetEngine("lmdb", "/tmp/test-lmdb")
if err != nil {
panic(err)
}
defer db.Close()
benchmarkDbCommon(db, c)
}
func benchmarkLevelDb(c Config) {
db, err := storage.GetEngine("leveldb", "/tmp/test-mdb")
if err != nil {
panic(err)
}
defer db.Close()
benchmarkDbCommon(db, c)
}
func getSize(path string) string {
cmd := exec.Command("du", "-sh", path)
out, err := cmd.CombinedOutput()
if err != nil {
panic(err)
}
return strings.Fields(string(out))[0]
}
func benchmarkDbCommon(db storage.Engine, c Config) {
fmt.Printf("################ Benchmarking: %s\n", db.Name())
start := time.Now()
count := 0
for p := c.points; p > 0; {
n := writeBatch(db, &c)
count += n
p -= n
}
d := time.Now().Sub(start)
fmt.Printf("Writing %d points in batches of %d points took %s (%f microsecond per point)\n",
count,
c.batch,
d,
float64(d.Nanoseconds())/1000.0/float64(c.points),
)
timeQuerying(db, c.series)
fmt.Printf("Size: %s\n", getSize(db.Path()))
queryAndDelete(db, c.points, c.series)
timeQuerying(db, c.series)
fmt.Printf("Size: %s\n", getSize(db.Path()))
count = 0
start = time.Now()
for p := c.points / 2; p > 0; {
n := writeBatch(db, &c)
count += n
p -= n
}
d = time.Now().Sub(start)
fmt.Printf("Writing %d points in batches of %d points took %s (%f microseconds per point)\n",
count,
c.batch,
d,
float64(d.Nanoseconds())/1000.0/float64(count),
)
fmt.Printf("Size: %s\n", getSize(db.Path()))
}
func timeQuerying(db storage.Engine, series int) {
s := time.Now()
count := 0
for series -= 1; series >= 0; series-- {
query(db, int64(series), func(itr storage.Iterator) {
count++
})
}
d := time.Now().Sub(s)
fmt.Printf("Querying %d points took %s (%f microseconds per point)\n",
count, d, float64(d.Nanoseconds())/1000.0/float64(count))
}
func queryAndDelete(db storage.Engine, points, series int) {
// query the database
startCount := points / series / 4
endCount := points * 3 / series / 4
total := 0
var d time.Duration
for series -= 1; series >= 0; series-- {
count := 0
var delStart []byte
var delEnd []byte
query(db, int64(series), func(itr storage.Iterator) {
count++
if count == startCount {
delStart = itr.Key()
}
if count == endCount-1 {
delEnd = itr.Key()
total += endCount - startCount
}
})
start := time.Now()
err := db.Del(delStart, delEnd)
if err != nil {
panic(err)
}
d += time.Now().Sub(start)
}
fmt.Printf("Took %s to delete %d points\n", d, total)
start := time.Now()
db.Compact()
fmt.Printf("Took %s to compact\n", time.Now().Sub(start))
}
func query(db storage.Engine, s int64, yield func(storage.Iterator)) {
sb := bytes.NewBuffer(nil)
binary.Write(sb, binary.BigEndian, s)
binary.Write(sb, binary.BigEndian, int64(0))
binary.Write(sb, binary.BigEndian, int64(0))
eb := bytes.NewBuffer(nil)
binary.Write(eb, binary.BigEndian, s)
binary.Write(eb, binary.BigEndian, int64(-1))
binary.Write(eb, binary.BigEndian, int64(-1))
itr := db.Iterator()
defer itr.Close()
count := 0
for itr.Seek(sb.Bytes()); itr.Valid(); itr.Next() {
key := itr.Key()
if bytes.Compare(key, eb.Bytes()) > 0 {
break
}
count++
yield(itr)
}
}
func writeBatch(db storage.Engine, c *Config) int {
ws := c.MakeBatch()
if err := db.BatchPut(ws); err != nil {
panic(err)
}
return len(ws)
}
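With the Makefile target above this builds as benchmark-storage; a typical run (flag values illustrative) is ./benchmark-storage -points 1000000 -batch 1000 -series 10, which fills both engines under /tmp and reports write, query, and delete timings plus the on-disk size of each database.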