Skip to content

Commit

Permalink
checkpoint 3
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewHanasiro committed Jun 9, 2024
1 parent 0c128c0 commit 1c5085a
Show file tree
Hide file tree
Showing 22 changed files with 225 additions and 259 deletions.
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"@commitlint/config-conventional": "^19.2.2",
"@stryker-mutator/core": "8.2.6",
"@stryker-mutator/jest-runner": "8.2.6",
"@testcontainers/kafka": "^10.9.0",
"@testcontainers/postgresql": "10.9.0",
"@testcontainers/redis": "10.9.0",
"@types/bcrypt": "5.0.2",
Expand Down
38 changes: 6 additions & 32 deletions src/config/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
import { Kafka, logLevel, CompressionTypes } from 'kafkajs'

import { getEnv } from './enviroment_config'

const kafka = new Kafka({
logLevel: logLevel.ERROR,
clientId: getEnv().app.name,
connectionTimeout: 5000,
brokers: [`${getEnv().broker.url}`],
})
import { Kafka, CompressionTypes } from 'kafkajs'

type TOPIC =
| '2FA_EMAIL_CREATED'
Expand All @@ -17,29 +8,12 @@ type TOPIC =
| 'USER_CREATED'
| 'ORGANIZATION_CREATED'
| 'RESET_PASSWORD'
const TOPIC_LIST: Array<TOPIC> = [
'2FA_EMAIL_CREATED',
'2FA_PHONE_CREATED',
'2FA_EMAIL_SENT',
'2FA_PHONE_SENT',
'USER_CREATED',
'ORGANIZATION_CREATED',
'RESET_PASSWORD',
]

export async function configKafka() {
const admin = kafka.admin()
await admin.connect()
const currentTopic = await admin.listTopics()
const toBeCreatedTopic = TOPIC_LIST.filter((t) => !currentTopic.includes(t))
await admin.createTopics({
waitForLeaders: true,
topics: toBeCreatedTopic.map((t) => ({ topic: t })),
})
await admin.disconnect()
}

export async function produce<PayloadType>(topic: TOPIC, payload: PayloadType) {
export async function produce<PayloadType>(
topic: TOPIC,
payload: PayloadType,
kafka: Kafka
) {
const producer = kafka.producer()
await producer.connect()
await producer.send({
Expand Down
1 change: 0 additions & 1 deletion src/config/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
} from 'prom-client'

import { getEnv } from './enviroment_config'
console.warn(getEnv().app)
collectDefaultMetrics({ prefix: getEnv().app.name.replaceAll('-', '_') })

export const registry = new Registry()
Expand Down
24 changes: 20 additions & 4 deletions src/core/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Kafka, logLevel } from 'kafkajs'
import knex from 'knex'
import { createClient } from 'redis'

Expand Down Expand Up @@ -71,6 +72,13 @@ export function getCore() {
})
})

const kafka = new Kafka({
logLevel: logLevel.ERROR,
clientId: getEnv().app.name,
connectionTimeout: 5000,
brokers: [`${getEnv().broker.url}`],
})

// SERVICES
const passwordService = new PasswordService()
const uuidService = new UuidService()
Expand Down Expand Up @@ -121,13 +129,21 @@ export function getCore() {
const invalidatingToken: InvalidatingToken = new TokenRepository()
const creatingToken: CreatingToken = new TokenRepository()
const decodingToken: DecodingToken = new TokenRepository()
const sendingMfaCode: SendingMfaCode = new NotificationProvider(database)
const sendingMfaHash: SendingMfaHash = new NotificationProvider(database)
const sendingMfaCode: SendingMfaCode = new NotificationProvider(
database,
kafka
)
const sendingMfaHash: SendingMfaHash = new NotificationProvider(
database,
kafka
)
const sendingResetEmail: SendingResetEmail = new NotificationProvider(
database
database,
kafka
)
const creatingSystemUser: CreatingSystemUser = new NotificationProvider(
database
database,
kafka
)
const creatingResetPassword: CreatingResetPassword =
new ResetPasswordRepository(uuidService)
Expand Down
76 changes: 52 additions & 24 deletions src/core/providers/notification.provider.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Kafka } from 'kafkajs'
import { Knex } from 'knex'

