Skip to content

Commit

Permalink
Merge pull request #3 from osskit/add-test
Browse files Browse the repository at this point in the history
  • Loading branch information
guysegal authored Nov 11, 2023
2 parents ac64d31 + 3204c47 commit c00d09a
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 458 deletions.
45 changes: 23 additions & 22 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,35 @@
"logs": "docker-compose -f tests/dafka.yml logs"
},
"dependencies": {
"@osskit/wiremock-client": "^4.3.1",
"@types/jest": "^24.0.12",
"@types/lodash": "^4.14.149",
"@types/node": "^18.16.3",
"@types/node-fetch": "^2.5.2",
"@types/uuid": "^3.4.4",
"delay": "^4.2.0",
"jest": "^29.3.1",
"jest-config": "^29.3.1",
"kafka-node": "4.1.3",
"lodash": "^4.17.15",
"node-fetch": "^2.6.0",
"ts-jest": "^29.1.0",
"tslib": "^1.10.0",
"typescript": "^5.0.4",
"uuid": "^3.3.2"
"@osskit/wiremock-client": "^4.3.2",
"@types/jest": "^29.5.3",
"@types/lodash": "^4.14.197",
"@types/node": "^20.5.0",
"@types/node-fetch": "^2.6.4",
"@types/uuid": "^9.0.2",
"delay": "^6.0.0",
"jest": "^29.6.2",
"jest-config": "^29.6.2",
"kafka-node": "5.0.0",
"lodash": "^4.17.21",
"node-fetch": "^3.3.2",
"ts-jest": "^29.1.1",
"tslib": "^2.6.1",
"typescript": "^5.1.6",
"uuid": "^9.0.0"
},
"devDependencies": {
"@osskit/fetch-enhancers": "^4.1.0",
"@osskit/fetch-enhancers": "^4.1.2",
"@osskit/tsconfig": "^0.0.6",
"@types/lodash-es": "^4.17.7",
"husky": "^3.1.0",
"@testcontainers/kafka": "^10.2.1",
"@types/lodash-es": "^4.17.8",
"husky": "^8.0.3",
"kafkajs": "^2.2.4",
"lodash-es": "^4.17.21",
"p-retry": "^5.1.2",
"prettier": "^2.3.2",
"prettier-plugin-java": "^2.1.0",
"testcontainers": "^9.6.0",
"prettier": "^3.0.2",
"prettier-plugin-java": "^2.2.0",
"testcontainers": "^10.2.1",
"ts-node": "^10.9.1"
},
"husky": {
Expand Down
28 changes: 28 additions & 0 deletions tests/specs/__snapshots__/tests.spec.ts.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`tests Should debounce 4 records 2 of each key 1`] = `
[
{
"partitions": [
{
"metadata": null,
"offset": "2",
"partition": 0,
},
],
"topic": "test-output",
},
]
`;

exports[`tests Should debounce 4 records 2 of each key 2`] = `
{
"data": "foo1",
}
`;

exports[`tests Should debounce 4 records 2 of each key 3`] = `
{
"data": "bar1",
}
`;
100 changes: 99 additions & 1 deletion tests/specs/tests.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,101 @@
import {KafkaOrchestrator, Orchestrator} from '../testcontainers/orchestrator';
import {start as startKafka} from '../testcontainers/orchestrator.js';
import {Consumer, KafkaMessage, Producer} from 'kafkajs';
import delay from 'delay';

