From b7c5039416edb3aa927dda7c311fc3a72def490e Mon Sep 17 00:00:00 2001 From: Ashesh Vashi Date: Sat, 4 Feb 2017 15:26:57 +0100 Subject: [PATCH] Fix process execution. Fixes #1679. Fixes #2144. Re-engineer the background process executor, to avoid using sqlite as some builds of components it relies on do not support working in forked children. --- web/pgadmin/misc/bgprocess/__init__.py | 23 +- .../misc/bgprocess/process_executor.py | 559 +++++++++++------- web/pgadmin/misc/bgprocess/processes.py | 270 ++++++--- .../misc/bgprocess/static/css/bgprocess.css | 26 + .../misc/bgprocess/static/js/bgprocess.js | 124 ++-- 5 files changed, 625 insertions(+), 377 deletions(-) diff --git a/web/pgadmin/misc/bgprocess/__init__.py b/web/pgadmin/misc/bgprocess/__init__.py index 0587e13c7..80dbcfbed 100644 --- a/web/pgadmin/misc/bgprocess/__init__.py +++ b/web/pgadmin/misc/bgprocess/__init__.py @@ -15,7 +15,7 @@ from flask import url_for from flask_babel import gettext as _ from flask_security import login_required from pgadmin.utils import PgAdminModule -from pgadmin.utils.ajax import make_response, gone, bad_request, success_return +from pgadmin.utils.ajax import make_response, gone, success_return from .processes import BatchProcess @@ -47,7 +47,6 @@ class BGProcessModule(PgAdminModule): """ return { 'bgprocess.index': url_for("bgprocess.index"), - 'bgprocess.list': url_for("bgprocess.list"), 'seconds': _('seconds'), 'started': _('Started'), 'START_TIME': _('Start time'), @@ -55,7 +54,8 @@ class BGProcessModule(PgAdminModule): 'EXECUTION_TIME': _('Execution time'), 'running': _('Running...'), 'successfully_finished': _("Successfully completed."), - 'failed_with_exit_code': _("Failed (exit code: %s).") + 'failed_with_exit_code': _("Failed (exit code: %s)."), + 'BG_TOO_MANY_LOGS': _("Too many logs generated!") } @@ -65,14 +65,14 @@ blueprint = BGProcessModule( ) -@blueprint.route('/') +@blueprint.route('/', methods=['GET']) @login_required def index(): - return bad_request(errormsg=_('This URL can not be called directly.')) + return make_response(response=BatchProcess.list()) -@blueprint.route('/status//', methods=['GET']) -@blueprint.route('/status////', methods=['GET']) +@blueprint.route('/', methods=['GET']) +@blueprint.route('////', methods=['GET']) @login_required def status(pid, out=-1, err=-1): """ @@ -96,12 +96,7 @@ def status(pid, out=-1, err=-1): return gone(errormsg=str(lerr)) -@blueprint.route('/list/', methods=['GET']) -def list(): - return make_response(response=BatchProcess.list()) - - -@blueprint.route('/acknowledge//', methods=['PUT']) +@blueprint.route('/', methods=['PUT']) @login_required def acknowledge(pid): """ @@ -114,7 +109,7 @@ def acknowledge(pid): Positive status """ try: - BatchProcess.acknowledge(pid, True) + BatchProcess.acknowledge(pid) return success_return() except LookupError as lerr: diff --git a/web/pgadmin/misc/bgprocess/process_executor.py b/web/pgadmin/misc/bgprocess/process_executor.py index ef24822ed..3433f985b 100644 --- a/web/pgadmin/misc/bgprocess/process_executor.py +++ b/web/pgadmin/misc/bgprocess/process_executor.py @@ -23,66 +23,108 @@ This script will: database. Args: - process_id -- Process id - db_file -- Database file which holds list of processes to be executed - output_directory -- output directory + list of program and arguments passed to it. + +It also depends on the following environment variable for proper execution. +PROCID - Process-id +OUTDIR - Output directory """ from __future__ import print_function, unicode_literals # To make print function compatible with python2 & python3 import sys import os -import argparse -import sqlite3 -from datetime import datetime +from datetime import datetime, timedelta, tzinfo from subprocess import Popen, PIPE from threading import Thread -import csv -import pytz import codecs +import signal -# SQLite3 needs all string as UTF-8 -# We need to make string for Python2/3 compatible -if sys.version_info < (3,): - from cStringIO import StringIO +def log(msg): + if 'OUTDIR' not in os.environ: + return + + with open( + os.path.join(os.environ['OUTDIR'], ('log_%s' % os.getpid())), 'a' + ) as fp: + fp.write(('INFO:: %s\n' % str(msg))) - def u(x): - return x -else: - from io import StringIO +def log_exception(): + if 'OUTDIR' not in os.environ: + return + type_, value_, traceback_ = info=sys.exc_info() + + with open( + os.path.join(os.environ['OUTDIR'], ('log_%s' % os.getpid())), 'a' + ) as fp: + from traceback import format_exception + res = ''.join( + format_exception(type_, value_, traceback_) + ) + + fp.write('EXCEPTION::\n{0}'.format(res)) + return res - def u(x): - if hasattr(x, 'decode'): - return x.decode() - return x +IS_WIN = (os.name == 'nt') +ZERO = timedelta(0) +default_encoding = sys.getdefaultencoding() or "utf-8" -def usage(): +# Copied the 'UTC' class from the 'pytz' package to allow to run this script +# without any external dependent library, and can be used with any python +# version. +class UTC(tzinfo): + """UTC + + Optimized UTC implementation. It unpickles using the single module global + instance defined beneath this class declaration. """ - This function will display usage message. + zone = "UTC" - Args: - None + _utcoffset = ZERO + _dst = ZERO + _tzname = zone - Returns: - Displays help message - """ + def fromutc(self, dt): + if dt.tzinfo is None: + return self.localize(dt) + return super(UTC.__class__, self).fromutc(dt) - help_msg = """ -Usage: + def utcoffset(self, dt): + return ZERO -executer.py [-h|--help] - [-p|--process] Process ID - [-d|--db_file] SQLite3 database file path -""" - print(help_msg) + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return ZERO + + def localize(self, dt, is_dst=False): + '''Convert naive time to local time''' + if dt.tzinfo is not None: + raise ValueError('Not naive datetime (tzinfo is already set)') + return dt.replace(tzinfo=self) + + def normalize(self, dt, is_dst=False): + '''Correct the timezone information on the given datetime''' + if dt.tzinfo is self: + return dt + if dt.tzinfo is None: + raise ValueError('Naive time - no tzinfo set') + return dt.astimezone(self) + + def __repr__(self): + return "" + + def __str__(self): + return "UTC" def get_current_time(format='%Y-%m-%d %H:%M:%S.%f %z'): return datetime.utcnow().replace( - tzinfo=pytz.utc + tzinfo=UTC() ).strftime(format) @@ -93,35 +135,34 @@ class ProcessLogger(Thread): Methods: -------- - * __init__(stream_type, configs) + * __init__(stream_type) - This method is use to initlize the ProcessLogger class object - * logging(msg) - - This method is use to log messages in sqlite3 database + * log(msg) + - Log message in the orderly manner. * run() - Reads the stdout/stderr for messages and sent them to logger """ - def __init__(self, stream_type, configs): + def __init__(self, stream_type): """ This method is use to initialize the ProcessLogger class object Args: stream_type: Type of STD (std) - configs: Process details dict Returns: None """ Thread.__init__(self) - self.configs = configs self.process = None self.stream = None + self.encoding = default_encoding self.logger = codecs.open( os.path.join( - configs['output_directory'], stream_type - ), 'w', "utf-8" + os.environ['OUTDIR'], stream_type + ), 'w', self.encoding, "ignore" ) def attach_process_stream(self, process, stream): @@ -153,7 +194,8 @@ class ProcessLogger(Thread): if msg: self.logger.write( str('{0},{1}').format( - get_current_time(format='%Y%m%d%H%M%S%f'), u(msg) + get_current_time(format='%y%m%d%H%M%S%f'), + msg.decode(self.encoding, 'replace') ) ) return True @@ -176,44 +218,7 @@ class ProcessLogger(Thread): self.logger = None -def read_configs(data): - """ - This reads SQLite3 database and fetches process details - - Args: - data - configuration details - - Returns: - Process details fetched from database as a dict - """ - if data.db_file is not None and data.process_id is not None: - conn = sqlite3.connect(data.db_file) - c = conn.cursor() - t = (data.process_id,) - - c.execute('SELECT command, arguments FROM process WHERE \ - exit_code is NULL \ - AND pid=?', t) - - row = c.fetchone() - conn.close() - - if row and len(row) > 1: - configs = { - 'pid': data.process_id, - 'cmd': row[0], - 'args': row[1], - 'output_directory': data.output_directory, - 'db_file': data.db_file - } - return configs - else: - return None - else: - raise ValueError("Please verify pid and db_file arguments.") - - -def update_configs(kwargs): +def update_status(**kw): """ This function will updates process stats @@ -223,166 +228,268 @@ def update_configs(kwargs): Returns: None """ - if 'db_file' in kwargs and 'pid' in kwargs: - conn = sqlite3.connect(kwargs['db_file']) - sql = 'UPDATE process SET ' - params = list() + import json - for param in ['start_time', 'end_time', 'exit_code']: - if param in kwargs: - sql += (',' if len(params) else '') + param + '=? ' - params.append(kwargs[param]) - - if len(params) == 0: - return - - sql += 'WHERE pid=?' - params.append(kwargs['pid']) - - with conn: - c = conn.cursor() - c.execute(sql, params) - conn.commit() - - # Commit & close cursor - conn.close() + if os.environ['OUTDIR']: + status = { + k: v for k, v in kw.items() if k in [ + 'start_time', 'end_time', 'exit_code', 'pid' + ] + } + log('Updating the status:\n{0}'.format(json.dumps(status))) + with open(os.path.join(os.environ['OUTDIR'], 'status'), 'w') as fp: + json.dump(status, fp) else: raise ValueError("Please verify pid and db_file arguments.") -def execute(configs): +def execute(): """ This function will execute the background process - Args: - configs: Process configuration details - Returns: None """ - if configs is not None: - command = [configs['cmd']] - args_csv = StringIO(configs['args']) - args_reader = csv.reader(args_csv, delimiter=str(',')) - for args in args_reader: - command = command + args - args = { - 'pid': configs['pid'], - 'db_file': configs['db_file'] - } + command = sys.argv[1:] + args = dict() + log('Initialize the process execution: {0}'.format(command)) + # Create seprate thread for stdout and stderr + process_stdout = ProcessLogger('out') + process_stderr = ProcessLogger('err') + process = None + + try: + # update start_time + args.update({ + 'start_time': get_current_time(), + 'stdout': process_stdout.log, + 'stderr': process_stderr.log, + 'pid': os.getpid() + }) + + # Update start time + update_status(**args) + log('Status updated...') + + if 'PROCID' in os.environ and os.environ['PROCID'] in os.environ: + os.environ['PGPASSWORD'] = os.environ[os.environ['PROCID']] + + kwargs = dict() + kwargs['close_fds'] = False + kwargs['shell'] = True if IS_WIN else False + + # We need environment variables & values in string + log('Converting the environment variable in the bytes format...') + kwargs['env'] = convert_environment_variables(os.environ.copy()) + + log('Starting the command execution...') + process = Popen( + command, stdout=PIPE, stderr=PIPE, stdin=None, **kwargs + ) + + log('Attaching the loggers to stdout, and stderr...') + # Attach the stream to the process logger, and start logging. + process_stdout.attach_process_stream(process, process.stdout) + process_stdout.start() + process_stderr.attach_process_stream(process, process.stderr) + process_stderr.start() + + # Join both threads together + process_stdout.join() + process_stderr.join() + + log('Waiting for the process to finish...') + # Child process return code + exitCode = process.wait() + + if exitCode is None: + exitCode = process.poll() + + log('Process exited with code: {0}'.format(exitCode)) + args.update({'exit_code': exitCode}) + + # Add end_time + args.update({'end_time': get_current_time()}) + + # Fetch last output, and error from process if it has missed. + data = process.communicate() + if data: + if data[0]: + process_stdout.log(data[0]) + if data[1]: + process_stderr.log(data[1]) + + # If executable not found or invalid arguments passed + except OSError: + info = log_exception() + args.update({'exit_code': 500}) + if process_stderr: + process_stderr.log(info) + else: + print("WARNING: ", e.strerror, file=sys.stderr) + args.update({'end_time': get_current_time()}) + args.update({'exit_code': e.errno}) + + # Unknown errors + except Exception: + info = log_exception() + args.update({'exit_code': 501}) + if process_stderr: + process_stderr.log(info) + else: + print("WARNING: ", str(e), file=sys.stderr) + args.update({'end_time': get_current_time()}) + args.update({'exit_code': -1}) + finally: + # Update the execution end_time, and exit-code. + update_status(**args) + log('Exiting the process executor...') + if process_stderr: + process_stderr.release() + if process_stdout: + process_stdout.release() + log('Bye!') + + +# Let's ignore all the signal comming to us. +def signal_handler(signal, msg): + pass + + +def convert_environment_variables(env): + """ + This function is use to convert environment variable to string + because environment variable must be string in popen + :param env: Dict of environment variable + :return: Encoded environment variable as string + """ + temp_env = dict() + for key, value in env.items(): try: - reload(sys) - sys.setdefaultencoding('utf8') - except: - pass - - # Create seprate thread for stdout and stderr - process_stdout = ProcessLogger('out', configs) - process_stderr = ProcessLogger('err', configs) - - try: - # update start_time - args.update({ - 'start_time': get_current_time(), - 'stdout': process_stdout.log, - 'stderr': process_stderr.log - }) - - # Update start time - update_configs(args) - - if args['pid'] in os.environ: - os.environ['PGPASSWORD'] = os.environ[args['pid']] - - process = Popen( - command, stdout=PIPE, stderr=PIPE, stdin=PIPE, - shell=(os.name == 'nt'), close_fds=(os.name != 'nt') - ) - try: - del (os.environ['PGPASSWORD']) - except: - pass - - # Attach the stream to the process logger, and start logging. - process_stdout.attach_process_stream(process, process.stdout) - process_stdout.start() - process_stderr.attach_process_stream(process, process.stderr) - process_stderr.start() - - # Join both threads together - process_stdout.join() - process_stderr.join() - - # Child process return code - exitCode = process.wait() - - if exitCode is None: - exitCode = process.poll() - - args.update({'exit_code': exitCode}) - - # Add end_time - args.update({'end_time': get_current_time()}) - - # Fetch last output, and error from process if it has missed. - data = process.communicate() - if data: - if data[0]: - process_stdout.log(data[0]) - if data[1]: - process_stderr.log(data[1]) - - # If executable not found or invalid arguments passed - except OSError as e: - if process_stderr: - process_stderr.log(e.strerror) - else: - print("WARNING: ", e.strerror, file=sys.stderr) - args.update({'end_time': get_current_time()}) - args.update({'exit_code': e.errno}) - - # Unknown errors + if not isinstance(key, str): + key = key.encode(default_encoding) + if not isinstance(value, str): + value = value.encode(default_encoding) + temp_env[key] = value except Exception as e: - if process_stderr: - process_stderr.log(str(e)) - else: - print("WARNING: ", str(e), file=sys.stderr) - args.update({'end_time': get_current_time()}) - args.update({'exit_code': -1}) - finally: - # Update the execution end_time, and exit-code. - update_configs(args) - if process_stderr: - process_stderr.release() - process_stderr = None - if process_stdout: - process_stdout.release() - process_stdout = None - - else: - raise ValueError("Please verify process configs.") + log_exception() + return temp_env if __name__ == '__main__': - # Read command line arguments - parser = argparse.ArgumentParser( - description='Process executor for pgAdmin 4' - ) - parser.add_argument( - '-p', '--process_id', help='Process ID', required=True - ) - parser.add_argument( - '-d', '--db_file', help='Configuration Database', required=True - ) - parser.add_argument( - '-o', '--output_directory', - help='Location where the logs will be created', required=True - ) - args = parser.parse_args() + log('Starting the process executor...') - # Fetch bakcground process details from SQLite3 database file - configs = read_configs(args) + # Ignore any signals + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + log('Disabled the SIGINT, SIGTERM signals...') - # Execute the background process - execute(configs) + if IS_WIN: + log('Disable the SIGBREAKM signal (windows)...') + signal.signal(signal.SIGBREAK, signal_handler) + log('Disabled the SIGBREAKM signal (windows)...') + + # For windows: + # We would run the process_executor in the detached mode again to make + # the child process to run as a daemon. And, it would run without + # depending on the status of the web-server. + if 'PGA_BGP_FOREGROUND' in os.environ and \ + os.environ['PGA_BGP_FOREGROUND'] == "1": + log('[CHILD] Start process execution...') + log('Executing the command now from the detached child...') + # This is a child process running as the daemon process. + # Let's do the job assing to it. + execute() + else: + from subprocess import CREATE_NEW_PROCESS_GROUP + DETACHED_PROCESS = 0x00000008 + + # Forward the standard input, output, and error stream to the + # 'devnull'. + stdin = open(os.devnull, "r") + stdout = open(os.devnull, "a") + stderr = open(os.devnull, "a") + env = os.environ.copy() + env['PGA_BGP_FOREGROUND'] = "1" + + # We need environment variables & values in string + log('[PARENT] Converting the environment variable in the bytes format...') + try: + env = convert_environment_variables(env) + except Exception as e: + log_exception() + + kwargs = { + 'stdin': stdin.fileno(), + 'stdout': stdout.fileno(), + 'stderr': stderr.fileno(), + 'creationflags': CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS, + 'close_fds': False, + 'cwd': os.environ['OUTDIR'], + 'env': env + } + + cmd = [sys.executable] + cmd.extend(sys.argv) + + log('[PARENT] Command executings: {0}'.format(cmd)) + + p = Popen(cmd, **kwargs) + + exitCode = p.poll() + + if exitCode is not None: + log( + '[PARENT] Child exited with exit-code#{0}...'.format( + exitCode + ) + ) + else: + log('[PARENT] Started the child with PID#{0}'.format(p.pid)) + + # Question: Should we wait for sometime? + # Answer: Looks the case... + from time import sleep + sleep(2) + log('[PARENT] Exiting...') + sys.exit(0) + else: + r, w = os.pipe() + + # For POSIX: + # We will fork the process, and run the child process as daemon, and + # let it do the job. + if os.fork() == 0: + log('[CHILD] Forked the child process...') + # Hmm... So - I need to do the job now... + try: + os.close(r) + + log('[CHILD] Make the child process leader...') + # Let me be the process leader first. + os.setsid() + os.umask(0) + + log('[CHILD] Make the child process leader...') + w = os.fdopen(w, 'w') + # Let me inform my parent - I will do the job, do not worry + # now, and die peacefully. + log('[CHILD] Inform parent about successful child forking...') + w.write('1') + w.close() + + log('[CHILD] Start executing the background process...') + execute() + except Exception: + sys.exit(1) + else: + os.close(w) + r = os.fdopen(r) + # I do not care, what the child send. + r.read() + log('[PARENT] Got message from the child...') + r.close() + + log('[PARENT] Exiting...') + sys.exit(0) diff --git a/web/pgadmin/misc/bgprocess/processes.py b/web/pgadmin/misc/bgprocess/processes.py index 8470842ba..fd7021def 100644 --- a/web/pgadmin/misc/bgprocess/processes.py +++ b/web/pgadmin/misc/bgprocess/processes.py @@ -19,11 +19,11 @@ import sys from abc import ABCMeta, abstractproperty, abstractmethod from datetime import datetime from pickle import dumps, loads -from subprocess import Popen, PIPE +from subprocess import Popen import pytz from dateutil import parser -from flask import current_app as app +from flask import current_app from flask_babel import gettext as _ from flask_security import current_user @@ -154,21 +154,52 @@ class BatchProcess(object): self.ecode = None # Arguments + self.args = _args args_csv_io = StringIO() csv_writer = csv.writer( args_csv_io, delimiter=str(','), quoting=csv.QUOTE_MINIMAL ) csv_writer.writerow(_args) - self.args = args_csv_io.getvalue().strip(str('\r\n')) j = Process( - pid=int(id), command=_cmd, arguments=self.args, logdir=log_dir, - desc=dumps(self.desc), user_id=current_user.id + pid=int(id), command=_cmd, + arguments=args_csv_io.getvalue().strip(str('\r\n')), + logdir=log_dir, desc=dumps(self.desc), user_id=current_user.id ) db.session.add(j) db.session.commit() def start(self): + + def which(program, paths): + def is_exe(fpath): + return os.path.exists(fpath) and os.access(fpath, os.X_OK) + + for path in paths: + if not os.path.isdir(path): + continue + exe_file = os.path.join(path, program) + if is_exe(exe_file): + return exe_file + return None + + def convert_environment_variables(env): + """ + This function is use to convert environment variable to string + because environment variable must be string in popen + :param env: Dict of environment variable + :return: Encoded environment variable as string + """ + encoding = sys.getdefaultencoding() + temp_env = dict() + for key, value in env.items(): + if not isinstance(key, str): + key = key.encode(encoding) + if not isinstance(value, str): + value = value.encode(encoding) + temp_env[key] = value + return temp_env + if self.stime is not None: if self.etime is None: raise Exception(_('The process has already been started.')) @@ -179,21 +210,63 @@ class BatchProcess(object): executor = os.path.join( os.path.dirname(__file__), 'process_executor.py' ) + paths = sys.path[:] + interpreter = None + + if os.name == 'nt': + paths.insert(0, os.path.join(sys.prefix, 'Scripts')) + paths.insert(0, os.path.join(sys.prefix)) + + interpreter = which('pythonw.exe', paths) + if interpreter is None: + interpreter = which('python.exe', paths) + else: + paths.insert(0, os.path.join(sys.prefix, 'bin')) + interpreter = which('python', paths) p = None cmd = [ - (sys.executable if not app.PGADMIN_RUNTIME else - 'pythonw.exe' if os.name == 'nt' else 'python'), - executor, - '-p', self.id, - '-o', self.log_dir, - '-d', config.SQLITE_PATH + interpreter if interpreter is not None else 'python', + executor, self.cmd ] + cmd.extend(self.args) + + command = [] + for c in cmd: + command.append(str(c)) + + current_app.logger.info( + "Executing the process executor with the arguments: %s", + ' '.join(command) + ) + cmd = command + + # Make a copy of environment, and add new variables to support + env = os.environ.copy() + env['PROCID'] = self.id + env['OUTDIR'] = self.log_dir + env['PGA_BGP_FOREGROUND'] = "1" + + # We need environment variables & values in string + env = convert_environment_variables(env) if os.name == 'nt': + DETACHED_PROCESS = 0x00000008 + from subprocess import CREATE_NEW_PROCESS_GROUP + + # We need to redirect the standard input, standard output, and + # standard error to devnull in order to allow it start in detached + # mode on + stdout = os.devnull + stderr = stdout + stdin = open(os.devnull, "r") + stdout = open(stdout, "a") + stderr = open(stderr, "a") + p = Popen( - cmd, stdout=None, stderr=None, stdin=None, close_fds=True, - shell=False, creationflags=0x00000008 + cmd, close_fds=False, env=env, stdout=stdout.fileno(), + stderr=stderr.fileno(), stdin=stdin.fileno(), + creationflags=(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS) ) else: def preexec_function(): @@ -204,15 +277,19 @@ class BatchProcess(object): signal.signal(signal.SIGINT, signal.SIG_IGN) p = Popen( - cmd, stdout=PIPE, stderr=None, stdin=None, close_fds=True, - shell=False, preexec_fn=preexec_function + cmd, close_fds=True, stdout=None, stderr=None, stdin=None, + preexec_fn=preexec_function, env=env ) self.ecode = p.poll() - if self.ecode is not None and self.ecode != 0: - # TODO:// Find a way to read error from detached failed process - # Couldn't start execution + # Execution completed immediately. + # Process executor can not update the status, if it was not able to + # start properly. + if self.ecode is not None and self.ecode != 0: + # There is no way to find out the error message from this process + # as standard output, and standard error were redirected to + # devnull. p = Process.query.filter_by( pid=self.id, user_id=current_user.id ).first() @@ -222,54 +299,49 @@ class BatchProcess(object): db.session.commit() def status(self, out=0, err=0): - import codecs + import re + ctime = get_current_time(format='%Y%m%d%H%M%S%f') stdout = [] stderr = [] out_completed = err_completed = False process_output = (out != -1 and err != -1) + enc = sys.getdefaultencoding() - def read_log(logfile, log, pos, ctime, check=True): + def read_log(logfile, log, pos, ctime): completed = True - lines = 0 + idx = 0 + c = re.compile(r"(\d+),(.*$)") if not os.path.isfile(logfile): return 0, False - with codecs.open(logfile, 'r', 'utf-8') as stream: - stream.seek(pos) - for line in stream: - logtime = StringIO() - idx = 0 - for c in line: - idx += 1 - if c == ',': - break - logtime.write(c) - logtime = logtime.getvalue() - - if check and logtime > ctime: + with open(logfile, 'rb') as f: + eofs = os.fstat(f.fileno()).st_size + f.seek(pos, 0) + while pos < eofs: + idx += 1 + line = f.readline() + line = line.decode(enc, 'replace') + r = c.split(line) + if r[1] > ctime: completed = False break - if lines == 5120: - ctime = logtime + log.append([r[1], r[2]]) + pos = f.tell() + if idx == 1024: completed = False break - - lines += 1 - log.append([logtime, line[idx:]]) - pos = stream.tell() + if pos == eofs: + completed = True + break return pos, completed if process_output: - out, out_completed = read_log( - self.stdout, stdout, out, ctime, True - ) - err, err_completed = read_log( - self.stderr, stderr, err, ctime, True - ) + out, out_completed = read_log(self.stdout, stdout, out, ctime) + err, err_completed = read_log(self.stderr, stderr, err, ctime) j = Process.query.filter_by( pid=self.id, user_id=current_user.id @@ -278,6 +350,9 @@ class BatchProcess(object): execution_time = None if j is not None: + status, updated = BatchProcess.update_process_info(j) + if updated: + db.session.commit() self.stime = j.start_time self.etime = j.end_time self.ecode = j.exit_code @@ -289,19 +364,16 @@ class BatchProcess(object): execution_time = (etime - stime).total_seconds() if process_output and self.ecode is not None and ( - len(stdout) + len(stderr) < 3073 + len(stdout) + len(stderr) < 1024 ): - out, out_completed = read_log( - self.stdout, stdout, out, ctime, False - ) - err, err_completed = read_log( - self.stderr, stderr, err, ctime, False - ) + out, out_completed = read_log(self.stdout, stdout, out, ctime) + err, err_completed = read_log(self.stderr, stderr, err, ctime) else: out_completed = err_completed = False if out == -1 or err == -1: return { + 'start_time': self.stime, 'exit_code': self.ecode, 'execution_time': execution_time } @@ -309,18 +381,67 @@ class BatchProcess(object): return { 'out': {'pos': out, 'lines': stdout, 'done': out_completed}, 'err': {'pos': err, 'lines': stderr, 'done': err_completed}, + 'start_time': self.stime, 'exit_code': self.ecode, 'execution_time': execution_time } + @staticmethod + def update_process_info(p): + if p.start_time is None or p.end_time is None: + status = os.path.join(p.logdir, 'status') + if not os.path.isfile(status): + return False, False + + with open(status, 'r') as fp: + import json + try: + data = json.load(fp) + + # First - check for the existance of 'start_time'. + if 'start_time' in data and data['start_time']: + p.start_time = data['start_time'] + + # We can't have 'exit_code' without the 'start_time' + if 'exit_code' in data and \ + data['exit_code'] is not None: + p.exit_code = data['exit_code'] + + # We can't have 'end_time' without the 'exit_code'. + if 'end_time' in data and data['end_time']: + p.end_time = data['end_time'] + + return True, True + + except ValueError as e: + current_app.logger.warning( + _("Status for the background process '{0}' couldn't be loaded!").format( + p.pid + ) + ) + current_app.logger.exception(e) + return False, False + return True, False + @staticmethod def list(): processes = Process.query.filter_by(user_id=current_user.id) + changed = False res = [] for p in processes: - if p.start_time is None or p.acknowledge is not None: + status, updated = BatchProcess.update_process_info(p) + if not status: continue + + if not changed: + changed = updated + + if p.start_time is None or ( + p.acknowledge is not None and p.end_time is None + ): + continue + execution_time = None stime = parser.parse(p.start_time) @@ -350,10 +471,20 @@ class BatchProcess(object): 'execution_time': execution_time }) + if changed: + db.session.commit() + return res @staticmethod - def acknowledge(_pid, _release): + def acknowledge(_pid): + """ + Acknowledge from the user, he/she has alredy watched the status. + + Update the acknowledgement status, if the process is still running. + And, delete the process information from the configuration, and the log + files related to the process, if it has already been completed. + """ p = Process.query.filter_by( user_id=current_user.id, pid=_pid ).first() @@ -363,33 +494,12 @@ class BatchProcess(object): _("Could not find a process with the specified ID.") ) - if _release: - import shutil - shutil.rmtree(p.logdir, True) + if p.end_time is not None: + logdir = p.logdir db.session.delete(p) + import shutil + shutil.rmtree(logdir, True) else: p.acknowledge = get_current_time() db.session.commit() - - @staticmethod - def release(pid=None): - import shutil - processes = None - - if pid is not None: - processes = Process.query.filter_by( - user_id=current_user.id, pid=pid - ) - else: - processes = Process.query.filter_by( - user_id=current_user.id, - acknowledge=None - ) - - if processes: - for p in processes: - shutil.rmtree(p.logdir, True) - - db.session.delete(p) - db.session.commit() diff --git a/web/pgadmin/misc/bgprocess/static/css/bgprocess.css b/web/pgadmin/misc/bgprocess/static/css/bgprocess.css index 2767772e6..0838f0a7e 100644 --- a/web/pgadmin/misc/bgprocess/static/css/bgprocess.css +++ b/web/pgadmin/misc/bgprocess/static/css/bgprocess.css @@ -20,6 +20,7 @@ margin-top: 0px; margin-bottom: 5px; padding: 5px; + padding-right: 20px; white-space: pre-wrap; text-align: center; border-top-left-radius: 5px; @@ -165,3 +166,28 @@ ol.pg-bg-process-logs { .bg-process-footer .bg-process-exec-time { padding-right: 0; } + +.pg-bg-bgprocess .ajs-commands { + right: -13px; + top: 2px; + opacity: 0.5; +} + +.pg-bg-bgprocess .bg-close { + display: inline-block; + position: absolute; + height: 25px; + width: 25px; + right: -12px; + top: 3px; + padding: 2px; + border: 2px solid #1f5fa6; + border-radius: 4px; + opacity: 0.5; + background-color: white; + color: red; +} + +.pg-bg-bgprocess:hover .bg-close { + opacity: 0.95; +} diff --git a/web/pgadmin/misc/bgprocess/static/js/bgprocess.js b/web/pgadmin/misc/bgprocess/static/js/bgprocess.js index 122a30652..cccd6a2f0 100644 --- a/web/pgadmin/misc/bgprocess/static/js/bgprocess.js +++ b/web/pgadmin/misc/bgprocess/static/js/bgprocess.js @@ -1,6 +1,7 @@ -define( - ['underscore', 'underscore.string', 'jquery', 'pgadmin.browser', 'alertify', 'pgadmin.browser.messages'], -function(_, S, $, pgBrowser, alertify, pgMessages) { +define([ + 'pgadmin', 'underscore', 'underscore.string', 'jquery', 'pgadmin.browser', + 'alertify', 'pgadmin.browser.messages' +], function(pgAdmin, _, S, $, pgBrowser, alertify, pgMessages) { pgBrowser.BackgroundProcessObsorver = pgBrowser.BackgroundProcessObsorver || {}; @@ -34,8 +35,9 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { exit_code: null, acknowledge: info['acknowledge'], execution_time: null, - out: null, - err: null, + out: -1, + err: -1, + lot_more: false, notifier: null, container: null, @@ -69,23 +71,16 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { }, url: function(type) { - var base_url = pgMessages['bgprocess.index'], - url = base_url; + var url = S('%s%s').sprintf(pgMessages['bgprocess.index'], this.id).value(); switch (type) { case 'status': - url = S('%sstatus/%s/').sprintf(base_url, this.id).value(); - if (this.details) { - url = S('%s%s/%s/').sprintf( - url, (this.out && this.out.pos) || 0, - (this.err && this.err.pos) || 0 + if (this.details && this.out != -1 && this.err != -1) { + url = S('%s/%s/%s/').sprintf( + url, this.out, this.err ).value(); } break; - case 'info': - case 'acknowledge': - url = S('%s%s/%s/').sprintf(base_url, type, this.id).value(); - break; } return url; @@ -114,53 +109,49 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { if ('out' in data) { self.out = data.out && data.out.pos; - self.completed = data.out.done; if (data.out && data.out.lines) { - data.out.lines.sort(function(a, b) { return a[0] < b[0]; }); out = data.out.lines; } } if ('err' in data) { self.err = data.err && data.err.pos; - self.completed = (self.completed && data.err.done); if (data.err && data.err.lines) { - data.err.lines.sort(function(a, b) { return a[0] < b[0]; }); err = data.err.lines; } } + self.completed = self.completed || ( + 'err' in data && 'out' in data && data.err.done && data.out.done + ) || ( + !self.details && !_.isNull(self.exit_code) + ); - var io = ie = 0; + var io = ie = 0, res = [], + escapeEl = document.createElement('textarea'), + escapeHTML = function(html) { + escapeEl.textContent = html; + return escapeEl.innerHTML; + }; - while (io < out.length && ie < err.length && - self.logs[0].children.length < 5120) { - if (out[io][0] < err[ie][0]){ - self.logs.append( - $('
  • ', {class: 'pg-bg-res-out'}).text(out[io++][1]) - ); + while (io < out.length && ie < err.length) { + if (pgAdmin.natural_sort(out[io][0], err[ie][0]) <= 0){ + res.push('
  • ' + escapeHTML(out[io++][1]) + '
  • '); } else { - self.logs.append( - $('
  • ', {class: 'pg-bg-res-err'}).text(err[ie++][1]) - ); + res.push('
  • ' + escapeHTML(err[ie++][1]) + '
  • '); } } - while (io < out.length && self.logs[0].children.length < 5120) { - self.logs.append( - $('
  • ', {class: 'pg-bg-res-out'}).text(out[io++][1]) - ); + while (io < out.length) { + res.push('
  • ' + escapeHTML(out[io++][1]) + '
  • '); } - while (ie < err.length && self.logs[0].children.length < 5120) { - self.logs.append( - $('
  • ', {class: 'pg-bg-res-err'}).text(err[ie++][1]) - ); + while (ie < err.length) { + res.push('
  • ' + escapeHTML(err[ie++][1]) + '
  • '); } - - if (self.logs[0].children.length >= 5120) { - self.completed = true; + if (res.length) { + self.logs.append(res.join('')); } if (self.stime) { @@ -197,7 +188,7 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { setTimeout(function() {self.show.apply(self)}, 10); } - if (self.state != 2 || (self.details && !self.completed)) { + if (!self.completed) { setTimeout( function() { self.status.apply(self); @@ -232,12 +223,11 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { if (self.notify && !self.details) { if (!self.notifier) { - var content = $('
    ').append( - $('
    ', { - class: "h5 pg-bg-notify-header" - }).text( - self.desc - ) + var header = $('
    ', { + class: "h5 pg-bg-notify-header" + }).append($('').text(self.desc)), + content = $('
    ').append( + header ).append( $('
    ', {class: 'pg-bg-notify-body h6' }).append( $('
    ', {class: 'pg-bg-start col-xs-12' }).append( @@ -249,12 +239,17 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { ), for_details = $('
    ', { class: "col-xs-12 text-center pg-bg-click h6" - }).text(pgMessages.CLICK_FOR_DETAILED_MSG).appendTo(content), + }).append( + $('').text(pgMessages.CLICK_FOR_DETAILED_MSG) + ).appendTo(content), status = $('
    ', { class: "pg-bg-status col-xs-12 h5 " + ((self.exit_code === 0) ? 'bg-success': (self.exit_code == 1) ? 'bg-failed' : '') - }).appendTo(content); + }).appendTo(content), + close_me = $( + '
    ' + ).appendTo(header); self.container = content; self.notifier = alertify.notify( @@ -268,10 +263,17 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { this.notifier.dismiss(); this.notifier = null; + this.completed = false; this.show_detailed_view.apply(this); }.bind(self)); + close_me.on('click', function(ev) { + this.notifier.dismiss(); + this.notifier = null; + this.acknowledge_server.apply(this); + }.bind(this)); + // Do not close the notifier, when clicked on the container, which // is a default behaviour. content.on('click', function(ev) { @@ -351,6 +353,8 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { if (is_new) { self.details = true; + self.err = 0; + self.out = 0; setTimeout( function() { self.status.apply(self); @@ -419,28 +423,26 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { function() { setTimeout( function() { - pgBrowser.BackgroundProcessObsorver.update_process_list(); + pgBrowser.BackgroundProcessObsorver.update_process_list(true); }, 1000 ); } ) }, - update_process_list: function() { + update_process_list: function(recheck) { var observer = this; $.ajax({ typs: 'GET', timeout: 30000, - url: pgMessages['bgprocess.list'], + url: pgMessages['bgprocess.index'], cache: false, async: true, contentType: "application/json", success: function(res) { - if (!res) { - // FIXME:: - // Do you think - we should call the list agains after some - // interval? + var cnt = 0; + if (!res || !_.isArray(res)) { return; } for (idx in res) { @@ -451,6 +453,14 @@ function(_, S, $, pgBrowser, alertify, pgMessages) { } } } + if (recheck && res.length == 0) { + // Recheck after some more time + setTimeout( + function() { + observer.update_process_list(false); + }, 3000 + ); + } }, error: function(res) { // FIXME:: What to do now?