Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
  • Loading branch information
BykhovDenis committed Aug 20, 2024
1 parent 80453a0 commit 2fbbdc2
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ jobs:
- name: Testing...
run: node common/scripts/install-run-rush.js test
env:
DB_URL: 'postgresql://postgres:example@localhost:5433'
DB_URL: 'postgresql://postgres:example@localhost:5432'
ELASTIC_URL: 'http://localhost:9201'
MONGO_URL: 'mongodb://localhost:27018'
uitest:
Expand Down
21 changes: 11 additions & 10 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@
"request": "launch",
"args": ["src/__start.ts"],
"env": {
"ELASTIC_URL": "http://localhost:9200",
"MONGO_URL": "postgresql://postgres:example@localhost:5432;mongodb://localhost:27017",
"ELASTIC_URL": "http://localhost:9201",
"MONGO_URL": "postgresql://postgres:example@localhost:5432;mongodb://localhost:27018",
"APM_SERVER_URL2": "http://localhost:8200",
"METRICS_CONSOLE": "false",
"METRICS_FILE": "${workspaceRoot}/metrics.txt", // Show metrics in console evert 30 seconds.,
"STORAGE_CONFIG": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin",
"STORAGE_CONFIG": "minio|localhost:9002?accessKey=minioadmin&secretKey=minioadmin",
"SERVER_SECRET": "secret",
"ENABLE_CONSOLE": "true",
"COLLABORATOR_URL": "ws://localhost:3078",
"COLLABORATOR_API_URL": "http://localhost:3078",
"REKONI_URL": "http://localhost:4004",
"FRONT_URL": "http://localhost:8080",
"ACCOUNTS_URL": "http://localhost:3000",
"ACCOUNTS_URL": "http://localhost:3003",
// "SERVER_PROVIDER":"uweb"
"SERVER_PROVIDER":"ws",
"MODEL_VERSION": "",
Expand Down Expand Up @@ -188,17 +188,18 @@
"name": "Debug tool upgrade",
"type": "node",
"request": "launch",
"args": ["src/__start.ts", "stress", "ws://localhost:3333", "wrong"],
"args": ["src/__start.ts", "create-workspace", "sanity-ws", "-w sanity-ws"],
"env": {
"SERVER_SECRET": "secret",
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
"MINIO_ENDPOINT": "localhost",
"TRANSACTOR_URL": "ws://localhost:3333",
"MONGO_URL": "mongodb://localhost:27017",
"ACCOUNTS_URL": "http://localhost:3000",
"MINIO_ENDPOINT": "localhost:9002",
"TRANSACTOR_URL": "ws://localhost:3334",
"MONGO_URL": "mongodb://localhost:27018",
"ACCOUNTS_URL": "http://localhost:3003",
"TELEGRAM_DATABASE": "telegram-service",
"ELASTIC_URL": "http://localhost:9200",
"ELASTIC_URL": "http://localhost:9201",
"DB_URL": "postgresql://postgres:example@localhost:5432",
"REKONI_URL": "http://localhost:4004"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
Expand Down
2 changes: 1 addition & 1 deletion pods/server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export function start (
[serverAiBotId]: {
factory: createAIBotAdapter,
db: '%ai-bot',
url: rawDbUrl
url: rawDbUrl ?? mainDbUrl
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/account/src/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ export async function createWorkspace (
dbUrls,
{
externalStorage: storageAdapter,
fullTextUrl: '',
fullTextUrl: 'http://localhost:9200',
indexParallel: 0,
indexProcessing: 0,
rekoniUrl: '',
Expand Down
153 changes: 149 additions & 4 deletions server/postgres/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ import core, {
type WorkspaceId
} from '@hcengineering/core'
import {
estimateDocSize,
updateHashForDoc,
type DbAdapter,
type DbAdapterHandler,
type DomainHelperOperations,
type ServerFindOptions,
type StorageAdapter,
type TxAdapter
} from '@hcengineering/server-core'
import { createHash } from 'crypto'
import { type PoolClient } from 'pg'
import {
convertDoc,
Expand Down Expand Up @@ -709,8 +712,134 @@ abstract class PostgresAdapterBase implements DbAdapter {
return []
}

find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
throw new Error('Method not implemented.')
async * fetchRows (query: string): AsyncGenerator<any, void, unknown> {
const cursorQuery = `${query} DECLARE my_cursor CURSOR FOR SELECT * FROM your_table;`
await this.client.query(cursorQuery);

let fetchMore = true;

while (fetchMore) {
const result = await this.client.query('FETCH 100 FROM my_cursor;')

for (const row of result.rows) {
yield row;
}

fetchMore = result.rows.length > 0
}

await this.client.query('CLOSE my_cursor;')
}

/**
 * Build a StorageIterator over all documents of `domain` in this workspace,
 * yielding `{ id, hash, size }` triples used for sync/backup comparison.
 *
 * Iteration happens in two phases over a server-side cursor:
 *  1. 'hashed'     — rows whose data already contain a '%hash%' value; only
 *                    `_id, data` are projected and the stored digest is reused.
 *  2. 'non_hashed' — rows with no '%hash%'; the full row is fetched, a
 *                    sha256 digest and size estimate are computed, and the
 *                    result is written back in batches via `this.update`.
 *
 * The stored '%hash%' format is `<base64 sha256>|<size in hex>`.
 *
 * NOTE(review): the `recheck` parameter is accepted but never read here —
 * confirm whether forced re-hashing was intended.
 */
find (_ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
const ctx = _ctx.newChild('find', { domain })

// Cursor name embeds workspace, domain and phase so the two phases use
// distinct cursors on the same connection.
const getCursorName = () => {
return `cursor_${translateDomain(this.workspaceId.name)}_${translateDomain(domain)}_${mode}`
}

let initialized: boolean = false
let mode: 'hashed' | 'non_hashed' = 'hashed'
let cursorName = getCursorName()
// Pending '%hash%' write-backs keyed by document id; flushed in bulk.
const bulkUpdate = new Map<Ref<Doc>, string>()

// Close the cursor and commit the transaction opened by init(); errors are
// logged rather than thrown so a failed close cannot break iteration teardown.
const close = async (cursorName: string) => {
try {
await this.client.query(`CLOSE ${cursorName}`)
await this.client.query(`COMMIT`)
ctx.info('Cursor closed', { cursorName })
} catch (err) {
ctx.error('Error while closing cursor', { cursorName, err })
}
}

// Open a transaction (required for a PostgreSQL cursor) and declare the
// cursor for the current phase. `projection` and `query` are phase-specific;
// the workspace id is passed as a bound parameter.
const init = async (projection: string, query: string) => {
cursorName = getCursorName()
await this.client.query(`BEGIN`)
await this.client.query(`DECLARE ${cursorName} CURSOR FOR SELECT ${projection} FROM ${translateDomain(domain)} WHERE "workspaceId" = $1 AND ${query}`, [this.workspaceId.name])
ctx.info('Cursor initialized', { cursorName })
}

// Pull a single row from the active cursor; null signals the phase is drained.
const next = async (): Promise<Doc | null> => {
const result = await this.client.query(`FETCH 1 FROM ${cursorName}`)
if (result.rows.length === 0) {
return null
}
return result.rows[0] !== undefined ? parseDoc(result.rows[0]) : null
}

// Write buffered digests back via this.update. Without `flush=true` the
// buffer is only written once it exceeds 1000 entries.
const flush = async (flush = false): Promise<void> => {
if (bulkUpdate.size > 1000 || flush) {
if (bulkUpdate.size > 0) {
await ctx.with(
'bulk-write-find',
{},
async () => {
const updates = new Map(Array.from(bulkUpdate.entries()).map((it) => [it[0], { '%hash%': it[1] }]))
await this.update(ctx, domain, updates)
}
)
}
bulkUpdate.clear()
}
}

return {
next: async () => {
// Lazy start: the first call opens the 'hashed' phase cursor.
if (!initialized) {
await init('_id, data', `data ->> '%hash%' IS NOT NULL AND data ->> '%hash%' <> ''`)
initialized = true
}
let d = await ctx.with('next', { mode }, async () => await next())
// Phase transition: 'hashed' drained → switch to rows lacking a digest.
if (d == null && mode === 'hashed') {
await close(cursorName)
mode = 'non_hashed'
await init('*', `data ->> '%hash%' IS NULL OR data ->> '%hash%' = ''`)
d = await ctx.with('next', { mode }, async () => await next())
}
if (d == null) {
return undefined
}
// The digest is stored inside the document data; strip it before hashing
// so it does not influence its own value.
let digest: string | null = (d as any)['%hash%']
if ('%hash%' in d) {
delete d['%hash%']
}
const pos = (digest ?? '').indexOf('|')
if (digest == null || digest === '') {
// No stored digest: compute size estimate and sha256, buffer the
// write-back, and opportunistically flush the buffer.
const cs = ctx.newChild('calc-size', {})
const size = estimateDocSize(d)
cs.end()

const hash = createHash('sha256')
updateHashForDoc(hash, d)
digest = hash.digest('base64')

bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`)

await ctx.with('flush', {}, async () => {
await flush()
})
return {
id: d._id,
hash: digest,
size
}
} else {
// Stored digest present: split '<hash>|<hex size>' and reuse it.
return {
id: d._id,
hash: digest.slice(0, pos),
size: parseInt(digest.slice(pos + 1), 16)
}
}
},
close: async () => {
// Force out any buffered digests, then release cursor + transaction.
await ctx.with('flush', {}, async () => {
await flush(true)
})
await close(cursorName)
ctx.end()
}
}
}

async load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
Expand All @@ -727,7 +856,21 @@ abstract class PostgresAdapterBase implements DbAdapter {
}

/**
 * Upsert a batch of documents into `domain` for this workspace.
 *
 * Documents are written in chunks inside a single retryable transaction;
 * on id/workspace conflict every stored column is overwritten with the
 * incoming values.
 *
 * @param ctx - measurement context (unused here, kept for interface parity).
 * @param domain - target domain / table.
 * @param docs - documents to insert or replace. The array is NOT mutated.
 */
async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
  return await this.retryTxn(async (client) => {
    const columnsPerRow = 10
    const batchSize = 200
    // Iterate with slice() instead of splice(): the previous implementation
    // mutated the caller's array and, because retryTxn may re-run this
    // callback, a retry would find `docs` already emptied and insert nothing.
    for (let start = 0; start < docs.length; start += batchSize) {
      const part = docs.slice(start, start + batchSize)
      const values: any[] = []
      // Bind every value as a query parameter. The previous string
      // interpolation only escaped backticks, so a single quote anywhere in
      // the JSON data broke the statement (and allowed SQL injection).
      const placeholders = part
        .map((doc, row) => {
          const d = convertDoc(doc, this.workspaceId.name)
          values.push(
            d._id,
            d.workspaceId,
            d._class,
            d.createdBy ?? d.modifiedBy,
            d.modifiedBy,
            d.modifiedOn,
            d.createdOn ?? d.modifiedOn,
            d.space,
            // NOTE(review): the old code inserted the literal string 'NULL'
            // for a missing attachedTo; a real SQL NULL looks intended —
            // confirm no reader depends on the 'NULL' string.
            d.attachedTo ?? null,
            JSON.stringify(d.data)
          )
          const base = row * columnsPerRow
          const slots = Array.from({ length: columnsPerRow }, (_, col) => `$${base + col + 1}`)
          return `(${slots.join(', ')})`
        })
        .join(', ')
      await client.query(
        `INSERT INTO ${translateDomain(domain)} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ${placeholders}
        ON CONFLICT (_id, "workspaceId") DO UPDATE SET _class = EXCLUDED._class, "createdBy" = EXCLUDED."createdBy", "modifiedBy" = EXCLUDED."modifiedBy", "modifiedOn" = EXCLUDED."modifiedOn", "createdOn" = EXCLUDED."createdOn", space = EXCLUDED.space, "attachedTo" = EXCLUDED."attachedTo", data = EXCLUDED.data;`,
        values
      )
    }
  })
}

async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
Expand Down Expand Up @@ -766,7 +909,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
for (const [_id, ops] of operations) {
const doc = map.get(_id)
if (doc === undefined) continue
;(ops as any)['%hash%'] = null
if ((ops as any)['%hash%'] === undefined) {
;(ops as any)['%hash%'] = null
}
TxProcessor.applyUpdate(doc, ops)
const converted = convertDoc(doc, this.workspaceId.name)
await client.query(`UPDATE ${translateDomain(domain)} SET data = $3 WHERE _id = $1 AND "workspaceId" = $2`, [
Expand Down
2 changes: 0 additions & 2 deletions server/tool/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ export async function initModel (
} catch (err: any) {
ctx.error('Failed to create workspace', { error: err })
throw err
} finally {
await storageAdapter.close()
}
}

Expand Down

0 comments on commit 2fbbdc2

Please sign in to comment.