From 11d399468ae1c8a4fc68473b51a053b7ce6cf974 Mon Sep 17 00:00:00 2001 From: Chris Veilleux Date: Mon, 3 Jun 2019 23:28:52 -0500 Subject: [PATCH] new base class for batch jobs --- batch/base/__init__.py | 1 + batch/base/base.py | 101 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 batch/base/__init__.py create mode 100644 batch/base/base.py diff --git a/batch/base/__init__.py b/batch/base/__init__.py new file mode 100644 index 00000000..ad72b3b1 --- /dev/null +++ b/batch/base/__init__.py @@ -0,0 +1 @@ +from .base import SeleneScript diff --git a/batch/base/base.py b/batch/base/base.py new file mode 100644 index 00000000..ba39cb36 --- /dev/null +++ b/batch/base/base.py @@ -0,0 +1,101 @@ +from argparse import ArgumentParser +from datetime import date, datetime +from logging import getLogger +from os import environ, path +import sys + +from selene.data.metrics import JobMetric, JobRepository +from selene.util.db import DatabaseConnectionConfig, connect_to_db +from selene.util.log import configure_logger + + +class SeleneScript(object): + _db = None + _job_name = None + + def __init__(self, job_file_path): + self._job_file_path = job_file_path + configure_logger(self.job_name) + self.log = getLogger(self.job_name) + self._arg_parser = ArgumentParser() + self.args = None + self.start_ts = datetime.now() + self.end_ts = None + self.success = False + + @property + def job_name(self): + if self._job_name is None: + job_file_name = path.basename(self._job_file_path) + self._job_name = job_file_name[:-3] + + return self._job_name + + @property + def db(self): + if self._db is None: + db_connection_config = DatabaseConnectionConfig( + host=environ['DB_HOST'], + db_name=environ['DB_NAME'], + password=environ['DB_PASSWORD'], + port=environ.get('DB_PORT', 5432), + user=environ['DB_USER'], + sslmode=environ.get('DB_SSLMODE'), + use_namedtuple_cursor=True + ) + self._db = connect_to_db(db_connection_config) + + return self._db + + def run(self): + try: + self._start_job() + self._run() + self.success = True + except: + self.log.exception('An exception occurred - aborting script') + raise + finally: + self._finish_job() + + def _start_job(self): + # Logger builds daily files, delineate start in case of multiple runs + self.log.info('* * * * * START OF JOB * * * * *') + self._define_args() + self.args = self._arg_parser.parse_args() + + def _define_args(self): + self._arg_parser.add_argument( + "--date", + default=date.today(), + help='Processing date in YYYY-MM-DD format', + type=lambda dt: datetime.strptime(dt, '%Y-%m-%d').date() + ) + + def _run(self): + """Subclass must override this to perform job-specific logic""" + raise NotImplementedError + + def _finish_job(self): + self.end_ts = datetime.now() + self._insert_metrics() + self.db.close() + self.log.info( + 'script run time: ' + str(self.end_ts - self.start_ts) + ) + # Logger builds daily files, delineate end in case of multiple runs + self.log.info('* * * * * END OF JOB * * * * *') + + def _insert_metrics(self): + if self.args is not None: + job_repository = JobRepository(self.db) + job_metric = JobMetric( + job_name=self.job_name, + batch_date=self.args.date, + start_ts=self.start_ts, + end_ts=self.end_ts, + command=' '.join(sys.argv), + success=self.success + ) + job_id = job_repository.add(job_metric) + self.log.info('Job ID: ' + job_id)