Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reshard logic #82

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open

Reshard logic #82

wants to merge 18 commits into from

Conversation

akp111
Copy link
Collaborator

@akp111 akp111 commented Nov 15, 2024

No description provided.

@akp111 akp111 marked this pull request as ready for review January 12, 2025 11:57
@@ -39,7 +39,7 @@ const options = {
logger.info(`PG_USER is ${EnvLoader.getPropertyOrFail('PG_USER')}`)
const pg: pgPromise.IMain<{}, IClient> = pgPromise(options)
export const pgPool = pg(
`postgres://${EnvLoader.getPropertyOrFail('PG_USER')}:${EnvLoader.getPropertyOrFail('PG_PASS')}@${EnvLoader.getPropertyOrFail('PG_HOST')}:${EnvLoader.getPropertyOrDefault('PG_PORT', '5432')}/${EnvLoader.getPropertyOrFail('DB_NAME')}`
`postgres://${EnvLoader.getPropertyOrFail('PG_USER')}:${EnvLoader.getPropertyOrFail('PG_PASS')}@${EnvLoader.getPropertyOrFail('PG_HOST')}:${EnvLoader.getPropertyOrDefault('PG_PORT', '10007')}/${EnvLoader.getPropertyOrFail('DB_NAME')}`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the same 5432 as a default port?

@@ -41,4 +49,366 @@ export class Block {
Block.log.debug('getBulkBlocksByHash() result: %s', statusArray)
return statusArray
}

static async checkIfTableViewExists(viewName: string): Promise<boolean> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you remove this method? I cannot find any real usages.


public async execute(params: [string, number, number, number]) {
try {
console.log('Executing push_getBlockHashFromView', { params })
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we replace console log with this.log.debug everywhere

shardIds
)

console.log({
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we replace console log with this.log.debug everywhere

}
}

this.log.info('Node shard map constructed successfully', {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't log anything. Please find all places with parameterized logging and so smth like
log.info('test %s', param);

valid params are %s, %o, %d

for (const [shardId, clients] of this.nodeMap.entries()) {
this.log.info('Initiating pooling for shard:', { shardId })
// Update the currently syncing shard and last synced page number
clients.map(async (client) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!!! This code looks fully async.
It might lead to the fact that
await this.validateAndStoreBlockHashes - won't see your db changes, such as syncing shard = shardId and pageNumber = 1 ???????

const blockProcessPromises = fullBlocks.map((block) => {
const blockBytes = BitUtil.base16ToBytes(block)
const parsedBlock = BlockUtil.parseBlock(blockBytes)
return Container.get(StorageNode).handleBlock(parsedBlock, blockBytes)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you inject the field into BlockPolling like this

@Inject((type) => StorageNode)
private storageNode: StorageNode

or
@Injject() without params; whatever works

}
}

public async validateAndStoreBlockHashesOld(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you remove this, unless the method is being used somewhere

client.getViewName()
)

const blockProcessPromises = fullBlocks.map((block) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need a

try {

do block processing

} catch (e) {

log.error ...

}

I'd expect that some block processing might throw exceptions and this should not stop syncing the client itself.

* 3. if the blockhashes are not present in the db, query the full block from the respective node
* 4. parse the block and store it in the db
*/
public async validateAndStoreBlockHashes(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!!! I'd expect to have some parallelization in one of two key points

Option1:
every shardId is processing in parallel (so 32 logical workloads max)

in this case we will call
let promises = [];
promises.push(validateAndStoreBlockHashes(1,...32))
Promise.allSettled(promises)

Option2:
every "for (const client of clients) {" body at line 246 gets executed in parallel

Let me know what you think.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants