{ "cells": [ { "cell_type": "markdown", "id": "efe7f3fe", "metadata": {}, "source": [ "# Building a custom task\n", "This notebook provides a behind the scenes look at the Volara framework, how it works, and how you\n", "can build upon it to process your own data in whatever way you would like. This is also a good\n", "place to start if you are trying to debug a job going wrong." ] }, { "cell_type": "markdown", "id": "60edb39f", "metadata": {}, "source": [ "## Daisy\n", "`volara` is essentially a convenience wrapper around `daisy`, a python library for blockwise\n", "processing. `daisy` is a powerful library that provides the basics of blockwise processing.\n", "[See here](https://funkelab.github.io/daisy/) for details about `daisy`. An understanding of\n", "`daisy` is not required to use `volara`, but will make unstanding and developing using `volara`\n", "easier.\n", "\n", "`volara` provides:\n", "- A simple interface for defining blockwise operations, that then generates the necessary daisy code\n", "- A set of common operations that are often used in volumetric data processing\n", "- Free nice to have features for any task using the `volara` framework:\n", " - Completed block tracking that is both fast, and easily visualized\n", " - Support for running workers on remote machines, making it easy to utilize\n", " slurm, lsf, or any other cluster type. As long as you can create a tcp connection\n", " to the machine you want to run the worker on, it should be possible to support any `volara` task.\n", "\n", "Other tutorials go into more detail about how to use the existing operations, but this tutorial\n", "will focus on how to use `volara` for your own custom operations." ] }, { "cell_type": "markdown", "id": "71d16e63", "metadata": {}, "source": [ "## BlockwiseTask\n", "The `BlockwiseTask` class is the main entry point for defining a blockwise operation.\n", "It is a pydantic class and an ABC that defines the minimum requirements for a blockwise task.\n", "\n", "See [the documentation](https://e11bio.github.io/volara/api.html#volara.blockwise.BlockwiseTask) for details\n", "about the BlockwiseTask class.\n", "\n", "If you subclass `BlockwiseTask`, you must provide at minimum the following fields, abstract methods and properties:\n", "\n", "### fields:\n", "- task_type: A string that identifies the type of task. This is used to deserialize a yaml file\n", " into a task. This is only necessary to override if you want to run a worker on a separate process\n", " or machine. If you are just running locally, you can ignore this.\n", "- fit: See [daisy docs](https://funkelab.github.io/daisy/api.html#daisy.Task) for info on the \"fit\"\n", " field.\n", "- read_write_conflict: See [daisy docs](https://funkelab.github.io/daisy/api.html#daisy.Task) for info\n", " on the \"read_write_conflict\" field.\n", "\n", "### properties:\n", "- task_name: A string that uniquely identifies specific instances of your task. This is used\n", " in the file path to write logs, keep track of progress, and for communication between the\n", " client/server model in `daisy`.\n", "- write_roi: The total output ROI (Region Of Interest) of the task. This is often just ROI of your\n", " input array, but can be different for some tasks. Note that this is expected in *World Units*.\n", "- write_size: The write size of each block processed as part of a task. Note that this is expected\n", " in *World Units*.\n", "- context_size: The amount of context needed to process each block for a task. 
{ "cell_type": "code", "execution_count": null, "id": "c047c929", "metadata": {}, "outputs": [], "source": [
"import multiprocessing as mp\n",
"\n",
"# use \"fork\" so that worker processes inherit the task classes defined in this notebook\n",
"mp.set_start_method(\"fork\", force=True)"
] },
{ "cell_type": "markdown", "id": "09bb3f4e", "metadata": {}, "source": [
"## Example: Argmax\n",
"Let's build the simplest possible argmax task using volara."
] },
{ "cell_type": "code", "execution_count": null, "id": "44577a63", "metadata": {}, "outputs": [], "source": [
"# `logging` is used to configure log output for the task\n",
"import logging\n",
"\n",
"# `shutil` is used to remove the artifacts of a task\n",
"import shutil\n",
"\n",
"# contextmanager decorator for the process_block_func method\n",
"from contextlib import contextmanager\n",
"\n",
"# `numpy` for generating data\n",
"import numpy as np\n",
"\n",
"# `daisy` splits the task into blocks for processing and passes the blocks to the workers.\n",
"# Note that blocks only contain a read_roi, a write_roi, and a unique identifier\n",
"from daisy import Block\n",
"\n",
"# `Coordinate` and `Roi` are used to define points and regions in 3D space\n",
"from funlib.geometry import Coordinate, Roi\n",
"\n",
"# `prepare_ds` and `open_ds` are helper methods to interface with zarr arrays\n",
"# with offsets, voxel sizes, units, and axis types such as \"channel\", \"time\", and \"space\"\n",
"from funlib.persistence import open_ds, prepare_ds\n",
"\n",
"# the `BlockwiseTask` base class is necessary to use the `volara` framework\n",
"from volara.blockwise import BlockwiseTask\n",
"\n",
"logging.basicConfig(level=logging.INFO)\n",
"\n",
"\n",
"class Argmax(BlockwiseTask):\n",
"    \"\"\"\n",
"    A super simple argmax task\n",
"    \"\"\"\n",
"\n",
"    # task_type is used to identify the type of task. It is only needed if you are\n",
"    # running the task on a remote machine.\n",
"    task_type: str = \"argmax\"\n",
"\n",
"    # simple task settings\n",
"    fit: str = \"shrink\"\n",
"    read_write_conflict: bool = False\n",
"\n",
"    # There are no inputs, so this is just a constant string\n",
"    @property\n",
"    def task_name(self) -> str:\n",
"        return \"simple-argmax\"\n",
"\n",
"    # We will make a 10x10x10 array with 3 channels. The channel dimension is not\n",
"    # included in the ROI since it is not a spatial dimension\n",
"    @property\n",
"    def write_roi(self) -> Roi:\n",
"        return Roi((0, 0, 0), (10, 10, 10))\n",
"\n",
"    # We will write chunks of size 5x5x5 at a time, so we will have 8 blocks\n",
"    # (see the sanity check after this cell)\n",
"    @property\n",
"    def write_size(self) -> Coordinate:\n",
"        return Coordinate((5, 5, 5))\n",
"\n",
"    # No context is needed for argmax\n",
"    @property\n",
"    def context_size(self) -> Coordinate:\n",
"        return Coordinate((0, 0, 0))\n",
"\n",
"    # We will initialize some input data and create the output array.\n",
"    # Most tasks will need an init to define the output of a task. The inputs\n",
"    # will usually be passed in as a parameter to the task.\n",
"    def init(self):\n",
"        in_array = prepare_ds(\n",
"            f\"{self.task_name}/data.zarr/in_array\",\n",
"            shape=(3, 10, 10, 10),\n",
"            chunk_shape=(3, 5, 5, 5),\n",
"            offset=(0, 0, 0),\n",
"        )\n",
"        np.random.seed(0)\n",
"        in_array[:] = np.random.randint(0, 10, size=in_array.shape)\n",
"\n",
"        prepare_ds(\n",
"            f\"{self.task_name}/data.zarr/out_array\",\n",
"            shape=(10, 10, 10),\n",
"            chunk_shape=(5, 5, 5),\n",
"            offset=(0, 0, 0),\n",
"        )\n",
"\n",
"    # Make sure that both the input and output arrays are removed if this task\n",
"    # is dropped\n",
"    def drop_artifacts(self):\n",
"        shutil.rmtree(f\"{self.task_name}/data.zarr/in_array\")\n",
"        shutil.rmtree(f\"{self.task_name}/data.zarr/out_array\")\n",
"\n",
"    # Input and output arrays are opened in the context manager; the process_block\n",
"    # function only needs to read and write to those arrays\n",
"    @contextmanager\n",
"    def process_block_func(self):\n",
"        # the input is only read, so it can be opened read-only\n",
"        in_array = open_ds(\n",
"            f\"{self.task_name}/data.zarr/in_array\",\n",
"            mode=\"r\",\n",
"        )\n",
"        out_array = open_ds(\n",
"            f\"{self.task_name}/data.zarr/out_array\",\n",
"            mode=\"r+\",\n",
"        )\n",
"\n",
"        def process_block(block: Block) -> None:\n",
"            in_data = in_array[block.read_roi]\n",
"            out_data = in_data.argmax(axis=0)\n",
"            out_array[block.write_roi] = out_data\n",
"\n",
"        yield process_block"
] },
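{ "cell_type": "markdown", "id": "9c4e7b2d", "metadata": {}, "source": [
"Before running the task, here is a quick plain-Python sanity check (not part of the task itself)\n",
"that the `write_roi` and `write_size` defined above really yield the 8 blocks the comments promise:\n",
"```python\n",
"import math\n",
"\n",
"write_roi_shape = (10, 10, 10)  # shape of Roi((0, 0, 0), (10, 10, 10))\n",
"write_size = (5, 5, 5)\n",
"\n",
"# number of write-size chunks along each axis\n",
"blocks_per_axis = [s // w for s, w in zip(write_roi_shape, write_size)]\n",
"print(blocks_per_axis)             # [2, 2, 2]\n",
"print(math.prod(blocks_per_axis))  # 2 * 2 * 2 = 8\n",
"```"
] },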
{ "cell_type": "markdown", "id": "d94d3b2e", "metadata": {}, "source": [
"### Running the task"
] },
{ "cell_type": "code", "execution_count": null, "id": "e6046619", "metadata": {}, "outputs": [], "source": [
"argmax_task = Argmax()\n",
"argmax_task.run_blockwise(multiprocessing=False)"
] },
{ "cell_type": "markdown", "id": "d723d790", "metadata": { "lines_to_next_cell": 0 }, "source": [
"### Inspecting the results"
] },
{ "cell_type": "code", "execution_count": null, "id": "7473cb34", "metadata": {}, "outputs": [], "source": [
"import zarr\n",
"\n",
"# print all 3 channels along one line of the input, and the argmax along the same line\n",
"print(zarr.open(f\"{argmax_task.task_name}/data.zarr/in_array\")[:, :, 0, 0])\n",
"print(zarr.open(f\"{argmax_task.task_name}/data.zarr/out_array\")[:, 0, 0])"
] },
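{ "cell_type": "markdown", "id": "5d1f8e3a", "metadata": {}, "source": [
"As an extra check that is not part of the task itself, we can compare the blockwise result against\n",
"a plain `numpy` argmax over the whole input array. This assumes the arrays are still on disk at the\n",
"paths used above:"
] },
{ "cell_type": "code", "execution_count": null, "id": "7b3c9f42", "metadata": {}, "outputs": [], "source": [
"import zarr\n",
"\n",
"in_data = zarr.open(f\"{argmax_task.task_name}/data.zarr/in_array\")[:]\n",
"out_data = zarr.open(f\"{argmax_task.task_name}/data.zarr/out_array\")[:]\n",
"\n",
"# the blockwise argmax should match a global argmax over the channel axis\n",
"assert (out_data == in_data.argmax(axis=0)).all()\n",
"print(\"blockwise argmax matches numpy argmax\")"
] },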
{ "cell_type": "markdown", "id": "1edcc552", "metadata": {}, "source": [
"### What do we get for free?"
] },
{ "cell_type": "markdown", "id": "3ad877db", "metadata": { "lines_to_next_cell": 0 }, "source": [
"#### Block done tracking"
] },
{ "cell_type": "code", "execution_count": null, "id": "56a2fc15", "metadata": {}, "outputs": [], "source": [
"# Rerunning the same task: all blocks are already marked done, so they get skipped\n",
"argmax_task.run_blockwise(multiprocessing=False)"
] },
{ "cell_type": "markdown", "id": "11495f8c", "metadata": {}, "source": [
"#### Dropping a task"
] },
{ "cell_type": "code", "execution_count": null, "id": "c681ae8c", "metadata": {}, "outputs": [], "source": [
"# Call `argmax_task.drop()` to reset the task. This calls `drop_artifacts`\n",
"# but also removes any logs and block-completion tracking.\n",
"argmax_task.drop()"
] },
{ "cell_type": "markdown", "id": "23215f7e", "metadata": {}, "source": [
"#### Multiprocessing"
] },
{ "cell_type": "code", "execution_count": null, "id": "77ff0ae6", "metadata": {}, "outputs": [], "source": [
"# We can run the same job with multiple workers.\n",
"argmax_task = Argmax(num_workers=2)\n",
"argmax_task.run_blockwise(multiprocessing=True)"
] },
{ "cell_type": "markdown", "id": "306bb679", "metadata": {}, "source": [
"#### Running on a remote machine\n",
"\n",
"This task is not quite ready to be run on a remote machine, but it is very close.\n",
"To run on a remote machine, you need to register the task with `volara` so that the config files\n",
"passed to a worker can be deserialized into the correct task class and executed.\n",
"This registration has to happen automatically in the worker's environment, so you need to put your\n",
"task in a pip-installable Python package. The basic structure of the package is:\n",
"```\n",
"package-root/\n",
"├── volara_argmax_plugin/\n",
"│   ├── __init__.py\n",
"│   └── argmax.py\n",
"└── pyproject.toml\n",
"```\n",
"The `pyproject.toml` must include the following lines:\n",
"```toml\n",
"[project.entry-points.\"volara.blockwise_tasks\"]\n",
"argmax = \"volara_argmax_plugin.argmax:Argmax\"\n",
"```\n",
"This registers the task with `volara` so that it can be deserialized and run on a remote machine."
] },
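{ "cell_type": "markdown", "id": "8e2d5a1c", "metadata": {}, "source": [
"For concreteness, here is a minimal sketch of what the full `pyproject.toml` might look like. The\n",
"project name, version, dependencies, and build backend are invented for illustration; only the\n",
"entry-point table is what `volara` actually looks for:\n",
"```toml\n",
"[build-system]\n",
"requires = [\"setuptools\"]\n",
"build-backend = \"setuptools.build_meta\"\n",
"\n",
"[project]\n",
"name = \"volara-argmax-plugin\"\n",
"version = \"0.1.0\"\n",
"dependencies = [\"volara\"]\n",
"\n",
"[project.entry-points.\"volara.blockwise_tasks\"]\n",
"argmax = \"volara_argmax_plugin.argmax:Argmax\"\n",
"```"
] }
], "metadata": { "jupytext": { "cell_metadata_filter": "-all", "main_language": "python", "notebook_metadata_filter": "-all" } }, "nbformat": 4, "nbformat_minor": 5 }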