arrakis.utils.pipeline
Pipeline and flow utility functions
Attributes
Classes
TqdmProgressBar | Tqdm for Dask
performance_report_prefect | Gather a performance report from prefect_dask
Functions
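chunk_dask |
cpu_to_use | Find the number of CPUs to use.
delayed_to_da | Convert a list of delayed arrays to a dask array.
generic_parser |
inspect_client | Inspect a Dask client and summarise its resources.
port_forward | Forward a port to the local host.
tqdm_dask | Tqdm progress bar for Dask futures.
upload_image_as_artifact_task | Create and submit a markdown artifact tracked by prefect for an input image.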
Module Contents
- class arrakis.utils.pipeline.TqdmProgressBar(keys, scheduler=None, interval='100ms', loop=None, complete=True, start=True, **tqdm_kwargs)
Bases: distributed.diagnostics.progressbar.ProgressBar
Tqdm progress bar for Dask.
- class arrakis.utils.pipeline.performance_report_prefect(filename='dask-report.html', stacklevel=1, mode=None, storage_options=None)
Gather a performance report from prefect_dask.
Adapted from: https://distributed.dask.org/en/latest/_modules/distributed/client.html#performance_report
This creates a static HTML file that includes many of the same plots as the dashboard, for later viewing. The resulting file uses JavaScript and so must be viewed in a web browser. Locally, we recommend serving it with python -m http.server, or hosting the file online.
- Parameters:
filename (str, optional) – The filename to save the performance report locally. Defaults to 'dask-report.html'.
stacklevel (int, optional) – The code execution frame used to populate the Calling Code section of the report. Defaults to 1, which is the frame calling performance_report_prefect.
mode (str, optional) – Mode parameter to pass to bokeh.io.output.output_file(). Defaults to None.
storage_options (dict, optional) – Any additional arguments to fsspec.open() when writing to a URL.
Examples
>>> with performance_report_prefect(filename="myfile.html", stacklevel=1):
...     x.compute()
- arrakis.utils.pipeline.chunk_dask(outputs: list, batch_size: int = 10000, task_name='', progress_text='', verbose=True) -> list
- arrakis.utils.pipeline.cpu_to_use(max_cpu: int, count: int) -> int
Find the number of CPUs to use.
Find the right number of CPUs to use when dividing up a task, such that there is no remainder.
- Parameters:
max_cpu (int) – Maximum number of cores to use for a process.
count (int) – Number of tasks.
- Returns:
Maximum number of cores to be used that divides evenly into the number of tasks.
- Return type:
int
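A minimal sketch of the divisor search described above, assuming both arguments are positive integers; it is not necessarily the arrakis implementation. It walks down from min(max_cpu, count) and returns the first value that divides count exactly.

```python
def cpu_to_use(max_cpu: int, count: int) -> int:
    """Return the largest divisor of ``count`` that does not exceed ``max_cpu``.

    Sketch only: assumes both arguments are positive integers.
    """
    if max_cpu < 1 or count < 1:
        raise ValueError("max_cpu and count must both be positive")
    # Walk down from the cap; the first value that divides count exactly
    # splits the tasks into equal batches with no remainder.
    for n in range(min(max_cpu, count), 0, -1):
        if count % n == 0:
            return n
    return 1  # unreachable in practice: 1 always divides count


print(cpu_to_use(8, 12))  # 12 tasks on at most 8 cores -> 6
print(cpu_to_use(4, 7))   # 7 is prime -> 1
```

Note that a prime task count always falls back to a single core, so callers may want to pad or regroup tasks when that matters.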
- arrakis.utils.pipeline.delayed_to_da(list_of_delayed: List[dask.delayed.Delayed], chunk: int | None = None) -> dask.array.Array
Convert a list of delayed arrays to a dask array.
- Parameters:
list_of_delayed (List[delayed]) – List of delayed objects.
chunk (int, optional) – Chunk size to use. Defaults to None.
- Returns:
Dask array
- Return type:
da.Array
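One way such a conversion can be done, shown as a hedged sketch rather than the arrakis code: it assumes every delayed object evaluates to a NumPy array with the same shape and dtype, computes the first one eagerly to discover them, and stacks the pieces along a new leading axis. Interpreting chunk as the number of items per block along that axis is an assumption.

```python
from typing import List, Optional

import dask
import dask.array as da
import numpy as np
from dask.delayed import Delayed


def delayed_to_da(list_of_delayed: List[Delayed], chunk: Optional[int] = None) -> da.Array:
    """Stack delayed NumPy arrays of identical shape into one dask array.

    Sketch only: assumes every element has the shape and dtype of the first.
    """
    if not list_of_delayed:
        raise ValueError("list_of_delayed must not be empty")
    # Compute one element eagerly to learn the per-item shape and dtype.
    sample = np.asarray(list_of_delayed[0].compute())
    pieces = [
        da.from_delayed(d, shape=sample.shape, dtype=sample.dtype)
        for d in list_of_delayed
    ]
    # New leading axis: shape is (len(list_of_delayed), *sample.shape).
    stacked = da.stack(pieces, axis=0)
    if chunk is not None:
        # Group `chunk` items per block along the leading axis (assumed meaning).
        stacked = stacked.rechunk((chunk,) + sample.shape)
    return stacked


parts = [dask.delayed(np.full)((3,), i, dtype=float) for i in range(4)]
arr = delayed_to_da(parts, chunk=2)
print(arr.shape, arr.chunks)  # (4, 3) ((2, 2), (3,))
```

Without chunk, each delayed object stays in its own block, which keeps the task graph simple but can produce many small chunks.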
- arrakis.utils.pipeline.generic_parser(parent_parser: bool = False) -> argparse.ArgumentParser
- arrakis.utils.pipeline.inspect_client(client: dask.distributed.Client | None = None) -> Tuple[str, int, int, astropy.units.Quantity, int, astropy.units.Quantity]
Inspect a Dask client and summarise its resources.
- Parameters:
client (Union[distributed.Client, None]) – Dask client to inspect. If None, the default client is used.
- Returns:
addr, nworkers, nthreads, memory, threads_per_worker, memory_per_worker
- Return type:
Tuple[str, int, int, u.Quantity, float, u.Quantity]
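A hypothetical, dependency-light sketch of how such a summary could be computed from the dictionary returned by distributed's Client.scheduler_info(), assuming each worker entry carries nthreads and memory_limit keys. The helper name summarise_scheduler_info is illustrative, and memory is returned as plain byte counts rather than astropy Quantity objects so the sketch runs without a cluster or astropy.

```python
from typing import Any, Dict, Tuple


def summarise_scheduler_info(info: Dict[str, Any]) -> Tuple[str, int, int, int, float, float]:
    """Summarise a ``Client.scheduler_info()``-style dict (hypothetical helper).

    Returns (addr, nworkers, nthreads, memory, threads_per_worker,
    memory_per_worker), with memory values in bytes.
    """
    workers = info["workers"]
    nworkers = len(workers)
    # Totals across all workers registered with the scheduler.
    nthreads = sum(w["nthreads"] for w in workers.values())
    memory = sum(w["memory_limit"] for w in workers.values())
    if nworkers == 0:
        return info["address"], 0, 0, 0, 0.0, 0.0
    return (
        info["address"],
        nworkers,
        nthreads,
        memory,
        nthreads / nworkers,
        memory / nworkers,
    )


fake_info = {
    "address": "tcp://127.0.0.1:8786",
    "workers": {
        "tcp://10.0.0.1:40001": {"nthreads": 4, "memory_limit": 8_000_000_000},
        "tcp://10.0.0.2:40001": {"nthreads": 2, "memory_limit": 4_000_000_000},
    },
}
print(summarise_scheduler_info(fake_info))
```

With a live client, one would pass client.scheduler_info() instead of the hand-built dictionary.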
- arrakis.utils.pipeline.port_forward(port: int, target: str) -> None
Forward a port to the local host.
- Parameters:
port (int) – Port to forward.
target (str) – Target host.
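The docstring does not say how the forwarding is done. Assuming OpenSSH local forwarding (ssh -L), so that a service on the target, such as a Dask dashboard, becomes reachable on the same port locally, a sketch might look like this. build_port_forward_cmd is a hypothetical helper split out so the command can be checked without opening a connection.

```python
import subprocess
from typing import List


def build_port_forward_cmd(port: int, target: str) -> List[str]:
    """Build an ssh command tunnelling local ``port`` to ``port`` on ``target``.

    Hypothetical helper: assumes forwarding is done with ``ssh -L``.
    """
    if not 0 < port < 65536:
        raise ValueError(f"invalid port: {port}")
    # -N: run no remote command; -f: background after authentication;
    # -L: listen on local `port` and tunnel it to localhost:`port` on `target`.
    return ["ssh", "-N", "-f", "-L", f"{port}:localhost:{port}", target]


def port_forward(port: int, target: str) -> None:
    # Start ssh without waiting; with -f it backgrounds itself once connected.
    subprocess.Popen(build_port_forward_cmd(port, target))


print(build_port_forward_cmd(8787, "user@cluster"))
```

Passing an argument list to subprocess.Popen avoids shell quoting issues with the target string.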
- arrakis.utils.pipeline.tqdm_dask(futures_in: dask.distributed.Future, **kwargs) -> None
Tqdm progress bar for Dask futures.
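A minimal sketch of a tqdm progress bar over Dask futures, assuming the extra keyword arguments are forwarded to tqdm; the arrakis function may differ. It flattens the input with distributed's futures_of and advances the bar as distributed's as_completed yields each finished future.

```python
from typing import Any

from dask.distributed import as_completed, futures_of
from tqdm import tqdm


def tqdm_dask(futures_in: Any, **kwargs: Any) -> None:
    """Block until all futures finish, advancing a tqdm bar as each completes.

    Sketch only; ``kwargs`` are assumed to be forwarded to ``tqdm``.
    """
    # Flatten a single future, a list of futures, or a persisted collection
    # into a flat list of Future objects.
    futures = futures_of(futures_in)
    # as_completed yields futures in completion order, so the bar reflects
    # real progress rather than submission order.
    for _ in tqdm(as_completed(futures), total=len(futures), **kwargs):
        pass
```

Usage, given a running client: tqdm_dask(client.map(func, items), desc="Processing").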
- arrakis.utils.pipeline.upload_image_as_artifact_task(image_path: pathlib.Path, description: str | None = None) -> uuid.UUID
Create and submit a markdown artifact tracked by prefect for an input image. Currently only PNG images are supported.
The input image is base64-encoded and embedded directly in the markdown string. Be mindful of the image size, because the whole string is stored in the Postgres database.
- Parameters:
image_path (Path) – Path to the image to upload
description (Optional[str], optional) – A description passed to the markdown artifact. Defaults to None.
- Returns:
Generated UUID of the registered artifact
- Return type:
UUID
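The embedding step described above can be sketched with the standard library alone. image_to_markdown is a hypothetical helper, not part of arrakis; it assumes a PNG file and builds a markdown image whose source is a base64 data URI. Submitting the string to Prefect, for example via prefect.artifacts.create_markdown_artifact, is shown only as a comment.

```python
import base64
from pathlib import Path


def image_to_markdown(image_path: Path) -> str:
    """Embed a PNG image in a markdown string as a base64 data URI.

    Hypothetical helper mirroring the behaviour described above.
    """
    if image_path.suffix.lower() != ".png":
        raise ValueError(f"Only PNG images are supported, got {image_path.suffix!r}")
    encoded = base64.b64encode(image_path.read_bytes()).decode("ascii")
    # The whole image travels inside the string, so large images bloat the
    # artifact record (base64 adds roughly a third to the byte count).
    return f"![{image_path.name}](data:image/png;base64,{encoded})"


# With Prefect installed, the markdown could then be registered, e.g.:
# from prefect.artifacts import create_markdown_artifact
# artifact_id = create_markdown_artifact(
#     markdown=image_to_markdown(Path("plot.png")), description="Diagnostic plot"
# )
```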