Skip to content

Commit

Permalink
feat: reconcile orphan objects from admin endpoint (#606)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos authored Jan 21, 2025
1 parent 13fd5fe commit cf23e7d
Show file tree
Hide file tree
Showing 35 changed files with 1,769 additions and 12 deletions.
2 changes: 2 additions & 0 deletions src/admin-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import { Registry } from 'prom-client'

const build = (opts: FastifyServerOptions = {}, appInstance?: FastifyInstance): FastifyInstance => {
const app = fastify(opts)
app.register(plugins.signals)
app.register(plugins.adminTenantId)
app.register(plugins.logRequest({ excludeUrls: ['/status', '/metrics', '/health'] }))
app.register(routes.tenants, { prefix: 'tenants' })
app.register(routes.objects, { prefix: 'tenants' })
app.register(routes.migrations, { prefix: 'migrations' })
app.register(routes.s3Credentials, { prefix: 's3' })

Expand Down
2 changes: 2 additions & 0 deletions src/http/plugins/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export const db = fastifyPlugin(

// Options accepted by the `dbSuperUser` Fastify plugin.
interface DbSuperUserPluginOptions {
// Passed through unchanged as `disableHostCheck` by the plugin.
// NOTE(review): the exact effect is decided by the consumer of that option, which is not visible here.
disableHostCheck?: boolean
// Passed through unchanged as `maxConnections` by the plugin.
// NOTE(review): presumably caps the size of the connection pool — confirm against the connection code.
maxConnections?: number
}

export const dbSuperUser = fastifyPlugin<DbSuperUserPluginOptions>(
Expand All @@ -113,6 +114,7 @@ export const dbSuperUser = fastifyPlugin<DbSuperUserPluginOptions>(
method: request.method,
headers: request.headers,
disableHostCheck: opts.disableHostCheck,
maxConnections: opts.maxConnections,
operation: () => request.operation?.type,
})
})
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/admin/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { default as migrations } from './migrations'
export { default as tenants } from './tenants'
export { default as s3Credentials } from './s3'
export { default as objects } from './objects'
207 changes: 207 additions & 0 deletions src/http/routes/admin/objects.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import { FastifyInstance, RequestGenericInterface } from 'fastify'
import apiKey from '../../plugins/apikey'
import { dbSuperUser, storage } from '../../plugins'
import { ObjectScanner } from '@storage/scanner/scanner'
import { FastifyReply } from 'fastify/types/reply'

// Route schema for `GET /:tenantId/buckets/:bucketId/orphan-objects`.
// Both path parameters are required; every query parameter is optional.
const listOrphanedObjects = {
description: 'List Orphaned Objects',
params: {
type: 'object',
properties: {
tenantId: { type: 'string' },
bucketId: { type: 'string' },
},
required: ['tenantId', 'bucketId'],
},
query: {
type: 'object',
properties: {
// Date string parsed by the handler with `new Date(...)`; when omitted the
// handler falls back to "one hour ago".
before: { type: 'string' },
// Forwarded to the scanner's `listOrphaned` call as `keepTmpTable`.
keepTmpTable: { type: 'boolean' },
},
},
} as const

/**
 * Route schema for `DELETE /:tenantId/buckets/:bucketId/orphan-objects`.
 *
 * Both path parameters are required. Every body property is optional at the
 * schema level (JSON Schema properties are optional unless listed in
 * `required`); the handler itself enforces that at least one of
 * `deleteDbKeys` / `deleteS3Keys` is true.
 */
const syncOrphanedObjects = {
  description: 'Sync Orphaned Objects',
  params: {
    type: 'object',
    properties: {
      tenantId: { type: 'string' },
      bucketId: { type: 'string' },
    },
    required: ['tenantId', 'bucketId'],
  },
  body: {
    type: 'object',
    properties: {
      deleteDbKeys: { type: 'boolean' },
      deleteS3Keys: { type: 'boolean' },
      // Read by the handler and parsed with `new Date(...)`; it was previously
      // accepted without being declared here.
      before: { type: 'string' },
      tmpTable: { type: 'string' },
    },
  },
} as const

// Request generics for the `GET /:tenantId/buckets/:bucketId/orphan-objects` route.
interface ListOrphanObjectsRequest extends RequestGenericInterface {
Params: {
tenantId: string
bucketId: string
}
Querystring: {
// Date string; the handler parses it with `new Date(...)` and rejects unparsable values with a 400.
before?: string
// Coerced with `Boolean(...)` by the handler and passed to the scanner.
keepTmpTable?: boolean
}
}

// Request generics for the `DELETE /:tenantId/buckets/:bucketId/orphan-objects` route.
interface SyncOrphanObjectsRequest extends RequestGenericInterface {
Params: {
tenantId: string
bucketId: string
}
Body: {
// The handler requires at least one of these two flags to be true.
deleteDbKeys?: boolean
deleteS3Keys?: boolean
// Date string parsed by the handler with `new Date(...)`.
before?: string
// Forwarded to the scanner's `deleteOrphans` call as `tmpTable`.
tmpTable?: string
// NOTE(review): declared here but not read by the DELETE handler in this file.
keepTmpTable?: boolean
}
}

/**
 * Registers the admin endpoints used to list and delete orphaned objects of a
 * bucket.
 *
 * Both endpoints stream their results: each batch produced by the scanner is
 * written to the raw response as a JSON document tagged with `event: 'data'`,
 * and `ping` interleaves `event: 'ping'` documents while the scan is quiet.
 *
 * - `GET    /:tenantId/buckets/:bucketId/orphan-objects` lists orphans.
 * - `DELETE /:tenantId/buckets/:bucketId/orphan-objects` deletes orphans.
 *
 * Both accept an optional `before` date (default: one hour ago) and answer
 * `400` when it cannot be parsed.
 */
export default async function routes(fastify: FastifyInstance) {
  fastify.register(apiKey)
  fastify.register(dbSuperUser, {
    disableHostCheck: true,
    maxConnections: 5,
  })
  fastify.register(storage)

  fastify.get<ListOrphanObjectsRequest>(
    '/:tenantId/buckets/:bucketId/orphan-objects',
    {
      schema: listOrphanedObjects,
    },
    async (req, reply) => {
      const bucket = req.params.bucketId
      const before = resolveBeforeDate(req.query.before)

      if (!before) {
        return reply.status(400).send({
          error: 'Invalid date format',
        })
      }

      const scanner = new ObjectScanner(req.storage)
      const orphanObjects = scanner.listOrphaned(bucket, {
        signal: req.signals.disconnect.signal,
        before,
        keepTmpTable: Boolean(req.query.keepTmpTable),
      })

      // NOTE(review): the body below is written through `reply.raw`; confirm
      // that a header set via `reply.header` is actually flushed in that case.
      reply.header('Content-Type', 'application/json; charset=utf-8')

      // Do not let the connection time out, periodically send
      // a ping message to keep the connection alive
      const respPing = ping(reply)

      try {
        for await (const result of orphanObjects) {
          // Empty batches are skipped so the client only sees real findings.
          if (result.value.length > 0) {
            respPing.update()
            reply.raw.write(
              JSON.stringify({
                ...result,
                event: 'data',
              })
            )
          }
        }
      } finally {
        // Always stop the keep-alive timer and close the stream, even when the
        // scan fails; the error itself still propagates to Fastify.
        respPing.clear()
        reply.raw.end()
      }
    }
  )

  fastify.delete<SyncOrphanObjectsRequest>(
    '/:tenantId/buckets/:bucketId/orphan-objects',
    {
      schema: syncOrphanedObjects,
    },
    async (req, reply) => {
      if (!req.body.deleteDbKeys && !req.body.deleteS3Keys) {
        return reply.status(400).send({
          error: 'At least one of deleteDbKeys or deleteS3Keys must be set to true',
        })
      }

      const bucket = `${req.params.bucketId}`
      const before = resolveBeforeDate(req.body.before)

      // Reject an unparsable cutoff instead of handing an Invalid Date to a
      // destructive operation (mirrors the GET endpoint).
      if (!before) {
        return reply.status(400).send({
          error: 'Invalid date format',
        })
      }

      const respPing = ping(reply)

      try {
        const scanner = new ObjectScanner(req.storage)
        const result = scanner.deleteOrphans(bucket, {
          deleteDbKeys: req.body.deleteDbKeys,
          deleteS3Keys: req.body.deleteS3Keys,
          signal: req.signals.disconnect.signal,
          before,
          tmpTable: req.body.tmpTable,
        })

        for await (const deleted of result) {
          respPing.update()
          reply.raw.write(
            JSON.stringify({
              ...deleted,
              event: 'data',
            })
          )
        }
      } finally {
        respPing.clear()
        reply.raw.end()
      }
    }
  )
}

/**
 * Resolves the `before` cutoff supplied by a client.
 *
 * @param raw - Date string from the request, if any.
 * @returns The parsed date; one hour before now when `raw` is missing or
 * empty; `undefined` when `raw` is present but cannot be parsed.
 */
function resolveBeforeDate(raw: string | undefined): Date | undefined {
  if (!raw) {
    const oneHourAgo = new Date()
    oneHourAgo.setHours(oneHourAgo.getHours() - 1)
    return oneHourAgo
  }

  const parsed = new Date(raw)
  return isNaN(parsed.getTime()) ? undefined : parsed
}

// Occasionally write a ping message to the response stream
/**
 * Starts a keep-alive timer for a streamed response.
 *
 * Every 10 seconds the timer checks when something was last sent; if nothing
 * was sent yet, or the last send is more than 5 seconds old, it writes
 * `{"event":"ping"}` to the raw response.
 *
 * @param reply - Reply whose raw stream receives the ping messages.
 * @returns `update()` to record that data was just sent, and `clear()` to stop
 * the timer.
 */
function ping(reply: FastifyReply) {
  const checkEveryMs = 1000 * 10
  let lastSend: Date | undefined

  const timer = setInterval(() => {
    const threshold = new Date()
    threshold.setSeconds(threshold.getSeconds() - 5)

    // Something went out recently enough: no ping needed on this tick.
    const sentRecently = lastSend !== undefined && lastSend >= threshold
    if (sentRecently) {
      return
    }

    lastSend = new Date()
    reply.raw.write(
      JSON.stringify({
        event: 'ping',
      })
    )
  }, checkEveryMs)

  return {
    clear: () => clearInterval(timer),
    update: () => {
      lastSend = new Date()
    },
  }
}
1 change: 1 addition & 0 deletions src/internal/concurrency/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './mutex'
export * from './async-abort-controller'
export * from './merge-async-itertor'
44 changes: 44 additions & 0 deletions src/internal/concurrency/merge-async-itertor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
 * Union of the items produced by `mergeAsyncGenerators`: for every key `K` of
 * the input record, an object `{ type: K; value: V }` where `V` is what the
 * generator stored under `K` yields.
 */
type MergedYield<Gens extends Record<string, AsyncGenerator<any>>> = {
  [K in keyof Gens]: Gens[K] extends AsyncGenerator<infer V> ? { type: K; value: V } : never
}[keyof Gens]

/**
 * Merges several named async generators into a single one.
 *
 * The merge advances in rounds: each round requests the next value from every
 * generator that is not finished, waits for all of them, then yields the
 * obtained values in the key order of `gens`, tagged with the key they came
 * from. The merged generator ends once every source has ended.
 *
 * When the merge stops before the sources are exhausted (the consumer breaks
 * out of its loop, or one source throws), the remaining sources are closed via
 * `return()` so their `finally` blocks run and resources are released.
 *
 * @param gens - Record mapping a name to an async generator.
 * @returns An async generator of `{ type, value }` items.
 * @throws Whatever a source generator throws.
 */
export async function* mergeAsyncGenerators<Gens extends Record<string, AsyncGenerator<any>>>(
  gens: Gens
): AsyncGenerator<MergedYield<Gens>> {
  // Convert the input object into an array of [name, generator] tuples
  const entries = Object.entries(gens) as [keyof Gens, Gens[keyof Gens]][]

  // Track each generator together with its completion state
  const iterators = entries.map(([name, gen]) => ({
    name,
    iterator: gen[Symbol.asyncIterator](),
    done: false,
  }))

  try {
    // Continue looping as long as at least one generator is not done
    while (iterators.some((it) => !it.done)) {
      // Finished generators contribute an already-resolved "done" result so the
      // result array stays index-aligned with `iterators`
      const nextPromises = iterators.map((it) =>
        it.done ? Promise.resolve({ done: true, value: undefined }) : it.iterator.next()
      )

      // Await all the next() promises concurrently
      const results = await Promise.all(nextPromises)

      for (let i = 0; i < iterators.length; i++) {
        const it = iterators[i]
        const result = results[i]

        if (!it.done && !result.done) {
          // Yield an object containing the generator's name and the yielded value
          yield { type: it.name, value: result.value } as MergedYield<Gens>
        }

        if (!it.done && result.done) {
          // Mark the generator as done if it has no more values
          it.done = true
        }
      }
    }
  } finally {
    // On normal completion nothing is left open. On early exit or error, close
    // the sources that are still running; `allSettled` keeps a failure while
    // closing one source from masking the original error or skipping the rest.
    const open = iterators.filter((it) => !it.done)
    await Promise.allSettled(open.map((it) => it.iterator.return(undefined)))
  }
}
1 change: 1 addition & 0 deletions src/internal/database/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { ERRORS } from '@internal/errors'
interface ConnectionOptions {
host: string
tenantId: string
maxConnections?: number
headers?: Record<string, string | undefined | string[]>
method?: string
path?: string
Expand Down
6 changes: 4 additions & 2 deletions src/internal/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.sp
export class TenantConnection {
public readonly role: string

constructor(protected readonly pool: Knex, protected readonly options: TenantConnectionOptions) {
constructor(public readonly pool: Knex, protected readonly options: TenantConnectionOptions) {
this.role = options.user.payload.role || 'anon'
}

Expand Down Expand Up @@ -101,7 +101,9 @@ export class TenantConnection {
searchPath: isExternalPool ? undefined : searchPath,
pool: {
min: 0,
max: isExternalPool ? 1 : options.maxConnections || databaseMaxConnections,
max: isExternalPool
? options.maxConnections || 1
: options.maxConnections || databaseMaxConnections,
acquireTimeoutMillis: databaseConnectionTimeout,
idleTimeoutMillis: isExternalPool
? options.idleTimeoutMillis || 100
Expand Down
2 changes: 1 addition & 1 deletion src/internal/monitoring/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const baseLogger = pino({
headers: whitelistHeaders(request.headers),
hostname: request.hostname,
remoteAddress: request.ip,
remotePort: request.socket.remotePort,
remotePort: request.socket?.remotePort,
}
},
},
Expand Down
4 changes: 4 additions & 0 deletions src/internal/queue/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ export class Event<T extends Omit<BasePayload, '$version'>> {
}

static batchSend<T extends Event<any>[]>(messages: T) {
if (!pgQueueEnable) {
return Promise.all(messages.map((message) => message.send()))
}

return Queue.getInstance().insert(
messages.map((message) => {
const sendOptions = (this.getQueueOptions(message.payload) as PgBoss.JobInsert) || {}
Expand Down
25 changes: 25 additions & 0 deletions src/internal/testing/generators/array.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
 * Invokes `fn` once for every index in `0 .. times - 1` without waiting between
 * calls, then waits for all returned promises.
 *
 * @param times - Number of invocations.
 * @param fn - Async callback receiving the invocation index.
 * @returns The resolved values, ordered by index.
 */
export async function eachParallel<T>(times: number, fn: (index: number) => Promise<T>) {
  const pending: Promise<T>[] = []

  let index = 0
  while (index < times) {
    pending.push(fn(index))
    index += 1
  }

  return Promise.all(pending)
}

/**
 * Returns one element of `arr` chosen with `Math.random()`.
 *
 * @param arr - Array to pick from.
 * @returns The chosen element; `undefined` at runtime when `arr` is empty.
 */
export function pickRandomFromArray<T>(arr: T[]): T {
  const index = Math.floor(Math.random() * arr.length)
  return arr[index]
}

/**
 * Picks up to `range` distinct values from `arr` in random order.
 *
 * - When `arr` has at most `range` elements, `arr` itself is returned.
 * - When `arr` holds fewer distinct values than `range`, all distinct values
 *   are returned (retrying random picks could never reach `range` and would
 *   loop forever).
 * - Otherwise `range` distinct values are drawn with a partial Fisher-Yates
 *   shuffle, which always finishes after `range` steps.
 *
 * `arr` is never mutated.
 *
 * @param arr - Source values.
 * @param range - Number of distinct values wanted.
 * @returns The selected values.
 */
export function pickRandomRangeFromArray<T>(arr: T[], range: number): T[] {
  if (arr.length <= range) {
    return arr
  }

  // Work on a de-duplicated copy so results are distinct and `arr` stays intact.
  const pool = Array.from(new Set(arr))
  if (pool.length <= range) {
    return pool
  }

  const picked: T[] = []
  for (let i = 0; i < range; i++) {
    // Swap a random element of the not-yet-picked tail into position i.
    const j = i + Math.floor(Math.random() * (pool.length - i))
    const chosen = pool[j]
    pool[j] = pool[i]
    pool[i] = chosen
    picked.push(chosen)
  }

  return picked
}
Loading

0 comments on commit cf23e7d

Please sign in to comment.