from __future__ import annotations
import os
import shlex
import subprocess as sb
from glob import glob
from pathlib import Path
import copy_data
import numpy as np
from arrakis.logger import logger, logging
from arrakis.utils.io import try_mkdir
from astropy.table import Table
logger.setLevel(logging.INFO)
[docs]
racs_area = os.path.abspath("/askapbuffer/payne/mcc381/RACS")
# spice_area = os.path.abspath('/group/askap/athomson/projects/arrakis/spica')
[docs]
spice_area = os.path.abspath("/scratch/ja3/athomson/spica")
[docs]
group_area = os.path.abspath("/group/ja3/athomson/spica/")
[docs]
SPICA = [
"1416+00A",
"1351+00A",
"1326+00A",
"1237+00A",
"1302+00A",
"1416-06A",
"1351-06A",
"1326-06A",
"1302-06A",
"1237-06A",
"1418-12A",
"1353-12A",
"1328-12A",
"1303-12A",
"1237-12A",
"1424-18A",
"1357-18A",
"1331-18A",
"1305-18A",
"1239-18A",
"1429-25A",
"1402-25A",
"1335-25A",
"1307-25A",
"1240-25A",
"1212+00A",
"1212-06A",
"1212-12A",
"1213-18A",
"1213-25A",
]
[docs]
def mslist(cal_sb, name):
try:
ms = glob(f"{racs_area}/{cal_sb}/RACS_test4_1.05_{name}/*beam00_*.ms")[0]
except Exception as e:
logger.error(e)
raise Exception(
f"Can't find '{racs_area}/{cal_sb}/RACS_test4_1.05_{name}/*beam00_*.ms'"
)
mslist_out = sb.run(
shlex.split(f"mslist --full {ms}"), capture_output=True, check=False
)
if mslist_out.returncode > 0:
logger.error(mslist_out.stderr.decode("utf-8"))
logger.error(mslist_out.stdout.decode("utf-8"))
mslist_out.check_returncode()
date_out = sb.run(
shlex.split("date +%Y-%m-%d-%H%M%S"), capture_output=True, check=True
)
out = mslist_out.stderr.decode() + f"METADATA_IS_GOOD {date_out.stdout.decode()}"
return out
[docs]
def main(
survey_dir: Path,
epoch: int = 0,
copy=False,
force=False,
cal=False,
mslist_dir=None,
cube_image=False,
):
field_path = survey_dir / "db" / f"epoch_{epoch}" / "field_data.csv"
tab = Table.read(field_path)
tab.add_index("FIELD_NAME")
cols = [
"Field name",
"CAL SBID",
"SBID",
"Leakage cal",
"Row index",
"Cube imaging",
]
spica_tab = Table(names=cols, dtype=[str, int, int, bool, int, bool])
for name in SPICA:
sub_tab = Table(tab.loc["FIELD_NAME", f"RACS_{name}"])
idx = np.argmax(sub_tab["CAL_SBID"])
index = sub_tab["INDEX"][idx]
cal_sbid = sub_tab["CAL_SBID"][idx]
sbid = sub_tab["SBID"][idx]
cal_files = glob(
f"{racs_area}/{cal_sbid}/RACS_test4_1.05_{name}/*averaged_cal.leakage.ms"
)
leak = not len(cal_files) == 0
cubes = []
for stoke in ["i", "q", "u"]:
cubes.extend(
glob(
f"{spice_area}/{cal_sbid}/RACS_test4_1.05_{name}/image.restored.{stoke}*.conv.fits"
)
)
cubes.extend(
glob(
f"{spice_area}/{cal_sbid}/RACS_test4_1.05_{name}/weights.{stoke}*.txt"
)
)
image = len(cubes) == 216
spica_tab.add_row([f"RACS_{name}", cal_sbid, sbid, leak, index, image])
spica_tab.sort("SBID")
spica_tab.pprint_all()
if cal:
logger.info("The following row indcies are ready to image:")
sub_tab = spica_tab[spica_tab["Leakage cal"]]
idx_list = []
for row in sub_tab:
idx_list.append(row["Row index"])
indxs = np.array(idx_list)
logger.info(" ".join(indxs.astype(str)))
if mslist_dir is not None:
mslist_dir = os.path.abspath(mslist_dir)
for row in spica_tab:
try:
out = mslist(
name=row["Field name"].replace("RACS_", ""), cal_sb=row["CAL SBID"]
)
sbdir = f"{mslist_dir}/{row['SBID']}"
try_mkdir(sbdir, verbose=False)
outdir = f"{sbdir}/metadata"
try_mkdir(outdir, verbose=False)
outfile = f"{outdir}/mslist-20_{row['Field name']}.txt"
with open(outfile, "w") as f:
f.write(out)
except Exception as e:
logger.error(e)
continue
if copy:
for row in spica_tab:
if row["Leakage cal"]:
if row["Cube imaging"]:
logger.info(
f"Cube imaging done for {row['Field name']}. Skipping..."
)
continue
else:
copy_data.main(
name=row["Field name"].replace("RACS_", ""),
sbid=row["CAL SBID"],
racs_area=racs_area,
spice_area=spice_area,
survey_dir=survey_dir,
epoch=epoch,
ncores=10,
clean=True,
force=force,
)
if cube_image:
cmds = [f"cd {spice_area}", "conda activate aces"]
for row in spica_tab:
cmd = f"start_pipeline.py -e 0 -p /group/askap/athomson/projects/arrakis/spica/racs_pipeline_cube.parset -o -m /group/askap/athomson/projects/arrakis/spica/modules.txt -t /group/askap/athomson/projects/arrakis/MSlists/{row['SBID']}/metadata/ -i {row['SBID']} -c {row['CAL SBID']} -f {row['Field name'].replace('RACS_', 'RACS_test4_1.05_')} -a ja3 -s"
cmds.append(cmd)
logger.info(f"Written imaging commands to '{cube_image}'")
with open(cube_image, "w") as f:
f.write("\n".join(cmds))
return spica_tab
[docs]
def cli():
import argparse
descStr = """
Helper for Spica pilot
"""
parser = argparse.ArgumentParser(
description=descStr, formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"survey",
type=str,
help="Survey directory",
)
parser.add_argument(
"--epoch",
type=int,
default=0,
help="Epoch to read field data from",
)
parser.add_argument(
"--copy",
action="store_true",
help="Copy calibrated data from racs's area [False].",
)
parser.add_argument(
"--copy_cutouts",
action="store_true",
help="Copy cutouts back to /group [False].",
)
parser.add_argument(
"--force", action="store_true", help="Force cleanup of Checkfiles."
)
parser.add_argument(
"--cal", action="store_true", help="Print calibrated field row indices."
)
parser.add_argument(
"--mslist_dir", type=str, default=None, help="Dir to store mslist files."
)
parser.add_argument(
"--cube_image", type=str, default=False, help="File to write image cmds to."
)
args = parser.parse_args()
_ = main(
survey_dir=Path(args.survey),
epoch=args.epoch,
copy=args.copy,
force=args.force,
cal=args.cal,
mslist_dir=args.mslist_dir,
cube_image=args.cube_image,
)
if __name__ == "__main__":
cli()