diff --git a/apps/webapp/app/components/logs/LogsSearchInput.tsx b/apps/webapp/app/components/logs/LogsSearchInput.tsx index 58316cead88..44f4d130185 100644 --- a/apps/webapp/app/components/logs/LogsSearchInput.tsx +++ b/apps/webapp/app/components/logs/LogsSearchInput.tsx @@ -3,12 +3,14 @@ import { motion } from "framer-motion"; import { useCallback, useEffect, useRef, useState } from "react"; import { Input } from "~/components/primitives/Input"; import { ShortcutKey } from "~/components/primitives/ShortcutKey"; -import { cn } from "~/utils/cn"; -import { useOptimisticLocation } from "~/hooks/useOptimisticLocation"; import { useSearchParams } from "~/hooks/useSearchParam"; +import { cn } from "~/utils/cn"; + +export type LogsSearchInputProps = { + placeholder?: string; +}; -export function LogsSearchInput() { - const location = useOptimisticLocation(); +export function LogsSearchInput({ placeholder = "Search logs…" }: LogsSearchInputProps) { const inputRef = useRef(null); const { value, replace, del } = useSearchParams(); @@ -61,7 +63,7 @@ export function LogsSearchInput() { type="text" ref={inputRef} variant="secondary-small" - placeholder="Search logs…" + placeholder={placeholder} value={text} onChange={(e) => setText(e.target.value)} fullWidth diff --git a/apps/webapp/app/components/navigation/SideMenu.tsx b/apps/webapp/app/components/navigation/SideMenu.tsx index 8817360aa32..2c486576bee 100644 --- a/apps/webapp/app/components/navigation/SideMenu.tsx +++ b/apps/webapp/app/components/navigation/SideMenu.tsx @@ -24,6 +24,7 @@ import { Squares2X2Icon, TableCellsIcon, UsersIcon, + BugAntIcon, } from "@heroicons/react/20/solid"; import { Link, useFetcher, useNavigation } from "@remix-run/react"; import { LayoutGroup, motion } from "framer-motion"; @@ -73,6 +74,7 @@ import { v3EnvironmentPath, v3EnvironmentVariablesPath, v3LogsPath, + v3ErrorsPath, v3ProjectAlertsPath, v3ProjectPath, v3ProjectSettingsGeneralPath, @@ -112,6 +114,7 @@ import { SideMenuHeader } from 
"./SideMenuHeader"; import { SideMenuItem } from "./SideMenuItem"; import { SideMenuSection } from "./SideMenuSection"; import { type SideMenuSectionId } from "./sideMenuTypes"; +import { IconBugFilled } from "@tabler/icons-react"; /** Get the collapsed state for a specific side menu section from user preferences */ function getSectionCollapsed( @@ -474,6 +477,15 @@ export function SideMenu({ isCollapsed={isCollapsed} /> )} + { + const locales = useLocales(); + const userTimeZone = useUserTimeZone(); + + const realDate = useMemo(() => (typeof date === "string" ? new Date(date) : date), [date]); + + const relativeText = useMemo(() => { + const text = formatDistanceToNow(realDate, { addSuffix: true }); + return text.charAt(0).toUpperCase() + text.slice(1); + }, [realDate]); + + return ( + {relativeText}} + content={ + + } + side="right" + asChild={true} + /> + ); +}; + export const DateTimeShort = ({ date, hour12 = true }: DateTimeProps) => { const locales = useLocales(); const userTimeZone = useUserTimeZone(); diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index fa019f2f75e..6e44a06d597 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1195,6 +1195,7 @@ const EnvironmentSchema = z RUN_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000), RUN_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"), RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"), + RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"), // Clickhouse CLICKHOUSE_URL: z.string(), diff --git a/apps/webapp/app/presenters/v3/ErrorGroupPresenter.server.ts b/apps/webapp/app/presenters/v3/ErrorGroupPresenter.server.ts new file mode 100644 index 00000000000..d3e7412048c --- /dev/null +++ b/apps/webapp/app/presenters/v3/ErrorGroupPresenter.server.ts @@ -0,0 +1,293 @@ +import { z } from "zod"; +import { type ClickHouse } from "@internal/clickhouse"; +import { type 
PrismaClientOrTransaction } from "@trigger.dev/database"; +import { type Direction } from "~/components/ListPagination"; +import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; +import { BasePresenter } from "~/presenters/v3/basePresenter.server"; + +export type ErrorGroupOptions = { + userId?: string; + projectId: string; + fingerprint: string; + // pagination + direction?: Direction; + cursor?: string; + pageSize?: number; +}; + +export const ErrorGroupOptionsSchema = z.object({ + userId: z.string().optional(), + projectId: z.string(), + fingerprint: z.string(), + direction: z.enum(["forward", "backward"]).optional(), + cursor: z.string().optional(), + pageSize: z.number().int().positive().max(1000).optional(), +}); + +const DEFAULT_PAGE_SIZE = 50; + +export type ErrorGroupDetail = Awaited>; +export type ErrorInstance = ErrorGroupDetail["instances"][0]; + +// Cursor for error instances pagination +type ErrorInstanceCursor = { + createdAt: string; + runId: string; +}; + +const ErrorInstanceCursorSchema = z.object({ + createdAt: z.string(), + runId: z.string(), +}); + +function encodeCursor(cursor: ErrorInstanceCursor): string { + return Buffer.from(JSON.stringify(cursor)).toString("base64"); +} + +function decodeCursor(cursor: string): ErrorInstanceCursor | null { + try { + const decoded = Buffer.from(cursor, "base64").toString("utf-8"); + const parsed = JSON.parse(decoded); + const validated = ErrorInstanceCursorSchema.safeParse(parsed); + if (!validated.success) { + return null; + } + return validated.data as ErrorInstanceCursor; + } catch { + return null; + } +} + +function parseClickHouseDateTime(value: string): Date { + const asNum = Number(value); + if (!isNaN(asNum) && asNum > 1e12) { + return new Date(asNum); + } + return new Date(value.replace(" ", "T") + "Z"); +} + +export type ErrorGroupSummary = { + fingerprint: string; + errorType: string; + 
errorMessage: string; + stackTrace?: string; + taskIdentifier: string; + count: number; + firstSeen: Date; + lastSeen: Date; +}; + +export type ErrorGroupHourlyActivity = Array<{ date: Date; count: number }>; + +export class ErrorGroupPresenter extends BasePresenter { + constructor( + private readonly replica: PrismaClientOrTransaction, + private readonly clickhouse: ClickHouse + ) { + super(undefined, replica); + } + + public async call( + organizationId: string, + environmentId: string, + { userId, projectId, fingerprint, cursor, pageSize = DEFAULT_PAGE_SIZE }: ErrorGroupOptions + ) { + const displayableEnvironment = await findDisplayableEnvironment(environmentId, userId); + + if (!displayableEnvironment) { + throw new ServiceValidationError("No environment found"); + } + + // Run summary (aggregated) and instances queries in parallel + const [summary, instancesResult] = await Promise.all([ + this.getSummary(organizationId, projectId, environmentId, fingerprint), + this.getInstances(organizationId, projectId, environmentId, fingerprint, cursor, pageSize), + ]); + + // Get stack trace from the most recent instance + let stackTrace: string | undefined; + if (instancesResult.instances.length > 0) { + const firstInstance = instancesResult.instances[0]; + try { + const errorData = JSON.parse(firstInstance.error_text) as Record; + stackTrace = (errorData.stack || errorData.stacktrace) as string | undefined; + } catch { + // no stack trace available + } + } + + // Build error group combining aggregated summary with instance stack trace + let errorGroup: ErrorGroupSummary | undefined; + if (summary) { + errorGroup = { + ...summary, + stackTrace, + }; + } + + // Transform instances + const transformedInstances = instancesResult.instances.map((instance) => { + let parsedError: any; + try { + parsedError = JSON.parse(instance.error_text); + } catch { + parsedError = { message: instance.error_text }; + } + + return { + runId: instance.run_id, + friendlyId: 
instance.friendly_id, + taskIdentifier: instance.task_identifier, + createdAt: new Date(parseInt(instance.created_at) * 1000), + status: instance.status, + error: parsedError, + traceId: instance.trace_id, + taskVersion: instance.task_version, + }; + }); + + return { + errorGroup, + instances: transformedInstances, + runFriendlyIds: transformedInstances.map((i) => i.friendlyId), + pagination: instancesResult.pagination, + }; + } + + public async getHourlyOccurrences( + organizationId: string, + projectId: string, + environmentId: string, + fingerprint: string + ): Promise { + const hours = 168; // 7 days + + const [queryError, records] = await this.clickhouse.errors.getHourlyOccurrences({ + organizationId, + projectId, + environmentId, + fingerprints: [fingerprint], + hours, + }); + + if (queryError) { + throw queryError; + } + + const buckets: number[] = []; + const nowMs = Date.now(); + for (let i = hours - 1; i >= 0; i--) { + const hourStart = Math.floor((nowMs - i * 3_600_000) / 3_600_000) * 3_600; + buckets.push(hourStart); + } + + const byHour = new Map(); + for (const row of records ?? []) { + byHour.set(row.hour_epoch, row.count); + } + + return buckets.map((epoch) => ({ + date: new Date(epoch * 1000), + count: byHour.get(epoch) ?? 
0, + })); + } + + private async getSummary( + organizationId: string, + projectId: string, + environmentId: string, + fingerprint: string + ): Promise | undefined> { + const queryBuilder = this.clickhouse.errors.listQueryBuilder(); + + queryBuilder.where("organization_id = {organizationId: String}", { organizationId }); + queryBuilder.where("project_id = {projectId: String}", { projectId }); + queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); + queryBuilder.where("error_fingerprint = {fingerprint: String}", { fingerprint }); + + queryBuilder.groupBy("error_fingerprint, task_identifier"); + queryBuilder.limit(1); + + const [queryError, records] = await queryBuilder.execute(); + + if (queryError) { + throw queryError; + } + + if (!records || records.length === 0) { + return undefined; + } + + const record = records[0]; + return { + fingerprint: record.error_fingerprint, + errorType: record.error_type, + errorMessage: record.error_message, + taskIdentifier: record.task_identifier, + count: record.occurrence_count, + firstSeen: parseClickHouseDateTime(record.first_seen), + lastSeen: parseClickHouseDateTime(record.last_seen), + }; + } + + private async getInstances( + organizationId: string, + projectId: string, + environmentId: string, + fingerprint: string, + cursor: string | undefined, + pageSize: number + ) { + const queryBuilder = this.clickhouse.errors.instancesQueryBuilder(); + + queryBuilder.where("organization_id = {organizationId: String}", { organizationId }); + queryBuilder.where("project_id = {projectId: String}", { projectId }); + queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); + queryBuilder.where("error_fingerprint = {errorFingerprint: String}", { + errorFingerprint: fingerprint, + }); + queryBuilder.where("_is_deleted = 0"); + + const decodedCursor = cursor ? 
decodeCursor(cursor) : null; + if (decodedCursor) { + queryBuilder.where( + `(created_at < {cursorCreatedAt: String} OR (created_at = {cursorCreatedAt: String} AND run_id < {cursorRunId: String}))`, + { + cursorCreatedAt: decodedCursor.createdAt, + cursorRunId: decodedCursor.runId, + } + ); + } + + queryBuilder.orderBy("created_at DESC, run_id DESC"); + queryBuilder.limit(pageSize + 1); + + const [queryError, records] = await queryBuilder.execute(); + + if (queryError) { + throw queryError; + } + + const results = records || []; + const hasMore = results.length > pageSize; + const instances = results.slice(0, pageSize); + + let nextCursor: string | undefined; + if (hasMore && instances.length > 0) { + const lastInstance = instances[instances.length - 1]; + nextCursor = encodeCursor({ + createdAt: lastInstance.created_at, + runId: lastInstance.run_id, + }); + } + + return { + instances, + pagination: { + hasMore, + nextCursor, + }, + }; + } +} diff --git a/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts b/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts new file mode 100644 index 00000000000..459753822ce --- /dev/null +++ b/apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts @@ -0,0 +1,315 @@ +import { z } from "zod"; +import { type ClickHouse } from "@internal/clickhouse"; +import { type PrismaClientOrTransaction } from "@trigger.dev/database"; +import { type Direction } from "~/components/ListPagination"; +import { timeFilterFromTo } from "~/components/runs/v3/SharedFilters"; +import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; +import { getAllTaskIdentifiers } from "~/models/task.server"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; +import { BasePresenter } from "~/presenters/v3/basePresenter.server"; + +export type ErrorsListOptions = { + userId?: string; + projectId: string; + // filters + tasks?: string[]; + period?: string; + from?: number; + to?: number; + 
defaultPeriod?: string; + retentionLimitDays?: number; + // search + search?: string; + // pagination + direction?: Direction; + cursor?: string; + pageSize?: number; +}; + +export const ErrorsListOptionsSchema = z.object({ + userId: z.string().optional(), + projectId: z.string(), + tasks: z.array(z.string()).optional(), + period: z.string().optional(), + from: z.number().int().nonnegative().optional(), + to: z.number().int().nonnegative().optional(), + defaultPeriod: z.string().optional(), + retentionLimitDays: z.number().int().positive().optional(), + search: z.string().max(1000).optional(), + direction: z.enum(["forward", "backward"]).optional(), + cursor: z.string().optional(), + pageSize: z.number().int().positive().max(1000).optional(), +}); + +const DEFAULT_PAGE_SIZE = 50; + +export type ErrorsList = Awaited>; +export type ErrorGroup = ErrorsList["errorGroups"][0]; +export type ErrorsListAppliedFilters = ErrorsList["filters"]; +export type ErrorHourlyOccurrences = Awaited< + ReturnType +>; +export type ErrorHourlyActivity = ErrorHourlyOccurrences[string]; + +// Cursor for error groups pagination +type ErrorGroupCursor = { + occurrenceCount: number; + fingerprint: string; +}; + +const ErrorGroupCursorSchema = z.object({ + occurrenceCount: z.number(), + fingerprint: z.string(), +}); + +function encodeCursor(cursor: ErrorGroupCursor): string { + return Buffer.from(JSON.stringify(cursor)).toString("base64"); +} + +function decodeCursor(cursor: string): ErrorGroupCursor | null { + try { + const decoded = Buffer.from(cursor, "base64").toString("utf-8"); + const parsed = JSON.parse(decoded); + const validated = ErrorGroupCursorSchema.safeParse(parsed); + if (!validated.success) { + return null; + } + return validated.data as ErrorGroupCursor; + } catch { + return null; + } +} + +function parseClickHouseDateTime(value: string): Date { + const asNum = Number(value); + if (!isNaN(asNum) && asNum > 1e12) { + return new Date(asNum); + } + // ClickHouse returns 
'YYYY-MM-DD HH:mm:ss.SSS' in UTC + return new Date(value.replace(" ", "T") + "Z"); +} + +function escapeClickHouseString(val: string): string { + return val.replace(/\\/g, "\\\\").replace(/\//g, "\\/").replace(/%/g, "\\%").replace(/_/g, "\\_"); +} + +export class ErrorsListPresenter extends BasePresenter { + constructor( + private readonly replica: PrismaClientOrTransaction, + private readonly clickhouse: ClickHouse + ) { + super(undefined, replica); + } + + public async call( + organizationId: string, + environmentId: string, + { + userId, + projectId, + tasks, + period, + search, + from, + to, + cursor, + pageSize = DEFAULT_PAGE_SIZE, + defaultPeriod, + retentionLimitDays, + }: ErrorsListOptions + ) { + const time = timeFilterFromTo({ + period, + from, + to, + defaultPeriod: defaultPeriod ?? "7d", + }); + + let effectiveFrom = time.from; + let effectiveTo = time.to; + + // Apply retention limit if provided + let wasClampedByRetention = false; + if (retentionLimitDays !== undefined && effectiveFrom) { + const retentionCutoffDate = new Date(Date.now() - retentionLimitDays * 24 * 60 * 60 * 1000); + + if (effectiveFrom < retentionCutoffDate) { + effectiveFrom = retentionCutoffDate; + wasClampedByRetention = true; + } + } + + const hasFilters = + (tasks !== undefined && tasks.length > 0) || + (search !== undefined && search !== "") || + !time.isDefault; + + const possibleTasksAsync = getAllTaskIdentifiers(this.replica, environmentId); + + const [possibleTasks, displayableEnvironment] = await Promise.all([ + possibleTasksAsync, + findDisplayableEnvironment(environmentId, userId), + ]); + + if (!displayableEnvironment) { + throw new ServiceValidationError("No environment found"); + } + + // Calculate days parameter for ClickHouse query + const now = new Date(); + const daysAgo = effectiveFrom + ? 
Math.ceil((now.getTime() - effectiveFrom.getTime()) / (1000 * 60 * 60 * 24)) + : 30; + + // Query the pre-aggregated errors_v1 table + const queryBuilder = this.clickhouse.errors.listQueryBuilder(); + + // Apply base WHERE filters + queryBuilder.where("organization_id = {organizationId: String}", { organizationId }); + queryBuilder.where("project_id = {projectId: String}", { projectId }); + queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); + + // Task filter (task_identifier is part of the key, so use WHERE) + if (tasks && tasks.length > 0) { + queryBuilder.where("task_identifier IN {tasks: Array(String)}", { tasks }); + } + + // Group by key columns to merge partial aggregations + queryBuilder.groupBy("error_fingerprint, task_identifier"); + + // Time range filter + queryBuilder.having("max(last_seen_date) >= now() - INTERVAL {days: Int64} DAY", { + days: daysAgo, + }); + + // Search filter - searches in error type and message + if (search && search.trim() !== "") { + const searchTerm = escapeClickHouseString(search.trim()).toLowerCase(); + queryBuilder.having( + "(lower(any(error_type)) like {searchPattern: String} OR lower(any(error_message)) like {searchPattern: String})", + { + searchPattern: `%${searchTerm}%`, + } + ); + } + + // Cursor-based pagination (sorted by occurrence_count DESC) + const decodedCursor = cursor ? 
decodeCursor(cursor) : null; + if (decodedCursor) { + queryBuilder.having( + "(occurrence_count < {cursorOccurrenceCount: UInt64} OR (occurrence_count = {cursorOccurrenceCount: UInt64} AND error_fingerprint < {cursorFingerprint: String}))", + { + cursorOccurrenceCount: decodedCursor.occurrenceCount, + cursorFingerprint: decodedCursor.fingerprint, + } + ); + } + + queryBuilder.orderBy("occurrence_count DESC, error_fingerprint DESC"); + queryBuilder.limit(pageSize + 1); + + const [queryError, records] = await queryBuilder.execute(); + + if (queryError) { + throw queryError; + } + + const results = records || []; + const hasMore = results.length > pageSize; + const errorGroups = results.slice(0, pageSize); + + // Build next cursor from the last item + let nextCursor: string | undefined; + if (hasMore && errorGroups.length > 0) { + const lastError = errorGroups[errorGroups.length - 1]; + nextCursor = encodeCursor({ + occurrenceCount: lastError.occurrence_count, + fingerprint: lastError.error_fingerprint, + }); + } + + // Transform results + const transformedErrorGroups = errorGroups.map((error) => ({ + errorType: error.error_type, + errorMessage: error.error_message, + fingerprint: error.error_fingerprint, + taskIdentifier: error.task_identifier, + firstSeen: parseClickHouseDateTime(error.first_seen), + lastSeen: parseClickHouseDateTime(error.last_seen), + count: error.occurrence_count, + sampleRunId: error.sample_run_id, + sampleFriendlyId: error.sample_friendly_id, + })); + + return { + errorGroups: transformedErrorGroups, + pagination: { + hasMore, + nextCursor, + }, + filters: { + tasks, + search, + period: time, + hasFilters, + possibleTasks, + wasClampedByRetention, + }, + }; + } + + public async getHourlyOccurrences( + organizationId: string, + projectId: string, + environmentId: string, + fingerprints: string[] + ): Promise>> { + if (fingerprints.length === 0) { + return {}; + } + + const hours = 24; + + const [queryError, records] = await 
this.clickhouse.errors.getHourlyOccurrences({ + organizationId, + projectId, + environmentId, + fingerprints, + hours, + }); + + if (queryError) { + throw queryError; + } + + // Build 24 hourly buckets as epoch seconds (UTC, floored to hour) + const buckets: number[] = []; + const nowMs = Date.now(); + for (let i = hours - 1; i >= 0; i--) { + const hourStart = Math.floor((nowMs - i * 3_600_000) / 3_600_000) * 3_600; + buckets.push(hourStart); + } + + // Index ClickHouse results by fingerprint → epoch → count + const grouped = new Map>(); + for (const row of records ?? []) { + let byHour = grouped.get(row.error_fingerprint); + if (!byHour) { + byHour = new Map(); + grouped.set(row.error_fingerprint, byHour); + } + byHour.set(row.hour_epoch, row.count); + } + + const result: Record> = {}; + for (const fp of fingerprints) { + const byHour = grouped.get(fp); + result[fp] = buckets.map((epoch) => ({ + date: new Date(epoch * 1000), + count: byHour?.get(epoch) ?? 0, + })); + } + + return result; + } +} diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors.$fingerprint/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors.$fingerprint/route.tsx new file mode 100644 index 00000000000..94f3af88cdb --- /dev/null +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors.$fingerprint/route.tsx @@ -0,0 +1,403 @@ +import { type LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { type MetaFunction } from "@remix-run/react"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; +import { TypedAwait, typeddefer, useTypedLoaderData } from "remix-typedjson"; +import { requireUser } from "~/services/session.server"; +import { EnvironmentParamSchema, v3ErrorsPath } from "~/utils/pathBuilder"; +import { findProjectBySlug } from "~/models/project.server"; +import { findEnvironmentBySlug } from 
"~/models/runtimeEnvironment.server"; +import { + ErrorGroupPresenter, + type ErrorGroupHourlyActivity, + type ErrorGroupSummary, +} from "~/presenters/v3/ErrorGroupPresenter.server"; +import { + NextRunListPresenter, + type NextRunList, +} from "~/presenters/v3/NextRunListPresenter.server"; +import { $replica } from "~/db.server"; +import { logsClickhouseClient, clickhouseClient } from "~/services/clickhouseInstance.server"; +import { NavBar, PageTitle } from "~/components/primitives/PageHeader"; +import { PageBody, PageContainer } from "~/components/layout/AppLayout"; +import { Suspense, useMemo } from "react"; +import { Spinner } from "~/components/primitives/Spinner"; +import { Paragraph } from "~/components/primitives/Paragraph"; +import { Callout } from "~/components/primitives/Callout"; +import { Header2, Header3 } from "~/components/primitives/Headers"; +import { formatDistanceToNow } from "date-fns"; +import { formatNumberCompact } from "~/utils/numberFormatter"; +import * as Property from "~/components/primitives/PropertyTable"; +import { TaskRunsTable } from "~/components/runs/v3/TaskRunsTable"; +import { DateTime } from "~/components/primitives/DateTime"; +import { ErrorId } from "@trigger.dev/core/v3/isomorphic"; +import { Chart, type ChartConfig } from "~/components/primitives/charts/ChartCompound"; + +export const meta: MetaFunction = ({ data }) => { + return [ + { + title: `Error Details | Trigger.dev`, + }, + ]; +}; + +export const loader = async ({ request, params }: LoaderFunctionArgs) => { + const user = await requireUser(request); + const userId = user.id; + + const { projectParam, organizationSlug, envParam } = EnvironmentParamSchema.parse(params); + const fingerprint = params.fingerprint; + + if (!fingerprint) { + throw new Response("Fingerprint parameter is required", { status: 400 }); + } + + const project = await findProjectBySlug(organizationSlug, projectParam, userId); + if (!project) { + throw new Response("Project not found", { status: 
404 }); + } + + const environment = await findEnvironmentBySlug(project.id, envParam, userId); + if (!environment) { + throw new Response("Environment not found", { status: 404 }); + } + + const presenter = new ErrorGroupPresenter($replica, logsClickhouseClient); + + const detailPromise = presenter + .call(project.organizationId, environment.id, { + userId, + projectId: project.id, + fingerprint, + }) + .then(async (result) => { + if (result.runFriendlyIds.length === 0) { + return { ...result, runList: undefined }; + } + + const runListPresenter = new NextRunListPresenter($replica, clickhouseClient); + const runList = await runListPresenter.call(project.organizationId, environment.id, { + userId, + projectId: project.id, + runId: result.runFriendlyIds, + pageSize: 25, + }); + + return { + ...result, + runList, + }; + }) + .catch((error) => { + if (error instanceof ServiceValidationError) { + return { error: error.message }; + } + throw error; + }); + + const hourlyActivityPromise = presenter + .getHourlyOccurrences(project.organizationId, project.id, environment.id, fingerprint) + .catch(() => [] as ErrorGroupHourlyActivity); + + return typeddefer({ + data: detailPromise, + hourlyActivity: hourlyActivityPromise, + organizationSlug, + projectParam, + envParam, + fingerprint, + }); +}; + +export default function Page() { + const { data, hourlyActivity, organizationSlug, projectParam, envParam, fingerprint } = + useTypedLoaderData(); + + const errorsPath = v3ErrorsPath( + { slug: organizationSlug }, + { slug: projectParam }, + { slug: envParam } + ); + + return ( + + + {ErrorId.toFriendlyId(fingerprint)}} + /> + + + + +
+ + Loading error details… +
+ + } + > + + + Unable to load error details. Please refresh the page or try again in a moment. + + + } + > + {(result) => { + if ("error" in result) { + return ( +
+ + {result.error} + +
+ ); + } + return ( + + ); + }} +
+
+
+
+ ); +} + +function ErrorGroupDetail({ + errorGroup, + runList, + hourlyActivity, + organizationSlug, + projectParam, + envParam, +}: { + errorGroup: ErrorGroupSummary | undefined; + runList: NextRunList | undefined; + hourlyActivity: Promise; + organizationSlug: string; + projectParam: string; + envParam: string; +}) { + if (!errorGroup) { + return ( +
+
+ Error not found + + This error group does not exist or has no instances. + +
+
+ ); + } + + return ( +
+ {/* Error Summary */} +
+ {errorGroup.errorMessage} + +
+ + + ID + + {ErrorId.toFriendlyId(errorGroup.fingerprint)} + + + + Task + + {errorGroup.taskIdentifier} + + + + + + + Occurrences + {formatNumberCompact(errorGroup.count)} + + + + + + First seen + + + + + + Last seen + + {formatDistanceToNow(errorGroup.lastSeen, { addSuffix: true })} + + + +
+ + {errorGroup.stackTrace && ( +
+ + Stack Trace + +
+              {errorGroup.stackTrace}
+            
+
+ )} +
+ + {/* Activity over past 7 days by hour */} +
+ Activity (past 7 days) + }> + }> + {(activity) => + activity.length > 0 ? ( + + ) : ( + + ) + } + + +
+ + {/* Runs Table */} +
+ Recent runs + {runList ? ( + + ) : ( + + No runs found for this error. + + )} +
+
+ ); +} + +const activityChartConfig: ChartConfig = { + count: { + label: "Occurrences", + color: "#EC003F", + }, +}; + +function ActivityChart({ activity }: { activity: ErrorGroupHourlyActivity }) { + const data = useMemo( + () => + activity.map((d) => ({ + ...d, + __timestamp: d.date instanceof Date ? d.date.getTime() : new Date(d.date).getTime(), + })), + [activity] + ); + + const midnightTicks = useMemo(() => { + const ticks: number[] = []; + for (const d of data) { + const date = new Date(d.__timestamp); + if (date.getHours() === 0 && date.getMinutes() === 0) { + ticks.push(d.__timestamp); + } + } + return ticks; + }, [data]); + + const xAxisFormatter = useMemo(() => { + return (value: number) => { + const date = new Date(value); + return date.toLocaleDateString("en-US", { + month: "short", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + hour12: false, + }); + }; + }, []); + + const tooltipLabelFormatter = useMemo(() => { + return (_label: string, payload: Array<{ payload?: Record }>) => { + const timestamp = payload[0]?.payload?.__timestamp as number | undefined; + if (timestamp) { + const date = new Date(timestamp); + return date.toLocaleString("en-US", { + month: "short", + day: "numeric", + year: "numeric", + hour: "2-digit", + minute: "2-digit", + hour12: false, + }); + } + return _label; + }; + }, []); + + return ( + + + + ); +} + +function ActivityChartBlankState() { + return ( +
+ {[...Array(42)].map((_, i) => ( +
+ ))} +
+ ); +} diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors._index/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors._index/route.tsx new file mode 100644 index 00000000000..560141f45ce --- /dev/null +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors._index/route.tsx @@ -0,0 +1,454 @@ +import { XMarkIcon } from "@heroicons/react/20/solid"; +import { Form, type MetaFunction } from "@remix-run/react"; +import { type LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { ErrorId } from "@trigger.dev/core/v3/isomorphic"; +import { Suspense } from "react"; +import { + Bar, + BarChart, + ReferenceLine, + ResponsiveContainer, + Tooltip, + YAxis, + type TooltipProps, +} from "recharts"; +import { TypedAwait, typeddefer, useTypedLoaderData } from "remix-typedjson"; +import { PageBody } from "~/components/layout/AppLayout"; +import { LogsSearchInput } from "~/components/logs/LogsSearchInput"; +import { LogsTaskFilter } from "~/components/logs/LogsTaskFilter"; +import { Button } from "~/components/primitives/Buttons"; +import { Callout } from "~/components/primitives/Callout"; +import { formatDateTime, RelativeDateTime } from "~/components/primitives/DateTime"; +import { Header3 } from "~/components/primitives/Headers"; +import { NavBar, PageTitle } from "~/components/primitives/PageHeader"; +import { Paragraph } from "~/components/primitives/Paragraph"; +import { Spinner } from "~/components/primitives/Spinner"; +import { + CopyableTableCell, + Table, + TableBody, + TableCell, + TableCellChevron, + TableHeader, + TableHeaderCell, + TableRow, +} from "~/components/primitives/Table"; +import TooltipPortal from "~/components/primitives/TooltipPortal"; +import { TimeFilter } from "~/components/runs/v3/SharedFilters"; +import { $replica } from "~/db.server"; +import { useOptimisticLocation } from 
"~/hooks/useOptimisticLocation"; +import { findProjectBySlug } from "~/models/project.server"; +import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; +import { + ErrorsListPresenter, + type ErrorGroup, + type ErrorHourlyActivity, + type ErrorHourlyOccurrences, + type ErrorsList, +} from "~/presenters/v3/ErrorsListPresenter.server"; +import { logsClickhouseClient } from "~/services/clickhouseInstance.server"; +import { getCurrentPlan } from "~/services/platform.v3.server"; +import { requireUser } from "~/services/session.server"; +import { formatNumberCompact } from "~/utils/numberFormatter"; +import { EnvironmentParamSchema, v3ErrorPath } from "~/utils/pathBuilder"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; + +export const meta: MetaFunction = () => { + return [ + { + title: `Errors | Trigger.dev`, + }, + ]; +}; + +export const loader = async ({ request, params }: LoaderFunctionArgs) => { + const user = await requireUser(request); + const userId = user.id; + + const { projectParam, organizationSlug, envParam } = EnvironmentParamSchema.parse(params); + + const project = await findProjectBySlug(organizationSlug, projectParam, userId); + if (!project) { + throw new Response("Project not found", { status: 404 }); + } + + const environment = await findEnvironmentBySlug(project.id, envParam, userId); + if (!environment) { + throw new Response("Environment not found", { status: 404 }); + } + + // Get filters from query params + const url = new URL(request.url); + const tasks = url.searchParams.getAll("tasks").filter((t) => t.length > 0); + const search = url.searchParams.get("search") ?? undefined; + const period = url.searchParams.get("period") ?? undefined; + const fromStr = url.searchParams.get("from"); + const toStr = url.searchParams.get("to"); + const from = fromStr ? parseInt(fromStr, 10) : undefined; + const to = toStr ? 
parseInt(toStr, 10) : undefined; + + // Get the user's plan to determine retention limit + const plan = await getCurrentPlan(project.organizationId); + const retentionLimitDays = plan?.v3Subscription?.plan?.limits.logRetentionDays.number ?? 30; + + const presenter = new ErrorsListPresenter($replica, logsClickhouseClient); + + const listPromise = presenter + .call(project.organizationId, environment.id, { + userId, + projectId: project.id, + tasks: tasks.length > 0 ? tasks : undefined, + search, + period, + from, + to, + defaultPeriod: "7d", + retentionLimitDays, + }) + .catch((error) => { + if (error instanceof ServiceValidationError) { + return { error: error.message }; + } + throw error; + }); + + const hourlyOccurrencesPromise = listPromise.then((result) => { + if ("error" in result) return {} as ErrorHourlyOccurrences; + const fingerprints = result.errorGroups.map((g) => g.fingerprint); + if (fingerprints.length === 0) return {} as ErrorHourlyOccurrences; + return presenter.getHourlyOccurrences( + project.organizationId, + project.id, + environment.id, + fingerprints + ); + }); + + return typeddefer({ + data: listPromise, + hourlyOccurrences: hourlyOccurrencesPromise, + defaultPeriod: "7d", + retentionLimitDays, + organizationSlug, + projectParam, + envParam, + }); +}; + +export default function Page() { + const { + data, + hourlyOccurrences, + defaultPeriod, + retentionLimitDays, + organizationSlug, + projectParam, + envParam, + } = useTypedLoaderData(); + + return ( + <> + + + + + + +
+
+
+ + Loading errors… +
+
+
+ } + > + + +
+ + Unable to load errors. Please refresh the page or try again in a moment. + +
+
+ } + > + {(result) => { + // Check if result contains an error + if ("error" in result) { + return ( +
+ +
+ + {result.error} + +
+
+ ); + } + return ( +
+ + +
+ ); + }} + + + + + ); +} + +function FiltersBar({ + list, + defaultPeriod, + retentionLimitDays, +}: { + list?: ErrorsList; + defaultPeriod?: string; + retentionLimitDays: number; +}) { + const location = useOptimisticLocation(); + const searchParams = new URLSearchParams(location.search); + const hasFilters = + searchParams.has("tasks") || + searchParams.has("search") || + searchParams.has("period") || + searchParams.has("from") || + searchParams.has("to"); + + return ( +
+
+ {list ? ( + <> + + + + {hasFilters && ( +
+
+
+ ); +} + +function ErrorsList({ + errorGroups, + hourlyOccurrences, + organizationSlug, + projectParam, + envParam, +}: { + errorGroups: ErrorGroup[]; + hourlyOccurrences: Promise; + organizationSlug: string; + projectParam: string; + envParam: string; +}) { + if (errorGroups.length === 0) { + return ( +
+
+ No errors found + + No errors have been recorded in the selected time period. + +
+
+ ); + } + + return ( + + + + ID + Task + Error + Occurrences + Past 24h + First seen + Last seen + + + + {errorGroups.map((errorGroup) => ( + + ))} + +
+ ); +} + +function ErrorGroupRow({ + errorGroup, + hourlyOccurrences, + organizationSlug, + projectParam, + envParam, +}: { + errorGroup: ErrorGroup; + hourlyOccurrences: Promise; + organizationSlug: string; + projectParam: string; + envParam: string; +}) { + const errorPath = v3ErrorPath( + { slug: organizationSlug }, + { slug: projectParam }, + { slug: envParam }, + { fingerprint: errorGroup.fingerprint } + ); + + const errorMessage = `${errorGroup.errorMessage}`; + + return ( + + + {errorGroup.fingerprint.slice(-8)} + + {errorGroup.taskIdentifier} + + {errorMessage} + + {errorGroup.count.toLocaleString()} + + }> + }> + {(data) => { + const activity = data[errorGroup.fingerprint]; + return activity ? ( + + ) : ( + + ); + }} + + + + + + + + + + + ); +} + +function ErrorActivityGraph({ activity }: { activity: ErrorHourlyActivity }) { + const maxCount = Math.max(...activity.map((d) => d.count)); + + return ( +
+
+ + + + } + allowEscapeViewBox={{ x: true, y: true }} + wrapperStyle={{ zIndex: 1000 }} + animationDuration={0} + /> + + + {maxCount > 0 && ( + + )} + + +
+ + {formatNumberCompact(maxCount)} + +
+ ); +} + +const ErrorActivityTooltip = ({ active, payload }: TooltipProps) => { + if (active && payload && payload.length > 0) { + const entry = payload[0].payload as { date: Date; count: number }; + const date = entry.date instanceof Date ? entry.date : new Date(entry.date); + const formattedDate = formatDateTime(date, "UTC", [], false, true); + + return ( + +
+ {formattedDate} +
+ {entry.count}{" "} + + {entry.count === 1 ? "occurrence" : "occurrences"} + +
+
+
+ ); + } + + return null; +}; + +function ErrorActivityBlankState() { + return ( +
+ {[...Array(24)].map((_, i) => ( +
+ ))} +
+ ); +} diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors/route.tsx new file mode 100644 index 00000000000..f6723ddebaa --- /dev/null +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors/route.tsx @@ -0,0 +1,10 @@ +import { Outlet } from "@remix-run/react"; +import { PageContainer } from "~/components/layout/AppLayout"; + +export default function Page() { + return ( + + + + ); +} diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors.ts new file mode 100644 index 00000000000..fd36003e8f3 --- /dev/null +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors.ts @@ -0,0 +1,67 @@ +import { type LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { json } from "@remix-run/node"; +import { requireUser } from "~/services/session.server"; +import { EnvironmentParamSchema } from "~/utils/pathBuilder"; +import { findProjectBySlug } from "~/models/project.server"; +import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; +import { ErrorsListPresenter, ErrorsListOptionsSchema } from "~/presenters/v3/ErrorsListPresenter.server"; +import { $replica } from "~/db.server"; +import { logsClickhouseClient } from "~/services/clickhouseInstance.server"; +import { getCurrentPlan } from "~/services/platform.v3.server"; + +export const loader = async ({ request, params }: LoaderFunctionArgs) => { + const user = await requireUser(request); + const userId = user.id; + + const { projectParam, organizationSlug, envParam } = EnvironmentParamSchema.parse(params); + + const project = await findProjectBySlug(organizationSlug, projectParam, userId); 
+ if (!project) { + throw new Response("Project not found", { status: 404 }); + } + + const environment = await findEnvironmentBySlug(project.id, envParam, userId); + if (!environment) { + throw new Response("Environment not found", { status: 404 }); + } + + // Get the user's plan to determine retention limit + const plan = await getCurrentPlan(project.organizationId); + const retentionLimitDays = plan?.v3Subscription?.plan?.limits.logRetentionDays.number ?? 30; + + // Get filters from query params + const url = new URL(request.url); + const tasks = url.searchParams.getAll("tasks").filter((t) => t.length > 0); + const search = url.searchParams.get("search") ?? undefined; + const cursor = url.searchParams.get("cursor") ?? undefined; + const period = url.searchParams.get("period") ?? undefined; + const fromStr = url.searchParams.get("from"); + const toStr = url.searchParams.get("to"); + let from = fromStr ? parseInt(fromStr, 10) : undefined; + let to = toStr ? parseInt(toStr, 10) : undefined; + + if (Number.isNaN(from)) from = undefined; + if (Number.isNaN(to)) to = undefined; + + const options = ErrorsListOptionsSchema.parse({ + userId, + projectId: project.id, + tasks: tasks.length > 0 ? 
tasks : undefined, + search, + cursor, + period, + from, + to, + defaultPeriod: "7d", + retentionLimitDays, + }) as any; // Validated by ErrorsListOptionsSchema at runtime + + const presenter = new ErrorsListPresenter($replica, logsClickhouseClient); + const result = await presenter.call(project.organizationId, environment.id, options); + + return json({ + errorGroups: result.errorGroups, + pagination: result.pagination, + filters: result.filters, + }); +}; diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 8dc078d338f..0a8ab5e1bde 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -68,6 +68,7 @@ function initializeRunsReplicationInstance() { insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS, insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY, disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1", + disableErrorFingerprinting: env.RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING === "1", }); if (env.RUN_REPLICATION_ENABLED === "1") { diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index faa0f3d9822..1d7714ae478 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -27,6 +27,7 @@ import { nanoid } from "nanoid"; import EventEmitter from "node:events"; import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; +import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; interface TransactionEvent { tag: "insert" | "update" | "delete"; @@ -70,6 +71,7 @@ export type RunsReplicationServiceOptions = { insertBaseDelayMs?: number; insertMaxDelayMs?: number; disablePayloadInsert?: boolean; + disableErrorFingerprinting?: boolean; }; type PostgresTaskRun = 
TaskRun & { masterQueue: string }; @@ -115,6 +117,7 @@ export class RunsReplicationService { private _insertMaxDelayMs: number; private _insertStrategy: "insert" | "insert_async"; private _disablePayloadInsert: boolean; + private _disableErrorFingerprinting: boolean; // Metrics private _replicationLagHistogram: Histogram; @@ -189,6 +192,7 @@ export class RunsReplicationService { this._insertStrategy = options.insertStrategy ?? "insert"; this._disablePayloadInsert = options.disablePayloadInsert ?? false; + this._disableErrorFingerprinting = options.disableErrorFingerprinting ?? false; this._replicationClient = new LogicalReplicationClient({ pgConfig: { @@ -852,6 +856,15 @@ export class RunsReplicationService { _version: bigint ): Promise { const output = await this.#prepareJson(run.output, run.outputType); + const errorData = { data: run.error }; + + // Calculate error fingerprint for failed runs + const errorFingerprint = ( + !this._disableErrorFingerprinting && + ['SYSTEM_FAILURE', 'CRASHED', 'INTERRUPTED', 'COMPLETED_WITH_ERRORS'].includes(run.status) + ) + ? calculateErrorFingerprint(run.error) + : ''; // Return array matching TASK_RUN_COLUMNS order return [ @@ -880,7 +893,8 @@ export class RunsReplicationService { run.costInCents ?? 0, // cost_in_cents run.baseCostInCents ?? 0, // base_cost_in_cents output, // output - { data: run.error }, // error + errorData, // error + errorFingerprint, // error_fingerprint run.runTags ?? [], // tags run.taskVersion ?? "", // task_version run.sdkVersion ?? "", // sdk_version diff --git a/apps/webapp/app/utils/errorFingerprinting.ts b/apps/webapp/app/utils/errorFingerprinting.ts new file mode 100644 index 00000000000..2e8c9438335 --- /dev/null +++ b/apps/webapp/app/utils/errorFingerprinting.ts @@ -0,0 +1,91 @@ +import { createHash } from "node:crypto"; + +/** + * Calculate error fingerprint using Sentry-style normalization. + * Groups similar errors together by normalizing dynamic values. 
+ */ +export function calculateErrorFingerprint(error: unknown): string { + if (!error || typeof error !== "object" || Array.isArray(error)) return ""; + + // This is a bit ugly but… + // 1. We can't use a schema here because it's a hot path and needs to be fast. + // 2. It won't be an instanceof Error because it's from the database. + const errorObj = error as any; + const errorType = String(errorObj.type || errorObj.name || "Error"); + const message = String(errorObj.message || ""); + const stack = String(errorObj.stack || errorObj.stacktrace || ""); + + // Normalize message to group similar errors + const normalizedMessage = normalizeErrorMessage(message); + + // Extract and normalize first few stack frames + const normalizedStack = normalizeStackTrace(stack); + + // Create fingerprint from type + normalized message + stack + const fingerprintInput = `${errorType}:${normalizedMessage}:${normalizedStack}`; + + // Use SHA-256 hash, take first 16 chars for compact storage + return createHash("sha256").update(fingerprintInput).digest("hex").substring(0, 16); +} + +/** + * Normalize error message by replacing dynamic values with placeholders. + * This allows similar errors to be grouped together. 
+ */ +export function normalizeErrorMessage(message: string): string { + if (!message) return ""; + + return ( + message + // UUIDs (8-4-4-4-12 format) + .replace(/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/gi, "") + // Run IDs (run_xxxxx format) + .replace(/run_[a-zA-Z0-9]+/g, "") + // Task run friendly IDs (task_xxxxx or similar) + .replace(/\b[a-z]+_[a-zA-Z0-9]{8,}\b/g, "") + // Standalone numeric IDs (4+ digits) + .replace(/\b\d{4,}\b/g, "") + // ISO 8601 timestamps + .replace(/\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?Z?/g, "") + // Unix timestamps (10 or 13 digits) + .replace(/\b\d{10,13}\b/g, "") + // File paths (Unix style) + .replace(/(?:\/[^\/\s]+){2,}/g, "") + // File paths (Windows style) + .replace(/[A-Z]:\\(?:[^\\]+\\)+[^\\]+/g, "") + // Email addresses + .replace(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b/g, "") + // URLs + .replace(/https?:\/\/[^\s]+/g, "") + // Memory addresses (0x...) + .replace(/0x[0-9a-fA-F]{8,}/g, "") + // Quoted strings with dynamic content + .replace(/"[^"]{20,}"/g, '""') + .replace(/'[^']{20,}'/g, "''") + ); +} + +/** + * Normalize stack trace by taking first few frames and removing dynamic parts. 
+ */ +export function normalizeStackTrace(stack: string): string { + if (!stack) return ""; + + // Take first 5 stack frames only + const lines = stack.split("\n").slice(0, 5); + + return lines + .map((line) => { + // Remove line and column numbers (file.ts:123:45 -> file.ts:_:_) + line = line.replace(/:\d+:\d+/g, ":_:_"); + // Remove standalone numbers + line = line.replace(/\b\d+\b/g, "_"); + // Remove file paths but keep filename + line = line.replace(/(?:\/[^\/\s]+)+\/([^\/\s]+)/g, "$1"); + // Normalize whitespace + line = line.trim(); + return line; + }) + .filter((line) => line.length > 0) + .join("|"); +} diff --git a/apps/webapp/app/utils/pathBuilder.ts b/apps/webapp/app/utils/pathBuilder.ts index c39234a7bbb..e8ed3e9b61a 100644 --- a/apps/webapp/app/utils/pathBuilder.ts +++ b/apps/webapp/app/utils/pathBuilder.ts @@ -527,6 +527,23 @@ export function v3LogsPath( return `${v3EnvironmentPath(organization, project, environment)}/logs`; } +export function v3ErrorsPath( + organization: OrgForPath, + project: ProjectForPath, + environment: EnvironmentForPath +) { + return `${v3EnvironmentPath(organization, project, environment)}/errors`; +} + +export function v3ErrorPath( + organization: OrgForPath, + project: ProjectForPath, + environment: EnvironmentForPath, + error: { fingerprint: string } +) { + return `${v3ErrorsPath(organization, project, environment)}/${error.fingerprint}`; +} + export function v3DeploymentsPath( organization: OrgForPath, project: ProjectForPath, diff --git a/apps/webapp/test/errorFingerprinting.test.ts b/apps/webapp/test/errorFingerprinting.test.ts new file mode 100644 index 00000000000..7f72eb6ed15 --- /dev/null +++ b/apps/webapp/test/errorFingerprinting.test.ts @@ -0,0 +1,327 @@ +import { describe, it, expect } from "vitest"; +import { + calculateErrorFingerprint, + normalizeErrorMessage, + normalizeStackTrace, +} from "~/utils/errorFingerprinting"; + +describe("normalizeErrorMessage", () => { + it("should normalize UUIDs", () => { + 
const message = "Error processing user 550e8400-e29b-41d4-a716-446655440000"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Error processing user "); + }); + + it("should normalize run IDs", () => { + const message = "Failed to execute run_abcd1234xyz"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Failed to execute "); + }); + + it("should normalize task friendly IDs", () => { + const message = "Task task_abc12345678 failed"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Task failed"); + }); + + it("should normalize numeric IDs (4+ digits)", () => { + const message = "User 12345 not found"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("User not found"); + }); + + it("should not normalize short numbers", () => { + const message = "Retry attempt 3 of 5"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Retry attempt 3 of 5"); + }); + + it("should normalize ISO 8601 timestamps", () => { + const message = "Event at 2024-03-01T15:30:45Z failed"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Event at failed"); + }); + + it("should normalize ISO timestamps with milliseconds", () => { + const message = "Timeout at 2024-03-01T15:30:45.123Z"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Timeout at "); + }); + + it("should normalize Unix timestamps", () => { + const message = "Created at 1234567890"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Created at "); + }); + + it("should normalize Unix timestamps (milliseconds)", () => { + const message = "Created at 1234567890123"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Created at "); + }); + + it("should normalize Unix file paths", () => { + const message = "Cannot read /home/user/project/file.ts"; + const normalized = 
normalizeErrorMessage(message); + expect(normalized).toBe("Cannot read "); + }); + + it("should normalize Windows file paths", () => { + const message = "Cannot read C:\\Users\\John\\project\\file.ts"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Cannot read "); + }); + + it("should normalize email addresses", () => { + const message = "Email user@example.com already exists"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Email already exists"); + }); + + it("should normalize URLs", () => { + const message = "Failed to fetch https://api.example.com/users/123"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Failed to fetch "); + }); + + it("should normalize HTTP URLs", () => { + const message = "Request to http://localhost:3000/api failed"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Request to failed"); + }); + + it("should normalize memory addresses", () => { + const message = "Segfault at 0x7fff5fbffab0"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Segfault at "); + }); + + it("should normalize long quoted strings", () => { + const message = 'Error: "this is a very long error message with dynamic content that changes"'; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe('Error: ""'); + }); + + it("should handle multiple replacements", () => { + const message = + "User 12345 at user@example.com failed to access run_abc123 at 2024-03-01T15:30:45Z"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("User at failed to access at "); + }); + + it("should return empty string for empty input", () => { + expect(normalizeErrorMessage("")).toBe(""); + }); + + it("should handle messages with no dynamic content", () => { + const message = "Connection timeout"; + const normalized = normalizeErrorMessage(message); + expect(normalized).toBe("Connection 
timeout"); + }); +}); + +describe("normalizeStackTrace", () => { + it("should normalize line and column numbers", () => { + const stack = `Error: Test error + at functionName (file.ts:123:45) + at anotherFunction (other.ts:67:89)`; + const normalized = normalizeStackTrace(stack); + expect(normalized).toContain(":_:_"); + expect(normalized).not.toContain(":123:45"); + }); + + it("should remove standalone numbers", () => { + const stack = `Error: Test + at Object. (/path/to/file.ts:123:45) + at Module._compile (node:internal/modules/cjs/loader:456:78)`; + const normalized = normalizeStackTrace(stack); + expect(normalized).not.toMatch(/\b\d+\b/); + }); + + it("should keep only first 5 frames", () => { + const stack = `Error: Test + at frame1 (file1.ts:1:1) + at frame2 (file2.ts:2:2) + at frame3 (file3.ts:3:3) + at frame4 (file4.ts:4:4) + at frame5 (file5.ts:5:5) + at frame6 (file6.ts:6:6) + at frame7 (file7.ts:7:7)`; + const normalized = normalizeStackTrace(stack); + const frames = normalized.split("|"); + expect(frames.length).toBeLessThanOrEqual(5); + }); + + it("should remove file paths but keep filenames", () => { + const stack = `Error: Test + at functionName (/home/user/project/src/file.ts:123:45)`; + const normalized = normalizeStackTrace(stack); + expect(normalized).toContain("file.ts"); + expect(normalized).not.toContain("/home/user/project/src/"); + }); + + it("should filter out empty lines", () => { + const stack = `Error: Test + + at functionName (file.ts:123:45) + + at anotherFunction (other.ts:67:89)`; + const normalized = normalizeStackTrace(stack); + const frames = normalized.split("|").filter((f) => f.length > 0); + expect(frames.length).toBeLessThanOrEqual(3); + }); + + it("should return empty string for empty stack", () => { + expect(normalizeStackTrace("")).toBe(""); + }); + + it("should join frames with pipe delimiter", () => { + const stack = `Error: Test + at frame1 (file1.ts:1:1) + at frame2 (file2.ts:2:2)`; + const normalized = 
normalizeStackTrace(stack); + expect(normalized).toContain("|"); + }); +}); + +describe("calculateErrorFingerprint", () => { + it("should generate consistent fingerprints for same error", () => { + const error = { + type: "DatabaseError", + message: "Connection timeout", + stack: "at db.connect (db.ts:123:45)", + }; + const fp1 = calculateErrorFingerprint(error); + const fp2 = calculateErrorFingerprint(error); + expect(fp1).toBe(fp2); + expect(fp1.length).toBe(16); + }); + + it("should generate same fingerprint for errors with different IDs", () => { + const error1 = { + type: "NotFoundError", + message: "User 12345 not found", + stack: "at findUser (user.ts:50:10)", + }; + const error2 = { + type: "NotFoundError", + message: "User 67890 not found", + stack: "at findUser (user.ts:50:10)", + }; + const fp1 = calculateErrorFingerprint(error1); + const fp2 = calculateErrorFingerprint(error2); + expect(fp1).toBe(fp2); + }); + + it("should generate same fingerprint for errors with different UUIDs", () => { + const error1 = { + type: "ValidationError", + message: "Invalid token 550e8400-e29b-41d4-a716-446655440000", + }; + const error2 = { + type: "ValidationError", + message: "Invalid token 123e4567-e89b-12d3-a456-426614174000", + }; + expect(calculateErrorFingerprint(error1)).toBe(calculateErrorFingerprint(error2)); + }); + + it("should generate same fingerprint for errors with different run IDs", () => { + const error1 = { + type: "TaskError", + message: "Failed to execute run_abc123", + }; + const error2 = { + type: "TaskError", + message: "Failed to execute run_xyz789", + }; + expect(calculateErrorFingerprint(error1)).toBe(calculateErrorFingerprint(error2)); + }); + + it("should generate different fingerprints for different error types", () => { + const error1 = { + type: "DatabaseError", + message: "Connection failed", + }; + const error2 = { + type: "NetworkError", + message: "Connection failed", + }; + 
expect(calculateErrorFingerprint(error1)).not.toBe(calculateErrorFingerprint(error2)); + }); + + it("should generate different fingerprints for different error messages", () => { + const error1 = { + type: "Error", + message: "Connection timeout", + }; + const error2 = { + type: "Error", + message: "Connection refused", + }; + expect(calculateErrorFingerprint(error1)).not.toBe(calculateErrorFingerprint(error2)); + }); + + it("should handle error with name instead of type", () => { + const error = { + name: "TypeError", + message: "Cannot read property 'foo' of undefined", + }; + const fp = calculateErrorFingerprint(error); + expect(fp).toBeTruthy(); + expect(fp.length).toBe(16); + }); + + it("should handle error with stacktrace instead of stack", () => { + const error = { + type: "Error", + message: "Test error", + stacktrace: "at test (file.ts:1:1)", + }; + const fp = calculateErrorFingerprint(error); + expect(fp).toBeTruthy(); + }); + + it("should return empty string for non-object error", () => { + expect(calculateErrorFingerprint(null)).toBe(""); + expect(calculateErrorFingerprint(undefined)).toBe(""); + expect(calculateErrorFingerprint("error string")).toBe(""); + expect(calculateErrorFingerprint(123)).toBe(""); + }); + + it("should handle errors with no message or stack", () => { + const error = { + type: "Error", + }; + const fp = calculateErrorFingerprint(error); + expect(fp).toBeTruthy(); + expect(fp.length).toBe(16); + }); + + it("should generate fingerprints using stack trace when available", () => { + const error1 = { + type: "Error", + message: "Test", + stack: "at funcA (a.ts:1:1)\nat funcB (b.ts:2:2)", + }; + const error2 = { + type: "Error", + message: "Test", + stack: "at funcX (x.ts:1:1)\nat funcY (y.ts:2:2)", + }; + expect(calculateErrorFingerprint(error1)).not.toBe(calculateErrorFingerprint(error2)); + }); + + it("should normalize line numbers in stack traces for same code location", () => { + const error1 = { + type: "Error", + message: "Test", 
+ stack: "at func (file.ts:123:45)", + }; + const error2 = { + type: "Error", + message: "Test", + stack: "at func (file.ts:456:78)", + }; + expect(calculateErrorFingerprint(error1)).toBe(calculateErrorFingerprint(error2)); + }); +}); diff --git a/apps/webapp/test/runsReplicationBenchmark.README.md b/apps/webapp/test/runsReplicationBenchmark.README.md new file mode 100644 index 00000000000..68ff312eb0b --- /dev/null +++ b/apps/webapp/test/runsReplicationBenchmark.README.md @@ -0,0 +1,283 @@ +# RunsReplicationService Error Fingerprinting Benchmark + +This benchmark measures the performance impact of error fingerprinting in the RunsReplicationService. + +## Overview + +The benchmark: +1. Creates a realistic dataset of TaskRuns (7% with errors by default) +2. Runs the producer in a **separate process** to simulate real-world load +3. Measures replication throughput and Event Loop Utilization (ELU) +4. Compares performance with fingerprinting **enabled** vs **disabled** + +## Architecture + +``` +┌─────────────────┐ ┌──────────────────────┐ +│ Producer │ │ Benchmark Test │ +│ (Child Process)│─────────│ (Main Process) │ +│ │ IPC │ │ +│ - Inserts │ │ - RunsReplication │ +│ TaskRuns │ │ Service │ +│ to Postgres │ │ - ELU Monitor │ +│ │ │ - Metrics │ +└─────────────────┘ └──────────────────────┘ + │ │ + │ │ + ▼ ▼ + ┌──────────┐ ┌──────────────┐ + │ Postgres │ │ ClickHouse │ + └──────────┘ └──────────────┘ +``` + +## Files + +- `runsReplicationBenchmark.test.ts` - Main benchmark test +- `runsReplicationBenchmark.producer.ts` - Producer script (runs in child process) +- `runsReplicationBenchmark.README.md` - This file + +## Configuration + +The benchmark can be configured via environment variables or by editing `BENCHMARK_CONFIG` in the test file: + +```typescript +const BENCHMARK_CONFIG = { + // Number of runs to create + NUM_RUNS: parseInt(process.env.BENCHMARK_NUM_RUNS || "5000", 10), + + // Error rate (0.07 = 7%) + ERROR_RATE: 0.07, + + // Producer batch size + 
PRODUCER_BATCH_SIZE: 100, + + // Replication service settings + FLUSH_BATCH_SIZE: 50, + FLUSH_INTERVAL_MS: 100, + MAX_FLUSH_CONCURRENCY: 4, + + // Timeout + REPLICATION_TIMEOUT_MS: 120_000, // 2 minutes +}; +``` + +## Running the Benchmark + +### Quick Test (Small Dataset) + +```bash +cd apps/webapp +BENCHMARK_NUM_RUNS=1000 pnpm run test ./test/runsReplicationBenchmark.test.ts --run +``` + +### Realistic Benchmark (Larger Dataset) + +```bash +cd apps/webapp +BENCHMARK_NUM_RUNS=10000 pnpm run test ./test/runsReplicationBenchmark.test.ts --run +``` + +### High Volume Benchmark + +```bash +cd apps/webapp +BENCHMARK_NUM_RUNS=50000 pnpm run test ./test/runsReplicationBenchmark.test.ts --run +``` + +**Note:** The test is marked with `.skip` by default. To run it, remove the `.skip` from the test: + +```typescript +// Change this: +containerTest.skip("should benchmark...", async () => { + +// To this: +containerTest("should benchmark...", async () => { +``` + +## What Gets Measured + +### 1. Producer Metrics +- Total runs created +- Runs with errors (should be ~7%) +- Duration +- Throughput (runs/sec) + +### 2. Replication Metrics +- Total runs replicated to ClickHouse +- Replication duration +- Replication throughput (runs/sec) + +### 3. Event Loop Utilization (ELU) +- Mean utilization (%) +- P50 (median) utilization (%) +- P95 utilization (%) +- P99 utilization (%) +- All samples for detailed analysis + +### 4. OpenTelemetry Metrics +- Batches flushed +- Task runs inserted +- Payloads inserted +- Events processed + +## Output + +The benchmark produces detailed output including: + +``` +================================================================================ +BENCHMARK: baseline-no-fingerprinting +Error Fingerprinting: DISABLED +Runs: 5000, Error Rate: 7.0% +================================================================================ + +[Producer] Starting - will create 5000 runs (7.0% with errors) +[Producer] Progress: 1000/5000 runs (2500 runs/sec) +... 
+[Producer] Completed: + - Total runs: 5000 + - With errors: 352 (7.0%) + - Duration: 2145ms + - Throughput: 2331 runs/sec + +[Benchmark] Waiting for replication to complete... + +================================================================================ +RESULTS: baseline-no-fingerprinting +================================================================================ + +Producer: + Created: 5000 runs + With errors: 352 (7.0%) + Duration: 2145ms + Throughput: 2331 runs/sec + +Replication: + Replicated: 5000 runs + Duration: 3456ms + Throughput: 1447 runs/sec + +Event Loop Utilization: + Mean: 23.45% + P50: 22.10% + P95: 34.20% + P99: 41.30% + Samples: 346 + +Metrics: + Batches flushed: 102 + Task runs inserted: 5000 + Payloads inserted: 5000 + Events processed: 5000 +================================================================================ + +[... Similar output for "with-fingerprinting" benchmark ...] + +================================================================================ +COMPARISON +Baseline: baseline-no-fingerprinting (fingerprinting OFF) +Comparison: with-fingerprinting (fingerprinting ON) +================================================================================ + +Replication Duration: + 3456ms → 3512ms (+1.62%) + +Throughput: + 1447 → 1424 runs/sec (-1.59%) + +Event Loop Utilization (Mean): + 23.45% → 24.12% (+2.86%) + +Event Loop Utilization (P99): + 41.30% → 43.20% (+4.60%) + +================================================================================ + +BENCHMARK COMPLETE +Fingerprinting impact on replication duration: +1.62% +Fingerprinting impact on throughput: -1.59% +Fingerprinting impact on ELU (mean): +2.86% +Fingerprinting impact on ELU (P99): +4.60% +``` + +## Interpreting Results + +### What to Look For + +1. **Replication Duration Delta** - How much longer replication takes with fingerprinting +2. **Throughput Delta** - Change in runs processed per second +3. 
**ELU Delta** - Change in event loop utilization (higher = more CPU bound) + +### Expected Results + +With a 7% error rate and SHA-256 hashing: +- **Small impact** (<5% overhead): Fingerprinting is well optimized +- **Moderate impact** (5-15% overhead): May want to consider optimizations +- **Large impact** (>15% overhead): Fingerprinting needs optimization + +### Performance Optimization Ideas + +If the benchmark shows significant overhead, consider: + +1. **Faster hashing algorithm** - Replace SHA-256 with xxHash or MurmurHash3 +2. **Worker threads** - Move fingerprinting to worker threads +3. **Caching** - Cache fingerprints for identical errors +4. **Lazy computation** - Only compute fingerprints when needed +5. **Batch processing** - Group similar errors before hashing + +## Dataset Characteristics + +The producer generates realistic error variety: + +- TypeError (undefined property access) +- Error (API fetch failures) +- ValidationError (input validation) +- TimeoutError (operation timeouts) +- DatabaseError (connection failures) +- ReferenceError (undefined variables) + +Each error template includes: +- Realistic stack traces +- Variable IDs and timestamps +- Line/column numbers +- File paths + +This ensures the fingerprinting algorithm is tested with realistic data. + +## Troubleshooting + +### Benchmark Times Out + +Increase the timeout: +```typescript +REPLICATION_TIMEOUT_MS: 300_000, // 5 minutes +``` + +### Producer Fails + +Check Postgres connection and ensure: +- Docker services are running (`pnpm run docker`) +- Database is accessible +- Sufficient disk space + +### Different Results Each Run + +This is normal! Factors affecting variance: +- System load +- Docker container overhead +- Database I/O +- Network latency (even localhost) + +Run multiple times and look at trends. + +## Future Enhancements + +Potential improvements to the benchmark: + +1. **Multiple error rates** - Test 0%, 5%, 10%, 25%, 50% error rates +2. 
#!/usr/bin/env node
/**
 * Producer script that runs in a separate process to insert TaskRuns into PostgreSQL.
 * This simulates realistic production load for benchmarking RunsReplicationService.
 *
 * Invoked as: runsReplicationBenchmark.producer.ts '<json-config>'
 * Logs progress to stdout and, when forked with an IPC channel, sends a
 * `{ type: "complete", stats }` (or `{ type: "error", error }`) message to the
 * parent process.
 */

