Skip to content

Commit

Permalink
run_id in the backend
Browse files Browse the repository at this point in the history
  • Loading branch information
mscolnick committed Dec 14, 2024
1 parent 2b3050c commit 1f58e53
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 15 deletions.
118 changes: 118 additions & 0 deletions frontend/src/core/cells/runs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/* Copyright 2024 Marimo. All rights reserved. */
import { createReducerAndAtoms } from "@/utils/createReducer";
import type { CellMessage } from "../kernel/messages";
import type { TypedString } from "@/utils/typed";
import type { CellId } from "./ids";
export type RunId = TypedString<"RunId">;

export interface CellRun {
cellId: CellId;
code: string;
elapsedTime: number;
startTime: number;
status: "success" | "error";
}

export interface Run {
runId: RunId;
cellRuns: CellRun[];
runStartTime: number;
}

export interface RunsState {
runIds: RunId[];
runMap: Map<RunId, Run>;
}

function initialState(): RunsState {
return {
runIds: [],
runMap: new Map(),
};
}

const MAX_RUNS = 100;
const MAX_CODE_LENGTH = 200;

const {
reducer,
createActions,
valueAtom: runsAtom,
useActions: useRunsActions,
} = createReducerAndAtoms(initialState, {
addCellOperation: (
state,
opts: { cellOperation: CellMessage; code: string },
) => {
console.log("addCellOperation", opts);

Check failure on line 47 in frontend/src/core/cells/runs.ts

View workflow job for this annotation

GitHub Actions / 🖥️ Lint, test, build frontend

Unexpected console statement

Check failure on line 47 in frontend/src/core/cells/runs.ts

View workflow job for this annotation

GitHub Actions / 🖥️ Lint, test, build frontend

Unexpected console statement
const { cellOperation, code } = opts;
const runId = cellOperation.run_id as RunId | undefined;
if (!runId) {
return state;
}
let run = state.runMap.get(runId);
if (!run) {
run = {
runId: runId,
cellRuns: [],
runStartTime: cellOperation.timestamp,
};
}

const nextRuns: CellRun[] = [];
let found = false;
for (const cellRun of run.cellRuns) {
if (cellRun.cellId === cellOperation.cell_id) {
nextRuns.push({
...cellRun,
elapsedTime: cellOperation.timestamp - cellRun.startTime,
});
found = true;
} else {
nextRuns.push(cellRun);
}
}
if (!found) {
nextRuns.push({
cellId: cellOperation.cell_id as CellId,
code: code.slice(0, MAX_CODE_LENGTH),
elapsedTime: 0,
// TODO: not actually correct logic
status: cellOperation.status === "idle" ? "success" : "error",
startTime: cellOperation.timestamp,
});
}

const nextRunMap = new Map(state.runMap);
nextRunMap.set(runId, run);

return {
...state,
runIds: [runId, ...state.runIds.slice(0, MAX_RUNS)],
runMap: nextRunMap,
};
},
clearRuns: (state) => ({
...state,
runIds: [],
runMap: new Map(),
}),
removeRun: (state, runId: RunId) => {
const nextRunIds = state.runIds.filter((id) => id !== runId);
const nextRunMap = new Map(state.runMap);
nextRunMap.delete(runId);
return {
...state,
runIds: nextRunIds,
runMap: nextRunMap,
};
},
});

export { runsAtom, useRunsActions };

export const exportedForTesting = {
reducer,
createActions,
initialState,
};
9 changes: 7 additions & 2 deletions frontend/src/core/websocket/useMarimoWebSocket.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { useAtom, useSetAtom } from "jotai";
import { connectionAtom } from "../network/connection";
import { useWebSocket } from "@/core/websocket/useWebSocket";
import { logNever } from "@/utils/assertNever";
import { useCellActions } from "@/core/cells/cells";
import { getNotebook, useCellActions } from "@/core/cells/cells";
import { AUTOCOMPLETER } from "@/core/codemirror/completion/Autocompleter";
import type { OperationMessage } from "@/core/kernel/messages";
import type { CellData } from "../cells/types";
Expand Down Expand Up @@ -40,6 +40,7 @@ import { focusAndScrollCellOutputIntoView } from "../cells/scrollCellIntoView";
import { capabilitiesAtom } from "../config/capabilities";
import { UI_ELEMENT_REGISTRY } from "../dom/uiregistry";
import { reloadSafe } from "@/utils/reload-safe";
import { useRunsActions } from "../cells/runs";

