Skip to content

Commit 022a450

Browse files
authored
PublishingActivityProgressReporter fixes (#8571)
* PublishingActivityReporter fixes * Introduce lock around status updates. * Lock on status updates. * Clean out unnecessary files from PR. * Extra file to remove. * Remove debugging code.
1 parent 5becd83 commit 022a450

File tree

6 files changed

+117
-44
lines changed

6 files changed

+117
-44
lines changed

playground/publishers/Publishers.AppHost/docker-compose.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ services:
2222
ports:
2323
- "8002:8001"
2424
- "8004:8003"
25+
depends_on:
26+
pg:
27+
condition: "service_started"
2528
networks:
2629
- "aspire"
2730
api:
@@ -37,6 +40,11 @@ services:
3740
ports:
3841
- "8006:8005"
3942
- "8008:8007"
43+
depends_on:
44+
pg:
45+
condition: "service_started"
46+
dbsetup:
47+
condition: "service_completed_successfully"
4048
networks:
4149
- "aspire"
4250
sqlserver:
@@ -70,6 +78,13 @@ services:
7078
ports:
7179
- "8011:8010"
7280
- "8013:8012"
81+
depends_on:
82+
api:
83+
condition: "service_started"
84+
networks:
85+
- "aspire"
86+
mycontainer:
87+
image: "${MYCONTAINER_IMAGE}"
7388
networks:
7489
- "aspire"
7590
networks:

src/Aspire.Cli/Commands/PublishCommand.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ protected override async Task<int> ExecuteAsync(ParseResult parseResult, Cancell
152152

153153
var backchannelCompletionSource = new TaskCompletionSource<AppHostBackchannel>();
154154

155-
var launchingAppHostTask = context.AddTask(":play_button: Launching apphost");
155+
var launchingAppHostTask = context.AddTask(":play_button: Launching apphost");
156156
launchingAppHostTask.IsIndeterminate();
157157
launchingAppHostTask.StartTask();
158158

@@ -167,7 +167,7 @@ protected override async Task<int> ExecuteAsync(ParseResult parseResult, Cancell
167167

168168
var backchannel = await backchannelCompletionSource.Task.ConfigureAwait(false);
169169

170-
launchingAppHostTask.Description = $":check_mark: Launching apphost";
170+
launchingAppHostTask.Description = $":check_mark: Launching apphost";
171171
launchingAppHostTask.Value = 100;
172172
launchingAppHostTask.StopTask();
173173

@@ -185,17 +185,17 @@ protected override async Task<int> ExecuteAsync(ParseResult parseResult, Cancell
185185
progressTasks.Add(publishingActivity.Id, progressTask);
186186
}
187187

188-
progressTask.Description = $":play_button: {publishingActivity.StatusText}";
188+
progressTask.Description = $":play_button: {publishingActivity.StatusText}";
189189

190190
if (publishingActivity.IsComplete && !publishingActivity.IsError)
191191
{
192-
progressTask.Description = $":check_mark: {publishingActivity.StatusText}";
192+
progressTask.Description = $":check_mark: {publishingActivity.StatusText}";
193193
progressTask.Value = 100;
194194
progressTask.StopTask();
195195
}
196196
else if (publishingActivity.IsError)
197197
{
198-
progressTask.Description = $"[red bold]:cross_mark: {publishingActivity.StatusText}[/]";
198+
progressTask.Description = $"[red bold]:cross_mark: {publishingActivity.StatusText}[/]";
199199
progressTask.Value = 0;
200200
break;
201201
}
@@ -205,8 +205,6 @@ protected override async Task<int> ExecuteAsync(ParseResult parseResult, Cancell
205205
}
206206
}
207207

208-
await backchannel.RequestStopAsync(cancellationToken).ConfigureAwait(false);
209-
210208
// When we are running in publish mode we don't want the app host to
211209
// stop itself while we might still be streaming data back across
212210
// the RPC backchannel. So we need to take responsibility for stopping

src/Aspire.Hosting/Backchannel/AppHostRpcTarget.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,23 @@ IHostApplicationLifetime lifetime
2828
{
2929
while (cancellationToken.IsCancellationRequested == false)
3030
{
31-
var publishingActivity = await activityReporter.ActivitiyUpdated.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
31+
var publishingActivityStatus = await activityReporter.ActivityStatusUpdated.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
3232

33-
if (publishingActivity == null)
33+
if (publishingActivityStatus == null)
3434
{
3535
// If the publishing activity is null, it means that the activity has been removed.
3636
// This can happen if the activity is complete or an error occurred.
3737
yield break;
3838
}
3939

4040
yield return (
41-
publishingActivity.Id,
42-
publishingActivity.StatusMessage,
43-
publishingActivity.IsComplete,
44-
publishingActivity.IsError
41+
publishingActivityStatus.Activity.Id,
42+
publishingActivityStatus.StatusText,
43+
publishingActivityStatus.IsComplete,
44+
publishingActivityStatus.IsError
4545
);
4646

47-
if ( publishingActivity.IsPrimary &&(publishingActivity.IsComplete || publishingActivity.IsError))
47+
if ( publishingActivityStatus.Activity.IsPrimary &&(publishingActivityStatus.IsComplete || publishingActivityStatus.IsError))
4848
{
4949
// If the activity is complete or an error and it is the primary activity,
5050
// we can stop listening for updates.

src/Aspire.Hosting/DistributedApplicationRunner.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ await eventing.PublishAsync<AfterPublishEvent>(
4949
new AfterPublishEvent(serviceProvider, model), stoppingToken
5050
).ConfigureAwait(false);
5151

52-
publishingActivity.IsComplete = true;
53-
await activityReporter.UpdateActivityAsync(publishingActivity, stoppingToken).ConfigureAwait(false);
52+
await activityReporter.UpdateActivityStatusAsync(
53+
publishingActivity,
54+
(status) => status with { IsComplete = true },
55+
stoppingToken).ConfigureAwait(false);
5456

5557
// If we are running in publish mode and a backchannel is being
5658
// used then we don't want to stop the app host. Instead the
@@ -65,8 +67,10 @@ await eventing.PublishAsync<AfterPublishEvent>(
6567
catch (Exception ex)
6668
{
6769
logger.LogError(ex, "Failed to publish the distributed application.");
68-
publishingActivity.IsError = true;
69-
await activityReporter.UpdateActivityAsync(publishingActivity, stoppingToken).ConfigureAwait(false);
70+
await activityReporter.UpdateActivityStatusAsync(
71+
publishingActivity,
72+
(status) => status with { IsError = true },
73+
stoppingToken).ConfigureAwait(false);
7074

7175
if (!backchannelService.IsBackchannelExpected)
7276
{

src/Aspire.Hosting/Publishing/PublishingActivityProgressReporter.cs

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@ namespace Aspire.Hosting.Publishing;
1414
[Experimental("ASPIREPUBLISHERS001")]
1515
public sealed class PublishingActivity
1616
{
17-
internal PublishingActivity(string id, string initialStatusText, bool isPrimary = false)
17+
internal PublishingActivity(string id, bool isPrimary = false)
1818
{
1919
Id = id;
20-
StatusMessage = initialStatusText;
2120
IsPrimary = isPrimary;
2221
}
2322

@@ -27,25 +26,41 @@ internal PublishingActivity(string id, string initialStatusText, bool isPrimary
2726
public string Id { get; private set; }
2827

2928
/// <summary>
30-
/// Status message of the publishing activity.
29+
/// Indicates whether the publishing activity is the primary activity.
3130
/// </summary>
32-
public string StatusMessage { get; set; }
31+
public bool IsPrimary { get; private set; }
3332

3433
/// <summary>
35-
/// Indicates whether the publishing activity is complete.
34+
/// The status text of the publishing activity.
3635
/// </summary>
37-
public bool IsComplete { get; set; }
36+
public PublishingActivityStatus? LastStatus { get; internal set; }
37+
}
3838

39+
/// <summary>
40+
/// Represents the status of a publishing activity.
41+
/// </summary>
42+
[Experimental("ASPIREPUBLISHERS001")]
43+
public sealed record PublishingActivityStatus
44+
{
3945
/// <summary>
40-
/// Indicates whether the publishing activity is the primary activity.
46+
/// The publishing activity associated with this status.
4147
/// </summary>
42-
public bool IsPrimary { get; private set; }
48+
public required PublishingActivity Activity { get; init; }
4349

4450
/// <summary>
45-
/// Indicates whether the publishing activity has encountered an error.
51+
/// The status text of the publishing activity.
4652
/// </summary>
47-
public bool IsError { get; set; }
53+
public required string StatusText { get; init; }
4854

55+
/// <summary>
56+
/// Indicates whether the publishing activity is complete.
57+
/// </summary>
58+
public required bool IsComplete { get; init; }
59+
60+
/// <summary>
61+
/// Indicates whether the publishing activity encountered an error.
62+
/// </summary>
63+
public required bool IsError { get; init; }
4964
}
5065

5166
/// <summary>
@@ -73,31 +88,68 @@ public interface IPublishingActivityProgressReporter
7388
/// Updates the status of an existing publishing activity.
7489
/// </summary>
7590
/// <param name="publishingActivity">The activity with updated properties.</param>
91+
/// <param name="statusUpdate"></param>
7692
/// <param name="cancellationToken">The cancellation token.</param>
7793
/// <returns></returns>
78-
Task UpdateActivityAsync(PublishingActivity publishingActivity, CancellationToken cancellationToken);
94+
Task UpdateActivityStatusAsync(PublishingActivity publishingActivity, Func<PublishingActivityStatus, PublishingActivityStatus> statusUpdate, CancellationToken cancellationToken);
7995
}
8096

8197
internal sealed class PublishingActivityProgressReporter : IPublishingActivityProgressReporter
8298
{
8399
public async Task<PublishingActivity> CreateActivityAsync(string id, string initialStatusText, bool isPrimary, CancellationToken cancellationToken)
84100
{
85-
var publishingActivity = new PublishingActivity(id, initialStatusText, isPrimary);
86-
await ActivitiyUpdated.Writer.WriteAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
101+
var publishingActivity = new PublishingActivity(id, isPrimary);
102+
await UpdateActivityStatusAsync(
103+
publishingActivity,
104+
(status) => status with
105+
{
106+
StatusText = initialStatusText,
107+
IsComplete = false,
108+
IsError = false
109+
},
110+
cancellationToken
111+
).ConfigureAwait(false);
112+
87113
return publishingActivity;
88114
}
89115

90-
public async Task UpdateActivityAsync(PublishingActivity publishingActivity, CancellationToken cancellationToken)
116+
private readonly object _updateLock = new object();
117+
118+
public async Task UpdateActivityStatusAsync(PublishingActivity publishingActivity, Func<PublishingActivityStatus, PublishingActivityStatus> statusUpdate, CancellationToken cancellationToken)
91119
{
92-
await ActivitiyUpdated.Writer.WriteAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
120+
PublishingActivityStatus? lastStatus;
121+
PublishingActivityStatus? newStatus;
122+
123+
lock (_updateLock)
124+
{
125+
lastStatus = publishingActivity.LastStatus ?? new PublishingActivityStatus
126+
{
127+
Activity = publishingActivity,
128+
StatusText = string.Empty,
129+
IsComplete = false,
130+
IsError = false
131+
};
132+
133+
newStatus = statusUpdate(lastStatus);
134+
publishingActivity.LastStatus = newStatus;
135+
}
136+
137+
if (lastStatus == newStatus)
138+
{
139+
throw new DistributedApplicationException(
140+
$"The status of the publishing activity '{publishingActivity.Id}' was not updated. The status update function must return a new instance of the status."
141+
);
142+
}
143+
144+
await ActivityStatusUpdated.Writer.WriteAsync(newStatus, cancellationToken).ConfigureAwait(false);
93145

94-
if (publishingActivity.IsPrimary && (publishingActivity.IsComplete || publishingActivity.IsError))
146+
if (publishingActivity.IsPrimary && (newStatus.IsComplete || newStatus.IsError))
95147
{
96148
// If the activity is complete or an error and it is the primary activity,
97149
// we can stop listening for updates.
98-
ActivitiyUpdated.Writer.Complete();
150+
ActivityStatusUpdated.Writer.Complete();
99151
}
100152
}
101153

102-
internal Channel<PublishingActivity> ActivitiyUpdated { get; } = Channel.CreateUnbounded<PublishingActivity>();
154+
internal Channel<PublishingActivityStatus> ActivityStatusUpdated { get; } = Channel.CreateUnbounded<PublishingActivityStatus>();
103155
}

src/Aspire.Hosting/Publishing/ResourceContainerImageBuilder.cs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,17 @@ private async Task<string> BuildProjectContainerImageAsync(IResource resource, C
130130
stdout,
131131
stderr);
132132

133-
publishingActivity.IsError = true;
134-
await activityReporter.UpdateActivityAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
133+
await activityReporter.UpdateActivityStatusAsync(
134+
publishingActivity, (status) => status with { IsError = true },
135+
cancellationToken).ConfigureAwait(false);
135136

136137
throw new DistributedApplicationException($"Failed to build container image, stdout: {stdout}, stderr: {stderr}");
137138
}
138139
else
139140
{
140-
publishingActivity.IsComplete = true;
141-
await activityReporter.UpdateActivityAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
141+
await activityReporter.UpdateActivityStatusAsync(
142+
publishingActivity, (status) => status with { IsComplete = true },
143+
cancellationToken).ConfigureAwait(false);
142144

143145
logger.LogDebug(
144146
".NET CLI completed with exit code: {ExitCode}",
@@ -171,17 +173,19 @@ private async Task<string> BuildContainerImageFromDockerfileAsync(string resourc
171173
imageName,
172174
cancellationToken).ConfigureAwait(false);
173175

174-
publishingActivity.IsComplete = true;
175-
await activityReporter.UpdateActivityAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
176+
await activityReporter.UpdateActivityStatusAsync(
177+
publishingActivity, (status) => status with { IsComplete = true },
178+
cancellationToken).ConfigureAwait(false);
176179

177180
return image;
178181
}
179182
catch (Exception ex)
180183
{
181184
logger.LogError(ex, "Failed to build container image from Dockerfile.");
182185

183-
publishingActivity.IsError = true;
184-
await activityReporter.UpdateActivityAsync(publishingActivity, cancellationToken).ConfigureAwait(false);
186+
await activityReporter.UpdateActivityStatusAsync(
187+
publishingActivity, (status) => status with { IsError = true },
188+
cancellationToken).ConfigureAwait(false);
185189

186190
throw;
187191
}

0 commit comments

Comments
 (0)