import { PrismaClient } from "@trigger.dev/database";
import { performance } from "node:perf_hooks";

interface ProducerConfig {
  postgresUrl: string;
  organizationId: string;
  projectId: string;
  environmentId: string;
  numRuns: number;
  errorRate: number; // 0.07 = 7%
  batchSize: number;
}

// Error templates for realistic variety. Each includes a plausible stack trace
// so the fingerprinting algorithm is exercised with production-like data.
const ERROR_TEMPLATES = [
  {
    type: "TypeError",
    message: "Cannot read property 'foo' of undefined",
    stack: `TypeError: Cannot read property 'foo' of undefined
    at processData (/app/src/handler.ts:42:15)
    at runTask (/app/src/runtime.ts:128:20)
    at executeRun (/app/src/executor.ts:89:12)
    at async Runner.execute (/app/src/runner.ts:56:5)`,
  },
  {
    type: "Error",
    message: "Failed to fetch data from API endpoint https://api.example.com/data/12345",
    stack: `Error: Failed to fetch data from API endpoint https://api.example.com/data/12345
    at fetchData (/app/src/api.ts:78:11)
    at getData (/app/src/service.ts:34:18)
    at processTask (/app/src/handler.ts:23:15)
    at runTask (/app/src/runtime.ts:128:20)`,
  },
  {
    type: "ValidationError",
    message: "Invalid input: expected string for field 'email', got number: 1234567890",
    stack: `ValidationError: Invalid input: expected string for field 'email', got number: 1234567890
    at validateInput (/app/src/validator.ts:156:9)
    at processRequest (/app/src/handler.ts:67:23)
    at runTask (/app/src/runtime.ts:128:20)`,
  },
  {
    type: "TimeoutError",
    message: "Operation timed out after 30000ms",
    stack: `TimeoutError: Operation timed out after 30000ms
    at Timeout._onTimeout (/app/src/timeout.ts:45:15)
    at processTask (/app/src/handler.ts:89:12)
    at runTask (/app/src/runtime.ts:128:20)`,
  },
  {
    type: "DatabaseError",
    message: "Connection to database 'prod_db' failed: timeout of 5000ms exceeded",
    stack: `DatabaseError: Connection to database 'prod_db' failed: timeout of 5000ms exceeded
    at connect (/app/node_modules/pg/lib/client.js:234:11)
    at query (/app/src/db.ts:89:18)
    at getData (/app/src/service.ts:45:22)`,
  },
  {
    type: "ReferenceError",
    message: "userId is not defined",
    stack: `ReferenceError: userId is not defined
    at validateUser (/app/src/auth.ts:123:9)
    at processTask (/app/src/handler.ts:34:15)
    at runTask (/app/src/runtime.ts:128:20)`,
  },
];

