Member

@owen-d owen-d commented Apr 18, 2025

Add Filter Pipeline Implementation

This PR implements a filter pipeline for Loki's engine executor component. The filter operator enables row-level filtering of Arrow record batches based on boolean expressions. This is a fundamental component for query execution.

Changes

  • Implement NewFilterPipeline for evaluating filter predicates on record batches
  • Add filterBatch utility to efficiently filter Arrow records
  • Update executeFilter in the executor to use the new implementation
  • Add comprehensive test suite for the filter pipeline:
    • Basic literal predicates (true/false)
    • Column reference predicates
    • Empty batch handling
    • Multiple batch processing

Implementation Notes

The filter implementation follows the same pattern as the project pipeline, evaluating predicates against each record and creating a new filtered record based on the results. The implementation supports compound predicates with AND logic.
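As a rough illustration of the approach described above, here is a minimal sketch that uses plain Go slices instead of Arrow arrays. The `record` and `predicate` types and the `greaterThan`, `lessThan`, `andMask`, and `filterRecord` helpers are all hypothetical names made up for this example; they are not the PR's actual API. The sketch only shows the general shape: evaluate each predicate to a per-row boolean mask, AND the masks together, then build a new record from the rows that pass.

```go
package main

import "fmt"

// record is a toy columnar batch standing in for an Arrow record
// (assumption: the real code works on Arrow records, not Go slices).
// Every column holds exactly one value per row.
type record struct {
	cols map[string][]int64
	rows int
}

// predicate returns one boolean per row of the record.
type predicate func(r record) []bool

// greaterThan is a hypothetical helper: column value > v.
func greaterThan(col string, v int64) predicate {
	return func(r record) []bool {
		out := make([]bool, r.rows)
		for i, x := range r.cols[col] {
			out[i] = x > v
		}
		return out
	}
}

// lessThan is a hypothetical helper: column value < v.
func lessThan(col string, v int64) predicate {
	return func(r record) []bool {
		out := make([]bool, r.rows)
		for i, x := range r.cols[col] {
			out[i] = x < v
		}
		return out
	}
}

// andMask evaluates every predicate and combines the results with AND.
// With no predicates, every row is kept.
func andMask(r record, preds []predicate) []bool {
	mask := make([]bool, r.rows)
	for i := range mask {
		mask[i] = true
	}
	for _, p := range preds {
		res := p(r)
		for i := range mask {
			mask[i] = mask[i] && res[i]
		}
	}
	return mask
}

// filterRecord builds a new record holding only the rows whose mask is true.
func filterRecord(r record, mask []bool) record {
	out := record{cols: make(map[string][]int64, len(r.cols))}
	for _, keep := range mask {
		if keep {
			out.rows++
		}
	}
	for name, vals := range r.cols {
		kept := make([]int64, 0, out.rows)
		for i, v := range vals {
			if mask[i] {
				kept = append(kept, v)
			}
		}
		out.cols[name] = kept
	}
	return out
}

func main() {
	in := record{
		cols: map[string][]int64{"id": {1, 2, 3, 4}, "size": {10, 20, 30, 40}},
		rows: 4,
	}
	preds := []predicate{greaterThan("id", 1), lessThan("size", 40)}
	out := filterRecord(in, andMask(in, preds))
	fmt.Println(out.cols["id"], out.cols["size"]) // prints "[2 3] [20 30]"
}
```

The input record is never modified; filtering always produces a new one, which mirrors the "creating a new filtered record" wording above.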

This implementation provides a foundation for more advanced filtering operations as the expression evaluator is enhanced to support more complex expressions.

@owen-d owen-d requested a review from a team as a code owner April 18, 2025 16:37
  }

- return errorPipeline(errNotImplemented)
+ return NewFilterPipeline(filter, inputs[0], &c.evaluator)
Contributor

nit: The expression evaluator is stateless, so there is no need to pass it as a pointer.
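To illustrate the point, here is a small sketch with a hypothetical field-less `evaluator` type and a hypothetical `newFilterPipeline` constructor; neither is the PR's real signature. Because the type carries no state, copying it by value shares nothing and loses nothing.

```go
package main

import "fmt"

// evaluator is a hypothetical stateless type: it has no fields,
// so a copy behaves exactly like the original.
type evaluator struct{}

// eval uses a value receiver because there is no state to mutate.
func (evaluator) eval(x int) bool { return x > 0 }

// filterPipeline stores the evaluator by value rather than by pointer.
type filterPipeline struct {
	ev evaluator
}

// newFilterPipeline is a hypothetical constructor taking the evaluator by value.
func newFilterPipeline(ev evaluator) *filterPipeline {
	return &filterPipeline{ev: ev}
}

func main() {
	var ev evaluator
	p := newFilterPipeline(ev)
	fmt.Println(p.ev.eval(1), p.ev.eval(-1)) // prints "true false"
}
```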

// boolean filters are only used for filtering; they're not returned
// and must be released
// TODO: verify this once the evaluator implementation is fleshed out
col.Release()
Contributor

I think we need a nil check here, in case not all columns were initialized.

Member Author

I don't think so; it's initialized with capacity, but len=0. Length only increases when columns are added.
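A small sketch of the slice behaviour being described, using a hypothetical `resource` type in place of the real column type: a slice made with length 0 and a non-zero capacity exposes no nil slots, so ranging over it only visits elements that were actually appended.

```go
package main

import "fmt"

// resource is a hypothetical stand-in for a releasable column.
type resource struct{ released bool }

func (r *resource) Release() { r.released = true }

// releaseAll releases every element and reports how many it visited.
func releaseAll(cols []*resource) int {
	n := 0
	for _, c := range cols {
		c.Release() // safe: range only yields appended, non-nil elements here
		n++
	}
	return n
}

func main() {
	// Capacity 4, length 0: the backing array is reserved, but no
	// elements are visible until they are appended.
	cols := make([]*resource, 0, 4)
	cols = append(cols, &resource{})
	cols = append(cols, &resource{})
	fmt.Println(len(cols), cap(cols), releaseAll(cols)) // prints "2 4 2"
}
```

By contrast, `make([]*resource, 4)` would create four nil entries up front, and that is the case where a nil check before `Release()` would be needed.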

AssertPipelinesEqual(t, filterPipeline, expectedPipeline)
})

t.Run("filter using valid column directly", func(t *testing.T) {
Contributor

@chaudum chaudum Apr 22, 2025

nit: I found the test name confusing. The test is validating that you can filter on a boolean column with a simple ColumnExpr.

This implementation provides a foundation for more advanced filtering operations as the expression evaluator is enhanced to support more complex expressions.

👍

@chaudum chaudum merged commit cf54d50 into grafana:main Apr 24, 2025
61 checks passed