#!/usr/bin/env python3
"""Arrakis single-field pipeline"""
from __future__ import annotations
import argparse
import logging
import os
from importlib import resources
from pathlib import Path
import configargparse
import yaml
from astropy.time import Time
from prefect import flow
from prefect.task_runners import BaseTaskRunner
from prefect_dask import DaskTaskRunner
from arrakis import (
cleanup,
cutout,
frion,
imager,
linmos,
makecat,
rmclean_oncuts,
rmsynth_oncuts,
validate,
)
from arrakis.logger import UltimateHelpFormatter, logger
from arrakis.utils.database import test_db
from arrakis.utils.pipeline import generic_parser, logo_str, workdir_arg_parser
from arrakis.validate import validation_parser
@flow(name="Combining+Synthesis on Arrakis", retries=3, retry_delay_seconds=600)
def process_spice(args, host: str, task_runner: BaseTaskRunner) -> None:
    """Workflow to process the SPICE-RACS data.

    Runs, in order, the cutout, LINMOS, FRion, RM-synthesis, RM-CLEAN,
    catalogue, validation and cleanup stages. Each stage is skipped when the
    matching ``args.skip_*`` flag is set.

    Args:
        args (configargparse.Namespace): Configuration parameters for this run
        host (str): Host address of the mongoDB.
        task_runner (BaseTaskRunner): Prefect task runner that every stage
            flow is executed with.
    """
    # The same catalogue path is used by the catalogue and validation stages,
    # so resolve the default once here (args.outfile may be None).
    outfile = f"{args.field}.pipe.test.fits" if args.outfile is None else args.outfile

    if not args.skip_cutout:
        cutout.cutout_islands.with_options(
            task_runner=task_runner,
        )(
            field=args.field,
            directory=args.datadir,
            host=host,
            epoch=args.epoch,
            sbid=args.sbid,
            username=args.username,
            password=args.password,
            pad=args.pad,
            stokeslist=["I", "Q", "U"],
            dryrun=args.dryrun,
            limit=args.limit,
        )

    if not args.skip_linmos:
        linmos.main.with_options(
            task_runner=task_runner,
        )(
            field=args.field,
            datadir=Path(args.datadir),
            host=host,
            epoch=args.epoch,
            sbid=args.sbid,
            holofile=Path(args.holofile),
            username=args.username,
            password=args.password,
            yanda=args.yanda,
            yanda_img=args.yanda_image,
            stokeslist=["I", "Q", "U"],
            limit=args.limit,
        )

    if not args.skip_frion:
        frion.main.with_options(task_runner=task_runner)(
            field=args.field,
            outdir=args.datadir,
            host=host,
            epoch=args.epoch,
            sbid=args.sbid,
            username=args.username,
            password=args.password,
            database=args.database,
            ionex_server=args.ionex_server,
            ionex_prefix=args.ionex_prefix,
            ionex_proxy_server=args.ionex_proxy_server,
            ionex_formatter=args.ionex_formatter,
            ionex_predownload=args.ionex_predownload,
            limit=args.limit,
        )

    if not args.skip_rmsynth:
        rmsynth_oncuts.main.with_options(task_runner=task_runner)(
            field=args.field,
            outdir=Path(args.datadir),
            # Use the ``host`` argument like every other stage does.
            host=host,
            epoch=args.epoch,
            sbid=args.sbid,
            username=args.username,
            password=args.password,
            dimension=args.dimension,
            database=args.database,
            limit=args.limit,
            savePlots=args.save_plots,
            weightType=args.weight_type,
            fitRMSF=args.fit_rmsf,
            phiMax_radm2=args.phi_max,
            dPhi_radm2=args.dphi,
            nSamples=args.n_samples,
            polyOrd=args.poly_ord,
            noStokesI=args.no_stokes_i,
            showPlots=args.show_plots,
            not_RMSF=args.not_rmsf,
            rm_verbose=args.rm_verbose,
            debug=args.debug,
            fit_function=args.fit_function,
            tt0=args.tt0,
            tt1=args.tt1,
            # Ionospheric-corrected data is only used when FRion was run.
            ion=not args.skip_frion,
            do_own_fit=args.do_own_fit,
        )

    if not args.skip_rmclean:
        rmclean_oncuts.main.with_options(task_runner=task_runner)(
            field=args.field,
            outdir=args.datadir,
            host=host,
            epoch=args.epoch,
            sbid=args.sbid,
            username=args.username,
            password=args.password,
            dimension=args.dimension,
            database=args.database,
            limit=args.limit,
            cutoff=args.cutoff,
            maxIter=args.max_iter,
            gain=args.gain,
            window=args.window,
            rm_verbose=args.rm_verbose,
        )

    if not args.skip_cat:
        makecat.main.with_options(task_runner=task_runner)(
            field=args.field,
            host=host,
            epoch=args.epoch,
            sbid=args.sbid,
            username=args.username,
            password=args.password,
            outfile=outfile,
        )

    if not args.skip_validate:
        validate.main.with_options(task_runner=task_runner)(
            # Use the resolved ``outfile``: Path(args.outfile) raises
            # TypeError when --outfile was not supplied.
            catalogue_path=Path(outfile),
            npix=args.npix,
            map_size=args.map_size,
            snr_cut=args.leakage_snr,
            # NOTE(review): leakage_bins is doubled here — confirm intended.
            bins=args.leakage_bins * 2,
        )

    if not args.skip_cleanup:
        cleanup.main.with_options(task_runner=task_runner)(
            datadir=args.datadir,
        )
