Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 57 additions & 14 deletions rust/log-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,28 +1129,71 @@ impl LogServer {
let collection_id = Uuid::parse_str(&scout_logs.collection_id)
.map(CollectionUuid)
.map_err(|_| Status::invalid_argument("Failed to parse collection id"))?;

let prefix = collection_id.storage_prefix_for_log();
let log_reader = LogReader::new(
self.config.reader.clone(),
Arc::clone(&self.storage),
prefix,
);
let (start_position, limit_position) = match log_reader.manifest().await {
Ok(Some(manifest)) => (manifest.oldest_timestamp(), manifest.next_write_timestamp()),
Ok(None) => (LogPosition::from_offset(1), LogPosition::from_offset(1)),
Err(wal3::Error::UninitializedLog) => {
return Err(Status::not_found(format!(
"collection {collection_id} not found"
)));
let cache_key = cache_key_for_manifest_and_etag(collection_id);
let mut cached_manifest_and_e_tag = None;
if let Some(cache) = self.cache.as_ref() {
if let Some(cache_bytes) = cache.get(&cache_key).await.ok().flatten() {
let met = serde_json::from_slice::<ManifestAndETag>(&cache_bytes.bytes).ok();
cached_manifest_and_e_tag = met;
}
Err(err) => {
return Err(Status::new(
err.code().into(),
format!("could not scout logs: {err:?}"),
));
}
// NOTE(rescrv): We verify and if verification fails, we take the cached manifest to fall
// back to the uncached path.
if let Some(cached) = cached_manifest_and_e_tag.as_ref() {
// Here's the linearization point. We have a cached manifest and e_tag.
//
// If we verify (perform a head), then statistically speaking, the manifest and e_tag
// we have in hand is identical (barring md5 collision) to the manifest and e_tag on
// storage. We can use the cached manifest and e_tag in this case because it is the
// identical flow whether we read the whole manifest from storage or whether we pretend
// to read it/verify it with a HEAD and then read out of cache.
if !log_reader.verify(cached).await.unwrap_or_default() {
cached_manifest_and_e_tag.take();
}
Comment on lines +1156 to 1158
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

The use of unwrap_or_default() here effectively treats any error during manifest verification as a cache miss. While this is a safe fallback, it hides potentially important errors (e.g., network issues, S3 permissions) that could indicate a deeper problem. Logging these errors would improve observability and help diagnose issues that might cause frequent cache misses.

Suggested change
if !log_reader.verify(cached).await.unwrap_or_default() {
cached_manifest_and_e_tag.take();
}
match log_reader.verify(cached).await {
Ok(true) => (), // All good, manifest is fresh.
Ok(false) => {
// Stale manifest, invalidate.
cached_manifest_and_e_tag.take();
}
Err(err) => {
tracing::warn!(
"Failed to verify cached manifest for collection {}: {}. Falling back to full fetch.",
collection_id,
err
);
cached_manifest_and_e_tag.take();
}
}

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
[**BestPractice**]

The use of `unwrap_or_default()` here effectively treats any error during manifest verification as a cache miss. While this is a safe fallback, it hides potentially important errors (e.g., network issues, S3 permissions) that could indicate a deeper problem. Logging these errors would improve observability and help diagnose issues that might cause frequent cache misses.

```suggestion
            match log_reader.verify(cached).await {
                Ok(true) => (), // All good, manifest is fresh.
                Ok(false) => {
                    // Stale manifest, invalidate.
                    cached_manifest_and_e_tag.take();
                }
                Err(err) => {
                    tracing::warn!(
                        "Failed to verify cached manifest for collection {}: {}. Falling back to full fetch.",
                        collection_id,
                        err
                    );
                    cached_manifest_and_e_tag.take();
                }
            }
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: rust/log-service/src/lib.rs
Line: 1151

};
}
let (start_position, limit_position) =
if let Some(manifest_and_e_tag) = cached_manifest_and_e_tag {
(
manifest_and_e_tag.manifest.oldest_timestamp(),
manifest_and_e_tag.manifest.next_write_timestamp(),
)
} else {
let (start_position, limit_position) = match log_reader.manifest_and_e_tag().await {
Ok(Some(manifest_and_e_tag)) => {
if let Some(cache) = self.cache.as_ref() {
let json = serde_json::to_string(&manifest_and_e_tag)
.map_err(|err| Status::unknown(err.to_string()))?;
let cached_bytes = CachedBytes {
bytes: Vec::from(json),
};
cache.insert(cache_key, cached_bytes).await;
}
(
manifest_and_e_tag.manifest.oldest_timestamp(),
manifest_and_e_tag.manifest.next_write_timestamp(),
)
}
Ok(None) => (LogPosition::from_offset(1), LogPosition::from_offset(1)),
Err(wal3::Error::UninitializedLog) => {
return Err(Status::not_found(format!(
"collection {collection_id} not found"
)));
}
Err(err) => {
return Err(Status::new(
err.code().into(),
format!("could not scout logs: {err:?}"),
));
}
};
(start_position, limit_position)
};
let start_offset = start_position.offset() as i64;
let limit_offset = limit_position.offset() as i64;
Ok(Response::new(ScoutLogsResponse {
Expand Down
17 changes: 17 additions & 0 deletions rust/storage/src/admissioncontrolleds3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,23 @@ impl AdmissionControlledS3Storage {
.await
}

/// Verify that the object at `key` still carries `e_tag`, accounting the call
/// in the same outstanding-read metrics as the other read paths.
///
/// The gauge is sampled *before* this request is counted, matching the other
/// instrumented reads in this type, and the in-flight counter is decremented
/// whether or not the underlying HEAD succeeds.
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
    // Sample the current queue depth for the histogram, then mark this
    // request as outstanding for the duration of the storage call.
    let in_flight = self
        .metrics
        .outstanding_read_requests
        .load(Ordering::Relaxed) as u64;
    self.metrics
        .nac_outstanding_read_requests
        .record(in_flight, &self.metrics.hostname_attribute);
    self.metrics
        .outstanding_read_requests
        .fetch_add(1, Ordering::Relaxed);
    let outcome = self.storage.confirm_same(key, e_tag).await;
    self.metrics
        .outstanding_read_requests
        .fetch_sub(1, Ordering::Relaxed);
    outcome
}

async fn execute_fetch<FetchReturn, FetchFn, FetchFut>(
fetch_fn: FetchFn,
input: Result<(Arc<Vec<u8>>, Option<ETag>), StorageError>,
Expand Down
13 changes: 13 additions & 0 deletions rust/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,19 @@ impl Storage {
}
}

/// Check whether the object at `key` still matches `e_tag`.
///
/// NOTE(rescrv): Returns `Ok(true)` when the file is definitely the same.
/// Returns `Ok(false)` when the file exists but cannot be confirmed to be the
/// same. Returns `Err` on error. It is up to the caller to know how the
/// comparison is performed and to react to `Ok(false)` even when the file on
/// storage happens to be identical.
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
    // Fan out to whichever backend this Storage wraps.
    match self {
        Storage::AdmissionControlledS3(inner) => inner.confirm_same(key, e_tag).await,
        Storage::S3(inner) => inner.confirm_same(key, e_tag).await,
        Storage::ObjectStore(inner) => inner.confirm_same(key, e_tag).await,
        Storage::Local(inner) => inner.confirm_same(key, e_tag).await,
    }
}

pub async fn put_file(
&self,
key: &str,
Expand Down
4 changes: 4 additions & 0 deletions rust/storage/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl LocalStorage {
Ok((bytes, Some(etag)))
}

// Stub: local storage does not implement ETag verification and always errors.
// Callers that treat a verification error as "not confirmed" will therefore
// never take the verified/cached path on local storage.
// NOTE(review): this could presumably be implemented by re-reading the file
// and recomputing its ETag the same way the get path does — confirm a
// suitable helper exists before wiring it up.
pub async fn confirm_same(&self, _: &str, _: &ETag) -> Result<bool, StorageError> {
    Err(StorageError::NotImplemented)
}
Comment on lines +70 to +72
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[TestCoverage]

This NotImplemented error will cause cache verification to always fail for local storage, preventing tests from exercising the cache-hit path. You could implement this using the existing etag_for_bytes helper to make local storage tests more realistic.

Suggested change
pub async fn confirm_same(&self, _: &str, _: &ETag) -> Result<bool, StorageError> {
Err(StorageError::NotImplemented)
}
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
match self.get(key).await {
Ok(bytes) => {
let current_etag = Self::etag_for_bytes(&bytes);
Ok(&current_etag == e_tag)
}
Err(StorageError::NotFound { .. }) => Ok(false),
Err(e) => Err(e),
}
}

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.


pub async fn put_bytes(
&self,
key: &str,
Expand Down
4 changes: 4 additions & 0 deletions rust/storage/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ impl ObjectStore {
Err(StorageError::NotImplemented)
}

// Stub: ETag verification is not implemented for this backend and always
// errors, so callers treating errors as "not confirmed" never take the
// verified path here.
// NOTE(review): the underlying object store's head request reportedly returns
// an ETag, so this looks implementable — verify before relying on it.
pub async fn confirm_same(&self, _: &str, _: &ETag) -> Result<bool, StorageError> {
    Err(StorageError::NotImplemented)
}
Comment on lines +156 to +158
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

Similar to the LocalStorage implementation, returning NotImplemented here prevents testing the cache verification path for this storage backend. The underlying object_store crate supports head requests which return an ETag, so this could be implemented.

Suggested change
pub async fn confirm_same(&self, _: &str, _: &ETag) -> Result<bool, StorageError> {
Err(StorageError::NotImplemented)
}
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
match self.object_store.head(&object_store::path::Path::from(key)).await {
Ok(meta) => Ok(meta.e_tag == Some(e_tag.0.clone())),
Err(object_store::Error::NotFound { .. }) => Ok(false),
Err(e) => Err(e.into()),
}
}

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.


pub async fn get_parallel(&self, key: &str) -> Result<Arc<Vec<u8>>, StorageError> {
let meta = self.object_store.head(&Path::from(key)).await?;
let file_size = meta.size;
Expand Down
92 changes: 92 additions & 0 deletions rust/storage/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,31 @@ impl S3Storage {
}
}

/// Issue a HEAD request for `key` and compare the returned ETag to `e_tag`.
///
/// Returns `Ok(true)` when the stored object's ETag matches exactly and
/// `Ok(false)` when the object exists with a different ETag. Any SDK failure
/// — including a missing object — surfaces as `StorageError::Generic`.
///
/// (Dropped the stray `#[allow(clippy::type_complexity)]`: this signature has
/// no complex type; the allow was carried over from `get_stream_and_e_tag`.)
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
    let res = self
        .client
        .head_object()
        .bucket(self.bucket.clone())
        .key(key)
        .send()
        .await;
    match res {
        Ok(head) => Ok(head.e_tag() == Some(&e_tag.0)),
        // Unwrap service errors to the inner error so callers see the S3
        // failure rather than the SDK wrapper.
        Err(SdkError::ServiceError(err)) => Err(StorageError::Generic {
            source: Arc::new(err.into_err()),
        }),
        Err(e) => Err(StorageError::Generic {
            source: Arc::new(e),
        }),
    }
}

#[allow(clippy::type_complexity)]
async fn get_stream_and_e_tag(
&self,
Expand Down Expand Up @@ -1516,4 +1541,71 @@ mod tests {
eprintln!("Successfully deleted: {:#?}", delete_result.deleted);
eprintln!("Errors for non-existent files: {:#?}", delete_result.errors);
}

// A freshly written object must confirm against the ETag put_bytes returned.
#[tokio::test]
async fn test_k8s_integration_confirm_same_with_matching_etag() {
    let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await;

    // Write an object and capture the ETag the backend reports for it.
    let key = "test-confirm-same";
    let payload = "test data for etag validation";
    let e_tag = storage
        .put_bytes(key, payload.as_bytes().to_vec(), PutOptions::default())
        .await
        .unwrap()
        .expect("put_bytes should return etag");

    let confirmed = storage.confirm_same(key, &e_tag).await.unwrap();
    assert!(confirmed, "confirm_same should return true for matching etag");
}

// An ETag that cannot match the stored object must yield Ok(false), not Err.
#[tokio::test]
async fn test_k8s_integration_confirm_same_with_non_matching_etag() {
    let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await;

    // The real ETag is intentionally discarded; we only need the object.
    let key = "test-confirm-same";
    let _real_e_tag = storage
        .put_bytes(
            key,
            "test data for etag validation".as_bytes().to_vec(),
            PutOptions::default(),
        )
        .await
        .unwrap()
        .expect("put_bytes should return etag");

    let bogus = ETag("fake-etag-wont-match".to_string());
    let confirmed = storage.confirm_same(key, &bogus).await.unwrap();
    assert!(
        !confirmed,
        "confirm_same should return false for non-matching etag"
    );
}

// HEAD on a key that was never written fails, and the S3 backend surfaces
// that failure as StorageError::Generic rather than Ok(false).
#[tokio::test]
async fn test_k8s_integration_confirm_same_with_nonexistent_file() {
    let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await;

    let bogus = ETag("fake-etag".to_string());
    let outcome = storage.confirm_same("nonexistent-file", &bogus).await;

    let err = outcome.expect_err("confirm_same should return error for nonexistent file");
    assert!(
        // The head operation is expected to fail on a nonexistent file.
        matches!(&err, StorageError::Generic { .. }),
        "Expected Generic error, got: {:?}",
        err
    );
}
}
Loading
Loading