aboutsummaryrefslogtreecommitdiffhomepage
path: root/lib/util/promises.ts
blob: 736437bc1db835feb1c93eb3e434e1a4292e6ab8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import AggregateError from 'aggregate-error';
import pAll from 'p-all';
import pMap from 'p-map';
import { logger } from '../logger';
import { ExternalHostError } from '../types/errors/external-host-error';

/** A zero-argument function that returns a promise of `T` each time it is invoked. */
type PromiseFactory<T> = () => Promise<T>;

/**
 * Type guard reporting whether a value is an `ExternalHostError` instance.
 *
 * The parameter is typed `unknown` rather than `any` so that callers must go
 * through this guard (or another narrowing check) before touching the value.
 *
 * @param err - Value to inspect; may be anything, including non-objects.
 * @returns `true` when `err` is an instance of `ExternalHostError`.
 */
function isExternalHostError(err: unknown): err is ExternalHostError {
  return err instanceof ExternalHostError;
}

/**
 * Reduces a list of errors to the single value that should be thrown.
 *
 * Selection order:
 * 1. The first error recognised by `isExternalHostError`, if any.
 * 2. The first error in the list, when the list has exactly one entry or
 *    every entry carries the same `message`.
 * 3. Otherwise a new `AggregateError` built from the whole list.
 *
 * @param errors - Errors collected from the failed tasks.
 * @throws Always; this function never returns.
 */
function handleMultipleErrors(errors: Error[]): never {
  for (const candidate of errors) {
    if (isExternalHostError(candidate)) {
      throw candidate;
    }
  }

  // Evaluated lazily so a single-entry list never needs its messages read.
  const hasSharedMessage = (): boolean =>
    new Set(errors.map((item) => item.message)).size === 1;
  const isCollapsible = errors.length === 1 || hasSharedMessage();

  throw isCollapsible ? errors[0] : new AggregateError(errors);
}

/**
 * Normalises an error caught from a batch run and rethrows it.
 *
 * Anything that is not an `AggregateError` is rethrown untouched. An
 * `AggregateError` is logged at debug level and its contained errors are
 * handed to `handleMultipleErrors`, which decides what is ultimately thrown.
 *
 * @param err - The caught value; typed `unknown` because a `catch` clause can
 *   receive anything.
 * @throws Always; this function never returns.
 */
function handleError(err: unknown): never {
  if (!(err instanceof AggregateError)) {
    throw err;
  }

  logger.debug({ err }, 'Aggregate error is thrown');
  // Spreading relies on the aggregate being iterable over its inner errors.
  handleMultipleErrors([...err]);
}

/**
 * Runs a list of promise factories through `pAll`.
 *
 * Defaults to `concurrency: 5` and `stopOnError: false`; any field supplied in
 * `options` overrides the corresponding default.
 *
 * @param tasks - Factories to invoke.
 * @param options - Optional `pAll` options merged over the defaults.
 * @returns The array resolved by `pAll`.
 * @throws Whatever `handleError` throws for a failure raised by `pAll`.
 */
export async function all<T>(
  tasks: PromiseFactory<T>[],
  options?: pAll.Options,
): Promise<T[]> {
  try {
    // Caller options are spread last so they take precedence over defaults.
    const settings: pAll.Options = {
      concurrency: 5,
      stopOnError: false,
      ...options,
    };
    return await pAll(tasks, settings);
  } catch (failure) {
    return handleError(failure);
  }
}

/**
 * Maps every element of `input` through `mapper` using `pMap`.
 *
 * Defaults to `concurrency: 5` and `stopOnError: false`; any field supplied in
 * `options` overrides the corresponding default.
 *
 * @param input - Elements to process.
 * @param mapper - Function applied to each element.
 * @param options - Optional `pMap` options merged over the defaults.
 * @returns The array resolved by `pMap`.
 * @throws Whatever `handleError` throws for a failure raised by `pMap`.
 */
export async function map<Element, NewElement>(
  input: Iterable<Element>,
  mapper: pMap.Mapper<Element, NewElement>,
  options?: pMap.Options,
): Promise<NewElement[]> {
  try {
    // Caller options are spread last so they take precedence over defaults.
    const settings: pMap.Options = {
      concurrency: 5,
      stopOnError: false,
      ...options,
    };
    return await pMap(input, mapper, settings);
  } catch (failure) {
    return handleError(failure);
  }
}