Skip to content

Commit d7003ba

Browse files
committed
Add support for subscribing to callbacks that return Task
1 parent fcd679e commit d7003ba

File tree

3 files changed

+141
-8
lines changed

3 files changed

+141
-8
lines changed

SteamKit2/SteamKit2/Steam/SteamClient/CallbackMgr/CallbackMgr.cs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void RunWaitCallbacks()
106106
public async Task RunWaitCallbackAsync( CancellationToken cancellationToken = default )
107107
{
108108
var call = await client.WaitForCallbackAsync( cancellationToken );
109-
Handle( call );
109+
await HandleAsync( call );
110110
}
111111

112112
/// <summary>
@@ -139,6 +139,36 @@ public IDisposable Subscribe<TCallback>( Action<TCallback> callbackFunc )
139139
return Subscribe( JobID.Invalid, callbackFunc );
140140
}
141141

142+
/// <summary>
143+
/// Registers the provided <see cref="Func{T, Task}"/> to receive callbacks of type <typeparamref name="TCallback" />.
144+
/// </summary>
145+
/// <param name="jobID">The <see cref="JobID"/> of the callbacks that should be subscribed to.
146+
/// If this is <see cref="JobID.Invalid"/>, all callbacks of type <typeparamref name="TCallback" /> will be received.</param>
147+
/// <param name="callbackFunc">The function to invoke with the callback.</param>
148+
/// <typeparam name="TCallback">The type of callback to subscribe to.</typeparam>
149+
/// <remarks>When subscribing to asynchronous methods, <see cref="RunWaitCallbackAsync"/> should be used for awaiting callbacks.</remarks>
150+
/// <returns>An <see cref="IDisposable"/>. Disposing of the return value will unsubscribe the <paramref name="callbackFunc"/>.</returns>
151+
public IDisposable Subscribe<TCallback>( JobID jobID, Func<TCallback, Task> callbackFunc ) where TCallback : CallbackMsg
152+
{
153+
ArgumentNullException.ThrowIfNull( jobID );
154+
ArgumentNullException.ThrowIfNull( callbackFunc );
155+
156+
var callback = new Internal.AsyncCallback<TCallback>( callbackFunc, this, jobID );
157+
return callback;
158+
}
159+
160+
/// <summary>
161+
/// Registers the provided <see cref="Func{T, Task}"/> to receive callbacks of type <typeparam name="TCallback" />.
162+
/// </summary>
163+
/// <param name="callbackFunc">The function to invoke with the callback.</param>
164+
/// <remarks>When subscribing to asynchronous methods, <see cref="RunWaitCallbackAsync"/> should be used for awaiting callbacks.</remarks>
165+
/// <returns>An <see cref="IDisposable"/>. Disposing of the return value will unsubscribe the <paramref name="callbackFunc"/>.</returns>
166+
public IDisposable Subscribe<TCallback>( Func<TCallback, Task> callbackFunc )
167+
where TCallback : CallbackMsg
168+
{
169+
return Subscribe( JobID.Invalid, callbackFunc );
170+
}
171+
142172
/// <summary>
143173
/// Registers the provided <see cref="Action{T}"/> to receive callbacks for notifications from the service of type <typeparam name="TService" />
144174
/// with the notification message of type <typeparam name="TNotification"></typeparam>.
@@ -191,7 +221,28 @@ void Handle( CallbackMsg call )
191221
{
192222
if ( callback.CallbackType.IsAssignableFrom( type ) )
193223
{
194-
callback.Run( call );
224+
var task = callback.Run( call );
225+
task?.Wait();
226+
}
227+
}
228+
}
229+
230+
async Task HandleAsync( CallbackMsg call )
231+
{
232+
var callbacks = registeredCallbacks;
233+
var type = call.GetType();
234+
235+
// find handlers interested in this callback
236+
foreach ( var callback in callbacks )
237+
{
238+
if ( callback.CallbackType.IsAssignableFrom( type ) )
239+
{
240+
var task = callback.Run( call );
241+
242+
if ( task != null )
243+
{
244+
await task;
245+
}
195246
}
196247
}
197248
}

SteamKit2/SteamKit2/Steam/SteamClient/CallbackMgr/CallbackMgr_Internals.cs

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77

88
using System;
9+
using System.Threading.Tasks;
910

