Skip to content

Commit

Permalink
refactor tests for stability
Browse files Browse the repository at this point in the history
  • Loading branch information
guysegal committed Nov 17, 2023
1 parent 21c8770 commit f218b94
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 195 deletions.
2 changes: 2 additions & 0 deletions tests/jest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const config: JestConfigWithTsJest = {
// eslint-disable-next-line @typescript-eslint/naming-convention
'^(\\.{1,2}/.*)\\.js$': '$1',
},
setupFilesAfterEnv: ['<rootDir>/setupFilesAfterEnv.ts'],
testTimeout: 1800000,
};

export default config;
17 changes: 17 additions & 0 deletions tests/services/consume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import {Kafka, KafkaMessage} from 'kafkajs';

export const consume = async (kafka: Kafka, topic: string, parse = true) => {
const consumer = kafka.consumer({groupId: 'orchestrator'});
await consumer.subscribe({topic: topic, fromBeginning: true});
const consumedMessage = await new Promise<KafkaMessage>((resolve) => {
consumer.run({
eachMessage: async ({message}) => resolve(message),
});
});
await consumer.disconnect();
const value = parse ? JSON.parse(consumedMessage.value?.toString() ?? '{}') : consumedMessage.value?.toString();
const headers = Object.fromEntries(
Object.entries(consumedMessage.headers!).map(([key, value]) => [key, value?.toString()])
);
return {value, headers};
};
3 changes: 3 additions & 0 deletions tests/setupFilesAfterEnv.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import {jest} from '@jest/globals';

jest.retryTimes(3, {logErrorsBeforeRetry: true});
3 changes: 0 additions & 3 deletions tests/specs/__snapshots__/failures.spec.ts.snap

This file was deleted.

18 changes: 18 additions & 0 deletions tests/specs/__snapshots__/produce.spec.ts.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`tests produce 1`] = `
{
"headers": {
"x-b3-flags": "1",
"x-b3-parentspanid": "101112",
"x-b3-sampled": "1",
"x-b3-spanid": "789",
"x-b3-traceid": "456",
"x-ot-span-context": "foo",
"x-request-id": "123",
},
"value": {
"data": "foo",
},
}
`;
3 changes: 3 additions & 0 deletions tests/specs/__snapshots__/produceFailure.spec.ts.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`tests produce failure 1`] = `[TypeError: fetch failed]`;
27 changes: 0 additions & 27 deletions tests/specs/__snapshots__/tests.spec.ts.snap

This file was deleted.

29 changes: 0 additions & 29 deletions tests/specs/failures.spec.ts

This file was deleted.

51 changes: 51 additions & 0 deletions tests/specs/produce.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import delay from 'delay';

import type {Orchestrator} from '../testcontainers/orchestrator.js';
import {start} from '../testcontainers/orchestrator.js';
import {consume} from '../services/consume.js';

const topic = 'my-topic';

describe('tests', () => {
let orchestrator: Orchestrator;

beforeEach(async () => {
orchestrator = await start(
{
KAFKA_BROKER: 'kafka:9092',
MAX_BLOCK_MS: '1000',
},
[topic]
);
}, 5 * 60 * 1000);

afterEach(async () => {
if (!orchestrator) {
return;
}
await orchestrator.stop();
});

it('produce', async () => {
orchestrator.dafkaProducer.produce([
{
topic,
key: 'thekey',
value: {data: 'foo'},
headers: {
'x-request-id': '123',
'x-b3-traceid': '456',
'x-b3-spanid': '789',
'x-b3-parentspanid': '101112',
'x-b3-sampled': '1',
'x-b3-flags': '1',
'x-ot-span-context': 'foo',
},
},
]);

await delay(5000);

await expect(consume(orchestrator.kafkaClient, topic)).resolves.toMatchSnapshot();
});
});
33 changes: 33 additions & 0 deletions tests/specs/produceFailure.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import type {Orchestrator} from '../testcontainers/orchestrator.js';
import {start} from '../testcontainers/orchestrator.js';

