Implements common config migration functions

pull/3207/head
Kieran Prasch 2023-08-25 14:47:21 +02:00
parent acfc455a89
commit 2cc4b468f1
7 changed files with 81 additions and 200 deletions

View File

@ -2,17 +2,13 @@ from collections import OrderedDict
from .configuration_v1_to_v2 import configuration_v1_to_v2
from .configuration_v3_to_v4 import configuration_v3_to_v4
from .configuration_v4_to_v5 import configuration_v4_to_v5
from .configuration_v4_to_v6 import configuration_v4_to_v6
from .configuration_v5_to_v6 import configuration_v5_to_v6
MIGRATIONS = OrderedDict(
{
(1, 2): configuration_v1_to_v2,
(2, 3): None, # (no-op)
(3, 4): configuration_v3_to_v4,
(4, 5): configuration_v4_to_v5,
(4, 6): configuration_v4_to_v6,
(5, 6): configuration_v5_to_v6,
}
)

View File

@ -0,0 +1,16 @@
from typing import Dict
from nucypher.config.migrations.common import perform_migration
def __migration(config: Dict) -> None:
pass # apply migrations here
def configuration_v1_to_v2(filepath) -> None: # rename accordingly
perform_migration(
old_version=1, # migrating from version
new_version=2, # migrating to version
migration=__migration,
filepath=filepath,
)

View File

@ -1,44 +1,18 @@
import json
import os
from typing import Dict
BACKUP_SUFFIX = ".old"
OLD_VERSION = 1
NEW_VERSION = 2
from nucypher.config.migrations.common import perform_migration
def configuration_v1_to_v2(filepath: str):
# Read + deserialize
with open(filepath, "r") as file:
contents = file.read()
config = json.loads(contents)
def __migration(config: Dict) -> None:
domains = config["domains"]
domain = domains[0]
if len(domains) > 1:
print(f"Multiple domains configured, using the first one ({domain}).")
del config["domains"]
config["domain"] = domain
try:
# Check we have version 1 indeed
existing_version = config["version"]
if existing_version != OLD_VERSION:
raise RuntimeError(
f"Existing configuration is not version 1; Got version {existing_version}"
)
# Make a copy of the original file
backup_filepath = str(filepath) + BACKUP_SUFFIX
os.rename(filepath, backup_filepath)
print(f"Backed up existing configuration to {backup_filepath}")
# Get current domain value
domains = config["domains"]
domain = domains[0]
if len(domains) > 1:
print(f"Multiple domains configured, using the first one ({domain}).")
# Apply updates
del config["domains"] # deprecated
config["domain"] = domain
config["version"] = NEW_VERSION
except KeyError:
raise KeyError(f"Invalid {OLD_VERSION} configuration file.")
# Commit updates
with open(filepath, "w") as file:
file.write(json.dumps(config, indent=4))
print("OK! Migrated configuration file from v1 -> v2.")
def configuration_v1_to_v2(filepath) -> None:
perform_migration(
old_version=1, new_version=2, migration=__migration, filepath=filepath
)

View File

@ -1,40 +1,15 @@
import json
import os
from typing import Dict
BACKUP_SUFFIX = ".old"
OLD_VERSION = 3
NEW_VERSION = 4
from nucypher.config.migrations.common import perform_migration
def configuration_v3_to_v4(filepath: str):
# Read + deserialize
with open(filepath, "r") as file:
contents = file.read()
config = json.loads(contents)
def __migration(config: Dict) -> None:
worker_address = config["worker_address"]
del config["worker_address"] # deprecated
config["operator_address"] = worker_address
try:
# Check we have version 1 indeed
existing_version = config["version"]
if existing_version != OLD_VERSION:
raise RuntimeError(
f"Existing configuration is not version {OLD_VERSION}; Got version {existing_version}"
)
# Make a copy of the original file
backup_filepath = str(filepath) + BACKUP_SUFFIX
os.rename(filepath, backup_filepath)
print(f"Backed up existing configuration to {backup_filepath}")
# Apply updates
worker_address = config["worker_address"]
del config["worker_address"] # deprecated
config["operator_address"] = worker_address
config["version"] = NEW_VERSION
except KeyError:
raise RuntimeError(f"Invalid {OLD_VERSION} configuration file.")
# Commit updates
with open(filepath, "w") as file:
file.write(json.dumps(config, indent=4))
print(f"OK! Migrated configuration file from v{OLD_VERSION} -> v{NEW_VERSION}.")
def configuration_v3_to_v4(filepath) -> None:
perform_migration(
old_version=3, new_version=4, migration=__migration, filepath=filepath
)

View File

