Add optional pausing to data migration.
parent
d9f14cbb72
commit
439e4c9914
|
@ -1228,6 +1228,15 @@ func (self *HttpServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R
|
|||
|
||||
func (self *HttpServer) migrateData(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
|
||||
pauseTime := r.URL.Query().Get("pause")
|
||||
pauseDuration := time.Millisecond * 100
|
||||
if pauseTime != "" {
|
||||
var err error
|
||||
pauseDuration, err = time.ParseDuration(pauseTime)
|
||||
if err != nil {
|
||||
return libhttp.StatusBadRequest, fmt.Errorf("Couldn't parse pause time: ", err)
|
||||
}
|
||||
}
|
||||
if !atomic.CompareAndSwapUint32(&self.migrationRunning, MIGRATION_NOT_RUNNING, MIGRATION_RUNNING) {
|
||||
return libhttp.StatusForbidden, fmt.Errorf("A migration is already running")
|
||||
}
|
||||
|
@ -1235,7 +1244,7 @@ func (self *HttpServer) migrateData(w libhttp.ResponseWriter, r *libhttp.Request
|
|||
log.Info("Starting Migration")
|
||||
defer atomic.CompareAndSwapUint32(&self.migrationRunning, MIGRATION_RUNNING, MIGRATION_NOT_RUNNING)
|
||||
dataMigrator := migration.NewDataMigrator(
|
||||
self.coordinator.(*coordinator.CoordinatorImpl), self.clusterConfig, self.config, self.config.DataDir, "shard_db", self.clusterConfig.MetaStore)
|
||||
self.coordinator.(*coordinator.CoordinatorImpl), self.clusterConfig, self.config, self.config.DataDir, "shard_db", self.clusterConfig.MetaStore, pauseDuration)
|
||||
dataMigrator.Migrate()
|
||||
}()
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/configuration"
|
||||
|
@ -28,18 +29,20 @@ type DataMigrator struct {
|
|||
config *configuration.Configuration
|
||||
clusterConfig *cluster.ClusterConfiguration
|
||||
coord *coordinator.CoordinatorImpl
|
||||
pauseTime time.Duration
|
||||
}
|
||||
|
||||
const (
|
||||
MIGRATED_MARKER = "MIGRATED"
|
||||
OLD_SHARD_DIR = "shard_db"
|
||||
MIGRATED_MARKER = "MIGRATED"
|
||||
OLD_SHARD_DIR = "shard_db"
|
||||
POINT_COUNT_TO_PAUSE = 10000
|
||||
)
|
||||
|
||||
var (
|
||||
endStreamResponse = protocol.Response_END_STREAM
|
||||
)
|
||||
|
||||
func NewDataMigrator(coord *coordinator.CoordinatorImpl, clusterConfig *cluster.ClusterConfiguration, config *configuration.Configuration, baseDbDir, newSubDir string, metaStore *metastore.Store) *DataMigrator {
|
||||
func NewDataMigrator(coord *coordinator.CoordinatorImpl, clusterConfig *cluster.ClusterConfiguration, config *configuration.Configuration, baseDbDir, newSubDir string, metaStore *metastore.Store, pauseTime time.Duration) *DataMigrator {
|
||||
return &DataMigrator{
|
||||
baseDbDir: baseDbDir,
|
||||
dbDir: filepath.Join(baseDbDir, OLD_SHARD_DIR),
|
||||
|
@ -47,6 +50,7 @@ func NewDataMigrator(coord *coordinator.CoordinatorImpl, clusterConfig *cluster.
|
|||
config: config,
|
||||
clusterConfig: clusterConfig,
|
||||
coord: coord,
|
||||
pauseTime: pauseTime,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,6 +107,7 @@ func (dm *DataMigrator) migrateDatabaseInShard(database string, shard *LevelDbSh
|
|||
log.Info("Migrating %d series", len(seriesNames))
|
||||
|
||||
admin := dm.clusterConfig.GetClusterAdmin(dm.clusterConfig.GetClusterAdmins()[0])
|
||||
pointCount := 0
|
||||
for _, series := range seriesNames {
|
||||
q, err := parser.ParseQuery(fmt.Sprintf("select * from \"%s\"", series))
|
||||
if err != nil {
|
||||
|
@ -130,8 +135,14 @@ func (dm *DataMigrator) migrateDatabaseInShard(database string, shard *LevelDbSh
|
|||
if err != nil {
|
||||
log.Error("Writing Series data: %s", err.Error())
|
||||
}
|
||||
pointCount += len(response.Series.Points)
|
||||
if pointCount > POINT_COUNT_TO_PAUSE {
|
||||
pointCount = 0
|
||||
time.Sleep(dm.pauseTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Info("Done migrating %s for shard", database)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue