Commit 9045cc60 authored by m.bottaccio@bopen.eu

Merge branch '970-fix-kill' into 'development'

Ensure dask scheduler url is always extracted from config and logger handlers...

See merge request data-tailor/data-tailor!607
parents 97f684a0 569b9a7f
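
The change replaces a hard `config["epcs"]` lookup with a chained `.get` on the config passed through `kwargs`, so a missing key yields `None` instead of raising. A minimal sketch of that pattern, with invented example values (not the project's actual config):

```python
# Minimal sketch of the defensive lookup used in this MR; the config values
# below are made up for illustration.
def get_dask_scheduler_url(**kwargs):
    # Each .get falls back to an empty dict (or None at the end), so a missing
    # "config", "epcs", or "dask_scheduler_url" key never raises KeyError.
    return kwargs.get("config", {}).get("epcs", {}).get("dask_scheduler_url")

print(get_dask_scheduler_url(config={"epcs": {"dask_scheduler_url": "tcp://scheduler:8786"}}))
# tcp://scheduler:8786
print(get_dask_scheduler_url())
# None
```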
@@ -593,9 +593,9 @@ def manage_process(
     # using Dask, the process is no longer the atomic task. Dask futures are instead.
     # in fact, a single pid can be associated with multiple customisations and can therefore be
     # omitted; process_uuid is the atomic id for dask futures instead
-    dask_scheduler = config["epcs"].get("dask_scheduler_url")
     if future is not None:
-        dask_helpers.cancel_future(logfile_path, future, **kwargs)
+        dask_scheduler = kwargs.get("config", {}).get("epcs", {}).get("dask_scheduler_url")
+        dask_helpers.cancel_future(logfile_path, future, dask_scheduler)
     if dask_scheduler is None and pid is not None:
         process = psutil.Process(pid)
         for child in process.children(recursive=True):
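
When no scheduler URL is resolved, the context lines above show `manage_process` falling back to the process tree via `psutil`. A hypothetical sketch of such a fallback; the actual signal handling in the project may differ:

```python
# Hypothetical sketch of the non-Dask fallback: stop a customisation by
# terminating its worker process tree with psutil.
import psutil

def kill_process_tree(pid):
    process = psutil.Process(pid)
    # Terminate children first so the parent cannot hand work to them anymore.
    for child in process.children(recursive=True):
        child.terminate()
    process.terminate()
    # Wait briefly, then force-kill anything still alive.
    _, alive = psutil.wait_procs([process], timeout=5)
    for p in alive:
        p.kill()
```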
@@ -73,7 +73,7 @@ def ensure_dask_clean_client(dask_scheduler=None, logger=None, **kwargs):
     return client

-def cancel_future(logfile_path, future, **kwargs):
+def cancel_future(logfile_path, future, dask_scheduler):
     """
     :param logfile_path:
@@ -82,7 +82,6 @@ def cancel_future(logfile_path, future, **kwargs):
     :return:
     """
     logger = log_reader.create_logger(f"PROCESSING-{future}")
-    dask_scheduler = kwargs.get("config", {}).get("epcs", {}).get("dask_scheduler_url")
     client = ensure_dask_client(dask_scheduler, logger)
     distributed.Future(future, client).cancel()
     cancel_keys = distributed.Variable("cancel_keys").get()
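
With the new signature, `cancel_future` receives the scheduler URL directly and rebuilds the future from its key before cancelling it. A simplified sketch of that flow with `dask.distributed`; in the real code the client comes from the `ensure_dask_client` helper and logging is richer:

```python
# Simplified sketch of cancelling a Dask future by its key.
from distributed import Client, Future

def cancel_by_key(future_key, dask_scheduler):
    client = Client(dask_scheduler)       # connect to the running scheduler
    Future(future_key, client).cancel()   # rebuild the future from its key and cancel it
    client.close()
```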
@@ -344,7 +344,7 @@ def read_pid_from_log(logfile_path):
     with open(logfile_path, "r") as log_file:
         stream = log_file.read().splitlines()
     string_to_find = "INFO - PID: "
-    string_for_future = "INFO - FUTURE: "
+    string_for_future = "- FUTURE: "
     for line in stream:
         if string_to_find in line:
             pid = int(line.split(string_to_find)[1])
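
The marker for the future key is relaxed from `"INFO - FUTURE: "` to `"- FUTURE: "`, presumably so the key is found regardless of the level of the log record that carries it. A small sketch of the parsing, with invented log lines:

```python
# Invented log lines to illustrate the relaxed "- FUTURE: " marker.
lines = [
    "2023-05-04 10:00:00 INFO - PID: 4242",
    "2023-05-04 10:00:01 WARNING - FUTURE: customisation-abc123",
]
string_to_find = "INFO - PID: "
string_for_future = "- FUTURE: "
pid, future = None, None
for line in lines:
    if string_to_find in line:
        pid = int(line.split(string_to_find)[1])
    if string_for_future in line:
        future = line.split(string_for_future)[1].strip()
print(pid, future)  # 4242 customisation-abc123
```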
@@ -443,3 +443,7 @@ def _add_sent_signal_to_logfile(logfile_path, signal, timeout=0, **kwargs):
     if timeout > 0:
         logger.info(f"Max execution time exceeded: {timeout} s. Cancelling customisation.")
     logger.info(f"Sent signal: {signal.upper()}")
+    for h in logger.handlers:
+        h.flush()
+        h.close()
+    logger = None
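
The added cleanup flushes and closes every handler on the logger so buffered records reach disk and the logfile descriptor is released once the cancellation has been recorded. A minimal standalone sketch; the logger name and path are placeholders, not the project's:

```python
import logging

# Placeholder logger name and logfile path, for illustration only.
logger = logging.getLogger("SIGNAL-example")
logger.setLevel(logging.INFO)
logger.addHandler(logging.FileHandler("/tmp/example.log"))
logger.info("Sent signal: SIGTERM")
# Flush and close the handlers so nothing is lost and the file handle is freed.
for h in logger.handlers:
    h.flush()
    h.close()
logger = None
```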