#!/usr/bin/env python3
"""Arrakis single-field pipeline"""
import argparse
import logging
import os
from importlib import resources
from pathlib import Path
from typing import Optional
import configargparse
import yaml
from astropy.time import Time
from prefect import flow
from prefect.task_runners import BaseTaskRunner
from prefect_dask import DaskTaskRunner
from arrakis import (
cleanup,
cutout,
frion,
imager,
linmos,
makecat,
rmclean_oncuts,
rmsynth_oncuts,
validate,
)
from arrakis.logger import UltimateHelpFormatter, logger
from arrakis.utils.database import test_db
from arrakis.utils.pipeline import generic_parser, logo_str, workdir_arg_parser
from arrakis.validate import validation_parser
@flow(name="Combining+Synthesis on Arrakis")
def process_spice(args, host: str, task_runner: BaseTaskRunner) -> None:
"""Workflow to process the SPICE-RACS data
Args:
args (configargparse.Namespace): Configuration parameters for this run
host (str): Host address of the mongoDB.
"""
outfile = f"{args.field}.pipe.test.fits" if args.outfile is None else args.outfile
previous_future = None
previous_future = (
cutout.cutout_islands.with_options(
task_runner=task_runner,
)(
field=args.field,
directory=args.datadir,
host=host,
epoch=args.epoch,
sbid=args.sbid,
username=args.username,
password=args.password,
pad=args.pad,
stokeslist=["I", "Q", "U"],
dryrun=args.dryrun,
limit=args.limit,
)
if not args.skip_cutout
else previous_future
)
previous_future = (
linmos.main.with_options(
task_runner=task_runner,
)(
field=args.field,
datadir=Path(args.datadir),
host=host,
epoch=args.epoch,
sbid=args.sbid,
holofile=Path(args.holofile),
username=args.username,
password=args.password,
yanda=args.yanda,
yanda_img=args.yanda_image,
stokeslist=["I", "Q", "U"],
limit=args.limit,
)
if not args.skip_linmos
else previous_future
)
previous_future = (
frion.main.with_options(task_runner=task_runner)(
field=args.field,
outdir=args.datadir,
host=host,
epoch=args.epoch,
sbid=args.sbid,
username=args.username,
password=args.password,
database=args.database,
ionex_server=args.ionex_server,
ionex_prefix=args.ionex_prefix,
ionex_proxy_server=args.ionex_proxy_server,
ionex_formatter=args.ionex_formatter,
ionex_predownload=args.ionex_predownload,
limit=args.limit,
)
if not args.skip_frion
else previous_future
)
previous_future = (
rmsynth_oncuts.main.with_options(task_runner=task_runner)(
field=args.field,
outdir=Path(args.datadir),
            host=host,
epoch=args.epoch,
sbid=args.sbid,
username=args.username,
password=args.password,
dimension=args.dimension,
verbose=args.verbose,
database=args.database,
limit=args.limit,
savePlots=args.save_plots,
weightType=args.weight_type,
fitRMSF=args.fit_rmsf,
phiMax_radm2=args.phi_max,
dPhi_radm2=args.dphi,
nSamples=args.n_samples,
polyOrd=args.poly_ord,
noStokesI=args.no_stokes_i,
showPlots=args.show_plots,
not_RMSF=args.not_rmsf,
rm_verbose=args.rm_verbose,
debug=args.debug,
fit_function=args.fit_function,
tt0=args.tt0,
tt1=args.tt1,
ion=True,
do_own_fit=args.do_own_fit,
)
if not args.skip_rmsynth
else previous_future
)
previous_future = (
rmclean_oncuts.main.with_options(task_runner=task_runner)(
field=args.field,
outdir=args.datadir,
host=host,
epoch=args.epoch,
sbid=args.sbid,
username=args.username,
password=args.password,
dimension=args.dimension,
database=args.database,
limit=args.limit,
cutoff=args.cutoff,
maxIter=args.max_iter,
gain=args.gain,
window=args.window,
rm_verbose=args.rm_verbose,
)
if not args.skip_rmclean
else previous_future
)
previous_future = (
makecat.main.with_options(task_runner=task_runner)(
field=args.field,
host=host,
epoch=args.epoch,
sbid=args.sbid,
username=args.username,
password=args.password,
verbose=args.verbose,
outfile=outfile,
)
if not args.skip_cat
else previous_future
)
previous_future = (
validate.main.with_options(task_runner=task_runner)(
            catalogue_path=Path(outfile),
npix=args.npix,
map_size=args.map_size,
snr_cut=args.leakage_snr,
            bins=args.leakage_bins,
)
if not args.skip_validate
else previous_future
)
previous_future = (
cleanup.main.with_options(task_runner=task_runner)(
datadir=args.datadir,
)
if not args.skip_cleanup
else previous_future
)
def save_args(args: configargparse.Namespace) -> Path:
"""Helper function to create a record of the input configuration arguments that
govern the pipeline instance
Args:
args (configargparse.Namespace): Supplied arguments for the Arrakis pipeline instance
Returns:
Path: Output path of the saved file
"""
args_yaml = yaml.dump(vars(args))
args_yaml_f = os.path.abspath(f"{args.field}-config-{Time.now().fits}.yaml")
logger.info(f"Saving config to '{args_yaml_f}'")
with open(args_yaml_f, "w") as f:
f.write(args_yaml)
return Path(args_yaml_f)
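# The saved record is a plain YAML mapping, so it can be re-loaded for later
# inspection (illustrative snippet; the filename is an example):
#
#     import yaml
#     with open("FIELD-config-2024-01-01T00:00:00.000.yaml") as f:
#         recorded = yaml.safe_load(f)  # plain dict of the run's arguments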
def create_dask_runner(
    dask_config: Optional[str],
    overload: bool = False,
) -> DaskTaskRunner:
    """Create a DaskTaskRunner.

    Args:
        dask_config (Optional[str]): Configuration file for the DaskTaskRunner.
            If None, the packaged default configuration is used.
        overload (bool, optional): Overload the options for threaded work. Defaults to False.

    Returns:
        DaskTaskRunner: The prefect DaskTaskRunner instance
    """
logger.setLevel(logging.INFO)
logger.info("Creating a Dask Task Runner.")
if dask_config is None:
config_dir = resources.files("arrakis.configs")
dask_config = config_dir / "default.yaml"
with open(dask_config) as f:
logger.info(f"Loading {dask_config}")
yaml_config: dict = yaml.safe_load(f)
cluster_class_str = yaml_config.get("cluster_class", "distributed.LocalCluster")
cluster_kwargs = yaml_config.get("cluster_kwargs", {})
adapt_kwargs = yaml_config.get("adapt_kwargs", {})
    if overload:
        logger.info("Overwriting config attributes.")
        # Keep the same CPU allocation on the scheduler side, but run each
        # worker as a single-threaded, single-process job.
        cluster_kwargs["job_cpu"] = cluster_kwargs["cores"]
        cluster_kwargs["cores"] = 1
        cluster_kwargs["processes"] = 1
config = {
"cluster_class": cluster_class_str,
"cluster_kwargs": cluster_kwargs,
"adapt_kwargs": adapt_kwargs,
}
return DaskTaskRunner(**config)
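# A minimal sketch of a configuration file consumed by create_dask_runner.
# The keys mirror those read above; the cluster class and resource values are
# illustrative assumptions for a Slurm deployment, not a shipped default:
#
#     cluster_class: "dask_jobqueue.SLURMCluster"
#     cluster_kwargs:
#       cores: 8
#       memory: "32GB"
#       walltime: "04:00:00"
#     adapt_kwargs:
#       minimum: 1
#       maximum: 10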
def main(args: configargparse.Namespace) -> None:
"""Main script
Args:
args (configargparse.Namespace): Command line arguments.
"""
host = args.host
    # Let's save the args as a record for the ages
output_args_path = save_args(args)
logger.info(f"Saved arguments to {output_args_path}.")
    if not args.imager_only:
        # Test the MongoDB connection
test_db(
host=host,
username=args.username,
password=args.password,
)
if not args.skip_imager:
        # Create the task runner for the imager component of the Arrakis
        # pipeline.
dask_runner = create_dask_runner(
dask_config=args.imager_dask_config,
overload=True,
)
logger.info("Obtained DaskTaskRunner, executing the imager workflow. ")
imager.main.with_options(
name=f"Arrakis Imaging -- {args.sbid}:{args.field}", task_runner=dask_runner
)(
msdir=args.msdir,
out_dir=args.datadir,
num_beams=args.num_beams,
temp_dir_wsclean=args.temp_dir_wsclean,
temp_dir_images=args.temp_dir_images,
cutoff=args.psf_cutoff,
robust=args.robust,
pols=args.pols,
nchan=args.nchan,
local_rms=args.local_rms,
local_rms_window=args.local_rms_window,
size=args.size,
scale=args.scale,
mgain=args.mgain,
niter=args.niter,
nmiter=args.nmiter,
auto_mask=args.auto_mask,
force_mask_rounds=args.force_mask_rounds,
auto_threshold=args.auto_threshold,
minuv=args.minuv,
purge=args.purge,
taper=args.taper,
parallel_deconvolution=args.parallel,
gridder=args.gridder,
wsclean_path=(
Path(args.local_wsclean) if args.local_wsclean else args.hosted_wsclean
),
multiscale=args.multiscale,
multiscale_scale_bias=args.multiscale_scale_bias,
absmem=args.absmem,
ms_glob_pattern=args.ms_glob_pattern,
data_column=args.data_column,
skip_fix_ms=args.skip_fix_ms,
no_mf_weighting=args.no_mf_weighting,
disable_pol_local_rms=args.disable_pol_local_rms,
disable_pol_force_mask_rounds=args.disable_pol_force_mask_rounds,
)
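        # Tear down the imaging Dask cluster before moving on to the
        # synthesis stages (DaskTaskRunner keeps its client on a private
        # attribute, so this reaches into internals).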
client = dask_runner._client
if client is not None:
client.close()
del dask_runner
    else:
        logger.warning("Skipping the image creation step.")

    if args.imager_only:
        logger.info("Not running any stages after the imager.")
        return
    # Create the task runner for the RM extraction and synthesis stages
    dask_runner_2 = create_dask_runner(
        dask_config=args.dask_config,
    )

    # Run the synthesis flow
    process_spice.with_options(
        name=f"Arrakis Synthesis -- {args.sbid}:{args.field}",
    )(args, host, dask_runner_2)
def pipeline_parser(parent_parser: bool = False) -> argparse.ArgumentParser:
descStr = f"""
{logo_str}
Arrakis pipeline.
    Before running, make sure to start a session of MongoDB, e.g.
$ mongod --dbpath=/path/to/database --bind_ip $(hostname -i)
"""
# Parse the command line options
pipeline_parser = argparse.ArgumentParser(
add_help=not parent_parser,
description=descStr,
formatter_class=UltimateHelpFormatter,
)
parser = pipeline_parser.add_argument_group("pipeline arguments")
parser.add_argument(
"--dask_config",
type=str,
default=None,
help="Config file for Dask SlurmCLUSTER.",
)
parser.add_argument(
"--imager_dask_config",
type=str,
default=None,
help="Config file for Dask SlurmCLUSTER.",
)
parser.add_argument(
"--imager_only",
action="store_true",
help="Only run the imager component of the pipeline. ",
)
parser.add_argument(
"--skip_imager", action="store_true", help="Skip imaging stage."
)
parser.add_argument("--skip_cutout", action="store_true", help="Skip cutout stage.")
parser.add_argument("--skip_linmos", action="store_true", help="Skip LINMOS stage.")
parser.add_argument("--skip_frion", action="store_true", help="Skip cleanup stage.")
parser.add_argument(
"--skip_rmsynth", action="store_true", help="Skip RM Synthesis stage."
)
parser.add_argument(
"--skip_rmclean", action="store_true", help="Skip RM-CLEAN stage."
)
parser.add_argument("--skip_cat", action="store_true", help="Skip catalogue stage.")
parser.add_argument(
"--skip_validate", action="store_true", help="Skip validation stage."
)
parser.add_argument(
"--skip_cleanup", action="store_true", help="Skip cleanup stage."
)
return pipeline_parser
def cli():
"""Command-line interface"""
    # Compose the argument parsers for each pipeline stage
pipe_parser = pipeline_parser(parent_parser=True)
work_parser = workdir_arg_parser(parent_parser=True)
gen_parser = generic_parser(parent_parser=True)
imager_parser = imager.imager_parser(parent_parser=True)
cutout_parser = cutout.cutout_parser(parent_parser=True)
linmos_parser = linmos.linmos_parser(parent_parser=True)
fr_parser = frion.frion_parser(parent_parser=True)
common_parser = rmsynth_oncuts.rm_common_parser(parent_parser=True)
synth_parser = rmsynth_oncuts.rmsynth_parser(parent_parser=True)
rmclean_parser = rmclean_oncuts.clean_parser(parent_parser=True)
catalogue_parser = makecat.cat_parser(parent_parser=True)
val_parser = validation_parser(parent_parser=True)
clean_parser = cleanup.cleanup_parser(parent_parser=True)
# Parse the command line options
parser = configargparse.ArgParser(
description=pipe_parser.description,
formatter_class=UltimateHelpFormatter,
parents=[
pipe_parser,
work_parser,
gen_parser,
imager_parser,
cutout_parser,
linmos_parser,
fr_parser,
common_parser,
synth_parser,
rmclean_parser,
catalogue_parser,
val_parser,
clean_parser,
],
)
parser.add("--config", required=False, is_config_file=True, help="Config file path")
args = parser.parse_args()
parser.print_values()
verbose = args.verbose
if verbose:
logger.setLevel(logging.INFO)
logger.info(logo_str)
logger.info("\n\nArguments: ")
logger.info(args)
main(args)
if __name__ == "__main__":
cli()
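# Example shell usage (the config values are illustrative, and the module
# path assumes this file is installed as arrakis.process_spice):
#
#     python -m arrakis.process_spice \
#         --config pipeline.yaml \
#         --dask_config dask.yaml \
#         --imager_dask_config imager_dask.yaml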