@@ -283,7 +283,7 @@ impl StreamDispatcherWorker {
283
283
let mut batch_sink = None ;
284
284
loop {
285
285
let ( batch, ack_rx, last_op_id, hwms) = select ! {
286
- batch = self . build_batch( & mut operation_update_source) , if batch_sink. is_some( ) && ! self . listeners . is_empty ( ) => batch,
286
+ batch = self . build_batch( & mut operation_update_source) , if batch_sink. is_some( ) => batch,
287
287
Some ( change) = state_change_source. recv( ) => {
288
288
match change {
289
289
StreamDispatcherStateChange :: NewSettings ( settings) => {
@@ -386,7 +386,7 @@ impl StreamDispatcherWorker {
386
386
}
387
387
while events. len ( ) < self . batch_size {
388
388
select ! {
389
- _ = self . collect_contract_events( hwms, events) => { }
389
+ _ = self . collect_contract_events( hwms, events) , if ! self . listeners . is_empty ( ) => { }
390
390
Ok ( ( ) ) = operation_update_source. changed( ) => {
391
391
self . collect_operation_events( last_op, events) . await ;
392
392
}
@@ -542,13 +542,15 @@ mod tests {
542
542
use crate :: {
543
543
blockchain:: BlockchainClient ,
544
544
contracts:: ContractManager ,
545
+ operations:: { Operation , OperationId , OperationStatus } ,
545
546
persistence,
546
547
streams:: {
547
- BlockReference , Listener , ListenerFilter , ListenerType , Stream , mux:: Multiplexer ,
548
+ BatchEvent , BlockReference , Listener , ListenerFilter , ListenerType , Stream ,
549
+ mux:: Multiplexer ,
548
550
} ,
549
551
} ;
550
552
use firefly_server:: apitypes:: ApiError ;
551
- use tokio:: sync:: watch;
553
+ use tokio:: { sync:: watch, time :: timeout } ;
552
554
553
555
#[ tokio:: test]
554
556
async fn should_ack_events ( ) -> Result < ( ) , ApiError > {
@@ -649,4 +651,59 @@ mod tests {
649
651
650
652
Ok ( ( ) )
651
653
}
654
+
655
+ #[ tokio:: test]
656
+ async fn should_surface_operation_updates_without_listeners ( ) -> Result < ( ) , ApiError > {
657
+ let blockchain = Arc :: new ( BlockchainClient :: mock ( ) . await ) ;
658
+ let contracts = Arc :: new ( ContractManager :: none ( ) ) ;
659
+ let persistence = persistence:: init ( & persistence:: PersistenceConfig :: Mock ) . await ?;
660
+ let operation_update_sink = watch:: Sender :: new ( None ) ;
661
+ let mux = Multiplexer :: new (
662
+ blockchain. clone ( ) ,
663
+ contracts,
664
+ persistence. clone ( ) ,
665
+ operation_update_sink. clone ( ) ,
666
+ )
667
+ . await ?;
668
+
669
+ let stream = Stream {
670
+ id : "stream_id" . to_string ( ) . into ( ) ,
671
+ name : "Some Stream" . into ( ) ,
672
+ batch_size : 5 ,
673
+ batch_timeout : Duration :: from_millis ( 100 ) ,
674
+ } ;
675
+ persistence. write_stream ( & stream) . await ?;
676
+ mux. handle_stream_write ( & stream) . await ?;
677
+
678
+ let mut subscription = mux. subscribe ( "Some Stream" ) . await ?;
679
+
680
+ let operation = Operation {
681
+ id : OperationId :: from ( "op" . to_string ( ) ) ,
682
+ status : OperationStatus :: Pending ,
683
+ tx_id : None ,
684
+ contract_address : None ,
685
+ } ;
686
+
687
+ let update_id = persistence. write_operation ( & operation) . await ?;
688
+ operation_update_sink. send_replace ( Some ( update_id. clone ( ) ) ) ;
689
+
690
+ // We receive a new batch with the operation update
691
+ let batch = subscription. recv ( ) . await . unwrap ( ) ;
692
+ assert_eq ! ( batch. batch_number, 1 ) ;
693
+ assert_eq ! ( batch. events, vec![ BatchEvent :: Receipt ( operation) ] ) ;
694
+ batch. ack ( ) ;
695
+
696
+ // We don't receive any additional batches
697
+ assert ! (
698
+ timeout( Duration :: from_millis( 500 ) , subscription. recv( ) )
699
+ . await
700
+ . is_err( )
701
+ ) ;
702
+
703
+ // The checkpoint is updated
704
+ let checkpoint = persistence. read_checkpoint ( & stream. id ) . await ?. unwrap ( ) ;
705
+ assert_eq ! ( checkpoint. last_operation_id, Some ( update_id) ) ;
706
+
707
+ Ok ( ( ) )
708
+ }
652
709
}
0 commit comments