import { produce } from '../../config/kafka'
Expand All @@ -22,7 +23,10 @@ export class NotificationProvider
SendingResetEmail,
CreatingSystemUser
{
constructor(private database: Knex) {}
constructor(
private database: Knex,
private kafkaClient: Kafka
) {}

async sendCodeByEmail(userId: string, code: string): Promise<void> {
const tuples: { email: string }[] = await this.database('user')
Expand All @@ -34,10 +38,14 @@ export class NotificationProvider
SendingMfaCodeErrorsTypes.USER_EMAIL_NOT_FOUND
)
}
await produce('2FA_EMAIL_SENT', {
email: tuples[0].email,
content: code,
})
await produce(
'2FA_EMAIL_SENT',
{
email: tuples[0].email,
content: code,
},
this.kafkaClient
)
}

async sendCodeByPhone(userId: string, code: string): Promise<void> {
Expand All @@ -52,10 +60,14 @@ export class NotificationProvider
SendingMfaCodeErrorsTypes.USER_PHONE_NOT_FOUND
)
}
await produce('2FA_PHONE_SENT', {
email: tuples[0].value,
content: code,
})
await produce(
'2FA_PHONE_SENT',
{
email: tuples[0].value,
content: code,
},
this.kafkaClient
)
}

async sendCodeByStrategy(
Expand Down Expand Up @@ -85,10 +97,14 @@ export class NotificationProvider
SendingMfaHashErrorsTypes.USER_EMAIL_HASH_NOT_FOUND
)
}
await produce('2FA_EMAIL_CREATED', {
email: tuples[0].email,
content: hash,
})
await produce(
'2FA_EMAIL_CREATED',
{
email: tuples[0].email,
content: hash,
},
this.kafkaClient
)
}

async sendMfaHashByPhone(userId: string, hash: string): Promise<void> {
Expand All @@ -103,10 +119,14 @@ export class NotificationProvider
SendingMfaHashErrorsTypes.USER_PHONE_HASH_NOT_FOUND
)
}
await produce('2FA_PHONE_CREATED', {
email: tuples[0].value,
content: hash,
})
await produce(
'2FA_PHONE_CREATED',
{
email: tuples[0].value,
content: hash,
},
this.kafkaClient
)
}

async sendMfaHashByStrategy(
Expand All @@ -127,15 +147,23 @@ export class NotificationProvider
}

async sendEmail(email: string, hash: string): Promise<void> {
await produce('RESET_PASSWORD', {
email: email,
hash: hash,
})
await produce(
'RESET_PASSWORD',
{
email: email,
hash: hash,
},
this.kafkaClient
)
}

async create(userId: string): Promise<void> {
await produce('USER_CREATED', {
external_id: userId,
})
await produce(
'USER_CREATED',
{
external_id: userId,
},
this.kafkaClient
)
}
}
2 changes: 1 addition & 1 deletion test/fixtures/multi_factor_authentication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ interface MfaInput {
isEnable?: boolean
}

export async function insertMfaIntoDatabase(database: Knex, input?: MfaInput) {
export async function insertMfaIntoDatabase(database: Knex, input: MfaInput) {
const row: Array<{ id: string }> = await database(
'multi_factor_authentication'
)
Expand Down
8 changes: 3 additions & 5 deletions test/fixtures/organization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@ import { Knex } from 'knex'

interface OrganizationInput {
name?: string
parentOrganizationId?: string
parentOrganizationId: string | null
}

export async function insertOrgIntoDatabase(
database: Knex,
input?: OrganizationInput
) {
let name = ''
if (!input?.name) {
name = casual.company_name
}
const name = input?.name ?? casual.company_name

const row: Array<{ id: string }> = await database('organization')
.insert({
name,
Expand Down
Loading

0 comments on commit 1c5085a

Please sign in to comment.