diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 57eecb33339..f7863394457 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -188,7 +188,7 @@ jobs:
       - name: Testing...
         run: node common/scripts/install-run-rush.js test
         env:
-          DB_URL: 'postgresql://postgres:example@localhost:5433'
+          DB_URL: 'postgresql://postgres:example@localhost:5432'
           ELASTIC_URL: 'http://localhost:9201'
           MONGO_URL: 'mongodb://localhost:27018'
   uitest:
diff --git a/.vscode/launch.json b/.vscode/launch.json
index d12aea6e3b4..6c296a3fdbf 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -34,19 +34,19 @@
       "request": "launch",
       "args": ["src/__start.ts"],
       "env": {
-        "ELASTIC_URL": "http://localhost:9200",
-        "MONGO_URL": "postgresql://postgres:example@localhost:5432;mongodb://localhost:27017",
+        "ELASTIC_URL": "http://localhost:9201",
+        "MONGO_URL": "postgresql://postgres:example@localhost:5432;mongodb://localhost:27018",
         "APM_SERVER_URL2": "http://localhost:8200",
         "METRICS_CONSOLE": "false",
         "METRICS_FILE": "${workspaceRoot}/metrics.txt", // Show metrics in console every 30 seconds.
-        "STORAGE_CONFIG": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin",
+        "STORAGE_CONFIG": "minio|localhost:9002?accessKey=minioadmin&secretKey=minioadmin",
         "SERVER_SECRET": "secret",
         "ENABLE_CONSOLE": "true",
         "COLLABORATOR_URL": "ws://localhost:3078",
         "COLLABORATOR_API_URL": "http://localhost:3078",
         "REKONI_URL": "http://localhost:4004",
         "FRONT_URL": "http://localhost:8080",
-        "ACCOUNTS_URL": "http://localhost:3000",
+        "ACCOUNTS_URL": "http://localhost:3003",
         // "SERVER_PROVIDER":"uweb"
         "SERVER_PROVIDER":"ws",
         "MODEL_VERSION": "",
@@ -188,17 +188,18 @@
       "name": "Debug tool upgrade",
       "type": "node",
       "request": "launch",
-      "args": ["src/__start.ts", "stress", "ws://localhost:3333", "wrong"],
+      "args": ["src/__start.ts", "create-workspace", "sanity-ws", "-w sanity-ws"],
       "env": {
         "SERVER_SECRET": "secret",
         "MINIO_ACCESS_KEY": "minioadmin",
         "MINIO_SECRET_KEY": "minioadmin",
-        "MINIO_ENDPOINT": "localhost",
-        "TRANSACTOR_URL": "ws://localhost:3333",
-        "MONGO_URL": "mongodb://localhost:27017",
-        "ACCOUNTS_URL": "http://localhost:3000",
+        "MINIO_ENDPOINT": "localhost:9002",
+        "TRANSACTOR_URL": "ws://localhost:3334",
+        "MONGO_URL": "mongodb://localhost:27018",
+        "ACCOUNTS_URL": "http://localhost:3003",
         "TELEGRAM_DATABASE": "telegram-service",
-        "ELASTIC_URL": "http://localhost:9200",
+        "ELASTIC_URL": "http://localhost:9201",
+        "DB_URL": "postgresql://postgres:example@localhost:5432",
         "REKONI_URL": "http://localhost:4004"
       },
       "runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
diff --git a/pods/server/src/server.ts b/pods/server/src/server.ts
index 1c7596fa760..c662f9973c6 100644
--- a/pods/server/src/server.ts
+++ b/pods/server/src/server.ts
@@ -66,7 +66,7 @@ export function start (
       [serverAiBotId]: {
         factory: createAIBotAdapter,
         db: '%ai-bot',
-        url: rawDbUrl
+        url: rawDbUrl ?? mainDbUrl
       }
     }
   }
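The launch configuration packs two database URLs into one `MONGO_URL` value separated by a semicolon, and the `server.ts` hunk above falls back to the main database URL when no dedicated URL is configured for the AI-bot adapter. A minimal TypeScript sketch of that convention; `resolveDbUrls` is a hypothetical helper for illustration, not the repo's actual parser:

```ts
// Hypothetical helper (not from this repo): split the combined
// "postgresql://...;mongodb://..." value seen in launch.json into its parts.
function resolveDbUrls (combined: string): { mainDbUrl: string, rawDbUrl?: string } {
  const [mainDbUrl, rawDbUrl] = combined.split(';')
  return { mainDbUrl, rawDbUrl }
}

const { mainDbUrl, rawDbUrl } = resolveDbUrls(
  'postgresql://postgres:example@localhost:5432;mongodb://localhost:27018'
)
// Same fallback as the ai-bot adapter above: prefer a dedicated URL,
// otherwise reuse the main database URL.
const aiBotUrl: string = rawDbUrl ?? mainDbUrl
```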
diff --git a/server/account/src/operations.ts b/server/account/src/operations.ts
index 01d7485357e..a6fb9353f39 100644
--- a/server/account/src/operations.ts
+++ b/server/account/src/operations.ts
@@ -1325,7 +1325,7 @@ export async function createWorkspace (
     dbUrls,
     {
       externalStorage: storageAdapter,
-      fullTextUrl: '',
+      fullTextUrl: 'http://localhost:9200',
       indexParallel: 0,
       indexProcessing: 0,
       rekoniUrl: '',
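Hardcoding `fullTextUrl` to the local Elastic endpoint makes `createWorkspace` work out of the box in development, at the cost of baking a dev address into server code. A hedged sketch of an environment-driven alternative; the `FULLTEXT_URL` variable name is an assumption, not an existing setting in this repo:

```ts
// Sketch only: prefer an env-provided endpoint and fall back to the dev
// default used in the hunk above. FULLTEXT_URL is a hypothetical name.
const fullTextUrl: string = process.env.FULLTEXT_URL ?? 'http://localhost:9200'
```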
diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts
index 4820954212a..3d51b1c6ff2 100644
--- a/server/postgres/src/storage.ts
+++ b/server/postgres/src/storage.ts
@@ -49,6 +49,8 @@ import core, {
   type WorkspaceId
 } from '@hcengineering/core'
 import {
+  estimateDocSize,
+  updateHashForDoc,
   type DbAdapter,
   type DbAdapterHandler,
   type DomainHelperOperations,
@@ -56,6 +58,7 @@ import {
   type StorageAdapter,
   type TxAdapter
 } from '@hcengineering/server-core'
+import { createHash } from 'crypto'
 import { type PoolClient } from 'pg'
 import {
   convertDoc,
@@ -709,8 +712,134 @@ abstract class PostgresAdapterBase implements DbAdapter {
     return []
   }
 
-  find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
-    throw new Error('Method not implemented.')
+  // Stream rows of an arbitrary SELECT through a server-side cursor,
+  // 100 rows per round trip. Cursors only live inside a transaction.
+  async * fetchRows (query: string): AsyncGenerator {
+    await this.client.query('BEGIN')
+    await this.client.query(`DECLARE my_cursor CURSOR FOR ${query}`)
+
+    let fetchMore = true
+
+    while (fetchMore) {
+      const result = await this.client.query('FETCH 100 FROM my_cursor')
+
+      for (const row of result.rows) {
+        yield row
+      }
+
+      fetchMore = result.rows.length > 0
+    }
+
+    await this.client.query('CLOSE my_cursor')
+    await this.client.query('COMMIT')
+  }
+
+  find (_ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
+    const ctx = _ctx.newChild('find', { domain })
+
+    const getCursorName = () => {
+      return `cursor_${translateDomain(this.workspaceId.name)}_${translateDomain(domain)}_${mode}`
+    }
+
+    let initialized: boolean = false
+    let mode: 'hashed' | 'non_hashed' = 'hashed'
+    let cursorName = getCursorName()
+    const bulkUpdate = new Map<Ref<Doc>, string>()
+
+    const close = async (cursorName: string) => {
+      try {
+        await this.client.query(`CLOSE ${cursorName}`)
+        await this.client.query('COMMIT')
+        ctx.info('Cursor closed', { cursorName })
+      } catch (err) {
+        ctx.error('Error while closing cursor', { cursorName, err })
+      }
+    }
+
+    const init = async (projection: string, query: string) => {
+      cursorName = getCursorName()
+      await this.client.query('BEGIN')
+      await this.client.query(
+        `DECLARE ${cursorName} CURSOR FOR SELECT ${projection} FROM ${translateDomain(domain)} WHERE "workspaceId" = $1 AND ${query}`,
+        [this.workspaceId.name]
+      )
+      ctx.info('Cursor initialized', { cursorName })
+    }
+
+    const next = async (): Promise<Doc | null> => {
+      const result = await this.client.query(`FETCH 1 FROM ${cursorName}`)
+      if (result.rows.length === 0) {
+        return null
+      }
+      return result.rows[0] !== undefined ? parseDoc(result.rows[0]) : null
+    }
+
+    const flush = async (force = false): Promise<void> => {
+      if (bulkUpdate.size > 1000 || force) {
+        if (bulkUpdate.size > 0) {
+          await ctx.with('bulk-write-find', {}, async () => {
+            const updates = new Map(Array.from(bulkUpdate.entries()).map((it) => [it[0], { '%hash%': it[1] }]))
+            await this.update(ctx, domain, updates)
+          })
+        }
+        bulkUpdate.clear()
+      }
+    }
+
+    return {
+      next: async () => {
+        if (!initialized) {
+          await init('_id, data', `data ->> '%hash%' IS NOT NULL AND data ->> '%hash%' <> ''`)
+          initialized = true
+        }
+        let d = await ctx.with('next', { mode }, async () => await next())
+        if (d == null && mode === 'hashed') {
+          await close(cursorName)
+          mode = 'non_hashed'
+          await init('*', `data ->> '%hash%' IS NULL OR data ->> '%hash%' = ''`)
+          d = await ctx.with('next', { mode }, async () => await next())
+        }
+        if (d == null) {
+          return undefined
+        }
+        let digest: string | null = (d as any)['%hash%']
+        if ('%hash%' in d) {
+          delete (d as any)['%hash%']
+        }
+        const pos = (digest ?? '').indexOf('|')
+        if (digest == null || digest === '') {
+          const cs = ctx.newChild('calc-size', {})
+          const size = estimateDocSize(d)
+          cs.end()
+
+          const hash = createHash('sha256')
+          updateHashForDoc(hash, d)
+          digest = hash.digest('base64')
+
+          bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`)
+
+          await ctx.with('flush', {}, async () => {
+            await flush()
+          })
+          return {
+            id: d._id,
+            hash: digest,
+            size
+          }
+        } else {
+          return {
+            id: d._id,
+            hash: digest.slice(0, pos),
+            size: parseInt(digest.slice(pos + 1), 16)
+          }
+        }
+      },
+      close: async () => {
+        await ctx.with('flush', {}, async () => {
+          await flush(true)
+        })
+        await close(cursorName)
+        ctx.end()
+      }
+    }
   }
 
   async load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
@@ -727,7 +856,21 @@ abstract class PostgresAdapterBase implements DbAdapter {
   }
 
   async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
-    await this.insert(domain, docs)
+    return await this.retryTxn(async (client) => {
+      while (docs.length > 0) {
+        const part = docs.splice(0, 1)
+        const vals = part
+          .map((doc) => {
+            const d = convertDoc(doc, this.workspaceId.name)
+            return `('${d._id}', '${d.workspaceId}', '${d._class}', '${d.createdBy ?? d.modifiedBy}', '${d.modifiedBy}', ${d.modifiedOn}, ${d.createdOn ?? d.modifiedOn}, '${d.space}', '${d.attachedTo ?? 'NULL'}', '${escapeBackticks(JSON.stringify(d.data))}')`
+          })
+          .join(', ')
+        await client.query(
+          `INSERT INTO ${translateDomain(domain)} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ${vals}
+          ON CONFLICT (_id, "workspaceId") DO UPDATE SET _class = EXCLUDED._class, "createdBy" = EXCLUDED."createdBy", "modifiedBy" = EXCLUDED."modifiedBy", "modifiedOn" = EXCLUDED."modifiedOn", "createdOn" = EXCLUDED."createdOn", space = EXCLUDED.space, "attachedTo" = EXCLUDED."attachedTo", data = EXCLUDED.data;`
+        )
+      }
+    })
   }
 
   async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
@@ -766,7 +909,9 @@
       for (const [_id, ops] of operations) {
         const doc = map.get(_id)
         if (doc === undefined) continue
-        ;(ops as any)['%hash%'] = null
+        if ((ops as any)['%hash%'] === undefined) {
+          ;(ops as any)['%hash%'] = null
+        }
         TxProcessor.applyUpdate(doc, ops)
         const converted = convertDoc(doc, this.workspaceId.name)
         await client.query(`UPDATE ${translateDomain(domain)} SET data = $3 WHERE _id = $1 AND "workspaceId" = $2`, [
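The iterator above records each document's content digest and size in a single `%hash%` string with the shape `<sha256 base64>|<size in hex>`, so later passes can return both values without re-reading the document body. A self-contained sketch of that encoding and its inverse; here `JSON.stringify` stands in for the repo's `updateHashForDoc` and `estimateDocSize` helpers:

```ts
import { createHash } from 'crypto'

// Encode "<sha256 base64>|<size hex>", mirroring bulkUpdate.set(...) in find().
// JSON.stringify is a stand-in for updateHashForDoc / estimateDocSize.
function encodeHashField (doc: object): string {
  const json = JSON.stringify(doc)
  const digest = createHash('sha256').update(json).digest('base64')
  return `${digest}|${json.length.toString(16)}`
}

// Decode by splitting on the first '|', exactly as find() does with indexOf.
function decodeHashField (value: string): { hash: string, size: number } {
  const pos = value.indexOf('|')
  return { hash: value.slice(0, pos), size: parseInt(value.slice(pos + 1), 16) }
}
```

Since base64 output never contains `|`, the first separator is unambiguous, which is why a single `indexOf('|')` is enough to split the stored value.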
diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts
index 1d24db0463b..7ca8b2175ee 100644
--- a/server/tool/src/index.ts
+++ b/server/tool/src/index.ts
@@ -143,8 +143,6 @@ export async function initModel (
   } catch (err: any) {
     ctx.error('Failed to create workspace', { error: err })
     throw err
-  } finally {
-    await storageAdapter.close()
   }
 }
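One observation on the `upload` hunk above: the VALUES list is built by string interpolation, with only backticks escaped in the JSON payload. A hedged sketch of the same `ON CONFLICT` upsert using `pg` parameter placeholders instead, single-row and with the table name passed in by the caller; a sketch under those assumptions, not a drop-in replacement for the batched version:

```ts
import { type PoolClient } from 'pg'

// Same upsert shape as upload(), but values travel as $n parameters, so
// quoting and escaping are handled by the driver rather than by hand.
async function upsertDoc (client: PoolClient, table: string, d: any, workspaceId: string): Promise<void> {
  await client.query(
    `INSERT INTO ${table} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data)
     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
     ON CONFLICT (_id, "workspaceId") DO UPDATE SET
       _class = EXCLUDED._class, "createdBy" = EXCLUDED."createdBy",
       "modifiedBy" = EXCLUDED."modifiedBy", "modifiedOn" = EXCLUDED."modifiedOn",
       "createdOn" = EXCLUDED."createdOn", space = EXCLUDED.space,
       "attachedTo" = EXCLUDED."attachedTo", data = EXCLUDED.data`,
    [
      d._id, workspaceId, d._class, d.createdBy ?? d.modifiedBy, d.modifiedBy,
      d.modifiedOn, d.createdOn ?? d.modifiedOn, d.space, d.attachedTo ?? null,
      JSON.stringify(d.data)
    ]
  )
}
```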