Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions .yarn/versions/85334687.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
releases:
"@yarnpkg/cli": minor
"@yarnpkg/core": minor
"@yarnpkg/plugin-file": minor
"@yarnpkg/plugin-git": minor
"@yarnpkg/plugin-github": minor
"@yarnpkg/plugin-http": minor
"@yarnpkg/plugin-npm": minor

declined:
- "@yarnpkg/plugin-compat"
- "@yarnpkg/plugin-constraints"
- "@yarnpkg/plugin-dlx"
- "@yarnpkg/plugin-essentials"
- "@yarnpkg/plugin-exec"
- "@yarnpkg/plugin-init"
- "@yarnpkg/plugin-interactive-tools"
- "@yarnpkg/plugin-link"
- "@yarnpkg/plugin-nm"
- "@yarnpkg/plugin-npm-cli"
- "@yarnpkg/plugin-pack"
- "@yarnpkg/plugin-patch"
- "@yarnpkg/plugin-pnp"
- "@yarnpkg/plugin-pnpm"
- "@yarnpkg/plugin-stage"
- "@yarnpkg/plugin-typescript"
- "@yarnpkg/plugin-version"
- "@yarnpkg/plugin-workspace-tools"
- "@yarnpkg/builder"
- "@yarnpkg/doctor"
- "@yarnpkg/extensions"
- "@yarnpkg/nm"
- "@yarnpkg/pnpify"
- "@yarnpkg/sdks"
15 changes: 15 additions & 0 deletions packages/docusaurus/static/configuration/yarnrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,21 @@
}
}
},
"taskPoolConcurrency": {
"_package": "@yarnpkg/core",
"title": "Maximal amount of concurrent heavy task processing.",
"description": "We default to the platform parallelism, but for some CI, `os.cpus` may not report accurate values and may overwhelm their containers.",
"type": "number",
"default": "os.availableParallelism()"
},
"workerPoolMode": {
"_package": "@yarnpkg/core",
"title": "Execution strategy for heavy tasks.",
"description": "By default will use workers when performing heavy tasks, such as converting tgz files to zip. This setting can be used to disable workers and use a regular in-thread async processing.",
"type": "string",
"enum": ["async", "workers"],
"default": "workers"
},
"telemetryInterval": {
"_package": "@yarnpkg/core",
"title": "Define the minimal amount of time between two telemetry events, in days.",
Expand Down
2 changes: 1 addition & 1 deletion packages/plugin-file/sources/TarballFileFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class TarballFileFetcher implements Fetcher {
const sourceBuffer = await fileUtils.fetchArchiveFromLocator(locator, opts);

return await tgzUtils.convertToZip(sourceBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
Expand Down
2 changes: 1 addition & 1 deletion packages/plugin-git/sources/GitFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class GitFetcher implements Fetcher {

return await miscUtils.releaseAfterUseAsync(async () => {
return await tgzUtils.convertToZip(sourceBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
Expand Down
2 changes: 1 addition & 1 deletion packages/plugin-github/sources/GithubFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class GithubFetcher implements Fetcher {
const packedBuffer = await xfs.readFilePromise(packagePath);

return await tgzUtils.convertToZip(packedBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
Expand Down
2 changes: 1 addition & 1 deletion packages/plugin-http/sources/TarballHttpFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class TarballHttpFetcher implements Fetcher {
});

return await tgzUtils.convertToZip(sourceBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
Expand Down
2 changes: 1 addition & 1 deletion packages/plugin-npm/sources/NpmHttpFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class NpmHttpFetcher implements Fetcher {
});

return await tgzUtils.convertToZip(sourceBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
Expand Down
2 changes: 1 addition & 1 deletion packages/plugin-npm/sources/NpmSemverFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class NpmSemverFetcher implements Fetcher {
}

return await tgzUtils.convertToZip(sourceBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
Expand Down
13 changes: 13 additions & 0 deletions packages/yarnpkg-core/sources/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,17 @@ export const coreDefinitions: {[coreSettingName: string]: SettingsDefinition} =
type: SettingsType.NUMBER,
default: 50,
},
taskPoolConcurrency: {
description: `Maximal amount of concurrent heavy task processing`,
type: SettingsType.NUMBER,
default: nodeUtils.availableParallelism(),
},
taskPoolMode: {
description: `Execution strategy for heavy tasks`,
type: SettingsType.STRING,
values: [`async`, `workers`],
default: `workers`,
},
networkSettings: {
description: `Network settings per hostname (glob patterns are supported)`,
type: SettingsType.MAP,
Expand Down Expand Up @@ -643,6 +654,8 @@ export interface ConfigurationValueMap {
httpsKeyFilePath: PortablePath | null;
httpsCertFilePath: PortablePath | null;
enableStrictSsl: boolean;
taskPoolConcurrency: number;
taskPoolMode: string;

logFilters: Array<miscUtils.ToMapValue<{code?: string, text?: string, pattern?: string, level?: formatUtils.LogLevel | null}>>;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
import PLimit from 'p-limit';
import {Worker} from 'worker_threads';

import * as nodeUtils from './nodeUtils';
import pLimit, {Limit} from 'p-limit';
import {Worker} from 'worker_threads';

const kTaskInfo = Symbol(`kTaskInfo`);

type PoolWorker<TOut> = Worker & {
[kTaskInfo]: null | { resolve: (value: TOut) => void, reject: (reason?: any) => void };
};

export class WorkerPool<TIn, TOut> {
export interface TaskPool<TIn, TOut> {
run(data: TIn): Promise<TOut>;
}

export class AsyncPool<TIn, TOut> implements TaskPool<TIn, TOut> {
private limit: Limit;

constructor(private fn: (data: TIn) => Promise<TOut>, opts: {poolSize: number}) {
this.limit = pLimit(opts.poolSize);
}

run(data: TIn) {
return this.limit(() => this.fn(data));
}
}

export class WorkerPool<TIn, TOut> implements TaskPool<TIn, TOut> {
private workers: Array<PoolWorker<TOut>> = [];
private limit = PLimit(nodeUtils.availableParallelism());

private cleanupInterval: ReturnType<typeof setInterval>;

constructor(private source: string) {
private limit: Limit;

constructor(private source: string, opts: {poolSize: number}) {
this.limit = pLimit(opts.poolSize);

this.cleanupInterval = setInterval(() => {
if (this.limit.pendingCount === 0 && this.limit.activeCount === 0) {
// Start terminating one worker at a time when there are no tasks left.
Expand Down
13 changes: 9 additions & 4 deletions packages/yarnpkg-core/sources/miscUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ export function convertMapsToIndexableObjects<T>(arg: T): MapValueToObjectValue<
return arg as MapValueToObjectValue<T>;
}

export function getFactoryWithDefault<K, T>(map: Map<K, T>, key: K, factory: () => T) {
export interface GetSetMap<K, V> {
get(k: K): V | undefined;
set(k: K, v: V): void;
}

export function getFactoryWithDefault<K, T>(map: GetSetMap<K, T>, key: K, factory: () => T) {
let value = map.get(key);

if (typeof value === `undefined`)
Expand All @@ -129,7 +134,7 @@ export function getFactoryWithDefault<K, T>(map: Map<K, T>, key: K, factory: ()
return value;
}

export function getArrayWithDefault<K, T>(map: Map<K, Array<T>>, key: K) {
export function getArrayWithDefault<K, T>(map: GetSetMap<K, Array<T>>, key: K) {
let value = map.get(key);

if (typeof value === `undefined`)
Expand All @@ -138,7 +143,7 @@ export function getArrayWithDefault<K, T>(map: Map<K, Array<T>>, key: K) {
return value;
}

export function getSetWithDefault<K, T>(map: Map<K, Set<T>>, key: K) {
export function getSetWithDefault<K, T>(map: GetSetMap<K, Set<T>>, key: K) {
let value = map.get(key);

if (typeof value === `undefined`)
Expand All @@ -147,7 +152,7 @@ export function getSetWithDefault<K, T>(map: Map<K, Set<T>>, key: K) {
return value;
}

export function getMapWithDefault<K, MK, MV>(map: Map<K, Map<MK, MV>>, key: K) {
export function getMapWithDefault<K, MK, MV>(map: GetSetMap<K, Map<MK, MV>>, key: K) {
let value = map.get(key);

if (typeof value === `undefined`)
Expand Down
108 changes: 95 additions & 13 deletions packages/yarnpkg-core/sources/tgzUtils.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,84 @@
import {FakeFS, PortablePath, NodeFS, ppath, xfs, npath, constants} from '@yarnpkg/fslib';
import {ZipCompression, ZipFS} from '@yarnpkg/libzip';
import {PassThrough, Readable} from 'stream';
import tar from 'tar';
import {Configuration, nodeUtils} from '@yarnpkg/core';
import {FakeFS, PortablePath, NodeFS, ppath, xfs, npath, constants, statUtils} from '@yarnpkg/fslib';
import {ZipCompression, ZipFS} from '@yarnpkg/libzip';
import {PassThrough, Readable} from 'stream';
import tar from 'tar';

import {AsyncPool, TaskPool, WorkerPool} from './TaskPool';
import * as miscUtils from './miscUtils';
import {getContent as getZipWorkerSource} from './worker-zip';

export type ConvertToZipPayload = {
tmpFile: PortablePath;
tgz: Buffer | Uint8Array;
extractBufferOpts: ExtractBufferOptions;
compressionLevel: ZipCompression;
};

export type ZipWorkerPool = TaskPool<ConvertToZipPayload, PortablePath>;

function createTaskPool(poolMode: string, poolSize: number): ZipWorkerPool {
switch (poolMode) {
case `async`:
return new AsyncPool(convertToZipWorker, {poolSize});

case `workers`:
return new WorkerPool(getZipWorkerSource(), {poolSize});

default: {
throw new Error(`Assertion failed: Unknown value ${poolMode} for taskPoolMode`);
}
}
}

let defaultWorkerPool: ZipWorkerPool | undefined;

export function getDefaultTaskPool() {
if (typeof defaultWorkerPool === `undefined`)
defaultWorkerPool = createTaskPool(`workers`, nodeUtils.availableParallelism());

return defaultWorkerPool;
}

import {WorkerPool} from './WorkerPool';
import * as miscUtils from './miscUtils';
import {getContent as getZipWorkerSource, ConvertToZipPayload} from './worker-zip';
const workerPools = new WeakMap<Configuration, ZipWorkerPool>();

interface MakeArchiveFromDirectoryOptions {
export function getTaskPoolForConfiguration(configuration: Configuration | void): ZipWorkerPool {
if (typeof configuration === `undefined`)
return getDefaultTaskPool();

return miscUtils.getFactoryWithDefault(workerPools, configuration, () => {
const poolMode = configuration.get(`taskPoolMode`);
const poolSize = configuration.get(`taskPoolConcurrency`);

switch (poolMode) {
case `async`:
return new AsyncPool(convertToZipWorker, {poolSize});

case `workers`:
return new WorkerPool(getZipWorkerSource(), {poolSize});

default: {
throw new Error(`Assertion failed: Unknown value ${poolMode} for taskPoolMode`);
}
}
});
}

export async function convertToZipWorker(data: ConvertToZipPayload) {
const {tmpFile, tgz, compressionLevel, extractBufferOpts} = data;

const zipFs = new ZipFS(tmpFile, {create: true, level: compressionLevel, stats: statUtils.makeDefaultStats()});

// Buffers sent through Node are turned into regular Uint8Arrays
const tgzBuffer = Buffer.from(tgz.buffer, tgz.byteOffset, tgz.byteLength);
await extractArchiveTo(tgzBuffer, zipFs, extractBufferOpts);

zipFs.saveAndClose();

return tmpFile;
}

export interface MakeArchiveFromDirectoryOptions {
baseFs?: FakeFS<PortablePath>;
prefixPath?: PortablePath | null;
compressionLevel?: ZipCompression;
Expand All @@ -32,20 +103,31 @@ export async function makeArchiveFromDirectory(source: PortablePath, {baseFs = n
}

export interface ExtractBufferOptions {
compressionLevel?: ZipCompression;
prefixPath?: PortablePath;
stripComponents?: number;
}

let workerPool: WorkerPool<ConvertToZipPayload, PortablePath> | null;
export interface ConvertToZipOptions extends ExtractBufferOptions {
configuration?: Configuration;
compressionLevel?: ZipCompression;
taskPool?: ZipWorkerPool;
}

export async function convertToZip(tgz: Buffer, opts: ExtractBufferOptions) {
export async function convertToZip(tgz: Buffer, opts: ConvertToZipOptions = {}) {
const tmpFolder = await xfs.mktempPromise();
const tmpFile = ppath.join(tmpFolder, `archive.zip`);

workerPool ||= new WorkerPool(getZipWorkerSource());
const compressionLevel = opts.compressionLevel
?? opts.configuration?.get(`compressionLevel`)
?? `mixed`;

const extractBufferOpts: ExtractBufferOptions = {
prefixPath: opts.prefixPath,
stripComponents: opts.stripComponents,
};

await workerPool.run({tmpFile, tgz, opts});
const taskPool = opts.taskPool ?? getTaskPoolForConfiguration(opts.configuration);
await taskPool.run({tmpFile, tgz, compressionLevel, extractBufferOpts});

return new ZipFS(tmpFile, {level: opts.compressionLevel});
}
Expand Down
20 changes: 3 additions & 17 deletions packages/yarnpkg-core/sources/worker-zip/Worker.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,11 @@
import {PortablePath, statUtils} from '@yarnpkg/fslib';
import {ZipFS} from '@yarnpkg/libzip';
import {parentPort} from 'worker_threads';
import {parentPort} from 'worker_threads';

import {extractArchiveTo, ExtractBufferOptions} from '../tgzUtils';
import {convertToZipWorker, ConvertToZipPayload} from '../tgzUtils';

export type ConvertToZipPayload = {tmpFile: PortablePath, tgz: Buffer | Uint8Array, opts: ExtractBufferOptions};

if (!parentPort)
throw new Error(`Assertion failed: Expected parentPort to be set`);

parentPort.on(`message`, async (data: ConvertToZipPayload) => {
const {opts, tgz, tmpFile} = data;
const {compressionLevel, ...bufferOpts} = opts;

const zipFs = new ZipFS(tmpFile, {create: true, level: compressionLevel, stats: statUtils.makeDefaultStats()});

// Buffers sent through Node are turned into regular Uint8Arrays
const tgzBuffer = Buffer.from(tgz.buffer, tgz.byteOffset, tgz.byteLength);
await extractArchiveTo(tgzBuffer, zipFs, bufferOpts);

zipFs.saveAndClose();

parentPort!.postMessage(data.tmpFile);
parentPort!.postMessage(await convertToZipWorker(data));
});
1 change: 0 additions & 1 deletion packages/yarnpkg-core/sources/worker-zip/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export function getContent(): string;
export type {ConvertToZipPayload} from './Worker';
2 changes: 1 addition & 1 deletion packages/yarnpkg-core/sources/worker-zip/index.js

Large diffs are not rendered by default.

Loading