describe('tests', () => {
it('placeholder', async () => {});
let kafkaOrchestrator: KafkaOrchestrator;
let orchestrator: Orchestrator;
let producer: Producer;
let consumer: Consumer;

beforeAll(async () => {
kafkaOrchestrator = await startKafka();
}, 180000);

afterAll(async () => {
await kafkaOrchestrator.stop();
}, 180000);

afterEach(async () => {
if (producer) {
await producer.disconnect();
}
if (consumer) {
await consumer.disconnect();
}
await orchestrator.stop();
});

const start = async (sourceTopic: string, targetTopic: string, debounceSettings?: Record<string, string>) => {
const admin = kafkaOrchestrator.kafkaClient.admin();
await admin.createTopics({topics: [{topic: sourceTopic}, {topic: targetTopic}]});

orchestrator = await kafkaOrchestrator.startOrchestrator({
GROUP_ID: 'test',
SOURCE_TOPIC: sourceTopic,
TARGET_TOPIC: targetTopic,
...debounceSettings,
});

await orchestrator.debounceReady();

producer = kafkaOrchestrator.kafkaClient.producer();
consumer = kafkaOrchestrator.kafkaClient.consumer({groupId: 'test-consumer'});

await consumer.connect();
await consumer.subscribe({topic: targetTopic, fromBeginning: true});

await producer.connect();
};

const assertOffset = async (topic: string) => {
const admin = kafkaOrchestrator.kafkaClient.admin();

await admin.connect();

const metadata = await admin.fetchOffsets({groupId: 'test-consumer', topics: [topic]});

await admin.disconnect();

expect(metadata).toMatchSnapshot();
};

it('Should debounce 4 records 2 of each key', async () => {
await start('test-input', 'test-output', {
WINDOW_DURATION: '5',
});

await producer.send({topic: 'test-input', messages: [{value: JSON.stringify({data: 'foo'}), key: 'fookey'}]});

await delay(2000);

await producer.send({topic: 'test-input', messages: [{value: JSON.stringify({data: 'bar'}), key: 'barkey'}]});

await delay(1000);

await producer.send({topic: 'test-input', messages: [{value: JSON.stringify({data: 'foo1'}), key: 'fookey'}]});

await delay(1000);

await producer.send({topic: 'test-input', messages: [{value: JSON.stringify({data: 'bar1'}), key: 'barkey'}]});

await delay(5000);

const messages: KafkaMessage[] = [];
await new Promise<KafkaMessage[]>((resolve) => {
consumer.run({
eachMessage: async ({message}) => {
messages.push(message);
if (messages.length == 2) resolve(messages);
},
});
});
await delay(2000);

await assertOffset('test-output');

expect(JSON.parse(messages[0]?.value?.toString() ?? '{}')).toMatchSnapshot();
expect(JSON.parse(messages[1]?.value?.toString() ?? '{}')).toMatchSnapshot();
}, 180000);
});
2 changes: 1 addition & 1 deletion tests/testcontainers/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {StartedNetwork, Wait} from 'testcontainers';
import {KafkaContainer} from 'testcontainers';
import {KafkaContainer} from '@testcontainers/kafka';
import {Kafka, logLevel} from 'kafkajs';

export const kafka = async (network: StartedNetwork) => {
Expand Down
17 changes: 8 additions & 9 deletions tests/testcontainers/orchestrator.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import {Network, StartedNetwork} from 'testcontainers';
import {dafkaDebounce} from './dafkaDebounce.js';
import {kafka} from './kafka.js';
import {Kafka} from 'kafkajs';
import {dafkaDebounce} from './dafkaDebounce';

export interface KafkaOrchestrator {
stop: () => Promise<void>;
startOrchestrator: (dafkaEnv: Record<string, string>) => Promise<Orchestrator>;
startOrchestrator: (debounceEnv: Record<string, string>) => Promise<Orchestrator>;
kafkaClient: Kafka;
}

export interface Orchestrator {
stop: () => Promise<void>;
streamReady: () => Promise<Response>;
debounceReady: () => Promise<Response>;
}

export const start = async () => {
Expand All @@ -23,25 +23,24 @@ export const start = async () => {
kafkaClient,
stop: async () => {
await stopKafka();

await network.stop();
},
startOrchestrator: async (dafkaEnv: Record<string, string>) => {
return startOrchestratorInner(network, dafkaEnv);
startOrchestrator: async (debounceEnv: Record<string, string>) => {
return startOrchestratorInner(network, debounceEnv);
},
};
};

const startOrchestratorInner = async (
network: StartedNetwork,
dafkaEnv: Record<string, string>
debounceEnv: Record<string, string>
): Promise<Orchestrator> => {
const {ready: streamReady, stop: stopService} = await dafkaDebounce(network, dafkaEnv);
const {ready: debounceReady, stop: stopService} = await dafkaDebounce(network, debounceEnv);

return {
async stop() {
await stopService();
},
streamReady,
debounceReady,
};
};
Loading

0 comments on commit c00d09a

Please sign in to comment.