#!/usr/bin/env python3
"""Arrakis multi-field pipeline"""
from __future__ import annotations
import argparse
import logging
import os
from pathlib import Path
import configargparse
import pkg_resources
import yaml
from astropy.time import Time
from prefect import flow
from arrakis import (
cleanup,
linmos,
makecat,
merge_fields,
process_spice,
rmclean_oncuts,
rmsynth_oncuts,
validate,
)
from arrakis.logger import UltimateHelpFormatter, logger
from arrakis.utils.database import test_db
from arrakis.utils.pipeline import logo_str
from arrakis.validate import validation_parser
@flow
[docs]
def process_merge(args, host: str, inter_dir: str, task_runner) -> None:
"""Workflow to merge spectra from overlapping fields together
Args:
args (Namespace): Parameters to use for this process
host (str): Address of the mongoDB servicing the processing
inter_dir (str): Location to store data from merged fields
"""
previous_future = None
previous_future = (
merge_fields.with_options(task_runner=task_runner)(
fields=args.fields,
field_dirs=args.datadirs,
merge_name=args.merge_name,
output_dir=args.output_dir,
host=host,
epoch=args.epoch,
username=args.username,
password=args.password,
yanda=args.yanda,
)
if not args.skip_merge
else previous_future
)
previous_future = (
rmsynth_oncuts.main.with_options(task_runner=task_runner)(
field=args.field,
outdir=Path(args.datadir),
host=args.host,
epoch=args.epoch,
sbid=args.sbid,
username=args.username,
password=args.password,
dimension=args.dimension,
database=args.database,
limit=args.limit,
savePlots=args.save_plots,
weightType=args.weight_type,
fitRMSF=args.fit_rmsf,
phiMax_radm2=args.phi_max,
dPhi_radm2=args.dphi,
nSamples=args.n_samples,
polyOrd=args.poly_ord,
noStokesI=args.no_stokes_i,
showPlots=args.show_plots,
not_RMSF=args.not_rmsf,
rm_verbose=args.rm_verbose,
debug=args.debug,
fit_function=args.fit_function,
tt0=args.tt0,
tt1=args.tt1,
ion=False, # Always False as we don't do Frion twice
do_own_fit=args.do_own_fit,
)
if not args.skip_rmsynth
else previous_future
)
previous_future = (
rmclean_oncuts.main.with_options(task_runner=task_runner)(
field=args.merge_name,
outdir=inter_dir,
host=host,
epoch=args.epoch,
username=args.username,
password=args.password,
dimension=args.dimension,
database=args.database,
limit=args.limit,
cutoff=args.cutoff,
maxIter=args.maxIter,
gain=args.gain,
window=args.window,
showPlots=args.show_plots,
rm_verbose=args.rm_verbose,
)
if not args.skip_rmclean
else previous_future
)
previous_future = (
makecat.main.with_options(task_runner=task_runner)(
field=args.merge_name,
host=host,
epoch=args.epoch,
username=args.username,
password=args.password,
outfile=args.outfile,
)
if not args.skip_cat
else previous_future
)
previous_future = (
validate.main.with_options(task_runner=task_runner)(
catalogue_path=Path(args.outfile),
npix=args.npix,
map_size=args.map_size,
snr_cut=args.leakage_snr,
bins=args.leakage_bins * 2,
)
if not args.skip_validate
else previous_future
)
[docs]
def main(args: configargparse.Namespace) -> None:
"""Main script
Args:
args (configargparse.Namespace): Command line arguments.
"""
if args.dask_config is None:
config_dir = pkg_resources.resource_filename("arrakis", "configs")
args.dask_config = os.path.join(config_dir, "default.yaml")
if args.outfile is None:
args.outfile = f"{args.merge_name}.pipe.test.fits"
test_db(
host=args.host,
username=args.username,
password=args.password,
)
args_yaml = yaml.dump(vars(args))
args_yaml_f = os.path.abspath(f"{args.merge_name}-config-{Time.now().fits}.yaml")
logger.info(f"Saving config to '{args_yaml_f}'")
with open(args_yaml_f, "w") as f:
f.write(args_yaml)
dask_runner = process_spice.create_dask_runner(
dask_config=args.dask_config,
)
inter_dir = os.path.join(os.path.abspath(args.output_dir), args.merge_name)
process_merge.with_options(
name=f"Arrakis Merge: {args.merge_name}", task_runner=dask_runner
)(args, args.host, inter_dir, dask_runner)
[docs]
def pipeline_parser(parent_parser: bool = False) -> argparse.ArgumentParser:
descStr = f"""
{logo_str}
Arrakis regional pipeline.
Before running make sure to start a session of mongodb e.g.
$ mongod --dbpath=/path/to/database --bind_ip $(hostname -i)
"""
# Parse the command line options
pipeline_parser = argparse.ArgumentParser(
add_help=not parent_parser,
description=descStr,
formatter_class=UltimateHelpFormatter,
)
parser = pipeline_parser.add_argument_group("pipeline arguments")
parser.add_argument(
"--dask_config",
type=str,
default=None,
help="Config file for Dask SlurmCLUSTER.",
)
parser.add_argument(
"--skip_frion", action="store_true", help="Skip cleanup stage [False]."
)
parser.add_argument(
"--skip_rmsynth", action="store_true", help="Skip RM Synthesis stage [False]."
)
parser.add_argument(
"--skip_rmclean", action="store_true", help="Skip RM-CLEAN stage [False]."
)
parser.add_argument(
"--skip_cat", action="store_true", help="Skip catalogue stage [False]."
)
parser.add_argument(
"--skip_validate", action="store_true", help="Skip validation stage."
)
parser.add_argument(
"--skip_cleanup", action="store_true", help="Skip cleanup stage [False]."
)
return pipeline_parser
[docs]
def cli() -> None:
"""Command-line interface"""
# Help string to be shown using the -h option
# Parse the command line options
pipe_parser = pipeline_parser(parent_parser=True)
merge_parser = merge_fields.merge_parser(parent_parser=True)
linmos_parser = linmos.linmos_parser(parent_parser=True)
common_parser = rmsynth_oncuts.rm_common_parser(parent_parser=True)
synth_parser = rmsynth_oncuts.rmsynth_parser(parent_parser=True)
rmclean_parser = rmclean_oncuts.clean_parser(parent_parser=True)
catalogue_parser = makecat.cat_parser(parent_parser=True)
val_parser = validation_parser(parent_parser=True)
clean_parser = cleanup.cleanup_parser(parent_parser=True)
# Parse the command line options
parser = configargparse.ArgParser(
description=pipe_parser.description,
formatter_class=UltimateHelpFormatter,
parents=[
pipe_parser,
merge_parser,
linmos_parser,
common_parser,
synth_parser,
rmclean_parser,
catalogue_parser,
val_parser,
clean_parser,
],
)
parser.add("--config", required=False, is_config_file=True, help="Config file path")
args = parser.parse_args()
verbose = args.verbose
if verbose:
logger.setLevel(logging.INFO)
if args.debugger:
logger.setLevel(logging.DEBUG)
main(args)
if __name__ == "__main__":
cli()