def save_args(args: configargparse.Namespace) -> Path:
    """Record the configuration arguments of this pipeline run in a YAML file.

    The file is named ``<field>-config-<timestamp>.yaml``, where the timestamp
    is ``Time.now().fits``. It is resolved to an absolute path relative to the
    current working directory.

    Args:
        args (configargparse.Namespace): Supplied arguments for the Arrakis pipeline instance

    Returns:
        Path: Absolute output path of the saved file
    """
    serialized = yaml.dump(vars(args))
    target = os.path.abspath(f"{args.field}-config-{Time.now().fits}.yaml")
    logger.info(f"Saving config to '{target}'")
    Path(target).write_text(serialized)
    return Path(target)
def create_dask_runner(
    dask_config: str | None,
    overload: bool = False,
) -> DaskTaskRunner:
    """Create a DaskTaskRunner from a YAML configuration file.

    The YAML file may define ``cluster_class`` (default
    ``"distributed.LocalCluster"``), ``cluster_kwargs`` and ``adapt_kwargs``.
    These are forwarded to ``DaskTaskRunner``.

    Args:
        dask_config (str | None): Configuration file for the DaskTaskRunner.
            If None, the packaged ``arrakis.configs/default.yaml`` is used.
        overload (bool, optional): Overload the options for threaded work.
            The configured ``cores`` value is moved to ``job_cpu``, and both
            ``cores`` and ``processes`` are set to 1. Defaults to False.

    Returns:
        DaskTaskRunner: The prefect DaskTaskRunner instance

    Raises:
        KeyError: If ``overload`` is True and ``cluster_kwargs`` has no
            ``cores`` entry.
    """
    logger.setLevel(logging.INFO)
    logger.info("Creating a Dask Task Runner.")
    if dask_config is None:
        # Traversable.open() also works when the package is not unpacked on
        # disk (e.g. zipped), unlike the builtin open().
        config_source = resources.files("arrakis.configs").joinpath("default.yaml")
    else:
        config_source = Path(dask_config)
    with config_source.open() as f:
        logger.info(f"Loading {config_source}")
        # An empty YAML document loads as None; treat it as an empty config.
        yaml_config: dict = yaml.safe_load(f) or {}

    cluster_class_str = yaml_config.get("cluster_class", "distributed.LocalCluster")
    cluster_kwargs = yaml_config.get("cluster_kwargs", {})
    adapt_kwargs = yaml_config.get("adapt_kwargs", {})

    if overload:
        logger.info("Overwriting config attributes.")
        cluster_kwargs["job_cpu"] = cluster_kwargs["cores"]
        cluster_kwargs["cores"] = 1
        cluster_kwargs["processes"] = 1

    config = {
        "cluster_class": cluster_class_str,
        "cluster_kwargs": cluster_kwargs,
        "adapt_kwargs": adapt_kwargs,
    }
    return DaskTaskRunner(**config)
