Next iteration through the job scheduler:
-- added docstrings -- converted run_job method into a JobRunner class -- added a test job to run in non-production environments -- added ability to override the default run datepull/191/head
parent
abf3556b36
commit
e6d14d792d
|
@ -1,6 +1,13 @@
|
|||
"""Define the commands to run Selene batch jobs and the execution schedule.
|
||||
|
||||
This module is run as a daemon on the Selene batch host. It defines the
|
||||
commands needed to run each job using the subprocess module. The jobs are
|
||||
scheduled using the "schedule" library.
|
||||
"""
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
from datetime import date, timedelta
|
||||
|
||||
import schedule
|
||||
|
||||
|
@ -9,36 +16,100 @@ from selene.util.log import configure_logger
|
|||
_log = configure_logger('selene_job_scheduler')
|
||||
|
||||
|
||||
def run_job(script_name: str, script_args: str = None):
|
||||
command = ['pipenv', 'run', 'python']
|
||||
script_path = os.path.join(
|
||||
os.environ['SELENE_SCRIPT_DIR'],
|
||||
script_name
|
||||
)
|
||||
command.append(script_path)
|
||||
if script_args is not None:
|
||||
command.extend(script_args.split())
|
||||
_log.info(command)
|
||||
result = subprocess.run(command, capture_output=True)
|
||||
if result.returncode:
|
||||
_log.error(
|
||||
'Job {job_name} failed\n'
|
||||
'\tSTDOUT - {stdout}'
|
||||
'\tSTDERR - {stderr}'.format(
|
||||
job_name=script_name[:-3],
|
||||
stdout=result.stdout.decode(),
|
||||
stderr=result.stderr.decode()
|
||||
)
|
||||
class JobRunner(object):
|
||||
"""Build the command to run a batch job and run it via subprocess."""
|
||||
def __init__(self, script_name: str):
|
||||
self.script_name = script_name
|
||||
self.job_args: str = None
|
||||
self.job_date: date = None
|
||||
|
||||
def run_job(self):
|
||||
if self.job_date is not None:
|
||||
self._add_date_to_args()
|
||||
command = self._build_command()
|
||||
self._execute_command(command)
|
||||
|
||||
def _add_date_to_args(self):
|
||||
"""Adds a date argument to the argument string.
|
||||
|
||||
The SeleneScript base class defaults the run date to current date so the
|
||||
date argument only needs to be specified when it is not current date.
|
||||
"""
|
||||
if self.job_args is None:
|
||||
self.job_args = ''
|
||||
date_arg = ' --date ' + str(self.job_date)
|
||||
self.job_args += date_arg
|
||||
|
||||
def _build_command(self):
|
||||
"""Build the command to run the script."""
|
||||
command = ['pipenv', 'run', 'python']
|
||||
script_path = os.path.join(
|
||||
os.environ['SELENE_SCRIPT_DIR'],
|
||||
self.script_name
|
||||
)
|
||||
else:
|
||||
log_msg = 'Job {job_name} completed successfully'
|
||||
_log.info(log_msg.format(job_name=script_name[:-3]))
|
||||
command.append(script_path)
|
||||
if self.job_args is not None:
|
||||
command.extend(self.job_args.split())
|
||||
_log.info(command)
|
||||
|
||||
return command
|
||||
|
||||
def _execute_command(self, command):
|
||||
"""Run the script using the subprocess module."""
|
||||
result = subprocess.run(command, capture_output=True)
|
||||
if result.returncode:
|
||||
_log.error(
|
||||
'Job {job_name} failed\n'
|
||||
'\tSTDOUT - {stdout}'
|
||||
'\tSTDERR - {stderr}'.format(
|
||||
job_name=self.script_name[:-3],
|
||||
stdout=result.stdout.decode(),
|
||||
stderr=result.stderr.decode()
|
||||
)
|
||||
)
|
||||
else:
|
||||
log_msg = 'Job {job_name} completed successfully'
|
||||
_log.info(log_msg.format(job_name=self.script_name[:-3]))
|
||||
|
||||
|
||||
def test_scheduler():
|
||||
"""Run in non-production environments to test scheduler functionality."""
|
||||
job_runner = JobRunner('test_scheduler.py')
|
||||
job_runner.job_date = date.today() - timedelta(days=1)
|
||||
job_runner.job_args = '--arg-with-value test --arg-no-value'
|
||||
job_runner.run_job()
|
||||
|
||||
|
||||
def partition_api_metrics():
|
||||
"""Copy rows from metric.api table to partitioned metric.api_history table
|
||||
|
||||
Build a partition on the metric.api_history table for yesterday's date.
|
||||
Copy yesterday's metric.api table rows to the partition.
|
||||
"""
|
||||
job_runner = JobRunner('partition_api_metrics.py')
|
||||
job_runner.job_date = date.today() - timedelta(days=1)
|
||||
job_runner.run_job()
|
||||
|
||||
|
||||
def update_device_last_contact():
|
||||
"""Update the last time a device was seen.
|
||||
|
||||
Each time a device calls the public API, the Redis database is updated with
|
||||
to associate the time of the call with the device. Dump the contents of
|
||||
the Redis data to the device.device table on the Postgres database.
|
||||
"""
|
||||
job_runner = JobRunner('update_device_last_contact.py')
|
||||
job_runner.run_job()
|
||||
|
||||
|
||||
# Define the schedule
|
||||
schedule.every().day.at('00:00').do(run_job, 'partition_api_metrics.py')
|
||||
schedule.every().day.at('00:05').do(run_job, 'update_device_last_contact.py')
|
||||
if os.environ['SELENE_ENVIRONMENT'] != 'prod':
|
||||
schedule.every(5).minutes.do(test_scheduler)
|
||||
|
||||
schedule.every().day.at('00:00').do(partition_api_metrics)
|
||||
schedule.every().day.at('00:05').do(update_device_last_contact)
|
||||
|
||||
# Run the schedule
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
time.sleep(1)
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
"""Job to test the scheduler functionality.
|
||||
|
||||
This job is run through the batch job scheduler. It contains assertion
|
||||
statements that test the functionality of the code in the
|
||||
job_scheduler.job module.
|
||||
"""
|
||||
from datetime import date, timedelta
|
||||
|
||||
from selene.batch.base import SeleneScript
|
||||
|
||||
|
||||
class TestScheduler(SeleneScript):
|
||||
def __init__(self):
|
||||
super(TestScheduler, self).__init__(__file__)
|
||||
|
||||
def _define_args(self):
|
||||
"""Pass an arg with value and arg without value to the script
|
||||
|
||||
The scheduler needs to be able to handle arguments that take a value
|
||||
and those that do not. Define one of each and specify them in the
|
||||
scheduler.
|
||||
"""
|
||||
super(TestScheduler, self)._define_args()
|
||||
self._arg_parser.add_argument(
|
||||
"--arg-with-value",
|
||||
help='Argument to test passing a value with an argument',
|
||||
required=True,
|
||||
type=str
|
||||
)
|
||||
self._arg_parser.add_argument(
|
||||
"--arg-no-value",
|
||||
help='Argument to test passing a value with an argument',
|
||||
action="store_true"
|
||||
)
|
||||
|
||||
def _run(self):
|
||||
self.log.info('Running the scheduler test job')
|
||||
assert self.args.arg_no_value
|
||||
assert self.args.arg_with_value == 'test'
|
||||
|
||||
# Tests the logic that overrides the default date in the scheduler.
|
||||
assert self.args.date == date.today() - timedelta(days=1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
TestScheduler().run()
|
Loading…
Reference in New Issue