Skip to content

Commit b53aaa6

Browse files
More System.Threading.Lock locking.
1 parent 0b1da94 commit b53aaa6

20 files changed

+183
-64
lines changed

src/Grpc.AspNetCore.Server/Internal/HttpContextStreamWriter.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ internal class HttpContextStreamWriter<TResponse> : IServerStreamWriter<TRespons
3333
private readonly Action<TResponse, SerializationContext> _serializer;
3434
private readonly PipeWriter _bodyWriter;
3535
private readonly IHttpRequestLifetimeFeature _requestLifetimeFeature;
36-
private readonly object _writeLock;
36+
private readonly Lock _writeLock;
3737
private Task? _writeTask;
3838
private bool _completed;
3939
private long _writeCount;
@@ -42,7 +42,7 @@ public HttpContextStreamWriter(HttpContextServerCallContext context, Action<TRes
4242
{
4343
_context = context;
4444
_serializer = serializer;
45-
_writeLock = new object();
45+
_writeLock = new Lock();
4646

4747
// Copy HttpContext values.
4848
// This is done to avoid a race condition when reading them from HttpContext later when running in a separate thread.
@@ -92,7 +92,8 @@ private async Task WriteCoreAsync(TResponse message, CancellationToken cancellat
9292
throw new InvalidOperationException("Can't write the message because the request is complete.");
9393
}
9494

95-
lock (_writeLock)
95+
_writeLock.Enter();
96+
try
9697
{
9798
// Pending writes need to be awaited first
9899
if (IsWriteInProgressUnsynchronized)
@@ -103,6 +104,10 @@ private async Task WriteCoreAsync(TResponse message, CancellationToken cancellat
103104
// Save write task to track whether it is complete. Must be set inside lock.
104105
_writeTask = _bodyWriter.WriteStreamedMessageAsync(message, _context, _serializer, cancellationToken);
105106
}
107+
finally
108+
{
109+
_writeLock.Exit();
110+
}
106111

107112
await _writeTask;
108113
Interlocked.Increment(ref _writeCount);

src/Grpc.HealthCheck/HealthServiceImpl.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase
4141
private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap =
4242
new Dictionary<string, HealthCheckResponse.Types.ServingStatus>();
4343

44-
private readonly object watchersLock = new object();
44+
private readonly Lock watchersLock = new Lock();
4545
private readonly Dictionary<string, List<ChannelWriter<HealthCheckResponse>>> watchers =
4646
new Dictionary<string, List<ChannelWriter<HealthCheckResponse>>>();
4747

@@ -155,7 +155,8 @@ public override async Task Watch(HealthCheckRequest request, IServerStreamWriter
155155
FullMode = BoundedChannelFullMode.DropOldest
156156
});
157157

158-
lock (watchersLock)
158+
watchersLock.Enter();
159+
try
159160
{
160161
if (!watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>>? channelWriters))
161162
{
@@ -165,6 +166,10 @@ public override async Task Watch(HealthCheckRequest request, IServerStreamWriter
165166

166167
channelWriters.Add(channel.Writer);
167168
}
169+
finally
170+
{
171+
watchersLock.Exit();
172+
}
168173

169174
// Watch calls run until ended by the client canceling them.
170175
context.CancellationToken.Register(() => {

src/Grpc.Net.Client/Balancer/Internal/BalancerHttpHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ namespace Grpc.Net.Client.Balancer.Internal;
3131

3232
internal class BalancerHttpHandler : DelegatingHandler
3333
{
34-
private static readonly object SetupLock = new object();
34+
private static readonly Lock SetupLock = new Lock();
3535

3636
internal const string WaitForReadyKey = "WaitForReady";
3737
internal const string SubchannelKey = "Subchannel";

src/Grpc.Net.Client/Balancer/Internal/ChildHandlerLoadBalancer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#region Copyright notice and license
1+
#region Copyright notice and license
22

33
// Copyright 2019 The gRPC Authors
44
//
@@ -53,7 +53,7 @@ internal sealed class ChildHandlerLoadBalancer : LoadBalancer
5353
private readonly IChannelControlHelper _controller;
5454
private readonly ServiceConfig? _initialServiceConfig;
5555
private readonly ConnectionManager _connectionManager;
56-
private readonly object _lock = new object();
56+
private readonly Lock _lock = new Lock();
5757

5858
internal (LoadBalancer LoadBalancer, string Name)? _current;
5959
internal (LoadBalancer LoadBalancer, string Name)? _pending;

src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ internal sealed class ConnectionManager : IDisposable, IChannelControlHelper
3030
public static readonly BalancerAttributesKey<string> HostOverrideKey = new BalancerAttributesKey<string>("HostOverride");
3131
private static readonly ChannelIdProvider _channelIdProvider = new ChannelIdProvider();
3232

33-
private readonly object _lock;
33+
private readonly Lock _lock;
3434
internal readonly Resolver _resolver;
3535
private readonly ISubchannelTransportFactory _subchannelTransportFactory;
3636
private readonly List<Subchannel> _subchannels;
@@ -56,7 +56,7 @@ internal ConnectionManager(
5656
ISubchannelTransportFactory subchannelTransportFactory,
5757
LoadBalancerFactory[] loadBalancerFactories)
5858
{
59-
_lock = new object();
59+
_lock = new Lock();
6060
_nextPickerTcs = new TaskCompletionSource<SubchannelPicker>(TaskCreationOptions.RunContinuationsAsynchronously);
6161
_resolverStartedTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
6262
_channelId = _channelIdProvider.GetNextChannelId();
@@ -221,7 +221,8 @@ public async Task ConnectAsync(bool waitForReady, CancellationToken cancellation
221221
else
222222
{
223223
Task waitForReadyTask;
224-
lock (_lock)
224+
_lock.Enter();
225+
try
225226
{
226227
var state = State;
227228
if (state == ConnectivityState.Ready)
@@ -232,6 +233,10 @@ public async Task ConnectAsync(bool waitForReady, CancellationToken cancellation
232233
waitForReadyTask = WaitForStateChangedAsync(state, waitForState: ConnectivityState.Ready, cancellationToken);
233234
_balancer?.RequestConnection();
234235
}
236+
finally
237+
{
238+
_lock.Exit();
239+
}
235240

236241
await waitForReadyTask.ConfigureAwait(false);
237242
}

src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public SocketConnectivitySubchannelTransport(
7979
_socketConnectedTimer = NonCapturingTimer.Create(OnCheckSocketConnection, state: null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
8080
}
8181

82-
private object Lock => _subchannel.Lock;
82+
private Lock Lock => _subchannel.Lock;
8383
public DnsEndPoint? CurrentEndPoint => _currentEndPoint;
8484
public TimeSpan? ConnectTimeout { get; }
8585
public TransportStatus TransportStatus
@@ -132,7 +132,7 @@ public void Disconnect()
132132

133133
private void DisconnectUnsynchronized()
134134
{
135-
Debug.Assert(Monitor.IsEntered(Lock));
135+
Debug.Assert(Lock.IsHeldByCurrentThread);
136136
Debug.Assert(!_disposed);
137137

138138
_initialSocket?.Dispose();
@@ -170,7 +170,8 @@ public async ValueTask<ConnectResult> TryConnectAsync(ConnectContext context, in
170170
await _socketConnect(socket, currentEndPoint, context.CancellationToken).ConfigureAwait(false);
171171
SocketConnectivitySubchannelTransportLog.ConnectedSocket(_logger, _subchannel.Id, currentEndPoint);
172172

173-
lock (Lock)
173+
Lock.Enter();
174+
try
174175
{
175176
_currentEndPoint = currentEndPoint;
176177
_lastEndPointIndex = currentIndex;
@@ -184,6 +185,10 @@ public async ValueTask<ConnectResult> TryConnectAsync(ConnectContext context, in
184185
// Instead, the socket timer target method reschedules the next run after it has finished.
185186
_socketConnectedTimer.Change(_socketPingInterval, Timeout.InfiniteTimeSpan);
186187
}
188+
finally
189+
{
190+
Lock.Exit();
191+
}
187192

188193
_subchannel.UpdateConnectivityState(ConnectivityState.Ready, "Successfully connected to socket.");
189194
return ConnectResult.Success;
@@ -223,13 +228,18 @@ public async ValueTask<ConnectResult> TryConnectAsync(ConnectContext context, in
223228
_subchannel.UpdateConnectivityState(
224229
ConnectivityState.TransientFailure,
225230
new Status(StatusCode.Unavailable, "Error connecting to subchannel.", firstConnectionError));
226-
lock (Lock)
231+
Lock.Enter();
232+
try
227233
{
228234
if (!_disposed)
229235
{
230236
_socketConnectedTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
231237
}
232238
}
239+
finally
240+
{
241+
Lock.Exit();
242+
}
233243
return result;
234244
}
235245

@@ -319,7 +329,8 @@ public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, Cancellation
319329
DnsEndPoint? socketEndPoint = null;
320330
List<ReadOnlyMemory<byte>>? socketData = null;
321331
DateTime? socketCreatedTime = null;
322-
lock (Lock)
332+
Lock.Enter();
333+
try
323334
{
324335
if (_initialSocket != null)
325336
{
@@ -345,6 +356,10 @@ public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, Cancellation
345356
_socketConnectedTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
346357
}
347358
}
359+
finally
360+
{
361+
Lock.Exit();
362+
}
348363

349364
if (socket != null)
350365
{
@@ -384,11 +399,16 @@ public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, Cancellation
384399
// This stream wrapper intercepts dispose.
385400
var stream = new StreamWrapper(networkStream, OnStreamDisposed, socketData);
386401

387-
lock (Lock)
402+
Lock.Enter();
403+
try
388404
{
389405
_activeStreams.Add(new ActiveStream(endPoint, socket, stream));
390406
SocketConnectivitySubchannelTransportLog.StreamCreated(_logger, _subchannel.Id, endPoint, CalculateInitialSocketDataLength(socketData), _activeStreams.Count);
391407
}
408+
finally
409+
{
410+
Lock.Exit();
411+
}
392412

393413
return stream;
394414
}

src/Grpc.Net.Client/Balancer/PollingResolver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public abstract class PollingResolver : Resolver
4444
private bool _disposed;
4545
private bool _resolveSuccessful;
4646

47-
private readonly object _lock = new object();
47+
private readonly Lock _lock = new Lock();
4848
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
4949
private readonly ILogger _logger;
5050
private readonly IBackoffPolicyFactory? _backoffPolicyFactory;

src/Grpc.Net.Client/Balancer/Subchannel.cs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace Grpc.Net.Client.Balancer;
4040
public sealed class Subchannel : IDisposable
4141
{
4242
internal readonly List<BalancerAddress> _addresses;
43-
internal readonly object Lock;
43+
internal readonly Lock Lock;
4444
internal ISubchannelTransport Transport => _transport;
4545
internal string Id { get; }
4646

@@ -96,7 +96,7 @@ public BalancerAddress? CurrentAddress
9696

9797
internal Subchannel(ConnectionManager manager, IReadOnlyList<BalancerAddress> addresses)
9898
{
99-
Lock = new object();
99+
Lock = new Lock();
100100
_logger = manager.LoggerFactory.CreateLogger(GetType());
101101
_connectSemaphore = new SemaphoreSlim(1);
102102

@@ -283,7 +283,7 @@ public void RequestConnection()
283283

284284
private void CancelInProgressConnectUnsynchronized()
285285
{
286-
Debug.Assert(Monitor.IsEntered(Lock));
286+
Debug.Assert(Lock.IsHeldByCurrentThread);
287287

288288
if (_connectContext != null && !_connectContext.Disposed)
289289
{
@@ -299,7 +299,7 @@ private void CancelInProgressConnectUnsynchronized()
299299

300300
private ConnectContext GetConnectContextUnsynchronized()
301301
{
302-
Debug.Assert(Monitor.IsEntered(Lock));
302+
Debug.Assert(Lock.IsHeldByCurrentThread);
303303

304304
// There shouldn't be a previous connect in progress, but cancel the CTS to ensure they're no longer running.
305305
CancelInProgressConnectUnsynchronized();
@@ -312,7 +312,8 @@ private async Task ConnectTransportAsync()
312312
{
313313
ConnectContext connectContext;
314314
Task? waitSemaporeTask = null;
315-
lock (Lock)
315+
Lock.Enter();
316+
try
316317
{
317318
// Don't start connecting if the subchannel has been shutdown. Transport/semaphore will be disposed if shutdown.
318319
if (_state == ConnectivityState.Shutdown)
@@ -333,6 +334,10 @@ private async Task ConnectTransportAsync()
333334
waitSemaporeTask = _connectSemaphore.WaitAsync(connectContext.CancellationToken);
334335
}
335336
}
337+
finally
338+
{
339+
Lock.Exit();
340+
}
336341

337342
if (waitSemaporeTask != null)
338343
{
@@ -355,13 +360,18 @@ private async Task ConnectTransportAsync()
355360

356361
for (var attempt = 0; ; attempt++)
357362
{
358-
lock (Lock)
363+
Lock.Enter();
364+
try
359365
{
360366
if (_state == ConnectivityState.Shutdown)
361367
{
362368
return;
363369
}
364370
}
371+
finally
372+
{
373+
Lock.Exit();
374+
}
365375

366376
switch (await _transport.TryConnectAsync(connectContext, attempt).ConfigureAwait(false))
367377
{
@@ -425,7 +435,8 @@ private async Task ConnectTransportAsync()
425435
}
426436
finally
427437
{
428-
lock (Lock)
438+
Lock.Enter();
439+
try
429440
{
430441
// Dispose context because it might have been created with a connect timeout.
431442
// Want to clean up the connect timeout timer.
@@ -438,6 +449,10 @@ private async Task ConnectTransportAsync()
438449
_connectSemaphore.Release();
439450
}
440451
}
452+
finally
453+
{
454+
Lock.Exit();
455+
}
441456
}
442457
}
443458

src/Grpc.Net.Client/Grpc.Net.Client.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<IsGrpcPublishedPackage>true</IsGrpcPublishedPackage>
88
<GenerateDocumentationFile>true</GenerateDocumentationFile>
9-
<TargetFrameworks>net462;netstandard2.0;netstandard2.1;net6.0;net7.0;net8.0</TargetFrameworks>
9+
<TargetFrameworks>net462;netstandard2.0;netstandard2.1;net6.0;net7.0;net8.0;net9.0</TargetFrameworks>
1010
<PackageReadmeFile>README.md</PackageReadmeFile>
1111
</PropertyGroup>
1212

src/Grpc.Net.Client/GrpcChannel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public sealed class GrpcChannel : ChannelBase, IDisposable
5050
internal const long DefaultMaxRetryBufferSize = 1024 * 1024 * 16; // 16 MB
5151
internal const long DefaultMaxRetryBufferPerCallSize = 1024 * 1024; // 1 MB
5252

53-
private readonly object _lock;
53+
private readonly Lock _lock;
5454
private readonly ConcurrentDictionary<IMethod, GrpcMethodInfo> _methodInfoCache;
5555
private readonly Func<IMethod, GrpcMethodInfo> _createMethodInfoFunc;
5656
private readonly Dictionary<MethodKey, MethodConfig>? _serviceConfigMethods;
@@ -104,7 +104,7 @@ public sealed class GrpcChannel : ChannelBase, IDisposable
104104

105105
internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(address.Authority)
106106
{
107-
_lock = new object();
107+
_lock = new Lock();
108108
_methodInfoCache = new ConcurrentDictionary<IMethod, GrpcMethodInfo>();
109109

110110
// Dispose the HTTP client/handler if...

0 commit comments

Comments
 (0)