feat(influxd): Perform checks to determine if prior version exists

Closes #19408
pull/19446/head
Stuart Carnie 2020-08-24 15:40:25 -07:00
parent cfd089b77a
commit 7ec178f54c
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
1 changed files with 52 additions and 0 deletions

View File

@ -730,6 +730,11 @@ func (m *Launcher) run(ctx context.Context) (err error) {
return err
}
// check for 2.x data / state from a prior 2.x
if err := checkForPriorVersion(ctx, m.log, m.boltPath, m.enginePath, ts.BucketService, metaClient); err != nil {
os.Exit(1)
}
m.engine = storage.NewEngine(
m.enginePath,
m.StorageConfig,
@ -1272,6 +1277,53 @@ func (m *Launcher) run(ctx context.Context) (err error) {
return nil
}
func checkForPriorVersion(ctx context.Context, log *zap.Logger, boltPath string, enginePath string, bs platform.BucketService, metaClient *meta.Client) error {
buckets, _, err := bs.FindBuckets(ctx, platform.BucketFilter{})
if err != nil {
log.Error("Failed to retrieve buckets", zap.Error(err))
return err
}
hasErrors := false
// if there are no buckets, we will be fine
if len(buckets) > 0 {
log.Info("Checking InfluxDB metadata for prior version.", zap.String("bolt_path", boltPath))
for i := range buckets {
bucket := buckets[i]
if dbi := metaClient.Database(bucket.ID.String()); dbi == nil {
log.Error("Missing metadata for bucket.", zap.String("bucket", bucket.Name), zap.Stringer("bucket_id", bucket.ID))
hasErrors = true
}
}
if hasErrors {
log.Error("Incompatible InfluxDB 2.0 metadata found. File must be moved before influxd will start.", zap.String("path", boltPath))
}
}
// see if there are existing files which match the old directory structure
{
for _, name := range []string{"_series", "index"} {
dir := filepath.Join(enginePath, name)
if fi, err := os.Stat(dir); err == nil {
if fi.IsDir() {
log.Error("Found directory that is incompatible with this version of InfluxDB.", zap.String("path", dir))
hasErrors = true
}
}
}
}
if hasErrors {
log.Error("Incompatible InfluxDB 2.0 version found. Move all files outside of engine_path before influxd will start.", zap.String("engine_path", enginePath))
return errors.New("incompatible InfluxDB version")
}
return nil
}
// isAddressPortAvailable checks whether the address:port is available to listen,
// by using net.Listen to verify that the port opens successfully, then closes the listener.
func isAddressPortAvailable(address string, port int) (bool, error) {