Skip to content

Commit 214864d

Browse files
authored
[ENH] ScoutLogs issues a HEAD if possible. (#5376)
## Description of changes This PR changes scout logs to consult the cache on ScoutLogs. If the manifest was recently in the cache, wal3/rls will perform a HEAD operation to fetch the object into cache. This PR contains tests written by Claude. ## Test plan CI ## Migration plan N/A ## Observability plan N/A ## Documentation Changes N/A
1 parent 004c30f commit 214864d

File tree

9 files changed

+448
-21
lines changed

9 files changed

+448
-21
lines changed

Cargo.lock

Lines changed: 19 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/log-service/src/lib.rs

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,28 +1129,71 @@ impl LogServer {
11291129
let collection_id = Uuid::parse_str(&scout_logs.collection_id)
11301130
.map(CollectionUuid)
11311131
.map_err(|_| Status::invalid_argument("Failed to parse collection id"))?;
1132-
11331132
let prefix = collection_id.storage_prefix_for_log();
11341133
let log_reader = LogReader::new(
11351134
self.config.reader.clone(),
11361135
Arc::clone(&self.storage),
11371136
prefix,
11381137
);
1139-
let (start_position, limit_position) = match log_reader.manifest().await {
1140-
Ok(Some(manifest)) => (manifest.oldest_timestamp(), manifest.next_write_timestamp()),
1141-
Ok(None) => (LogPosition::from_offset(1), LogPosition::from_offset(1)),
1142-
Err(wal3::Error::UninitializedLog) => {
1143-
return Err(Status::not_found(format!(
1144-
"collection {collection_id} not found"
1145-
)));
1138+
let cache_key = cache_key_for_manifest_and_etag(collection_id);
1139+
let mut cached_manifest_and_e_tag = None;
1140+
if let Some(cache) = self.cache.as_ref() {
1141+
if let Some(cache_bytes) = cache.get(&cache_key).await.ok().flatten() {
1142+
let met = serde_json::from_slice::<ManifestAndETag>(&cache_bytes.bytes).ok();
1143+
cached_manifest_and_e_tag = met;
11461144
}
1147-
Err(err) => {
1148-
return Err(Status::new(
1149-
err.code().into(),
1150-
format!("could not scout logs: {err:?}"),
1151-
));
1145+
}
1146+
// NOTE(rescrv): We verify and if verification fails, we take the cached manifest to fall
1147+
// back to the uncached path.
1148+
if let Some(cached) = cached_manifest_and_e_tag.as_ref() {
1149+
// Here's the linearization point. We have a cached manifest and e_tag.
1150+
//
1151+
// If we verify (perform a head), then statistically speaking, the manifest and e_tag
1152+
// we have in hand is identical (barring md5 collision) to the manifest and e_tag on
1153+
// storage. We can use the cached manifest and e_tag in this case because it is the
1154+
// identical flow whether we read the whole manifest from storage or whether we pretend
1155+
// to read it/verify it with a HEAD and then read out of cache.
1156+
if !log_reader.verify(cached).await.unwrap_or_default() {
1157+
cached_manifest_and_e_tag.take();
11521158
}
1153-
};
1159+
}
1160+
let (start_position, limit_position) =
1161+
if let Some(manifest_and_e_tag) = cached_manifest_and_e_tag {
1162+
(
1163+
manifest_and_e_tag.manifest.oldest_timestamp(),
1164+
manifest_and_e_tag.manifest.next_write_timestamp(),
1165+
)
1166+
} else {
1167+
let (start_position, limit_position) = match log_reader.manifest_and_e_tag().await {
1168+
Ok(Some(manifest_and_e_tag)) => {
1169+
if let Some(cache) = self.cache.as_ref() {
1170+
let json = serde_json::to_string(&manifest_and_e_tag)
1171+
.map_err(|err| Status::unknown(err.to_string()))?;
1172+
let cached_bytes = CachedBytes {
1173+
bytes: Vec::from(json),
1174+
};
1175+
cache.insert(cache_key, cached_bytes).await;
1176+
}
1177+
(
1178+
manifest_and_e_tag.manifest.oldest_timestamp(),
1179+
manifest_and_e_tag.manifest.next_write_timestamp(),
1180+
)
1181+
}
1182+
Ok(None) => (LogPosition::from_offset(1), LogPosition::from_offset(1)),
1183+
Err(wal3::Error::UninitializedLog) => {
1184+
return Err(Status::not_found(format!(
1185+
"collection {collection_id} not found"
1186+
)));
1187+
}
1188+
Err(err) => {
1189+
return Err(Status::new(
1190+
err.code().into(),
1191+
format!("could not scout logs: {err:?}"),
1192+
));
1193+
}
1194+
};
1195+
(start_position, limit_position)
1196+
};
11541197
let start_offset = start_position.offset() as i64;
11551198
let limit_offset = limit_position.offset() as i64;
11561199
Ok(Response::new(ScoutLogsResponse {

rust/storage/src/admissioncontrolleds3.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,23 @@ impl AdmissionControlledS3Storage {
458458
.await
459459
}
460460

461+
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
462+
self.metrics.nac_outstanding_read_requests.record(
463+
self.metrics
464+
.outstanding_read_requests
465+
.load(Ordering::Relaxed) as u64,
466+
&self.metrics.hostname_attribute,
467+
);
468+
self.metrics
469+
.outstanding_read_requests
470+
.fetch_add(1, Ordering::Relaxed);
471+
let res = self.storage.confirm_same(key, e_tag).await;
472+
self.metrics
473+
.outstanding_read_requests
474+
.fetch_sub(1, Ordering::Relaxed);
475+
res
476+
}
477+
461478
async fn execute_fetch<FetchReturn, FetchFn, FetchFut>(
462479
fetch_fn: FetchFn,
463480
input: Result<(Arc<Vec<u8>>, Option<ETag>), StorageError>,

rust/storage/src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,19 @@ impl Storage {
289289
}
290290
}
291291

292+
// NOTE(rescrv): Returns Ok(true) if the file is definitely the same. Returns Ok(false) if
293+
// the file cannot be confirmed to be the same but it exists. Returns Err on error. It is up
294+
// to the user to know how they are confirming the same and to react to Ok(false) even if the
295+
// file is definitely the same file on storage.
296+
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
297+
match self {
298+
Storage::ObjectStore(object_store) => object_store.confirm_same(key, e_tag).await,
299+
Storage::S3(s3) => s3.confirm_same(key, e_tag).await,
300+
Storage::Local(local) => local.confirm_same(key, e_tag).await,
301+
Storage::AdmissionControlledS3(as3) => as3.confirm_same(key, e_tag).await,
302+
}
303+
}
304+
292305
pub async fn put_file(
293306
&self,
294307
key: &str,

rust/storage/src/local.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ impl LocalStorage {
6767
Ok((bytes, Some(etag)))
6868
}
6969

70+
pub async fn confirm_same(&self, _: &str, _: &ETag) -> Result<bool, StorageError> {
71+
Err(StorageError::NotImplemented)
72+
}
73+
7074
pub async fn put_bytes(
7175
&self,
7276
key: &str,

rust/storage/src/object_store.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ impl ObjectStore {
153153
Err(StorageError::NotImplemented)
154154
}
155155

156+
pub async fn confirm_same(&self, _: &str, _: &ETag) -> Result<bool, StorageError> {
157+
Err(StorageError::NotImplemented)
158+
}
159+
156160
pub async fn get_parallel(&self, key: &str) -> Result<Arc<Vec<u8>>, StorageError> {
157161
let meta = self.object_store.head(&Path::from(key)).await?;
158162
let file_size = meta.size;

rust/storage/src/s3.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,31 @@ impl S3Storage {
217217
}
218218
}
219219

220+
#[allow(clippy::type_complexity)]
221+
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
222+
let res = self
223+
.client
224+
.head_object()
225+
.bucket(self.bucket.clone())
226+
.key(key)
227+
.send()
228+
.await;
229+
match res {
230+
Ok(res) => Ok(res.e_tag() == Some(&e_tag.0)),
231+
Err(e) => match e {
232+
SdkError::ServiceError(err) => {
233+
let inner = err.into_err();
234+
Err(StorageError::Generic {
235+
source: Arc::new(inner),
236+
})
237+
}
238+
_ => Err(StorageError::Generic {
239+
source: Arc::new(e),
240+
}),
241+
},
242+
}
243+
}
244+
220245
#[allow(clippy::type_complexity)]
221246
async fn get_stream_and_e_tag(
222247
&self,
@@ -1545,4 +1570,71 @@ mod tests {
15451570
eprintln!("Successfully deleted: {:#?}", delete_result.deleted);
15461571
eprintln!("Errors for non-existent files: {:#?}", delete_result.errors);
15471572
}
1573+
1574+
#[tokio::test]
1575+
async fn test_k8s_integration_confirm_same_with_matching_etag() {
1576+
let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await;
1577+
1578+
let test_data = "test data for etag validation";
1579+
let etag = storage
1580+
.put_bytes(
1581+
"test-confirm-same",
1582+
test_data.as_bytes().to_vec(),
1583+
PutOptions::default(),
1584+
)
1585+
.await
1586+
.unwrap()
1587+
.expect("put_bytes should return etag");
1588+
1589+
let result = storage
1590+
.confirm_same("test-confirm-same", &etag)
1591+
.await
1592+
.unwrap();
1593+
assert!(result, "confirm_same should return true for matching etag");
1594+
}
1595+
1596+
#[tokio::test]
1597+
async fn test_k8s_integration_confirm_same_with_non_matching_etag() {
1598+
let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await;
1599+
1600+
let test_data = "test data for etag validation";
1601+
let _etag = storage
1602+
.put_bytes(
1603+
"test-confirm-same",
1604+
test_data.as_bytes().to_vec(),
1605+
PutOptions::default(),
1606+
)
1607+
.await
1608+
.unwrap()
1609+
.expect("put_bytes should return etag");
1610+
1611+
let fake_etag = ETag("fake-etag-wont-match".to_string());
1612+
let result = storage
1613+
.confirm_same("test-confirm-same", &fake_etag)
1614+
.await
1615+
.unwrap();
1616+
assert!(
1617+
!result,
1618+
"confirm_same should return false for non-matching etag"
1619+
);
1620+
}
1621+
1622+
#[tokio::test]
1623+
async fn test_k8s_integration_confirm_same_with_nonexistent_file() {
1624+
let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await;
1625+
1626+
let fake_etag = ETag("fake-etag".to_string());
1627+
let result = storage.confirm_same("nonexistent-file", &fake_etag).await;
1628+
1629+
assert!(
1630+
result.is_err(),
1631+
"confirm_same should return error for nonexistent file"
1632+
);
1633+
match result.unwrap_err() {
1634+
StorageError::Generic { source: _ } => {
1635+
// This is expected - the head operation will fail on nonexistent file
1636+
}
1637+
other => panic!("Expected Generic error, got: {:?}", other),
1638+
}
1639+
}
15481640
}

0 commit comments

Comments
 (0)