/**
* WebSocket that connects to the Marimo kernel and handles incoming messages.
Expand All @@ -55,6 +56,7 @@ export function useMarimoWebSocket(opts: {
const { showBoundary } = useErrorBoundary();

const { handleCellMessage, setCellCodes, setCellIds } = useCellActions();
const { addCellOperation } = useRunsActions();
const setAppConfig = useSetAppConfig();
const { setVariables, setMetadata } = useVariablesActions();
const { addColumnPreview } = useDatasetsActions();
Expand Down Expand Up @@ -110,9 +112,12 @@ export function useMarimoWebSocket(opts: {
msg.data,
);
return;
case "cell-op":
case "cell-op": {
handleCellOperation(msg.data, handleCellMessage);
const cellData = getNotebook().cellData[msg.data.cell_id as CellId];
addCellOperation({ cellOperation: msg.data, code: cellData.code });
return;
}

case "variables":
setVariables(
Expand Down
23 changes: 23 additions & 0 deletions marimo/_messaging/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import uuid
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Any, Optional

RunId_t = str
RUN_ID_CTX = ContextVar[Optional[RunId_t]]("run_id")


@dataclass
class run_id_context:
"""Context manager for setting and unsetting the run ID."""

run_id: RunId_t

def __init__(self) -> None:
self.run_id = str(uuid.uuid4())

def __enter__(self) -> None:
self.token = RUN_ID_CTX.set(self.run_id)

def __exit__(self, *_: Any) -> None:
RUN_ID_CTX.reset(self.token)
16 changes: 16 additions & 0 deletions marimo/_messaging/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from marimo._dependencies.dependencies import DependencyManager
from marimo._messaging.cell_output import CellChannel, CellOutput
from marimo._messaging.completion_option import CompletionOption
from marimo._messaging.context import RUN_ID_CTX, RunId_t
from marimo._messaging.errors import (
Error,
MarimoInternalError,
Expand Down Expand Up @@ -127,8 +128,23 @@ class CellOp(Op):
console: Optional[Union[CellOutput, List[CellOutput]]] = None
status: Optional[RuntimeStateType] = None
stale_inputs: Optional[bool] = None
run_id: Optional[RunId_t] = None
timestamp: float = field(default_factory=lambda: time.time())

def __post_init__(self) -> None:
try:
self.run_id = RUN_ID_CTX.get()
except LookupError:
# Be specific about the exception we're catching
# The context variable hasn't been set yet
# TODO: where are these warnings coming from?
# good enough to silence for now?
LOGGER.warning("No run_id context found, setting to None")
self.run_id = None
except Exception as e:
LOGGER.error("Error getting run id: %s", str(e))
self.run_id = None

@staticmethod
def maybe_truncate_output(
mimetype: KnownMimeType, data: str
Expand Down
26 changes: 14 additions & 12 deletions marimo/_runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from marimo._dependencies.dependencies import DependencyManager
from marimo._messaging.cell_output import CellChannel
from marimo._messaging.context import run_id_context
from marimo._messaging.errors import (
Error,
MarimoInterruptionError,
Expand Down Expand Up @@ -1136,18 +1137,19 @@ def mutate_graph(
async def _run_cells(self, cell_ids: set[CellId_t]) -> None:
"""Run cells and any state updates they trigger"""

# This patch is an attempt to mitigate problems caused by the fact
# that in run mode, kernels run in threads and share the same
# sys.modules. Races can still happen, but this should help in most
# common cases. We could also be more aggressive and run this before
# every cell, or even before pickle.dump/pickle.dumps()
with patches.patch_main_module_context(self._module):
while cell_ids := await self._run_cells_internal(cell_ids):
LOGGER.debug("Running state updates ...")
if self.lazy() and cell_ids:
self.graph.set_stale(cell_ids, prune_imports=True)
break
LOGGER.debug("Finished run.")
with run_id_context():
# This patch is an attempt to mitigate problems caused by the fact
# that in run mode, kernels run in threads and share the same
# sys.modules. Races can still happen, but this should help in most
# common cases. We could also be more aggressive and run this before
# every cell, or even before pickle.dump/pickle.dumps()
with patches.patch_main_module_context(self._module):
while cell_ids := await self._run_cells_internal(cell_ids):
LOGGER.debug("Running state updates ...")
if self.lazy() and cell_ids:
self.graph.set_stale(cell_ids, prune_imports=True)
break
LOGGER.debug("Finished run.")

async def _if_autorun_then_run_cells(
self, cell_ids: set[CellId_t]
Expand Down
17 changes: 16 additions & 1 deletion openapi/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ components:
output:
$ref: '#/components/schemas/CellOutput'
nullable: true
run_id:
nullable: true
type: string
stale_inputs:
nullable: true
type: boolean
Expand Down Expand Up @@ -1953,7 +1956,7 @@ components:
type: object
info:
title: marimo API
version: 0.9.34
version: 0.10.2
openapi: 3.1.0
paths:
/@file/{filename_and_length}:
Expand Down Expand Up @@ -2627,6 +2630,18 @@ paths:
type: string
type: object
description: Get the status of the application
/api/status/connections:
get:
responses:
200:
content:
application/json:
schema:
properties:
active:
type: integer
type: object
description: Get the number of active websocket connections
/api/usage:
get:
responses:
Expand Down
38 changes: 38 additions & 0 deletions openapi/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1900,6 +1900,43 @@ export interface paths {
patch?: never;
trace?: never;
};
"/api/status/connections": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
get: {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Get the number of active websocket connections */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": {
active?: number;
};
};
};
};
};
put?: never;
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/api/usage": {
parameters: {
query?: never;
Expand Down Expand Up @@ -2121,6 +2158,7 @@ export interface components {
/** @enum {string} */
name: "cell-op";
output?: components["schemas"]["CellOutput"];
run_id?: string | null;
stale_inputs?: boolean | null;
status?: components["schemas"]["RuntimeState"];
timestamp: number;
Expand Down

0 comments on commit 1f58e53

Please sign in to comment.