
chore: Inline decoupled routes into main router file #627

Merged

merged 5 commits on Jan 26, 2025

Conversation

anushkafka (Collaborator) commented Jan 25, 2025

PR Type

Enhancement, Other


Description

  • Consolidated all router files into a single main router file.

  • Removed individual router files for auth, integrations, jobs, machines, and runs.

  • Added new constants for machine services and integrations.

  • Introduced inline route definitions for better maintainability.


Changes walkthrough 📝

Relevant files

Enhancement

router.ts (control-plane/src/modules/auth/router.ts)
Removed `authRouter` and moved logic to main router
  • Removed the authRouter and its route definitions.
  • Moved route logic to the main router file.
  +0/-111

constants.ts (control-plane/src/modules/integrations/constants.ts)
Added `NEW_CONNECTION_ID` constant for integrations
  • Added a new constant NEW_CONNECTION_ID for integrations.
  +3/-0

router.ts (control-plane/src/modules/integrations/router.ts)
Removed `integrationsRouter` and moved logic to main router
  • Removed the integrationsRouter and its route definitions.
  • Moved route logic to the main router file.
  +0/-233

router.ts (control-plane/src/modules/jobs/router.ts)
Removed `jobsRouter` and moved logic to main router
  • Removed the jobsRouter and its route definitions.
  • Moved route logic to the main router file.
  +0/-355

constants.ts (control-plane/src/modules/machines/constants.ts)
Added `ILLEGAL_SERVICE_NAMES` constant for machine services
  • Added a new constant ILLEGAL_SERVICE_NAMES for machine services.
  +6/-0

router.ts (control-plane/src/modules/machines/router.ts)
Removed `machineRouter` and moved logic to main router
  • Removed the machineRouter and its route definitions.
  • Moved route logic to the main router file.
  +0/-94

router.ts (control-plane/src/modules/router.ts)
Consolidated all routers into the main router file
  • Consolidated all individual routers into the main router file.
  • Added inline route definitions for auth, integrations, jobs, machines, and runs.
  • Integrated constants like ILLEGAL_SERVICE_NAMES and NEW_CONNECTION_ID.
  +1154/-13

router.ts (control-plane/src/modules/runs/router.ts)
Removed `runsRouter` and moved logic to main router
  • Removed the runsRouter and its route definitions.
  • Moved route logic to the main router file.
  +0/-449

Need help?

  • Type /help how to ... in the comments thread for any questions about Qodo Merge usage.
  • Check out the documentation for more information.

anushkafka marked this pull request as ready for review January 25, 2025 12:05

    Qodo Merge was enabled for this repository. To continue using it, please link your Git account with your Qodo account here.

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 No relevant tests
    🔒 Security concerns

    Sensitive information exposure:
The Nango webhook handler logs the entire request body, which may contain sensitive connection details. Consider sanitizing these logs to remove sensitive data.
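
For illustration, a minimal sketch of a redacted log call. The destructured fields mirror what the handler later reads via webhookSchema, but the exact payload shape is an assumption:

// Sketch only: log a redacted view of the Nango webhook rather than the full body.
// Field names are assumptions based on what the handler reads later (provider,
// operation, success, connectionId); connection config and credentials are omitted.
const { provider, operation, success, connectionId } =
  (request.body ?? {}) as Record<string, unknown>;

logger.info("Received Nango webhook", {
  provider,
  operation,
  success,
  connectionId, // keep an identifier for correlation
});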

    ⚡ Recommended focus areas for review

    Code Duplication

The consolidated router file is very large (over 1,000 lines) and duplicates validation logic across its route handlers. Consider extracting the common validation patterns into reusable middleware functions.
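
For illustration, a minimal sketch of one such extraction (the helper name and signature are illustrative, not part of the PR); it factors out the machine-ID check repeated by the createMachine, createJobResult, and listJobs handlers shown below:

// Sketch only: shared helper for the repeated "machine auth + x-machine-id header"
// validation. getMachineContext is an illustrative name, not an existing function.
const getMachineContext = (request: {
  headers: Record<string, string | undefined>;
  request: { getAuth: () => { isMachine: () => { clusterId: string } } };
}) => {
  const machine = request.request.getAuth().isMachine();
  const machineId = request.headers["x-machine-id"];

  if (!machineId) {
    throw new BadRequestError("Request does not contain machine ID header");
  }

  return { machine, machineId };
};

A handler could then begin with const { machine, machineId } = getMachineContext(request); instead of repeating the check inline.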

    createMachine: async (request) => {
      const machine = request.request.getAuth().isMachine();
    
      const machineId = request.headers["x-machine-id"];
    
      if (!machineId) {
        throw new BadRequestError("Request does not contain machine ID header");
      }
    
      const { service, functions } = request.body;
    
      if (service && ILLEGAL_SERVICE_NAMES.includes(service)) {
        throw new BadRequestError(
          `Service name ${service} is reserved and cannot be used.`,
        );
      }
    
      const derefedFns = functions?.map((fn) => {
        const schema = fn.schema
          ? safeParse(fn.schema)
          : { success: true, data: undefined };
    
        if (!schema.success) {
          throw new BadRequestError(
            `Function ${fn.name} has an invalid schema.`,
          );
        }
    
        return {
          clusterId: machine.clusterId,
          name: fn.name,
          description: fn.description,
          schema: schema.data
            ? JSON.stringify(dereferenceSync(schema.data))
            : undefined,
          config: fn.config,
        };
      });
    
      await Promise.all([
        upsertMachine({
          clusterId: machine.clusterId,
          machineId,
          sdkVersion: request.headers["x-machine-sdk-version"],
          sdkLanguage: request.headers["x-machine-sdk-language"],
          xForwardedFor: request.headers["x-forwarded-for"],
          ip: request.request.ip,
        }),
        service &&
          upsertServiceDefinition({
            service,
            definition: {
              name: service,
              functions: derefedFns,
            },
            owner: machine,
          }),
      ]);
    
      events.write({
        type: "machineRegistered",
        clusterId: machine.clusterId,
        machineId,
        service,
      });
    
      return {
        status: 200,
        body: {
          clusterId: machine.clusterId,
        },
      };
    },
    getRun: async request => {
      const { clusterId, runId } = request.params;
      const auth = request.request.getAuth();
      await auth.canAccess({ run: { clusterId, runId } });
    
      const run = await getRunDetails({
        clusterId,
        runId,
      });
    
      if (!run.id) {
        return {
          status: 404,
        };
      }
    
      return {
        status: 200,
        body: {
          ...run,
          result: run.result ?? null,
        },
      };
    },
    createRun: async request => {
      const { clusterId } = request.params;
      const body = request.body;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
      auth.canCreate({ run: true });
    
      if (body.attachedFunctions && body.attachedFunctions.length == 0) {
        return {
          status: 400,
          body: {
            message: "attachedFunctions cannot be an empty array",
          },
        };
      }
    
      if (body.resultSchema) {
        const validationError = validateSchema({
          schema: body.resultSchema,
          name: "resultSchema",
        });
        if (validationError) {
          return validationError;
        }
      }
    
      let onStatusChangeHandler: string | undefined = undefined;
      let onStatusChangeStatuses: string[] | undefined = undefined;
    
      if (body.onStatusChange) {
        if (body.onStatusChange.function && body.onStatusChange.webhook) {
          return {
            status: 400,
            body: {
              message: "onStatusChange cannot have both function and webhook",
            },
          };
        }
    
        if (body.onStatusChange.function) {
          // TODO: Validate that onStatusChange and attachedFunctions exist
          // TODO: Validate that onStatusChange schema is correct
          onStatusChangeHandler =
            body.onStatusChange.function && normalizeFunctionReference(body.onStatusChange.function);
        }
    
        if (body.onStatusChange.webhook) {
          // Check for valid Schema
          if (!body.onStatusChange.webhook.startsWith("https://")) {
            return {
              status: 400,
              body: {
                message: "onStatusChange webhook must start with https://",
              },
            };
          }
    
          onStatusChangeHandler = body.onStatusChange.webhook;
          onStatusChangeStatuses = body.onStatusChange.statuses ?? ["done", "failed"];
        }
      }
    
      let runOptions: RunOptions & { runId?: string } = {
        runId: body.runId,
        initialPrompt: body.initialPrompt,
        systemPrompt: body.systemPrompt,
        attachedFunctions: body.attachedFunctions?.map(normalizeFunctionReference),
        resultSchema: body.resultSchema
          ? (dereferenceSync(body.resultSchema) as JsonSchemaInput)
          : undefined,
        interactive: body.interactive,
        modelIdentifier: body.model,
        callSummarization: body.callSummarization,
        reasoningTraces: body.reasoningTraces,
        enableResultGrounding: body.enableResultGrounding,
    
        input: body.input,
      };
    
      const agent = body.agentId
        ? await getAgent({
            clusterId,
            id: body.agentId,
          })
        : undefined;
    
      if (body.agentId) {
        if (!agent) {
          throw new NotFoundError("Agent not found");
        }
    
        const merged = mergeAgentOptions(runOptions, agent);
    
        if (merged.error) {
          return merged.error;
        }
    
        runOptions = merged.options;
    
        runOptions.messageMetadata = {
          displayable: {
            templateName: agent.name,
            templateId: agent.id,
            ...body.input,
          },
        };
      }
    
      if (runOptions.input) {
        runOptions.initialPrompt = `${runOptions.initialPrompt}\n\n<DATA>\n${JSON.stringify(runOptions.input, null, 2)}\n</DATA>`;
      }
    
      const customAuth = auth.type === "custom" ? auth.isCustomAuth() : undefined;
    
      const run = await createRun({
        runId: runOptions.runId,
        userId: auth.entityId,
        clusterId,
    
        name: body.name,
        test: body.test?.enabled ?? false,
        testMocks: body.test?.mocks,
        tags: body.tags,
    
        agentId: agent?.id,
    
        // Customer Auth context (In the future all auth types should inject context into the run)
        authContext: customAuth?.context,
    
        context: body.context,
    
        onStatusChangeHandler: onStatusChangeHandler,
        onStatusChangeStatuses: onStatusChangeStatuses,
    
        // Merged Options
        resultSchema: runOptions.resultSchema,
        enableSummarization: runOptions.callSummarization,
        modelIdentifier: runOptions.modelIdentifier,
        interactive: runOptions.interactive,
        systemPrompt: runOptions.systemPrompt,
        attachedFunctions: runOptions.attachedFunctions,
        reasoningTraces: runOptions.reasoningTraces,
        enableResultGrounding: runOptions.enableResultGrounding,
      });
    
      if (runOptions.initialPrompt) {
        await addMessageAndResumeWithRun({
          id: ulid(),
          userId: auth.entityId,
          clusterId,
          run,
          message: runOptions.initialPrompt,
          type: body.agentId ? "template" : "human",
          metadata: runOptions.messageMetadata,
          skipAssert: true,
        });
      }
    
      const cluster = await getClusterDetails(clusterId);
    
      posthog?.capture({
        distinctId: unqualifiedEntityId(auth.entityId),
        event: "api:run_create",
        groups: {
          organization: auth.organizationId,
          cluster: clusterId,
        },
        properties: {
          cluster_id: clusterId,
          is_demo: cluster.is_demo,
          run_id: run.id,
          agent_id: run.agentId,
          cli_version: request.headers["x-cli-version"],
          user_agent: request.headers["user-agent"],
        },
      });
    
      return {
        status: 201,
        body: { id: run.id },
      };
    },
    deleteRun: async request => {
      const { clusterId, runId } = request.params;
    
      const auth = request.request.getAuth();
      await auth.canManage({ run: { clusterId, runId } });
    
      await deleteRun({
        clusterId,
        runId,
      });
    
      posthog?.capture({
        distinctId: unqualifiedEntityId(auth.entityId),
        event: "api:run_delete",
        groups: {
          organization: auth.organizationId,
          cluster: clusterId,
        },
        properties: {
          cluster_id: clusterId,
          run_id: runId,
          cli_version: request.headers["x-cli-version"],
          user_agent: request.headers["user-agent"],
        },
      });
    
      return {
        status: 204,
        body: undefined,
      };
    },
    createFeedback: async request => {
      const { clusterId, runId } = request.params;
      const { comment, score } = request.body;
    
      const auth = request.request.getAuth();
      await auth.canManage({ run: { clusterId, runId } });
    
      await updateRunFeedback({
        id: runId,
        clusterId,
        feedbackComment: comment ?? undefined,
        feedbackScore: score ?? undefined,
      });
    
      events.write({
        type: "runFeedbackSubmitted",
        clusterId,
        workflowId: runId,
        userId: auth.entityId,
        meta: {
          feedbackScore: score ?? undefined,
          feedbackComment: comment ?? undefined,
        },
      });
    
      posthog?.capture({
        distinctId: unqualifiedEntityId(auth.entityId),
        event: "api:feedback_create",
        groups: {
          organization: auth.organizationId,
          cluster: clusterId,
        },
        properties: {
          cluster_id: clusterId,
          run_id: runId,
          score: score,
          cli_version: request.headers["x-cli-version"],
          user_agent: request.headers["user-agent"],
        },
      });
    
      return {
        status: 204,
        body: undefined,
      };
    },
    listRuns: async request => {
      const { clusterId } = request.params;
      const { test, limit, tags, agentId } = request.query;
      let { userId } = request.query;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
    
      // Custom auth can only access their own Runs
      if (auth.type === "custom") {
        userId = auth.entityId;
      }
    
      if (tags) {
        // ?meta=key:value
        const [key, value] = tags.split(":");
    
        if (!key || !value) {
          return {
            status: 400,
            body: {
              message: "Invalid tag filter format",
            },
          };
        }
    
        const result = await getRunsByTag({
          clusterId,
          key,
          value,
          limit,
          agentId,
          userId,
        });
    
        return {
          status: 200,
          body: result,
        };
      }
    
      const result = await getClusterRuns({
        clusterId,
        userId,
        test: test ?? false,
        limit,
        agentId,
      });
    
      return {
        status: 200,
        body: result,
      };
    },
    getRunTimeline: async request => {
      const { clusterId, runId } = request.params;
      const { messagesAfter, activityAfter } = request.query;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ run: { clusterId, runId } });
    
      const { messages, activity, jobs, run } = await timeline.getRunTimeline({
        clusterId,
        runId,
        messagesAfter,
        activityAfter,
      });
    
      if (!run) {
        return {
          status: 404,
        };
      }
    
      const blobs = await getBlobsForJobs({
        clusterId,
        jobIds: jobs.map(job => job.id),
      });
    
      return {
        status: 200,
        body: {
          messages,
          activity,
          jobs,
          run,
          blobs,
        },
      };
    },
    getAgentMetrics: async request => {
      const { clusterId, agentId } = request.params;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
    
      const result = await getAgentMetrics({
        clusterId,
        agentId,
      });
    
      return {
        status: 200,
        body: result,
      };
    },
    listRunReferences: async request => {
      const { clusterId, runId } = request.params;
      const { token, before } = request.query;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ run: { clusterId, runId } });
    
      const jobReferences = await getJobReferences({
        clusterId,
        runId,
        token,
        before: new Date(before),
      });
    
      return {
        status: 200,
        body: jobReferences,
      };
    },
    createApiKey: async (request) => {
      const { name } = request.body;
      const { clusterId } = request.params;
    
      const auth = request.request.getAuth().isAdmin();
      await auth.canManage({ cluster: { clusterId } });
    
      const { id, key } = await createApiKey({
        clusterId,
        name,
        createdBy: auth.entityId,
      });
    
      posthog?.identify({
        distinctId: id,
        properties: {
          key_name: name,
          auth_type: "api",
          created_by: auth.entityId,
        },
      });
    
      posthog?.groupIdentify({
        distinctId: id,
        groupType: "organization",
        groupKey: auth.organizationId,
      });
    
      posthog?.groupIdentify({
        distinctId: id,
        groupType: "cluster",
        groupKey: clusterId,
      });
    
      posthog?.capture({
        distinctId: unqualifiedEntityId(auth.entityId),
        event: "api:api_key_create",
        groups: {
          organization: auth.organizationId,
          cluster: clusterId,
        },
        properties: {
          cluster_id: clusterId,
          key_id: id,
          key_name: id,
          cli_version: request.headers["x-cli-version"],
          user_agent: request.headers["user-agent"],
        },
      });
    
      return {
        status: 200,
        body: { id, key },
      };
    },
    listApiKeys: async (request) => {
      const { clusterId } = request.params;
    
      const auth = request.request.getAuth().isAdmin();
      await auth.canManage({ cluster: { clusterId } });
    
      const apiKeys = await listApiKeys({ clusterId });
    
      return {
        status: 200,
        body: apiKeys,
      };
    },
    revokeApiKey: async (request) => {
      const { clusterId, keyId } = request.params;
    
      const auth = request.request.getAuth().isAdmin();
      await auth.canManage({ cluster: { clusterId } });
    
      await revokeApiKey({ clusterId, keyId });
    
      posthog?.capture({
        distinctId: unqualifiedEntityId(auth.entityId),
        event: "api:api_key_revoke",
        groups: {
          organization: auth.organizationId,
          cluster: clusterId,
        },
        properties: {
          cluster_id: clusterId,
          api_key_id: keyId,
          cli_version: request.headers["x-cli-version"],
          user_agent: request.headers["user-agent"],
        },
      });
    
      return {
        status: 204,
        body: undefined,
      };
    },
    createJob: async request => {
      const { clusterId } = request.params;
    
      const auth = request.request.getAuth();
    
      auth.canAccess({ cluster: { clusterId } });
      auth.canCreate({ call: true });
    
      const { function: fn, input, service } = request.body;
      const { waitTime } = request.query;
    
      const { id } = await jobs.createJob({
        service: service,
        targetFn: fn,
        targetArgs: packer.pack(input),
        owner: { clusterId },
        runId: getClusterBackgroundRun(clusterId),
      });
    
      if (!waitTime || waitTime <= 0) {
        return {
          status: 200,
          body: {
            id,
            status: "pending",
            result: null,
            resultType: null,
          },
        };
      }
    
      const jobResult = await jobs.getJobStatusSync({
        jobId: id,
        owner: { clusterId },
        ttl: waitTime * 1000,
      });
    
      if (!jobResult) {
        throw new Error("Could not get call result");
      }
    
      const { status, result, resultType } = jobResult;
    
      const unpackedResult = result ? packer.unpack(result) : null;
    
      return {
        status: 200,
        body: {
          id,
          status,
          result: unpackedResult,
          resultType,
        },
      };
    },
    cancelJob: async request => {
      const { clusterId, jobId } = request.params;
    
      const auth = request.request.getAuth();
    
      auth.canAccess({ cluster: { clusterId } });
      auth.canManage({ cluster: { clusterId } });
    
      await jobs.cancelJob({
        jobId,
        clusterId,
      });
    
      return {
        status: 204,
        body: undefined,
      };
    },
    createJobResult: async request => {
      const { clusterId, jobId } = request.params;
      let { result, resultType } = request.body;
      const { meta } = request.body;
    
      const machine = request.request.getAuth().isMachine();
      machine.canAccess({ cluster: { clusterId } });
    
      const machineId = request.headers["x-machine-id"];
    
      if (!machineId) {
        throw new BadRequestError("Request does not contain machine ID header");
      }
    
      if (resultType === "interrupt") {
        const parsed = await interruptSchema.safeParseAsync(result);
    
        if (!parsed.success) {
          throw new BadRequestError(parsed.error.message);
        }
    
        if (parsed.data.type === "approval") {
          logger.info("Requesting approval", {
            jobId,
          });
    
          await jobs.requestApproval({
            jobId,
            clusterId,
          });
    
          return {
            status: 204,
            body: undefined,
          };
        } else {
          throw new BadRequestError("Unsupported interrupt type");
        }
      }
    
      if (!!result) {
        // Max result size 500kb
        const data = Buffer.from(JSON.stringify(result));
        if (Buffer.byteLength(data) > 500 * 1024) {
          logger.info("Job result too large, persisting as blob", {
            jobId,
          });
    
          const job = await getJob({ clusterId, jobId });
    
          if (!job) {
            throw new NotFoundError("Job not found");
          }
    
          await createBlob({
            data: data.toString("base64"),
            size: Buffer.byteLength(data),
            encoding: "base64",
            type: "application/json",
            name: "Oversize Job result",
            clusterId,
            runId: job.runId ?? undefined,
            jobId: job.id ?? undefined,
          });
    
          result = {
            message: "The result was too large and was returned to the user directly",
          };
    
          resultType = "rejection";
        }
      }
    
      await Promise.all([
        upsertMachine({
          clusterId,
          machineId,
          sdkVersion: request.headers["x-machine-sdk-version"],
          sdkLanguage: request.headers["x-machine-sdk-language"],
          xForwardedFor: request.headers["x-forwarded-for"],
          ip: request.request.ip,
        }).catch(e => {
          // don't fail the request if the machine upsert fails
    
          logger.error("Failed to upsert machine", {
            error: e,
          });
        }),
        jobs.persistJobResult({
          owner: machine,
          result: packer.pack(result),
          resultType,
          functionExecutionTime: meta?.functionExecutionTime,
          jobId,
          machineId,
        }),
      ]);
    
      return {
        status: 204,
        body: undefined,
      };
    },
    listJobs: async request => {
      const { clusterId } = request.params;
      const { service, limit, acknowledge, status } = request.query;
    
      if (acknowledge && status !== "pending") {
        throw new BadRequestError("Only pending jobs can be acknowledged");
      }
    
      if (!acknowledge) {
        throw new Error("Not implemented");
      }
    
      const machineId = request.headers["x-machine-id"];
    
      if (!machineId) {
        throw new BadRequestError("Request does not contain machine ID header");
      }
    
      const machine = request.request.getAuth().isMachine();
      machine.canAccess({ cluster: { clusterId } });
    
      const [, servicePing, pollResult] = await Promise.all([
        upsertMachine({
          clusterId,
          machineId,
          sdkVersion: request.headers["x-machine-sdk-version"],
          sdkLanguage: request.headers["x-machine-sdk-language"],
          xForwardedFor: request.headers["x-forwarded-for"],
          ip: request.request.ip,
        }),
        recordServicePoll({
          clusterId,
          service,
        }),
        jobs.pollJobs({
          clusterId,
          machineId,
          service,
          limit,
        }),
      ]);
    
      if (servicePing === false) {
        logger.info("Machine polling for unregistered service", {
          service,
        });
        return {
          status: 410,
          body: {
            message: `Service ${service} is not registered`,
          },
        };
      }
    
      request.reply.header("retry-after", 1);
    
      return {
        status: 200,
        body: pollResult.map(job => ({
          id: job.id,
          function: job.targetFn,
          input: packer.unpack(job.targetArgs),
          authContext: job.authContext,
          runContext: job.runContext,
          approved: job.approved,
        })),
      };
    },
    createJobBlob: async request => {
      const { jobId, clusterId } = request.params;
      const body = request.body;
    
      const machine = request.request.getAuth().isMachine();
      machine.canAccess({ cluster: { clusterId } });
    
      const job = await jobs.getJob({ clusterId, jobId });
    
      if (!job) {
        return {
          status: 404,
          body: {
            message: "Job not found",
          },
        };
      }
    
      const blob = await createBlob({
        ...body,
        clusterId,
        runId: job.runId ?? undefined,
        jobId: jobId ?? undefined,
      });
    
      return {
        status: 201,
        body: blob,
      };
    },
    getJob: async request => {
      const { clusterId, jobId } = request.params;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
    
      const job = await jobs.getJob({ clusterId, jobId });
    
      if (!job) {
        return {
          status: 404,
          body: {
            message: "Job not found",
          },
        };
      }
    
      if (job.runId) {
        await auth.canAccess({
          run: { clusterId, runId: job.runId },
        });
      }
    
      return {
        status: 200,
        body: job,
      };
    },
    createJobApproval: async request => {
      const { clusterId, jobId } = request.params;
    
      const auth = request.request.getAuth();
      await auth.canManage({ cluster: { clusterId } });
    
      const job = await jobs.getJob({ clusterId, jobId });
    
      if (!job) {
        return {
          status: 404,
          body: {
            message: "Job not found",
          },
        };
      }
    
      await jobs.submitApproval({
        jobId,
        clusterId,
        approved: request.body.approved,
      });
    
      return {
        status: 204,
        body: undefined,
      };
    },
    upsertIntegrations: async (request) => {
      const { clusterId } = request.params;
    
      const auth = request.request.getAuth().isAdmin();
      await auth.canManage({ cluster: { clusterId } });
    
      if (request.body.slack) {
        const integrations = await getIntegrations({ clusterId });
        if (!integrations.slack) {
          throw new BadRequestError("Slack integration does not exist");
        }
    
        // Only the agentId is editable via the API
        request.body.slack = {
          agentId: request.body.slack.agentId,
          ...integrations.slack
        }
      }
    
      if (request.body.email) {
        const existing = await getIntegrations({ clusterId });
    
        const connectionId = request.body.email.connectionId;
        const agentId = request.body.email.agentId;
    
    
        if (connectionId === NEW_CONNECTION_ID) {
          const connectionId = crypto.randomUUID();
          const collision = await integrationByConnectionId(connectionId);
          if (collision) {
            // This is so unlikely that we will fail the request if we experience a collision
            logger.error("Unexpected connectionId collision", {
              clusterId,
              connectionId,
            })
            throw new Error("Unexpected connectionId collision");
          }
    
          request.body.email.connectionId = connectionId;
        } else if (connectionId !== existing?.email?.connectionId) {
          throw new BadRequestError("Email connectionId is not user editable");
        }
    
        if (agentId && agentId !== existing?.email?.agentId) {
          const agent = await getAgent({
            clusterId,
            id: agentId
          });
    
          if (!agent) {
            logger.warn("Attempted to connect email to non-existent agent", {
              clusterId,
              agentId: request.body.email.agentId
            })
    
            request.body.email.agentId = undefined;
          }
        }
      }
    
      if (request.body.toolhouse) {
        try {
          await validateConfig(request.body);
        } catch (error) {
          return {
            status: 400,
            body: {
              message: `Failed to validate ToolHouse config: ${error}`,
            },
          };
        }
      }
    
      await upsertIntegrations({
        clusterId,
        config: request.body,
      });
    
      Object.entries(request.body).forEach(([key, value]) => {
    
        const action = value === null ? "delete" : "update";
    
        posthog?.capture({
          distinctId: unqualifiedEntityId(auth.entityId),
          event: `api:integration_${action}`,
          groups: {
            organization: auth.organizationId,
            cluster: clusterId,
          },
          properties: {
            cluster_id: clusterId,
            integration: key,
            cli_version: request.headers["x-cli-version"],
            user_agent: request.headers["user-agent"],
          },
        });
    
      })
    
    
      return {
        status: 200,
        body: undefined,
      };
    },
    getIntegrations: async (request) => {
      const { clusterId } = request.params;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
      auth.isAdmin();
    
      const integrations = await getIntegrations({
        clusterId,
      });
    
      return {
        status: 200,
        body: integrations,
      };
    },
    createNangoSession: async (request) => {
      if (!nango) {
        throw new Error("Nango is not configured");
      }
    
      const { clusterId } = request.params;
      const { integration } = request.body;
    
      if (integration !== env.NANGO_SLACK_INTEGRATION_ID) {
        throw new BadRequestError("Invalid Nango integration ID");
      }
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
      auth.isAdmin();
    
      return {
        status: 200,
        body: {
          token: await getSession({ clusterId, integrationId: env.NANGO_SLACK_INTEGRATION_ID }),
        },
      }
    },
    createNangoEvent: async (request) => {
      if (!nango) {
        throw new Error("Nango is not configured");
      }
    
      const signature = request.headers["x-nango-signature"];
    
      const isValid = nango.verifyWebhookSignature(signature, request.body);
    
      if (!isValid) {
        throw new AuthenticationError("Invalid Nango webhook signature");
      }
    
      logger.info("Received Nango webhook", {
        body: request.body
      });
    
      const webhook = webhookSchema.safeParse(request.body);
      if (!webhook.success) {
        logger.error("Failed to parse Nango webhook", {
          error: webhook.error,
        })
        throw new BadRequestError("Invalid Nango webhook payload");
      }
    
      if (
        webhook.data.provider === "slack"
          && webhook.data.operation === "creation"
          && webhook.data.success
      ) {
        const connection = await nango.getConnection(
          webhook.data.providerConfigKey,
          webhook.data.connectionId,
        );
    
        logger.info("New Slack connection registered", {
          connectionId: webhook.data.connectionId,
          teamId: connection.connection_config["team.id"],
        });
    
        const clusterId = connection.end_user?.id;
    
        if (!clusterId) {
          throw new BadRequestError("End user ID not found in Nango connection");
        }
    
        await upsertIntegrations({
          clusterId,
          config: {
            slack: {
              nangoConnectionId: webhook.data.connectionId,
              teamId: connection.connection_config["team.id"],
              botUserId: connection.connection_config["bot_user_id"],
            },
          }
        })
      }
    
      return {
        status: 200,
        body: undefined,
      }
    },
live: async () => {

Error Handling

Several route handlers use inconsistent error-handling patterns: some throw errors directly, while others return error responses. This should be standardized across all routes; the excerpt shown under Code Duplication above illustrates both styles.
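
As an illustration, a minimal sketch of one standardization approach (the wrapper is hypothetical and not part of the PR): let handlers throw the existing domain errors and translate them into responses in a single place.

// Sketch only: wraps a handler so thrown domain errors become error responses,
// instead of mixing "throw" and "return { status: 4xx }" styles per handler.
// withErrorResponses is an illustrative name, not an existing helper in the PR.
const withErrorResponses =
  <Req, Res>(handler: (request: Req) => Promise<Res>) =>
  async (request: Req): Promise<Res | { status: number; body: { message: string } }> => {
    try {
      return await handler(request);
    } catch (error) {
      if (error instanceof BadRequestError) {
        return { status: 400, body: { message: error.message } };
      }
      if (error instanceof NotFoundError) {
        return { status: 404, body: { message: error.message } };
      }
      throw error; // unexpected errors still reach the global error handler
    }
  };

Handlers that currently build inline 400/404 responses could then throw BadRequestError or NotFoundError and keep the same external contract.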

    createMachine: async (request) => {
      const machine = request.request.getAuth().isMachine();
    
      const machineId = request.headers["x-machine-id"];
    
      if (!machineId) {
        throw new BadRequestError("Request does not contain machine ID header");
      }
    
      const { service, functions } = request.body;
    
      if (service && ILLEGAL_SERVICE_NAMES.includes(service)) {
        throw new BadRequestError(
          `Service name ${service} is reserved and cannot be used.`,
        );
      }
    
      const derefedFns = functions?.map((fn) => {
        const schema = fn.schema
          ? safeParse(fn.schema)
          : { success: true, data: undefined };
    
        if (!schema.success) {
          throw new BadRequestError(
            `Function ${fn.name} has an invalid schema.`,
          );
        }
    
        return {
          clusterId: machine.clusterId,
          name: fn.name,
          description: fn.description,
          schema: schema.data
            ? JSON.stringify(dereferenceSync(schema.data))
            : undefined,
          config: fn.config,
        };
      });
    
      await Promise.all([
        upsertMachine({
          clusterId: machine.clusterId,
          machineId,
          sdkVersion: request.headers["x-machine-sdk-version"],
          sdkLanguage: request.headers["x-machine-sdk-language"],
          xForwardedFor: request.headers["x-forwarded-for"],
          ip: request.request.ip,
        }),
        service &&
          upsertServiceDefinition({
            service,
            definition: {
              name: service,
              functions: derefedFns,
            },
            owner: machine,
          }),
      ]);
    
      events.write({
        type: "machineRegistered",
        clusterId: machine.clusterId,
        machineId,
        service,
      });
    
      return {
        status: 200,
        body: {
          clusterId: machine.clusterId,
        },
      };
    },
    getRun: async request => {
      const { clusterId, runId } = request.params;
      const auth = request.request.getAuth();
      await auth.canAccess({ run: { clusterId, runId } });
    
      const run = await getRunDetails({
        clusterId,
        runId,
      });
    
      if (!run.id) {
        return {
          status: 404,
        };
      }
    
      return {
        status: 200,
        body: {
          ...run,
          result: run.result ?? null,
        },
      };
    },
    createRun: async request => {
      const { clusterId } = request.params;
      const body = request.body;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
      auth.canCreate({ run: true });
    
      if (body.attachedFunctions && body.attachedFunctions.length == 0) {
        return {
          status: 400,
          body: {
            message: "attachedFunctions cannot be an empty array",
          },
        };
      }
    
      if (body.resultSchema) {
        const validationError = validateSchema({
          schema: body.resultSchema,
          name: "resultSchema",
        });
        if (validationError) {
          return validationError;
        }
      }
    
      let onStatusChangeHandler: string | undefined = undefined;
      let onStatusChangeStatuses: string[] | undefined = undefined;
    
      if (body.onStatusChange) {
        if (body.onStatusChange.function && body.onStatusChange.webhook) {
          return {
            status: 400,
            body: {
              message: "onStatusChange cannot have both function and webhook",
            },
          };
        }
    
        if (body.onStatusChange.function) {
          // TODO: Validate that onStatusChange and attachedFunctions exist
          // TODO: Validate that onStatusChange schema is correct
          onStatusChangeHandler =
            body.onStatusChange.function && normalizeFunctionReference(body.onStatusChange.function);
        }
    
        if (body.onStatusChange.webhook) {
          // Check for valid Schema
          if (!body.onStatusChange.webhook.startsWith("https://")) {
            return {
              status: 400,
              body: {
                message: "onStatusChange webhook must start with https://",
              },
            };
          }
    
          onStatusChangeHandler = body.onStatusChange.webhook;
          onStatusChangeStatuses = body.onStatusChange.statuses ?? ["done", "failed"];
        }
      }
    
      let runOptions: RunOptions & { runId?: string } = {
        runId: body.runId,
        initialPrompt: body.initialPrompt,
        systemPrompt: body.systemPrompt,
        attachedFunctions: body.attachedFunctions?.map(normalizeFunctionReference),
        resultSchema: body.resultSchema
          ? (dereferenceSync(body.resultSchema) as JsonSchemaInput)
          : undefined,
        interactive: body.interactive,
        modelIdentifier: body.model,
        callSummarization: body.callSummarization,
        reasoningTraces: body.reasoningTraces,
        enableResultGrounding: body.enableResultGrounding,
    
        input: body.input,
      };
    
      const agent = body.agentId
        ? await getAgent({
            clusterId,
            id: body.agentId,
          })
        : undefined;
    
      if (body.agentId) {
        if (!agent) {
          throw new NotFoundError("Agent not found");
        }
    
        const merged = mergeAgentOptions(runOptions, agent);
    
        if (merged.error) {
          return merged.error;
        }
    
        runOptions = merged.options;
    
        runOptions.messageMetadata = {
          displayable: {
            templateName: agent.name,
            templateId: agent.id,
            ...body.input,
          },
        };
      }
    
      if (runOptions.input) {
        runOptions.initialPrompt = `${runOptions.initialPrompt}\n\n<DATA>\n${JSON.stringify(runOptions.input, null, 2)}\n</DATA>`;
      }
    
      const customAuth = auth.type === "custom" ? auth.isCustomAuth() : undefined;
    
      const run = await createRun({
        runId: runOptions.runId,
        userId: auth.entityId,
        clusterId,
    
        name: body.name,
        test: body.test?.enabled ?? false,
        testMocks: body.test?.mocks,
        tags: body.tags,
    
        agentId: agent?.id,
    
        // Customer Auth context (In the future all auth types should inject context into the run)
        authContext: customAuth?.context,
    
        context: body.context,
    
        onStatusChangeHandler: onStatusChangeHandler,
        onStatusChangeStatuses: onStatusChangeStatuses,
    
        // Merged Options
        resultSchema: runOptions.resultSchema,
        enableSummarization: runOptions.callSummarization,
        modelIdentifier: runOptions.modelIdentifier,
        interactive: runOptions.interactive,
        systemPrompt: runOptions.systemPrompt,
        attachedFunctions: runOptions.attachedFunctions,
        reasoningTraces: runOptions.reasoningTraces,
        enableResultGrounding: runOptions.enableResultGrounding,
      });
    
      if (runOptions.initialPrompt) {
        await addMessageAndResumeWithRun({
          id: ulid(),
          userId: auth.entityId,
          clusterId,
          run,
          message: runOptions.initialPrompt,
          type: body.agentId ? "template" : "human",
          metadata: runOptions.messageMetadata,
          skipAssert: true,
        });
      }
    
      const cluster = await getClusterDetails(clusterId);
    
      posthog?.capture({
        distinctId: unqualifiedEntityId(auth.entityId),
        event: "api:run_create",
        groups: {
          organization: auth.organizationId,
          cluster: clusterId,
        },
        properties: {
          cluster_id: clusterId,
          is_demo: cluster.is_demo,
          run_id: run.id,
          agent_id: run.agentId,
          cli_version: request.headers["x-cli-version"],
          user_agent: request.headers["user-agent"],
        },
      });
    
      return {
        status: 201,
        body: { id: run.id },
      };
    },
    deleteRun: async request => {
      const { clusterId, runId } = request.params;
    
      const auth = request.request.getAuth();
      await auth.canManage({ run: { clusterId, runId } });
    
      await deleteRun({
        clusterId,
        runId,
      });
    
      posthog?.capture({
        distinctId: unqualifiedEntityId(auth.entityId),
        event: "api:run_delete",
        groups: {
          organization: auth.organizationId,
          cluster: clusterId,
        },
        properties: {
          cluster_id: clusterId,
          run_id: runId,
          cli_version: request.headers["x-cli-version"],
          user_agent: request.headers["user-agent"],
        },
      });
    
      return {
        status: 204,
        body: undefined,
      };
    },
    createFeedback: async request => {
      const { clusterId, runId } = request.params;
      const { comment, score } = request.body;
    
      const auth = request.request.getAuth();
      await auth.canManage({ run: { clusterId, runId } });
    
      await updateRunFeedback({
        id: runId,
        clusterId,
        feedbackComment: comment ?? undefined,
        feedbackScore: score ?? undefined,
      });
    
      events.write({
        type: "runFeedbackSubmitted",
        clusterId,
        workflowId: runId,
        userId: auth.entityId,
        meta: {
          feedbackScore: score ?? undefined,
          feedbackComment: comment ?? undefined,
        },
      });
    
      posthog?.capture({
        distinctId: unqualifiedEntityId(auth.entityId),
        event: "api:feedback_create",
        groups: {
          organization: auth.organizationId,
          cluster: clusterId,
        },
        properties: {
          cluster_id: clusterId,
          run_id: runId,
          score: score,
          cli_version: request.headers["x-cli-version"],
          user_agent: request.headers["user-agent"],
        },
      });
    
      return {
        status: 204,
        body: undefined,
      };
    },
    listRuns: async request => {
      const { clusterId } = request.params;
      const { test, limit, tags, agentId } = request.query;
      let { userId } = request.query;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
    
      // Custom auth can only access their own Runs
      if (auth.type === "custom") {
        userId = auth.entityId;
      }
    
      if (tags) {
        // ?meta=key:value
        const [key, value] = tags.split(":");
    
        if (!key || !value) {
          return {
            status: 400,
            body: {
              message: "Invalid tag filter format",
            },
          };
        }
    
        const result = await getRunsByTag({
          clusterId,
          key,
          value,
          limit,
          agentId,
          userId,
        });
    
        return {
          status: 200,
          body: result,
        };
      }
    
      const result = await getClusterRuns({
        clusterId,
        userId,
        test: test ?? false,
        limit,
        agentId,
      });
    
      return {
        status: 200,
        body: result,
      };
    },
    getRunTimeline: async request => {
      const { clusterId, runId } = request.params;
      const { messagesAfter, activityAfter } = request.query;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ run: { clusterId, runId } });
    
      const { messages, activity, jobs, run } = await timeline.getRunTimeline({
        clusterId,
        runId,
        messagesAfter,
        activityAfter,
      });
    
      if (!run) {
        return {
          status: 404,
        };
      }
    
      const blobs = await getBlobsForJobs({
        clusterId,
        jobIds: jobs.map(job => job.id),
      });
    
      return {
        status: 200,
        body: {
          messages,
          activity,
          jobs,
          run,
          blobs,
        },
      };
    },
    getAgentMetrics: async request => {
      const { clusterId, agentId } = request.params;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
    
      const result = await getAgentMetrics({
        clusterId,
        agentId,
      });
    
      return {
        status: 200,
        body: result,
      };
    },
    listRunReferences: async request => {
      const { clusterId, runId } = request.params;
      const { token, before } = request.query;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ run: { clusterId, runId } });
    
      const jobReferences = await getJobReferences({
        clusterId,
        runId,
        token,
        before: new Date(before),
      });
    
      return {
        status: 200,
        body: jobReferences,
      };
    },
    createApiKey: async (request) => {
      const { name } = request.body;
      const { clusterId } = request.params;
    
      const auth = request.request.getAuth().isAdmin();
      await auth.canManage({ cluster: { clusterId } });
    
      const { id, key } = await createApiKey({
        clusterId,
        name,
        createdBy: auth.entityId,
      });
    
      posthog?.identify({
        distinctId: id,
        properties: {
          key_name: name,
          auth_type: "api",
          created_by: auth.entityId,
        },
      });
    
      posthog?.groupIdentify({
        distinctId: id,
        groupType: "organization",
        groupKey: auth.organizationId,
      });
    
      posthog?.groupIdentify({
        distinctId: id,
        groupType: "cluster",
        groupKey: clusterId,
      });
    
      posthog?.capture({
        distinctId: unqualifiedEntityId(auth.entityId),
        event: "api:api_key_create",
        groups: {
          organization: auth.organizationId,
          cluster: clusterId,
        },
        properties: {
          cluster_id: clusterId,
          key_id: id,
          key_name: id,
          cli_version: request.headers["x-cli-version"],
          user_agent: request.headers["user-agent"],
        },
      });
    
      return {
        status: 200,
        body: { id, key },
      };
    },
    listApiKeys: async (request) => {
      const { clusterId } = request.params;
    
      const auth = request.request.getAuth().isAdmin();
      await auth.canManage({ cluster: { clusterId } });
    
      const apiKeys = await listApiKeys({ clusterId });
    
      return {
        status: 200,
        body: apiKeys,
      };
    },
    revokeApiKey: async (request) => {
      const { clusterId, keyId } = request.params;
    
      const auth = request.request.getAuth().isAdmin();
      await auth.canManage({ cluster: { clusterId } });
    
      await revokeApiKey({ clusterId, keyId });
    
      posthog?.capture({
        distinctId: unqualifiedEntityId(auth.entityId),
        event: "api:api_key_revoke",
        groups: {
          organization: auth.organizationId,
          cluster: clusterId,
        },
        properties: {
          cluster_id: clusterId,
          api_key_id: keyId,
          cli_version: request.headers["x-cli-version"],
          user_agent: request.headers["user-agent"],
        },
      });
    
      return {
        status: 204,
        body: undefined,
      };
    },
    createJob: async request => {
      const { clusterId } = request.params;
    
      const auth = request.request.getAuth();
    
      auth.canAccess({ cluster: { clusterId } });
      auth.canCreate({ call: true });
    
      const { function: fn, input, service } = request.body;
      const { waitTime } = request.query;
    
      const { id } = await jobs.createJob({
        service: service,
        targetFn: fn,
        targetArgs: packer.pack(input),
        owner: { clusterId },
        runId: getClusterBackgroundRun(clusterId),
      });
    
      if (!waitTime || waitTime <= 0) {
        return {
          status: 200,
          body: {
            id,
            status: "pending",
            result: null,
            resultType: null,
          },
        };
      }
    
      const jobResult = await jobs.getJobStatusSync({
        jobId: id,
        owner: { clusterId },
        ttl: waitTime * 1000,
      });
    
      if (!jobResult) {
        throw new Error("Could not get call result");
      }
    
      const { status, result, resultType } = jobResult;
    
      const unpackedResult = result ? packer.unpack(result) : null;
    
      return {
        status: 200,
        body: {
          id,
          status,
          result: unpackedResult,
          resultType,
        },
      };
    },
    cancelJob: async request => {
      const { clusterId, jobId } = request.params;
    
      const auth = request.request.getAuth();
    
      auth.canAccess({ cluster: { clusterId } });
      auth.canManage({ cluster: { clusterId } });
    
      await jobs.cancelJob({
        jobId,
        clusterId,
      });
    
      return {
        status: 204,
        body: undefined,
      };
    },
    createJobResult: async request => {
      const { clusterId, jobId } = request.params;
      let { result, resultType } = request.body;
      const { meta } = request.body;
    
      const machine = request.request.getAuth().isMachine();
      machine.canAccess({ cluster: { clusterId } });
    
      const machineId = request.headers["x-machine-id"];
    
      if (!machineId) {
        throw new BadRequestError("Request does not contain machine ID header");
      }
    
      if (resultType === "interrupt") {
        const parsed = await interruptSchema.safeParseAsync(result);
    
        if (!parsed.success) {
          throw new BadRequestError(parsed.error.message);
        }
    
        if (parsed.data.type === "approval") {
          logger.info("Requesting approval", {
            jobId,
          });
    
          await jobs.requestApproval({
            jobId,
            clusterId,
          });
    
          return {
            status: 204,
            body: undefined,
          };
        } else {
          throw new BadRequestError("Unsupported interrupt type");
        }
      }
    
      if (!!result) {
        // Max result size 500kb
        const data = Buffer.from(JSON.stringify(result));
        if (Buffer.byteLength(data) > 500 * 1024) {
          logger.info("Job result too large, persisting as blob", {
            jobId,
          });
    
          const job = await getJob({ clusterId, jobId });
    
          if (!job) {
            throw new NotFoundError("Job not found");
          }
    
          await createBlob({
            data: data.toString("base64"),
            size: Buffer.byteLength(data),
            encoding: "base64",
            type: "application/json",
            name: "Oversize Job result",
            clusterId,
            runId: job.runId ?? undefined,
            jobId: job.id ?? undefined,
          });
    
          result = {
            message: "The result was too large and was returned to the user directly",
          };
    
          resultType = "rejection";
        }
      }
    
      await Promise.all([
        upsertMachine({
          clusterId,
          machineId,
          sdkVersion: request.headers["x-machine-sdk-version"],
          sdkLanguage: request.headers["x-machine-sdk-language"],
          xForwardedFor: request.headers["x-forwarded-for"],
          ip: request.request.ip,
        }).catch(e => {
          // don't fail the request if the machine upsert fails
    
          logger.error("Failed to upsert machine", {
            error: e,
          });
        }),
        jobs.persistJobResult({
          owner: machine,
          result: packer.pack(result),
          resultType,
          functionExecutionTime: meta?.functionExecutionTime,
          jobId,
          machineId,
        }),
      ]);
    
      return {
        status: 204,
        body: undefined,
      };
    },
    listJobs: async request => {
      const { clusterId } = request.params;
      const { service, limit, acknowledge, status } = request.query;
    
      if (acknowledge && status !== "pending") {
        throw new BadRequestError("Only pending jobs can be acknowledged");
      }
    
      if (!acknowledge) {
        throw new Error("Not implemented");
      }
    
      const machineId = request.headers["x-machine-id"];
    
      if (!machineId) {
        throw new BadRequestError("Request does not contain machine ID header");
      }
    
      const machine = request.request.getAuth().isMachine();
      machine.canAccess({ cluster: { clusterId } });
    
      const [, servicePing, pollResult] = await Promise.all([
        upsertMachine({
          clusterId,
          machineId,
          sdkVersion: request.headers["x-machine-sdk-version"],
          sdkLanguage: request.headers["x-machine-sdk-language"],
          xForwardedFor: request.headers["x-forwarded-for"],
          ip: request.request.ip,
        }),
        recordServicePoll({
          clusterId,
          service,
        }),
        jobs.pollJobs({
          clusterId,
          machineId,
          service,
          limit,
        }),
      ]);
    
      if (servicePing === false) {
        logger.info("Machine polling for unregistered service", {
          service,
        });
        return {
          status: 410,
          body: {
            message: `Service ${service} is not registered`,
          },
        };
      }
    
      request.reply.header("retry-after", 1);
    
      return {
        status: 200,
        body: pollResult.map(job => ({
          id: job.id,
          function: job.targetFn,
          input: packer.unpack(job.targetArgs),
          authContext: job.authContext,
          runContext: job.runContext,
          approved: job.approved,
        })),
      };
    },
    createJobBlob: async request => {
      const { jobId, clusterId } = request.params;
      const body = request.body;
    
      const machine = request.request.getAuth().isMachine();
      machine.canAccess({ cluster: { clusterId } });
    
      const job = await jobs.getJob({ clusterId, jobId });
    
      if (!job) {
        return {
          status: 404,
          body: {
            message: "Job not found",
          },
        };
      }
    
      const blob = await createBlob({
        ...body,
        clusterId,
        runId: job.runId ?? undefined,
        jobId: jobId ?? undefined,
      });
    
      return {
        status: 201,
        body: blob,
      };
    },
    getJob: async request => {
      const { clusterId, jobId } = request.params;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
    
      const job = await jobs.getJob({ clusterId, jobId });
    
      if (!job) {
        return {
          status: 404,
          body: {
            message: "Job not found",
          },
        };
      }
    
      if (job.runId) {
        await auth.canAccess({
          run: { clusterId, runId: job.runId },
        });
      }
    
      return {
        status: 200,
        body: job,
      };
    },
    createJobApproval: async request => {
      const { clusterId, jobId } = request.params;
    
      const auth = request.request.getAuth();
      await auth.canManage({ cluster: { clusterId } });
    
      const job = await jobs.getJob({ clusterId, jobId });
    
      if (!job) {
        return {
          status: 404,
          body: {
            message: "Job not found",
          },
        };
      }
    
      await jobs.submitApproval({
        jobId,
        clusterId,
        approved: request.body.approved,
      });
    
      return {
        status: 204,
        body: undefined,
      };
    },
    upsertIntegrations: async (request) => {
      const { clusterId } = request.params;
    
      const auth = request.request.getAuth().isAdmin();
      await auth.canManage({ cluster: { clusterId } });
    
      if (request.body.slack) {
        const integrations = await getIntegrations({ clusterId });
        if (!integrations.slack) {
          throw new BadRequestError("Slack integration does not exist");
        }
    
        // Only the agentId is editable via the API
        request.body.slack = {
          ...integrations.slack,
          agentId: request.body.slack.agentId,
        }
      }
    
      if (request.body.email) {
        const existing = await getIntegrations({ clusterId });
    
        const connectionId = request.body.email.connectionId;
        const agentId = request.body.email.agentId;
    
    
        if (connectionId === NEW_CONNECTION_ID) {
          const connectionId = crypto.randomUUID();
          const collision = await integrationByConnectionId(connectionId);
          if (collision) {
            // This is so unlikely that we will fail the request if we experience a collision
            logger.error("Unexpected connectionId collision", {
              clusterId,
              connectionId,
            })
            throw new Error("Unexpected connectionId collision");
          }
    
          request.body.email.connectionId = connectionId;
        } else if (connectionId !== existing?.email?.connectionId) {
          throw new BadRequestError("Email connectionId is not user editable");
        }
    
        if (agentId && agentId !== existing?.email?.agentId) {
          const agent = await getAgent({
            clusterId,
            id: agentId
          });
    
          if (!agent) {
            logger.warn("Attempted to connect email to non-existent agent", {
              clusterId,
              agentId: request.body.email.agentId
            })
    
            request.body.email.agentId = undefined;
          }
        }
      }
    
      if (request.body.toolhouse) {
        try {
          await validateConfig(request.body);
        } catch (error) {
          return {
            status: 400,
            body: {
              message: `Failed to validate ToolHouse config: ${error}`,
            },
          };
        }
      }
    
      await upsertIntegrations({
        clusterId,
        config: request.body,
      });
    
      Object.entries(request.body).forEach(([key, value]) => {
    
        const action = value === null ? "delete" : "update";
    
        posthog?.capture({
          distinctId: unqualifiedEntityId(auth.entityId),
          event: `api:integration_${action}`,
          groups: {
            organization: auth.organizationId,
            cluster: clusterId,
          },
          properties: {
            cluster_id: clusterId,
            integration: key,
            cli_version: request.headers["x-cli-version"],
            user_agent: request.headers["user-agent"],
          },
        });
    
      })
    
    
      return {
        status: 200,
        body: undefined,
      };
    },
    getIntegrations: async (request) => {
      const { clusterId } = request.params;
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
      auth.isAdmin();
    
      const integrations = await getIntegrations({
        clusterId,
      });
    
      return {
        status: 200,
        body: integrations,
      };
    },
    createNangoSession: async (request) => {
      if (!nango) {
        throw new Error("Nango is not configured");
      }
    
      const { clusterId } = request.params;
      const { integration } = request.body;
    
      if (integration !== env.NANGO_SLACK_INTEGRATION_ID) {
        throw new BadRequestError("Invalid Nango integration ID");
      }
    
      const auth = request.request.getAuth();
      await auth.canAccess({ cluster: { clusterId } });
      auth.isAdmin();
    
      return {
        status: 200,
        body: {
          token: await getSession({ clusterId, integrationId: env.NANGO_SLACK_INTEGRATION_ID }),
        },
      }
    },
    createNangoEvent: async (request) => {
      if (!nango) {
        throw new Error("Nango is not configured");
      }
    
      const signature = request.headers["x-nango-signature"];
    
      const isValid = nango.verifyWebhookSignature(signature, request.body);
    
      if (!isValid) {
        throw new AuthenticationError("Invalid Nango webhook signature");
      }
    
      logger.info("Received Nango webhook", {
        body: request.body
      });
    
      const webhook = webhookSchema.safeParse(request.body);
      if (!webhook.success) {
        logger.error("Failed to parse Nango webhook", {
          error: webhook.error,
        })
        throw new BadRequestError("Invalid Nango webhook payload");
      }
    
      if (
        webhook.data.provider === "slack"
          && webhook.data.operation === "creation"
          && webhook.data.success
      ) {
        const connection = await nango.getConnection(
          webhook.data.providerConfigKey,
          webhook.data.connectionId,
        );
    
        logger.info("New Slack connection registered", {
          connectionId: webhook.data.connectionId,
          teamId: connection.connection_config["team.id"],
        });
    
        const clusterId = connection.end_user?.id;
    
        if (!clusterId) {
          throw new BadRequestError("End user ID not found in Nango connection");
        }
    
        await upsertIntegrations({
          clusterId,
          config: {
            slack: {
              nangoConnectionId: webhook.data.connectionId,
              teamId: connection.connection_config["team.id"],
              botUserId: connection.connection_config["bot_user_id"],
            },
          }
        })
      }
    
      return {
        status: 200,
        body: undefined,
      }
    },
    live: async () => {

    Complex Logic

    The integration routes (upsertIntegrations, getIntegrations, createNangoSession, and createNangoEvent, quoted in full above) mix business logic with route handling. Consider extracting that logic into separate service modules for better separation of concerns; a sketch of one such extraction follows.

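    A minimal sketch of that extraction, using the email-connection handling as an example. The module path, function name, and import paths below are illustrative assumptions, not part of this PR; only the helpers it calls (getIntegrations, integrationByConnectionId, getAgent, NEW_CONNECTION_ID, logger, BadRequestError) come from the existing code.

    // control-plane/src/modules/integrations/service.ts (illustrative path)
    import crypto from "node:crypto";
    // Import paths are assumptions; adjust to the actual module layout.
    import { getIntegrations, integrationByConnectionId } from "./integrations";
    import { NEW_CONNECTION_ID } from "./constants";
    import { getAgent } from "../agents/agents";
    import { logger } from "../observability/logger";
    import { BadRequestError } from "../../utilities/errors";

    type EmailConfig = { connectionId: string; agentId?: string };

    // Resolves the email portion of an integrations upsert outside the route handler:
    // assigns a fresh connectionId when one is requested, rejects edits to an existing
    // connectionId, and drops references to agents that do not exist.
    export async function resolveEmailIntegration(
      clusterId: string,
      email: EmailConfig
    ): Promise<EmailConfig> {
      const existing = await getIntegrations({ clusterId });

      if (email.connectionId === NEW_CONNECTION_ID) {
        const connectionId = crypto.randomUUID();
        if (await integrationByConnectionId(connectionId)) {
          logger.error("Unexpected connectionId collision", { clusterId, connectionId });
          throw new Error("Unexpected connectionId collision");
        }
        email = { ...email, connectionId };
      } else if (email.connectionId !== existing?.email?.connectionId) {
        throw new BadRequestError("Email connectionId is not user editable");
      }

      if (email.agentId && email.agentId !== existing?.email?.agentId) {
        const agent = await getAgent({ clusterId, id: email.agentId });
        if (!agent) {
          logger.warn("Attempted to connect email to non-existent agent", {
            clusterId,
            agentId: email.agentId,
          });
          email = { ...email, agentId: undefined };
        }
      }

      return email;
    }

    The route handler in router.ts would then reduce to authorization plus a call such as request.body.email = await resolveEmailIntegration(clusterId, request.body.email), keeping transport concerns and business rules in separate modules.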


    PR Code Suggestions ✨

    Explore these optional code suggestions:

    Category: General
    Add retry logic for ID collisions

    The new email connection ID is generated in a single attempt, and the request fails outright on a collision. For better reliability, implement a retry mechanism with a maximum number of attempts to handle the unlikely but possible case of repeated collisions.

    control-plane/src/modules/router.ts [994-1004]

     if (connectionId === NEW_CONNECTION_ID) {
    -  const connectionId = crypto.randomUUID();
    -  const collision = await integrationByConnectionId(connectionId);
    -  if (collision) {
    -    logger.error("Unexpected connectionId collision", {
    +  const maxAttempts = 3;
    +  let attempts = 0;
    +  let newConnectionId;
    +  
    +  while (attempts < maxAttempts) {
    +    newConnectionId = crypto.randomUUID();
    +    const collision = await integrationByConnectionId(newConnectionId);
    +    if (!collision) break;
    +    attempts++;
    +  }
    +  
    +  if (attempts === maxAttempts) {
    +    logger.error("Failed to generate unique connectionId after multiple attempts", {
           clusterId,
    -      connectionId,
    -    })
    -    throw new Error("Unexpected connectionId collision");
    +      attempts: maxAttempts
    +    });
    +    throw new Error("Failed to generate unique connectionId");
       }
    +  
    +  request.body.email.connectionId = newConnectionId;
    +}
    Suggestion importance[1-10]: 7

    Why: The improved code adds important resilience by implementing a retry mechanism for UUID collisions. While collisions are extremely rare, having proper retry logic is a good practice for production systems handling critical operations.
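    If the retry is adopted, a small helper keeps the handler readable. This is a sketch only: the helper name and default attempt count are illustrative, and it assumes it lives in router.ts where crypto, integrationByConnectionId, and logger are already in scope.

    async function generateUniqueConnectionId(clusterId: string, maxAttempts = 3): Promise<string> {
      for (let attempt = 0; attempt < maxAttempts; attempt++) {
        const candidate = crypto.randomUUID();
        // integrationByConnectionId resolves to the existing integration, if any, that owns this ID
        const collision = await integrationByConnectionId(candidate);
        if (!collision) {
          return candidate;
        }
      }
      logger.error("Failed to generate unique connectionId", { clusterId, maxAttempts });
      throw new Error("Failed to generate unique connectionId");
    }

    The handler would then call request.body.email.connectionId = await generateUniqueConnectionId(clusterId) instead of inlining the loop.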

    Extract magic number to constant

    The code uses a hardcoded limit of 500 KiB (500 * 1024 bytes) for the maximum job result size. This should be extracted into a named constant at module level for better maintainability and clarity.

    control-plane/src/modules/router.ts [753]

    -if (Buffer.byteLength(data) > 500 * 1024) {
    +const MAX_JOB_RESULT_SIZE = 500 * 1024; // 500kb
    +if (Buffer.byteLength(data) > MAX_JOB_RESULT_SIZE) {
    Suggestion importance[1-10]: 5

    Why: Moving the magic number to a named constant improves code readability and maintainability. While not critical, it makes the code more self-documenting and easier to modify if the size limit needs to change.
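    Following the pattern this PR already uses for machines/constants.ts and integrations/constants.ts, the limit could live in a jobs constants module. The file path and constant name below are illustrative assumptions, not part of the PR:

    // control-plane/src/modules/jobs/constants.ts (illustrative)
    export const MAX_JOB_RESULT_SIZE_BYTES = 500 * 1024; // 500 KiB

    // in the handler that persists job results
    if (Buffer.byteLength(data) > MAX_JOB_RESULT_SIZE_BYTES) {
      // store the oversize result as a blob and return a rejection, as the handler does today
    }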


    @anushkafka merged commit 8ee5a0e into main Jan 26, 2025
    29 checks passed
    @anushkafka deleted the af/consolidate-routes branch January 26, 2025 01:30