/**
 * Builds a realistic error object from a random template, injecting random ids,
 * timestamps, and line:column positions so fingerprinting sees errors that are
 * "similar but not byte-identical".
 */
function generateError() {
  const template = ERROR_TEMPLATES[Math.floor(Math.random() * ERROR_TEMPLATES.length)];

  // Add variation to make errors slightly different
  const randomId = Math.floor(Math.random() * 100000);
  const randomTimestamp = Date.now() + Math.floor(Math.random() * 10000);

  return {
    type: template.type,
    name: template.type,
    // Substitute both kinds of number in a single pass: a 13-digit run is an
    // epoch-ms timestamp, anything else 4+ digits long is an id. (Two
    // sequential .replace() calls would be wrong: /\d{4,}/g also consumes
    // 13-digit timestamps, so a later /\d{13}/ pass could never match.)
    message: template.message.replace(/\d{4,}/g, (match) =>
      match.length === 13 ? String(randomTimestamp) : String(randomId)
    ),
    stack: template.stack
      // Randomize line:column first; the injected values are 1–3 digits, so the
      // 4+-digit id pass below cannot clobber them.
      .replace(/:\d+:\d+/g, `:${Math.floor(Math.random() * 500)}:${Math.floor(Math.random() * 50)}`)
      .replace(/\d{4,}/g, String(randomId)),
  };
}