1011
namespace SteamKit2.Internal
1112
{
@@ -16,22 +17,24 @@ namespace SteamKit2.Internal
1617
abstract class CallbackBase
1718
{
1819
internal abstract Type CallbackType { get; }
19-
internal abstract void Run( CallbackMsg callback );
20+
internal abstract Task? Run( CallbackMsg callback );
2021
}
2122

2223
sealed class Callback<TCall> : CallbackBase, IDisposable
2324
where TCall : CallbackMsg
2425
{
2526
CallbackManager? mgr;
2627

27-
public JobID JobID { get; set; }
28+
public JobID JobID { get; }
2829

29-
public Action<TCall> OnRun { get; set; }
30+
public Action<TCall> OnRun { get; }
3031

3132
internal override Type CallbackType => typeof( TCall );
3233

3334
public Callback( Action<TCall> func, CallbackManager mgr, JobID jobID )
3435
{
36+
ArgumentNullException.ThrowIfNull( func );
37+
3538
this.JobID = jobID;
3639
this.OnRun = func;
3740
this.mgr = mgr;
@@ -52,13 +55,60 @@ public void Dispose()
5255
System.GC.SuppressFinalize( this );
5356
}
5457

55-
internal override void Run( CallbackMsg callback )
58+
internal override Task? Run( CallbackMsg callback )
5659
{
5760
var cb = callback as TCall;
58-
if ( cb != null && ( cb.JobID == JobID || JobID == JobID.Invalid ) && OnRun != null )
61+
if ( cb != null && ( cb.JobID == JobID || JobID == JobID.Invalid ) )
5962
{
6063
OnRun( cb );
6164
}
65+
return null;
66+
}
67+
}
68+
69+
sealed class AsyncCallback<TCall> : CallbackBase, IDisposable
70+
where TCall : CallbackMsg
71+
{
72+
CallbackManager? mgr;
73+
74+
public JobID JobID { get; }
75+
76+
public Func<TCall, Task> OnRun { get; }
77+
78+
internal override Type CallbackType => typeof( TCall );
79+
80+
public AsyncCallback( Func<TCall, Task> func, CallbackManager mgr, JobID jobID )
81+
{
82+
ArgumentNullException.ThrowIfNull( func );
83+
84+
this.JobID = jobID;
85+
this.OnRun = func;
86+
this.mgr = mgr;
87+
88+
mgr.Register( this );
89+
}
90+
91+
~AsyncCallback()
92+
{
93+
Dispose();
94+
}
95+
96+
public void Dispose()
97+
{
98+
mgr?.Unregister( this );
99+
mgr = null;
100+
101+
System.GC.SuppressFinalize( this );
102+
}
103+
104+
internal override Task Run( CallbackMsg callback )
105+
{
106+
var cb = callback as TCall;
107+
if ( cb != null && ( cb.JobID == JobID || JobID == JobID.Invalid ) )
108+
{
109+
return OnRun( cb );
110+
}
111+
return Task.CompletedTask;
62112
}
63113
}
64114
}

SteamKit2/Tests/CallbackManagerFacts.cs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using SteamKit2;
45
using Xunit;
@@ -257,7 +258,7 @@ void unsubscribe( CallbackForTest cb )
257258
}
258259

259260
[Fact]
260-
public void CorrectlysubscribesFromInsideOfCallback()
261+
public void CorrectlySubscribesFromInsideOfCallback()
261262
{
262263
static void nothing( CallbackForTest cb )
263264
{
@@ -275,6 +276,37 @@ void subscribe( CallbackForTest cb )
275276
PostAndRunCallback( new CallbackForTest { UniqueID = Guid.NewGuid() } );
276277
}
277278

279+
[Fact]
280+
public async Task CorrectlyAwaitsForAsyncCallbacks()
281+
{
282+
var callback = new CallbackForTest { UniqueID = Guid.NewGuid() };
283+
284+
var numCallbacksRun = 0;
285+
async Task action( CallbackForTest cb )
286+
{
287+
await Task.Delay( 100, TestContext.Current.CancellationToken );
288+
Assert.Equal( callback.UniqueID, cb.UniqueID );
289+
numCallbacksRun++;
290+
}
291+
292+
using ( mgr.Subscribe<CallbackForTest>( action ) )
293+
{
294+
for ( var i = 0; i < 10; i++ )
295+
{
296+
client.PostCallback( callback );
297+
}
298+
299+
for ( var i = 1; i <= 10; i++ )
300+
{
301+
await mgr.RunWaitCallbackAsync( TestContext.Current.CancellationToken );
302+
Assert.Equal( i, numCallbacksRun );
303+
}
304+
305+
mgr.RunWaitAllCallbacks( TimeSpan.Zero );
306+
Assert.Equal( 10, numCallbacksRun );
307+
}
308+
}
309+
278310
void PostAndRunCallback(CallbackMsg callback)
279311
{
280312
client.PostCallback(callback);

0 commit comments

Comments
 (0)