Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

127 changes: 44 additions & 83 deletions rust/worker/src/execution/operators/fetch_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,100 +77,61 @@ impl Operator<FetchLogInput, FetchLogOutput> for FetchLogOperator {
);

let mut log_client = self.log_client.clone();
let limit_offset = log_client
let mut limit_offset = log_client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

The variable name limit_offset could be more descriptive. Based on the knowledge base guideline about variable naming, consider renaming it to specify what type of identifier this represents (e.g., limit_log_offset or max_log_offset) to improve readability and maintainability.

Context for Agents
[**BestPractice**]

The variable name `limit_offset` could be more descriptive. Based on the knowledge base guideline about variable naming, consider renaming it to specify what type of identifier this represents (e.g., `limit_log_offset` or `max_log_offset`) to improve readability and maintainability.

File: rust/worker/src/execution/operators/fetch_log.rs
Line: 80

.scout_logs(&self.tenant, self.collection_uuid, self.start_log_offset_id)
.await
.ok();
.inspect_err(|err| {
tracing::error!("could not pull logs: {err:?}");
})?;
let mut fetched = Vec::new();
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as i64;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes from this point onward are whitespace-only (re-indentation); no logic is modified below.

if let Some(mut limit_offset) = limit_offset {
tracing::debug!(
"taking new code path with range [{}, {})",
self.start_log_offset_id,
limit_offset
if let Some(maximum_fetch_count) = self.maximum_fetch_count {
limit_offset = std::cmp::min(
limit_offset,
self.start_log_offset_id + maximum_fetch_count as u64,
);
if let Some(maximum_fetch_count) = self.maximum_fetch_count {
limit_offset = std::cmp::min(
limit_offset,
self.start_log_offset_id + maximum_fetch_count as u64,
);
}
let window_size: usize = self.batch_size as usize;
let ranges = (self.start_log_offset_id..limit_offset)
.step_by(window_size)
.map(|x| (x, std::cmp::min(x + window_size as u64, limit_offset)))
.collect::<Vec<_>>();
let sema = Arc::new(tokio::sync::Semaphore::new(10));
let batch_readers = ranges
.into_iter()
.map(|(start, limit)| {
let mut log_client = log_client.clone();
let collection_uuid = self.collection_uuid;
let num_records = (limit - start) as i32;
let start = start as i64;
let sema = Arc::clone(&sema);
async move {
let _permit = sema.acquire().await.unwrap();
log_client
.read(
&self.tenant,
collection_uuid,
start,
num_records,
Some(timestamp),
)
.await
}
})
.collect::<Vec<_>>();
let batches = futures::future::join_all(batch_readers).await;
for batch in batches {
match batch {
Ok(batch) => fetched.extend(batch),
Err(err) => {
return Err(FetchLogError::PullLog(Box::new(err)));
}
}
}
fetched.sort_by_key(|f| f.log_offset);
Ok(Chunk::new(fetched.into()))
} else {
// old behavior that we fall back to if the scout is not implemented
let mut offset = self.start_log_offset_id as i64;
loop {
let mut log_batch = log_client
.read(
&self.tenant,
self.collection_uuid,
offset,
self.batch_size as i32,
Some(timestamp),
)
.await?;

let retrieve_count = log_batch.len();
}

if let Some(last_log) = log_batch.last() {
offset = last_log.log_offset + 1;
fetched.append(&mut log_batch);
if let Some(limit) = self.maximum_fetch_count {
if fetched.len() >= limit as usize {
// Enough logs have been fetched
fetched.truncate(limit as usize);
break;
}
}
let window_size: usize = self.batch_size as usize;
let ranges = (self.start_log_offset_id..limit_offset)
.step_by(window_size)
.map(|x| (x, std::cmp::min(x + window_size as u64, limit_offset)))
.collect::<Vec<_>>();
let sema = Arc::new(tokio::sync::Semaphore::new(10));
let batch_readers = ranges
.into_iter()
.map(|(start, limit)| {
let mut log_client = log_client.clone();
let collection_uuid = self.collection_uuid;
let num_records = (limit - start) as i32;
let start = start as i64;
let sema = Arc::clone(&sema);
async move {
let _permit = sema.acquire().await.unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

Potential panic from semaphore acquisition: sema.acquire().await.unwrap() will panic if the semaphore is closed, which can happen during async task cancellation or if the semaphore is dropped while permits are being acquired. Replace with proper error handling:

let _permit = sema.acquire().await.map_err(|_| {
    // Convert semaphore error to appropriate error type
    FetchLogError::PullLog(Box::new(/* appropriate error */))
})?;

Or if you want to fail fast on semaphore issues, at least log before panicking.

Context for Agents
[**BestPractice**]

Potential panic from semaphore acquisition: `sema.acquire().await.unwrap()` will panic if the semaphore is closed, which can happen during async task cancellation or if the semaphore is dropped while permits are being acquired. Replace with proper error handling:

```rust
let _permit = sema.acquire().await.map_err(|_| {
    // Convert semaphore error to appropriate error type
    FetchLogError::PullLog(Box::new(/* appropriate error */))
})?;
```

Or if you want to fail fast on semaphore issues, at least log before panicking.

File: rust/worker/src/execution/operators/fetch_log.rs
Line: 108

log_client
.read(
&self.tenant,
collection_uuid,
start,
num_records,
Some(timestamp),
)
.await
}

if retrieve_count < self.batch_size as usize {
// No more logs to fetch
break;
})
.collect::<Vec<_>>();
let batches = futures::future::join_all(batch_readers).await;
for batch in batches {
match batch {
Ok(batch) => fetched.extend(batch),
Err(err) => {
return Err(FetchLogError::PullLog(Box::new(err)));
}
}
tracing::info!(name: "Fetched log records", num_records = fetched.len());
Ok(Chunk::new(fetched.into()))
}
fetched.sort_by_key(|f| f.log_offset);
Ok(Chunk::new(fetched.into()))
}
}

Expand Down
Loading