@ -1,38 +1,14 @@
import json
import os
from typing import Dict
BACKUP_SUFFIX = ".old"
OLD_VERSION = 4
NEW_VERSION = 5
from nucypher.config.migrations.common import perform_migration
def configuration_v4_to_v5(filepath: str):
# Read + deserialize
with open(filepath, "r") as file:
contents = file.read()
config = json.loads(contents)
def __migration(config: Dict) -> None:
del config["federated_only"] # deprecated
del config["checksum_address"]
try:
existing_version = config["version"]
if existing_version != OLD_VERSION:
raise RuntimeError(
f"Existing configuration is not version {OLD_VERSION}; Got version {existing_version}"
)
# Make a copy of the original file
backup_filepath = str(filepath) + BACKUP_SUFFIX
os.rename(filepath, backup_filepath)
print(f"Backed up existing configuration to {backup_filepath}")
# Apply updates
del config["federated_only"] # deprecated
del config["checksum_address"]
config["version"] = NEW_VERSION
except KeyError:
raise RuntimeError(f"Invalid {OLD_VERSION} configuration file.")
# Commit updates
with open(filepath, "w") as file:
file.write(json.dumps(config, indent=4))
print(f"OK! Migrated configuration file from v{OLD_VERSION} -> v{NEW_VERSION}.")
def configuration_v4_to_v5(filepath) -> None:
perform_migration(
old_version=4, new_version=5, migration=__migration, filepath=filepath
)

View File

@ -1,52 +1,25 @@
import json
import os
from typing import Dict
from nucypher.blockchain.eth.networks import NetworksInventory
BACKUP_SUFFIX = ".old"
OLD_VERSION = 4
NEW_VERSION = 6
from nucypher.config.migrations.common import perform_migration
def configuration_v4_to_v6(filepath: str):
# Read + deserialize
with open(filepath, "r") as file:
contents = file.read()
config = json.loads(contents)
def __migration(config: Dict) -> None:
del config["federated_only"] # deprecated
del config["checksum_address"]
try:
existing_version = config["version"]
if existing_version != OLD_VERSION:
raise RuntimeError(
f"Existing configuration is not version {OLD_VERSION}; Got version {existing_version}"
)
# Multichain support
eth_provider = config["eth_provider_uri"]
eth_chain_id = NetworksInventory.get_ethereum_chain_id(config["domain"])
polygon_provider = config["payment_provider"]
polygon_chain_id = NetworksInventory.get_polygon_chain_id(config["payment_network"])
config["condition_providers"] = {
eth_chain_id: [eth_provider],
polygon_chain_id: [polygon_provider],
}
# Make a copy of the original file
backup_filepath = str(filepath) + BACKUP_SUFFIX
os.rename(filepath, backup_filepath)
print(f"Backed up existing configuration to {backup_filepath}")
# Apply updates
del config["federated_only"] # deprecated
del config["checksum_address"]
config["version"] = NEW_VERSION
# Multichain support
eth_provider = config["eth_provider_uri"]
eth_chain_id = NetworksInventory.get_ethereum_chain_id(config["domain"])
polygon_provider = config["payment_provider"]
polygon_chain_id = NetworksInventory.get_polygon_chain_id(
config["payment_network"]
)
config["condition_providers"] = {
eth_chain_id: [eth_provider],
polygon_chain_id: [polygon_provider],
}
except KeyError:
raise RuntimeError(f"Invalid {OLD_VERSION} configuration file.")
# Commit updates
with open(filepath, "w") as file:
file.write(json.dumps(config, indent=4))
print(f"OK! Migrated configuration file from v{OLD_VERSION} -> v{NEW_VERSION}.")
def configuration_v4_to_v6(filepath) -> None:
perform_migration(
old_version=4, new_version=6, migration=__migration, filepath=filepath
)

View File

@ -1,50 +1,21 @@
import json
import os
from typing import Dict
from nucypher.blockchain.eth.networks import NetworksInventory
BACKUP_SUFFIX = ".old"
OLD_VERSION = 5
NEW_VERSION = 6
from nucypher.config.migrations.common import perform_migration
def configuration_v5_to_v6(filepath: str):
# Read + deserialize
with open(filepath, "r") as file:
contents = file.read()
config = json.loads(contents)
def __migration(config: Dict) -> None:
eth_provider = config["eth_provider_uri"]
eth_chain_id = NetworksInventory.get_ethereum_chain_id(config["domain"])
polygon_provider = config["payment_provider"]
polygon_chain_id = NetworksInventory.get_polygon_chain_id(config["payment_network"])
config["condition_providers"] = {
eth_chain_id: [eth_provider],
polygon_chain_id: [polygon_provider],
}
try:
existing_version = config["version"]
if existing_version != OLD_VERSION:
raise RuntimeError(
f"Existing configuration is not version {OLD_VERSION}; Got version {existing_version}"
)
# Make a copy of the original file
backup_filepath = str(filepath) + BACKUP_SUFFIX
os.rename(filepath, backup_filepath)
print(f"Backed up existing configuration to {backup_filepath}")
# Apply updates
config["version"] = NEW_VERSION
# Multichain support
eth_provider = config["eth_provider_uri"]
eth_chain_id = NetworksInventory.get_ethereum_chain_id(config["domain"])
polygon_provider = config["payment_provider"]
polygon_chain_id = NetworksInventory.get_polygon_chain_id(
config["payment_network"]
)
config["condition_providers"] = {
eth_chain_id: [eth_provider],
polygon_chain_id: [polygon_provider],
}
except KeyError:
raise KeyError(f"Invalid {OLD_VERSION} configuration file.")
# Commit updates
with open(filepath, "w") as file:
file.write(json.dumps(config, indent=4))
print(f"OK! Migrated configuration file from v{OLD_VERSION} -> v{NEW_VERSION}.")
def configuration_v5_to_v6(filepath) -> None:
perform_migration(
old_version=5, new_version=6, migration=__migration, filepath=filepath
)