Pipeline and flow utility functions

Module Contents



TqdmProgressBar

Tqdm for Dask


performance_report_prefect

Gather performance report from prefect_dask


chunk_dask(→ list)

cpu_to_use(→ int)

Find the number of CPUs to use.

delayed_to_da(→ dask.array.Array)

Convert a list of delayed arrays to a Dask array.

inspect_client(→ Tuple[str, int, int, ...)


port_forward(→ None)

Forward a port to the local host.

tqdm_dask(→ None)

Tqdm for Dask futures




class arrakis.utils.pipeline.TqdmProgressBar(keys, scheduler=None, interval='100ms', loop=None, complete=True, start=True, **tqdm_kwargs)

Bases: distributed.diagnostics.progressbar.ProgressBar

Tqdm for Dask

_draw_bar(remaining, all, **kwargs)

class arrakis.utils.pipeline.performance_report_prefect(filename='dask-report.html', stacklevel=1, mode=None, storage_options=None)

Gather performance report from prefect_dask

Basically stolen from:

This creates a static HTML file that includes many of the same plots as the Dask dashboard, for later viewing.

The resulting file uses JavaScript, so it must be viewed in a web browser. Locally, we recommend serving it with python -m http.server, or hosting the file live online.

Parameters:

  • filename (str, optional) – The filename to save the performance report locally.

  • stacklevel (int, optional) – The code execution frame used to populate the Calling Code section of the report. Defaults to 1, which is the frame calling performance_report_prefect.

  • mode (str, optional) – Mode parameter to pass to Defaults to None.

  • storage_options (dict, optional) – Any additional arguments to when writing to a URL.


>>> with performance_report_prefect(filename="myfile.html", stacklevel=1):
...     x.compute()

async __aenter__()
async __aexit__(exc_type, exc_value, traceback, code=None)
__exit__(exc_type, exc_value, traceback)
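
The member list above pairs async entry and exit methods with __exit__, so the class is meant to wrap a block of work and write its report when the block ends. As a generic illustration of that dual protocol only, here is a hypothetical TimedReport class that works with both with and async with. It is not the implementation of performance_report_prefect: it records wall-clock time and writes a placeholder HTML file, and collects no Dask diagnostics.

```python
import asyncio
import time
from pathlib import Path


class TimedReport:
    """Context manager usable with both ``with`` and ``async with``.

    Hypothetical stand-in: it records wall-clock time only and writes a
    placeholder HTML file; it collects no Dask diagnostics.
    """

    def __init__(self, filename: str = "report.html") -> None:
        self.filename = filename
        self._start = 0.0

    def _write(self) -> None:
        elapsed = time.perf_counter() - self._start
        Path(self.filename).write_text(
            f"<html><body><p>Elapsed: {elapsed:.3f} s</p></body></html>"
        )

    # Synchronous protocol: `with TimedReport(...):`
    def __enter__(self) -> "TimedReport":
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Write the report even if the block raised; returning None re-raises.
        self._write()

    # Asynchronous protocol: `async with TimedReport(...):`
    async def __aenter__(self) -> "TimedReport":
        return self.__enter__()

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        self.__exit__(exc_type, exc_value, traceback)


if __name__ == "__main__":
    with TimedReport("sync-report.html"):
        sum(range(1_000_000))

    async def main() -> None:
        async with TimedReport("async-report.html"):
            await asyncio.sleep(0.01)

    asyncio.run(main())
```

Delegating the async methods to the sync ones keeps a single code path for writing the report.
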

arrakis.utils.pipeline.chunk_dask(outputs: list, batch_size: int = 10000, task_name='', progress_text='', verbose=True) → list
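
chunk_dask has no docstring. Judging only by its name and its batch_size parameter, it appears to process outputs a batch at a time. A minimal sketch of that batching idea follows, under that assumption. compute_in_batches is hypothetical and omits the progress and logging options. With Dask installed, one could pass dask.compute as the compute argument, since dask.compute(*objs) returns one result per object, in order.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def compute_in_batches(
    outputs: Sequence[T],
    compute: Callable[..., Sequence[R]],
    batch_size: int = 10_000,
) -> List[R]:
    """Evaluate ``outputs`` in consecutive slices of at most ``batch_size`` items.

    ``compute`` receives one batch unpacked as positional arguments and must
    return one result per item, in the same order.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    results: List[R] = []
    for start in range(0, len(outputs), batch_size):
        batch = outputs[start : start + batch_size]
        results.extend(compute(*batch))
    return results
```

Batching limits how many tasks are handed to the scheduler at once, at the cost of one synchronisation point per batch.
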

arrakis.utils.pipeline.cpu_to_use(max_cpu: int, count: int) → int

Find the number of CPUs to use.

Find the largest number of CPUs, no greater than max_cpu, that divides the number of tasks with no remainder, so that the tasks can be split evenly across cores.

Parameters:

  • max_cpu (int) – Maximum number of cores to use for a process.

  • count (int) – Number of tasks.

Returns:

Maximum number of cores to be used that divides evenly into the number of tasks.
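
The rule described above amounts to finding the largest divisor of count that does not exceed max_cpu. A minimal pure-Python sketch of that rule follows; largest_even_split is a hypothetical name, and this is not necessarily how cpu_to_use computes it.

```python
def largest_even_split(max_cpu: int, count: int) -> int:
    """Return the largest n <= max_cpu such that count % n == 0."""
    if max_cpu < 1 or count < 1:
        raise ValueError("max_cpu and count must both be positive")
    # Search downwards from the largest useful candidate; the first divisor
    # found is the answer, and 1 always divides count.
    for n in range(min(max_cpu, count), 0, -1):
        if count % n == 0:
            return n
    return 1  # not reached: the loop always returns at n == 1
```

For example, 12 tasks with at most 8 cores give 6 cores (two tasks each), while 7 tasks with at most 4 cores fall back to 1, because 7 is prime.
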

arrakis.utils.pipeline.delayed_to_da(list_of_delayed: List[dask.delayed.Delayed], chunk: int | None = None) → dask.array.Array

Convert a list of delayed arrays to a Dask array.

Parameters:

  • list_of_delayed (List[delayed]) – List of delayed objects.

  • chunk (int, optional) – Chunk size to use. Defaults to None.

Returns:

Dask array

Return type:

dask.array.Array

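One common way to build a Dask array from delayed arrays uses dask.array.from_delayed for each element, followed by dask.array.stack along a new leading axis. A sketch of that approach follows, assuming every element has the same shape and dtype, and interpreting chunk as the chunk size along the stacked axis. stack_delayed is hypothetical, and delayed_to_da may differ in detail. Note that it computes the first element eagerly to discover the shape and dtype.

```python
from typing import List, Optional

import dask
import dask.array as da
import numpy as np
from dask.delayed import Delayed


def stack_delayed(list_of_delayed: List[Delayed], chunk: Optional[int] = None) -> da.Array:
    """Stack same-shaped delayed arrays along a new leading axis."""
    sample = list_of_delayed[0].compute()  # eager: evaluates the first task
    arrays = [
        da.from_delayed(d, shape=sample.shape, dtype=sample.dtype)
        for d in list_of_delayed
    ]
    stacked = da.stack(arrays, axis=0)  # one chunk per input along axis 0
    if chunk is not None:
        stacked = stacked.rechunk({0: chunk})  # group inputs along axis 0
    return stacked


if __name__ == "__main__":
    parts = [dask.delayed(np.full)(3, i) for i in range(4)]
    arr = stack_delayed(parts, chunk=2)
    print(arr.shape, arr.chunks)
```

Without rechunking, each input becomes its own chunk, which can produce very many tiny chunks for long lists.
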
arrakis.utils.pipeline.inspect_client(client: dask.distributed.Client | None = None) → Tuple[str, int, int, astropy.units.Quantity, int, astropy.units.Quantity]

Parameters:

client (Union[distributed.Client, None]) – Dask client to inspect. If None, the default client is used.

Returns:

addr, nworkers, nthreads, memory, threads_per_worker, memory_per_worker

Return type:

Tuple[str, int, int, u.Quantity, float, u.Quantity]
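
A dask.distributed Client's scheduler_info() returns a dict whose "workers" entry maps each worker address to details that include "nthreads" and "memory_limit". A sketch that derives the six values listed above from such a dict follows, assuming it lists every worker. It returns memory as plain bytes rather than astropy Quantities. summarise_scheduler_info is hypothetical; accepting a dict keeps it testable without a running cluster.

```python
from typing import Any, Dict, Tuple


def summarise_scheduler_info(
    info: Dict[str, Any],
) -> Tuple[str, int, int, int, float, float]:
    """Return addr, nworkers, nthreads, memory, threads/worker, memory/worker."""
    workers = info["workers"]
    nworkers = len(workers)
    nthreads = sum(w["nthreads"] for w in workers.values())
    memory = sum(w["memory_limit"] for w in workers.values())  # bytes
    # Guard against an empty cluster to avoid dividing by zero.
    threads_per_worker = nthreads / nworkers if nworkers else 0.0
    memory_per_worker = memory / nworkers if nworkers else 0.0
    return (
        info["address"],
        nworkers,
        nthreads,
        memory,
        threads_per_worker,
        memory_per_worker,
    )
```

With a live client, this could be called as summarise_scheduler_info(client.scheduler_info()).
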

arrakis.utils.pipeline.port_forward(port: int, target: str) → None

Forward a port to the local host.

Parameters:

  • port (int) – Port to forward.

  • target (str) – Target host.
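
The forwarding mechanism is not documented here. One common approach is an OpenSSH tunnel, sketched below assuming local forwarding (-L), which makes port on target reachable as localhost:port. Depending on where the service runs, the reverse direction (-R) may be what is needed instead. ssh_forward_command and start_forward are hypothetical names, and "user@remote-host" is a placeholder.

```python
import shlex
import subprocess
from typing import List


def ssh_forward_command(port: int, target: str) -> List[str]:
    """Build ``ssh -N -L port:localhost:port target`` as an argument list.

    With -L, connections to localhost:port on this machine are tunnelled to
    port ``port`` on ``target`` ("localhost" is resolved on the target side).
    """
    if not 0 < port < 65536:
        raise ValueError(f"invalid port: {port}")
    return ["ssh", "-N", "-L", f"{port}:localhost:{port}", target]


def start_forward(port: int, target: str) -> subprocess.Popen:
    """Start the tunnel as a background process (needs SSH access to target)."""
    return subprocess.Popen(ssh_forward_command(port, target))


if __name__ == "__main__":
    # Print the command rather than running it; 8787 is Dask's default dashboard port.
    print(shlex.join(ssh_forward_command(8787, "user@remote-host")))
```

Passing an argument list to subprocess.Popen avoids shell quoting issues with the target string.
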

arrakis.utils.pipeline.tqdm_dask(futures_in: dask.distributed.Future, **kwargs) → None

Tqdm for Dask futures
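
The internals of tqdm_dask are not documented here. The usual pattern behind a progress bar over futures is to iterate the futures in completion order and advance the bar once per finished future. The stdlib sketch below shows that pattern with concurrent.futures. dask.distributed provides its own as_completed iterator for Dask futures, and a tqdm bar's update(1) could stand in for the callback. track_futures is hypothetical.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, List


def track_futures(
    futures: Iterable[Future],
    update: Callable[[int, int], None],
) -> List[Future]:
    """Call ``update(done, total)`` once per future, in completion order."""
    pending = list(futures)  # materialise so the total is known up front
    total = len(pending)
    for done, _ in enumerate(as_completed(pending), start=1):
        update(done, total)
    return pending


if __name__ == "__main__":
    def show(done: int, total: int) -> None:
        print(f"\r{done}/{total} futures complete", end="", flush=True)

    with ThreadPoolExecutor(max_workers=4) as pool:
        futs = [pool.submit(pow, 2, i) for i in range(20)]
        track_futures(futs, show)
    print()
```
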

arrakis.utils.pipeline.logo_str = Multiline-String
    mmm   mmm   mmm   mmm   mmm
    )-(   )-(   )-(   )-(   )-(
   ( S ) ( P ) ( I ) ( C ) ( E )
   |   | |   | |   | |   | |   |
   |___| |___| |___| |___| |___|
    mmm     mmm     mmm     mmm
    )-(     )-(     )-(     )-(
   ( R )   ( A )   ( C )   ( S )
   |   |   |   |   |   |   |   |
   |___|   |___|   |___|   |___|