new base class for batch jobs
parent
67d47ed280
commit
11d399468a
|
@ -0,0 +1 @@
|
|||
from .base import SeleneScript
|
|
@ -0,0 +1,101 @@
|
|||
from argparse import ArgumentParser
|
||||
from datetime import date, datetime
|
||||
from logging import getLogger
|
||||
from os import environ, path
|
||||
import sys
|
||||
|
||||
from selene.data.metrics import JobMetric, JobRepository
|
||||
from selene.util.db import DatabaseConnectionConfig, connect_to_db
|
||||
from selene.util.log import configure_logger
|
||||
|
||||
|
||||
class SeleneScript(object):
    """Base class for batch job scripts.

    Handles the parts of a job that are the same for every script: logger
    setup, command-line argument parsing, timing, and recording a job metric
    through ``JobRepository`` when the job finishes.  Subclasses implement
    ``_run`` and may extend ``_define_args`` to add their own arguments.
    """
    # Lazily populated caches; see the ``db`` and ``job_name`` properties.
    _db = None
    _job_name = None

    def __init__(self, job_file_path):
        """Set up logging, the argument parser and the run bookkeeping.

        :param job_file_path: path of the script file defining the job; the
            job name is derived from its base name.
        """
        self._job_file_path = job_file_path
        configure_logger(self.job_name)
        self.log = getLogger(self.job_name)
        self._arg_parser = ArgumentParser()
        self.args = None
        self.start_ts = datetime.now()
        self.end_ts = None
        self.success = False

    @property
    def job_name(self):
        """Name of the job: the script's file name without its extension."""
        if self._job_name is None:
            job_file_name = path.basename(self._job_file_path)
            # splitext only removes a real extension, so a path without a
            # ".py" suffix does not lose the last three characters of its name.
            self._job_name = path.splitext(job_file_name)[0]

        return self._job_name

    @property
    def db(self):
        """Database connection, opened on first access.

        Connection settings are read from the DB_HOST, DB_NAME, DB_PASSWORD,
        DB_USER, DB_PORT and DB_SSLMODE environment variables.

        :raises KeyError: if DB_HOST, DB_NAME, DB_PASSWORD or DB_USER is not
            set in the environment.
        """
        if self._db is None:
            db_connection_config = DatabaseConnectionConfig(
                host=environ['DB_HOST'],
                db_name=environ['DB_NAME'],
                password=environ['DB_PASSWORD'],
                # NOTE(review): the environment value is a string while the
                # fallback is an int - presumably both are accepted; confirm.
                port=environ.get('DB_PORT', 5432),
                user=environ['DB_USER'],
                sslmode=environ.get('DB_SSLMODE'),
                use_namedtuple_cursor=True
            )
            self._db = connect_to_db(db_connection_config)

        return self._db

    def run(self):
        """Execute the job from start to finish.

        Any exception is logged and re-raised; ``_finish_job`` runs whether
        the job succeeded or not.
        """
        try:
            self._start_job()
            self._run()
            self.success = True
        except BaseException:
            # Deliberately as broad as a bare ``except``: the failure is only
            # logged here and always re-raised to the caller.
            self.log.exception('An exception occurred - aborting script')
            raise
        finally:
            self._finish_job()

    def _start_job(self):
        """Log the start marker and parse the command-line arguments."""
        # Logger builds daily files, delineate start in case of multiple runs
        self.log.info('* * * * * START OF JOB * * * * *')
        self._define_args()
        self.args = self._arg_parser.parse_args()

    def _define_args(self):
        """Register the command-line arguments shared by all jobs.

        Defines ``--date`` (YYYY-MM-DD), defaulting to today's date.
        """
        self._arg_parser.add_argument(
            "--date",
            default=date.today(),
            help='Processing date in YYYY-MM-DD format',
            type=lambda dt: datetime.strptime(dt, '%Y-%m-%d').date()
        )

    def _run(self):
        """Subclass must override this to perform job-specific logic"""
        raise NotImplementedError

    def _finish_job(self):
        """Record metrics, release the database connection and log run time."""
        self.end_ts = datetime.now()
        try:
            self._insert_metrics()
        finally:
            # Only close a connection that was actually opened.  Going through
            # the ``db`` property here would open a new connection just to
            # close it, or raise on missing DB_* variables and mask the error
            # that aborted the job.
            if self._db is not None:
                self._db.close()
        self.log.info(
            'script run time: ' + str(self.end_ts - self.start_ts)
        )
        # Logger builds daily files, delineate end in case of multiple runs
        self.log.info('* * * * * END OF JOB * * * * *')

    def _insert_metrics(self):
        """Store a ``JobMetric`` describing this run.

        Nothing is recorded when the arguments were never parsed, because the
        batch date is taken from them.
        """
        if self.args is not None:
            job_repository = JobRepository(self.db)
            job_metric = JobMetric(
                job_name=self.job_name,
                batch_date=self.args.date,
                start_ts=self.start_ts,
                end_ts=self.end_ts,
                command=' '.join(sys.argv),
                success=self.success
            )
            job_id = job_repository.add(job_metric)
            # str() keeps the log call from raising if the repository returns
            # a non-string identifier.
            self.log.info('Job ID: ' + str(job_id))
|
Loading…
Reference in New Issue