Skip to content

Commit b477047

Browse files
authored
feat(restore-indices): add urn like filter (#5770)
* feat(restore-indices): add urn like filter
* change logging
1 parent 1f1056d commit b477047

File tree

2 files changed

+99
-73
lines changed

2 files changed

+99
-73
lines changed

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/RestoreIndices.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class RestoreIndices implements Upgrade {
2121
public static final String NUM_THREADS_ARG_NAME = "numThreads";
2222
public static final String ASPECT_NAME_ARG_NAME = "aspectName";
2323
public static final String URN_ARG_NAME = "urn";
24+
public static final String URN_LIKE_ARG_NAME = "urnLike";
2425

2526
private final List<UpgradeStep> _steps;
2627

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java

Lines changed: 98 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -58,26 +58,21 @@ public KafkaJobResult(int ignored, int rowsMigrated) {
5858
public class KafkaJob implements Callable<KafkaJobResult> {
5959
UpgradeContext context;
6060
int start;
61-
int batchSize;
62-
long batchDelayMs;
63-
Optional<String> aspectName;
64-
Optional<String> urn;
65-
public KafkaJob(UpgradeContext context, int start, int batchSize, long batchDelayMs, Optional<String> aspectName,
66-
Optional<String> urn) {
61+
JobArgs args;
62+
public KafkaJob(UpgradeContext context, int start, JobArgs args) {
6763
this.context = context;
6864
this.start = start;
69-
this.batchSize = batchSize;
70-
this.batchDelayMs = batchDelayMs;
71-
this.aspectName = aspectName;
72-
this.urn = urn;
65+
this.args = args;
7366
}
7467
@Override
7568
public KafkaJobResult call() {
7669
int ignored = 0;
7770
int rowsMigrated = 0;
78-
context.report()
79-
.addLine(String.format("Reading rows %s through %s from the aspects table.", start, start + batchSize));
80-
PagedList<EbeanAspectV2> rows = getPagedAspects(start, batchSize, aspectName, urn);
71+
context.report().addLine(String.format(
72+
"Reading rows %s through %s from the aspects table started.", start, start + args.batchSize));
73+
PagedList<EbeanAspectV2> rows = getPagedAspects(start, args);
74+
context.report().addLine(String.format(
75+
"Reading rows %s through %s from the aspects table completed.", start, start + args.batchSize));
8176

8277
for (EbeanAspectV2 aspect : rows.getList()) {
8378
// 1. Extract an Entity type from the entity Urn
@@ -140,7 +135,7 @@ public KafkaJobResult call() {
140135
}
141136

142137
try {
143-
TimeUnit.MILLISECONDS.sleep(batchDelayMs);
138+
TimeUnit.MILLISECONDS.sleep(args.batchDelayMs);
144139
} catch (InterruptedException e) {
145140
throw new RuntimeException("Thread interrupted while sleeping after successful batch migration.");
146141
}
@@ -164,10 +159,11 @@ public int retryCount() {
164159
return 0;
165160
}
166161

167-
private KafkaJobResult iterateFutures(List<Future<KafkaJobResult>> futures) {
162+
private List<KafkaJobResult> iterateFutures(List<Future<KafkaJobResult>> futures) {
168163
int beforeSize = futures.size();
169164
int afterSize = futures.size();
170-
while (beforeSize == afterSize) {
165+
List<KafkaJobResult> result = new ArrayList<>();
166+
while (afterSize > 0 && beforeSize == afterSize) {
171167
try {
172168
TimeUnit.SECONDS.sleep(1);
173169
} catch (InterruptedException e) {
@@ -176,83 +172,109 @@ private KafkaJobResult iterateFutures(List<Future<KafkaJobResult>> futures) {
176172
for (Future<KafkaJobResult> future: new ArrayList<>(futures)) {
177173
if (future.isDone()) {
178174
try {
179-
KafkaJobResult result = future.get();
175+
result.add(future.get());
180176
futures.remove(future);
181-
return result;
182177
} catch (InterruptedException | ExecutionException e) {
183178
e.printStackTrace();
184179
}
185180
}
186181
}
187182
afterSize = futures.size();
188183
}
189-
return null;
184+
return result;
185+
}
186+
187+
private static class JobArgs {
188+
int batchSize;
189+
int numThreads;
190+
long batchDelayMs;
191+
String aspectName;
192+
String urn;
193+
String urnLike;
194+
}
195+
196+
private JobArgs getArgs(UpgradeContext context) {
197+
JobArgs result = new JobArgs();
198+
result.batchSize = getBatchSize(context.parsedArgs());
199+
context.report().addLine(String.format("batchSize is %d", result.batchSize));
200+
result.numThreads = getThreadCount(context.parsedArgs());
201+
context.report().addLine(String.format("numThreads is %d", result.numThreads));
202+
result.batchDelayMs = getBatchDelayMs(context.parsedArgs());
203+
context.report().addLine(String.format("batchDelayMs is %d", result.batchDelayMs));
204+
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAME_ARG_NAME)) {
205+
result.aspectName = context.parsedArgs().get(RestoreIndices.ASPECT_NAME_ARG_NAME).get();
206+
context.report().addLine(String.format("aspect is %s", result.aspectName));
207+
context.report().addLine(String.format("Found aspectName arg as %s", result.aspectName));
208+
} else {
209+
context.report().addLine("No aspectName arg present");
210+
}
211+
if (containsKey(context.parsedArgs(), RestoreIndices.URN_ARG_NAME)) {
212+
result.urn = context.parsedArgs().get(RestoreIndices.URN_ARG_NAME).get();
213+
context.report().addLine(String.format("urn is %s", result.urn));
214+
context.report().addLine(String.format("Found urn arg as %s", result.urn));
215+
} else {
216+
context.report().addLine("No urn arg present");
217+
}
218+
if (containsKey(context.parsedArgs(), RestoreIndices.URN_LIKE_ARG_NAME)) {
219+
result.urnLike = context.parsedArgs().get(RestoreIndices.URN_LIKE_ARG_NAME).get();
220+
context.report().addLine(String.format("urnLike is %s", result.urnLike));
221+
context.report().addLine(String.format("Found urn like arg as %s", result.urnLike));
222+
} else {
223+
context.report().addLine("No urnLike arg present");
224+
}
225+
return result;
226+
}
227+
228+
private int getRowCount(JobArgs args) {
229+
ExpressionList<EbeanAspectV2> countExp =
230+
_server.find(EbeanAspectV2.class)
231+
.where()
232+
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION);
233+
if (args.aspectName != null) {
234+
countExp = countExp.eq(EbeanAspectV2.ASPECT_COLUMN, args.aspectName);
235+
}
236+
if (args.urn != null) {
237+
countExp = countExp.eq(EbeanAspectV2.URN_COLUMN, args.urn);
238+
}
239+
if (args.urnLike != null) {
240+
countExp = countExp.like(EbeanAspectV2.URN_COLUMN, args.urnLike);
241+
}
242+
return countExp.findCount();
190243
}
191244

192245
@Override
193246
public Function<UpgradeContext, UpgradeStepResult> executable() {
194247
return (context) -> {
248+
JobArgs args = getArgs(context);
249+
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(args.numThreads);
195250

251+
context.report().addLine("Sending MAE from local DB");
196252
long startTime = System.currentTimeMillis();
197-
198-
int batchSize = getBatchSize(context.parsedArgs());
199-
int numThreads = getThreadCount(context.parsedArgs());
200-
long batchDelayMs = getBatchDelayMs(context.parsedArgs());
201-
Optional<String> aspectName;
202-
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAME_ARG_NAME)) {
203-
aspectName = context.parsedArgs().get(RestoreIndices.ASPECT_NAME_ARG_NAME);
204-
context.report().addLine(String.format("Found aspectName arg as %s", aspectName));
205-
} else {
206-
aspectName = Optional.empty();
207-
context.report().addLine("No aspectName arg present");
208-
}
209-
Optional<String> urn;
210-
if (containsKey(context.parsedArgs(), RestoreIndices.URN_ARG_NAME)) {
211-
urn = context.parsedArgs().get(RestoreIndices.URN_ARG_NAME);
212-
context.report().addLine(String.format("Found urn arg as %s", urn));
213-
} else {
214-
urn = Optional.empty();
215-
context.report().addLine("No urn arg present");
216-
}
217-
218-
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
219-
220-
context.report().addLine(
221-
String.format("Sending MAE from local DB with %s batch size, %s threads, %s batchDelayMs",
222-
batchSize, numThreads, batchDelayMs));
223-
ExpressionList<EbeanAspectV2> countExp =
224-
_server.find(EbeanAspectV2.class)
225-
.where()
226-
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION);
227-
if (aspectName.isPresent()) {
228-
countExp = countExp.eq(EbeanAspectV2.ASPECT_COLUMN, aspectName.get());
229-
}
230-
if (urn.isPresent()) {
231-
countExp = countExp.eq(EbeanAspectV2.URN_COLUMN, urn.get());
232-
}
233-
final int rowCount = countExp.findCount();
234-
context.report().addLine(String.format("Found %s latest aspects in aspects table", rowCount));
253+
final int rowCount = getRowCount(args);
254+
context.report().addLine(String.format("Found %s latest aspects in aspects table in %.2f minutes.",
255+
rowCount, (float) (System.currentTimeMillis() - startTime) / 1000 / 60));
235256

236257
int totalRowsMigrated = 0;
237258
int start = 0;
238259
int ignored = 0;
239260

240261
List<Future<KafkaJobResult>> futures = new ArrayList<>();
262+
startTime = System.currentTimeMillis();
241263
while (start < rowCount) {
242-
while (futures.size() < numThreads) {
243-
futures.add(executor.submit(new KafkaJob(context, start, batchSize, batchDelayMs, aspectName, urn)));
244-
start = start + batchSize;
264+
while (futures.size() < args.numThreads && start < rowCount) {
265+
futures.add(executor.submit(new KafkaJob(context, start, args)));
266+
start = start + args.batchSize;
245267
}
246-
KafkaJobResult tmpResult = iterateFutures(futures);
247-
if (tmpResult != null) {
268+
List<KafkaJobResult> tmpResults = iterateFutures(futures);
269+
for (KafkaJobResult tmpResult: tmpResults) {
248270
totalRowsMigrated += tmpResult.rowsMigrated;
249271
ignored += tmpResult.ignored;
250272
reportStats(context, totalRowsMigrated, ignored, rowCount, startTime);
251273
}
252274
}
253275
while (futures.size() > 0) {
254-
KafkaJobResult tmpResult = iterateFutures(futures);
255-
if (tmpResult != null) {
276+
List<KafkaJobResult> tmpResults = iterateFutures(futures);
277+
for (KafkaJobResult tmpResult: tmpResults) {
256278
totalRowsMigrated += tmpResult.rowsMigrated;
257279
ignored += tmpResult.ignored;
258280
reportStats(context, totalRowsMigrated, ignored, rowCount, startTime);
@@ -281,31 +303,34 @@ private static void reportStats(UpgradeContext context, int totalRowsMigrated, i
281303
if (percentSent > 0) {
282304
estimatedTimeMinutesComplete = timeSoFarMinutes * (100 - percentSent) / percentSent;
283305
}
306+
float totalTimeComplete = timeSoFarMinutes + estimatedTimeMinutesComplete;
284307
context.report().addLine(String.format(
285308
"Successfully sent MAEs for %s/%s rows (%.2f%% of total). %s rows ignored (%.2f%% of total)",
286309
totalRowsMigrated, rowCount, percentSent, ignored, percentIgnored));
287-
context.report().addLine(String.format("%.2f minutes taken. %.2f estimate minutes to completion",
288-
timeSoFarMinutes, estimatedTimeMinutesComplete));
310+
context.report().addLine(String.format("%.2f mins taken. %.2f est. mins to completion. Total mins est. = %.2f.",
311+
timeSoFarMinutes, estimatedTimeMinutesComplete, totalTimeComplete));
289312
}
290313

291-
private PagedList<EbeanAspectV2> getPagedAspects(final int start, final int pageSize, Optional<String> aspectName,
292-
Optional<String> urn) {
314+
private PagedList<EbeanAspectV2> getPagedAspects(final int start, final JobArgs args) {
293315
ExpressionList<EbeanAspectV2> exp = _server.find(EbeanAspectV2.class)
294316
.select(EbeanAspectV2.ALL_COLUMNS)
295317
.where()
296318
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION);
297-
if (aspectName.isPresent()) {
298-
exp = exp.eq(EbeanAspectV2.ASPECT_COLUMN, aspectName.get());
319+
if (args.aspectName != null) {
320+
exp = exp.eq(EbeanAspectV2.ASPECT_COLUMN, args.aspectName);
321+
}
322+
if (args.urn != null) {
323+
exp = exp.eq(EbeanAspectV2.URN_COLUMN, args.urn);
299324
}
300-
if (urn.isPresent()) {
301-
exp = exp.eq(EbeanAspectV2.URN_COLUMN, urn.get());
325+
if (args.urnLike != null) {
326+
exp = exp.like(EbeanAspectV2.URN_COLUMN, args.urnLike);
302327
}
303328
return exp.orderBy()
304329
.asc(EbeanAspectV2.URN_COLUMN)
305330
.orderBy()
306331
.asc(EbeanAspectV2.ASPECT_COLUMN)
307332
.setFirstRow(start)
308-
.setMaxRows(pageSize)
333+
.setMaxRows(args.batchSize)
309334
.findPagedList();
310335
}
311336

0 commit comments

Comments (0)