/**
 * Inserts `config.numRuns` TaskRun rows in batches of `config.batchSize`, with
 * ~`config.errorRate` of them carrying a generated error payload and an
 * errored terminal status. Reports throughput stats via IPC when available.
 */
async function runProducer(config: ProducerConfig) {
  const prisma = new PrismaClient({
    datasources: {
      db: {
        url: config.postgresUrl,
      },
    },
  });

  try {
    console.log(
      `[Producer] Starting - will create ${config.numRuns} runs (${(config.errorRate * 100).toFixed(1)}% with errors)`
    );
    const startTime = performance.now();
    let created = 0;
    let withErrors = 0;

    // Process in batches to avoid overwhelming the database
    for (let batch = 0; batch < Math.ceil(config.numRuns / config.batchSize); batch++) {
      const batchStart = batch * config.batchSize;
      const batchEnd = Math.min(batchStart + config.batchSize, config.numRuns);
      const batchSize = batchEnd - batchStart;

      const runs = [];
      for (let i = batchStart; i < batchEnd; i++) {
        const hasError = Math.random() < config.errorRate;
        const status = hasError ? "COMPLETED_WITH_ERRORS" : "COMPLETED_SUCCESSFULLY";

        const runData: any = {
          friendlyId: `run_bench_${Date.now()}_${i}`,
          taskIdentifier: `benchmark-task-${i % 10}`, // Vary task identifiers
          payload: JSON.stringify({ index: i, timestamp: Date.now() }),
          traceId: `trace_${i}`,
          spanId: `span_${i}`,
          queue: `queue-${i % 5}`, // Vary queues
          runtimeEnvironmentId: config.environmentId,
          projectId: config.projectId,
          organizationId: config.organizationId,
          environmentType: "DEVELOPMENT",
          engine: "V2",
          status,
          // Spread createdAt slightly into the past for realistic ordering.
          createdAt: new Date(Date.now() - Math.floor(Math.random() * 1000)),
          updatedAt: new Date(),
        };

        if (hasError) {
          runData.error = generateError();
          withErrors++;
        }

        runs.push(runData);
      }

      // Insert batch
      await prisma.taskRun.createMany({
        data: runs,
      });

      created += batchSize;

      // Log every 10th batch plus the final one.
      if (batch % 10 === 0 || batch === Math.ceil(config.numRuns / config.batchSize) - 1) {
        const elapsed = performance.now() - startTime;
        const rate = (created / elapsed) * 1000;
        console.log(
          `[Producer] Progress: ${created}/${config.numRuns} runs (${rate.toFixed(0)} runs/sec)`
        );
      }
    }

    const endTime = performance.now();
    const duration = endTime - startTime;
    const throughput = (created / duration) * 1000;

    console.log(`[Producer] Completed:`);
    console.log(`  - Total runs: ${created}`);
    console.log(`  - With errors: ${withErrors} (${((withErrors / created) * 100).toFixed(1)}%)`);
    console.log(`  - Duration: ${duration.toFixed(0)}ms`);
    console.log(`  - Throughput: ${throughput.toFixed(0)} runs/sec`);

    // Send results to parent process
    if (process.send) {
      process.send({
        type: "complete",
        stats: {
          created,
          withErrors,
          duration,
          throughput,
        },
      });
    }
  } catch (error) {
    console.error("[Producer] Error:", error);
    if (process.send) {
      process.send({
        type: "error",
        error: error instanceof Error ? error.message : String(error),
      });
    }
    process.exit(1);
  } finally {
    await prisma.$disconnect();
  }
}

