Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read replica db for findByCid #1212

12 changes: 12 additions & 0 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@
"tableName": "knex_migrations"
}
},
"replica_db": {
"client": "postgresql",
"connection": {
"database": "@@REPLICA_DB_NAME",
"host": "@@REPLICA_DB_HOST",
"user": "@@REPLICA_DB_USERNAME",
"password": "@@REPLICA_DB_PASSWORD",
"port": "@@REPLICA_DB_PORT",
"connectionString": ""
},
"debug": false
},
"queue": {
"type": "sqs",
"awsRegion": "us-east-1",
Expand Down
9 changes: 9 additions & 0 deletions config/env/dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@
"port": "@@DB_PORT"
}
},
"replica_db": {
"connection": {
"database": "@@REPLICA_DB_NAME",
"host": "@@REPLICA_DB_HOST",
"user": "@@REPLICA_DB_USERNAME",
"password": "@@REPLICA_DB_PASSWORD",
"port": "@@REPLICA_DB_PORT"
}
},
"queue": {
"type": "sqs",
"awsRegion": "@@AWS_REGION",
Expand Down
9 changes: 9 additions & 0 deletions config/env/prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@
"port": "@@DB_PORT"
}
},
"replica_db": {
"connection": {
"database": "@@REPLICA_DB_NAME",
"host": "@@REPLICA_DB_HOST",
"user": "@@REPLICA_DB_USERNAME",
"password": "@@REPLICA_DB_PASSWORD",
"port": "@@REPLICA_DB_PORT"
}
},
"queue": {
"type": "sqs",
"awsRegion": "@@AWS_REGION",
Expand Down
10 changes: 10 additions & 0 deletions config/env/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@
"tableName": "knex_migrations"
}
},
"replica_db": {
"connection": {
"database": "@@REPLICA_DB_NAME",
"host": "@@REPLICA_DB_HOST",
"user": "@@REPLICA_DB_USERNAME",
"password": "@@REPLICA_DB_PASSWORD",
"port": "@@REPLICA_DB_PORT",
"connectionString": "@@DATABASE_URL"
}
},
"queue": {
"type": "sqs",
"awsRegion": "us-east-1",
Expand Down
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
"supertest": "^6.3.3",
"tmp-promise": "^3.0.3",
"ts-essentials": "^9.3.2",
"typescript": "^5.0.4"
"typescript": "^5.4.5"
},
"release-it": {
"git": {
Expand Down
27 changes: 19 additions & 8 deletions src/__tests__/ceramic_integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import type { GanacheServer } from './make-ganache.util.js'
import tmp from 'tmp-promise'
import getPort from 'get-port'
import type { Knex } from 'knex'
import { clearTables, createDbConnection } from '../db-connection.js'
import { clearTables, createDbConnection, createReplicaDbConnection } from '../db-connection.js'
import { CeramicAnchorApp } from '../app.js'
import { config } from 'node-config-ts'
import cloneDeep from 'lodash.clonedeep'
Expand Down Expand Up @@ -184,7 +184,8 @@ interface MinimalCASConfig {
async function makeCAS(
container: Injector,
dbConnection: Knex,
minConfig: MinimalCASConfig
minConfig: MinimalCASConfig,
replicaDbConnection: Knex
): Promise<CeramicAnchorApp> {
const configCopy = cloneDeep(config)
configCopy.mode = minConfig.mode
Expand All @@ -204,7 +205,7 @@ async function makeCAS(
mode: 'inmemory',
}
return new CeramicAnchorApp(
container.provideValue('config', configCopy).provideValue('dbConnection', dbConnection)
container.provideValue('config', configCopy).provideValue('dbConnection', dbConnection).provideValue('replicaDbConnection', replicaDbConnection)
)
}

Expand Down Expand Up @@ -256,6 +257,9 @@ describe('Ceramic Integration Test', () => {
let dbConnection1: Knex
let dbConnection2: Knex

let replicaDbConnection1: Knex
let replicaDbConnection2: Knex

let casPort1: number
let cas1: CeramicAnchorApp
let anchorService1: AnchorService
Expand Down Expand Up @@ -319,12 +323,15 @@ describe('Ceramic Integration Test', () => {
await anchorLauncher.stop()
})

// TODO_WS2-3238_1 : update tests to test with replica db connection as well
// TODO_WS2-3238_2 : make hermetic env have replica db connection
describe('Using anchor version 1', () => {
beforeAll(async () => {
const useSmartContractAnchors = true

// Start anchor services
dbConnection1 = await createDbConnection()
replicaDbConnection1 = await createReplicaDbConnection()
casPort1 = await getPort()

cas1 = await makeCAS(createInjector(), dbConnection1, {
Expand All @@ -333,18 +340,19 @@ describe('Ceramic Integration Test', () => {
ganachePort: ganacheServer.port,
port: casPort1,
useSmartContractAnchors,
})
}, replicaDbConnection1)
await cas1.start()
anchorService1 = cas1.container.resolve('anchorService')
dbConnection2 = await teeDbConnection(dbConnection1)
replicaDbConnection2 = await createReplicaDbConnection()
const casPort2 = await getPort()
cas2 = await makeCAS(createInjector(), dbConnection2, {
mode: 'server',
ipfsPort: ipfsApiPort2,
ganachePort: ganacheServer.port,
port: casPort2,
useSmartContractAnchors,
})
}, replicaDbConnection2)
await cas2.start()
anchorService2 = cas2.container.resolve('anchorService')

Expand All @@ -368,6 +376,7 @@ describe('Ceramic Integration Test', () => {
await cas1.stop()
await cas2.stop()
await Promise.all([dbConnection1.destroy(), dbConnection2.destroy()])
await Promise.all([replicaDbConnection1.destroy(), replicaDbConnection2.destroy()])
await Promise.all([ceramic1.close(), ceramic2.close()])
})

Expand Down Expand Up @@ -532,14 +541,15 @@ describe('CAR file', () => {
const casIPFS = await createIPFS(ipfsApiPort)
const ganacheServer = await makeGanache()
const dbConnection = await createDbConnection()
const dummyReplicaDbConnection = await createReplicaDbConnection()
const casPort = await getPort()
const cas = await makeCAS(createInjector(), dbConnection, {
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
})
}, dummyReplicaDbConnection)
await cas.start()

const ceramicIPFS = await createIPFS(await getPort())
Expand Down Expand Up @@ -608,6 +618,7 @@ describe('Metrics Options', () => {
const casIPFS = await createIPFS(ipfsApiPort)
const ganacheServer = await makeGanache()
const dbConnection = await createDbConnection()
const dummyReplicaDbConnection = await createReplicaDbConnection()
const casPort = await getPort()
const cas = await makeCAS(createInjector(), dbConnection, {
mode: 'server',
Expand All @@ -618,7 +629,7 @@ describe('Metrics Options', () => {
metrics: {
instanceIdentifier: '234fffffffffffffffffffffffffffffffff9726129',
},
})
}, dummyReplicaDbConnection)
await cas.start()
// Teardown
await cas.stop()
Expand All @@ -632,7 +643,7 @@ describe('Metrics Options', () => {
metrics: {
instanceIdentifier: '',
},
})
}, dummyReplicaDbConnection)
await cas2.start()
await cas2.stop()

Expand Down
8 changes: 6 additions & 2 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { BlockchainService } from './services/blockchain/blockchain-service.js'
import { HTTPEventProducerService } from './services/event-producer/http/http-event-producer-service.js'
import { AnchorRepository } from './repositories/anchor-repository.js'
import { RequestRepository } from './repositories/request-repository.js'
import { ReplicationRequestRepository } from './repositories/replication-request-repository.js'
import { TransactionRepository } from './repositories/transaction-repository.js'
import { HealthcheckController } from './controllers/healthcheck-controller.js'
import { AnchorController } from './controllers/anchor-controller.js'
Expand Down Expand Up @@ -48,11 +49,13 @@ import { makeWitnessService, type IWitnessService } from './services/witness-ser
type DependenciesContext = {
config: Config
dbConnection: Knex
replicaDbConnection: Knex
}

type ProvidedContext = {
anchorService: AnchorService
requestRepository: RequestRepository
replicationRequestRepository: ReplicationRequestRepository
anchorRepository: AnchorRepository
transactionRepository: TransactionRepository
blockchainService: BlockchainService
Expand Down Expand Up @@ -98,6 +101,7 @@ export class CeramicAnchorApp {
.provideFactory('requestRepository', RequestRepository.make)
.provideClass('anchorRepository', AnchorRepository)
.provideClass('transactionRepository', TransactionRepository)
.provideClass('replicationRequestRepository', ReplicationRequestRepository)
// register services
.provideFactory('blockchainService', EthereumBlockchainService.make)
.provideClass('eventProducerService', HTTPEventProducerService)
Expand All @@ -113,8 +117,8 @@ export class CeramicAnchorApp {
.provideClass('healthcheckService', HealthcheckService)
.provideClass('requestPresentationService', RequestPresentationService)
.provideClass('anchorRequestParamsParser', AnchorRequestParamsParser)
.provideClass('requestService', RequestService)
.provideClass('continualAnchoringScheduler', TaskSchedulerService)
.provideClass('requestService', RequestService)

try {
Metrics.start(
Expand All @@ -130,7 +134,7 @@ export class CeramicAnchorApp {
Metrics.count('HELLO', 1)
logger.imp('Metrics exporter started')
if (this.config.metrics.instanceIdentifier) {
Metrics.setInstanceIdentifier(this.config.metrics.instanceIdentifier)
Metrics.setInstanceIdentifier(this.config.metrics.instanceIdentifier)
}
} catch (e: any) {
logger.imp('ERROR: Metrics exporter failed to start. Continuing anyway.')
Expand Down
23 changes: 19 additions & 4 deletions src/controllers/__tests__/request-controller.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, jest, test, beforeAll, afterAll } from '@jest/globals'
import { createDbConnection, clearTables } from '../../db-connection.js'
import { createDbConnection, clearTables, createReplicaDbConnection } from '../../db-connection.js'
import { createInjector, Injector } from 'typed-inject'
import { config } from 'node-config-ts'
import { RequestController } from '../request-controller.js'
Expand Down Expand Up @@ -32,11 +32,13 @@ import { RequestService } from '../../services/request-service.js'
import { ValidationSqsQueueService } from '../../services/queue/sqs-queue-service.js'
import { makeWitnessService } from '../../services/witness-service.js'
import { makeMerkleCarService } from '../../services/merkle-car-service.js'
import { ReplicationRequestRepository } from '../../repositories/replication-request-repository.js'

type Tokens = {
requestController: RequestController
requestRepository: RequestRepository
metadataService: IMetadataService
replicationRequestRepository: ReplicationRequestRepository
}

const FAKE_STREAM_ID_1 = StreamID.fromString(
Expand Down Expand Up @@ -80,19 +82,24 @@ class MockMetadataService implements IMetadataService {

// TODO: CDB-2287 Add tests checking for expected errors when missing/malformed CID/StreamID/GenesisCommit
// are detected in a CAR file
// TODO: WS2-3238 Add calls to replica db connection in the test as well
describe('createRequest', () => {
let dbConnection: Knex
let replicaDbConnection: Knex
let container: Injector<Tokens>
let controller: RequestController

beforeAll(async () => {
dbConnection = await createDbConnection()
replicaDbConnection = await createReplicaDbConnection()
await clearTables(dbConnection)
container = createInjector()
.provideValue('config', config)
.provideValue('dbConnection', dbConnection)
.provideValue('replicaDbConnection', replicaDbConnection)
.provideClass('metadataRepository', MetadataRepository)
.provideFactory('requestRepository', RequestRepository.make)
.provideClass('replicationRequestRepository', ReplicationRequestRepository)
.provideClass('anchorRepository', AnchorRepository)
.provideClass('ipfsService', MockIpfsService)
.provideFactory('merkleCarService', makeMerkleCarService)
Expand Down Expand Up @@ -343,13 +350,19 @@ describe('createRequest', () => {

const requestRepository = container.resolve('requestRepository')
const findByCidSpy = jest.spyOn(requestRepository, 'findByCid')
const replicaRequestRepository = container.resolve('replicationRequestRepository')
const findByCidSpyReplica = jest.spyOn(replicaRequestRepository, 'findByCid')
const res0 = mockResponse()
const res1 = mockResponse()

await Promise.all([controller.createRequest(req, res0), controller.createRequest(req, res1)])

expect(findByCidSpy).toBeCalledTimes(1)
expect(findByCidSpy).toBeCalledWith(cid)
try {
expect(findByCidSpyReplica).toBeCalledTimes(1)
expect(findByCidSpyReplica).toBeCalledWith(cid)
} catch (err) {
expect(findByCidSpy).toBeCalledTimes(1)
expect(findByCidSpy).toBeCalledWith(cid)
}

const status0 = res0.status.mock.calls[0][0]
const status1 = res1.status.mock.calls[0][0]
Expand All @@ -370,8 +383,10 @@ describe('createRequest', () => {
queue: { sqsQueueUrl: 'testurl' },
})
.provideValue('dbConnection', dbConnection)
.provideValue('replicaDbConnection', replicaDbConnection)
.provideClass('metadataRepository', MetadataRepository)
.provideFactory('requestRepository', RequestRepository.make)
.provideClass('replicationRequestRepository', ReplicationRequestRepository)
.provideClass('anchorRepository', AnchorRepository)
.provideClass('ipfsService', MockIpfsService)
.provideFactory('merkleCarService', makeMerkleCarService)
Expand Down
Loading