From f218b9434ce4caab33ef8faa60374f8a18fb6b1d Mon Sep 17 00:00:00 2001 From: Guy Segal Date: Fri, 17 Nov 2023 21:19:48 +0200 Subject: [PATCH] refactor tests for stability --- tests/jest.config.ts | 2 + tests/services/consume.ts | 17 ++++++ tests/setupFilesAfterEnv.ts | 3 + .../specs/__snapshots__/failures.spec.ts.snap | 3 - .../specs/__snapshots__/produce.spec.ts.snap | 18 ++++++ .../__snapshots__/produceFailure.spec.ts.snap | 3 + tests/specs/__snapshots__/tests.spec.ts.snap | 27 -------- tests/specs/failures.spec.ts | 29 --------- tests/specs/produce.spec.ts | 51 ++++++++++++++++ tests/specs/produceFailure.spec.ts | 33 ++++++++++ tests/specs/tests.spec.ts | 61 ------------------- tests/testcontainers/dafka.ts | 43 +++++++++++++ tests/testcontainers/dafkaProducer.ts | 43 ------------- tests/testcontainers/kafka.ts | 28 +++------ tests/testcontainers/orchestrator.ts | 20 +++--- tests/tsconfig.json | 2 +- 16 files changed, 188 insertions(+), 195 deletions(-) create mode 100644 tests/services/consume.ts create mode 100644 tests/setupFilesAfterEnv.ts delete mode 100644 tests/specs/__snapshots__/failures.spec.ts.snap create mode 100644 tests/specs/__snapshots__/produce.spec.ts.snap create mode 100644 tests/specs/__snapshots__/produceFailure.spec.ts.snap delete mode 100644 tests/specs/__snapshots__/tests.spec.ts.snap delete mode 100644 tests/specs/failures.spec.ts create mode 100644 tests/specs/produce.spec.ts create mode 100644 tests/specs/produceFailure.spec.ts delete mode 100644 tests/specs/tests.spec.ts create mode 100644 tests/testcontainers/dafka.ts delete mode 100644 tests/testcontainers/dafkaProducer.ts diff --git a/tests/jest.config.ts b/tests/jest.config.ts index 437d2ee..5bac063 100644 --- a/tests/jest.config.ts +++ b/tests/jest.config.ts @@ -21,6 +21,8 @@ const config: JestConfigWithTsJest = { // eslint-disable-next-line @typescript-eslint/naming-convention '^(\\.{1,2}/.*)\\.js$': '$1', }, + setupFilesAfterEnv: ['/setupFilesAfterEnv.ts'], + testTimeout: 1800000, }; export default config; diff --git a/tests/services/consume.ts b/tests/services/consume.ts new file mode 100644 index 0000000..5ced5c8 --- /dev/null +++ b/tests/services/consume.ts @@ -0,0 +1,17 @@ +import {Kafka, KafkaMessage} from 'kafkajs'; + +export const consume = async (kafka: Kafka, topic: string, parse = true) => { + const consumer = kafka.consumer({groupId: 'orchestrator'}); + await consumer.subscribe({topic: topic, fromBeginning: true}); + const consumedMessage = await new Promise((resolve) => { + consumer.run({ + eachMessage: async ({message}) => resolve(message), + }); + }); + await consumer.disconnect(); + const value = parse ? JSON.parse(consumedMessage.value?.toString() ?? '{}') : consumedMessage.value?.toString(); + const headers = Object.fromEntries( + Object.entries(consumedMessage.headers!).map(([key, value]) => [key, value?.toString()]) + ); + return {value, headers}; +}; diff --git a/tests/setupFilesAfterEnv.ts b/tests/setupFilesAfterEnv.ts new file mode 100644 index 0000000..b0e7ac6 --- /dev/null +++ b/tests/setupFilesAfterEnv.ts @@ -0,0 +1,3 @@ +import {jest} from '@jest/globals'; + +jest.retryTimes(3, {logErrorsBeforeRetry: true}); diff --git a/tests/specs/__snapshots__/failures.spec.ts.snap b/tests/specs/__snapshots__/failures.spec.ts.snap deleted file mode 100644 index 69e2935..0000000 --- a/tests/specs/__snapshots__/failures.spec.ts.snap +++ /dev/null @@ -1,3 +0,0 @@ -// Jest Snapshot v1, https://goo.gl/fbAQLP - -exports[`tests should throw if failed to produce 1`] = `[TypeError: fetch failed]`; diff --git a/tests/specs/__snapshots__/produce.spec.ts.snap b/tests/specs/__snapshots__/produce.spec.ts.snap new file mode 100644 index 0000000..5d28b0f --- /dev/null +++ b/tests/specs/__snapshots__/produce.spec.ts.snap @@ -0,0 +1,18 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`tests produce 1`] = ` +{ + "headers": { + "x-b3-flags": "1", + "x-b3-parentspanid": "101112", + "x-b3-sampled": "1", + "x-b3-spanid": "789", + "x-b3-traceid": "456", + "x-ot-span-context": "foo", + "x-request-id": "123", + }, + "value": { + "data": "foo", + }, +} +`; diff --git a/tests/specs/__snapshots__/produceFailure.spec.ts.snap b/tests/specs/__snapshots__/produceFailure.spec.ts.snap new file mode 100644 index 0000000..3af33b2 --- /dev/null +++ b/tests/specs/__snapshots__/produceFailure.spec.ts.snap @@ -0,0 +1,3 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`tests produce failure 1`] = `[TypeError: fetch failed]`; diff --git a/tests/specs/__snapshots__/tests.spec.ts.snap b/tests/specs/__snapshots__/tests.spec.ts.snap deleted file mode 100644 index e6ac7cf..0000000 --- a/tests/specs/__snapshots__/tests.spec.ts.snap +++ /dev/null @@ -1,27 +0,0 @@ -// Jest Snapshot v1, https://goo.gl/fbAQLP - -exports[`tests produce and consume 1`] = ` -{ - "data": "foo", -} -`; - -exports[`tests produce and consume 2`] = ` -{ - "x-b3-flags": "1", - "x-b3-parentspanid": "101112", - "x-b3-sampled": "1", - "x-b3-spanid": "789", - "x-b3-traceid": "456", - "x-ot-span-context": "foo", - "x-request-id": "123", -} -`; - -exports[`tests produce and consume 3`] = ` -{ - "data": "foo", -} -`; - -exports[`tests produce and consume 4`] = `{}`; diff --git a/tests/specs/failures.spec.ts b/tests/specs/failures.spec.ts deleted file mode 100644 index 6adc501..0000000 --- a/tests/specs/failures.spec.ts +++ /dev/null @@ -1,29 +0,0 @@ -import type {Orchestrator} from '../testcontainers/orchestrator.js'; -import {start as startKafka} from '../testcontainers/orchestrator.js'; - -describe('tests', () => { - let orchestrator: Orchestrator; - - beforeAll(async () => { - orchestrator = await startKafka({ - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false', - }); - }, 1800000); - - afterAll(async () => { - await orchestrator.stop(); - }, 1800000); - - it('should throw if failed to produce', async () => { - const topic = `topic-${Date.now()}`; - - expect(() => - orchestrator.produce([ - { - topic, - value: {data: 'foo'}, - }, - ]) - ).rejects.toMatchSnapshot(); - }); -}); diff --git a/tests/specs/produce.spec.ts b/tests/specs/produce.spec.ts new file mode 100644 index 0000000..905cb92 --- /dev/null +++ b/tests/specs/produce.spec.ts @@ -0,0 +1,51 @@ +import delay from 'delay'; + +import type {Orchestrator} from '../testcontainers/orchestrator.js'; +import {start} from '../testcontainers/orchestrator.js'; +import {consume} from '../services/consume.js'; + +const topic = 'my-topic'; + +describe('tests', () => { + let orchestrator: Orchestrator; + + beforeEach(async () => { + orchestrator = await start( + { + KAFKA_BROKER: 'kafka:9092', + MAX_BLOCK_MS: '1000', + }, + [topic] + ); + }, 5 * 60 * 1000); + + afterEach(async () => { + if (!orchestrator) { + return; + } + await orchestrator.stop(); + }); + + it('produce', async () => { + orchestrator.dafkaProducer.produce([ + { + topic, + key: 'thekey', + value: {data: 'foo'}, + headers: { + 'x-request-id': '123', + 'x-b3-traceid': '456', + 'x-b3-spanid': '789', + 'x-b3-parentspanid': '101112', + 'x-b3-sampled': '1', + 'x-b3-flags': '1', + 'x-ot-span-context': 'foo', + }, + }, + ]); + + await delay(5000); + + await expect(consume(orchestrator.kafkaClient, topic)).resolves.toMatchSnapshot(); + }); +}); diff --git a/tests/specs/produceFailure.spec.ts b/tests/specs/produceFailure.spec.ts new file mode 100644 index 0000000..cedbda3 --- /dev/null +++ b/tests/specs/produceFailure.spec.ts @@ -0,0 +1,33 @@ +import type {Orchestrator} from '../testcontainers/orchestrator.js'; +import {start} from '../testcontainers/orchestrator.js'; + +describe('tests', () => { + let orchestrator: Orchestrator; + + beforeEach(async () => { + orchestrator = await start( + { + KAFKA_BROKER: 'kafka:9092', + }, + [] + ); + }, 5 * 60 * 1000); + + afterEach(async () => { + if (!orchestrator) { + return; + } + await orchestrator.stop(); + }); + + it('produce failure', async () => { + expect(() => + orchestrator.dafkaProducer.produce([ + { + topic: 'not exists', + value: {data: 'foo'}, + }, + ]) + ).rejects.toMatchSnapshot(); + }); +}); diff --git a/tests/specs/tests.spec.ts b/tests/specs/tests.spec.ts deleted file mode 100644 index 7a667f3..0000000 --- a/tests/specs/tests.spec.ts +++ /dev/null @@ -1,61 +0,0 @@ -import type {Orchestrator} from '../testcontainers/orchestrator.js'; -import {start as startKafka} from '../testcontainers/orchestrator.js'; -import {KafkaMessage} from 'kafkajs'; - -describe('tests', () => { - let orchestrator: Orchestrator; - - beforeAll(async () => { - orchestrator = await startKafka(); - }, 1800000); - - afterAll(async () => { - await orchestrator.stop(); - }, 1800000); - - it.each([ - { - key: 'thekey', - value: {data: 'foo'}, - headers: { - 'x-request-id': '123', - 'x-b3-traceid': '456', - 'x-b3-spanid': '789', - 'x-b3-parentspanid': '101112', - 'x-b3-sampled': '1', - 'x-b3-flags': '1', - 'x-ot-span-context': 'foo', - }, - }, - { - value: {data: 'foo'}, - }, - ])('produce and consume', async (message) => { - const topic = `topic-${Date.now()}`; - - const consumer = orchestrator.kafkaClient.consumer({groupId: 'test'}); - await consumer.connect(); - - await consumer.subscribe({topic, fromBeginning: true}); - - await orchestrator.produce([ - { - topic, - ...message, - }, - ]); - - const consumedMessage = await new Promise((resolve) => { - consumer.run({ - eachMessage: async ({message}) => resolve(message), - }); - }); - - expect(JSON.parse(consumedMessage.value?.toString() ?? '{}')).toMatchSnapshot(); - expect( - Object.fromEntries(Object.entries(consumedMessage.headers!).map(([key, value]) => [key, value?.toString()])) - ).toMatchSnapshot(); - - await consumer.disconnect(); - }); -}); diff --git a/tests/testcontainers/dafka.ts b/tests/testcontainers/dafka.ts new file mode 100644 index 0000000..3c20c42 --- /dev/null +++ b/tests/testcontainers/dafka.ts @@ -0,0 +1,43 @@ +import {StartedNetwork, Wait} from 'testcontainers'; +import {GenericContainer} from 'testcontainers'; +import fs from 'node:fs'; +import {withThrow} from '@osskit/fetch-enhancers'; + +const enhanchedFetch = withThrow(fetch); + +export interface ServiceClient { + produce: (payload: any) => Promise; +} + +export const dafka = async (network: StartedNetwork, env: Record) => { + const container = await new GenericContainer('bazel/src:image') + .withExposedPorts(8080) + .withNetwork(network) + .withEnvironment(env) + .withStartupTimeout(parseInt(process.env.STARTUP_TIMEOUT ?? '60000')) + .withWaitStrategy(Wait.forHttp('/ready', 8080).forStatusCode(200)) + .start(); + + if (process.env.DEBUG) { + try { + fs.truncateSync('service.log', 0); + } catch (err) { + fs.writeFileSync('service.log', '', {flag: 'wx'}); + } + await container.logs().then((logs) => logs.pipe(fs.createWriteStream('service.log'))); + } + + const baseUrl = `http://localhost:${container.getMappedPort(8080)}`; + + return { + stop: () => container.stop(), + client: { + produce: (payload: any) => + enhanchedFetch(`${baseUrl}/produce`, { + method: 'post', + body: JSON.stringify(payload), + headers: {'content-type': 'application/json'}, + }), + }, + }; +}; diff --git a/tests/testcontainers/dafkaProducer.ts b/tests/testcontainers/dafkaProducer.ts deleted file mode 100644 index fae03ec..0000000 --- a/tests/testcontainers/dafkaProducer.ts +++ /dev/null @@ -1,43 +0,0 @@ -import {StartedNetwork, StoppedTestContainer} from 'testcontainers'; -import {GenericContainer, Wait} from 'testcontainers'; -import {withThrow} from '@osskit/fetch-enhancers'; - -const startupTimeout = parseInt(process.env.STARTUP_TIMEOUT ?? '60000'); - -const enhanchedFetch = withThrow(fetch); - -export interface ServiceContainer { - stop: () => Promise; - produce: (payload: any) => Promise; -} - -export const dafkaProducer = async (network: StartedNetwork): Promise => { - const container = await new GenericContainer('bazel/src:image') - .withExposedPorts(8080) - .withNetwork(network) - .withEnvironment({ - KAFKA_BROKER: 'kafka:9092', - MAX_BLOCK_MS: '1000', - }) - .withStartupTimeout(startupTimeout) - .withWaitStrategy(Wait.forHttp('/ready', 8080).forStatusCode(200)) - - .start(); - - if (process.env.VERBOSE) { - const logs = await container.logs(); - logs.pipe(process.stdout); - } - - const baseUrl = `http://localhost:${container.getMappedPort(8080)}`; - - return { - stop: () => container.stop(), - produce: (payload: any) => - enhanchedFetch(`${baseUrl}/produce`, { - method: 'post', - body: JSON.stringify(payload), - headers: {'content-type': 'application/json'}, - }), - }; -}; diff --git a/tests/testcontainers/kafka.ts b/tests/testcontainers/kafka.ts index c00df8c..ca39ecb 100644 --- a/tests/testcontainers/kafka.ts +++ b/tests/testcontainers/kafka.ts @@ -1,34 +1,24 @@ import {StartedNetwork, Wait} from 'testcontainers'; -import {KafkaContainer} from 'testcontainers'; +import {KafkaContainer} from '@testcontainers/kafka'; import {Kafka, logLevel} from 'kafkajs'; +import delay from 'delay'; -export const kafka = async (network: StartedNetwork, kafkaConfig?: Record) => { +export const kafka = async (network: StartedNetwork, topics: string[]) => { const container = await new KafkaContainer('confluentinc/cp-kafka:7.2.2') .withNetwork(network) .withNetworkAliases('kafka') - .withEnvironment({ - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT', - KAFKA_INTER_BROKER_LISTENER_NAME: 'BROKER', - KAFKA_BROKER_ID: '1', - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1', - KAFKA_STATE_LOG_REPLICATION_FACTOR: '1', - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1', - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: '1', - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1', - KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: '9223372036854775807', - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0', - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false', - ...kafkaConfig, - }) - .withWaitStrategy( - Wait.forLogMessage('Registered broker 1 at path /brokers/ids/1 with addresses: BROKER://kafka:9092') - ) + .withWaitStrategy(Wait.forLogMessage('started (kafka.server.KafkaServer)')) .start(); + + await delay(10000); + const client = new Kafka({ logLevel: logLevel.NOTHING, brokers: [`${container.getHost()}:${container.getMappedPort(9093)}`], }); + await client.admin().createTopics({topics: topics.map((topic) => ({topic, numPartitions: 100}))}); + return { stop: () => container.stop(), client, diff --git a/tests/testcontainers/orchestrator.ts b/tests/testcontainers/orchestrator.ts index 60de4ad..2255196 100644 --- a/tests/testcontainers/orchestrator.ts +++ b/tests/testcontainers/orchestrator.ts @@ -1,30 +1,26 @@ import {Network} from 'testcontainers'; -import {dafkaProducer} from './dafkaProducer.js'; +import {ServiceClient, dafka} from './dafka.js'; import {kafka} from './kafka.js'; - import {Kafka} from 'kafkajs'; export interface Orchestrator { - stop: () => Promise; - produce: (payload: any) => Promise; kafkaClient: Kafka; + dafkaProducer: ServiceClient; + stop: () => Promise; } - -export const start = async (kafkaConfig?: Record) => { +export const start = async (env: Record, topics: string[]) => { const network = await new Network().start(); - const [{client: kafkaClient, stop: stopKafka}, {produce: produce, stop: stopService}] = await Promise.all([ - kafka(network, kafkaConfig), - dafkaProducer(network), - ]); + const {client: kafkaClient, stop: stopKafka} = await kafka(network, topics); + const {stop: stopDafka, client: dafkaProducer} = await dafka(network, env); return { kafkaClient, + dafkaProducer, stop: async () => { - await stopService(); + await stopDafka(); await stopKafka(); await network.stop(); }, - produce, }; }; diff --git a/tests/tsconfig.json b/tests/tsconfig.json index 8d3af4b..7c774e7 100644 --- a/tests/tsconfig.json +++ b/tests/tsconfig.json @@ -3,5 +3,5 @@ "compilerOptions": { "module": "ES2020" }, - "include": ["specs", "testcontainers"] + "include": ["specs", "testcontainers", "services"] }