Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,56 @@
*/
package org.jumpmind.symmetric.io.stage;

import java.util.Objects;

public class PerfResult {
private String name;
private long count;
private long millis;
private float rating;
private String outcome;
public static final String OUTCOME_SUCCESS = "Success";
public static final String OUTCOME_FAILED = "Failed";
public static final String OUTCOME_SKIPPED = "Skipped";

public PerfResult(String name, long count, long millis, float rating) {
this.name = name;
this.count = count;
this.millis = millis;
this.rating = rating;
this.outcome = "";
}

public PerfResult(String name) {
this.name = name;
this.count = 0;
this.millis = 0;
this.outcome = "";
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
return Objects.hash(this.count, this.millis, this.name, this.outcome);
}

@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (obj == null)
}
if (obj == null) {
return false;
if (getClass() != obj.getClass())
}
if (getClass() != obj.getClass()) {
return false;
}
PerfResult other = (PerfResult) obj;
if (name == null) {
if (other.name != null)
if (other.name != null) {
return false;
} else if (!name.equals(other.name))
}
} else if (!name.equals(other.name)) {
return false;
}
return true;
}

Expand Down Expand Up @@ -113,4 +124,12 @@ public float getRating() {
public void setRating(float rating) {
this.rating = rating;
}

public String getOutcome() {
return outcome;
}

public void setOutcome(String outcome) {
this.outcome = outcome;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
Expand Down Expand Up @@ -103,63 +104,128 @@ protected void logResults(long totalSeconds, List<PerfResult> resultsAsList) {
}

protected void testBatch(Batch batch, Map<String, PerfResult> results) {
String timestampedHeaderLine = String.format("TEST: SymmetricDS staging file with random contents. Current Timestamp: %tc", System.currentTimeMillis());
long ts = System.currentTimeMillis();
StagingFileLock lock = stagingMgr.acquireFileLock(serverInfo, STAGE_PATH, batch.getStagedLocation(), batch.getBatchId());
if (lock.isAcquired()) {
increment(results, STAT_LOCK_ACQUIRE, System.currentTimeMillis() - ts);
incrementTaskDuration(results, STAT_LOCK_ACQUIRE, System.currentTimeMillis() - ts);
lock.releaseLock();
} else {
throw new RuntimeException("Failed to create lock file");
String errorMsg = "Failed to create staging file " + lock.getLockFile().getAbsolutePath();
failTask(results, STAT_LOCK_ACQUIRE, errorMsg);
throw new RuntimeException(errorMsg);
}
ts = System.currentTimeMillis();
IStagedResource resource = stagingMgr.create(STAGE_PATH, batch.getStagedLocation(), batch.getBatchId());
String resourceLocation = "";
if (resource != null) {
increment(results, STAT_BATCH_CREATE, System.currentTimeMillis() - ts);
resourceLocation = resource.getPath();
incrementTaskDuration(results, STAT_BATCH_CREATE, System.currentTimeMillis() - ts);
ts = System.currentTimeMillis();
try (BufferedWriter writer = resource.getWriter(0l)) {
writer.write(timestampedHeaderLine + "\n");
for (int i = 0; i < 100; i++) {
writer.write(RandomStringUtils.random(1000));
}
} catch (IOException e) {
String errorMsg = "Failed to write staging file " + resourceLocation + " " + e.getMessage();
failTask(results, STAT_BATCH_WRITE, errorMsg);
throw new RuntimeException(e);
} finally {
resource.close();
increment(results, STAT_BATCH_WRITE, System.currentTimeMillis() - ts);
}
incrementTaskDuration(results, STAT_BATCH_WRITE, System.currentTimeMillis() - ts);
ts = System.currentTimeMillis();
resource.setState(State.DONE);
increment(results, STAT_BATCH_RENAME, System.currentTimeMillis() - ts);
incrementTaskDuration(results, STAT_BATCH_RENAME, System.currentTimeMillis() - ts);
} else {
throw new RuntimeException("Failed to create staging file");
String errorMsg = "Failed to create staging file " + STAGE_PATH;
failTask(results, STAT_BATCH_CREATE, errorMsg);
skipTask(results, STAT_BATCH_WRITE, null);
skipTask(results, STAT_BATCH_RENAME, null);
skipTask(results, STAT_BATCH_READ, null);
skipTask(results, STAT_BATCH_FIND, null);
throw new RuntimeException(errorMsg);
}
ts = System.currentTimeMillis();
ts = System.currentTimeMillis();
resource = stagingMgr.find(STAGE_PATH, batch.getStagedLocation(), batch.getBatchId());
if (resource != null) {
increment(results, STAT_BATCH_FIND, System.currentTimeMillis() - ts);
if (resource == null) {
String errorMsg = "Failed to locate staging file " + resourceLocation;
failTask(results, STAT_BATCH_FIND, errorMsg);
skipTask(results, STAT_BATCH_READ, null);
throw new RuntimeException(errorMsg);
}
else {
resourceLocation = resource.getPath();
incrementTaskDuration(results, STAT_BATCH_FIND, System.currentTimeMillis() - ts);
ts = System.currentTimeMillis();
String testHeaderLine = null;
try (BufferedReader reader = resource.getReader()) {
testHeaderLine = reader.readLine();
if(testHeaderLine==null) {
String errorMsg = "Failed to read contents of staging file " + resourceLocation;
failTask(results, STAT_BATCH_READ, errorMsg);
throw new RuntimeException(errorMsg);
}
if (!timestampedHeaderLine.equals(testHeaderLine)) {
String errorMsg = "Failed to validate contents of staging file " + resourceLocation;
failTask(results, STAT_BATCH_READ, errorMsg);
throw new RuntimeException(errorMsg);
}
while (reader.readLine() != null) {
}
} catch (IOException e) {
failTask(results, STAT_BATCH_READ, "Failed to read staging file " + resourceLocation + " " + e.getMessage());
throw new RuntimeException(e);
} finally {
resource.close();
increment(results, STAT_BATCH_READ, System.currentTimeMillis() - ts);
if (resource != null) {
resource.close();
}
}
incrementTaskDuration(results, STAT_BATCH_READ, System.currentTimeMillis() - ts);
resource.delete();
}
}

protected void failTask(Map<String, PerfResult> results, String statName, String error) {
PerfResult result = results.get(statName);
if (result == null) {
result = new PerfResult(statName);
results.put(statName, result);
}
result.incrementCount(1);
if (error == null) {
result.setOutcome(PerfResult.OUTCOME_FAILED);
} else {
result.setOutcome(error);
}
}

protected void skipTask(Map<String, PerfResult> results, String statName, String outcome) {
PerfResult result = results.get(statName);
if (result == null) {
result = new PerfResult(statName);
results.put(statName, result);
}
result.incrementCount(1);
if (outcome == null) {
result.setOutcome(PerfResult.OUTCOME_SKIPPED);
} else {
throw new RuntimeException("Failed to find staging file");
result.setOutcome(outcome);
}
}

protected void increment(Map<String, PerfResult> results, String statName, long millis) {
protected void incrementTaskDuration(Map<String, PerfResult> results, String statName, long millis) {
PerfResult result = results.get(statName);
if (result == null) {
result = new PerfResult(statName);
results.put(statName, result);
}
result.incrementCount(1);
result.incrementMillis(millis);
if (StringUtils.isBlank(result.getOutcome())) {
result.setOutcome(PerfResult.OUTCOME_SUCCESS);
}
}

public static List<PerfResult> getEmptyResults() {
Expand All @@ -175,29 +241,35 @@ public static List<PerfResult> getEmptyResults() {

protected List<PerfResult> getResultsAsList(Map<String, PerfResult> results) {
List<PerfResult> list = new ArrayList<PerfResult>();
updateRating(STAT_LOCK_ACQUIRE, results, list, 50, 8000);
updateRating(STAT_BATCH_CREATE, results, list, 100, 12000);
updateRating(STAT_BATCH_WRITE, results, list, 5, 200);
updateRating(STAT_BATCH_RENAME, results, list, 150, 18000);
updateRating(STAT_BATCH_FIND, results, list, 150, 18000);
updateRating(STAT_BATCH_READ, results, list, 10, 400);
updateTaskRating(STAT_LOCK_ACQUIRE, results, list, 50, 8000);
updateTaskRating(STAT_BATCH_CREATE, results, list, 100, 12000);
updateTaskRating(STAT_BATCH_WRITE, results, list, 5, 200);
updateTaskRating(STAT_BATCH_RENAME, results, list, 150, 18000);
updateTaskRating(STAT_BATCH_FIND, results, list, 150, 18000);
updateTaskRating(STAT_BATCH_READ, results, list, 10, 400);
return list;
}

protected void updateRating(String statName, Map<String, PerfResult> results, List<PerfResult> list,
protected void updateTaskRating(String statName, Map<String, PerfResult> results, List<PerfResult> list,
long lowCount, long highCount) {
PerfResult result = results.get(statName);
if (result != null) {
long opSec = result.getOperationsPerSecond();
if (opSec <= lowCount) {
result.setRating(1.0f);
} else if (opSec >= highCount) {
result.setRating(9.9f);
String outcome = result.getOutcome();
if (!StringUtils.isBlank(statName) && !PerfResult.OUTCOME_SUCCESS.equals(outcome)) {
result.setRating(0f);
} else {
float rating = 9.9f * ((opSec - lowCount) / ((float) (highCount - lowCount)));
result.setRating(rating);
long opSec = result.getOperationsPerSecond();
if (opSec <= lowCount) {
result.setRating(1.0f);
} else if (opSec >= highCount) {
result.setRating(9.9f);
} else {
float rating = 9.9f * ((opSec - lowCount) / ((float) (highCount - lowCount)));
result.setRating(rating);
}
}
list.add(result);
}
}

}