Skip to content

Commit 44f6fc6

Browse files
committed
Batch concurrent requests serially
1 parent 4ceedca commit 44f6fc6

File tree

9 files changed

+128
-68
lines changed

9 files changed

+128
-68
lines changed

Changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
## Upcoming
44

5+
**Major Changes**
6+
7+
- Requests executed in parallel for each label or class are now batched in
8+
groups of 10 to reduce chance of throttling errors
9+
(<https://github.com/aws/graph-explorer/pull/489>)
10+
511
**Bug Fixes and Minor Changes**
612

713
- Increase request body size limit for proxy server

packages/graph-explorer/src/connector/openCypher/queries/fetchSchema.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -400,10 +400,10 @@ describe("OpenCypher > fetchSchema", () => {
400400
await fetchSchema(openCypherFetchFn);
401401

402402
expect(openCypherFetchFn.mock.calls[3][0]).toStrictEqual(
403-
"MATCH() -[e:`route`]- () RETURN e AS object LIMIT 1"
403+
"MATCH () -[e:`route`]- () RETURN e AS object LIMIT 1"
404404
);
405405
expect(openCypherFetchFn.mock.calls[4][0]).toStrictEqual(
406-
"MATCH() -[e:`contains`]- () RETURN e AS object LIMIT 1"
406+
"MATCH () -[e:`contains`]- () RETURN e AS object LIMIT 1"
407407
);
408408
});
409409

@@ -422,10 +422,10 @@ describe("OpenCypher > fetchSchema", () => {
422422
await fetchSchema(openCypherFetchFn);
423423

424424
expect(openCypherFetchFn.mock.calls[3][0]).toStrictEqual(
425-
"MATCH() -[e:`route`]- () RETURN e AS object LIMIT 1"
425+
"MATCH () -[e:`route`]- () RETURN e AS object LIMIT 1"
426426
);
427427
expect(openCypherFetchFn.mock.calls[4][0]).toStrictEqual(
428-
"MATCH() -[e:`contains`]- () RETURN e AS object LIMIT 1"
428+
"MATCH () -[e:`contains`]- () RETURN e AS object LIMIT 1"
429429
);
430430
});
431431
});

packages/graph-explorer/src/connector/openCypher/queries/fetchSchema.ts

Lines changed: 63 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
import { sanitizeText } from "../../../utils";
2-
import type { SchemaResponse } from "../../useGEFetchTypes";
1+
import { batchPromisesSerially, sanitizeText } from "../../../utils";
2+
import { DEFAULT_CONCURRENT_REQUESTS_LIMIT } from "../../../utils/constants";
3+
import type {
4+
EdgeSchemaResponse,
5+
SchemaResponse,
6+
VertexSchemaResponse,
7+
} from "../../useGEFetchTypes";
38
import edgeLabelsTemplate from "../templates/edgeLabelsTemplate";
49
import edgesSchemaTemplate from "../templates/edgesSchemaTemplate";
510
import vertexLabelsTemplate from "../templates/vertexLabelsTemplate";
@@ -27,11 +32,14 @@ type RawEdgeLabelsResponse = {
2732
};
2833

2934
type RawVerticesSchemaResponse = {
30-
results: [
31-
{
32-
object: OCVertex;
33-
},
34-
];
35+
results:
36+
| [
37+
{
38+
object: OCVertex;
39+
},
40+
]
41+
| []
42+
| undefined;
3543
};
3644

3745
type RawEdgesSchemaResponse = {
@@ -41,7 +49,8 @@ type RawEdgesSchemaResponse = {
4149
object: OCEdge;
4250
},
4351
]
44-
| [];
52+
| []
53+
| undefined;
4554
};
4655

