Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/jobs/jobfrontier/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func Get(
// InfoStorage keys are prefixed with "frontier/", the passed name, and then a
// chunk identifier.
func Store(
ctx context.Context, txn isql.Txn, jobID jobspb.JobID, name string, frontier span.Frontier,
ctx context.Context,
txn isql.Txn,
jobID jobspb.JobID,
name string,
frontier span.ReadOnlyFrontier,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One additional motivation for this change is that when we're accessing the sub-frontiers in a span.MultiFrontier, we only have a span.ReadOnlyFrontier and we'd like to be able to store the sub-frontiers with this function

) error {
return storeChunked(ctx, txn, jobID, name, frontier, 2<<20 /* 2mb */)
}
Expand All @@ -92,7 +96,7 @@ func storeChunked(
txn isql.Txn,
jobID jobspb.JobID,
name string,
frontier span.Frontier,
frontier span.ReadOnlyFrontier,
chunkSize int,
) error {
infoStorage := jobs.InfoStorageForJob(txn, jobID)
Expand Down
18 changes: 12 additions & 6 deletions pkg/util/span/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,14 @@ import (
// Frontier is not safe for concurrent modification, but MakeConcurrentFrontier
// can be used to make thread safe frontier.
type Frontier interface {
ReadOnlyFrontier

// AddSpansAt adds the provided spans to the frontier at the provided timestamp.
// If the span overlaps any spans already tracked by the frontier, the tree is adjusted
// to hold union of the span and the overlaps, with all entries assigned startAt starting
// timestamp.
AddSpansAt(startAt hlc.Timestamp, spans ...roachpb.Span) error

// Frontier returns the minimum timestamp being tracked.
Frontier() hlc.Timestamp

// PeekFrontierSpan returns one of the spans at the Frontier.
PeekFrontierSpan() roachpb.Span

// Forward advances the timestamp for a span. Any part of the span that doesn't
// overlap the tracked span set will be ignored. True is returned if the
// frontier advanced as a result.
Expand All @@ -49,6 +45,16 @@ type Frontier interface {
// letting a frontier be GCed is safe in that it won't cause a memory leak,
// but it will prevent frontier nodes from being efficiently re-used.
Release()
}

// ReadOnlyFrontier is a subset of Frontier with only the methods
// that are read-only.
type ReadOnlyFrontier interface {
// Frontier returns the minimum timestamp being tracked.
Frontier() hlc.Timestamp

// PeekFrontierSpan returns one of the spans at the Frontier.
PeekFrontierSpan() roachpb.Span

// Entries returns an iterator over the entries in the frontier.
// Updates to the frontier are restricted until iteration is stopped.
Expand Down
13 changes: 0 additions & 13 deletions pkg/util/span/multi_frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,19 +241,6 @@ func (f *MultiFrontier[P]) String() string {
return buf.String()
}

// ReadOnlyFrontier is a subset of Frontier with only the methods
// that are read-only.
type ReadOnlyFrontier interface {
Frontier() hlc.Timestamp
PeekFrontierSpan() roachpb.Span
Entries() iter.Seq2[roachpb.Span, hlc.Timestamp]
SpanEntries(span roachpb.Span) iter.Seq2[roachpb.Span, hlc.Timestamp]
Len() int
String() string
}

var _ ReadOnlyFrontier = Frontier(nil)

// Frontiers returns an iterator over the sub-frontiers (with read-only access).
func (f *MultiFrontier[P]) Frontiers() iter.Seq2[P, ReadOnlyFrontier] {
return func(yield func(P, ReadOnlyFrontier) bool) {
Expand Down