def main(args: configargparse.Namespace) -> None:
    """Main script.

    The steps are:

    1. Save the arguments to YAML.
    2. Check the mongoDB connection (unless ``--imager_only``).
    3. Run the imager flow (unless ``--skip_imager``).
    4. Run the ``process_spice`` flow (unless ``--imager_only``).

    Args:
        args (configargparse.Namespace): Command line arguments.
    """
    host = args.host
    # Lets save the args as a record for the ages
    output_args_path = save_args(args)
    logger.info(f"Saved arguments to {output_args_path}.")

    if not args.imager_only:
        # Test the mongoDB before any database-backed stage is started
        test_db(
            host=host,
            username=args.username,
            password=args.password,
        )

    if not args.skip_imager:
        # This is the client for the imager component of the arrakis
        # pipeline. ``overload=True`` rewrites the cluster config for
        # threaded work (see create_dask_runner).
        dask_runner = create_dask_runner(
            dask_config=args.imager_dask_config,
            overload=True,
        )
        logger.info("Obtained DaskTaskRunner, executing the imager workflow. ")
        imager.main.with_options(
            name=f"Arrakis Imaging -- {args.sbid}:{args.field}", task_runner=dask_runner
        )(
            msdir=args.msdir,
            out_dir=args.datadir,
            num_beams=args.num_beams,
            temp_dir_wsclean=args.temp_dir_wsclean,
            temp_dir_images=args.temp_dir_images,
            cutoff=args.psf_cutoff,
            robust=args.robust,
            pols=args.pols,
            nchan=args.nchan,
            local_rms=args.local_rms,
            local_rms_window=args.local_rms_window,
            size=args.size,
            scale=args.scale,
            mgain=args.mgain,
            niter=args.niter,
            nmiter=args.nmiter,
            auto_mask=args.auto_mask,
            force_mask_rounds=args.force_mask_rounds,
            auto_threshold=args.auto_threshold,
            minuv=args.minuv,
            purge=args.purge,
            taper=args.taper,
            parallel_deconvolution=args.parallel,
            gridder=args.gridder,
            wsclean_path=(
                Path(args.local_wsclean) if args.local_wsclean else args.hosted_wsclean
            ),
            multiscale=args.multiscale,
            multiscale_scale_bias=args.multiscale_scale_bias,
            absmem=args.absmem,
            ms_glob_pattern=args.ms_glob_pattern,
            data_column=args.data_column,
            skip_fix_ms=args.skip_fix_ms,
            no_mf_weighting=args.no_mf_weighting,
            disable_pol_local_rms=args.disable_pol_local_rms,
            disable_pol_force_mask_rounds=args.disable_pol_force_mask_rounds,
        )
        # Release the imager's Dask client before the synthesis cluster is
        # created. NOTE(review): ``_client`` is a private DaskTaskRunner
        # attribute and may change between prefect-dask versions.
        client = dask_runner._client
        if client is not None:
            client.close()
        del dask_runner
    else:
        # Logger.warn is a deprecated alias of Logger.warning.
        logger.warning("Skipping the image creation step. ")

    if args.imager_only:
        logger.info("Not running any stages after the imager. ")
        return

    # This is the client and pipeline for the RM extraction
    dask_runner_2 = create_dask_runner(
        dask_config=args.dask_config,
    )

    # Run the synthesis flow on the second Dask runner
    process_spice.with_options(
        name=f"Arrakis Synthesis -- {args.sbid}:{args.field}",
    )(args, host, dask_runner_2)
