pandas = "^2.2"
diff --git a/search/search_index.json b/search/search_index.json
index 489945a..461bdf4 100644
--- a/search/search_index.json
+++ b/search/search_index.json
@@ -1 +1 @@
-{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"Overview","text":"ipybox
is a lightweight, stateful and secure Python code execution sandbox built with IPython and Docker. Designed for AI agents that interact with their environment through code execution, it is also well-suited for general-purpose code execution. ipybox
is fully open-source and free to use, distributed under the Apache 2.0 license.
"},{"location":"#features","title":"Features","text":" - Secure Execution: Executes code in isolated Docker containers, preventing unauthorized access to the host system
- Stateful Execution: Maintains variable and session state across commands using IPython kernels
- Real-Time Output Streaming: Provides immediate feedback through direct output streaming
- Enhanced Plotting Support: Enables downloading of plots created with Matplotlib and other visualization libraries
- Flexible Dependency Management: Supports package installation and updates during runtime or at build time
- Resource Management: Controls container lifecycle with built-in timeout and resource management features
- Reproducible Environments: Ensures consistent execution environments across different systems
"},{"location":"#status","title":"Status","text":"ipybox
is in active early development, with ongoing refinements and enhancements to its core features. Community feedback and contributions are welcome as we continue to evolve the project.
"},{"location":"installation/","title":"Installation","text":"pip install ipybox\n
"},{"location":"installation/#docker-image","title":"Docker image","text":"Before using ipybox
, you need to build a Docker image. This image contains all required dependencies for executing Python code in stateful and isolated sessions.
Note
Building an ipybox
Docker image requires Docker to be installed on your system. Containers created from this image will run with the same user and group IDs as the user who built the image, ensuring proper file permissions on mounted host directories.
"},{"location":"installation/#default-build","title":"Default build","text":"To build an ipybox
Docker image with default settings:
python -m ipybox build\n
This creates a Docker image tagged as gradion-ai/ipybox
containing the base Python dependencies required for the code execution environment.
"},{"location":"installation/#custom-build","title":"Custom build","text":"To create a custom ipybox
Docker image with additional dependencies, create a dependencies file (e.g., dependencies.txt
). For example:
dependencies.txtpandas = \"^2.2\"\nscikit-learn = \"^1.5\"\nmatplotlib = \"^3.9\"\n
Then build the image with a custom tag and dependencies:
python -m ipybox build -t my-box:v1 -d path/to/dependencies.txt\n
The dependencies file should use the Poetry dependency specification format. These packages will be installed alongside the base dependencies required for the execution environment. You can also install additional dependencies at runtime.
"},{"location":"usage/","title":"Usage","text":"The two main classes of the ipybox
package are ExecutionContainer
and ExecutionClient
.
Note
Runnable scripts of the source code on this page are available in the examples directory.
"},{"location":"usage/#basic-usage","title":"Basic usage","text":"For executing code in ipybox
you first need to create a Docker container from an ipybox
Docker image and then an IPython kernel running in that container. This is done with the ExecutionContainer
and the ExecutionClient
context managers.
from ipybox import ExecutionClient, ExecutionContainer\n\n\nasync with ExecutionContainer(tag=\"gradion-ai/ipybox\") as container: # (1)!\n async with ExecutionClient(port=container.port) as client: # (2)!\n result = await client.execute(\"print('Hello, world!')\") # (3)!\n print(f\"Output: {result.text}\") # (4)!\n
- Create and start a container for code execution
- Create and connect to an IPython kernel
- Execute Python code and await the result
- Output:
Hello, world!
The default image used by ExecutionContainer
is gradion-ai/ipybox
. You can specify a custom image with the tag
argument like in ExecutionContainer(tag=\"my-box:v1\")
, for example.
Note
Instead of letting the ExecutionContainer
context manager handle the lifecycle of the container, you can also manually run and kill the container.
"},{"location":"usage/#state-management","title":"State management","text":"Code execution within the same client
context is stateful i.e. you can reference variables from previous executions. Code executions in different client contexts are isolated from each other:
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client_1: # (1)!\n result = await client_1.execute(\"x = 1\") # (2)!\n assert result.text is None\n result = await client_1.execute(\"print(x)\") # (3)!\n assert result.text == \"1\"\n\n async with ExecutionClient(port=container.port) as client_2: # (4)!\n try:\n await client_2.execute(\"print(x)\") # (5)!\n except ExecutionError as e:\n assert e.args[0] == \"NameError: name 'x' is not defined\"\n
- First client context
- Execute code that defines variable x
- Reference variable x defined in previous execution
- Second client context
- Variable x is not defined in
client_2
context
"},{"location":"usage/#output-streaming","title":"Output streaming","text":"The ExecutionClient
supports streaming output as it's generated during code execution:
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n code = \"\"\"\n import time\n for i in range(5):\n print(f\"Processing step {i}\")\n time.sleep(1)\n \"\"\" # (1)!\n\n execution = await client.submit(code) # (2)!\n print(\"Streaming output:\")\n async for chunk in execution.stream(): # (3)!\n print(f\"Received output: {chunk.strip()}\") # (4)!\n\n result = await execution.result() # (5)!\n print(\"\\nAggregated output:\")\n print(result.text) # (6)!\n
- Code that produces gradual output
- Submit the code for execution
- Stream the output
- Prints one line per second:
Received output: Processing step 0\nReceived output: Processing step 1\nReceived output: Processing step 2\nReceived output: Processing step 3\nReceived output: Processing step 4\n
- Get the aggregated output as a single result
- Prints the aggregated output:
Aggregated output:\nProcessing step 0\nProcessing step 1\nProcessing step 2\nProcessing step 3\nProcessing step 4\n
The stream()
method accepts an optional timeout
argument (defaults to 120
seconds). In case of timeout, the execution is automatically terminated by interrupting the kernel.
"},{"location":"usage/#installing-dependencies-at-runtime","title":"Installing dependencies at runtime","text":"async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n execution = await client.submit(\"!pip install einops\") # (1)!\n async for chunk in execution.stream(): # (2)!\n print(chunk, end=\"\", flush=True)\n\n result = await client.execute(\"\"\"\n import einops\n print(einops.__version__)\n \"\"\") # (3)!\n print(f\"Output: {result.text}\") # (4)!\n
- Install the
einops
package using pip - Stream the installation progress. Something like
Collecting einops\nDownloading einops-0.8.0-py3-none-any.whl (10.0 kB)\nInstalling collected packages: einops\nSuccessfully installed einops-0.8.0\n
- Import and use the installed package
- Prints
Output: 0.8.0
You can also install and use a package within a single execution. There's no need to have two separate executions as done in the example above.
"},{"location":"usage/#creating-and-returning-plots","title":"Creating and returning plots","text":"Plots created with matplotlib
or other libraries are returned as PIL images. Images are not part of the output stream, but are available as images
list in the result
object.
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n execution = await client.submit(\"\"\"\n !pip install matplotlib\n\n import matplotlib.pyplot as plt\n import numpy as np\n\n x = np.linspace(0, 10, 100)\n plt.figure(figsize=(8, 6))\n plt.plot(x, np.sin(x))\n plt.title('Sine Wave')\n plt.show()\n\n print(\"Plot generation complete!\")\n \"\"\") # (1)!\n\n async for chunk in execution.stream(): # (2)!\n print(chunk, end=\"\", flush=True)\n\n result = await execution.result()\n result.images[0].save(\"sine.png\") # (3)!\n
- Install
matplotlib
and generate a plot - Stream output text (installation progress and
print
statement) - Get attached image from execution result and save it as sine.png
"},{"location":"usage/#bind-mounts","title":"Bind mounts","text":"Bind mounts allow executed code to read and write files on the host machine.
await aiofiles.os.makedirs(\"data\", exist_ok=True)\nawait aiofiles.os.makedirs(\"output\", exist_ok=True)\n\nbinds = { # (1)!\n \"./data\": \"data\", # (2)!\n \"./output\": \"output\", # (3)!\n}\n\nasync with aiofiles.open(\"data/input.txt\", \"w\") as f:\n await f.write(\"hello world\")\n\nasync with ExecutionContainer(binds=binds) as container:\n async with ExecutionClient(port=container.port) as client:\n await client.execute(\"\"\"\n with open('data/input.txt') as f:\n data = f.read()\n\n processed = data.upper()\n\n with open('output/result.txt', 'w') as f:\n f.write(processed)\n \"\"\") # (4)!\n\nasync with aiofiles.open(\"output/result.txt\", \"r\") as f: # (5)!\n result = await f.read()\n assert result == \"HELLO WORLD\"\n
- Map host paths to container paths.
- For reading files from host.
- For writing files to host.
- Read from mounted
data
directory, convert to uppercase and write to mounted output
directory - Verify the results on host
"},{"location":"usage/#environment-variables","title":"Environment variables","text":"Environment variables can be set on the container for passing secrets or configuration data, for example.
# Define environment variables for the container\nenv = {\"API_KEY\": \"secret-key-123\", \"DEBUG\": \"1\"} # (1)!\n\nasync with ExecutionContainer(env=env) as container:\n async with ExecutionClient(port=container.port) as client:\n result = await client.execute(\"\"\"\n import os\n\n api_key = os.environ['API_KEY']\n print(f\"Using API key: {api_key}\")\n\n debug = bool(int(os.environ.get('DEBUG', '0')))\n if debug:\n print(\"Debug mode enabled\")\n \"\"\") # (2)!\n print(result.text) # (3)!\n
- Define environment variables for the container
- Access environment variables in executed code
- Prints
Using API key: secret-key-123\nDebug mode enabled\n
"},{"location":"usage/#manual-container-lifecycle-management","title":"Manual container lifecycle management","text":"Instead of using ExecutionContainer
as a context manager, you can also manually run()
and kill()
the container. This is useful for running the container on a separate host listening to a user-defined host port (e.g. 7777
in the example below).
container = ExecutionContainer(port=7777) # (1)!\nawait container.run() # (2)!\nassert container.port == 7777\n\n# do some work ...\n\nawait container.kill() # (3)!\n
- Create an
ExecutionContainer
instance using a fixed port. - Run the container (detached).
- Cleanup.
"},{"location":"api/execution_client/","title":"ExecutionClient","text":""},{"location":"api/execution_client/#ipybox.executor.ExecutionClient","title":"ExecutionClient","text":"ExecutionClient(port: int, host: str = 'localhost', heartbeat_interval: float = 10)\n
A context manager for executing code in an IPython kernel.
Parameters:
Name Type Description Default host
str
Hostname where the code execution container is running
'localhost'
port
int
Host port of the code execution container
required heartbeat_interval
float
Interval in seconds between heartbeat pings. Defaults to 10.
10
Example from ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n result = await client.execute(\"print('Hello, world!')\")\n print(result.text)\n
Hello, world!
Source code in ipybox/executor.py
def __init__(self, port: int, host: str = \"localhost\", heartbeat_interval: float = 10):\n self.port = port\n self.host = host\n\n self._heartbeat_interval = heartbeat_interval\n self._heartbeat_callback = None\n\n self._kernel_id = None\n self._ws: WebSocketClientConnection\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.kernel_id","title":"kernel_id property
","text":"kernel_id\n
The ID of the running IPython kernel.
Raises:
Type Description ValueError
If not connected to a kernel
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.connect","title":"connect async
","text":"connect(retries: int = 10, retry_interval: float = 1.0)\n
Creates and connects to an IPython kernel.
Parameters:
Name Type Description Default retries
int
Number of connection attempts. Defaults to 10.
10
retry_interval
float
Delay between retries in seconds. Defaults to 1.0.
1.0
Raises:
Type Description ConnectionError
If connection cannot be established after all retries
Source code in ipybox/executor.py
async def connect(self, retries: int = 10, retry_interval: float = 1.0):\n \"\"\"Creates and connects to an IPython kernel.\n\n Args:\n retries: Number of connection attempts. Defaults to 10.\n retry_interval: Delay between retries in seconds. Defaults to 1.0.\n\n Raises:\n ConnectionError: If connection cannot be established after all retries\n \"\"\"\n for _ in range(retries):\n try:\n self._kernel_id = await self._create_kernel()\n break\n except Exception:\n await asyncio.sleep(retry_interval)\n else:\n raise ConnectionError(\"Failed to create kernel\")\n\n self._ws = await websocket_connect(HTTPRequest(url=self.kernel_ws_url))\n logger.info(\"Connected to kernel\")\n\n self.heartbeat_callback = PeriodicCallback(self._ping_kernel, self._heartbeat_interval * 1000)\n self.heartbeat_callback.start()\n logger.info(f\"Started heartbeat (interval = {self._heartbeat_interval}s)\")\n\n await self._init_kernel()\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.disconnect","title":"disconnect async
","text":"disconnect()\n
Closes the connection to the kernel and cleans up resources.
Source code in ipybox/executor.py
async def disconnect(self):\n \"\"\"Closes the connection to the kernel and cleans up resources.\"\"\"\n self.heartbeat_callback.stop()\n self._ws.close()\n async with aiohttp.ClientSession() as session:\n async with session.delete(self.kernel_http_url):\n pass\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.execute","title":"execute async
","text":"execute(code: str, timeout: float = 120) -> ExecutionResult\n
Executes code and returns the result.
Parameters:
Name Type Description Default code
str
Code to execute
required timeout
float
Maximum execution time in seconds. Defaults to 120.
120
Returns:
Type Description ExecutionResult
ExecutionResult object
Raises:
Type Description ExecutionError
If code execution raised an error
TimeoutError
If execution exceeds timeout duration
Source code in ipybox/executor.py
async def execute(self, code: str, timeout: float = 120) -> ExecutionResult:\n \"\"\"Executes code and returns the result.\n\n Args:\n code: Code to execute\n timeout: Maximum execution time in seconds. Defaults to 120.\n\n Returns:\n ExecutionResult object\n\n Raises:\n ExecutionError: If code execution raised an error\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n execution = await self.submit(code)\n return await execution.result(timeout=timeout)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.submit","title":"submit async
","text":"submit(code: str) -> Execution\n
Submits code for execution and returns an Execution object to track it.
Parameters:
Name Type Description Default code
str
Python code to execute
required Returns:
Type Description Execution
An Execution object to track the submitted code execution
Source code in ipybox/executor.py
async def submit(self, code: str) -> Execution:\n \"\"\"Submits code for execution and returns an Execution object to track it.\n\n Args:\n code: Python code to execute\n\n Returns:\n An Execution object to track the submitted code execution\n \"\"\"\n req_id = uuid4().hex\n req = {\n \"header\": {\n \"username\": \"\",\n \"version\": \"5.0\",\n \"session\": \"\",\n \"msg_id\": req_id,\n \"msg_type\": \"execute_request\",\n },\n \"parent_header\": {},\n \"channel\": \"shell\",\n \"content\": {\n \"code\": code,\n \"silent\": False,\n \"store_history\": False,\n \"user_expressions\": {},\n \"allow_stdin\": False,\n },\n \"metadata\": {},\n \"buffers\": {},\n }\n\n await self._send_request(req)\n return Execution(client=self, req_id=req_id)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionResult","title":"ExecutionResult dataclass
","text":"ExecutionResult(text: str | None, images: list[Image])\n
The result of a code execution.
Parameters:
Name Type Description Default text
str | None
Output text generated during execution
required images
list[Image]
List of images generated during execution
required"},{"location":"api/execution_client/#ipybox.executor.Execution","title":"Execution","text":"Execution(client: ExecutionClient, req_id: str)\n
Represents a code execution in an IPython kernel.
Parameters:
Name Type Description Default client
ExecutionClient
The client instance that created this execution
required req_id
str
Unique identifier for the execution request
required Source code in ipybox/executor.py
def __init__(self, client: \"ExecutionClient\", req_id: str):\n self.client = client\n self.req_id = req_id\n\n self._chunks: list[str] = []\n self._images: list[Image.Image] = []\n\n self._stream_consumed: bool = False\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.result","title":"result async
","text":"result(timeout: float = 120) -> ExecutionResult\n
Waits for execution to complete and returns the final result.
If a timeout is reached, the kernel is interrupted.
Parameters:
Name Type Description Default timeout
float
Maximum time to wait in seconds. Defaults to 120.
120
Returns:
Type Description ExecutionResult
ExecutionResult object
Raises:
Type Description TimeoutError
If execution exceeds timeout duration
Source code in ipybox/executor.py
async def result(self, timeout: float = 120) -> ExecutionResult:\n \"\"\"Waits for execution to complete and returns the final result.\n\n If a timeout is reached, the kernel is interrupted.\n\n Args:\n timeout: Maximum time to wait in seconds. Defaults to 120.\n\n Returns:\n ExecutionResult object\n\n Raises:\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n if not self._stream_consumed:\n async for _ in self.stream(timeout=timeout):\n pass\n\n return ExecutionResult(\n text=\"\".join(self._chunks).strip() if self._chunks else None,\n images=self._images,\n )\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.stream","title":"stream async
","text":"stream(timeout: float = 120) -> AsyncIterator[str]\n
Streams the execution output text as it becomes available.
Parameters:
Name Type Description Default timeout
float
Maximum time to wait in seconds. Defaults to 120.
120
Yields:
Type Description AsyncIterator[str]
Output text chunks as they arrive
Raises:
Type Description TimeoutError
If execution exceeds timeout duration
Source code in ipybox/executor.py
async def stream(self, timeout: float = 120) -> AsyncIterator[str]:\n \"\"\"Streams the execution output text as it becomes available.\n\n Args:\n timeout: Maximum time to wait in seconds. Defaults to 120.\n\n Yields:\n Output text chunks as they arrive\n\n Raises:\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n try:\n async with asyncio.timeout(timeout):\n async for elem in self._stream():\n match elem:\n case str():\n self._chunks.append(elem)\n yield elem\n case Image.Image():\n self._images.append(elem)\n except asyncio.TimeoutError:\n await self.client._interrupt_kernel()\n await asyncio.sleep(0.2) # TODO: make configurable\n raise\n finally:\n self._stream_consumed = True\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionError","title":"ExecutionError","text":"ExecutionError(message: str, trace: str | None = None)\n
Bases: Exception
Exception raised when code execution in the IPython kernel fails.
Parameters:
Name Type Description Default message
str
Error message
required trace
str | None
Stack trace string representation
None
Source code in ipybox/executor.py
def __init__(self, message: str, trace: str | None = None):\n super().__init__(message)\n self.trace = trace\n
"},{"location":"api/execution_client/#ipybox.executor.ConnectionError","title":"ConnectionError","text":" Bases: Exception
Exception raised when connection to an IPython kernel fails.
"},{"location":"api/execution_container/","title":"ExecutionContainer","text":"A context manager for managing the lifecycle of a Docker container used for code execution.
It handles the creation, port mapping, volume binding, and cleanup of the container.
Parameters:
Name Type Description Default tag
str
Tag of the Docker image to use (defaults to gradion-ai/ipybox)
DEFAULT_TAG
binds
dict[str, str] | None
Mapping of host paths to container paths for volume mounting. Host paths may be relative or absolute. Container paths must be relative and are created as subdirectories of /home/appuser
in the container.
None
env
dict[str, str] | None
Environment variables to set in the container
None
port
int | None
Host port to map to the container's executor port. If not provided, a random port will be allocated.
None
Attributes:
Name Type Description port
int
Host port mapped to the container's executor port. This port is dynamically allocated when the container is started.
Example from ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n result = await client.execute(\"print('Hello, world!')\")\n print(result.text)\n
Hello, world!
Source code in ipybox/container.py
def __init__(\n self,\n tag: str = DEFAULT_TAG,\n binds: dict[str, str] | None = None,\n env: dict[str, str] | None = None,\n port: int | None = None,\n):\n self.tag = tag\n self.binds = binds or {}\n self.env = env or {}\n\n self._docker = None\n self._container = None\n self._port = port\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.port","title":"port property
","text":"port: int\n
The host port mapped to the container's executor port.
This port is dynamically allocated when the container is started unless explicitly provided.
Raises:
Type Description RuntimeError
If the container is not running
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.kill","title":"kill async
","text":"kill()\n
Kill and remove the Docker container.
Source code in ipybox/container.py
async def kill(self):\n \"\"\"\n Kill and remove the Docker container.\n \"\"\"\n if self._container:\n await self._container.kill()\n\n if self._docker:\n await self._docker.close()\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.run","title":"run async
","text":"run()\n
Create and start the Docker container.
Source code in ipybox/container.py
async def run(self):\n \"\"\"\n Create and start the Docker container.\n \"\"\"\n self._docker = Docker()\n self._container = await self._run()\n
"}]}
\ No newline at end of file
+{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"Overview","text":"ipybox
is a lightweight, stateful and secure Python code execution sandbox built with IPython and Docker. Designed for AI agents that interact with their environment through code execution, it is also well-suited for general-purpose code execution. ipybox
is fully open-source and free to use, distributed under the Apache 2.0 license.
"},{"location":"#features","title":"Features","text":" - Secure Execution: Executes code in isolated Docker containers, preventing unauthorized access to the host system
- Stateful Execution: Maintains variable and session state across commands using IPython kernels
- Real-Time Output Streaming: Provides immediate feedback through direct output streaming
- Enhanced Plotting Support: Enables downloading of plots created with Matplotlib and other visualization libraries
- Flexible Dependency Management: Supports package installation and updates during runtime or at build time
- Resource Management: Controls container lifecycle with built-in timeout and resource management features
- Reproducible Environments: Ensures consistent execution environments across different systems
"},{"location":"installation/","title":"Installation","text":"pip install ipybox\n
"},{"location":"installation/#docker-image","title":"Docker image","text":"Before using ipybox
, you need to build a Docker image. This image contains all required dependencies for executing Python code in stateful and isolated sessions.
"},{"location":"installation/#default-build","title":"Default build","text":"To build an ipybox
Docker image with default settings:
python -m ipybox build\n
This creates a Docker image tagged as gradion-ai/ipybox
containing the base Python dependencies required for the code execution environment.
Note
By default, containers created from this image will run with the same user and group IDs as the user who built the image, ensuring proper file permissions on mounted host directories. If you use the -r
or --root
option when building the image, the container will run as root.
"},{"location":"installation/#custom-build","title":"Custom build","text":"To create a custom ipybox
Docker image with additional dependencies, create a dependencies file (e.g., dependencies.txt
). For example:
dependencies.txtpandas = \"^2.2\"\nscikit-learn = \"^1.5\"\nmatplotlib = \"^3.9\"\n
Then build the image with a custom tag and dependencies:
python -m ipybox build -t my-box:v1 -d path/to/dependencies.txt\n
The dependencies file should use the Poetry dependency specification format. These packages will be installed alongside the base dependencies required for the execution environment. You can also install additional dependencies at runtime.
"},{"location":"usage/","title":"Usage","text":"The two main classes of the ipybox
package are ExecutionContainer
and ExecutionClient
.
Note
Runnable scripts of the source code on this page are available in the examples directory.
"},{"location":"usage/#basic-usage","title":"Basic usage","text":"For executing code in ipybox
you first need to create a Docker container from an ipybox
Docker image and then an IPython kernel running in that container. This is done with the ExecutionContainer
and the ExecutionClient
context managers.
from ipybox import ExecutionClient, ExecutionContainer\n\n\nasync with ExecutionContainer(tag=\"gradion-ai/ipybox\") as container: # (1)!\n async with ExecutionClient(port=container.port) as client: # (2)!\n result = await client.execute(\"print('Hello, world!')\") # (3)!\n print(f\"Output: {result.text}\") # (4)!\n
- Create and start a container for code execution
- Create and connect to an IPython kernel
- Execute Python code and await the result
- Output:
Hello, world!
The default image used by ExecutionContainer
is gradion-ai/ipybox
. You can specify a custom image with the tag
argument like in ExecutionContainer(tag=\"my-box:v1\")
, for example.
Note
Instead of letting the ExecutionContainer
context manager handle the lifecycle of the container, you can also manually manage the container lifecycle.
"},{"location":"usage/#state-management","title":"State management","text":"Code execution within the same client
context is stateful i.e. you can reference variables from previous executions. Code executions in different client contexts are isolated from each other:
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client_1: # (1)!\n result = await client_1.execute(\"x = 1\") # (2)!\n assert result.text is None\n result = await client_1.execute(\"print(x)\") # (3)!\n assert result.text == \"1\"\n\n async with ExecutionClient(port=container.port) as client_2: # (4)!\n try:\n await client_2.execute(\"print(x)\") # (5)!\n except ExecutionError as e:\n assert e.args[0] == \"NameError: name 'x' is not defined\"\n
- First client context
- Execute code that defines variable x
- Reference variable x defined in previous execution
- Second client context
- Variable x is not defined in
client_2
context
"},{"location":"usage/#output-streaming","title":"Output streaming","text":"The ExecutionClient
supports streaming output as it's generated during code execution:
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n code = \"\"\"\n import time\n for i in range(5):\n print(f\"Processing step {i}\")\n time.sleep(1)\n \"\"\" # (1)!\n\n execution = await client.submit(code) # (2)!\n print(\"Streaming output:\")\n async for chunk in execution.stream(): # (3)!\n print(f\"Received output: {chunk.strip()}\") # (4)!\n\n result = await execution.result() # (5)!\n print(\"\\nAggregated output:\")\n print(result.text) # (6)!\n
- Code that produces gradual output
- Submit the code for execution
- Stream the output
- Prints one line per second:
Received output: Processing step 0\nReceived output: Processing step 1\nReceived output: Processing step 2\nReceived output: Processing step 3\nReceived output: Processing step 4\n
- Get the aggregated output as a single result
- Prints the aggregated output:
Aggregated output:\nProcessing step 0\nProcessing step 1\nProcessing step 2\nProcessing step 3\nProcessing step 4\n
The stream()
method accepts an optional timeout
argument (defaults to 120
seconds). In case of timeout, the execution is automatically terminated by interrupting the kernel.
"},{"location":"usage/#installing-dependencies-at-runtime","title":"Installing dependencies at runtime","text":"async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n execution = await client.submit(\"!pip install einops\") # (1)!\n async for chunk in execution.stream(): # (2)!\n print(chunk, end=\"\", flush=True)\n\n result = await client.execute(\"\"\"\n import einops\n print(einops.__version__)\n \"\"\") # (3)!\n print(f\"Output: {result.text}\") # (4)!\n
- Install the
einops
package using pip - Stream the installation progress. Something like
Collecting einops\nDownloading einops-0.8.0-py3-none-any.whl (10.0 kB)\nInstalling collected packages: einops\nSuccessfully installed einops-0.8.0\n
- Import and use the installed package
- Prints
Output: 0.8.0
You can also install and use a package within a single execution. There's no need to have two separate executions as done in the example above.
"},{"location":"usage/#creating-and-returning-plots","title":"Creating and returning plots","text":"Plots created with matplotlib
or other libraries are returned as PIL images. Images are not part of the output stream, but are available as images
list in the result
object.
async with ExecutionContainer() as container:\n async with ExecutionClient(port=container.port) as client:\n execution = await client.submit(\"\"\"\n !pip install matplotlib\n\n import matplotlib.pyplot as plt\n import numpy as np\n\n x = np.linspace(0, 10, 100)\n plt.figure(figsize=(8, 6))\n plt.plot(x, np.sin(x))\n plt.title('Sine Wave')\n plt.show()\n\n print(\"Plot generation complete!\")\n \"\"\") # (1)!\n\n async for chunk in execution.stream(): # (2)!\n print(chunk, end=\"\", flush=True)\n\n result = await execution.result()\n result.images[0].save(\"sine.png\") # (3)!\n
- Install
matplotlib
and generate a plot - Stream output text (installation progress and
print
statement) - Get attached image from execution result and save it as sine.png
"},{"location":"usage/#bind-mounts","title":"Bind mounts","text":"Bind mounts allow executed code to read and write files on the host machine.
await aiofiles.os.makedirs(\"data\", exist_ok=True)\nawait aiofiles.os.makedirs(\"output\", exist_ok=True)\n\nbinds = { # (1)!\n \"./data\": \"data\", # (2)!\n \"./output\": \"output\", # (3)!\n}\n\nasync with aiofiles.open(\"data/input.txt\", \"w\") as f:\n await f.write(\"hello world\")\n\nasync with ExecutionContainer(binds=binds) as container:\n async with ExecutionClient(port=container.port) as client:\n await client.execute(\"\"\"\n with open('data/input.txt') as f:\n data = f.read()\n\n processed = data.upper()\n\n with open('output/result.txt', 'w') as f:\n f.write(processed)\n \"\"\") # (4)!\n\nasync with aiofiles.open(\"output/result.txt\", \"r\") as f: # (5)!\n result = await f.read()\n assert result == \"HELLO WORLD\"\n
- Map host paths to container paths.
- For reading files from host.
- For writing files to host.
- Read from mounted
data
directory, convert to uppercase and write to mounted output
directory - Verify the results on host
"},{"location":"usage/#environment-variables","title":"Environment variables","text":"Environment variables can be set on the container for passing secrets or configuration data, for example.
# Define environment variables for the container\nenv = {\"API_KEY\": \"secret-key-123\", \"DEBUG\": \"1\"} # (1)!\n\nasync with ExecutionContainer(env=env) as container:\n async with ExecutionClient(port=container.port) as client:\n result = await client.execute(\"\"\"\n import os\n\n api_key = os.environ['API_KEY']\n print(f\"Using API key: {api_key}\")\n\n debug = bool(int(os.environ.get('DEBUG', '0')))\n if debug:\n print(\"Debug mode enabled\")\n \"\"\") # (2)!\n print(result.text) # (3)!\n
- Define environment variables for the container
- Access environment variables in executed code
- Prints
Using API key: secret-key-123\nDebug mode enabled\n
"},{"location":"usage/#manual-container-lifecycle-management","title":"Manual container lifecycle management","text":"Instead of using ExecutionContainer
as a context manager, you can also manually run()
and kill()
the container. This is useful for running the container on a separate host listening to a user-defined host port (e.g. 7777
in the example below).
container = ExecutionContainer(port=7777) # (1)!\nawait container.run() # (2)!\nassert container.port == 7777\n\n# do some work ...\n\nawait container.kill() # (3)!\n
- Create an
ExecutionContainer
instance using a fixed port. - Run the container (detached).
- Cleanup.
"},{"location":"api/execution_client/","title":"ExecutionClient","text":""},{"location":"api/execution_client/#ipybox.executor.ExecutionClient","title":"ExecutionClient","text":"ExecutionClient(port: int, host: str = 'localhost', heartbeat_interval: float = 10)\n
A context manager for executing code in an IPython kernel.
Parameters:
Name Type Description Default host
str
Hostname where the code execution container is running
'localhost'
port
int
Host port of the code execution container
required heartbeat_interval
float
Interval in seconds between heartbeat pings. Defaults to 10.
10
Example from ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n result = await client.execute(\"print('Hello, world!')\")\n print(result.text)\n
Hello, world!
Source code in ipybox/executor.py
def __init__(self, port: int, host: str = \"localhost\", heartbeat_interval: float = 10):\n self.port = port\n self.host = host\n\n self._heartbeat_interval = heartbeat_interval\n self._heartbeat_callback = None\n\n self._kernel_id = None\n self._ws: WebSocketClientConnection\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.kernel_id","title":"kernel_id property
","text":"kernel_id\n
The ID of the running IPython kernel.
Raises:
Type Description ValueError
If not connected to a kernel
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.connect","title":"connect async
","text":"connect(retries: int = 10, retry_interval: float = 1.0)\n
Creates and connects to an IPython kernel.
Parameters:
Name Type Description Default retries
int
Number of connection attempts. Defaults to 10.
10
retry_interval
float
Delay between retries in seconds. Defaults to 1.0.
1.0
Raises:
Type Description ConnectionError
If connection cannot be established after all retries
Source code in ipybox/executor.py
async def connect(self, retries: int = 10, retry_interval: float = 1.0):\n \"\"\"Creates and connects to an IPython kernel.\n\n Args:\n retries: Number of connection attempts. Defaults to 10.\n retry_interval: Delay between retries in seconds. Defaults to 1.0.\n\n Raises:\n ConnectionError: If connection cannot be established after all retries\n \"\"\"\n for _ in range(retries):\n try:\n self._kernel_id = await self._create_kernel()\n break\n except Exception:\n await asyncio.sleep(retry_interval)\n else:\n raise ConnectionError(\"Failed to create kernel\")\n\n self._ws = await websocket_connect(HTTPRequest(url=self.kernel_ws_url))\n logger.info(\"Connected to kernel\")\n\n self.heartbeat_callback = PeriodicCallback(self._ping_kernel, self._heartbeat_interval * 1000)\n self.heartbeat_callback.start()\n logger.info(f\"Started heartbeat (interval = {self._heartbeat_interval}s)\")\n\n await self._init_kernel()\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.disconnect","title":"disconnect async
","text":"disconnect()\n
Closes the connection to the kernel and cleans up resources.
Source code in ipybox/executor.py
async def disconnect(self):\n \"\"\"Closes the connection to the kernel and cleans up resources.\"\"\"\n self.heartbeat_callback.stop()\n self._ws.close()\n async with aiohttp.ClientSession() as session:\n async with session.delete(self.kernel_http_url):\n pass\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.execute","title":"execute async
","text":"execute(code: str, timeout: float = 120) -> ExecutionResult\n
Executes code and returns the result.
Parameters:
Name Type Description Default code
str
Code to execute
required timeout
float
Maximum execution time in seconds. Defaults to 120.
120
Returns:
Type Description ExecutionResult
ExecutionResult object
Raises:
Type Description ExecutionError
If code execution raised an error
TimeoutError
If execution exceeds timeout duration
Source code in ipybox/executor.py
async def execute(self, code: str, timeout: float = 120) -> ExecutionResult:\n \"\"\"Executes code and returns the result.\n\n Args:\n code: Code to execute\n timeout: Maximum execution time in seconds. Defaults to 120.\n\n Returns:\n ExecutionResult object\n\n Raises:\n ExecutionError: If code execution raised an error\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n execution = await self.submit(code)\n return await execution.result(timeout=timeout)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionClient.submit","title":"submit async
","text":"submit(code: str) -> Execution\n
Submits code for execution and returns an Execution object to track it.
Parameters:
Name Type Description Default code
str
Python code to execute
required Returns:
Type Description Execution
An Execution object to track the submitted code execution
Source code in ipybox/executor.py
async def submit(self, code: str) -> Execution:\n \"\"\"Submits code for execution and returns an Execution object to track it.\n\n Args:\n code: Python code to execute\n\n Returns:\n An Execution object to track the submitted code execution\n \"\"\"\n req_id = uuid4().hex\n req = {\n \"header\": {\n \"username\": \"\",\n \"version\": \"5.0\",\n \"session\": \"\",\n \"msg_id\": req_id,\n \"msg_type\": \"execute_request\",\n },\n \"parent_header\": {},\n \"channel\": \"shell\",\n \"content\": {\n \"code\": code,\n \"silent\": False,\n \"store_history\": False,\n \"user_expressions\": {},\n \"allow_stdin\": False,\n },\n \"metadata\": {},\n \"buffers\": {},\n }\n\n await self._send_request(req)\n return Execution(client=self, req_id=req_id)\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionResult","title":"ExecutionResult dataclass
","text":"ExecutionResult(text: str | None, images: list[Image])\n
The result of a code execution.
Parameters:
Name Type Description Default text
str | None
Output text generated during execution
required images
list[Image]
List of images generated during execution
required"},{"location":"api/execution_client/#ipybox.executor.Execution","title":"Execution","text":"Execution(client: ExecutionClient, req_id: str)\n
Represents a code execution in an IPython kernel.
Parameters:
Name Type Description Default client
ExecutionClient
The client instance that created this execution
required req_id
str
Unique identifier for the execution request
required Source code in ipybox/executor.py
def __init__(self, client: \"ExecutionClient\", req_id: str):\n self.client = client\n self.req_id = req_id\n\n self._chunks: list[str] = []\n self._images: list[Image.Image] = []\n\n self._stream_consumed: bool = False\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.result","title":"result async
","text":"result(timeout: float = 120) -> ExecutionResult\n
Waits for execution to complete and returns the final result.
If a timeout is reached, the kernel is interrupted.
Parameters:
Name Type Description Default timeout
float
Maximum time to wait in seconds. Defaults to 120.
120
Returns:
Type Description ExecutionResult
ExecutionResult object
Raises:
Type Description TimeoutError
If execution exceeds timeout duration
Source code in ipybox/executor.py
async def result(self, timeout: float = 120) -> ExecutionResult:\n \"\"\"Waits for execution to complete and returns the final result.\n\n If a timeout is reached, the kernel is interrupted.\n\n Args:\n timeout: Maximum time to wait in seconds. Defaults to 120.\n\n Returns:\n ExecutionResult object\n\n Raises:\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n if not self._stream_consumed:\n async for _ in self.stream(timeout=timeout):\n pass\n\n return ExecutionResult(\n text=\"\".join(self._chunks).strip() if self._chunks else None,\n images=self._images,\n )\n
"},{"location":"api/execution_client/#ipybox.executor.Execution.stream","title":"stream async
","text":"stream(timeout: float = 120) -> AsyncIterator[str]\n
Streams the execution output text as it becomes available.
Parameters:
Name Type Description Default timeout
float
Maximum time to wait in seconds. Defaults to 120.
120
Yields:
Type Description AsyncIterator[str]
Output text chunks as they arrive
Raises:
Type Description TimeoutError
If execution exceeds timeout duration
Source code in ipybox/executor.py
async def stream(self, timeout: float = 120) -> AsyncIterator[str]:\n \"\"\"Streams the execution output text as it becomes available.\n\n Args:\n timeout: Maximum time to wait in seconds. Defaults to 120.\n\n Yields:\n Output text chunks as they arrive\n\n Raises:\n asyncio.TimeoutError: If execution exceeds timeout duration\n \"\"\"\n try:\n async with asyncio.timeout(timeout):\n async for elem in self._stream():\n match elem:\n case str():\n self._chunks.append(elem)\n yield elem\n case Image.Image():\n self._images.append(elem)\n except asyncio.TimeoutError:\n await self.client._interrupt_kernel()\n await asyncio.sleep(0.2) # TODO: make configurable\n raise\n finally:\n self._stream_consumed = True\n
"},{"location":"api/execution_client/#ipybox.executor.ExecutionError","title":"ExecutionError","text":"ExecutionError(message: str, trace: str | None = None)\n
Bases: Exception
Exception raised when code execution in the IPython kernel fails.
Parameters:
Name Type Description Default message
str
Error message
required trace
str | None
Stack trace string representation
None
Source code in ipybox/executor.py
def __init__(self, message: str, trace: str | None = None):\n super().__init__(message)\n self.trace = trace\n
"},{"location":"api/execution_client/#ipybox.executor.ConnectionError","title":"ConnectionError","text":" Bases: Exception
Exception raised when connection to an IPython kernel fails.
"},{"location":"api/execution_container/","title":"ExecutionContainer","text":"A context manager for managing the lifecycle of a Docker container used for code execution.
It handles the creation, port mapping, volume binding, and cleanup of the container.
Parameters:
Name Type Description Default tag
str
Tag of the Docker image to use (defaults to gradion-ai/ipybox)
DEFAULT_TAG
binds
dict[str, str] | None
Mapping of host paths to container paths for volume mounting. Host paths may be relative or absolute. Container paths must be relative and are created as subdirectories of /app
in the container.
None
env
dict[str, str] | None
Environment variables to set in the container
None
port
int | None
Host port to map to the container's executor port. If not provided, a random port will be allocated.
None
show_pull_progress
bool
Whether to show progress when pulling the Docker image.
True
Attributes:
Name Type Description port
int
Host port mapped to the container's executor port. This port is dynamically allocated when the container is started.
Example from ipybox import ExecutionClient, ExecutionContainer\n\nbinds = {\"/host/path\": \"example/path\"}\nenv = {\"API_KEY\": \"secret\"}\n\nasync with ExecutionContainer(binds=binds, env=env) as container:\n async with ExecutionClient(host=\"localhost\", port=container.port) as client:\n result = await client.execute(\"print('Hello, world!')\")\n print(result.text)\n
Hello, world!
Source code in ipybox/container.py
def __init__(\n self,\n tag: str = DEFAULT_TAG,\n binds: dict[str, str] | None = None,\n env: dict[str, str] | None = None,\n port: int | None = None,\n show_pull_progress: bool = True,\n):\n self.tag = tag\n self.binds = binds or {}\n self.env = env or {}\n self.show_pull_progress = show_pull_progress\n\n self._docker = None\n self._container = None\n self._port = port\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.port","title":"port property
","text":"port: int\n
The host port mapped to the container's executor port.
This port is dynamically allocated when the container is started unless explicitly provided.
Raises:
Type Description RuntimeError
If the container is not running
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.kill","title":"kill async
","text":"kill()\n
Kill and remove the Docker container.
Source code in ipybox/container.py
async def kill(self):\n \"\"\"\n Kill and remove the Docker container.\n \"\"\"\n if self._container:\n await self._container.kill()\n\n if self._docker:\n await self._docker.close()\n
"},{"location":"api/execution_container/#ipybox.container.ExecutionContainer.run","title":"run async
","text":"run()\n
Create and start the Docker container.
Source code in ipybox/container.py
async def run(self):\n \"\"\"\n Create and start the Docker container.\n \"\"\"\n self._docker = Docker()\n self._container = await self._run()\n
"}]}
\ No newline at end of file
diff --git a/sitemap.xml.gz b/sitemap.xml.gz
index a0203f4..e3e3ae3 100644
Binary files a/sitemap.xml.gz and b/sitemap.xml.gz differ
diff --git a/usage/index.html b/usage/index.html
index da558ea..be064e2 100644
--- a/usage/index.html
+++ b/usage/index.html
@@ -764,7 +764,7 @@ Basic usage
The default image used by ExecutionContainer
is gradion-ai/ipybox
. You can specify a custom image with the tag
argument like in ExecutionContainer(tag="my-box:v1")
, for example.
State management
Code execution within the same client
context is stateful i.e. you can reference variables from previous executions. Code executions in different client contexts are isolated from each other: