UBERF-9167 Enhance blob migration tool (#7697)
Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
aonnikov authored Jan 17, 2025
1 parent 8829859 commit 08e4ee5
Showing 3 changed files with 63 additions and 13 deletions.
30 changes: 25 additions & 5 deletions dev/tool/src/index.ts
@@ -1193,12 +1193,14 @@ export function devTool (

  program
    .command('copy-s3-datalake')
-   .description('migrate files from s3 to datalake')
+   .description('copy files from s3 to datalake')
    .option('-w, --workspace <workspace>', 'Selected workspace only', '')
    .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
-   .action(async (cmd: { workspace: string, concurrency: string }) => {
+   .option('-e, --existing', 'Copy existing blobs', false)
+   .action(async (cmd: { workspace: string, concurrency: string, existing: boolean }) => {
      const params = {
-       concurrency: parseInt(cmd.concurrency)
+       concurrency: parseInt(cmd.concurrency),
+       existing: cmd.existing
      }

const storageConfig = storageConfigFromEnv(process.env.STORAGE)
Expand All @@ -1222,14 +1224,32 @@ export function devTool (
      workspaces = workspaces
        .filter((p) => isActiveMode(p.mode) || isArchivingMode(p.mode))
        .filter((p) => cmd.workspace === '' || p.workspace === cmd.workspace)
-       .sort((a, b) => b.lastVisit - a.lastVisit)
+       // .sort((a, b) => b.lastVisit - a.lastVisit)
+       .sort((a, b) => {
+         if (a.backupInfo !== undefined && b.backupInfo !== undefined) {
+           return b.backupInfo.blobsSize - a.backupInfo.blobsSize
+         } else if (b.backupInfo !== undefined) {
+           return 1
+         } else if (a.backupInfo !== undefined) {
+           return -1
+         } else {
+           return b.lastVisit - a.lastVisit
+         }
+       })

const count = workspaces.length
console.log('found workspaces', count)

      let index = 0
      for (const workspace of workspaces) {
        index++
-       toolCtx.info('processing workspace', { workspace: workspace.workspace, index, count })
+       toolCtx.info('processing workspace', {
+         workspace: workspace.workspace,
+         index,
+         count,
+         blobsSize: workspace.backupInfo?.blobsSize ?? 0
+       })
const workspaceId = getWorkspaceId(workspace.workspace)

for (const config of storages) {
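The new comparator changes the processing order: workspaces with backup info come first, largest blobsSize first, and only workspaces without backup info fall back to the previous most-recently-visited order. A minimal standalone sketch of the ordering, using a simplified Ws shape that is an assumption here, not the real workspace type:

interface Ws {
  workspace: string
  lastVisit: number
  backupInfo?: { blobsSize: number }
}

const sample: Ws[] = [
  { workspace: 'a', lastVisit: 300 },
  { workspace: 'b', lastVisit: 100, backupInfo: { blobsSize: 10 } },
  { workspace: 'c', lastVisit: 200, backupInfo: { blobsSize: 99 } }
]

sample.sort((a, b) => {
  if (a.backupInfo !== undefined && b.backupInfo !== undefined) {
    return b.backupInfo.blobsSize - a.backupInfo.blobsSize // larger backups first
  } else if (b.backupInfo !== undefined) {
    return 1 // only b has backup info, so b goes first
  } else if (a.backupInfo !== undefined) {
    return -1 // only a has backup info, so a goes first
  } else {
    return b.lastVisit - a.lastVisit // neither has backup info: most recently visited first
  }
})

console.log(sample.map((w) => w.workspace)) // [ 'c', 'b', 'a' ]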
42 changes: 36 additions & 6 deletions dev/tool/src/storage.ts
@@ -261,6 +261,7 @@ async function retryOnFailure<T> (

export interface CopyDatalakeParams {
  concurrency: number
+ existing: boolean
}

export async function copyToDatalake (
@@ -281,7 +282,9 @@ export async function copyToDatalake (

  let time = Date.now()
  let processedCnt = 0
+ let processedSize = 0
  let skippedCnt = 0
+ let existingCnt = 0
  let failedCnt = 0

function printStats (): void {
@@ -291,14 +294,32 @@ export async function copyToDatalake (
      processedCnt,
      'skipped',
      skippedCnt,
+     'existing',
+     existingCnt,
      'failed',
      failedCnt,
-     Math.round(duration / 1000) + 's'
+     Math.round(duration / 1000) + 's',
+     formatSize(processedSize)
    )

    time = Date.now()
  }

+ const existing = new Set<string>()
+
+ let cursor: string | undefined = ''
+ let hasMore = true
+ while (hasMore) {
+   const res = await datalake.listObjects(ctx, workspaceId, cursor, 1000)
+   cursor = res.cursor
+   hasMore = res.cursor !== undefined
+   for (const blob of res.blobs) {
+     existing.add(blob.name)
+   }
+ }
+
+ console.info('found blobs in datalake:', existing.size)

const rateLimiter = new RateLimiter(params.concurrency)

const iterator = await adapter.listStream(ctx, workspaceId)
@@ -315,6 +336,12 @@
        continue
      }

+     if (!params.existing && existing.has(objectName)) {
+       // TODO handle mutable blobs
+       existingCnt++
+       continue
+     }

await rateLimiter.add(async () => {
try {
await retryOnFailure(
@@ -323,6 +350,7 @@
async () => {
await copyBlobToDatalake(ctx, workspaceId, blob, config, adapter, datalake)
processedCnt += 1
+           processedSize += blob.size
},
50
)
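Together with the prefetched existing set, the new --existing flag reduces the copy decision to a single predicate. A minimal sketch of that rule; the helper name shouldCopy is hypothetical, introduced here only for illustration:

// Without --existing, blobs already present in the datalake are skipped
// (and counted via existingCnt); with --existing they are copied again.
function shouldCopy (name: string, present: Set<string>, copyExisting: boolean): boolean {
  return copyExisting || !present.has(name)
}

console.log(shouldCopy('photo.png', new Set(['photo.png']), false)) // false, skipped as existing
console.log(shouldCopy('photo.png', new Set(['photo.png']), true)) // true, re-copied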
@@ -352,11 +380,6 @@ export async function copyBlobToDatalake (
datalake: DatalakeClient
): Promise<void> {
  const objectName = blob._id
- const stat = await datalake.statObject(ctx, workspaceId, objectName)
- if (stat !== undefined) {
-   return
- }
-
if (blob.size < 1024 * 1024 * 64) {
// Handle small file
const { endpoint, accessKey: accessKeyId, secretKey: secretAccessKey, region } = config
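Removing the per-blob statObject probe is the flip side of the upfront listObjects scan added above: existence is now answered from the in-memory set instead of one HTTP round trip per blob. An illustrative request-count comparison, with a made-up workspace size (numbers are assumptions, not measurements):

// Illustrative only: request counts for a hypothetical workspace.
const blobCount = 250_000
const statCalls = blobCount // before: one statObject call per blob
const listCalls = Math.ceil(blobCount / 1000) // after: one listObjects page per 1000 blobs
console.log({ statCalls, listCalls }) // { statCalls: 250000, listCalls: 250 }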
@@ -392,3 +415,10 @@
}
}
}

+ export function formatSize (size: number): string {
+   const units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
+   const pow = size === 0 ? 0 : Math.floor(Math.log(size) / Math.log(1024))
+   const val = (1.0 * size) / Math.pow(1024, pow)
+   return `${val.toFixed(2)} ${units[pow]}`
+ }
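A quick usage sketch for the new helper, with outputs worked out from the code above:

console.log(formatSize(0)) // '0.00 B'
console.log(formatSize(1536)) // '1.50 KB'
console.log(formatSize(64 * 1024 * 1024)) // '64.00 MB'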
4 changes: 2 additions & 2 deletions server/datalake/src/client.ts
@@ -90,9 +90,9 @@ export class DatalakeClient
  async listObjects (
    ctx: MeasureContext,
    workspace: WorkspaceId,
-   cursor: string | undefined
+   cursor: string | undefined,
+   limit: number = 100
  ): Promise<ListObjectOutput> {
-   const limit = 100
    const path = `/blob/${workspace.name}`
    const url = new URL(concatLink(this.endpoint, path))
    url.searchParams.append('limit', String(limit))
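The extra parameter is backward compatible: callers that pass only a cursor keep the old page size of 100, while the migration tool can request larger pages. A minimal sketch of draining the listing, assuming it runs inside an async function and that ctx, workspaceId, and a DatalakeClient instance named datalake are already in scope:

const names: string[] = []
let cursor: string | undefined = ''
let hasMore = true
while (hasMore) {
  // request 1000 blobs per page instead of the default 100
  const res = await datalake.listObjects(ctx, workspaceId, cursor, 1000)
  cursor = res.cursor
  hasMore = res.cursor !== undefined
  for (const blob of res.blobs) {
    names.push(blob.name)
  }
}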