def pipeline_parser(parent_parser: bool = False) -> argparse.ArgumentParser:
    """Build the argument parser for the top-level pipeline options.

    Args:
        parent_parser (bool, optional): If True, the parser is built without
            ``-h/--help`` so it can be used as a parent of another parser.
            Defaults to False.

    Returns:
        argparse.ArgumentParser: Parser holding the "pipeline arguments" group.
    """
    descStr = f"""
{logo_str}
Arrakis pipeline.
Before running make sure to start a session of mongodb e.g.
$ mongod --dbpath=/path/to/database --bind_ip $(hostname -i)
"""
    # Parse the command line options
    # (named ``arg_parser`` so it does not shadow this function's own name)
    arg_parser = argparse.ArgumentParser(
        add_help=not parent_parser,
        description=descStr,
        formatter_class=UltimateHelpFormatter,
    )
    parser = arg_parser.add_argument_group("pipeline arguments")
    parser.add_argument(
        "--dask_config",
        type=str,
        default=None,
        help="Config file for the Dask cluster used by the post-imaging (RM synthesis) stages.",
    )
    parser.add_argument(
        "--imager_dask_config",
        type=str,
        default=None,
        help="Config file for the Dask cluster used by the imaging stage.",
    )
    parser.add_argument(
        "--imager_only",
        action="store_true",
        help="Only run the imager component of the pipeline. ",
    )
    parser.add_argument(
        "--skip_imager", action="store_true", help="Skip imaging stage."
    )
    parser.add_argument("--skip_cutout", action="store_true", help="Skip cutout stage.")
    parser.add_argument("--skip_linmos", action="store_true", help="Skip LINMOS stage.")
    parser.add_argument(
        "--skip_frion",
        action="store_true",
        help="Skip FRion (ionospheric Faraday rotation correction) stage.",
    )
    parser.add_argument(
        "--skip_rmsynth", action="store_true", help="Skip RM Synthesis stage."
    )
    parser.add_argument(
        "--skip_rmclean", action="store_true", help="Skip RM-CLEAN stage."
    )
    parser.add_argument("--skip_cat", action="store_true", help="Skip catalogue stage.")
    parser.add_argument(
        "--skip_validate", action="store_true", help="Skip validation stage."
    )
    parser.add_argument(
        "--skip_cleanup", action="store_true", help="Skip cleanup stage."
    )
    return arg_parser
def cli() -> None:
    """Command-line interface.

    Merges the argument parsers of every pipeline stage into a single
    ``configargparse`` parser. Options may also be read from a ``--config``
    file. The parsed arguments are then handed to ``main``.
    """
    # Each stage module supplies its own parent parser (built without -h).
    stage_parsers = [
        pipeline_parser(parent_parser=True),
        workdir_arg_parser(parent_parser=True),
        generic_parser(parent_parser=True),
        imager.imager_parser(parent_parser=True),
        cutout.cutout_parser(parent_parser=True),
        linmos.linmos_parser(parent_parser=True),
        frion.frion_parser(parent_parser=True),
        rmsynth_oncuts.rm_common_parser(parent_parser=True),
        rmsynth_oncuts.rmsynth_parser(parent_parser=True),
        rmclean_oncuts.clean_parser(parent_parser=True),
        makecat.cat_parser(parent_parser=True),
        validation_parser(parent_parser=True),
        cleanup.cleanup_parser(parent_parser=True),
    ]
    # The first entry is the pipeline parser; reuse its description text.
    parser = configargparse.ArgParser(
        description=stage_parsers[0].description,
        formatter_class=UltimateHelpFormatter,
        parents=stage_parsers,
    )
    parser.add("--config", required=False, is_config_file=True, help="Config file path")
    args = parser.parse_args()
    parser.print_values()

    if args.verbose:
        logger.setLevel(logging.INFO)

    logger.info(logo_str)
    logger.info("\n\nArguments: ")
    logger.info(args)

    main(args)
if __name__ == "__main__":
    # Only run the CLI when executed as a script, not on import.
    cli()