describe('tests', () => {
let orchestrator: Orchestrator;

beforeEach(async () => {
orchestrator = await start(
{
KAFKA_BROKER: 'kafka:9092',
},
[]
);
}, 5 * 60 * 1000);

afterEach(async () => {
if (!orchestrator) {
return;
}
await orchestrator.stop();
});

it('produce failure', async () => {
expect(() =>
orchestrator.dafkaProducer.produce([
{
topic: 'not exists',
value: {data: 'foo'},
},
])
).rejects.toMatchSnapshot();
});
});
61 changes: 0 additions & 61 deletions tests/specs/tests.spec.ts

This file was deleted.

43 changes: 43 additions & 0 deletions tests/testcontainers/dafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {StartedNetwork, Wait} from 'testcontainers';
import {GenericContainer} from 'testcontainers';
import fs from 'node:fs';
import {withThrow} from '@osskit/fetch-enhancers';

const enhanchedFetch = withThrow(fetch);

export interface ServiceClient {
produce: (payload: any) => Promise<Response>;
}

export const dafka = async (network: StartedNetwork, env: Record<string, string>) => {
const container = await new GenericContainer('bazel/src:image')
.withExposedPorts(8080)
.withNetwork(network)
.withEnvironment(env)
.withStartupTimeout(parseInt(process.env.STARTUP_TIMEOUT ?? '60000'))
.withWaitStrategy(Wait.forHttp('/ready', 8080).forStatusCode(200))
.start();

if (process.env.DEBUG) {
try {
fs.truncateSync('service.log', 0);
} catch (err) {
fs.writeFileSync('service.log', '', {flag: 'wx'});
}
await container.logs().then((logs) => logs.pipe(fs.createWriteStream('service.log')));
}

const baseUrl = `http://localhost:${container.getMappedPort(8080)}`;

return {
stop: () => container.stop(),
client: {
produce: (payload: any) =>
enhanchedFetch(`${baseUrl}/produce`, {
method: 'post',
body: JSON.stringify(payload),
headers: {'content-type': 'application/json'},
}),
},
};
};
43 changes: 0 additions & 43 deletions tests/testcontainers/dafkaProducer.ts

This file was deleted.

28 changes: 9 additions & 19 deletions tests/testcontainers/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,24 @@
import {StartedNetwork, Wait} from 'testcontainers';
import {KafkaContainer} from 'testcontainers';
import {KafkaContainer} from '@testcontainers/kafka';
import {Kafka, logLevel} from 'kafkajs';
import delay from 'delay';

export const kafka = async (network: StartedNetwork, kafkaConfig?: Record<string, string>) => {
export const kafka = async (network: StartedNetwork, topics: string[]) => {
const container = await new KafkaContainer('confluentinc/cp-kafka:7.2.2')
.withNetwork(network)
.withNetworkAliases('kafka')
.withEnvironment({
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT',
KAFKA_INTER_BROKER_LISTENER_NAME: 'BROKER',
KAFKA_BROKER_ID: '1',
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1',
KAFKA_STATE_LOG_REPLICATION_FACTOR: '1',
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1',
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: '1',
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1',
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: '9223372036854775807',
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0',
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false',
...kafkaConfig,
})
.withWaitStrategy(
Wait.forLogMessage('Registered broker 1 at path /brokers/ids/1 with addresses: BROKER://kafka:9092')
)
.withWaitStrategy(Wait.forLogMessage('started (kafka.server.KafkaServer)'))
.start();

await delay(10000);

const client = new Kafka({
logLevel: logLevel.NOTHING,
brokers: [`${container.getHost()}:${container.getMappedPort(9093)}`],
});

await client.admin().createTopics({topics: topics.map((topic) => ({topic, numPartitions: 100}))});

return {
stop: () => container.stop(),
client,
Expand Down
Loading

0 comments on commit f218b94

Please sign in to comment.