From 08e4ee5ec5c514a9cedf127fce54ba71a8141900 Mon Sep 17 00:00:00 2001
From: Alexander Onnikov
Date: Fri, 17 Jan 2025 17:22:01 +0700
Subject: [PATCH] UBERF-9167 Enhance blob migration tool (#7697)

Signed-off-by: Alexander Onnikov
---
 dev/tool/src/index.ts         | 30 ++++++++++++++++++++-----
 dev/tool/src/storage.ts       | 42 ++++++++++++++++++++++++++++++-----
 server/datalake/src/client.ts |  4 ++--
 3 files changed, 63 insertions(+), 13 deletions(-)

diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index cbffad5fc0b..0ac46cfe95a 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -1193,12 +1193,14 @@ export function devTool (
 
   program
     .command('copy-s3-datalake')
-    .description('migrate files from s3 to datalake')
+    .description('copy files from s3 to datalake')
     .option('-w, --workspace <workspace>', 'Selected workspace only', '')
     .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
-    .action(async (cmd: { workspace: string, concurrency: string }) => {
+    .option('-e, --existing', 'Copy existing blobs', false)
+    .action(async (cmd: { workspace: string, concurrency: string, existing: boolean }) => {
       const params = {
-        concurrency: parseInt(cmd.concurrency)
+        concurrency: parseInt(cmd.concurrency),
+        existing: cmd.existing
       }
 
       const storageConfig = storageConfigFromEnv(process.env.STORAGE)
@@ -1222,14 +1224,32 @@ export function devTool (
         workspaces = workspaces
           .filter((p) => isActiveMode(p.mode) || isArchivingMode(p.mode))
           .filter((p) => cmd.workspace === '' || p.workspace === cmd.workspace)
-          .sort((a, b) => b.lastVisit - a.lastVisit)
+          // .sort((a, b) => b.lastVisit - a.lastVisit)
+          .sort((a, b) => {
+            if (a.backupInfo !== undefined && b.backupInfo !== undefined) {
+              return b.backupInfo.blobsSize - a.backupInfo.blobsSize
+            } else if (b.backupInfo !== undefined) {
+              return 1
+            } else if (a.backupInfo !== undefined) {
+              return -1
+            } else {
+              return b.lastVisit - a.lastVisit
+            }
+          })
       })
 
       const count = workspaces.length
+      console.log('found workspaces', count)
+
       let index = 0
       for (const workspace of workspaces) {
         index++
-        toolCtx.info('processing workspace', { workspace: workspace.workspace, index, count })
+        toolCtx.info('processing workspace', {
+          workspace: workspace.workspace,
+          index,
+          count,
+          blobsSize: workspace.backupInfo?.blobsSize ?? 0
+        })
         const workspaceId = getWorkspaceId(workspace.workspace)
 
         for (const config of storages) {
diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts
index eb9582b8dde..e4afd9bb516 100644
--- a/dev/tool/src/storage.ts
+++ b/dev/tool/src/storage.ts
@@ -261,6 +261,7 @@ async function retryOnFailure (
 
 export interface CopyDatalakeParams {
   concurrency: number
+  existing: boolean
 }
 
 export async function copyToDatalake (
@@ -281,7 +282,9 @@ export async function copyToDatalake (
 
   let time = Date.now()
   let processedCnt = 0
+  let processedSize = 0
  let skippedCnt = 0
+  let existingCnt = 0
   let failedCnt = 0
 
   function printStats (): void {
@@ -291,14 +294,32 @@ export async function copyToDatalake (
       processedCnt,
       'skipped',
       skippedCnt,
+      'existing',
+      existingCnt,
       'failed',
       failedCnt,
-      Math.round(duration / 1000) + 's'
+      Math.round(duration / 1000) + 's',
+      formatSize(processedSize)
     )
     time = Date.now()
   }
 
+  const existing = new Set<string>()
+
+  let cursor: string | undefined = ''
+  let hasMore = true
+  while (hasMore) {
+    const res = await datalake.listObjects(ctx, workspaceId, cursor, 1000)
+    cursor = res.cursor
+    hasMore = res.cursor !== undefined
+    for (const blob of res.blobs) {
+      existing.add(blob.name)
+    }
+  }
+
+  console.info('found blobs in datalake:', existing.size)
+
   const rateLimiter = new RateLimiter(params.concurrency)
 
   const iterator = await adapter.listStream(ctx, workspaceId)
@@ -315,6 +336,12 @@ export async function copyToDatalake (
       continue
     }
 
+    if (!params.existing && existing.has(objectName)) {
+      // TODO handle mutable blobs
+      existingCnt++
+      continue
+    }
+
     await rateLimiter.add(async () => {
       try {
         await retryOnFailure(
@@ -323,6 +350,7 @@ export async function copyToDatalake (
           async () => {
             await copyBlobToDatalake(ctx, workspaceId, blob, config, adapter, datalake)
             processedCnt += 1
+            processedSize += blob.size
           },
           50
         )
@@ -352,11 +380,6 @@ export async function copyBlobToDatalake (
   datalake: DatalakeClient
 ): Promise<void> {
   const objectName = blob._id
-  const stat = await datalake.statObject(ctx, workspaceId, objectName)
-  if (stat !== undefined) {
-    return
-  }
-
   if (blob.size < 1024 * 1024 * 64) {
     // Handle small file
     const { endpoint, accessKey: accessKeyId, secretKey: secretAccessKey, region } = config
@@ -392,3 +415,10 @@
     }
   }
 }
+
+export function formatSize (size: number): string {
+  const units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
+  const pow = size === 0 ? 0 : Math.floor(Math.log(size) / Math.log(1024))
+  const val = (1.0 * size) / Math.pow(1024, pow)
+  return `${val.toFixed(2)} ${units[pow]}`
+}
diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts
index cca743083f8..553aaaeeb6f 100644
--- a/server/datalake/src/client.ts
+++ b/server/datalake/src/client.ts
@@ -90,9 +90,9 @@
   async listObjects (
     ctx: MeasureContext,
     workspace: WorkspaceId,
-    cursor: string | undefined
+    cursor: string | undefined,
+    limit: number = 100
   ): Promise {
-    const limit = 100
     const path = `/blob/${workspace.name}`
     const url = new URL(concatLink(this.endpoint, path))
     url.searchParams.append('limit', String(limit))
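
Note: with this change the copy command skips blobs that are already present in the datalake and reports them under the new 'existing' counter; pass -e / --existing to copy them anyway. For reference, a minimal sketch of what the added formatSize helper prints for a few byte counts (values follow from the implementation above, shown only for illustration):

  formatSize(0)                // '0.00 B'
  formatSize(1536)             // '1.50 KB'
  formatSize(64 * 1024 * 1024) // '64.00 MB'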