// Parse config from command line args
const configArg = process.argv[2];
if (!configArg) {
  console.error("Usage: runsReplicationBenchmark.producer.ts <json-config>");
  process.exit(1);
}

const config: ProducerConfig = JSON.parse(configArg);
runProducer(config).catch((error) => {
  console.error("Fatal error:", error);
  process.exit(1);
});
runs to create - adjust this to test different volumes + // Start with smaller numbers (1000) for quick tests, increase to 10000+ for realistic benchmarks + NUM_RUNS: parseInt(process.env.BENCHMARK_NUM_RUNS || "5000", 10), + + // Error rate (7% = realistic production load with some failures) + ERROR_RATE: 0.07, + + // Batch size for producer + PRODUCER_BATCH_SIZE: 100, + + // Replication service settings + FLUSH_BATCH_SIZE: 50, + FLUSH_INTERVAL_MS: 100, + MAX_FLUSH_CONCURRENCY: 4, + + // How long to wait for replication to complete (in ms) + REPLICATION_TIMEOUT_MS: 120_000, // 2 minutes +}; + +interface BenchmarkResult { + name: string; + fingerprintingEnabled: boolean; + producerStats: { + created: number; + withErrors: number; + duration: number; + throughput: number; + }; + replicationStats: { + duration: number; + throughput: number; + replicatedRuns: number; + }; + eluStats: { + mean: number; + p50: number; + p95: number; + p99: number; + samples: number[]; + }; + metricsStats: { + batchesFlushed: number; + taskRunsInserted: number; + payloadsInserted: number; + eventsProcessed: number; + }; +} + +/** + * Measure Event Loop Utilization during benchmark + */ +class ELUMonitor { + private samples: number[] = []; + private interval: NodeJS.Timeout | null = null; + private startELU: { idle: number; active: number } | null = null; + + start(intervalMs: number = 100) { + this.samples = []; + this.startELU = performance.eventLoopUtilization(); + + this.interval = setInterval(() => { + const elu = performance.eventLoopUtilization(); + const utilization = elu.utilization * 100; // Convert to percentage + this.samples.push(utilization); + }, intervalMs); + } + + stop(): { mean: number; p50: number; p95: number; p99: number; samples: number[] } { + if (this.interval) { + clearInterval(this.interval); + this.interval = null; + } + + if (this.samples.length === 0) { + return { mean: 0, p50: 0, p95: 0, p99: 0, samples: [] }; + } + + const sorted = [...this.samples].sort((a, 
b) => a - b); + const mean = sorted.reduce((sum, val) => sum + val, 0) / sorted.length; + const p50 = sorted[Math.floor(sorted.length * 0.5)]; + const p95 = sorted[Math.floor(sorted.length * 0.95)]; + const p99 = sorted[Math.floor(sorted.length * 0.99)]; + + return { mean, p50, p95, p99, samples: sorted }; + } +} + +/** + * Run the producer script in a separate process + */ +async function runProducer(config: { + postgresUrl: string; + organizationId: string; + projectId: string; + environmentId: string; + numRuns: number; + errorRate: number; + batchSize: number; +}): Promise<{ created: number; withErrors: number; duration: number; throughput: number }> { + return new Promise((resolve, reject) => { + const producerPath = path.join(__dirname, "runsReplicationBenchmark.producer.ts"); + + // Use tsx to run the TypeScript file directly + const child = fork(producerPath, [JSON.stringify(config)], { + stdio: ["ignore", "pipe", "pipe", "ipc"], + execArgv: ["-r", "tsx/cjs"], + }); + + let output = ""; + + child.stdout?.on("data", (data) => { + const text = data.toString(); + output += text; + console.log(text.trim()); + }); + + child.stderr?.on("data", (data) => { + console.error(data.toString().trim()); + }); + + child.on("message", (message: any) => { + if (message.type === "complete") { + resolve(message.stats); + } else if (message.type === "error") { + reject(new Error(message.error)); + } + }); + + child.on("error", (error) => { + reject(error); + }); + + child.on("exit", (code) => { + if (code !== 0) { + reject(new Error(`Producer exited with code ${code}`)); + } + }); + }); +} + +/** + * Wait for all runs to be replicated to ClickHouse + */ +async function waitForReplication( + clickhouse: ClickHouse, + organizationId: string, + expectedCount: number, + timeoutMs: number +): Promise<{ duration: number; replicatedRuns: number }> { + const startTime = performance.now(); + const deadline = startTime + timeoutMs; + + const queryRuns = clickhouse.reader.query({ + name: 
"benchmark-count", + query: + "SELECT count(*) as count FROM trigger_dev.task_runs_v2 WHERE organization_id = {org_id:String}", + schema: z.object({ count: z.number() }), + params: z.object({ org_id: z.string() }), + }); + + while (performance.now() < deadline) { + const [error, result] = await queryRuns({ org_id: organizationId }); + + if (error) { + throw new Error(`Failed to query ClickHouse: ${error.message}`); + } + + const count = result?.[0]?.count || 0; + + if (count >= expectedCount) { + const duration = performance.now() - startTime; + return { duration, replicatedRuns: count }; + } + + // Wait a bit before checking again + await setTimeout(500); + } + + throw new Error( + `Replication timeout: expected ${expectedCount} runs, but only found ${await getRunCount( + clickhouse + )} after ${timeoutMs}ms` + ); +} + +async function getRunCount(clickhouse: ClickHouse): Promise { + const queryRuns = clickhouse.reader.query({ + name: "benchmark-count", + query: "SELECT count(*) as count FROM trigger_dev.task_runs_v2", + schema: z.object({ count: z.number() }), + }); + + const [error, result] = await queryRuns({}); + if (error) return 0; + return result?.[0]?.count || 0; +} + +/** + * Extract metrics from OpenTelemetry metrics + */ +function extractMetrics(metrics: any[]): { + batchesFlushed: number; + taskRunsInserted: number; + payloadsInserted: number; + eventsProcessed: number; +} { + function getMetricData(name: string) { + for (const resourceMetrics of metrics) { + for (const scopeMetrics of resourceMetrics.scopeMetrics) { + for (const metric of scopeMetrics.metrics) { + if (metric.descriptor.name === name) { + return metric; + } + } + } + } + return null; + } + + function sumCounterValues(metric: any): number { + if (!metric?.dataPoints) return 0; + return metric.dataPoints.reduce((sum: number, dp: any) => sum + (dp.value || 0), 0); + } + + return { + batchesFlushed: sumCounterValues(getMetricData("runs_replication.batches_flushed")), + taskRunsInserted: 
sumCounterValues(getMetricData("runs_replication.task_runs_inserted")), + payloadsInserted: sumCounterValues(getMetricData("runs_replication.payloads_inserted")), + eventsProcessed: sumCounterValues(getMetricData("runs_replication.events_processed")), + }; +} + +/** + * Run a single benchmark test + */ +async function runBenchmark( + name: string, + fingerprintingEnabled: boolean, + { + clickhouseContainer, + redisOptions, + postgresContainer, + prisma, + }: { + clickhouseContainer: any; + redisOptions: any; + postgresContainer: any; + prisma: any; + } +): Promise { + console.log(`\n${"=".repeat(80)}`); + console.log(`BENCHMARK: ${name}`); + console.log(`Error Fingerprinting: ${fingerprintingEnabled ? "ENABLED" : "DISABLED"}`); + console.log( + `Runs: ${BENCHMARK_CONFIG.NUM_RUNS}, Error Rate: ${(BENCHMARK_CONFIG.ERROR_RATE * 100).toFixed( + 1 + )}%` + ); + console.log(`${"=".repeat(80)}\n`); + + // Setup + const organization = await prisma.organization.create({ + data: { + title: `benchmark-${name}`, + slug: `benchmark-${name}`, + }, + }); + + const project = await prisma.project.create({ + data: { + name: `benchmark-${name}`, + slug: `benchmark-${name}`, + organizationId: organization.id, + externalRef: `benchmark-${name}`, + }, + }); + + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: `benchmark-${name}`, + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: `benchmark-${name}`, + pkApiKey: `benchmark-${name}`, + shortcode: `benchmark-${name}`, + }, + }); + + // Setup ClickHouse + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: `benchmark-${name}`, + compression: { + request: true, + }, + logLevel: "warn", + }); + + // Setup tracing and metrics + const { tracer } = createInMemoryTracing(); + const metricsHelper = createInMemoryMetrics(); + + // Create and start replication service + const runsReplicationService = new RunsReplicationService({ + 
clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: `benchmark-${name}`, + slotName: `benchmark_${name.replace(/-/g, "_")}`, + publicationName: `benchmark_${name.replace(/-/g, "_")}_pub`, + redisOptions, + maxFlushConcurrency: BENCHMARK_CONFIG.MAX_FLUSH_CONCURRENCY, + flushIntervalMs: BENCHMARK_CONFIG.FLUSH_INTERVAL_MS, + flushBatchSize: BENCHMARK_CONFIG.FLUSH_BATCH_SIZE, + leaderLockTimeoutMs: 10000, + leaderLockExtendIntervalMs: 2000, + ackIntervalSeconds: 10, + tracer, + meter: metricsHelper.meter, + logLevel: "warn", + disableErrorFingerprinting: !fingerprintingEnabled, + }); + + await runsReplicationService.start(); + + // Start ELU monitoring + const eluMonitor = new ELUMonitor(); + eluMonitor.start(100); + + // Run producer in separate process + console.log("\n[Benchmark] Starting producer..."); + const producerStats = await runProducer({ + postgresUrl: postgresContainer.getConnectionUri(), + organizationId: organization.id, + projectId: project.id, + environmentId: runtimeEnvironment.id, + numRuns: BENCHMARK_CONFIG.NUM_RUNS, + errorRate: BENCHMARK_CONFIG.ERROR_RATE, + batchSize: BENCHMARK_CONFIG.PRODUCER_BATCH_SIZE, + }); + + console.log("\n[Benchmark] Waiting for replication to complete..."); + const replicationResult = await waitForReplication( + clickhouse, + organization.id, + producerStats.created, + BENCHMARK_CONFIG.REPLICATION_TIMEOUT_MS + ); + + // Stop ELU monitoring + const eluStats = eluMonitor.stop(); + + // Get metrics + const metrics = await metricsHelper.getMetrics(); + const metricsStats = extractMetrics(metrics); + + // Cleanup + await runsReplicationService.stop(); + await metricsHelper.shutdown(); + + const throughput = (replicationResult.replicatedRuns / replicationResult.duration) * 1000; + + const result: BenchmarkResult = { + name, + fingerprintingEnabled, + producerStats, + replicationStats: { + duration: replicationResult.duration, + throughput, + replicatedRuns: replicationResult.replicatedRuns, + }, 
+ eluStats, + metricsStats, + }; + + // Print results + console.log(`\n${"=".repeat(80)}`); + console.log(`RESULTS: ${name}`); + console.log(`${"=".repeat(80)}`); + console.log("\nProducer:"); + console.log(` Created: ${producerStats.created} runs`); + console.log( + ` With errors: ${producerStats.withErrors} (${( + (producerStats.withErrors / producerStats.created) * + 100 + ).toFixed(1)}%)` + ); + console.log(` Duration: ${producerStats.duration.toFixed(0)}ms`); + console.log(` Throughput: ${producerStats.throughput.toFixed(0)} runs/sec`); + console.log("\nReplication:"); + console.log(` Replicated: ${replicationResult.replicatedRuns} runs`); + console.log(` Duration: ${replicationResult.duration.toFixed(0)}ms`); + console.log(` Throughput: ${throughput.toFixed(0)} runs/sec`); + console.log("\nEvent Loop Utilization:"); + console.log(` Mean: ${eluStats.mean.toFixed(2)}%`); + console.log(` P50: ${eluStats.p50.toFixed(2)}%`); + console.log(` P95: ${eluStats.p95.toFixed(2)}%`); + console.log(` P99: ${eluStats.p99.toFixed(2)}%`); + console.log(` Samples: ${eluStats.samples.length}`); + console.log("\nMetrics:"); + console.log(` Batches flushed: ${metricsStats.batchesFlushed}`); + console.log(` Task runs inserted: ${metricsStats.taskRunsInserted}`); + console.log(` Payloads inserted: ${metricsStats.payloadsInserted}`); + console.log(` Events processed: ${metricsStats.eventsProcessed}`); + console.log(`${"=".repeat(80)}\n`); + + return result; +} + +/** + * Compare two benchmark results and print delta + */ +function compareBenchmarks(baseline: BenchmarkResult, comparison: BenchmarkResult) { + console.log(`\n${"=".repeat(80)}`); + console.log("COMPARISON"); + console.log( + `Baseline: ${baseline.name} (fingerprinting ${baseline.fingerprintingEnabled ? "ON" : "OFF"})` + ); + console.log( + `Comparison: ${comparison.name} (fingerprinting ${ + comparison.fingerprintingEnabled ? 
"ON" : "OFF" + })` + ); + console.log(`${"=".repeat(80)}`); + + const replicationDurationDelta = + ((comparison.replicationStats.duration - baseline.replicationStats.duration) / + baseline.replicationStats.duration) * + 100; + const throughputDelta = + ((comparison.replicationStats.throughput - baseline.replicationStats.throughput) / + baseline.replicationStats.throughput) * + 100; + const eluMeanDelta = + ((comparison.eluStats.mean - baseline.eluStats.mean) / baseline.eluStats.mean) * 100; + const eluP99Delta = + ((comparison.eluStats.p99 - baseline.eluStats.p99) / baseline.eluStats.p99) * 100; + + console.log("\nReplication Duration:"); + console.log( + ` ${baseline.replicationStats.duration.toFixed( + 0 + )}ms → ${comparison.replicationStats.duration.toFixed(0)}ms (${ + replicationDurationDelta > 0 ? "+" : "" + }${replicationDurationDelta.toFixed(2)}%)` + ); + + console.log("\nThroughput:"); + console.log( + ` ${baseline.replicationStats.throughput.toFixed( + 0 + )} → ${comparison.replicationStats.throughput.toFixed(0)} runs/sec (${ + throughputDelta > 0 ? "+" : "" + }${throughputDelta.toFixed(2)}%)` + ); + + console.log("\nEvent Loop Utilization (Mean):"); + console.log( + ` ${baseline.eluStats.mean.toFixed(2)}% → ${comparison.eluStats.mean.toFixed(2)}% (${ + eluMeanDelta > 0 ? "+" : "" + }${eluMeanDelta.toFixed(2)}%)` + ); + + console.log("\nEvent Loop Utilization (P99):"); + console.log( + ` ${baseline.eluStats.p99.toFixed(2)}% → ${comparison.eluStats.p99.toFixed(2)}% (${ + eluP99Delta > 0 ? 
"+" : "" + }${eluP99Delta.toFixed(2)}%)` + ); + + console.log(`\n${"=".repeat(80)}\n`); + + // Return deltas for assertions if needed + return { + replicationDurationDelta, + throughputDelta, + eluMeanDelta, + eluP99Delta, + }; +} + +describe("RunsReplicationService Benchmark", () => { + containerTest.skipIf(process.env.BENCHMARKS_ENABLED !== "1")( + "should benchmark error fingerprinting performance impact", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + // Enable replica identity for TaskRun table + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + console.log("\n" + "=".repeat(80)); + console.log("RUNS REPLICATION SERVICE - ERROR FINGERPRINTING BENCHMARK"); + console.log("=".repeat(80)); + console.log(`Configuration:`); + console.log(` Total runs: ${BENCHMARK_CONFIG.NUM_RUNS}`); + console.log(` Error rate: ${(BENCHMARK_CONFIG.ERROR_RATE * 100).toFixed(1)}%`); + console.log( + ` Expected errors: ~${Math.floor(BENCHMARK_CONFIG.NUM_RUNS * BENCHMARK_CONFIG.ERROR_RATE)}` + ); + console.log(` Producer batch size: ${BENCHMARK_CONFIG.PRODUCER_BATCH_SIZE}`); + console.log(` Replication batch size: ${BENCHMARK_CONFIG.FLUSH_BATCH_SIZE}`); + console.log(` Max flush concurrency: ${BENCHMARK_CONFIG.MAX_FLUSH_CONCURRENCY}`); + console.log("=".repeat(80) + "\n"); + + // Run benchmark WITHOUT error fingerprinting (baseline) + const baselineResult = await runBenchmark("baseline-no-fingerprinting", false, { + clickhouseContainer, + redisOptions, + postgresContainer, + prisma, + }); + + // Run benchmark WITH error fingerprinting + const fingerprintingResult = await runBenchmark("with-fingerprinting", true, { + clickhouseContainer, + redisOptions, + postgresContainer, + prisma, + }); + + // Compare results + const deltas = compareBenchmarks(baselineResult, fingerprintingResult); + + // Basic assertions - just to ensure benchmarks completed successfully + 
expect(baselineResult.replicationStats.replicatedRuns).toBe(BENCHMARK_CONFIG.NUM_RUNS); + expect(fingerprintingResult.replicationStats.replicatedRuns).toBe(BENCHMARK_CONFIG.NUM_RUNS); + + // Log final summary + console.log("BENCHMARK COMPLETE"); + console.log( + `Fingerprinting impact on replication duration: ${ + deltas.replicationDurationDelta > 0 ? "+" : "" + }${deltas.replicationDurationDelta.toFixed(2)}%` + ); + console.log( + `Fingerprinting impact on throughput: ${ + deltas.throughputDelta > 0 ? "+" : "" + }${deltas.throughputDelta.toFixed(2)}%` + ); + console.log( + `Fingerprinting impact on ELU (mean): ${ + deltas.eluMeanDelta > 0 ? "+" : "" + }${deltas.eluMeanDelta.toFixed(2)}%` + ); + console.log( + `Fingerprinting impact on ELU (P99): ${ + deltas.eluP99Delta > 0 ? "+" : "" + }${deltas.eluP99Delta.toFixed(2)}%` + ); + } + ); +}); diff --git a/internal-packages/clickhouse/schema/021_add_error_fingerprint_to_task_runs_v2.sql b/internal-packages/clickhouse/schema/021_add_error_fingerprint_to_task_runs_v2.sql new file mode 100644 index 00000000000..f702b91d3b9 --- /dev/null +++ b/internal-packages/clickhouse/schema/021_add_error_fingerprint_to_task_runs_v2.sql @@ -0,0 +1,11 @@ +-- +goose Up +ALTER TABLE trigger_dev.task_runs_v2 + ADD COLUMN error_fingerprint String DEFAULT ''; + +-- Bloom filter index for fast error fingerprint lookups +ALTER TABLE trigger_dev.task_runs_v2 + ADD INDEX idx_error_fingerprint error_fingerprint TYPE bloom_filter GRANULARITY 4; + +-- +goose Down +ALTER TABLE trigger_dev.task_runs_v2 DROP INDEX idx_error_fingerprint; +ALTER TABLE trigger_dev.task_runs_v2 DROP COLUMN error_fingerprint; diff --git a/internal-packages/clickhouse/schema/022_create_errors_v1_table.sql b/internal-packages/clickhouse/schema/022_create_errors_v1_table.sql new file mode 100644 index 00000000000..ecc6e9fdff6 --- /dev/null +++ b/internal-packages/clickhouse/schema/022_create_errors_v1_table.sql @@ -0,0 +1,78 @@ +-- +goose Up + +-- Aggregated error groups 
-- +goose Up

-- Aggregated error groups table (per task + fingerprint).
-- Populated exclusively by the mv_errors_v1 materialized view below; rows for
-- the same (org, project, env, task, fingerprint) key are merged by the
-- AggregatingMergeTree engine.
CREATE TABLE trigger_dev.errors_v1
(
    organization_id String,
    project_id String,
    environment_id String,
    task_identifier String,
    error_fingerprint String,

    -- Error details (samples from occurrences)
    error_type String,
    error_message String,
    sample_stack_trace String,

    -- SimpleAggregateFunction stores raw values and applies the function during merge,
    -- avoiding binary state encoding issues with AggregateFunction.
    -- last_seen_date is a DateTime copy of last_seen used only by the TTL below.
    last_seen_date SimpleAggregateFunction(max, DateTime),

    first_seen SimpleAggregateFunction(min, DateTime64(3)),
    last_seen SimpleAggregateFunction(max, DateTime64(3)),
    occurrence_count AggregateFunction(sum, UInt64),
    affected_task_versions AggregateFunction(uniq, String),

    -- Samples for debugging
    sample_run_id AggregateFunction(any, String),
    sample_friendly_id AggregateFunction(any, String),

    -- Status distribution
    status_distribution AggregateFunction(sumMap, Array(String), Array(UInt64))
)
ENGINE = AggregatingMergeTree()
ORDER BY (organization_id, project_id, environment_id, task_identifier, error_fingerprint)
-- NOTE(review): TTL on a SimpleAggregateFunction(max, DateTime) column — the
-- row expires 90 days after the merged maximum; confirm this interacts as
-- intended with partially-merged parts.
TTL last_seen_date + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;

-- Materialized view to auto-populate from task_runs_v2.
-- NOTE(review): MVs fire on INSERT only. If task_runs_v2 receives multiple row
-- versions for the same run (e.g. replacing-style updates), each errored
-- insert increments occurrence_count — verify against the source table's
-- engine/versioning semantics.
CREATE MATERIALIZED VIEW trigger_dev.mv_errors_v1
TO trigger_dev.errors_v1
AS
SELECT
    organization_id,
    project_id,
    environment_id,
    task_identifier,
    error_fingerprint,

    -- Prefer error.data.type, fall back to error.data.name, then 'Error'.
    any(coalesce(nullIf(toString(error.data.type), ''), nullIf(toString(error.data.name), ''), 'Error')) as error_type,
    -- Message truncated to 500 chars, stack to 2000, to bound row size.
    any(coalesce(nullIf(substring(toString(error.data.message), 1, 500), ''), 'Unknown error')) as error_message,
    any(coalesce(substring(toString(error.data.stack), 1, 2000), '')) as sample_stack_trace,

    toDateTime(max(created_at)) as last_seen_date,

    min(created_at) as first_seen,
    max(created_at) as last_seen,
    sumState(toUInt64(1)) as occurrence_count,
    uniqState(task_version) as affected_task_versions,

    anyState(run_id) as sample_run_id,
    anyState(friendly_id) as sample_friendly_id,

    sumMapState([status], [toUInt64(1)]) as status_distribution
FROM trigger_dev.task_runs_v2
WHERE
    error_fingerprint != ''
    AND status IN ('SYSTEM_FAILURE', 'CRASHED', 'INTERRUPTED', 'COMPLETED_WITH_ERRORS')
    AND _is_deleted = 0
GROUP BY
    organization_id,
    project_id,
    environment_id,
    task_identifier,
    error_fingerprint;

-- +goose Down
DROP VIEW IF EXISTS trigger_dev.mv_errors_v1;
DROP TABLE IF EXISTS trigger_dev.errors_v1;
prewhereClauses: string[] = []; private whereClauses: string[] = []; + private havingClauses: string[] = []; private params: QueryParams = {}; private orderByClause: string | null = null; private limitClause: string | null = null; @@ -191,6 +211,21 @@ export class ClickhouseQueryFastBuilder> { return this; } + having(clause: string, params?: QueryParams): this { + this.havingClauses.push(clause); + if (params) { + Object.assign(this.params, params); + } + return this; + } + + havingIf(condition: any, clause: string, params?: QueryParams): this { + if (condition) { + this.having(clause, params); + } + return this; + } + orderBy(clause: string): this { this.orderByClause = clause; return this; @@ -225,6 +260,9 @@ export class ClickhouseQueryFastBuilder> { if (this.groupByClause) { query += ` GROUP BY ${this.groupByClause}`; } + if (this.havingClauses.length > 0) { + query += " HAVING " + this.havingClauses.join(" AND "); + } if (this.orderByClause) { query += ` ORDER BY ${this.orderByClause}`; } diff --git a/internal-packages/clickhouse/src/errors.ts b/internal-packages/clickhouse/src/errors.ts new file mode 100644 index 00000000000..040561ebd5d --- /dev/null +++ b/internal-packages/clickhouse/src/errors.ts @@ -0,0 +1,254 @@ +import { ClickHouseSettings } from "@clickhouse/client"; +import { z } from "zod"; +import { ClickhouseReader } from "./client/types.js"; + +export const ErrorGroupsListQueryResult = z.object({ + error_fingerprint: z.string(), + task_identifier: z.string(), + error_type: z.string(), + error_message: z.string(), + first_seen: z.string(), + last_seen: z.string(), + occurrence_count: z.number(), + sample_run_id: z.string(), + sample_friendly_id: z.string(), +}); + +export type ErrorGroupsListQueryResult = z.infer; + +/** + * Gets a query builder for listing error groups from the pre-aggregated errors_v1 table. + * Allows flexible filtering and pagination. 
+ */ +export function getErrorGroupsListQueryBuilder( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilder({ + name: "getErrorGroupsList", + baseQuery: ` + SELECT + error_fingerprint, + task_identifier, + any(error_type) as error_type, + any(error_message) as error_message, + toString(toUnixTimestamp64Milli(min(first_seen))) as first_seen, + toString(toUnixTimestamp64Milli(max(last_seen))) as last_seen, + toUInt64(sumMerge(occurrence_count)) as occurrence_count, + anyMerge(sample_run_id) as sample_run_id, + anyMerge(sample_friendly_id) as sample_friendly_id + FROM trigger_dev.errors_v1 + `, + schema: ErrorGroupsListQueryResult, + settings, + }); +} + +export const ErrorGroupQueryResult = z.object({ + error_fingerprint: z.string(), + task_identifier: z.string(), + error_type: z.string(), + error_message: z.string(), + first_seen: z.string(), + last_seen: z.string(), + occurrence_count: z.number(), + sample_run_id: z.string(), + sample_friendly_id: z.string(), +}); + +export type ErrorGroupQueryResult = z.infer<typeof ErrorGroupQueryResult>; + +export const ErrorGroupQueryParams = z.object({ + organizationId: z.string(), + projectId: z.string(), + environmentId: z.string(), + days: z.number().int().default(30), + limit: z.number().int().default(50), + offset: z.number().int().default(0), +}); + +export type ErrorGroupQueryParams = z.infer<typeof ErrorGroupQueryParams>; + +/** + * Gets error groups from the pre-aggregated errors_v1 table. + * Much faster than on-the-fly aggregation.
+ */ +export function getErrorGroups(ch: ClickhouseReader, settings?: ClickHouseSettings) { + return ch.query({ + name: "getErrorGroups", + query: ` + SELECT + error_fingerprint, + task_identifier, + any(error_type) as error_type, + any(error_message) as error_message, + toString(toUnixTimestamp64Milli(min(first_seen))) as first_seen, + toString(toUnixTimestamp64Milli(max(last_seen))) as last_seen, + toUInt64(sumMerge(occurrence_count)) as occurrence_count, + anyMerge(sample_run_id) as sample_run_id, + anyMerge(sample_friendly_id) as sample_friendly_id + FROM trigger_dev.errors_v1 + WHERE + organization_id = {organizationId: String} + AND project_id = {projectId: String} + AND environment_id = {environmentId: String} + GROUP BY error_fingerprint, task_identifier + HAVING max(last_seen) >= now() - INTERVAL {days: Int64} DAY + ORDER BY last_seen DESC + LIMIT {limit: Int64} + OFFSET {offset: Int64} + `, + schema: ErrorGroupQueryResult, + params: ErrorGroupQueryParams, + settings, + }); +} + +export const ErrorInstanceQueryResult = z.object({ + run_id: z.string(), + friendly_id: z.string(), + task_identifier: z.string(), + created_at: z.string(), + status: z.string(), + error_text: z.string(), + trace_id: z.string(), + task_version: z.string(), +}); + +export type ErrorInstanceQueryResult = z.infer<typeof ErrorInstanceQueryResult>; + +export const ErrorInstanceQueryParams = z.object({ + organizationId: z.string(), + projectId: z.string(), + environmentId: z.string(), + errorFingerprint: z.string(), + limit: z.number().int().default(50), + offset: z.number().int().default(0), +}); + +export type ErrorInstanceQueryParams = z.infer<typeof ErrorInstanceQueryParams>; + +export const ErrorInstancesListQueryResult = z.object({ + run_id: z.string(), + friendly_id: z.string(), + task_identifier: z.string(), + created_at: z.string(), + status: z.string(), + error_text: z.string(), + trace_id: z.string(), + task_version: z.string(), +}); + +export type ErrorInstancesListQueryResult = z.infer<typeof ErrorInstancesListQueryResult>; + +/** + * Gets a query builder for listing error
instances from task_runs_v2. + * Allows flexible filtering and pagination for runs with a specific error fingerprint. + */ +export function getErrorInstancesListQueryBuilder( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilder({ + name: "getErrorInstancesList", + baseQuery: ` + SELECT + run_id, + friendly_id, + task_identifier, + toString(created_at) as created_at, + status, + error_text, + trace_id, + task_version + FROM trigger_dev.task_runs_v2 FINAL + `, + schema: ErrorInstancesListQueryResult, + settings, + }); +} + +export const ErrorHourlyOccurrencesQueryResult = z.object({ + error_fingerprint: z.string(), + hour_epoch: z.number(), + count: z.number(), +}); + +export type ErrorHourlyOccurrencesQueryResult = z.infer<typeof ErrorHourlyOccurrencesQueryResult>; + +export const ErrorHourlyOccurrencesQueryParams = z.object({ + organizationId: z.string(), + projectId: z.string(), + environmentId: z.string(), + fingerprints: z.array(z.string()), + hours: z.number().int().default(24), +}); + +export type ErrorHourlyOccurrencesQueryParams = z.infer<typeof ErrorHourlyOccurrencesQueryParams>; + +/** + * Gets hourly occurrence counts for specific error fingerprints over the past N hours. + * Queries task_runs_v2 directly, grouped by fingerprint and hour.
+ */ +export function getErrorHourlyOccurrences(ch: ClickhouseReader, settings?: ClickHouseSettings) { + return ch.query({ + name: "getErrorHourlyOccurrences", + query: ` + SELECT + error_fingerprint, + toUnixTimestamp(toStartOfHour(created_at)) as hour_epoch, + count() as count + FROM trigger_dev.task_runs_v2 FINAL + WHERE + organization_id = {organizationId: String} + AND project_id = {projectId: String} + AND environment_id = {environmentId: String} + AND created_at >= now() - INTERVAL {hours: Int64} HOUR + AND error_fingerprint IN {fingerprints: Array(String)} + AND status IN ('SYSTEM_FAILURE', 'CRASHED', 'INTERRUPTED', 'COMPLETED_WITH_ERRORS') + AND _is_deleted = 0 + GROUP BY + error_fingerprint, + hour_epoch + ORDER BY + error_fingerprint ASC, + hour_epoch ASC + `, + schema: ErrorHourlyOccurrencesQueryResult, + params: ErrorHourlyOccurrencesQueryParams, + settings, + }); +} + +/** + * Gets individual run instances for a specific error fingerprint. + */ +export function getErrorInstances(ch: ClickhouseReader, settings?: ClickHouseSettings) { + return ch.query({ + name: "getErrorInstances", + query: ` + SELECT + run_id, + friendly_id, + task_identifier, + toString(created_at) as created_at, + status, + error_text, + trace_id, + task_version + FROM trigger_dev.task_runs_v2 FINAL + WHERE + organization_id = {organizationId: String} + AND project_id = {projectId: String} + AND environment_id = {environmentId: String} + AND error_fingerprint = {errorFingerprint: String} + AND _is_deleted = 0 + ORDER BY created_at DESC + LIMIT {limit: Int64} + OFFSET {offset: Int64} + `, + schema: ErrorInstanceQueryResult, + params: ErrorInstanceQueryParams, + settings, + }); +} diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index b66ce8e3ed6..58ee7dca17a 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -27,6 +27,13 @@ import { getLogsSearchListQueryBuilder, } from 
"./taskEvents.js"; import { insertMetrics } from "./metrics.js"; +import { + getErrorGroups, + getErrorInstances, + getErrorGroupsListQueryBuilder, + getErrorInstancesListQueryBuilder, + getErrorHourlyOccurrences, +} from "./errors.js"; import { Logger, type LogLevel } from "@trigger.dev/core/logger"; import type { Agent as HttpAgent } from "http"; import type { Agent as HttpsAgent } from "https"; @@ -34,6 +41,7 @@ import type { Agent as HttpsAgent } from "https"; export type * from "./taskRuns.js"; export type * from "./taskEvents.js"; export type * from "./metrics.js"; +export type * from "./errors.js"; export type * from "./client/queryBuilder.js"; // Re-export column constants, indices, and type-safe accessors @@ -229,4 +237,14 @@ export class ClickHouse { logsListQueryBuilder: getLogsSearchListQueryBuilder(this.reader), }; } + + get errors() { + return { + getGroups: getErrorGroups(this.reader), + getInstances: getErrorInstances(this.reader), + getHourlyOccurrences: getErrorHourlyOccurrences(this.reader), + listQueryBuilder: getErrorGroupsListQueryBuilder(this.reader), + instancesQueryBuilder: getErrorInstancesListQueryBuilder(this.reader), + }; + } } diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index 8c1d29ac162..4162691ed7a 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -29,6 +29,7 @@ export const TaskRunV2 = z.object({ base_cost_in_cents: z.number().default(0), output: z.unknown(), error: z.unknown(), + error_fingerprint: z.string().default(""), tags: z.array(z.string()).default([]), task_version: z.string(), sdk_version: z.string(), @@ -82,6 +83,7 @@ export const TASK_RUN_COLUMNS = [ "base_cost_in_cents", "output", "error", + "error_fingerprint", "tags", "task_version", "sdk_version", @@ -144,6 +146,7 @@ export type TaskRunFieldTypes = { base_cost_in_cents: number; output: { data: unknown }; error: { data: unknown }; + 
error_fingerprint: string; tags: string[]; task_version: string; sdk_version: string; @@ -277,6 +280,7 @@ export type TaskRunInsertArray = [ base_cost_in_cents: number, output: { data: unknown }, error: { data: unknown }, + error_fingerprint: string, tags: string[], task_version: string, sdk_version: string, diff --git a/packages/core/src/v3/isomorphic/friendlyId.ts b/packages/core/src/v3/isomorphic/friendlyId.ts index 90fa31bd573..a230f8c7450 100644 --- a/packages/core/src/v3/isomorphic/friendlyId.ts +++ b/packages/core/src/v3/isomorphic/friendlyId.ts @@ -96,6 +96,7 @@ export const WaitpointId = new IdUtil("waitpoint"); export const BatchId = new IdUtil("batch"); export const BulkActionId = new IdUtil("bulk"); export const AttemptId = new IdUtil("attempt"); +export const ErrorId = new IdUtil("error"); export class IdGenerator { private alphabet: string;