Skip to content

Commit 185ed5d

Browse files
akarnokdOren Novotny
authored andcommitted
4.x: Improve Subject observer tracking (#511)
1 parent f600a69 commit 185ed5d

File tree

1 file changed

+135
-81
lines changed
  • Rx.NET/Source/src/System.Reactive/Subjects

1 file changed

+135
-81
lines changed

Rx.NET/Source/src/System.Reactive/Subjects/Subject.cs

Lines changed: 135 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,15 @@ public sealed class Subject<T> : SubjectBase<T>, IDisposable
1616
{
1717
#region Fields
1818

19-
private volatile IObserver<T> _observer;
19+
SubjectDisposable[] _observers;
20+
21+
Exception _exception;
22+
23+
static readonly SubjectDisposable[] EMPTY = new SubjectDisposable[0];
24+
25+
static readonly SubjectDisposable[] TERMINATED = new SubjectDisposable[0];
26+
27+
static readonly SubjectDisposable[] DISPOSED = new SubjectDisposable[0];
2028

2129
#endregion
2230

@@ -27,7 +35,7 @@ public sealed class Subject<T> : SubjectBase<T>, IDisposable
2735
/// </summary>
2836
public Subject()
2937
{
30-
_observer = NopObserver<T>.Instance;
38+
Volatile.Write(ref _observers, EMPTY);
3139
}
3240

3341
#endregion
@@ -41,38 +49,53 @@ public override bool HasObservers
4149
{
4250
get
4351
{
44-
return _observer != NopObserver<T>.Instance && !(_observer is DoneObserver<T>) && _observer != DisposedObserver<T>.Instance;
52+
return Volatile.Read(ref _observers).Length != 0;
4553
}
4654
}
4755

4856
/// <summary>
4957
/// Indicates whether the subject has been disposed.
5058
/// </summary>
51-
public override bool IsDisposed => _observer is DisposedObserver<T>;
59+
public override bool IsDisposed => Volatile.Read(ref _observers) == DISPOSED;
5260

5361
#endregion
5462

5563
#region Methods
5664

5765
#region IObserver<T> implementation
5866

67+
void ThrowDisposed()
68+
{
69+
throw new ObjectDisposedException(string.Empty);
70+
}
71+
5972
/// <summary>
6073
/// Notifies all subscribed observers about the end of the sequence.
6174
/// </summary>
6275
public override void OnCompleted()
6376
{
64-
var oldObserver = default(IObserver<T>);
65-
var newObserver = DoneObserver<T>.Completed;
66-
67-
do
77+
for (; ; )
6878
{
69-
oldObserver = _observer;
70-
71-
if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
79+
var observers = Volatile.Read(ref _observers);
80+
if (observers == DISPOSED)
81+
{
82+
_exception = null;
83+
ThrowDisposed();
7284
break;
73-
} while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
74-
75-
oldObserver.OnCompleted();
85+
}
86+
if (observers == TERMINATED)
87+
{
88+
break;
89+
}
90+
if (Interlocked.CompareExchange(ref _observers, TERMINATED, observers) == observers)
91+
{
92+
foreach (var observer in observers)
93+
{
94+
observer.Observer?.OnCompleted();
95+
}
96+
break;
97+
}
98+
}
7699
}
77100

78101
/// <summary>
@@ -85,18 +108,29 @@ public override void OnError(Exception error)
85108
if (error == null)
86109
throw new ArgumentNullException(nameof(error));
87110

88-
var oldObserver = default(IObserver<T>);
89-
var newObserver = new DoneObserver<T> { Exception = error };
90-
91-
do
111+
for (; ; )
92112
{
93-
oldObserver = _observer;
94-
95-
if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
113+
var observers = Volatile.Read(ref _observers);
114+
if (observers == DISPOSED)
115+
{
116+
_exception = null;
117+
ThrowDisposed();
96118
break;
97-
} while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
98-
99-
oldObserver.OnError(error);
119+
}
120+
if (observers == TERMINATED)
121+
{
122+
break;
123+
}
124+
_exception = error;
125+
if (Interlocked.CompareExchange(ref _observers, TERMINATED, observers) == observers)
126+
{
127+
foreach (var observer in observers)
128+
{
129+
observer.Observer?.OnError(error);
130+
}
131+
break;
132+
}
133+
}
100134
}
101135

102136
/// <summary>
@@ -105,7 +139,17 @@ public override void OnError(Exception error)
105139
/// <param name="value">The value to send to all currently subscribed observers.</param>
106140
public override void OnNext(T value)
107141
{
108-
_observer.OnNext(value);
142+
var observers = Volatile.Read(ref _observers);
143+
if (observers == DISPOSED)
144+
{
145+
_exception = null;
146+
ThrowDisposed();
147+
return;
148+
}
149+
foreach (var observer in observers)
150+
{
151+
observer.Observer?.OnNext(value);
152+
}
109153
}
110154

111155
#endregion
@@ -123,59 +167,92 @@ public override IDisposable Subscribe(IObserver<T> observer)
123167
if (observer == null)
124168
throw new ArgumentNullException(nameof(observer));
125169

