diff --git a/.gitignore b/.gitignore index 0d6d88e3cc..816ed0be76 100644 --- a/.gitignore +++ b/.gitignore @@ -29,7 +29,6 @@ protocol/protocol.pb.go build/ # executables -/server /influxdb /benchmark-tool /main @@ -58,3 +57,6 @@ config.toml /admin /admin/ /data/ + +# test data files +integration/migration_data/ diff --git a/api/http/api.go b/api/http/api.go index 1bbafef03d..6b3a90d5a8 100644 --- a/api/http/api.go +++ b/api/http/api.go @@ -14,48 +14,56 @@ import ( "path/filepath" "strconv" "strings" + "sync/atomic" "time" log "code.google.com/p/log4go" "github.com/bmizerany/pat" "github.com/influxdb/influxdb/cluster" . "github.com/influxdb/influxdb/common" + "github.com/influxdb/influxdb/configuration" "github.com/influxdb/influxdb/coordinator" + "github.com/influxdb/influxdb/migration" "github.com/influxdb/influxdb/parser" "github.com/influxdb/influxdb/protocol" ) type HttpServer struct { - conn net.Listener - sslConn net.Listener - httpPort string - httpSslPort string - httpSslCert string - adminAssetsDir string - coordinator coordinator.Coordinator - userManager UserManager - shutdown chan bool - clusterConfig *cluster.ClusterConfiguration - raftServer *coordinator.RaftServer - readTimeout time.Duration + conn net.Listener + sslConn net.Listener + httpPort string + httpSslPort string + httpSslCert string + adminAssetsDir string + coordinator coordinator.Coordinator + userManager UserManager + shutdown chan bool + clusterConfig *cluster.ClusterConfiguration + raftServer *coordinator.RaftServer + readTimeout time.Duration + migrationRunning uint32 + config *configuration.Configuration } -func NewHttpServer(httpPort string, readTimeout time.Duration, adminAssetsDir string, theCoordinator coordinator.Coordinator, userManager UserManager, clusterConfig *cluster.ClusterConfiguration, raftServer *coordinator.RaftServer) *HttpServer { +func NewHttpServer(config *configuration.Configuration, theCoordinator coordinator.Coordinator, userManager UserManager, clusterConfig *cluster.ClusterConfiguration, raftServer *coordinator.RaftServer) *HttpServer { self := &HttpServer{} - self.httpPort = httpPort - self.adminAssetsDir = adminAssetsDir + self.httpPort = config.ApiHttpPortString() + self.adminAssetsDir = config.AdminAssetsDir self.coordinator = theCoordinator self.userManager = userManager self.shutdown = make(chan bool, 2) self.clusterConfig = clusterConfig self.raftServer = raftServer - self.readTimeout = readTimeout + self.readTimeout = config.ApiReadTimeout + self.config = config return self } const ( INVALID_CREDENTIALS_MSG = "Invalid database/username/password" JSON_PRETTY_PRINT_INDENT = " " + MIGRATION_RUNNING = uint32(1) + MIGRATION_NOT_RUNNING = uint32(0) ) func isPretty(r *libhttp.Request) bool { @@ -156,6 +164,9 @@ func (self *HttpServer) Serve(listener net.Listener) { self.registerEndpoint(p, "del", "/cluster/shard_spaces/:db/:name", self.dropShardSpace) self.registerEndpoint(p, "post", "/cluster/database_configs/:db", self.configureDatabase) + // migrates leveldb data from 0.7 to 0.8 format. 
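+ // the handler responds 202 Accepted right away and runs the migration in a
+ // background goroutine; an atomic flag rejects a second request with 403 while one is in flight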
+ self.registerEndpoint(p, "post", "/cluster/migrate_data", self.migrateData) + // return whether the cluster is in sync or not self.registerEndpoint(p, "get", "/sync", self.isInSync) @@ -1213,3 +1224,20 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R return libhttp.StatusCreated, nil }) } + +func (self *HttpServer) migrateData(w libhttp.ResponseWriter, r *libhttp.Request) { + self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) { + if !atomic.CompareAndSwapUint32(&self.migrationRunning, MIGRATION_NOT_RUNNING, MIGRATION_RUNNING) { + return libhttp.StatusForbidden, fmt.Errorf("A migration is already running") + } + go func() { + log.Info("Starting Migration") + defer atomic.CompareAndSwapUint32(&self.migrationRunning, MIGRATION_RUNNING, MIGRATION_NOT_RUNNING) + dataMigrator := migration.NewDataMigrator( + self.coordinator.(*coordinator.CoordinatorImpl), self.clusterConfig, self.config, self.config.DataDir, "shard_db", self.clusterConfig.MetaStore) + dataMigrator.Migrate() + }() + + return libhttp.StatusAccepted, nil + }) +} diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index c936693c41..8e9bf1febb 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -215,12 +215,10 @@ func (self *CoordinatorImpl) runDropSeriesQuery(querySpec *parser.QuerySpec, ser return err } defer seriesWriter.Close() - fmt.Println("DROP series") err := self.raftServer.DropSeries(db, series) if err != nil { return err } - fmt.Println("DROP returning nil") return nil } diff --git a/datastore/shard.go b/datastore/shard.go index aeb2e62da1..61ed101593 100644 --- a/datastore/shard.go +++ b/datastore/shard.go @@ -7,7 +7,6 @@ import ( "fmt" "math" "regexp" - "strings" "time" "code.google.com/p/goprotobuf/proto" @@ -95,7 +94,7 @@ func (self *Shard) Write(database string, series []*protocol.Series) error { func (self *Shard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { if querySpec.IsListSeriesQuery() { - return self.executeListSeriesQuery(querySpec, processor) + return fmt.Errorf("List series queries should never come to the shard") } else if querySpec.IsDeleteFromSeriesQuery() { return self.executeDeleteQuery(querySpec, processor) } @@ -279,7 +278,7 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName Points: seriesOutgoing.Points, } if !processor.YieldSeries(series) { - log.Info("Stopping processing") + log.Debug("Stopping processing") shouldContinue = false } } @@ -304,47 +303,6 @@ func (self *Shard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName return nil } -func (self *Shard) yieldSeriesNamesForDb(db string, yield func(string) bool) error { - dbNameStart := len(DATABASE_SERIES_INDEX_PREFIX) - pred := func(key []byte) bool { - return len(key) >= dbNameStart && bytes.Equal(key[:dbNameStart], DATABASE_SERIES_INDEX_PREFIX) - } - - firstKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(db+"~")...) 
- itr := self.db.Iterator() - defer itr.Close() - - for itr.Seek(firstKey); itr.Valid(); itr.Next() { - key := itr.Key() - if !pred(key) { - break - } - dbSeries := string(key[dbNameStart:]) - parts := strings.Split(dbSeries, "~") - if len(parts) > 1 { - if parts[0] != db { - break - } - name := parts[1] - shouldContinue := yield(name) - if !shouldContinue { - return nil - } - } - } - if err := itr.Error(); err != nil { - return err - } - return nil -} - -func (self *Shard) executeListSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { - return self.yieldSeriesNamesForDb(querySpec.Database(), func(_name string) bool { - name := _name - return processor.YieldPoint(&name, nil, nil) - }) -} - func (self *Shard) executeDeleteQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { query := querySpec.DeleteQuery() series := query.GetFromClause() @@ -571,114 +529,3 @@ func (self *Shard) getFieldsForSeries(db, series string, columns []string) ([]*m } return fields, nil } - -/* DEPRECATED methods do not use*/ - -// TODO: remove this on version 0.9 after people have had a chance to do migrations -func (self *Shard) getSeriesForDbAndRegexDEPRECATED(database string, regex *regexp.Regexp) []string { - names := []string{} - allSeries := self.metaStore.GetSeriesForDatabase(database) - for _, name := range allSeries { - if regex.MatchString(name) { - names = append(names, name) - } - } - return names -} - -// TODO: remove this on version 0.9 after people have had a chance to do migrations -func (self *Shard) getSeriesForDatabaseDEPRECATED(database string) (series []string) { - err := self.yieldSeriesNamesForDb(database, func(name string) bool { - series = append(series, name) - return true - }) - if err != nil { - log.Error("Cannot get series names for db %s: %s", database, err) - return nil - } - return series -} - -// TODO: remove this function. I'm keeping it around for the moment since it'll probably have to be -// used in the DB upgrate/migration that moves metadata from the shard to Raft -func (self *Shard) getFieldsForSeriesDEPRECATED(db, series string, columns []string) ([]*metastore.Field, error) { - isCountQuery := false - if len(columns) > 0 && columns[0] == "*" { - columns = self.getColumnNamesForSeriesDEPRECATED(db, series) - } else if len(columns) == 0 { - isCountQuery = true - columns = self.getColumnNamesForSeriesDEPRECATED(db, series) - } - if len(columns) == 0 { - return nil, FieldLookupError{"Couldn't look up columns for series: " + series} - } - - fields := make([]*metastore.Field, len(columns), len(columns)) - - for i, name := range columns { - id, errId := self.getIdForDbSeriesColumnDEPRECATED(&db, &series, &name) - if errId != nil { - return nil, errId - } - if id == nil { - return nil, FieldLookupError{"Field " + name + " doesn't exist in series " + series} - } - idInt, err := binary.ReadUvarint(bytes.NewBuffer(id)) - if err != nil { - return nil, err - } - fields[i] = &metastore.Field{Name: name, Id: idInt} - } - - // if it's a count query we just want the column that will be the most efficient to - // scan through. So find that and return it. - if isCountQuery { - bestField := fields[0] - return []*metastore.Field{bestField}, nil - } - return fields, nil -} - -// TODO: remove this function. 
I'm keeping it around for the moment since it'll probably have to be -// used in the DB upgrate/migration that moves metadata from the shard to Raft -func (self *Shard) getColumnNamesForSeriesDEPRECATED(db, series string) []string { - dbNameStart := len(SERIES_COLUMN_INDEX_PREFIX) - seekKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(db+"~"+series+"~")...) - pred := func(key []byte) bool { - return len(key) >= dbNameStart && bytes.Equal(key[:dbNameStart], SERIES_COLUMN_INDEX_PREFIX) - } - it := self.db.Iterator() - defer it.Close() - - names := make([]string, 0) - for it.Seek(seekKey); it.Valid(); it.Next() { - key := it.Key() - if !pred(key) { - break - } - dbSeriesColumn := string(key[dbNameStart:]) - parts := strings.Split(dbSeriesColumn, "~") - if len(parts) > 2 { - if parts[0] != db || parts[1] != series { - break - } - names = append(names, parts[2]) - } - } - if err := it.Error(); err != nil { - log.Error("Error while getting columns for series %s: %s", series, err) - return nil - } - return names -} - -// TODO: remove this after a version that doesn't support migration from old non-raft metastore -func (self *Shard) getIdForDbSeriesColumnDEPRECATED(db, series, column *string) (ret []byte, err error) { - s := fmt.Sprintf("%s~%s~%s", *db, *series, *column) - b := []byte(s) - key := append(SERIES_COLUMN_INDEX_PREFIX, b...) - if ret, err = self.db.Get(key); err != nil { - return nil, err - } - return ret, nil -} diff --git a/datastore/shard_datastore.go b/datastore/shard_datastore.go index cc00c56d7f..1b14b73d02 100644 --- a/datastore/shard_datastore.go +++ b/datastore/shard_datastore.go @@ -295,6 +295,10 @@ type FieldLookupError struct { message string } +func NewFieldLookupError(message string) *FieldLookupError { + return &FieldLookupError{message} +} + func (self FieldLookupError) Error() string { return self.message } diff --git a/integration/migration_test.go b/integration/migration_test.go new file mode 100644 index 0000000000..1dcae33a26 --- /dev/null +++ b/integration/migration_test.go @@ -0,0 +1,86 @@ +package integration + +/* +This is commented out because we can't generate old data automatically. The files for this test +are up on S3 so that we can run it later. Just trust that I've run it (this is Paul) +*/ + +// import ( +// "fmt" +// "io/ioutil" +// "net/http" +// "os" +// "path/filepath" +// "time" + +// "github.com/influxdb/influxdb/datastore" +// "github.com/influxdb/influxdb/migration" + +// . "github.com/influxdb/influxdb/integration/helpers" +// . 
"launchpad.net/gocheck" +// ) + +// type MigrationTestSuite struct { +// server *Server +// } + +// var _ = Suite(&MigrationTestSuite{}) + +// func (self *MigrationTestSuite) SetUpSuite(c *C) { +// self.server = NewServer("integration/migration_test.toml", c) +// } + +// func (self *MigrationTestSuite) TearDownSuite(c *C) { +// self.server.Stop() +// dataDir := "migration_data/data" +// shardDir := filepath.Join(dataDir, migration.OLD_SHARD_DIR) +// infos, err := ioutil.ReadDir(shardDir) +// if err != nil { +// fmt.Printf("Error Clearing Migration: ", err) +// return +// } +// for _, info := range infos { +// if info.IsDir() { +// os.Remove(filepath.Join(shardDir, info.Name(), migration.MIGRATED_MARKER)) +// } +// } +// os.RemoveAll(filepath.Join(dataDir, datastore.SHARD_DATABASE_DIR)) +// } + +// func (self *MigrationTestSuite) TestMigrationOfPreviousDb(c *C) { +// _, err := http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil) +// c.Assert(err, IsNil) +// // make sure that it won't kick it off a second time while it's already running +// resp, _ := http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil) +// c.Assert(resp.StatusCode, Equals, http.StatusForbidden) + +// time.Sleep(time.Second * 60) +// client := self.server.GetClient("test1", c) +// s, err := client.Query("select count(value) from cpu_idle") +// c.Assert(err, IsNil) +// c.Assert(s, HasLen, 1) +// c.Assert(s[0].Points, HasLen, 1) +// c.Assert(s[0].Points[0][1].(float64), Equals, float64(44434)) + +// s, err = client.Query("select count(type) from customer_events") +// c.Assert(err, IsNil) +// c.Assert(s, HasLen, 1) +// c.Assert(s[0].Points, HasLen, 1) +// c.Assert(s[0].Points[0][1].(float64), Equals, float64(162597)) + +// client = self.server.GetClient("test2", c) +// s, err = client.Query("list series") +// c.Assert(err, IsNil) +// c.Assert(s, HasLen, 1) +// c.Assert(s[0].Points, HasLen, 1000) + +// s, err = client.Query("select count(value) from /.*/") +// c.Assert(s, HasLen, 1000) +// for _, series := range s { +// c.Assert(series.Points, HasLen, 1) +// c.Assert(series.Points[0][1].(float64), Equals, float64(1434)) +// } +// _, err = http.Post("http://localhost:8086/cluster/migrate_data?u=root&p=root", "application/json", nil) +// c.Assert(err, IsNil) +// time.Sleep(time.Second * 5) +// } diff --git a/integration/migration_test.toml b/integration/migration_test.toml new file mode 100644 index 0000000000..b710d1ae4e --- /dev/null +++ b/integration/migration_test.toml @@ -0,0 +1,187 @@ +# Welcome to the InfluxDB configuration file. + +# If hostname (on the OS) doesn't return a name that can be resolved by the other +# systems in the cluster, you'll have to set the hostname to an IP or something +# that can be resolved here. +# hostname = "" + +bind-address = "0.0.0.0" + +# Once every 24 hours InfluxDB will report anonymous data to m.influxdb.com +# The data includes raft name (random 8 bytes), os, arch and version +# We don't track ip addresses of servers reporting. This is only used +# to track the number of instances running and the versions which +# is very helpful for us. +# Change this option to true to disable reporting. 
+reporting-disabled = true + +[logging] +# logging level can be one of "debug", "info", "warn" or "error" +level = "info" +file = "stdout" # stdout to log to standard out + +# Configure the admin server +[admin] +port = 8083 # binding is disabled if the port isn't set +assets = "./admin" + +# Configure the http api +[api] +port = 8086 # binding is disabled if the port isn't set +# ssl-port = 8084 # Ssl support is enabled if you set a port and cert +# ssl-cert = /path/to/cert.pem + +# connections will timeout after this amount of time. Ensures that clients that misbehave +# and keep alive connections they don't use won't end up connection a million times. +# However, if a request is taking longer than this to complete, could be a problem. +read-timeout = "5s" + +[input_plugins] + + # Configure the graphite api + [input_plugins.graphite] + enabled = false + # port = 2003 + # database = "" # store graphite data in this database + # udp_enabled = true # enable udp interface on the same port as the tcp interface + + # Configure the udp api + [input_plugins.udp] + enabled = false + # port = 4444 + # database = "" + + # Configure multiple udp apis each can write to separate db. Just + # repeat the following section to enable multiple udp apis on + # different ports. + [[input_plugins.udp_servers]] # array of tables + enabled = false + # port = 5551 + # database = "db1" + +# Raft configuration +[raft] +# The raft port should be open between all servers in a cluster. +# However, this port shouldn't be accessible from the internet. + +port = 8090 + +# Where the raft logs are stored. The user running InfluxDB will need read/write access. +dir = "integration/migration_data/raft" + +# election-timeout = "1s" + +[storage] + +dir = "integration/migration_data/data" +# How many requests to potentially buffer in memory. If the buffer gets filled then writes +# will still be logged and once the local storage has caught up (or compacted) the writes +# will be replayed from the WAL +write-buffer-size = 10000 + +# the engine to use for new shards, old shards will continue to use the same engine +default-engine = "leveldb" + +# The default setting on this is 0, which means unlimited. Set this to something if you want to +# limit the max number of open files. max-open-files is per shard so this * that will be max. +max-open-shards = 0 + +# The default setting is 100. This option tells how many points will be fetched from LevelDb before +# they get flushed into backend. +point-batch-size = 100 + +# The number of points to batch in memory before writing them to leveldb. Lowering this number will +# reduce the memory usage, but will result in slower writes. +write-batch-size = 5000000 + +# The server will check this often for shards that have expired that should be cleared. +retention-sweep-period = "10m" + +[storage.engines.leveldb] + +# Maximum mmap open files, this will affect the virtual memory used by +# the process +max-open-files = 1000 + +# LRU cache size, LRU is used by leveldb to store contents of the +# uncompressed sstables. You can use `m` or `g` prefix for megabytes +# and gigabytes, respectively. +lru-cache-size = "200m" + +[storage.engines.rocksdb] + +# Maximum mmap open files, this will affect the virtual memory used by +# the process +max-open-files = 1000 + +# LRU cache size, LRU is used by rocksdb to store contents of the +# uncompressed sstables. You can use `m` or `g` prefix for megabytes +# and gigabytes, respectively. 
+lru-cache-size = "200m" + +[storage.engines.hyperleveldb] + +# Maximum mmap open files, this will affect the virtual memory used by +# the process +max-open-files = 1000 + +# LRU cache size, LRU is used by rocksdb to store contents of the +# uncompressed sstables. You can use `m` or `g` prefix for megabytes +# and gigabytes, respectively. +lru-cache-size = "200m" + +[storage.engines.lmdb] + +map-size = "100g" + +[cluster] +# A comma separated list of servers to seed +# this server. this is only relevant when the +# server is joining a new cluster. Otherwise +# the server will use the list of known servers +# prior to shutting down. Any server can be pointed to +# as a seed. It will find the Raft leader automatically. + +# Here's an example. Note that the port on the host is the same as the raft port. +# seed-servers = ["hosta:8090","hostb:8090"] + +# Replication happens over a TCP connection with a Protobuf protocol. +# This port should be reachable between all servers in a cluster. +# However, this port shouldn't be accessible from the internet. + +protobuf_port = 8099 +protobuf_timeout = "2s" # the write timeout on the protobuf conn any duration parseable by time.ParseDuration +protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must be parseable by time.ParseDuration +protobuf_min_backoff = "1s" # the minimum backoff after a failed heartbeat attempt +protobuf_max_backoff = "10s" # the maxmimum backoff after a failed heartbeat attempt + +# How many write requests to potentially buffer in memory per server. If the buffer gets filled then writes +# will still be logged and once the server has caught up (or come back online) the writes +# will be replayed from the WAL +write-buffer-size = 1000 + +# the maximum number of responses to buffer from remote nodes, if the +# expected number of responses exceed this number then querying will +# happen sequentially and the buffer size will be limited to this +# number +max-response-buffer-size = 100 + +# When queries get distributed out to shards, they go in parallel. This means that results can get buffered +# in memory since results will come in any order, but have to be processed in the correct time order. +# Setting this higher will give better performance, but you'll need more memory. Setting this to 1 will ensure +# that you don't need to buffer in memory, but you won't get the best performance. 
+concurrent-shard-query-limit = 10 + +[wal] + +dir = "integration/migration_data/wal" +flush-after = 1000 # the number of writes after which wal will be flushed, 0 for flushing on every write +bookmark-after = 1000 # the number of writes after which a bookmark will be created + +# the number of writes after which an index entry is created pointing +# to the offset of the first request, default to 1k +index-after = 1000 + +# the number of requests per one log file, if new requests came in a +# new log file will be created +requests-per-logfile = 10000 diff --git a/migration/data_migrator.go b/migration/data_migrator.go new file mode 100644 index 0000000000..e96de25eae --- /dev/null +++ b/migration/data_migrator.go @@ -0,0 +1,155 @@ +package migration + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + + "github.com/influxdb/influxdb/cluster" + "github.com/influxdb/influxdb/configuration" + "github.com/influxdb/influxdb/coordinator" + "github.com/influxdb/influxdb/engine" + "github.com/influxdb/influxdb/metastore" + "github.com/influxdb/influxdb/parser" + "github.com/influxdb/influxdb/protocol" + + log "code.google.com/p/log4go" + // "github.com/BurntSushi/toml" + "github.com/jmhodges/levigo" +) + +// Used for migrating data from old versions of influx. +type DataMigrator struct { + baseDbDir string + dbDir string + metaStore *metastore.Store + config *configuration.Configuration + clusterConfig *cluster.ClusterConfiguration + coord *coordinator.CoordinatorImpl +} + +const ( + MIGRATED_MARKER = "MIGRATED" + OLD_SHARD_DIR = "shard_db" +) + +var ( + endStreamResponse = protocol.Response_END_STREAM +) + +func NewDataMigrator(coord *coordinator.CoordinatorImpl, clusterConfig *cluster.ClusterConfiguration, config *configuration.Configuration, baseDbDir, newSubDir string, metaStore *metastore.Store) *DataMigrator { + return &DataMigrator{ + baseDbDir: baseDbDir, + dbDir: filepath.Join(baseDbDir, OLD_SHARD_DIR), + metaStore: metaStore, + config: config, + clusterConfig: clusterConfig, + coord: coord, + } +} + +func (dm *DataMigrator) Migrate() { + log.Info("Migrating from dir %s", dm.dbDir) + infos, err := ioutil.ReadDir(dm.dbDir) + if err != nil { + log.Error("Error Migrating: ", err) + return + } + names := make([]string, 0) + for _, info := range infos { + if info.IsDir() { + names = append(names, info.Name()) + } + } + sort.Strings(names) + // go through in reverse order so most recently created shards will be migrated first + for i := len(names) - 1; i >= 0; i-- { + dm.migrateDir(names[i]) + } +} + +func (dm *DataMigrator) migrateDir(name string) { + migrateMarkerFile := filepath.Join(dm.shardDir(name), MIGRATED_MARKER) + if _, err := os.Stat(migrateMarkerFile); err == nil { + log.Info("Already migrated %s. 
Skipping", name) + return + } + log.Info("Migrating %s", name) + shard, err := dm.getShard(name) + if err != nil { + log.Error("Migration error getting shard: %s", err.Error()) + return + } + defer shard.Close() + databases := dm.clusterConfig.GetDatabases() + for _, database := range databases { + err := dm.migrateDatabaseInShard(database.Name, shard) + if err != nil { + log.Error("Error migrating database %s: %s", database.Name, err.Error()) + return + } + } + err = ioutil.WriteFile(migrateMarkerFile, []byte("done.\n"), 0644) + if err != nil { + log.Error("Problem writing migration marker for shard %s: %s", name, err.Error()) + } +} + +func (dm *DataMigrator) migrateDatabaseInShard(database string, shard *LevelDbShard) error { + log.Info("Migrating database %s for shard", database) + seriesNames := shard.GetSeriesForDatabase(database) + log.Info("Migrating %d series", len(seriesNames)) + + admin := dm.clusterConfig.GetClusterAdmin(dm.clusterConfig.GetClusterAdmins()[0]) + for _, series := range seriesNames { + q, err := parser.ParseQuery(fmt.Sprintf("select * from \"%s\"", series)) + if err != nil { + log.Error("Problem migrating series %s", series) + continue + } + query := q[0] + seriesChan := make(chan *protocol.Response) + queryEngine := engine.NewPassthroughEngine(seriesChan, 2000) + querySpec := parser.NewQuerySpec(admin, database, query) + go func() { + err := shard.Query(querySpec, queryEngine) + if err != nil { + log.Error("Error migrating %s", err.Error()) + } + queryEngine.Close() + seriesChan <- &protocol.Response{Type: &endStreamResponse} + }() + for { + response := <-seriesChan + if *response.Type == endStreamResponse { + break + } + err := dm.coord.WriteSeriesData(admin, database, []*protocol.Series{response.Series}) + if err != nil { + log.Error("Writing Series data: %s", err.Error()) + } + } + } + return nil +} + +func (dm *DataMigrator) shardDir(name string) string { + return filepath.Join(dm.baseDbDir, OLD_SHARD_DIR, name) +} + +func (dm *DataMigrator) getShard(name string) (*LevelDbShard, error) { + dbDir := dm.shardDir(name) + cache := levigo.NewLRUCache(int(2000)) + opts := levigo.NewOptions() + opts.SetCache(cache) + opts.SetCreateIfMissing(true) + opts.SetMaxOpenFiles(1000) + ldb, err := levigo.Open(dbDir, opts) + if err != nil { + return nil, err + } + + return NewLevelDbShard(ldb, dm.config.StoragePointBatchSize, dm.config.StorageWriteBatchSize) +} diff --git a/migration/level_db_shard.go b/migration/level_db_shard.go new file mode 100644 index 0000000000..5c6b9a3a93 --- /dev/null +++ b/migration/level_db_shard.go @@ -0,0 +1,829 @@ +package migration + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "math" + "regexp" + "strings" + "sync" + "time" + + "github.com/influxdb/influxdb/cluster" + "github.com/influxdb/influxdb/common" + . "github.com/influxdb/influxdb/datastore" + "github.com/influxdb/influxdb/parser" + "github.com/influxdb/influxdb/protocol" + + "code.google.com/p/goprotobuf/proto" + log "code.google.com/p/log4go" + "github.com/jmhodges/levigo" +) + +/* + This is deprecated. It's only sitting around because people will need it when + migrating their data from 0.7 to 0.8. We'll remove it later. 
+*/ + +type LevelDbShard struct { + db *levigo.DB + readOptions *levigo.ReadOptions + writeOptions *levigo.WriteOptions + lastIdUsed uint64 + columnIdMutex sync.Mutex + closed bool + pointBatchSize int + writeBatchSize int +} + +type Field struct { + Id []byte + Name string +} + +type rawColumnValue struct { + time []byte + sequence []byte + value []byte +} + +// // returns true if the point has the correct field id and is +// // in the given time range +func isPointInRange(fieldId, startTime, endTime, point []byte) bool { + id := point[:8] + time := point[8:16] + return bytes.Equal(id, fieldId) && bytes.Compare(time, startTime) > -1 && bytes.Compare(time, endTime) < 1 +} + +// depending on the query order (whether it's ascending or not) returns +// the min (or max in case of descending query) of the current +// [timestamp,sequence] and the self's [timestamp,sequence] +// +// This is used to determine what the next point's timestamp +// and sequence number should be. +func (self *rawColumnValue) updatePointTimeAndSequence(currentTimeRaw, currentSequenceRaw []byte, isAscendingQuery bool) ([]byte, []byte) { + if currentTimeRaw == nil { + return self.time, self.sequence + } + + compareValue := 1 + if isAscendingQuery { + compareValue = -1 + } + + timeCompare := bytes.Compare(self.time, currentTimeRaw) + if timeCompare == compareValue { + return self.time, self.sequence + } + + if timeCompare != 0 { + return currentTimeRaw, currentSequenceRaw + } + + if bytes.Compare(self.sequence, currentSequenceRaw) == compareValue { + return currentTimeRaw, self.sequence + } + + return currentTimeRaw, currentSequenceRaw +} + +func NewLevelDbShard(db *levigo.DB, pointBatchSize, writeBatchSize int) (*LevelDbShard, error) { + ro := levigo.NewReadOptions() + lastIdBytes, err2 := db.Get(ro, NEXT_ID_KEY) + if err2 != nil { + return nil, err2 + } + + lastId := uint64(0) + if lastIdBytes != nil { + lastId, err2 = binary.ReadUvarint(bytes.NewBuffer(lastIdBytes)) + if err2 != nil { + return nil, err2 + } + } + + return &LevelDbShard{ + db: db, + writeOptions: levigo.NewWriteOptions(), + readOptions: ro, + lastIdUsed: lastId, + pointBatchSize: pointBatchSize, + writeBatchSize: writeBatchSize, + }, nil +} + +func (self *LevelDbShard) Write(database string, series []*protocol.Series) error { + wb := levigo.NewWriteBatch() + defer wb.Close() + + for _, s := range series { + if len(s.Points) == 0 { + return errors.New("Unable to write no data. 
Series was nil or had no points.") + } + + count := 0 + for fieldIndex, field := range s.Fields { + temp := field + id, err := self.createIdForDbSeriesColumn(&database, s.Name, &temp) + if err != nil { + return err + } + keyBuffer := bytes.NewBuffer(make([]byte, 0, 24)) + dataBuffer := proto.NewBuffer(nil) + for _, point := range s.Points { + keyBuffer.Reset() + dataBuffer.Reset() + + keyBuffer.Write(id) + timestamp := self.convertTimestampToUint(point.GetTimestampInMicroseconds()) + // pass the uint64 by reference so binary.Write() doesn't create a new buffer + // see the source code for intDataSize() in binary.go + binary.Write(keyBuffer, binary.BigEndian, ×tamp) + binary.Write(keyBuffer, binary.BigEndian, point.SequenceNumber) + pointKey := keyBuffer.Bytes() + + if point.Values[fieldIndex].GetIsNull() { + wb.Delete(pointKey) + goto check + } + + err = dataBuffer.Marshal(point.Values[fieldIndex]) + if err != nil { + return err + } + wb.Put(pointKey, dataBuffer.Bytes()) + check: + count++ + if count >= self.writeBatchSize { + err = self.db.Write(self.writeOptions, wb) + if err != nil { + return err + } + count = 0 + wb.Clear() + } + } + } + } + + return self.db.Write(self.writeOptions, wb) +} + +func (self *LevelDbShard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { + if querySpec.IsListSeriesQuery() { + return self.executeListSeriesQuery(querySpec, processor) + } else if querySpec.IsDeleteFromSeriesQuery() { + return self.executeDeleteQuery(querySpec, processor) + } else if querySpec.IsDropSeriesQuery() { + return self.executeDropSeriesQuery(querySpec, processor) + } + + seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns() + + if !self.hasReadAccess(querySpec) { + return errors.New("User does not have access to one or more of the series requested.") + } + + for series, columns := range seriesAndColumns { + if regex, ok := series.GetCompiledRegex(); ok { + seriesNames := self.getSeriesForDbAndRegex(querySpec.Database(), regex) + for _, name := range seriesNames { + if !querySpec.HasReadAccess(name) { + continue + } + err := self.executeQueryForSeries(querySpec, name, columns, processor) + if err != nil { + return err + } + } + } else { + err := self.executeQueryForSeries(querySpec, series.Name, columns, processor) + if err != nil { + return err + } + } + } + return nil +} + +func (self *LevelDbShard) DropDatabase(database string) error { + seriesNames := self.GetSeriesForDatabase(database) + for _, name := range seriesNames { + if err := self.dropSeries(database, name); err != nil { + log.Error("DropDatabase: ", err) + return err + } + } + self.compact() + return nil +} + +func (self *LevelDbShard) IsClosed() bool { + return self.closed +} + +func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, seriesName string, columns []string, processor cluster.QueryProcessor) error { + startTimeBytes := self.byteArrayForTime(querySpec.GetStartTime()) + endTimeBytes := self.byteArrayForTime(querySpec.GetEndTime()) + + fields, err := self.getFieldsForSeries(querySpec.Database(), seriesName, columns) + if err != nil { + // because a db is distributed across the cluster, it's possible we don't have the series indexed here. 
ignore + switch err := err.(type) { + case FieldLookupError: + log.Debug("Cannot find fields %v", columns) + return nil + default: + log.Error("Error looking up fields for %s: %s", seriesName, err) + return fmt.Errorf("Error looking up fields for %s: %s", seriesName, err) + } + } + + fieldCount := len(fields) + rawColumnValues := make([]rawColumnValue, fieldCount, fieldCount) + query := querySpec.SelectQuery() + + aliases := query.GetTableAliases(seriesName) + if querySpec.IsSinglePointQuery() { + series, err := self.fetchSinglePoint(querySpec, seriesName, fields) + if err != nil { + log.Error("Error reading a single point: %s", err) + return err + } + if len(series.Points) > 0 { + processor.YieldPoint(series.Name, series.Fields, series.Points[0]) + } + return nil + } + + fieldNames, iterators := self.getIterators(fields, startTimeBytes, endTimeBytes, query.Ascending) + defer func() { + for _, it := range iterators { + it.Close() + } + }() + + seriesOutgoing := &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)} + + // TODO: clean up, this is super gnarly + // optimize for the case where we're pulling back only a single column or aggregate + buffer := bytes.NewBuffer(nil) + valueBuffer := proto.NewBuffer(nil) + for { + isValid := false + point := &protocol.Point{Values: make([]*protocol.FieldValue, fieldCount, fieldCount)} + + for i, it := range iterators { + if rawColumnValues[i].value != nil || !it.Valid() { + continue + } + + key := it.Key() + if len(key) < 16 { + continue + } + + if !isPointInRange(fields[i].Id, startTimeBytes, endTimeBytes, key) { + continue + } + + value := it.Value() + sequenceNumber := key[16:] + + rawTime := key[8:16] + rawColumnValues[i] = rawColumnValue{time: rawTime, sequence: sequenceNumber, value: value} + } + + var pointTimeRaw []byte + var pointSequenceRaw []byte + // choose the highest (or lowest in case of ascending queries) timestamp + // and sequence number. that will become the timestamp and sequence of + // the next point. 
+ for _, value := range rawColumnValues { + if value.value == nil { + continue + } + + pointTimeRaw, pointSequenceRaw = value.updatePointTimeAndSequence(pointTimeRaw, + pointSequenceRaw, query.Ascending) + } + + for i, iterator := range iterators { + // if the value is nil or doesn't match the point's timestamp and sequence number + // then skip it + if rawColumnValues[i].value == nil || + !bytes.Equal(rawColumnValues[i].time, pointTimeRaw) || + !bytes.Equal(rawColumnValues[i].sequence, pointSequenceRaw) { + + point.Values[i] = &protocol.FieldValue{IsNull: &TRUE} + continue + } + + // if we emitted at lease one column, then we should keep + // trying to get more points + isValid = true + + // advance the iterator to read a new value in the next iteration + if query.Ascending { + iterator.Next() + } else { + iterator.Prev() + } + + fv := &protocol.FieldValue{} + valueBuffer.SetBuf(rawColumnValues[i].value) + err := valueBuffer.Unmarshal(fv) + if err != nil { + log.Error("Error while running query: %s", err) + return err + } + point.Values[i] = fv + rawColumnValues[i].value = nil + } + + var sequence uint64 + var t uint64 + + // set the point sequence number and timestamp + buffer.Reset() + buffer.Write(pointSequenceRaw) + binary.Read(buffer, binary.BigEndian, &sequence) + buffer.Reset() + buffer.Write(pointTimeRaw) + binary.Read(buffer, binary.BigEndian, &t) + + time := self.convertUintTimestampToInt64(&t) + point.SetTimestampInMicroseconds(time) + point.SequenceNumber = &sequence + + // stop the loop if we ran out of points + if !isValid { + break + } + + shouldContinue := true + + seriesOutgoing.Points = append(seriesOutgoing.Points, point) + + if len(seriesOutgoing.Points) >= self.pointBatchSize { + for _, alias := range aliases { + series := &protocol.Series{ + Name: proto.String(alias), + Fields: fieldNames, + Points: seriesOutgoing.Points, + } + if !processor.YieldSeries(series) { + log.Info("Stopping processing") + shouldContinue = false + } + } + seriesOutgoing = &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)} + } + + if !shouldContinue { + break + } + } + + //Yield remaining data + for _, alias := range aliases { + log.Debug("Final Flush %s", alias) + series := &protocol.Series{Name: protocol.String(alias), Fields: seriesOutgoing.Fields, Points: seriesOutgoing.Points} + if !processor.YieldSeries(series) { + log.Debug("Cancelled...") + } + } + + log.Debug("Finished running query %s", query.GetQueryString()) + return nil +} + +func (self *LevelDbShard) executeListSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { + it := self.db.NewIterator(self.readOptions) + defer it.Close() + + database := querySpec.Database() + seekKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(querySpec.Database()+"~")...) 
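+ // series index keys have the form <DATABASE_SERIES_INDEX_PREFIX><db>~<series>,
+ // so seeking to "<db>~" lands on the first series indexed for this database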
+ it.Seek(seekKey) + dbNameStart := len(DATABASE_SERIES_INDEX_PREFIX) + for it = it; it.Valid(); it.Next() { + key := it.Key() + if len(key) < dbNameStart || !bytes.Equal(key[:dbNameStart], DATABASE_SERIES_INDEX_PREFIX) { + break + } + dbSeries := string(key[dbNameStart:]) + parts := strings.Split(dbSeries, "~") + if len(parts) > 1 { + if parts[0] != database { + break + } + name := parts[1] + shouldContinue := processor.YieldPoint(&name, nil, nil) + if !shouldContinue { + return nil + } + } + } + return nil +} + +func (self *LevelDbShard) executeDeleteQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { + query := querySpec.DeleteQuery() + series := query.GetFromClause() + database := querySpec.Database() + if series.Type != parser.FromClauseArray { + return fmt.Errorf("Merge and Inner joins can't be used with a delete query", series.Type) + } + + for _, name := range series.Names { + var err error + if regex, ok := name.Name.GetCompiledRegex(); ok { + err = self.deleteRangeOfRegex(database, regex, query.GetStartTime(), query.GetEndTime()) + } else { + err = self.deleteRangeOfSeries(database, name.Name.Name, query.GetStartTime(), query.GetEndTime()) + } + + if err != nil { + return err + } + } + self.compact() + return nil +} + +func (self *LevelDbShard) executeDropSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { + database := querySpec.Database() + series := querySpec.Query().DropSeriesQuery.GetTableName() + err := self.dropSeries(database, series) + if err != nil { + return err + } + self.compact() + return nil +} + +func (self *LevelDbShard) dropSeries(database, series string) error { + startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} + + wb := levigo.NewWriteBatch() + defer wb.Close() + + if err := self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes); err != nil { + return err + } + + for _, name := range self.getColumnNamesForSeries(database, series) { + indexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(database+"~"+series+"~"+name)...) + wb.Delete(indexKey) + } + + wb.Delete(append(DATABASE_SERIES_INDEX_PREFIX, []byte(database+"~"+series)...)) + + // remove the column indeces for this time series + return self.db.Write(self.writeOptions, wb) +} + +func (self *LevelDbShard) byteArrayForTimeInt(time int64) []byte { + timeBuffer := bytes.NewBuffer(make([]byte, 0, 8)) + binary.Write(timeBuffer, binary.BigEndian, self.convertTimestampToUint(&time)) + bytes := timeBuffer.Bytes() + return bytes +} + +func (self *LevelDbShard) byteArraysForStartAndEndTimes(startTime, endTime int64) ([]byte, []byte) { + return self.byteArrayForTimeInt(startTime), self.byteArrayForTimeInt(endTime) +} + +func (self *LevelDbShard) deleteRangeOfSeriesCommon(database, series string, startTimeBytes, endTimeBytes []byte) error { + columns := self.getColumnNamesForSeries(database, series) + fields, err := self.getFieldsForSeries(database, series, columns) + if err != nil { + // because a db is distributed across the cluster, it's possible we don't have the series indexed here. 
ignore + switch err := err.(type) { + case FieldLookupError: + return nil + default: + return err + } + } + ro := levigo.NewReadOptions() + defer ro.Close() + ro.SetFillCache(false) + wb := levigo.NewWriteBatch() + count := 0 + defer wb.Close() + for _, field := range fields { + it := self.db.NewIterator(ro) + defer it.Close() + + startKey := append(field.Id, startTimeBytes...) + it.Seek(startKey) + if it.Valid() { + if !bytes.Equal(it.Key()[:8], field.Id) { + it.Next() + if it.Valid() { + startKey = it.Key() + } + } + } + for it = it; it.Valid(); it.Next() { + k := it.Key() + if len(k) < 16 || !bytes.Equal(k[:8], field.Id) || bytes.Compare(k[8:16], endTimeBytes) == 1 { + break + } + wb.Delete(k) + count++ + if count >= self.writeBatchSize { + err = self.db.Write(self.writeOptions, wb) + if err != nil { + return err + } + count = 0 + wb.Clear() + } + } + } + return self.db.Write(self.writeOptions, wb) +} + +func (self *LevelDbShard) compact() { + log.Info("Compacting shard") + self.db.CompactRange(levigo.Range{}) + log.Info("Shard compaction is done") +} + +func (self *LevelDbShard) deleteRangeOfSeries(database, series string, startTime, endTime time.Time) error { + startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(startTime), common.TimeToMicroseconds(endTime)) + return self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes) +} + +func (self *LevelDbShard) deleteRangeOfRegex(database string, regex *regexp.Regexp, startTime, endTime time.Time) error { + series := self.getSeriesForDbAndRegex(database, regex) + for _, name := range series { + err := self.deleteRangeOfSeries(database, name, startTime, endTime) + if err != nil { + return err + } + } + return nil +} + +func (self *LevelDbShard) getFieldsForSeries(db, series string, columns []string) ([]*Field, error) { + isCountQuery := false + if len(columns) > 0 && columns[0] == "*" { + columns = self.getColumnNamesForSeries(db, series) + } else if len(columns) == 0 { + isCountQuery = true + columns = self.getColumnNamesForSeries(db, series) + } + if len(columns) == 0 { + return nil, NewFieldLookupError("Coulnd't look up columns for series: " + series) + } + + fields := make([]*Field, len(columns), len(columns)) + + for i, name := range columns { + id, errId := self.getIdForDbSeriesColumn(&db, &series, &name) + if errId != nil { + return nil, errId + } + if id == nil { + return nil, NewFieldLookupError("Field " + name + " doesn't exist in series " + series) + } + fields[i] = &Field{Name: name, Id: id} + } + + // if it's a count query we just want the column that will be the most efficient to + // scan through. So find that and return it. + if isCountQuery { + bestField := fields[0] + return []*Field{bestField}, nil + } + return fields, nil +} + +func (self *LevelDbShard) getColumnNamesForSeries(db, series string) []string { + it := self.db.NewIterator(self.readOptions) + defer it.Close() + + seekKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(db+"~"+series+"~")...) 
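+ // column index keys have the form <SERIES_COLUMN_INDEX_PREFIX><db>~<series>~<column>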
+ it.Seek(seekKey) + names := make([]string, 0) + dbNameStart := len(SERIES_COLUMN_INDEX_PREFIX) + for it = it; it.Valid(); it.Next() { + key := it.Key() + if len(key) < dbNameStart || !bytes.Equal(key[:dbNameStart], SERIES_COLUMN_INDEX_PREFIX) { + break + } + dbSeriesColumn := string(key[dbNameStart:]) + parts := strings.Split(dbSeriesColumn, "~") + if len(parts) > 2 { + if parts[0] != db || parts[1] != series { + break + } + names = append(names, parts[2]) + } + } + return names +} + +func (self *LevelDbShard) hasReadAccess(querySpec *parser.QuerySpec) bool { + for series, _ := range querySpec.SeriesValuesAndColumns() { + if _, isRegex := series.GetCompiledRegex(); !isRegex { + if !querySpec.HasReadAccess(series.Name) { + return false + } + } + } + return true +} + +func (self *LevelDbShard) byteArrayForTime(t time.Time) []byte { + timeBuffer := bytes.NewBuffer(make([]byte, 0, 8)) + timeMicro := common.TimeToMicroseconds(t) + binary.Write(timeBuffer, binary.BigEndian, self.convertTimestampToUint(&timeMicro)) + return timeBuffer.Bytes() +} + +func (self *LevelDbShard) getSeriesForDbAndRegex(database string, regex *regexp.Regexp) []string { + names := []string{} + allSeries := self.GetSeriesForDatabase(database) + for _, name := range allSeries { + if regex.MatchString(name) { + names = append(names, name) + } + } + return names +} + +func (self *LevelDbShard) GetSeriesForDatabase(database string) []string { + it := self.db.NewIterator(self.readOptions) + defer it.Close() + + seekKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(database+"~")...) + it.Seek(seekKey) + dbNameStart := len(DATABASE_SERIES_INDEX_PREFIX) + names := make([]string, 0) + for it = it; it.Valid(); it.Next() { + key := it.Key() + if len(key) < dbNameStart || !bytes.Equal(key[:dbNameStart], DATABASE_SERIES_INDEX_PREFIX) { + break + } + dbSeries := string(key[dbNameStart:]) + parts := strings.Split(dbSeries, "~") + if len(parts) > 1 { + if parts[0] != database { + break + } + name := parts[1] + names = append(names, name) + } + } + return names +} + +func (self *LevelDbShard) createIdForDbSeriesColumn(db, series, column *string) (ret []byte, err error) { + ret, err = self.getIdForDbSeriesColumn(db, series, column) + if err != nil { + return + } + + if ret != nil { + return + } + + self.columnIdMutex.Lock() + defer self.columnIdMutex.Unlock() + ret, err = self.getIdForDbSeriesColumn(db, series, column) + if err != nil { + return + } + + if ret != nil { + return + } + + ret, err = self.getNextIdForColumn(db, series, column) + if err != nil { + return + } + s := fmt.Sprintf("%s~%s~%s", *db, *series, *column) + b := []byte(s) + key := append(SERIES_COLUMN_INDEX_PREFIX, b...) + err = self.db.Put(self.writeOptions, key, ret) + return +} + +func (self *LevelDbShard) getIdForDbSeriesColumn(db, series, column *string) (ret []byte, err error) { + s := fmt.Sprintf("%s~%s~%s", *db, *series, *column) + b := []byte(s) + key := append(SERIES_COLUMN_INDEX_PREFIX, b...) + if ret, err = self.db.Get(self.readOptions, key); err != nil { + return nil, err + } + return ret, nil +} + +func (self *LevelDbShard) getNextIdForColumn(db, series, column *string) (ret []byte, err error) { + id := self.lastIdUsed + 1 + self.lastIdUsed += 1 + idBytes := make([]byte, 8, 8) + binary.PutUvarint(idBytes, id) + wb := levigo.NewWriteBatch() + defer wb.Close() + wb.Put(NEXT_ID_KEY, idBytes) + databaseSeriesIndexKey := append(DATABASE_SERIES_INDEX_PREFIX, []byte(*db+"~"+*series)...) 
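+ // register the series under the database index (the value is empty; only the key matters)
+ // so that GetSeriesForDatabase and list series queries can find it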
+ wb.Put(databaseSeriesIndexKey, []byte{}) + seriesColumnIndexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(*db+"~"+*series+"~"+*column)...) + wb.Put(seriesColumnIndexKey, idBytes) + if err = self.db.Write(self.writeOptions, wb); err != nil { + return nil, err + } + return idBytes, nil +} + +func (self *LevelDbShard) Close() { + self.closed = true + self.readOptions.Close() + self.writeOptions.Close() + self.db.Close() +} + +func (self *LevelDbShard) convertTimestampToUint(t *int64) uint64 { + if *t < 0 { + return uint64(math.MaxInt64 + *t + 1) + } + return uint64(*t) + uint64(math.MaxInt64) + uint64(1) +} + +func (self *LevelDbShard) fetchSinglePoint(querySpec *parser.QuerySpec, series string, fields []*Field) (*protocol.Series, error) { + query := querySpec.SelectQuery() + fieldCount := len(fields) + fieldNames := make([]string, 0, fieldCount) + point := &protocol.Point{Values: make([]*protocol.FieldValue, 0, fieldCount)} + timestamp := common.TimeToMicroseconds(query.GetStartTime()) + sequenceNumber, err := query.GetSinglePointQuerySequenceNumber() + if err != nil { + return nil, err + } + + timeAndSequenceBuffer := bytes.NewBuffer(make([]byte, 0, 16)) + binary.Write(timeAndSequenceBuffer, binary.BigEndian, self.convertTimestampToUint(×tamp)) + binary.Write(timeAndSequenceBuffer, binary.BigEndian, sequenceNumber) + sequenceNumber_uint64 := uint64(sequenceNumber) + point.SequenceNumber = &sequenceNumber_uint64 + point.SetTimestampInMicroseconds(timestamp) + + timeAndSequenceBytes := timeAndSequenceBuffer.Bytes() + for _, field := range fields { + pointKey := append(field.Id, timeAndSequenceBytes...) + + if data, err := self.db.Get(self.readOptions, pointKey); err != nil { + return nil, err + } else { + fieldValue := &protocol.FieldValue{} + err := proto.Unmarshal(data, fieldValue) + if err != nil { + return nil, err + } + if data != nil { + fieldNames = append(fieldNames, field.Name) + point.Values = append(point.Values, fieldValue) + } + } + } + + result := &protocol.Series{Name: &series, Fields: fieldNames, Points: []*protocol.Point{point}} + + return result, nil +} + +func (self *LevelDbShard) getIterators(fields []*Field, start, end []byte, isAscendingQuery bool) (fieldNames []string, iterators []*levigo.Iterator) { + iterators = make([]*levigo.Iterator, len(fields)) + fieldNames = make([]string, len(fields)) + + // start the iterators to go through the series data + for i, field := range fields { + fieldNames[i] = field.Name + iterators[i] = self.db.NewIterator(self.readOptions) + if isAscendingQuery { + iterators[i].Seek(append(field.Id, start...)) + } else { + iterators[i].Seek(append(append(field.Id, end...), MAX_SEQUENCE...)) + if iterators[i].Valid() { + iterators[i].Prev() + } + } + } + return +} + +func (self *LevelDbShard) convertUintTimestampToInt64(t *uint64) int64 { + if *t > uint64(math.MaxInt64) { + return int64(*t-math.MaxInt64) - int64(1) + } + return int64(*t) - math.MaxInt64 - int64(1) +} diff --git a/server/server.go b/server/server.go index b9a5b963eb..eed92f6ac8 100644 --- a/server/server.go +++ b/server/server.go @@ -64,7 +64,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) { protobufServer := coordinator.NewProtobufServer(config.ProtobufListenString(), requestHandler) raftServer.AssignCoordinator(coord) - httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.ApiReadTimeout, config.AdminAssetsDir, coord, coord, clusterConfig, raftServer) + httpApi := http.NewHttpServer(config, coord, coord, clusterConfig, raftServer) 
httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath) graphiteApi := graphite.NewServer(config, coord, clusterConfig) adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString()) diff --git a/wal/global_state.go b/wal/global_state.go index 873617a466..3eea577bf6 100644 --- a/wal/global_state.go +++ b/wal/global_state.go @@ -30,6 +30,14 @@ type GlobalState struct { path string } +// from version 0.7 to 0.8 the Suffix variables changed from +// ints to uint32s. We need this struct to convert them. +type oldGlobalState struct { + GlobalState + CurrentFileSuffix int + FirstSuffix int +} + func newGlobalState(path string) (*GlobalState, error) { f, err := os.Open(path) if err != nil { @@ -84,7 +92,7 @@ func (self *GlobalState) write(w io.Writer) error { return gob.NewEncoder(w).Encode(self) } -func (self *GlobalState) read(r io.Reader) error { +func (self *GlobalState) read(r *os.File) error { // skip the version reader := bufio.NewReader(r) // read the version line @@ -92,7 +100,31 @@ func (self *GlobalState) read(r io.Reader) error { if err != nil { return err } - return gob.NewDecoder(reader).Decode(self) + err = gob.NewDecoder(reader).Decode(self) + + // from version 0.7 to 0.8 the type of the Suffix variables + // changed to uint32. Catch this and convert to a new GlobalState object. + if err != nil { + old := &oldGlobalState{} + r.Seek(int64(0), 0) + reader := bufio.NewReader(r) + // read the version line + _, err := reader.ReadString('\n') + if err != nil { + return err + } + err = gob.NewDecoder(reader).Decode(old) + if err != nil { + return err + } + self.CurrentFileOffset = old.CurrentFileOffset + self.CurrentFileSuffix = uint32(old.CurrentFileSuffix) + self.LargestRequestNumber = old.LargestRequestNumber + self.FirstSuffix = uint32(old.FirstSuffix) + self.ShardLastSequenceNumber = old.ShardLastSequenceNumber + self.ServerLastRequestNumber = old.ServerLastRequestNumber + } + return nil } func (self *GlobalState) recover(shardId uint32, sequenceNumber uint64) {
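
For reference, a minimal Go sketch of how an operator could hit the new endpoint, mirroring the commented-out integration test above. The localhost:8086 address and the root/root cluster-admin credentials are assumptions carried over from that test, not requirements of the API itself; the status codes come from the migrateData handler (202 when the migration is kicked off, 403 when one is already running).

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// assumed server address and cluster-admin credentials (same values as the integration test)
	url := "http://localhost:8086/cluster/migrate_data?u=root&p=root"

	// the handler returns immediately; the migration itself runs in a background goroutine
	resp, err := http.Post(url, "application/json", nil)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusAccepted:
		fmt.Println("migration started")
	case http.StatusForbidden:
		// migrateData guards itself with atomic.CompareAndSwapUint32, so a
		// concurrent request is rejected rather than starting a second migration
		fmt.Println("a migration is already running")
	default:
		fmt.Println("unexpected response:", resp.Status)
	}
}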