arrakis.utils.pipeline

Pipeline and flow utility functions

Attributes

SUPPORTED_IMAGE_TYPES

TQDM_OUT

logo_str

Classes

TqdmProgressBar

Tqdm for Dask

performance_report_prefect

Gather performance report from prefect_dask

Functions

chunk_dask(→ list)

cpu_to_use(→ int)

Find number of cpus to use.

delayed_to_da(→ dask.array.Array)

Convert list of delayed arrays to a dask array

generic_parser(→ argparse.ArgumentParser)

inspect_client(→ Tuple[str, int, int, ...)

Inspect a Dask client and report its resource configuration.

port_forward(→ None)

Forward ports to local host

tqdm_dask(→ None)

Tqdm for Dask futures

upload_image_as_artifact_task(→ uuid.UUID)

Create and submit a markdown artifact tracked by prefect for an input image.

workdir_arg_parser(→ argparse.ArgumentParser)

Module Contents

class arrakis.utils.pipeline.TqdmProgressBar(keys, scheduler=None, interval='100ms', loop=None, complete=True, start=True, **tqdm_kwargs)[source]

Bases: distributed.diagnostics.progressbar.ProgressBar

Tqdm for Dask

_draw_bar(remaining, all, **kwargs)[source]
_draw_stop(**kwargs)[source]
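
A minimal usage sketch, assuming a running Dask cluster; the futures and the desc keyword below are illustrative, with extra keyword arguments forwarded to tqdm:

from dask.distributed import Client
from arrakis.utils.pipeline import TqdmProgressBar

client = Client()  # assumes a local or pre-existing Dask cluster
futures = client.map(lambda x: x + 1, range(50))

# start=True (the default) begins tracking immediately; keyword arguments
# such as desc are passed through to tqdm.
TqdmProgressBar(futures, desc="Adding one")
results = client.gather(futures)
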
class arrakis.utils.pipeline.performance_report_prefect(filename='dask-report.html', stacklevel=1, mode=None, storage_options=None)[source]

Gather performance report from prefect_dask

Basically stolen from:

https://distributed.dask.org/en/latest/_modules/distributed/client.html#performance_report

This creates a static HTML file that includes many of the same plots of the dashboard for later viewing.

The resulting file uses JavaScript, and so must be viewed with a web browser. Locally we recommend using python -m http.server or hosting the file live online.

Parameters:
  • filename (str, optional) – The filename to save the performance report locally

  • stacklevel (int, optional) – The code execution frame utilized for populating the Calling Code section of the report. Defaults to 1 which is the frame calling performance_report_prefect

  • mode (str, optional) – Mode parameter to pass to bokeh.io.output.output_file(). Defaults to None.

  • storage_options (dict, optional) – Any additional arguments to fsspec.open() when writing to a URL.

Examples

>>> with performance_report_prefect(filename="myfile.html", stacklevel=1):
...     x.compute()
async __aenter__()[source]
async __aexit__(exc_type, exc_value, traceback, code=None)[source]
__enter__()[source]
__exit__(exc_type, exc_value, traceback)[source]
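
A hedged sketch of wrapping work submitted through prefect_dask; the flow, task, and filename below are illustrative and not part of this module:

from prefect import flow, task
from prefect_dask import DaskTaskRunner
from arrakis.utils.pipeline import performance_report_prefect

@task
def square(x: int) -> int:
    return x ** 2

@flow(task_runner=DaskTaskRunner())
def example_flow() -> list:
    # Everything submitted inside the context manager is captured
    # in the static HTML report.
    with performance_report_prefect(filename="flow-report.html"):
        futures = square.map(range(100))
        return [f.result() for f in futures]
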
arrakis.utils.pipeline.chunk_dask(outputs: list, batch_size: int = 10000, task_name='', progress_text='', verbose=True) list[source]
arrakis.utils.pipeline.cpu_to_use(max_cpu: int, count: int) int[source]

Find number of cpus to use.

Find the right number of cpus to use when dividing up a task, such that there are no remainders.

Parameters:
  • max_cpu (int) – Maximum number of cores to use for a process.

  • count (int) – Number of tasks.

Returns:

Maximum number of cores to be used that divides evenly into the number of tasks.
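
A minimal sketch of the divisor search described above (not the package's actual implementation): take the largest factor of count that does not exceed max_cpu, so every core receives the same number of tasks.

def cpu_to_use_sketch(max_cpu: int, count: int) -> int:
    # All factors of `count`, then the largest one that fits on the node.
    factors = [n for n in range(1, count + 1) if count % n == 0]
    return max(n for n in factors if n <= max_cpu)

# Example: 100 tasks on a 16-core allocation -> use 10 cores, 10 tasks each.
assert cpu_to_use_sketch(max_cpu=16, count=100) == 10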

arrakis.utils.pipeline.delayed_to_da(list_of_delayed: List[dask.delayed.Delayed], chunk: int | None = None) dask.array.Array[source]

Convert list of delayed arrays to a dask array

Parameters:
  • list_of_delayed (List[delayed]) – List of delayed objects

  • chunk (int, optional) – Chunksize to use. Defaults to None.

Returns:

Dask array

Return type:

da.Array
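
A hedged usage sketch; the make_chunk helper is hypothetical and simply stands in for any delayed computation returning a NumPy array:

import numpy as np
import dask
from arrakis.utils.pipeline import delayed_to_da

@dask.delayed
def make_chunk(value: float) -> np.ndarray:
    # Hypothetical helper producing one array per delayed object.
    return np.full(10, value)

delayed_list = [make_chunk(v) for v in range(4)]
arr = delayed_to_da(delayed_list, chunk=2)  # chunk is optional
print(arr.compute())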

arrakis.utils.pipeline.generic_parser(parent_parser: bool = False) argparse.ArgumentParser[source]
arrakis.utils.pipeline.inspect_client(client: dask.distributed.Client | None = None) Tuple[str, int, int, astropy.units.Quantity, int, astropy.units.Quantity][source]

Inspect a Dask client and report its address, worker count, thread count, and memory configuration.

Parameters:

client (Union[distributed.Client, None]) – Dask client to inspect. If None, the default client will be used.

Returns:

addr, nworkers, nthreads, memory, threads_per_worker, memory_per_worker

Return type:

Tuple[str, int, int, u.Quantity, float, u.Quantity]
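
An illustrative call against a small local cluster (the cluster settings are arbitrary):

from dask.distributed import Client, LocalCluster
from arrakis.utils.pipeline import inspect_client

with LocalCluster(n_workers=2, threads_per_worker=2) as cluster, Client(cluster) as client:
    addr, nworkers, nthreads, memory, threads_per_worker, memory_per_worker = inspect_client(client)
    print(f"{nworkers} workers x {threads_per_worker} threads at {addr}")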

arrakis.utils.pipeline.port_forward(port: int, target: str) None[source]

Forward ports to local host

Parameters:
  • port (int) – port to forward

  • target (str) – Target host
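
An illustrative call; the host name is a placeholder and SSH access to the target is assumed:

from arrakis.utils.pipeline import port_forward

# Forward the Dask dashboard port from a remote compute node so it can be
# opened at localhost:8787. "compute-node-01" is a placeholder host name.
port_forward(port=8787, target="compute-node-01")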

arrakis.utils.pipeline.tqdm_dask(futures_in: dask.distributed.Future, **kwargs) None[source]

Tqdm for Dask futures
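
A minimal usage sketch, assuming a running Dask cluster; extra keyword arguments (such as desc) are assumed to be passed through to tqdm:

from dask.distributed import Client
from arrakis.utils.pipeline import tqdm_dask

client = Client()  # assumes a local or pre-existing Dask cluster
futures = client.map(lambda x: x ** 2, range(100))
tqdm_dask(futures, desc="Squaring")
results = client.gather(futures)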

arrakis.utils.pipeline.upload_image_as_artifact_task(image_path: pathlib.Path, description: str | None = None) uuid.UUID[source]

Create and submit a markdown artifact tracked by prefect for an input image. Currently only PNG formatted images are supported.

The input image is converted to a base64 encoding and embedded directly within the markdown string. Therefore, be mindful of the image size, as this is tracked in the Postgres database.

Parameters:
  • image_path (Path) – Path to the image to upload

  • description (Optional[str], optional) – A description passed to the markdown artifact. Defaults to None.

Returns:

Generated UUID of the registered artifact

Return type:

UUID
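
A hedged usage sketch; the path and description are placeholders, and since this is a Prefect task it is expected to be called from within a flow:

from pathlib import Path
from arrakis.utils.pipeline import upload_image_as_artifact_task

artifact_uuid = upload_image_as_artifact_task(
    image_path=Path("diagnostic_plot.png"),  # placeholder PNG path
    description="Validation plot for the current field",
)
print(artifact_uuid)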

arrakis.utils.pipeline.workdir_arg_parser(parent_parser: bool = False) argparse.ArgumentParser[source]
arrakis.utils.pipeline.SUPPORTED_IMAGE_TYPES = ('png',)[source]
arrakis.utils.pipeline.TQDM_OUT[source]
arrakis.utils.pipeline.logo_str = Multiline-String[source]
"""
    mmm   mmm   mmm   mmm   mmm
    )-(   )-(   )-(   )-(   )-(
   ( S ) ( P ) ( I ) ( C ) ( E )
   |   | |   | |   | |   | |   |
   |___| |___| |___| |___| |___|
    mmm     mmm     mmm     mmm
    )-(     )-(     )-(     )-(
   ( R )   ( A )   ( C )   ( S )
   |   |   |   |   |   |   |   |
   |___|   |___|   |___|   |___|

"""