126-
var oldObserver = default(IObserver<T>);
127-
var newObserver = default(IObserver<T>);
128-
129-
do
170+
var disposable = default(SubjectDisposable);
171+
for (; ; )
130172
{
131-
oldObserver = _observer;
173+
var observers = Volatile.Read(ref _observers);
174+
if (observers == DISPOSED)
175+
{
176+
_exception = null;
177+
ThrowDisposed();
178+
break;
179+
}
180+
if (observers == TERMINATED)
181+
{
182+
var ex = _exception;
183+
if (ex != null)
184+
{
185+
observer.OnError(ex);
186+
}
187+
else
188+
{
189+
observer.OnCompleted();
190+
}
191+
break;
192+
}
132193

133-
if (oldObserver == DisposedObserver<T>.Instance)
194+
if (disposable == null)
134195
{
135-
throw new ObjectDisposedException("");
196+
disposable = new SubjectDisposable(this, observer);
136197
}
137198

138-
if (oldObserver == DoneObserver<T>.Completed)
199+
var n = observers.Length;
200+
var b = new SubjectDisposable[n + 1];
201+
Array.Copy(observers, 0, b, 0, n);
202+
b[n] = disposable;
203+
if (Interlocked.CompareExchange(ref _observers, b, observers) == observers)
139204
{
140-
observer.OnCompleted();
141-
return Disposable.Empty;
205+
return disposable;
142206
}
207+
}
208+
return Disposable.Empty;
209+
}
143210

144-
if (oldObserver is DoneObserver<T> done)
211+
void Unsubscribe(SubjectDisposable observer)
212+
{
213+
for (; ; )
214+
{
215+
var a = Volatile.Read(ref _observers);
216+
var n = a.Length;
217+
if (n == 0)
218+
{
219+
break;
220+
}
221+
222+
var j = Array.IndexOf(a, observer);
223+
224+
if (j < 0)
145225
{
146-
observer.OnError(done.Exception);
147-
return Disposable.Empty;
226+
break;
148227
}
149228

150-
if (oldObserver == NopObserver<T>.Instance)
229+
var b = default(SubjectDisposable[]);
230+
if (n == 1)
151231
{
152-
newObserver = observer;
232+
b = EMPTY;
153233
}
154234
else
155235
{
156-
if (oldObserver is Observer<T> obs)
157-
{
158-
newObserver = obs.Add(observer);
159-
}
160-
else
161-
{
162-
newObserver = new Observer<T>(new ImmutableList<IObserver<T>>(new[] { oldObserver, observer }));
163-
}
236+
b = new SubjectDisposable[n - 1];
237+
Array.Copy(a, 0, b, 0, j);
238+
Array.Copy(a, j + 1, b, j, n - j - 1);
164239
}
165-
} while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
166-
167-
return new Subscription(this, observer);
240+
if (Interlocked.CompareExchange(ref _observers, b, a) == a)
241+
{
242+
break;
243+
}
244+
}
168245
}
169246

170-
private sealed class Subscription : IDisposable
247+
private sealed class SubjectDisposable : IDisposable
171248
{
172249
private Subject<T> _subject;
173250
private IObserver<T> _observer;
174251

175-
public Subscription(Subject<T> subject, IObserver<T> observer)
252+
public SubjectDisposable(Subject<T> subject, IObserver<T> observer)
176253
{
177254
_subject = subject;
178-
_observer = observer;
255+
Volatile.Write(ref _observer, observer);
179256
}
180257

181258
public void Dispose()
@@ -184,35 +261,11 @@ public void Dispose()
184261
if (observer == null)
185262
return;
186263

187-
_subject.Unsubscribe(observer);
264+
_subject.Unsubscribe(this);
188265
_subject = null;
189266
}
190-
}
191-
192-
private void Unsubscribe(IObserver<T> observer)
193-
{
194-
var oldObserver = default(IObserver<T>);
195-
var newObserver = default(IObserver<T>);
196267

197-
do
198-
{
199-
oldObserver = _observer;
200-
201-
if (oldObserver == DisposedObserver<T>.Instance || oldObserver is DoneObserver<T>)
202-
return;
203-
204-
if (oldObserver is Observer<T> obs)
205-
{
206-
newObserver = obs.Remove(observer);
207-
}
208-
else
209-
{
210-
if (oldObserver != observer)
211-
return;
212-
213-
newObserver = NopObserver<T>.Instance;
214-
}
215-
} while (Interlocked.CompareExchange(ref _observer, newObserver, oldObserver) != oldObserver);
268+
public IObserver<T> Observer { get { return Volatile.Read(ref _observer); } }
216269
}
217270

218271
#endregion
@@ -224,7 +277,8 @@ private void Unsubscribe(IObserver<T> observer)
224277
/// </summary>
225278
public override void Dispose()
226279
{
227-
_observer = DisposedObserver<T>.Instance;
280+
Interlocked.Exchange(ref _observers, DISPOSED);
281+
_exception = null;
228282
}
229283

230284
#endregion

0 commit comments

Comments
 (0)