Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Rx.NET/Source/Directory.build.targets
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
<Project>
<!-- This props all need to be set in targets as they depend on the values set earlier -->
<PropertyGroup Condition="'$(TargetFramework)' == 'net46'">
<DefineConstants>$(DefineConstants);HAS_WINRT;PREFER_ASYNC;HAS_TPL46;DESKTOPCLR</DefineConstants>
<DefineConstants>$(DefineConstants);HAS_TRACE;HAS_WINRT;PREFER_ASYNC;HAS_TPL46;DESKTOPCLR</DefineConstants>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)' == 'uap10.0'">
<TargetPlatformVersion>10.0.16299.0</TargetPlatformVersion>
<TargetPlatformMinVersion>10.0.15063.0</TargetPlatformMinVersion>
<DefineConstants>$(DefineConstants);NO_CODE_COVERAGE_ATTRIBUTE;HAS_WINRT;PREFER_ASYNC;HAS_TPL46;NO_REMOTING;NO_SERIALIZABLE;CRIPPLED_REFLECTION;NO_THREAD;WINDOWS</DefineConstants>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)' == 'uap10.0.16299'">
<DefineConstants>$(DefineConstants);HAS_WINRT;PREFER_ASYNC;HAS_TPL46;NO_REMOTING;WINDOWS</DefineConstants>
<DefineConstants>$(DefineConstants);HAS_TRACE;HAS_WINRT;PREFER_ASYNC;HAS_TPL46;NO_REMOTING;WINDOWS</DefineConstants>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)' == 'netstandard2.0' or '$(TargetFramework)' == 'netcoreapp2.0'">
<DefineConstants>$(DefineConstants);HAS_WINRT;PREFER_ASYNC;HAS_TPL46;NO_REMOTING</DefineConstants>
<DefineConstants>$(DefineConstants);HAS_TRACE;HAS_WINRT;PREFER_ASYNC;HAS_TPL46;NO_REMOTING</DefineConstants>
</PropertyGroup>
</Project>
5 changes: 5 additions & 0 deletions Rx.NET/Source/src/Microsoft.Reactive.Testing/TestScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Diagnostics;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
Expand All @@ -12,6 +13,10 @@ namespace Microsoft.Reactive.Testing
/// <summary>
/// Virtual time scheduler used for testing applications and libraries built using Reactive Extensions.
/// </summary>
[DebuggerDisplay("\\{ " +
nameof(Clock) + " = {" + nameof(Clock) + "} " +
nameof(Now) + " = {" + nameof(Now) + ".ToString(\"O\")} " +
"\\}")]
public class TestScheduler : VirtualTimeScheduler<long, long>
{
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Diagnostics;

namespace System.Reactive.Concurrency
{
Expand Down Expand Up @@ -67,6 +68,10 @@ protected override DateTimeOffset Add(DateTimeOffset absolute, TimeSpan relative
/// <summary>
/// Provides a virtual time scheduler that uses <see cref="DateTimeOffset"/> for absolute time and <see cref="TimeSpan"/> for relative time.
/// </summary>
[DebuggerDisplay("\\{ " +
nameof(Clock) + " = {" + nameof(Clock) + "} " +
nameof(Now) + " = {" + nameof(Now) + ".ToString(\"O\")} " +
"\\}")]
public class HistoricalScheduler : HistoricalSchedulerBase
{
private readonly SchedulerQueue<DateTimeOffset> _queue = new SchedulerQueue<DateTimeOffset>();
Expand Down
5 changes: 5 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Internal/HalfSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Diagnostics;
using System.Threading;

namespace System.Reactive
Expand Down Expand Up @@ -47,6 +48,10 @@ public static void ForwardOnNext<T>(ISink<T> sink, T item, ref int wip, ref Exce
}
}
}
#if (HAS_TRACE)
else if (error == null)
Trace.TraceWarning("OnNext called while another OnNext call was in progress on the same Observer.");
#endif
}

/// <summary>
Expand Down
6 changes: 5 additions & 1 deletion Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,11 @@ private void OnCompleted(int index)
}
else
{
_subscriptions[index].Dispose();
var subscriptions = Volatile.Read(ref _subscriptions);
if (subscriptions != null && subscriptions != Array.Empty<IDisposable>())
{
Disposable.TryDispose(ref subscriptions[index]);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public void Dispose()

public void OnCompleted()
{
_observer.OnCompleted();
}

public void OnError(Exception error)
Expand Down Expand Up @@ -233,7 +232,6 @@ public void Dispose()

public void OnCompleted()
{
_observer.OnCompleted();
}

public void OnError(Exception error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ namespace System.Reactive.Concurrency
public System.IDisposable SchedulePeriodic<TState>(TState state, System.TimeSpan period, System.Func<TState, TState> action) { }
public override System.Reactive.Concurrency.IStopwatch StartStopwatch() { }
}
[System.Diagnostics.DebuggerDisplayAttribute("\\{ Clock = {Clock} Now = {Now.ToString(\"O\")} \\}")]
public class HistoricalScheduler : System.Reactive.Concurrency.HistoricalSchedulerBase
{
public HistoricalScheduler() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ namespace Microsoft.Reactive.Testing
public override int GetHashCode() { }
public override string ToString() { }
}
[System.Diagnostics.DebuggerDisplayAttribute("\\{ Clock = {Clock} Now = {Now.ToString(\"O\")} \\}")]
public class TestScheduler : System.Reactive.Concurrency.VirtualTimeScheduler<long, long>
{
public TestScheduler() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,5 +705,46 @@ public void CreateAsync_Action_Token()
Assert.True(lst.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
}


[Fact]
public void CreateWithTaskDisposable_NoPrematureTermination()
{
var obs = Observable.Create<int>(async o =>
{
// avoid warning on async o due to no await
await Task.CompletedTask;

var inner = Observable.Range(1, 3);

return inner.Subscribe(x =>
{
o.OnNext(x);
});
});

var result = obs.Take(1).Wait();
}

[Fact]
public void CreateWithTaskAction_NoPrematureTermination()
{
var obs = Observable.Create<int>(async o =>
{
// avoid warning on async o due to no await
await Task.CompletedTask;

var inner = Observable.Range(1, 3);

var d = inner.Subscribe(x =>
{
o.OnNext(x);
});

Action a = () => d.Dispose();
return a;
});

var result = obs.Take(1).Wait();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4448,6 +4448,41 @@ public void Zip_AtLeastOneThrows4()

#endregion

[Fact]
public void Zip2WithImmediateReturn()
{
Observable.Zip<Unit, Unit, Unit>(
Observable.Return(Unit.Default),
Observable.Return(Unit.Default),
(_, __) => Unit.Default
)
.Subscribe(_ => { });
}

[Fact]
public void Zip3WithImmediateReturn()
{
Observable.Zip<Unit, Unit, Unit, Unit>(
Observable.Return(Unit.Default),
Observable.Return(Unit.Default),
Observable.Return(Unit.Default),
(_, __, ___) => Unit.Default
)
.Subscribe(_ => { });
}

[Fact]
public void ZipEnumerableWithImmediateReturn()
{
Enumerable.Range(0, 100)
.Select(_ => Observable.Return(Unit.Default))
.Zip()
.Subscribe(_ =>
{

}
);
}
}
#pragma warning restore IDE0039 // Use local function
}