Add data migration 0.7 -> 0.8

Close #809. Fix #746
pull/809/merge
Paul Dix 2014-08-15 13:18:43 -04:00 committed by John Shahid
parent a9d73f5889
commit 5403baffd3
11 changed files with 1345 additions and 177 deletions

.gitignore

@@ -29,7 +29,6 @@ protocol/protocol.pb.go
build/
# executables
/server
/influxdb
/benchmark-tool
/main
@@ -58,3 +57,6 @@ config.toml
/admin
/admin/
/data/
# test data files
integration/migration_data/


@@ -14,48 +14,56 @@ import (
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"
log "code.google.com/p/log4go"
"github.com/bmizerany/pat"
"github.com/influxdb/influxdb/cluster"
. "github.com/influxdb/influxdb/common"
"github.com/influxdb/influxdb/configuration"
"github.com/influxdb/influxdb/coordinator"
"github.com/influxdb/influxdb/migration"
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
)
type HttpServer struct {
conn net.Listener
sslConn net.Listener
httpPort string
httpSslPort string
httpSslCert string
adminAssetsDir string
coordinator coordinator.Coordinator
userManager UserManager
shutdown chan bool
clusterConfig *cluster.ClusterConfiguration
raftServer *coordinator.RaftServer
readTimeout time.Duration
conn net.Listener
sslConn net.Listener
httpPort string
httpSslPort string
httpSslCert string
adminAssetsDir string
coordinator coordinator.Coordinator
userManager UserManager
shutdown chan bool
clusterConfig *cluster.ClusterConfiguration
raftServer *coordinator.RaftServer
readTimeout time.Duration
migrationRunning uint32
config *configuration.Configuration
}
func NewHttpServer(httpPort string, readTimeout time.Duration, adminAssetsDir string, theCoordinator coordinator.Coordinator, userManager UserManager, clusterConfig *cluster.ClusterConfiguration, raftServer *coordinator.RaftServer) *HttpServer {
func NewHttpServer(config *configuration.Configuration, theCoordinator coordinator.Coordinator, userManager UserManager, clusterConfig *cluster.ClusterConfiguration, raftServer *coordinator.RaftServer) *HttpServer {
self := &HttpServer{}
self.httpPort = httpPort
self.adminAssetsDir = adminAssetsDir
self.httpPort = config.ApiHttpPortString()
self.adminAssetsDir = config.AdminAssetsDir
self.coordinator = theCoordinator
self.userManager = userManager
self.shutdown = make(chan bool, 2)
self.clusterConfig = clusterConfig
self.raftServer = raftServer
self.readTimeout = readTimeout
self.readTimeout = config.ApiReadTimeout
self.config = config
return self
}
const (
INVALID_CREDENTIALS_MSG = "Invalid database/username/password"
JSON_PRETTY_PRINT_INDENT = " "
MIGRATION_RUNNING = uint32(1)
MIGRATION_NOT_RUNNING = uint32(0)
)
func isPretty(r *libhttp.Request) bool {
@@ -156,6 +164,9 @@ func (self *HttpServer) Serve(listener net.Listener) {
self.registerEndpoint(p, "del", "/cluster/shard_spaces/:db/:name", self.dropShardSpace)
self.registerEndpoint(p, "post", "/cluster/database_configs/:db", self.configureDatabase)
// migrates leveldb data from 0.7 to 0.8 format.
self.registerEndpoint(p, "post", "/cluster/migrate_data", self.migrateData)
// return whether the cluster is in sync or not
self.registerEndpoint(p, "get", "/sync", self.isInSync)
@@ -1213,3 +1224,20 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R
return libhttp.StatusCreated, nil
})
}
func (self *HttpServer) migrateData(w libhttp.ResponseWriter, r *libhttp.Request) {
self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
if !atomic.CompareAndSwapUint32(&self.migrationRunning, MIGRATION_NOT_RUNNING, MIGRATION_RUNNING) {
return libhttp.StatusForbidden, fmt.Errorf("A migration is already running")
}
go func() {
log.Info("Starting Migration")
defer atomic.CompareAndSwapUint32(&self.migrationRunning, MIGRATION_RUNNING, MIGRATION_NOT_RUNNING)
dataMigrator := migration.NewDataMigrator(
self.coordinator.(*coordinator.CoordinatorImpl), self.clusterConfig, self.config, self.config.DataDir, "shard_db", self.clusterConfig.MetaStore)
dataMigrator.Migrate()
}()
return libhttp.StatusAccepted, nil
})
}
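
For reference, here is a minimal sketch of how a cluster admin could kick off the migration through the endpoint registered above. The localhost address and the root/root credentials are assumptions taken from the commented-out integration test further down, not part of this change:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// POST to the new endpoint; cluster-admin credentials are required.
	// A 202 Accepted response means the migration goroutine was started,
	// while a 403 Forbidden means a migration is already running.
	resp, err := http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}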


@@ -215,12 +215,10 @@ func (self *CoordinatorImpl) runDropSeriesQuery(querySpec *parser.QuerySpec, ser
return err
}
defer seriesWriter.Close()
fmt.Println("DROP series")
err := self.raftServer.DropSeries(db, series)
if err != nil {
return err
}
fmt.Println("DROP returning nil")
return nil
}


@@ -7,7 +7,6 @@ import (
"fmt"
"math"
"regexp"
"strings"
"time"
"code.google.com/p/goprotobuf/proto"
@@ -95,7 +94,7 @@ func (self *Shard) Write(database string, series []*protocol.Series) error {
func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
if querySpec.IsListSeriesQuery() {
return self.executeListSeriesQuery(querySpec, processor)
return fmt.Errorf("List series queries should never come to the shard")
} else if querySpec.IsDeleteFromSeriesQuery() {
return self.executeDeleteQuery(querySpec, processor)
}
@@ -279,7 +278,7 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName
Points: seriesOutgoing.Points,
}
if !processor.YieldSeries(series) {
log.Info("Stopping processing")
log.Debug("Stopping processing")
shouldContinue = false
}
}
@@ -304,47 +303,6 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName
return nil
}
func (self *Shard) yieldSeriesNamesForDb(db string, yield func(string) bool) error {
dbNameStart := len(DATABASE_SERIES_INDEX_PREFIX)
pred := func(key []byte) bool {
return len(key) >= dbNameStart && bytes.Equal(key[:dbNameStart], DATABASE_SERIES_INDEX_PREFIX)
}
firstKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(db+"~")...)
itr := self.db.Iterator()
defer itr.Close()
for itr.Seek(firstKey); itr.Valid(); itr.Next() {
key := itr.Key()
if !pred(key) {
break
}
dbSeries := string(key[dbNameStart:])
parts := strings.Split(dbSeries, "~")
if len(parts) > 1 {
if parts[0] != db {
break
}
name := parts[1]
shouldContinue := yield(name)
if !shouldContinue {
return nil
}
}
}
if err := itr.Error(); err != nil {
return err
}
return nil
}
func (self *Shard) executeListSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
return self.yieldSeriesNamesForDb(querySpec.Database(), func(_name string) bool {
name := _name
return processor.YieldPoint(&name, nil, nil)
})
}
func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
query := querySpec.DeleteQuery()
series := query.GetFromClause()
@@ -571,114 +529,3 @@ func (self *Shard) getFieldsForSeries(db, series string, columns []string) ([]*m
}
return fields, nil
}
/* DEPRECATED methods do not use*/
// TODO: remove this on version 0.9 after people have had a chance to do migrations
func (self *Shard) getSeriesForDbAndRegexDEPRECATED(database string, regex *regexp.Regexp) []string {
names := []string{}
allSeries := self.metaStore.GetSeriesForDatabase(database)
for _, name := range allSeries {
if regex.MatchString(name) {
names = append(names, name)
}
}
return names
}
// TODO: remove this on version 0.9 after people have had a chance to do migrations
func (self *Shard) getSeriesForDatabaseDEPRECATED(database string) (series []string) {
err := self.yieldSeriesNamesForDb(database, func(name string) bool {
series = append(series, name)
return true
})
if err != nil {
log.Error("Cannot get series names for db %s: %s", database, err)
return nil
}
return series
}
// TODO: remove this function. I'm keeping it around for the moment since it'll probably have to be
// used in the DB upgrade/migration that moves metadata from the shard to Raft
func (self *Shard) getFieldsForSeriesDEPRECATED(db, series string, columns []string) ([]*metastore.Field, error) {
isCountQuery := false
if len(columns) > 0 && columns[0] == "*" {
columns = self.getColumnNamesForSeriesDEPRECATED(db, series)
} else if len(columns) == 0 {
isCountQuery = true
columns = self.getColumnNamesForSeriesDEPRECATED(db, series)
}
if len(columns) == 0 {
return nil, FieldLookupError{"Couldn't look up columns for series: " + series}
}
fields := make([]*metastore.Field, len(columns), len(columns))
for i, name := range columns {
id, errId := self.getIdForDbSeriesColumnDEPRECATED(&db, &series, &name)
if errId != nil {
return nil, errId
}
if id == nil {
return nil, FieldLookupError{"Field " + name + " doesn't exist in series " + series}
}
idInt, err := binary.ReadUvarint(bytes.NewBuffer(id))
if err != nil {
return nil, err
}
fields[i] = &metastore.Field{Name: name, Id: idInt}
}
// if it's a count query we just want the column that will be the most efficient to
// scan through. So find that and return it.
if isCountQuery {
bestField := fields[0]
return []*metastore.Field{bestField}, nil
}
return fields, nil
}
// TODO: remove this function. I'm keeping it around for the moment since it'll probably have to be
// used in the DB upgrade/migration that moves metadata from the shard to Raft
func (self *Shard) getColumnNamesForSeriesDEPRECATED(db, series string) []string {
dbNameStart := len(SERIES_COLUMN_INDEX_PREFIX)
seekKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(db+"~"+series+"~")...)
pred := func(key []byte) bool {
return len(key) >= dbNameStart && bytes.Equal(key[:dbNameStart], SERIES_COLUMN_INDEX_PREFIX)
}
it := self.db.Iterator()
defer it.Close()
names := make([]string, 0)
for it.Seek(seekKey); it.Valid(); it.Next() {
key := it.Key()
if !pred(key) {
break
}
dbSeriesColumn := string(key[dbNameStart:])
parts := strings.Split(dbSeriesColumn, "~")
if len(parts) > 2 {
if parts[0] != db || parts[1] != series {
break
}
names = append(names, parts[2])
}
}
if err := it.Error(); err != nil {
log.Error("Error while getting columns for series %s: %s", series, err)
return nil
}
return names
}
// TODO: remove this after a version that doesn't support migration from old non-raft metastore
func (self *Shard) getIdForDbSeriesColumnDEPRECATED(db, series, column *string) (ret []byte, err error) {
s := fmt.Sprintf("%s~%s~%s", *db, *series, *column)
b := []byte(s)
key := append(SERIES_COLUMN_INDEX_PREFIX, b...)
if ret, err = self.db.Get(key); err != nil {
return nil, err
}
return ret, nil
}


@@ -295,6 +295,10 @@ type FieldLookupError struct {
message string
}
func NewFieldLookupError(message string) *FieldLookupError {
return &FieldLookupError{message}
}
func (self FieldLookupError) Error() string {
return self.message
}


@@ -0,0 +1,86 @@
package integration
/*
This is commented out because we can't generate old data automatically. The files for this test
are up on S3 so that we can run it later. Just trust that I've run it (this is Paul)
*/
// import (
// "fmt"
// "io/ioutil"
// "net/http"
// "os"
// "path/filepath"
// "time"
// "github.com/influxdb/influxdb/datastore"
// "github.com/influxdb/influxdb/migration"
// . "github.com/influxdb/influxdb/integration/helpers"
// . "launchpad.net/gocheck"
// )
// type MigrationTestSuite struct {
// server *Server
// }
// var _ = Suite(&MigrationTestSuite{})
// func (self *MigrationTestSuite) SetUpSuite(c *C) {
// self.server = NewServer("integration/migration_test.toml", c)
// }
// func (self *MigrationTestSuite) TearDownSuite(c *C) {
// self.server.Stop()
// dataDir := "migration_data/data"
// shardDir := filepath.Join(dataDir, migration.OLD_SHARD_DIR)
// infos, err := ioutil.ReadDir(shardDir)
// if err != nil {
// fmt.Printf("Error Clearing Migration: %v\n", err)
// return
// }
// for _, info := range infos {
// if info.IsDir() {
// os.Remove(filepath.Join(shardDir, info.Name(), migration.MIGRATED_MARKER))
// }
// }
// os.RemoveAll(filepath.Join(dataDir, datastore.SHARD_DATABASE_DIR))
// }
// func (self *MigrationTestSuite) TestMigrationOfPreviousDb(c *C) {
// _, err := http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil)
// c.Assert(err, IsNil)
// // make sure that it won't kick it off a second time while it's already running
// resp, _ := http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil)
// c.Assert(resp.StatusCode, Equals, http.StatusForbidden)
// time.Sleep(time.Second * 60)
// client := self.server.GetClient("test1", c)
// s, err := client.Query("select count(value) from cpu_idle")
// c.Assert(err, IsNil)
// c.Assert(s, HasLen, 1)
// c.Assert(s[0].Points, HasLen, 1)
// c.Assert(s[0].Points[0][1].(float64), Equals, float64(44434))
// s, err = client.Query("select count(type) from customer_events")
// c.Assert(err, IsNil)
// c.Assert(s, HasLen, 1)
// c.Assert(s[0].Points, HasLen, 1)
// c.Assert(s[0].Points[0][1].(float64), Equals, float64(162597))
// client = self.server.GetClient("test2", c)
// s, err = client.Query("list series")
// c.Assert(err, IsNil)
// c.Assert(s, HasLen, 1)
// c.Assert(s[0].Points, HasLen, 1000)
// s, err = client.Query("select count(value) from /.*/")
// c.Assert(s, HasLen, 1000)
// for _, series := range s {
// c.Assert(series.Points, HasLen, 1)
// c.Assert(series.Points[0][1].(float64), Equals, float64(1434))
// }
// _, err = http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil)
// c.Assert(err, IsNil)
// time.Sleep(time.Second * 5)
// }


@@ -0,0 +1,187 @@
# Welcome to the InfluxDB configuration file.
# If hostname (on the OS) doesn't return a name that can be resolved by the other
# systems in the cluster, you'll have to set the hostname to an IP or something
# that can be resolved here.
# hostname = ""
bind-address = "0.0.0.0"
# Once every 24 hours InfluxDB will report anonymous data to m.influxdb.com
# The data includes raft name (random 8 bytes), os, arch and version
# We don't track ip addresses of servers reporting. This is only used
# to track the number of instances running and the versions which
# is very helpful for us.
# Change this option to true to disable reporting.
reporting-disabled = true
[logging]
# logging level can be one of "debug", "info", "warn" or "error"
level = "info"
file = "stdout" # stdout to log to standard out
# Configure the admin server
[admin]
port = 8083 # binding is disabled if the port isn't set
assets = "./admin"
# Configure the http api
[api]
port = 8086 # binding is disabled if the port isn't set
# ssl-port = 8084 # Ssl support is enabled if you set a port and cert
# ssl-cert = /path/to/cert.pem
# connections will timeout after this amount of time. Ensures that clients that misbehave
# and keep connections alive without using them won't tie up server resources.
# However, if a request takes longer than this to complete, it could be a problem.
read-timeout = "5s"
[input_plugins]
# Configure the graphite api
[input_plugins.graphite]
enabled = false
# port = 2003
# database = "" # store graphite data in this database
# udp_enabled = true # enable udp interface on the same port as the tcp interface
# Configure the udp api
[input_plugins.udp]
enabled = false
# port = 4444
# database = ""
# Configure multiple udp apis, each of which can write to a separate db. Just
# repeat the following section to enable multiple udp apis on
# different ports.
[[input_plugins.udp_servers]] # array of tables
enabled = false
# port = 5551
# database = "db1"
# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.
# However, this port shouldn't be accessible from the internet.
port = 8090
# Where the raft logs are stored. The user running InfluxDB will need read/write access.
dir = "integration/migration_data/raft"
# election-timeout = "1s"
[storage]
dir = "integration/migration_data/data"
# How many requests to potentially buffer in memory. If the buffer gets filled then writes
# will still be logged and once the local storage has caught up (or compacted) the writes
# will be replayed from the WAL
write-buffer-size = 10000
# the engine to use for new shards; old shards will continue to use the same engine
default-engine = "leveldb"
# The default setting on this is 0, which means unlimited. Set this to something if you want to
# limit the max number of open files. max-open-files is per shard, so the overall maximum
# is this value times max-open-files.
max-open-shards = 0
# The default setting is 100. This option controls how many points are fetched from LevelDB
# before they get flushed to the backend.
point-batch-size = 100
# The number of points to batch in memory before writing them to leveldb. Lowering this number will
# reduce the memory usage, but will result in slower writes.
write-batch-size = 5000000
# The server will check this often for expired shards that should be cleared.
retention-sweep-period = "10m"
[storage.engines.leveldb]
# Maximum mmap open files, this will affect the virtual memory used by
# the process
max-open-files = 1000
# LRU cache size, LRU is used by leveldb to store contents of the
# uncompressed sstables. You can use `m` or `g` prefix for megabytes
# and gigabytes, respectively.
lru-cache-size = "200m"
[storage.engines.rocksdb]
# Maximum mmap open files, this will affect the virtual memory used by
# the process
max-open-files = 1000
# LRU cache size, LRU is used by rocksdb to store contents of the
# uncompressed sstables. You can use `m` or `g` prefix for megabytes
# and gigabytes, respectively.
lru-cache-size = "200m"
[storage.engines.hyperleveldb]
# Maximum mmap open files, this will affect the virtual memory used by
# the process
max-open-files = 1000
# LRU cache size, LRU is used by hyperleveldb to store contents of the
# uncompressed sstables. You can use `m` or `g` prefix for megabytes
# and gigabytes, respectively.
lru-cache-size = "200m"
[storage.engines.lmdb]
map-size = "100g"
[cluster]
# A comma separated list of servers to seed
# this server. this is only relevant when the
# server is joining a new cluster. Otherwise
# the server will use the list of known servers
# prior to shutting down. Any server can be pointed to
# as a seed. It will find the Raft leader automatically.
# Here's an example. Note that the port on the host is the same as the raft port.
# seed-servers = ["hosta:8090","hostb:8090"]
# Replication happens over a TCP connection with a Protobuf protocol.
# This port should be reachable between all servers in a cluster.
# However, this port shouldn't be accessible from the internet.
protobuf_port = 8099
protobuf_timeout = "2s" # the write timeout on the protobuf conn; any duration parseable by time.ParseDuration
protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must be parseable by time.ParseDuration
protobuf_min_backoff = "1s" # the minimum backoff after a failed heartbeat attempt
protobuf_max_backoff = "10s" # the maximum backoff after a failed heartbeat attempt
# How many write requests to potentially buffer in memory per server. If the buffer gets filled then writes
# will still be logged and once the server has caught up (or come back online) the writes
# will be replayed from the WAL
write-buffer-size = 1000
# the maximum number of responses to buffer from remote nodes, if the
# expected number of responses exceed this number then querying will
# happen sequentially and the buffer size will be limited to this
# number
max-response-buffer-size = 100
# When queries get distributed out to shards, they go in parallel. This means that results can get buffered
# in memory since results will come in any order, but have to be processed in the correct time order.
# Setting this higher will give better performance, but you'll need more memory. Setting this to 1 will ensure
# that you don't need to buffer in memory, but you won't get the best performance.
concurrent-shard-query-limit = 10
[wal]
dir = "integration/migration_data/wal"
flush-after = 1000 # the number of writes after which wal will be flushed, 0 for flushing on every write
bookmark-after = 1000 # the number of writes after which a bookmark will be created
# the number of writes after which an index entry is created pointing
# to the offset of the first request, defaults to 1k
index-after = 1000
# the number of requests per log file; once exceeded, a
# new log file will be created
requests-per-logfile = 10000

migration/data_migrator.go

@@ -0,0 +1,155 @@
package migration
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/configuration"
"github.com/influxdb/influxdb/coordinator"
"github.com/influxdb/influxdb/engine"
"github.com/influxdb/influxdb/metastore"
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
log "code.google.com/p/log4go"
// "github.com/BurntSushi/toml"
"github.com/jmhodges/levigo"
)
// Used for migrating data from old versions of influx.
type DataMigrator struct {
baseDbDir string
dbDir string
metaStore *metastore.Store
config *configuration.Configuration
clusterConfig *cluster.ClusterConfiguration
coord *coordinator.CoordinatorImpl
}
const (
MIGRATED_MARKER = "MIGRATED"
OLD_SHARD_DIR = "shard_db"
)
var (
endStreamResponse = protocol.Response_END_STREAM
)
func NewDataMigrator(coord *coordinator.CoordinatorImpl, clusterConfig *cluster.ClusterConfiguration, config *configuration.Configuration, baseDbDir, newSubDir string, metaStore *metastore.Store) *DataMigrator {
return &DataMigrator{
baseDbDir: baseDbDir,
dbDir: filepath.Join(baseDbDir, OLD_SHARD_DIR),
metaStore: metaStore,
config: config,
clusterConfig: clusterConfig,
coord: coord,
}
}
func (dm *DataMigrator) Migrate() {
log.Info("Migrating from dir %s", dm.dbDir)
infos, err := ioutil.ReadDir(dm.dbDir)
if err != nil {
log.Error("Error Migrating: ", err)
return
}
names := make([]string, 0)
for _, info := range infos {
if info.IsDir() {
names = append(names, info.Name())
}
}
sort.Strings(names)
// go through in reverse order so most recently created shards will be migrated first
for i := len(names) - 1; i >= 0; i-- {
dm.migrateDir(names[i])
}
}
func (dm *DataMigrator) migrateDir(name string) {
migrateMarkerFile := filepath.Join(dm.shardDir(name), MIGRATED_MARKER)
if _, err := os.Stat(migrateMarkerFile); err == nil {
log.Info("Already migrated %s. Skipping", name)
return
}
log.Info("Migrating %s", name)
shard, err := dm.getShard(name)
if err != nil {
log.Error("Migration error getting shard: %s", err.Error())
return
}
defer shard.Close()
databases := dm.clusterConfig.GetDatabases()
for _, database := range databases {
err := dm.migrateDatabaseInShard(database.Name, shard)
if err != nil {
log.Error("Error migrating database %s: %s", database.Name, err.Error())
return
}
}
err = ioutil.WriteFile(migrateMarkerFile, []byte("done.\n"), 0644)
if err != nil {
log.Error("Problem writing migration marker for shard %s: %s", name, err.Error())
}
}
func (dm *DataMigrator) migrateDatabaseInShard(database string, shard *LevelDbShard) error {
log.Info("Migrating database %s for shard", database)
seriesNames := shard.GetSeriesForDatabase(database)
log.Info("Migrating %d series", len(seriesNames))
admin := dm.clusterConfig.GetClusterAdmin(dm.clusterConfig.GetClusterAdmins()[0])
for _, series := range seriesNames {
q, err := parser.ParseQuery(fmt.Sprintf("select * from \"%s\"", series))
if err != nil {
log.Error("Problem migrating series %s", series)
continue
}
query := q[0]
seriesChan := make(chan *protocol.Response)
queryEngine := engine.NewPassthroughEngine(seriesChan, 2000)
querySpec := parser.NewQuerySpec(admin, database, query)
go func() {
err := shard.Query(querySpec, queryEngine)
if err != nil {
log.Error("Error migrating %s", err.Error())
}
queryEngine.Close()
seriesChan <- &protocol.Response{Type: &endStreamResponse}
}()
for {
response := <-seriesChan
if *response.Type == endStreamResponse {
break
}
err := dm.coord.WriteSeriesData(admin, database, []*protocol.Series{response.Series})
if err != nil {
log.Error("Writing Series data: %s", err.Error())
}
}
}
return nil
}
func (dm *DataMigrator) shardDir(name string) string {
return filepath.Join(dm.baseDbDir, OLD_SHARD_DIR, name)
}
func (dm *DataMigrator) getShard(name string) (*LevelDbShard, error) {
dbDir := dm.shardDir(name)
cache := levigo.NewLRUCache(int(2000))
opts := levigo.NewOptions()
opts.SetCache(cache)
opts.SetCreateIfMissing(true)
opts.SetMaxOpenFiles(1000)
ldb, err := levigo.Open(dbDir, opts)
if err != nil {
return nil, err
}
return NewLevelDbShard(ldb, dm.config.StoragePointBatchSize, dm.config.StorageWriteBatchSize)
}
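
The migrator is normally driven by the /cluster/migrate_data handler shown earlier. As a sketch only, the same wiring in isolation might look like the following; runMigration is a hypothetical helper, and it assumes the caller already holds the coordinator, cluster configuration, and configuration objects that the server builds at startup:

package example

import (
	"github.com/influxdb/influxdb/cluster"
	"github.com/influxdb/influxdb/configuration"
	"github.com/influxdb/influxdb/coordinator"
	"github.com/influxdb/influxdb/migration"
)

// runMigration mirrors what the HTTP handler does: build a DataMigrator over the
// old shard_db directory under the data dir and run it to completion. Migrate
// walks the shard directories newest-first, skips any directory that already
// contains a MIGRATED marker, and writes every series back through the
// coordinator into the new shard format.
func runMigration(coord *coordinator.CoordinatorImpl, clusterConfig *cluster.ClusterConfiguration, config *configuration.Configuration) {
	dm := migration.NewDataMigrator(coord, clusterConfig, config, config.DataDir, "shard_db", clusterConfig.MetaStore)
	dm.Migrate()
}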

migration/level_db_shard.go

@@ -0,0 +1,829 @@
package migration
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math"
"regexp"
"strings"
"sync"
"time"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/common"
. "github.com/influxdb/influxdb/datastore"
"github.com/influxdb/influxdb/parser"
"github.com/influxdb/influxdb/protocol"
"code.google.com/p/goprotobuf/proto"
log "code.google.com/p/log4go"
"github.com/jmhodges/levigo"
)
/*
This is deprecated. It's only sitting around because people will need it when
migrating their data from 0.7 to 0.8. We'll remove it later.
*/
type LevelDbShard struct {
db *levigo.DB
readOptions *levigo.ReadOptions
writeOptions *levigo.WriteOptions
lastIdUsed uint64
columnIdMutex sync.Mutex
closed bool
pointBatchSize int
writeBatchSize int
}
type Field struct {
Id []byte
Name string
}
type rawColumnValue struct {
time []byte
sequence []byte
value []byte
}
// returns true if the point has the correct field id and is
// in the given time range
func isPointInRange(fieldId, startTime, endTime, point []byte) bool {
id := point[:8]
time := point[8:16]
return bytes.Equal(id, fieldId) && bytes.Compare(time, startTime) > -1 && bytes.Compare(time, endTime) < 1
}
// depending on the query order (whether it's ascending or not) returns
// the min (or max in case of descending query) of the current
// [timestamp,sequence] and the self's [timestamp,sequence]
//
// This is used to determine what the next point's timestamp
// and sequence number should be.
func (self *rawColumnValue) updatePointTimeAndSequence(currentTimeRaw, currentSequenceRaw []byte, isAscendingQuery bool) ([]byte, []byte) {
if currentTimeRaw == nil {
return self.time, self.sequence
}
compareValue := 1
if isAscendingQuery {
compareValue = -1
}
timeCompare := bytes.Compare(self.time, currentTimeRaw)
if timeCompare == compareValue {
return self.time, self.sequence
}
if timeCompare != 0 {
return currentTimeRaw, currentSequenceRaw
}
if bytes.Compare(self.sequence, currentSequenceRaw) == compareValue {
return currentTimeRaw, self.sequence
}
return currentTimeRaw, currentSequenceRaw
}
func NewLevelDbShard(db *levigo.DB, pointBatchSize, writeBatchSize int) (*LevelDbShard, error) {
ro := levigo.NewReadOptions()
lastIdBytes, err2 := db.Get(ro, NEXT_ID_KEY)
if err2 != nil {
return nil, err2
}
lastId := uint64(0)
if lastIdBytes != nil {
lastId, err2 = binary.ReadUvarint(bytes.NewBuffer(lastIdBytes))
if err2 != nil {
return nil, err2
}
}
return &LevelDbShard{
db: db,
writeOptions: levigo.NewWriteOptions(),
readOptions: ro,
lastIdUsed: lastId,
pointBatchSize: pointBatchSize,
writeBatchSize: writeBatchSize,
}, nil
}
func (self *LevelDbShard) Write(database string, series []*protocol.Series) error {
wb := levigo.NewWriteBatch()
defer wb.Close()
for _, s := range series {
if len(s.Points) == 0 {
return errors.New("Unable to write no data. Series was nil or had no points.")
}
count := 0
for fieldIndex, field := range s.Fields {
temp := field
id, err := self.createIdForDbSeriesColumn(&database, s.Name, &temp)
if err != nil {
return err
}
keyBuffer := bytes.NewBuffer(make([]byte, 0, 24))
dataBuffer := proto.NewBuffer(nil)
for _, point := range s.Points {
keyBuffer.Reset()
dataBuffer.Reset()
keyBuffer.Write(id)
timestamp := self.convertTimestampToUint(point.GetTimestampInMicroseconds())
// pass the uint64 by reference so binary.Write() doesn't create a new buffer
// see the source code for intDataSize() in binary.go
binary.Write(keyBuffer, binary.BigEndian, &timestamp)
binary.Write(keyBuffer, binary.BigEndian, point.SequenceNumber)
pointKey := keyBuffer.Bytes()
if point.Values[fieldIndex].GetIsNull() {
wb.Delete(pointKey)
goto check
}
err = dataBuffer.Marshal(point.Values[fieldIndex])
if err != nil {
return err
}
wb.Put(pointKey, dataBuffer.Bytes())
check:
count++
if count >= self.writeBatchSize {
err = self.db.Write(self.writeOptions, wb)
if err != nil {
return err
}
count = 0
wb.Clear()
}
}
}
}
return self.db.Write(self.writeOptions, wb)
}
func (self *LevelDbShard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
if querySpec.IsListSeriesQuery() {
return self.executeListSeriesQuery(querySpec, processor)
} else if querySpec.IsDeleteFromSeriesQuery() {
return self.executeDeleteQuery(querySpec, processor)
} else if querySpec.IsDropSeriesQuery() {
return self.executeDropSeriesQuery(querySpec, processor)
}
seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns()
if !self.hasReadAccess(querySpec) {
return errors.New("User does not have access to one or more of the series requested.")
}
for series, columns := range seriesAndColumns {
if regex, ok := series.GetCompiledRegex(); ok {
seriesNames := self.getSeriesForDbAndRegex(querySpec.Database(), regex)
for _, name := range seriesNames {
if !querySpec.HasReadAccess(name) {
continue
}
err := self.executeQueryForSeries(querySpec, name, columns, processor)
if err != nil {
return err
}
}
} else {
err := self.executeQueryForSeries(querySpec, series.Name, columns, processor)
if err != nil {
return err
}
}
}
return nil
}
func (self *LevelDbShard) DropDatabase(database string) error {
seriesNames := self.GetSeriesForDatabase(database)
for _, name := range seriesNames {
if err := self.dropSeries(database, name); err != nil {
log.Error("DropDatabase: ", err)
return err
}
}
self.compact()
return nil
}
func (self *LevelDbShard) IsClosed() bool {
return self.closed
}
func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName string, columns []string, processor cluster.QueryProcessor) error {
startTimeBytes := self.byteArrayForTime(querySpec.GetStartTime())
endTimeBytes := self.byteArrayForTime(querySpec.GetEndTime())
fields, err := self.getFieldsForSeries(querySpec.Database(), seriesName, columns)
if err != nil {
// because a db is distributed across the cluster, it's possible we don't have the series indexed here. ignore
switch err := err.(type) {
case FieldLookupError:
log.Debug("Cannot find fields %v", columns)
return nil
default:
log.Error("Error looking up fields for %s: %s", seriesName, err)
return fmt.Errorf("Error looking up fields for %s: %s", seriesName, err)
}
}
fieldCount := len(fields)
rawColumnValues := make([]rawColumnValue, fieldCount, fieldCount)
query := querySpec.SelectQuery()
aliases := query.GetTableAliases(seriesName)
if querySpec.IsSinglePointQuery() {
series, err := self.fetchSinglePoint(querySpec, seriesName, fields)
if err != nil {
log.Error("Error reading a single point: %s", err)
return err
}
if len(series.Points) > 0 {
processor.YieldPoint(series.Name, series.Fields, series.Points[0])
}
return nil
}
fieldNames, iterators := self.getIterators(fields, startTimeBytes, endTimeBytes, query.Ascending)
defer func() {
for _, it := range iterators {
it.Close()
}
}()
seriesOutgoing := &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)}
// TODO: clean up, this is super gnarly
// optimize for the case where we're pulling back only a single column or aggregate
buffer := bytes.NewBuffer(nil)
valueBuffer := proto.NewBuffer(nil)
for {
isValid := false
point := &protocol.Point{Values: make([]*protocol.FieldValue, fieldCount, fieldCount)}
for i, it := range iterators {
if rawColumnValues[i].value != nil || !it.Valid() {
continue
}
key := it.Key()
if len(key) < 16 {
continue
}
if !isPointInRange(fields[i].Id, startTimeBytes, endTimeBytes, key) {
continue
}
value := it.Value()
sequenceNumber := key[16:]
rawTime := key[8:16]
rawColumnValues[i] = rawColumnValue{time: rawTime, sequence: sequenceNumber, value: value}
}
var pointTimeRaw []byte
var pointSequenceRaw []byte
// choose the highest (or lowest in case of ascending queries) timestamp
// and sequence number. that will become the timestamp and sequence of
// the next point.
for _, value := range rawColumnValues {
if value.value == nil {
continue
}
pointTimeRaw, pointSequenceRaw = value.updatePointTimeAndSequence(pointTimeRaw,
pointSequenceRaw, query.Ascending)
}
for i, iterator := range iterators {
// if the value is nil or doesn't match the point's timestamp and sequence number
// then skip it
if rawColumnValues[i].value == nil ||
!bytes.Equal(rawColumnValues[i].time, pointTimeRaw) ||
!bytes.Equal(rawColumnValues[i].sequence, pointSequenceRaw) {
point.Values[i] = &protocol.FieldValue{IsNull: &TRUE}
continue
}
// if we emitted at least one column, then we should keep
// trying to get more points
isValid = true
// advance the iterator to read a new value in the next iteration
if query.Ascending {
iterator.Next()
} else {
iterator.Prev()
}
fv := &protocol.FieldValue{}
valueBuffer.SetBuf(rawColumnValues[i].value)
err := valueBuffer.Unmarshal(fv)
if err != nil {
log.Error("Error while running query: %s", err)
return err
}
point.Values[i] = fv
rawColumnValues[i].value = nil
}
var sequence uint64
var t uint64
// set the point sequence number and timestamp
buffer.Reset()
buffer.Write(pointSequenceRaw)
binary.Read(buffer, binary.BigEndian, &sequence)
buffer.Reset()
buffer.Write(pointTimeRaw)
binary.Read(buffer, binary.BigEndian, &t)
time := self.convertUintTimestampToInt64(&t)
point.SetTimestampInMicroseconds(time)
point.SequenceNumber = &sequence
// stop the loop if we ran out of points
if !isValid {
break
}
shouldContinue := true
seriesOutgoing.Points = append(seriesOutgoing.Points, point)
if len(seriesOutgoing.Points) >= self.pointBatchSize {
for _, alias := range aliases {
series := &protocol.Series{
Name: proto.String(alias),
Fields: fieldNames,
Points: seriesOutgoing.Points,
}
if !processor.YieldSeries(series) {
log.Info("Stopping processing")
shouldContinue = false
}
}
seriesOutgoing = &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)}
}
if !shouldContinue {
break
}
}
// Yield remaining data
for _, alias := range aliases {
log.Debug("Final Flush %s", alias)
series := &protocol.Series{Name: protocol.String(alias), Fields: seriesOutgoing.Fields, Points: seriesOutgoing.Points}
if !processor.YieldSeries(series) {
log.Debug("Cancelled...")
}
}
log.Debug("Finished running query %s", query.GetQueryString())
return nil
}
func (self *LevelDbShard) executeListSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
it := self.db.NewIterator(self.readOptions)
defer it.Close()
database := querySpec.Database()
seekKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(querySpec.Database()+"~")...)
it.Seek(seekKey)
dbNameStart := len(DATABASE_SERIES_INDEX_PREFIX)
for ; it.Valid(); it.Next() {
key := it.Key()
if len(key) < dbNameStart || !bytes.Equal(key[:dbNameStart], DATABASE_SERIES_INDEX_PREFIX) {
break
}
dbSeries := string(key[dbNameStart:])
parts := strings.Split(dbSeries, "~")
if len(parts) > 1 {
if parts[0] != database {
break
}
name := parts[1]
shouldContinue := processor.YieldPoint(&name, nil, nil)
if !shouldContinue {
return nil
}
}
}
return nil
}
func (self *LevelDbShard) executeDeleteQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
query := querySpec.DeleteQuery()
series := query.GetFromClause()
database := querySpec.Database()
if series.Type != parser.FromClauseArray {
return fmt.Errorf("Merge and Inner joins can't be used with a delete query", series.Type)
}
for _, name := range series.Names {
var err error
if regex, ok := name.Name.GetCompiledRegex(); ok {
err = self.deleteRangeOfRegex(database, regex, query.GetStartTime(), query.GetEndTime())
} else {
err = self.deleteRangeOfSeries(database, name.Name.Name, query.GetStartTime(), query.GetEndTime())
}
if err != nil {
return err
}
}
self.compact()
return nil
}
func (self *LevelDbShard) executeDropSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
database := querySpec.Database()
series := querySpec.Query().DropSeriesQuery.GetTableName()
err := self.dropSeries(database, series)
if err != nil {
return err
}
self.compact()
return nil
}
func (self *LevelDbShard) dropSeries(database, series string) error {
startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
wb := levigo.NewWriteBatch()
defer wb.Close()
if err := self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes); err != nil {
return err
}
for _, name := range self.getColumnNamesForSeries(database, series) {
indexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(database+"~"+series+"~"+name)...)
wb.Delete(indexKey)
}
wb.Delete(append(DATABASE_SERIES_INDEX_PREFIX, []byte(database+"~"+series)...))
// remove the column indices for this time series
return self.db.Write(self.writeOptions, wb)
}
func (self *LevelDbShard) byteArrayForTimeInt(time int64) []byte {
timeBuffer := bytes.NewBuffer(make([]byte, 0, 8))
binary.Write(timeBuffer, binary.BigEndian, self.convertTimestampToUint(&time))
bytes := timeBuffer.Bytes()
return bytes
}
func (self *LevelDbShard) byteArraysForStartAndEndTimes(startTime, endTime int64) ([]byte, []byte) {
return self.byteArrayForTimeInt(startTime), self.byteArrayForTimeInt(endTime)
}
func (self *LevelDbShard) deleteRangeOfSeriesCommon(database, series string, startTimeBytes, endTimeBytes []byte) error {
columns := self.getColumnNamesForSeries(database, series)
fields, err := self.getFieldsForSeries(database, series, columns)
if err != nil {
// because a db is distributed across the cluster, it's possible we don't have the series indexed here. ignore
switch err := err.(type) {
case FieldLookupError:
return nil
default:
return err
}
}
ro := levigo.NewReadOptions()
defer ro.Close()
ro.SetFillCache(false)
wb := levigo.NewWriteBatch()
count := 0
defer wb.Close()
for _, field := range fields {
it := self.db.NewIterator(ro)
defer it.Close()
startKey := append(field.Id, startTimeBytes...)
it.Seek(startKey)
if it.Valid() {
if !bytes.Equal(it.Key()[:8], field.Id) {
it.Next()
if it.Valid() {
startKey = it.Key()
}
}
}
for ; it.Valid(); it.Next() {
k := it.Key()
if len(k) < 16 || !bytes.Equal(k[:8], field.Id) || bytes.Compare(k[8:16], endTimeBytes) == 1 {
break
}
wb.Delete(k)
count++
if count >= self.writeBatchSize {
err = self.db.Write(self.writeOptions, wb)
if err != nil {
return err
}
count = 0
wb.Clear()
}
}
}
return self.db.Write(self.writeOptions, wb)
}
func (self *LevelDbShard) compact() {
log.Info("Compacting shard")
self.db.CompactRange(levigo.Range{})
log.Info("Shard compaction is done")
}
func (self *LevelDbShard) deleteRangeOfSeries(database, series string, startTime, endTime time.Time) error {
startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(startTime), common.TimeToMicroseconds(endTime))
return self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes)
}
func (self *LevelDbShard) deleteRangeOfRegex(database string, regex *regexp.Regexp, startTime, endTime time.Time) error {
series := self.getSeriesForDbAndRegex(database, regex)
for _, name := range series {
err := self.deleteRangeOfSeries(database, name, startTime, endTime)
if err != nil {
return err
}
}
return nil
}
func (self *LevelDbShard) getFieldsForSeries(db, series string, columns []string) ([]*Field, error) {
isCountQuery := false
if len(columns) > 0 && columns[0] == "*" {
columns = self.getColumnNamesForSeries(db, series)
} else if len(columns) == 0 {
isCountQuery = true
columns = self.getColumnNamesForSeries(db, series)
}
if len(columns) == 0 {
return nil, NewFieldLookupError("Coulnd't look up columns for series: " + series)
}
fields := make([]*Field, len(columns), len(columns))
for i, name := range columns {
id, errId := self.getIdForDbSeriesColumn(&db, &series, &name)
if errId != nil {
return nil, errId
}
if id == nil {
return nil, NewFieldLookupError("Field " + name + " doesn't exist in series " + series)
}
fields[i] = &Field{Name: name, Id: id}
}
// if it's a count query we just want the column that will be the most efficient to
// scan through. So find that and return it.
if isCountQuery {
bestField := fields[0]
return []*Field{bestField}, nil
}
return fields, nil
}
func (self *LevelDbShard) getColumnNamesForSeries(db, series string) []string {
it := self.db.NewIterator(self.readOptions)
defer it.Close()
seekKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(db+"~"+series+"~")...)
it.Seek(seekKey)
names := make([]string, 0)
dbNameStart := len(SERIES_COLUMN_INDEX_PREFIX)
for ; it.Valid(); it.Next() {
key := it.Key()
if len(key) < dbNameStart || !bytes.Equal(key[:dbNameStart], SERIES_COLUMN_INDEX_PREFIX) {
break
}
dbSeriesColumn := string(key[dbNameStart:])
parts := strings.Split(dbSeriesColumn, "~")
if len(parts) > 2 {
if parts[0] != db || parts[1] != series {
break
}
names = append(names, parts[2])
}
}
return names
}
func (self *LevelDbShard) hasReadAccess(querySpec *parser.QuerySpec) bool {
for series := range querySpec.SeriesValuesAndColumns() {
if _, isRegex := series.GetCompiledRegex(); !isRegex {
if !querySpec.HasReadAccess(series.Name) {
return false
}
}
}
return true
}
func (self *LevelDbShard) byteArrayForTime(t time.Time) []byte {
timeBuffer := bytes.NewBuffer(make([]byte, 0, 8))
timeMicro := common.TimeToMicroseconds(t)
binary.Write(timeBuffer, binary.BigEndian, self.convertTimestampToUint(&timeMicro))
return timeBuffer.Bytes()
}
func (self *LevelDbShard) getSeriesForDbAndRegex(database string, regex *regexp.Regexp) []string {
names := []string{}
allSeries := self.GetSeriesForDatabase(database)
for _, name := range allSeries {
if regex.MatchString(name) {
names = append(names, name)
}
}
return names
}
func (self *LevelDbShard) GetSeriesForDatabase(database string) []string {
it := self.db.NewIterator(self.readOptions)
defer it.Close()
seekKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(database+"~")...)
it.Seek(seekKey)
dbNameStart := len(DATABASE_SERIES_INDEX_PREFIX)
names := make([]string, 0)
for ; it.Valid(); it.Next() {
key := it.Key()
if len(key) < dbNameStart || !bytes.Equal(key[:dbNameStart], DATABASE_SERIES_INDEX_PREFIX) {
break
}
dbSeries := string(key[dbNameStart:])
parts := strings.Split(dbSeries, "~")
if len(parts) > 1 {
if parts[0] != database {
break
}
name := parts[1]
names = append(names, name)
}
}
return names
}
func (self *LevelDbShard) createIdForDbSeriesColumn(db, series, column *string) (ret []byte, err error) {
ret, err = self.getIdForDbSeriesColumn(db, series, column)
if err != nil {
return
}
if ret != nil {
return
}
self.columnIdMutex.Lock()
defer self.columnIdMutex.Unlock()
ret, err = self.getIdForDbSeriesColumn(db, series, column)
if err != nil {
return
}
if ret != nil {
return
}
ret, err = self.getNextIdForColumn(db, series, column)
if err != nil {
return
}
s := fmt.Sprintf("%s~%s~%s", *db, *series, *column)
b := []byte(s)
key := append(SERIES_COLUMN_INDEX_PREFIX, b...)
err = self.db.Put(self.writeOptions, key, ret)
return
}
func (self *LevelDbShard) getIdForDbSeriesColumn(db, series, column *string) (ret []byte, err error) {
s := fmt.Sprintf("%s~%s~%s", *db, *series, *column)
b := []byte(s)
key := append(SERIES_COLUMN_INDEX_PREFIX, b...)
if ret, err = self.db.Get(self.readOptions, key); err != nil {
return nil, err
}
return ret, nil
}
func (self *LevelDbShard) getNextIdForColumn(db, series, column *string) (ret []byte, err error) {
id := self.lastIdUsed + 1
self.lastIdUsed += 1
idBytes := make([]byte, 8, 8)
binary.PutUvarint(idBytes, id)
wb := levigo.NewWriteBatch()
defer wb.Close()
wb.Put(NEXT_ID_KEY, idBytes)
databaseSeriesIndexKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(*db+"~"+*series)...)
wb.Put(databaseSeriesIndexKey, []byte{})
seriesColumnIndexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(*db+"~"+*series+"~"+*column)...)
wb.Put(seriesColumnIndexKey, idBytes)
if err = self.db.Write(self.writeOptions, wb); err != nil {
return nil, err
}
return idBytes, nil
}
func (self *LevelDbShard) Close() {
self.closed = true
self.readOptions.Close()
self.writeOptions.Close()
self.db.Close()
}
func (self *LevelDbShard) convertTimestampToUint(t *int64) uint64 {
if *t < 0 {
return uint64(math.MaxInt64 + *t + 1)
}
return uint64(*t) + uint64(math.MaxInt64) + uint64(1)
}
func (self *LevelDbShard) fetchSinglePoint(querySpec *parser.QuerySpec, series string, fields []*Field) (*protocol.Series, error) {
query := querySpec.SelectQuery()
fieldCount := len(fields)
fieldNames := make([]string, 0, fieldCount)
point := &protocol.Point{Values: make([]*protocol.FieldValue, 0, fieldCount)}
timestamp := common.TimeToMicroseconds(query.GetStartTime())
sequenceNumber, err := query.GetSinglePointQuerySequenceNumber()
if err != nil {
return nil, err
}
timeAndSequenceBuffer := bytes.NewBuffer(make([]byte, 0, 16))
binary.Write(timeAndSequenceBuffer, binary.BigEndian, self.convertTimestampToUint(&timestamp))
binary.Write(timeAndSequenceBuffer, binary.BigEndian, sequenceNumber)
sequenceNumber_uint64 := uint64(sequenceNumber)
point.SequenceNumber = &sequenceNumber_uint64
point.SetTimestampInMicroseconds(timestamp)
timeAndSequenceBytes := timeAndSequenceBuffer.Bytes()
for _, field := range fields {
pointKey := append(field.Id, timeAndSequenceBytes...)
if data, err := self.db.Get(self.readOptions, pointKey); err != nil {
return nil, err
} else {
fieldValue := &protocol.FieldValue{}
err := proto.Unmarshal(data, fieldValue)
if err != nil {
return nil, err
}
if data != nil {
fieldNames = append(fieldNames, field.Name)
point.Values = append(point.Values, fieldValue)
}
}
}
result := &protocol.Series{Name: &series, Fields: fieldNames, Points: []*protocol.Point{point}}
return result, nil
}
func (self *LevelDbShard) getIterators(fields []*Field, start, end []byte, isAscendingQuery bool) (fieldNames []string, iterators []*levigo.Iterator) {
iterators = make([]*levigo.Iterator, len(fields))
fieldNames = make([]string, len(fields))
// start the iterators to go through the series data
for i, field := range fields {
fieldNames[i] = field.Name
iterators[i] = self.db.NewIterator(self.readOptions)
if isAscendingQuery {
iterators[i].Seek(append(field.Id, start...))
} else {
iterators[i].Seek(append(append(field.Id, end...), MAX_SEQUENCE...))
if iterators[i].Valid() {
iterators[i].Prev()
}
}
}
return
}
func (self *LevelDbShard) convertUintTimestampToInt64(t *uint64) int64 {
if *t > uint64(math.MaxInt64) {
return int64(*t-math.MaxInt64) - int64(1)
}
return int64(*t) - math.MaxInt64 - int64(1)
}


@@ -64,7 +64,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
protobufServer := coordinator.NewProtobufServer(config.ProtobufListenString(), requestHandler)
raftServer.AssignCoordinator(coord)
httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.ApiReadTimeout, config.AdminAssetsDir, coord, coord, clusterConfig, raftServer)
httpApi := http.NewHttpServer(config, coord, coord, clusterConfig, raftServer)
httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath)
graphiteApi := graphite.NewServer(config, coord, clusterConfig)
adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString())


@@ -30,6 +30,14 @@ type GlobalState struct {
path string
}
// from version 0.7 to 0.8 the Suffix variables changed from
// ints to uint32s. We need this struct to convert them.
type oldGlobalState struct {
GlobalState
CurrentFileSuffix int
FirstSuffix int
}
func newGlobalState(path string) (*GlobalState, error) {
f, err := os.Open(path)
if err != nil {
@@ -84,7 +92,7 @@ func (self *GlobalState) write(w io.Writer) error {
return gob.NewEncoder(w).Encode(self)
}
func (self *GlobalState) read(r io.Reader) error {
func (self *GlobalState) read(r *os.File) error {
// skip the version
reader := bufio.NewReader(r)
// read the version line
@@ -92,7 +100,31 @@ func (self *GlobalState) read(r io.Reader) error {
if err != nil {
return err
}
return gob.NewDecoder(reader).Decode(self)
err = gob.NewDecoder(reader).Decode(self)
// from version 0.7 to 0.8 the type of the Suffix variables
// changed to uint32. Catch this and convert to a new GlobalState object.
if err != nil {
old := &oldGlobalState{}
r.Seek(int64(0), 0)
reader := bufio.NewReader(r)
// read the version line
_, err := reader.ReadString('\n')
if err != nil {
return err
}
err = gob.NewDecoder(reader).Decode(old)
if err != nil {
return err
}
self.CurrentFileOffset = old.CurrentFileOffset
self.CurrentFileSuffix = uint32(old.CurrentFileSuffix)
self.LargestRequestNumber = old.LargestRequestNumber
self.FirstSuffix = uint32(old.FirstSuffix)
self.ShardLastSequenceNumber = old.ShardLastSequenceNumber
self.ServerLastRequestNumber = old.ServerLastRequestNumber
}
return nil
}
func (self *GlobalState) recover(shardId uint32, sequenceNumber uint64) {
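
The fallback in read exists because encoding/gob refuses to decode a value that was encoded as a signed int into an unsigned field. A self-contained illustration of that behavior and of the convert-and-copy fallback is sketched below; stateV7 and stateV8 are hypothetical stand-ins that model only the two suffix fields, and the real on-disk file additionally starts with a version line:

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// stateV7 models the pre-0.8 layout, where the suffixes were plain ints.
type stateV7 struct {
	CurrentFileSuffix int
	FirstSuffix       int
}

// stateV8 models the 0.8 layout, where the suffixes became uint32s.
type stateV8 struct {
	CurrentFileSuffix uint32
	FirstSuffix       uint32
}

func main() {
	var buf bytes.Buffer
	gob.NewEncoder(&buf).Encode(&stateV7{CurrentFileSuffix: 3, FirstSuffix: 1})
	old := buf.Bytes()

	// Decoding the old (signed int) encoding straight into the new (uint32)
	// struct fails; this is the error the read method above catches.
	var v8 stateV8
	err := gob.NewDecoder(bytes.NewReader(old)).Decode(&v8)
	fmt.Println("direct decode error:", err)

	// Fallback: decode into the old shape, then convert field by field,
	// just as read does with oldGlobalState.
	var v7 stateV7
	if err := gob.NewDecoder(bytes.NewReader(old)).Decode(&v7); err != nil {
		fmt.Println("fallback decode error:", err)
		return
	}
	v8 = stateV8{
		CurrentFileSuffix: uint32(v7.CurrentFileSuffix),
		FirstSuffix:       uint32(v7.FirstSuffix),
	}
	fmt.Printf("converted: %+v\n", v8)
}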