diff --git a/Cargo.lock b/Cargo.lock
index 4630c559d0d..8fbe48acfb1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4760,7 +4760,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
 dependencies = [
  "cfg-if",
- "windows-targets 0.52.6",
+ "windows-targets 0.48.5",
 ]
 
 [[package]]
@@ -9148,9 +9148,9 @@ dependencies = [
 
 [[package]]
 name = "utoipa-swagger-ui"
-version = "9.0.0"
+version = "9.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "161166ec520c50144922a625d8bc4925cc801b2dda958ab69878527c0e5c5d61"
+checksum = "d047458f1b5b65237c2f6dc6db136945667f40a7668627b3490b9513a3d43a55"
 dependencies = [
  "axum 0.8.1",
  "base64 0.22.1",
@@ -9161,7 +9161,7 @@
  "serde_json",
  "url",
  "utoipa",
- "zip",
+ "zip 3.0.0",
 ]
 
 [[package]]
@@ -10180,6 +10180,20 @@ dependencies = [
  "zstd",
 ]
 
+[[package]]
+name = "zip"
+version = "3.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "12598812502ed0105f607f941c386f43d441e00148fce9dec3ca5ffb0bde9308"
+dependencies = [
+ "arbitrary",
+ "crc32fast",
+ "flate2",
+ "indexmap 2.6.0",
+ "memchr",
+ "zopfli",
+]
+
 [[package]]
 name = "zip-extract"
 version = "0.2.1"
@@ -10188,7 +10202,7 @@ checksum = "25a8c9e90f27d1435088a7b540b6cc8ae6ee525d992a695f16012d2f365b3d3c"
 dependencies = [
  "log",
  "thiserror 1.0.69",
- "zip",
+ "zip 2.2.2",
 ]
 
 [[package]]
diff --git a/rust/garbage_collector/src/garbage_collector_component.rs b/rust/garbage_collector/src/garbage_collector_component.rs
index d1994295684..a45b5e3c449 100644
--- a/rust/garbage_collector/src/garbage_collector_component.rs
+++ b/rust/garbage_collector/src/garbage_collector_component.rs
@@ -661,6 +661,7 @@ mod tests {
             tracing::info!(
                 attempt,
                 max_attempts,
+                collection_id,
                 "Waiting for new version to be created..."
             );
 
diff --git a/rust/garbage_collector/src/garbage_collector_orchestrator.rs b/rust/garbage_collector/src/garbage_collector_orchestrator.rs
index 10651140449..9c16e67c312 100644
--- a/rust/garbage_collector/src/garbage_collector_orchestrator.rs
+++ b/rust/garbage_collector/src/garbage_collector_orchestrator.rs
@@ -552,6 +552,7 @@ mod tests {
             tracing::info!(
                 attempt,
                 max_attempts,
+                collection_id,
                 "Waiting for new version to be created..."
             );
 
diff --git a/rust/worker/src/execution/operators/fetch_log.rs b/rust/worker/src/execution/operators/fetch_log.rs
index 3e52171da74..cc63846b1f1 100644
--- a/rust/worker/src/execution/operators/fetch_log.rs
+++ b/rust/worker/src/execution/operators/fetch_log.rs
@@ -77,100 +77,61 @@ impl Operator<(), FetchLogOutput> for FetchLogOperator {
         );
         let mut log_client = self.log_client.clone();
-        let limit_offset = log_client
+        let mut limit_offset = log_client
             .scout_logs(&self.tenant, self.collection_uuid, self.start_log_offset_id)
             .await
-            .ok();
+            .inspect_err(|err| {
+                tracing::error!("could not pull logs: {err:?}");
+            })?;
 
         let mut fetched = Vec::new();
         let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as i64;
 
-        if let Some(mut limit_offset) = limit_offset {
-            tracing::debug!(
-                "taking new code path with range [{}, {})",
-                self.start_log_offset_id,
-                limit_offset
+        if let Some(maximum_fetch_count) = self.maximum_fetch_count {
+            limit_offset = std::cmp::min(
+                limit_offset,
+                self.start_log_offset_id + maximum_fetch_count as u64,
             );
-            if let Some(maximum_fetch_count) = self.maximum_fetch_count {
-                limit_offset = std::cmp::min(
-                    limit_offset,
-                    self.start_log_offset_id + maximum_fetch_count as u64,
-                );
-            }
-            let window_size: usize = self.batch_size as usize;
-            let ranges = (self.start_log_offset_id..limit_offset)
-                .step_by(window_size)
-                .map(|x| (x, std::cmp::min(x + window_size as u64, limit_offset)))
-                .collect::<Vec<_>>();
-            let sema = Arc::new(tokio::sync::Semaphore::new(10));
-            let batch_readers = ranges
-                .into_iter()
-                .map(|(start, limit)| {
-                    let mut log_client = log_client.clone();
-                    let collection_uuid = self.collection_uuid;
-                    let num_records = (limit - start) as i32;
-                    let start = start as i64;
-                    let sema = Arc::clone(&sema);
-                    async move {
-                        let _permit = sema.acquire().await.unwrap();
-                        log_client
-                            .read(
-                                &self.tenant,
-                                collection_uuid,
-                                start,
-                                num_records,
-                                Some(timestamp),
-                            )
-                            .await
-                    }
-                })
-                .collect::<Vec<_>>();
-            let batches = futures::future::join_all(batch_readers).await;
-            for batch in batches {
-                match batch {
-                    Ok(batch) => fetched.extend(batch),
-                    Err(err) => {
-                        return Err(FetchLogError::PullLog(Box::new(err)));
-                    }
-                }
-            }
-            fetched.sort_by_key(|f| f.log_offset);
-            Ok(Chunk::new(fetched.into()))
-        } else {
-            // old behavior that we fall back to if the scout is not implemented
-            let mut offset = self.start_log_offset_id as i64;
-            loop {
-                let mut log_batch = log_client
-                    .read(
-                        &self.tenant,
-                        self.collection_uuid,
-                        offset,
-                        self.batch_size as i32,
-                        Some(timestamp),
-                    )
-                    .await?;
-
-                let retrieve_count = log_batch.len();
+        }
 
-                if let Some(last_log) = log_batch.last() {
-                    offset = last_log.log_offset + 1;
-                    fetched.append(&mut log_batch);
-                    if let Some(limit) = self.maximum_fetch_count {
-                        if fetched.len() >= limit as usize {
-                            // Enough logs have been fetched
-                            fetched.truncate(limit as usize);
-                            break;
-                        }
-                    }
+        let window_size: usize = self.batch_size as usize;
+        let ranges = (self.start_log_offset_id..limit_offset)
+            .step_by(window_size)
+            .map(|x| (x, std::cmp::min(x + window_size as u64, limit_offset)))
+            .collect::<Vec<_>>();
+        let sema = Arc::new(tokio::sync::Semaphore::new(10));
+        let batch_readers = ranges
+            .into_iter()
+            .map(|(start, limit)| {
+                let mut log_client = log_client.clone();
+                let collection_uuid = self.collection_uuid;
+                let num_records = (limit - start) as i32;
+                let start = start as i64;
+                let sema = Arc::clone(&sema);
+                async move {
+                    let _permit = sema.acquire().await.unwrap();
+                    log_client
+                        .read(
+                            &self.tenant,
+                            collection_uuid,
+                            start,
+                            num_records,
+                            Some(timestamp),
+                        )
+                        .await
                 }
-
-                if retrieve_count < self.batch_size as usize {
-                    // No more logs to fetch
-                    break;
+            })
+            .collect::<Vec<_>>();
+        let batches = futures::future::join_all(batch_readers).await;
+        for batch in batches {
+            match batch {
+                Ok(batch) => fetched.extend(batch),
+                Err(err) => {
+                    return Err(FetchLogError::PullLog(Box::new(err)));
                 }
             }
-            tracing::info!(name: "Fetched log records", num_records = fetched.len());
-            Ok(Chunk::new(fetched.into()))
         }
+        fetched.sort_by_key(|f| f.log_offset);
+        Ok(Chunk::new(fetched.into()))
     }
 }
 
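Note on the `scout_logs` change at the top of the hunk: the old code converted any error into `None` with `.ok()` and silently fell back to the sequential path, while the new code logs the error and still propagates it with `?`. A minimal sketch of that log-then-propagate pattern (`Result::inspect_err`, stable since Rust 1.76); `scout` and `fetch` here are hypothetical stand-ins, not the operator's real API:

```rust
// Sketch: `.ok()` would swallow the error; `inspect_err` logs it
// while `?` still surfaces it to the caller.
async fn scout(fail: bool) -> Result<u64, String> {
    if fail {
        Err("scout_logs unimplemented".to_string())
    } else {
        Ok(42)
    }
}

async fn fetch(fail: bool) -> Result<u64, String> {
    let limit_offset = scout(fail)
        .await
        .inspect_err(|err| eprintln!("could not pull logs: {err:?}"))?;
    Ok(limit_offset)
}

#[tokio::main]
async fn main() {
    assert_eq!(fetch(false).await, Ok(42));
    assert!(fetch(true).await.is_err()); // error is logged, then returned
}
```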
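The rest of the hunk makes the scouted path the only path: the range `[start_log_offset_id, limit_offset)` is clamped by `maximum_fetch_count`, split into `batch_size` windows, read with at most 10 concurrent requests behind a semaphore, and re-ordered by `log_offset`. The sketch below is a self-contained illustration of that pattern under stated assumptions: the hypothetical `read_window` stands in for the log client's `read`, and plain `u64` offsets stand in for `LogRecord`s.

```rust
use std::sync::Arc;

// Hypothetical stand-in for `LogClient::read`: pretend each offset in
// the window is one fetched record.
async fn read_window(start: u64, limit: u64) -> Result<Vec<u64>, String> {
    Ok((start..limit).collect())
}

async fn fetch_all(start: u64, limit: u64, window_size: u64) -> Result<Vec<u64>, String> {
    // Carve the offset range into [x, min(x + window_size, limit)) windows.
    let ranges = (start..limit)
        .step_by(window_size as usize)
        .map(|x| (x, std::cmp::min(x + window_size, limit)))
        .collect::<Vec<_>>();
    // Cap in-flight reads at 10, mirroring the operator's semaphore.
    let sema = Arc::new(tokio::sync::Semaphore::new(10));
    let readers = ranges.into_iter().map(|(s, l)| {
        let sema = Arc::clone(&sema);
        async move {
            let _permit = sema.acquire().await.unwrap();
            read_window(s, l).await
        }
    });
    let mut fetched = Vec::new();
    for batch in futures::future::join_all(readers).await {
        fetched.extend(batch?); // any window error fails the whole fetch
    }
    // join_all preserves input order; the sort keeps the ordering
    // invariant explicit, as the operator's sort by `log_offset` does.
    fetched.sort();
    Ok(fetched)
}

#[tokio::main]
async fn main() {
    let records = fetch_all(0, 250, 100).await.unwrap();
    assert_eq!(records.len(), 250);
    println!("fetched {} records", records.len());
}
```

The semaphore keeps at most 10 reads outstanding regardless of how many windows the scouted range produces, so memory and connection pressure stay bounded even for large collections.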