Skip to content

Commit

Permalink
fix(msar)!: snapshot --all pooled queue
Browse files Browse the repository at this point in the history
Close #174.

BREAKING: Renamed `--batchSize` flag to `--concurrentThreads` to better reflect its purpose.
  • Loading branch information
battis committed Jan 3, 2025
1 parent 0c260f2 commit a256536
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 19 deletions.
1 change: 1 addition & 0 deletions packages/msar/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"datadirect-puppeteer": "workspace:*",
"dotenv": "^16.4.7",
"ejs": "^3.1.10",
"p-queue": "^8.0.1",
"puppeteer": "^23.11.1"
},
"devDependencies": {
Expand Down
8 changes: 4 additions & 4 deletions packages/msar/src/common/workflow/args/options.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import cli from '@battis/qui-cli';

export const defaults = {
batchSize: 10
concurrentThreads: 10
};

export const options = {
batchSize: {
description: `Number of simultaneous requests to batch together (default: ${cli.colors.value(defaults.batchSize)})`,
default: defaults.batchSize.toString()
concurrentThreads: {
description: `Number of simultaneous requests to batch together (default: ${cli.colors.value(defaults.concurrentThreads)})`,
default: defaults.concurrentThreads.toString()
}
};
8 changes: 6 additions & 2 deletions packages/msar/src/common/workflow/args/parse.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import { defaults } from './options.js';

export type Parsed = {
ignoreErrors: boolean;
batchSize: number;
concurrentThreads: number;
logRequests: boolean;
};

export function parse(values: Record<string, any>): Parsed {
return {
ignoreErrors: !!values.ignoreErrors,
batchSize: parseInt(values.batchSize),
concurrentThreads: parseInt(
values.concurrentThreads || defaults.concurrentThreads.toString()
),
logRequests: !!values.logRequests
};
}
31 changes: 18 additions & 13 deletions packages/msar/src/workflows/Snapshot/Manager/All.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import cli from '@battis/qui-cli';
import cliProgress from 'cli-progress';
import { api, PuppeteerSession } from 'datadirect-puppeteer';
import crypto from 'node:crypto';
import { EventEmitter } from 'node:events';
import fs from 'node:fs/promises';
import path from 'node:path';
import PQueue from 'p-queue';
import * as common from '../../../common.js';
import * as Single from './Single.js';

Expand All @@ -19,6 +21,9 @@ export type Options = Single.SnapshotOptions &
url: URL | string;
} & common.args.Parsed;

export type Item = Single.Data;
export type Data = Item[];

const TEMP = path.join('/tmp/msar/snapshot', crypto.randomUUID());

function cleanSplit(list?: string) {
Expand All @@ -40,7 +45,7 @@ export async function snapshot({
if (!year) {
throw new Error(`year must be defined`);
}
const { ignoreErrors, batchSize } = options;
const { ignoreErrors, concurrentThreads } = options;
const { outputPath, pretty } = outputOptions;

const session = await PuppeteerSession.Fetchable.init(url, {
Expand All @@ -54,7 +59,6 @@ export async function snapshot({
const terms = cleanSplit(termsOffered);
const groups = (
await api.datadirect.groupFinderByYear({
session,
...options,
payload: {
schoolYearLabel: year
Expand Down Expand Up @@ -88,17 +92,20 @@ export async function snapshot({
}

const errors: typeof groups = [];
const data: Data = [];

// FIXME msar snapshot --all needs to return to batched snapshots
for (let i = 0; i < groups.length; i++) {
async function snapshotGroup(i: number) {
const tempPath = path.join(TEMP, `${pad(i)}.json`);
try {
const tempPath = path.join(TEMP, `${pad(i)}.json`);
const snapshot = await Single.snapshot({
session,
...options,
groupId: groups[i].lead_pk,
quit: true
});
data[i] = snapshot;
// TODO Configurable snapshot --all temp directory
// TODO Optional snapshot --all temp files
common.output.writeJSON(tempPath, snapshot);
progressBars.log(
`Wrote snapshot ${snapshot?.SectionInfo?.Teacher}'s ${snapshot?.SectionInfo?.SchoolYear} ${snapshot?.SectionInfo?.GroupName} ${snapshot?.SectionInfo?.Block} to ${cli.colors.url(outputPath)}\n`
Expand All @@ -114,20 +121,18 @@ export async function snapshot({
progress.increment();
}

const data: Single.Data[] = [];
const queue = new PQueue({ concurrency: concurrentThreads });
await queue.addAll(groups.map((group, i) => snapshotGroup.bind(null, i)));

let Start = new Date();
let Finish = new Date('1/1/1970');
let first: Single.Metadata | undefined = undefined;

const partials = await fs.readdir(TEMP);
for (const partial of partials) {
const snapshot = JSON.parse(
(await fs.readFile(path.join(TEMP, partial))).toString()
) as Single.Data;
data.push(snapshot);
for (const snapshot of data) {
if (snapshot.Metadata.Start < Start) {
Start = snapshot.Metadata.Start;
}
// FIXME snapshot --all Finish is not being calculated correctly
if (snapshot.Metadata.Finish > Finish) {
Finish = snapshot.Metadata.Finish;
}
Expand All @@ -146,7 +151,7 @@ export async function snapshot({
Start,
Finish,
year,
batchSize,
concurrentThreads,
groupsPath,
bulletinBoard,
topics,
Expand Down
23 changes: 23 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a256536

Please sign in to comment.