Skip to content

Commit 738103d

Browse files
committed
Implement checkpoints and catchup
1 parent 15624d0 commit 738103d

File tree

6 files changed

+268
-115
lines changed

6 files changed

+268
-115
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

firefly-cardanoconnect/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ firefly-server = { path = "../firefly-server" }
1818
hex = "0.4"
1919
minicbor = "0.20"
2020
rand = "0.8"
21+
rand_chacha = "0.3"
2122
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
2223
schemars = "0.8"
2324
serde = "1"

firefly-cardanoconnect/src/persistence.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ use dashmap::DashMap;
33
use firefly_server::apitypes::{ApiError, ApiResult};
44
use tokio::sync::Mutex;
55

6-
use crate::streams::{Listener, ListenerId, Stream, StreamId};
6+
use crate::streams::{Listener, ListenerId, Stream, StreamCheckpoint, StreamId};
77

88
#[derive(Default)]
99
pub struct Persistence {
1010
all_streams: Mutex<Vec<Stream>>,
1111
all_listeners: DashMap<StreamId, Vec<Listener>>,
12+
all_checkpoints: DashMap<StreamId, StreamCheckpoint>,
1213
}
1314

1415
impl Persistence {
@@ -45,6 +46,7 @@ impl Persistence {
4546
streams.retain(|it| it.id != *id);
4647

4748
self.all_listeners.remove(id);
49+
self.all_checkpoints.remove(id);
4850

4951
Ok(())
5052
}
@@ -122,4 +124,25 @@ impl Persistence {
122124
.collect();
123125
Ok(listeners)
124126
}
127+
128+
pub async fn write_checkpoint(&self, checkpoint: &StreamCheckpoint) -> ApiResult<()> {
129+
// check stream existence by checking if we have a (possibly empty) list of listeners for it,
130+
// because that's cheaper than locking the streams list
131+
if !self.all_listeners.contains_key(&checkpoint.stream_id) {
132+
return Err(ApiError::not_found("No stream found with that ID"));
133+
}
134+
self.all_checkpoints
135+
.insert(checkpoint.stream_id.clone(), checkpoint.clone());
136+
Ok(())
137+
}
138+
139+
pub async fn read_checkpoint(
140+
&self,
141+
stream_id: &StreamId,
142+
) -> ApiResult<Option<StreamCheckpoint>> {
143+
match self.all_checkpoints.get(stream_id) {
144+
Some(checkpoint) => Ok(Some(checkpoint.clone())),
145+
None => Ok(None),
146+
}
147+
}
125148
}

firefly-cardanoconnect/src/streams/manager.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl StreamManager {
3939
batch_timeout,
4040
};
4141
self.persistence.write_stream(&stream).await?;
42-
self.mux.handle_stream_write(&stream).await;
42+
self.mux.handle_stream_write(&stream).await?;
4343
Ok(stream)
4444
}
4545

@@ -74,7 +74,7 @@ impl StreamManager {
7474
stream.batch_timeout = timeout;
7575
}
7676
self.persistence.write_stream(&stream).await?;
77-
self.mux.handle_stream_write(&stream).await;
77+
self.mux.handle_stream_write(&stream).await?;
7878
Ok(stream)
7979
}
8080

@@ -103,7 +103,7 @@ impl StreamManager {
103103
stream_id: stream_id.clone(),
104104
};
105105
self.persistence.write_listener(&listener).await?;
106-
self.mux.handle_listener_write(&listener).await;
106+
self.mux.handle_listener_write(&listener).await?;
107107
Ok(listener)
108108
}
109109

@@ -140,7 +140,7 @@ impl StreamManager {
140140
) -> ApiResult<()> {
141141
self.mux
142142
.handle_listener_delete(stream_id, listener_id)
143-
.await;
143+
.await?;
144144
self.persistence
145145
.delete_listener(stream_id, listener_id)
146146
.await?;

0 commit comments

Comments
 (0)