Skip to content

Commit

Permalink
fix: small tracing perf (#3336)
Browse files Browse the repository at this point in the history
- Changes the tracing runs from a map to list for O(1) lookup 
- Only runs the tracing reducer when the feature flag is enabled
  • Loading branch information
mscolnick authored Jan 3, 2025
1 parent 6e89431 commit ff521aa
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 107 deletions.
26 changes: 14 additions & 12 deletions frontend/src/components/tracing/tracing.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,19 @@ const TraceBlockBody: React.FC<{

const cellIds = useCellIds();

const chartValues: ChartValues[] = run.cellRuns.map((cellRun) => {
const elapsedTime = cellRun.elapsedTime ?? 0;
return {
cell: cellRun.cellId,
cellNum: cellIds.inOrderIds.indexOf(cellRun.cellId),
startTimestamp: formatChartTime(cellRun.startTime),
endTimestamp: formatChartTime(cellRun.startTime + elapsedTime),
elapsedTime: formatElapsedTime(elapsedTime * 1000),
status: cellRun.status,
};
});
const chartValues: ChartValues[] = [...run.cellRuns.values()].map(
(cellRun) => {
const elapsedTime = cellRun.elapsedTime ?? 0;
return {
cell: cellRun.cellId,
cellNum: cellIds.inOrderIds.indexOf(cellRun.cellId),
startTimestamp: formatChartTime(cellRun.startTime),
endTimestamp: formatChartTime(cellRun.startTime + elapsedTime),
elapsedTime: formatElapsedTime(elapsedTime * 1000),
status: cellRun.status,
};
},
);

const hiddenInputElementId = `hiddenInputElement-${run.runId}`;
const vegaSpec = compile(
Expand Down Expand Up @@ -301,7 +303,7 @@ const TraceRows = (props: {
hidden={true}
ref={hiddenInputRef}
/>
{run.cellRuns.map((cellRun) => (
{[...run.cellRuns.values()].map((cellRun) => (
<TraceRow
key={cellRun.cellId}
cellRun={cellRun}
Expand Down
57 changes: 37 additions & 20 deletions frontend/src/core/cells/__tests__/runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ import {
type RunsState,
} from "../runs";
import type { CellMessage } from "@/core/kernel/messages";
import { invariant } from "@/utils/invariant";

const { reducer, initialState, isPureMarkdown } = exportedForTesting;

function first<T>(map: Map<string, T> | undefined): T {
invariant(map, "Map is undefined");
return map.values().next().value;
}

describe("RunsState Reducer", () => {
let state: RunsState;

Expand Down Expand Up @@ -53,15 +59,18 @@ describe("RunsState Reducer", () => {
expect(nextState.runMap.get(runId)).toEqual({
runId,
runStartTime: timestamp,
cellRuns: [
{
cellRuns: new Map([
[
cellId,
code: code.slice(0, MAX_CODE_LENGTH),
elapsedTime: 0,
startTime: timestamp,
status: "queued",
},
],
{
cellId,
code: code.slice(0, MAX_CODE_LENGTH),
elapsedTime: 0,
startTime: timestamp,
status: "queued",
},
],
]),
});
});

Expand Down Expand Up @@ -124,8 +133,10 @@ describe("RunsState Reducer", () => {
});

expect(updatedState.runIds).toEqual([runId]);
expect(updatedState.runMap.get(runId)?.cellRuns[0].status).toBe("running");
expect(updatedState.runMap.get(runId)?.cellRuns[0].startTime).toBe(
expect(first(updatedState.runMap.get(runId)?.cellRuns).status).toBe(
"running",
);
expect(first(updatedState.runMap.get(runId)?.cellRuns).startTime).toBe(
runStartTimestamp,
);

Expand All @@ -143,11 +154,15 @@ describe("RunsState Reducer", () => {
});

expect(successState.runIds).toEqual([runId]);
expect(successState.runMap.get(runId)?.cellRuns[0].status).toBe("success");
expect(successState.runMap.get(runId)?.cellRuns[0].startTime).toBe(
expect(first(successState.runMap.get(runId)?.cellRuns).status).toBe(
"success",
);
expect(first(successState.runMap.get(runId)?.cellRuns).startTime).toBe(
runStartTimestamp,
);
expect(successState.runMap.get(runId)?.cellRuns[0].elapsedTime).toBe(5000);
expect(first(successState.runMap.get(runId)?.cellRuns).elapsedTime).toBe(
5000,
);
});

it("should limit the number of runs to MAX_RUNS", () => {
Expand Down Expand Up @@ -191,7 +206,9 @@ describe("RunsState Reducer", () => {
},
});

expect(nextState.runMap.get(runId)?.cellRuns[0].code).toBe(truncatedCode);
expect(first(nextState.runMap.get(runId)?.cellRuns).code).toBe(
truncatedCode,
);
});

it("should update the run status to error when stderr occurs", () => {
Expand All @@ -216,8 +233,8 @@ describe("RunsState Reducer", () => {
});

expect(errorState.runIds).toEqual([runId]);
expect(errorState.runMap.get(runId)?.cellRuns[0].status).toBe("error");
expect(errorState.runMap.get(runId)?.cellRuns[0].elapsedTime).toBe(
expect(first(errorState.runMap.get(runId)?.cellRuns).status).toBe("error");
expect(first(errorState.runMap.get(runId)?.cellRuns).elapsedTime).toBe(
errorTimestamp - timestamp,
);
});
Expand All @@ -244,8 +261,8 @@ describe("RunsState Reducer", () => {
});

expect(errorState.runIds).toEqual([runId]);
expect(errorState.runMap.get(runId)?.cellRuns[0].status).toBe("error");
expect(errorState.runMap.get(runId)?.cellRuns[0].elapsedTime).toBe(
expect(first(errorState.runMap.get(runId)?.cellRuns).status).toBe("error");
expect(first(errorState.runMap.get(runId)?.cellRuns).elapsedTime).toBe(
errorTimestamp - timestamp,
);
});
Expand Down Expand Up @@ -280,7 +297,7 @@ describe("RunsState Reducer", () => {
});

expect(finalState.runIds).toEqual([runId]);
expect(finalState.runMap.get(runId)?.cellRuns[0].status).toBe("error");
expect(first(finalState.runMap.get(runId)?.cellRuns).status).toBe("error");
});

it("should order runs from newest to oldest", () => {
Expand Down Expand Up @@ -412,7 +429,7 @@ describe("RunsState Reducer", () => {

expect(nextState.runIds).toEqual([runId]);
expect(nextState.runMap.size).toBe(1);
expect(nextState.runMap.get(runId)?.cellRuns.length).toBe(2);
expect(nextState.runMap.get(runId)?.cellRuns.size).toBe(2);
});
});

Expand Down
159 changes: 85 additions & 74 deletions frontend/src/core/cells/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export interface CellRun {

export interface Run {
runId: RunId;
cellRuns: CellRun[];
cellRuns: Map<CellId, CellRun>;
runStartTime: number;
}

Expand Down Expand Up @@ -49,91 +49,102 @@ const {
if (!runId) {
return state;
}
let runIds: RunId[];

let run = state.runMap.get(runId);
if (run) {
runIds = state.runIds;
// Shallow copy the run to avoid mutating the existing run
run = { ...run };
} else {
// If it is a brand new run and the cell code is "pure markdown",
// we don't want to show the trace since it's not helpful.
// This spams the tracing because we re-render pure markdown on keystrokes.
if (isPureMarkdown(code)) {
return state;
}
const existingRun = state.runMap.get(runId);
// If it is a brand new run and the cell code is "pure markdown",
// we don't want to show the trace since it's not helpful.
// This spams the tracing because we re-render pure markdown on keystrokes.
if (!existingRun && isPureMarkdown(code)) {
return state;
}

run = {
runId: runId,
cellRuns: [],
// We determine if the cell operation errored by looking at the output
const erroredOutput =
cellOperation.output &&
(cellOperation.output.channel === "marimo-error" ||
cellOperation.output.channel === "stderr");

let status: CellRun["status"] = erroredOutput
? "error"
: cellOperation.status === "queued"
? "queued"
: cellOperation.status === "running"
? "running"
: "success";

// Create new run if needed
if (!existingRun) {
const newRun: Run = {
runId,
cellRuns: new Map([
[
cellOperation.cell_id as CellId,
{
cellId: cellOperation.cell_id as CellId,
code: code.slice(0, MAX_CODE_LENGTH),
elapsedTime: 0,
status: status,
startTime: cellOperation.timestamp,
},
],
]),
runStartTime: cellOperation.timestamp,
};

runIds = [runId, ...state.runIds];
// Manage run history size
const runIds = [runId, ...state.runIds];
const nextRunMap = new Map(state.runMap);
if (runIds.length > MAX_RUNS) {
const oldestRunId = runIds.pop();
if (oldestRunId) {
state.runMap.delete(oldestRunId);
nextRunMap.delete(oldestRunId);
}
}

nextRunMap.set(runId, newRun);
return {
runIds,
runMap: nextRunMap,
};
}

// We determine if the cell operation errored by looking at the output
const erroredOutput =
cellOperation.output &&
(cellOperation.output.channel === "marimo-error" ||
cellOperation.output.channel === "stderr");
// Update existing run
const nextCellRuns = new Map(existingRun.cellRuns);
const existingCellRun = nextCellRuns.get(cellOperation.cell_id as CellId);

const nextRuns: CellRun[] = [];
let found = false;
for (const existingCellRun of run.cellRuns) {
if (existingCellRun.cellId === cellOperation.cell_id) {
const hasErroredPreviously = existingCellRun.status === "error";
let status: CellRun["status"];
let startTime = existingCellRun.startTime;

if (hasErroredPreviously || erroredOutput) {
status = "error";
} else if (cellOperation.status === "queued") {
status = "queued";
} else if (cellOperation.status === "running") {
status = "running";
startTime = cellOperation.timestamp;
} else {
status = "success";
}
// Early return if nothing changed
if (
existingCellRun &&
!erroredOutput &&
cellOperation.status === "queued"
) {
return state;
}

let elapsedTime: number | undefined = undefined;
if (status === "success" || status === "error") {
elapsedTime = cellOperation.timestamp - existingCellRun.startTime;
}
if (existingCellRun) {
const hasErroredPreviously = existingCellRun.status === "error";

nextRuns.push({
...existingCellRun,
startTime: startTime,
elapsedTime: elapsedTime,
status: status,
});
found = true;
} else {
nextRuns.push(existingCellRun);
}
}
if (!found) {
let status: CellRun["status"];

if (erroredOutput) {
status = "error";
} else if (cellOperation.status === "queued") {
status = "queued";
} else if (cellOperation.status === "running") {
status = "running";
} else {
status = "success";
}
// Compute new status and timing
status = hasErroredPreviously || erroredOutput ? "error" : status;

const startTime =
cellOperation.status === "running"
? cellOperation.timestamp
: existingCellRun.startTime;

const elapsedTime =
status === "success" || status === "error"
? cellOperation.timestamp - existingCellRun.startTime
: undefined;

nextRuns.push({
nextCellRuns.set(cellOperation.cell_id as CellId, {
...existingCellRun,
startTime,
elapsedTime,
status,
});
} else {
nextCellRuns.set(cellOperation.cell_id as CellId, {
cellId: cellOperation.cell_id as CellId,
code: code.slice(0, MAX_CODE_LENGTH),
elapsedTime: 0,
Expand All @@ -142,14 +153,14 @@ const {
});
}

run.cellRuns = nextRuns;

const nextRunMap = new Map(state.runMap);
nextRunMap.set(runId, run);
nextRunMap.set(runId, {
...existingRun,
cellRuns: nextCellRuns,
});

return {
...state,
runIds: runIds,
runMap: nextRunMap,
};
},
Expand Down
5 changes: 4 additions & 1 deletion frontend/src/core/websocket/useMarimoWebSocket.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import { capabilitiesAtom } from "../config/capabilities";
import { UI_ELEMENT_REGISTRY } from "../dom/uiregistry";
import { reloadSafe } from "@/utils/reload-safe";
import { useRunsActions } from "../cells/runs";
import { getFeatureFlag } from "../config/feature-flag";

/**
* WebSocket that connects to the Marimo kernel and handles incoming messages.
Expand Down Expand Up @@ -115,7 +116,9 @@ export function useMarimoWebSocket(opts: {
case "cell-op": {
handleCellOperation(msg.data, handleCellMessage);
const cellData = getNotebook().cellData[msg.data.cell_id as CellId];
addCellOperation({ cellOperation: msg.data, code: cellData.code });
if (getFeatureFlag("tracing")) {
addCellOperation({ cellOperation: msg.data, code: cellData.code });
}
return;
}

Expand Down

0 comments on commit ff521aa

Please sign in to comment.