#!/usr/bin/env python3
"""DANGER ZONE: Purge directories of un-needed FITS files."""
from __future__ import annotations
import argparse
import logging
import shutil
import tarfile
from pathlib import Path
import astropy.units as u
import numpy as np
from prefect import flow, get_run_logger, task
from tqdm.auto import tqdm
from arrakis.logger import TqdmToLogger, UltimateHelpFormatter, logger
from arrakis.utils.io import verify_tarball
from arrakis.utils.pipeline import generic_parser, logo_str
logger.setLevel(logging.INFO)
[docs]
TQDM_OUT = TqdmToLogger(logger, level=logging.INFO)
# @task(name="Purge cublets")
[docs]
def purge_cubelet_beams(filepath: Path) -> Path:
"""Clean up beam images
Args:
workdir (str): Directory containing images
stoke (str): Stokes parameter
"""
# Clean up beam images
logger.critical(f"Removing {filepath}")
filepath.unlink(missing_ok=True)
logger.info(f"Beam image purged from {filepath}")
return filepath
@task(name="Make cutout tarball")
[docs]
def make_cutout_tarball(cutdir: Path, overwrite: bool = False) -> Path:
"""Make a tarball of the cutouts directory
Args:
cutdir (Path): Directory containing cutouts
Returns:
Path: Path to the tarball
"""
logger = get_run_logger()
logger.info(f"Making tarball of {cutdir}")
tarball = cutdir.with_suffix(".tar")
if tarball.exists() and not overwrite:
logger.warning(f"Tarball {tarball} exists. Skipping.")
return tarball
all_things = list(cutdir.glob("*"))
with tarfile.open(tarball, "w") as tar:
for cutout in tqdm(all_things, file=TQDM_OUT, desc="Tarballing cutouts"):
tar.add(cutout, arcname=cutout.name)
logger.info(f"Tarball created: {tarball}")
verification = verify_tarball(tarball)
if not verification:
raise RuntimeError(f"Verification of {tarball} failed!")
logger.critical(f"Removing {cutdir}")
shutil.rmtree(cutdir)
return tarball
@flow(name="Cleanup")
[docs]
def main(
datadir: Path,
overwrite: bool = False,
) -> None:
"""Clean up beam images flow
Args:
datadir (Path): Directory with sub dir 'cutouts'
overwrite (bool): Overwrite existing tarball
"""
cutdir = datadir / "cutouts"
# First, make a tarball of the cutouts
tarball = make_cutout_tarball.submit(cutdir=cutdir, overwrite=overwrite)
logger.info(f"Tarball created: {tarball.result()}")
# Purge the big beams
to_purge_bigbeams = list(datadir.glob("*.beam[00-36]*.fits"))
logger.warning(f"Will purge {len(to_purge_bigbeams)} big beams")
to_purge_pickles = list(datadir.glob("*.pkl"))
logger.warning(f"Will purge {len(to_purge_pickles)} beam pickles")
to_purge_weights = list(datadir.glob("weights*.txt"))
logger.warning(f"Will purge {len(to_purge_weights)} weight files")
# Purge the cublet beams
to_purge_cublets = list(cutdir.glob("*/*.beam[00-36]*.fits"))
logger.warning(f"Will purge {len(to_purge_cublets)} cublet beams")
to_purge_all = (
to_purge_bigbeams + to_purge_cublets + to_purge_pickles + to_purge_weights
)
total_file_size = np.sum([p.stat().st_size for p in to_purge_all]) * u.byte
logger.warning(f"Purging {len(to_purge_all)} files from {datadir}")
logger.warning(f"Will free {total_file_size.to(u.GB)}")
purged: list[Path] = []
for to_purge in tqdm(to_purge_all, file=TQDM_OUT, desc="Purging big beams"):
purged.append(purge_cubelet_beams(to_purge))
logger.info(f"Files purged: {len(purged)}")
logger.info("Cleanup done!")
[docs]
def cleanup_parser(parent_parser: bool = False) -> argparse.ArgumentParser:
# Help string to be shown using the -h option
descStr = f"""
{logo_str}
Arrakis Stage:
Clean up after LINMOS
"""
# Parse the command line options
cleanup_parser = argparse.ArgumentParser(
add_help=not parent_parser,
description=descStr,
formatter_class=UltimateHelpFormatter,
)
parser = cleanup_parser.add_argument_group("cleanup arguments")
parser.add_argument(
"--overwrite",
dest="overwrite",
action="store_true",
help="Overwrite existing tarball",
)
return cleanup_parser
[docs]
def cli():
"""Command-line interface"""
gen_parser = generic_parser(parent_parser=True)
clean_parser = cleanup_parser(parent_parser=True)
parser = argparse.ArgumentParser(
parents=[gen_parser, clean_parser],
formatter_class=UltimateHelpFormatter,
description=clean_parser.description,
)
args = parser.parse_args()
verbose = args.verbose
if verbose:
logger.setLevel(logging.DEBUG)
main(datadir=Path(args.datadir), overwrite=args.overwrite)
if __name__ == "__main__":
cli()