4756
// Fetches all vertex labels and their counts
@@ -79,29 +88,39 @@ const fetchVerticesAttributes = async (
7988
labels: Array<string>,
8089
countsByLabel: Record<string, number>
8190
): Promise<SchemaResponse["vertices"]> => {
82-
const vertices: SchemaResponse["vertices"] = [];
83-
8491
if (labels.length === 0) {
85-
return vertices;
92+
return [];
8693
}
8794

88-
await Promise.all(
89-
labels.map(async labelResult => {
95+
const responses = await batchPromisesSerially(
96+
labels,
97+
DEFAULT_CONCURRENT_REQUESTS_LIMIT,
98+
async label => {
9099
const verticesTemplate = verticesSchemaTemplate({
91-
type: labelResult,
100+
type: label,
92101
});
93102

94103
const response =
95104
await openCypherFetch<RawVerticesSchemaResponse>(verticesTemplate);
96105

97-
const vertex = response.results[0]?.object as OCVertex;
98-
if (!vertex) {
99-
return;
106+
return {
107+
vertex: response.results ? response.results[0]?.object : null,
108+
label,
109+
};
110+
}
111+
);
112+
113+
const vertices = responses
114+
.map(({ vertex }) => {
115+
// verify response has the info we need
116+
if (!vertex || !vertex["~labels"]) {
117+
return null;
100118
}
101119

102-
const label = vertex["~labels"][0] as string;
120+
// Use the first label
121+
const label = vertex["~labels"][0];
103122
const properties = vertex["~properties"];
104-
vertices.push({
123+
const vertexSchema: VertexSchemaResponse = {
105124
type: label,
106125
displayLabel: sanitizeText(label),
107126
total: countsByLabel[label],
@@ -113,9 +132,10 @@ const fetchVerticesAttributes = async (
113132
dataType: typeof value === "string" ? "String" : "Number",
114133
};
115134
}),
116-
});
135+
};
136+
return vertexSchema;
117137
})
118-
);
138+
.filter(vertexSchema => vertexSchema != null);
119139

120140
return vertices;
121141
};
@@ -170,44 +190,41 @@ const fetchEdgesAttributes = async (
170190
labels: Array<string>,
171191
countsByLabel: Record<string, number>
172192
): Promise<SchemaResponse["edges"]> => {
173-
const edges: SchemaResponse["edges"] = [];
174-
175193
if (labels.length === 0) {
176-
return edges;
194+
return [];
177195
}
178196

179-
await Promise.all(
180-
labels.map(async labelResult => {
197+
const responses = await batchPromisesSerially(
198+
labels,
199+
DEFAULT_CONCURRENT_REQUESTS_LIMIT,
200+
async label => {
181201
const edgesTemplate = edgesSchemaTemplate({
182-
type: labelResult,
202+
type: label,
183203
});
184204

185205
const response =
186206
await openCypherFetch<RawEdgesSchemaResponse>(edgesTemplate);
187207

188-
// verify response has the info we need
189-
if (
190-
!response.results ||
191-
response.results.length === 0 ||
192-
!response.results[0].object
193-
) {
194-
return;
195-
}
196-
197-
const edge = response.results[0].object as OCEdge;
198-
const entityType = edge["~entityType"] as string;
199-
const type = edge["~type"] as string;
208+
return {
209+
edge: response.results ? response.results[0]?.object : null,
210+
label,
211+
};
212+
}
213+
);
200214

215+
const edges = responses
216+
.map(({ edge, label }) => {
201217
// verify response has the info we need
202-
if (!entityType || !type) {
203-
return;
218+
if (!edge || !edge["~entityType"] || !edge["~type"]) {
219+
return null;
204220
}
205221

222+
const type = edge["~type"];
206223
const properties = edge["~properties"];
207-
edges.push({
224+
const edgeSchema: EdgeSchemaResponse = {
208225
type: type,
209226
displayLabel: sanitizeText(type),
210-
total: countsByLabel[labelResult],
227+
total: countsByLabel[label],
211228
attributes: Object.entries(properties || {}).map(([name, prop]) => {
212229
const value = prop;
213230
return {
@@ -216,9 +233,10 @@ const fetchEdgesAttributes = async (
216233
dataType: typeof value === "string" ? "String" : "Number",
217234
};
218235
}),
219-
});
236+
};
237+
return edgeSchema;
220238
})
221-
);
239+
.filter(edgeSchema => edgeSchema != null);
222240

223241
return edges;
224242
};

packages/graph-explorer/src/connector/openCypher/templates/edgeSchemaTemplate.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ describe("OpenCypher > edgesSchemaTemplate", () => {
55
const template = edgesSchemaTemplate({ type: "route" });
66

77
expect(template).toBe(
8-
`MATCH() -[e:\`route\`]- () RETURN e AS object LIMIT 1`
8+
`MATCH () -[e:\`route\`]- () RETURN e AS object LIMIT 1`
99
);
1010
});
1111
});

packages/graph-explorer/src/connector/openCypher/templates/edgesSchemaTemplate.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* LIMIT 1`
1111
*/
1212
const edgesSchemaTemplate = ({ type }: { type: string }) => {
13-
return `MATCH() -[e:\`${type}\`]- () RETURN e AS object LIMIT 1`;
13+
return `MATCH () -[e:\`${type}\`]- () RETURN e AS object LIMIT 1`;
1414
};
1515

1616
export default edgesSchemaTemplate;

packages/graph-explorer/src/connector/sparql/queries/fetchSchema.ts

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { batchPromisesSerially } from "../../../utils";
2+
import { DEFAULT_CONCURRENT_REQUESTS_LIMIT } from "../../../utils/constants";
13
import type { SchemaResponse } from "../../useGEFetchTypes";
24
import classesWithCountsTemplates from "../templates/classesWithCountsTemplates";
35
import predicatesByClassTemplate from "../templates/predicatesByClassTemplate";
@@ -62,11 +64,12 @@ const fetchPredicatesByClass = async (
6264
classes: Array<string>,
6365
countsByClass: Record<string, number>
6466
) => {
65-
const vertices: SchemaResponse["vertices"] = [];
66-
await Promise.all(
67-
classes.map(async classResult => {
67+
const responses = await batchPromisesSerially(
68+
classes,
69+
DEFAULT_CONCURRENT_REQUESTS_LIMIT,
70+
async resourceClass => {
6871
const classPredicatesTemplate = predicatesByClassTemplate({
69-
class: classResult,
72+
class: resourceClass,
7073
});
7174
const predicatesResponse =
7275
await sparqlFetch<RawPredicatesSamplesResponse>(
@@ -81,22 +84,25 @@ const fetchPredicatesByClass = async (
8184
dataType: TYPE_MAP[item.sample.datatype || ""] || "String",
8285
}));
8386

84-
vertices.push({
85-
type: classResult,
86-
displayLabel: "",
87-
total: countsByClass[classResult],
88-
displayNameAttribute:
89-
attributes.find(attr => displayNameCandidates.includes(attr.name))
90-
?.name || "id",
91-
longDisplayNameAttribute:
92-
attributes.find(attr => displayDescCandidates.includes(attr.name))
93-
?.name || "types",
87+
return {
9488
attributes,
95-
});
96-
})
89+
resourceClass,
90+
};
91+
}
9792
);
9893

99-
return vertices;
94+
return responses.map(({ attributes, resourceClass }) => ({
95+
type: resourceClass,
96+
displayLabel: "",
97+
total: countsByClass[resourceClass],
98+
displayNameAttribute:
99+
attributes.find(attr => displayNameCandidates.includes(attr.name))
100+
?.name || "id",
101+
longDisplayNameAttribute:
102+
attributes.find(attr => displayDescCandidates.includes(attr.name))
103+
?.name || "types",
104+
attributes,
105+
}));
100106
};
101107

102108
const fetchClassesSchema = async (sparqlFetch: SparqlFetch) => {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { chunk } from "lodash";
2+
3+
/**
4+
* Performs the given operation on the collection of items in groups of
5+
* concurrent requests. A batch group must complete entirely before progressing
6+
* to the next group.
7+
*
8+
* @param items The array of items that will be passed to the callback
9+
* @param batchSize The size of the batch groups
10+
* @param callback The async operation to perform on the given item
11+
* @returns The results of all the operations performed on the given items in
12+
* the order of the items array.
13+
*/
14+
export default async function batchPromisesSerially<Item, Result>(
15+
items: Item[],
16+
batchSize: number,
17+
callback: (item: Item) => Promise<Result>
18+
): Promise<Result[]> {
19+
const batches = chunk(items, batchSize);
20+
const results = new Array<Result>();
21+
22+
for (const batch of batches) {
23+
const batchResults = await Promise.all(batch.map(callback));
24+
results.push(...batchResults);
25+
}
26+
27+
return results;
28+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export const DEFAULT_SERVICE_TYPE = "neptune-db";
22
export const DEFAULT_FETCH_TIMEOUT = 240000;
33
export const DEFAULT_NODE_EXPAND_LIMIT = 500;
4+
export const DEFAULT_CONCURRENT_REQUESTS_LIMIT = 10;

packages/graph-explorer/src/utils/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ export { default as useClickOutside } from "./useClickOutside";
77
export { default as sanitizeText } from "./sanitizeText";
88
export { DEFAULT_SERVICE_TYPE } from "./constants";
99
export { default as escapeString } from "./escapeString";
10+
export { default as batchPromisesSerially } from "./batchPromisesSerially";
1011
export * from "./set";
1112
export * from "./env";

0 commit comments

Comments
 (0)