Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions src/Dapr/ChatServer.Host/ChatServerHostedServiceWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// <copyright file="ChatServerHostedServiceWrapper.cs" company="MUnique">
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
// </copyright>

namespace MUnique.OpenMU.ChatServer.Host;

using System.Collections.Generic;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MUnique.OpenMU.Dapr.Common;
using MUnique.OpenMU.DataModel.Configuration;
using MUnique.OpenMU.PlugIns;
using ChatServer = MUnique.OpenMU.ChatServer.ChatServer;

/// <summary>
/// A wrapper which takes a <see cref="ChatServer"/> and wraps it as <see cref="IHostedLifecycleService"/>,
/// so that additional initialization can be done before actually starting it.
/// The actual server start is deferred to <see cref="StartedAsync"/> which is called after the web application
/// has started (i.e. the HTTP API is already available), breaking the circular startup dependency with the Dapr sidecar.
/// TODO: listen to configuration changes/database reinit.
/// See also: ServerContainerBase.
/// </summary>
public class ChatServerHostedServiceWrapper : IHostedLifecycleService
{
private readonly IServiceProvider _serviceProvider;
private ChatServer? _chatServer;

/// <summary>
/// Initializes a new instance of the <see cref="ChatServerHostedServiceWrapper"/> class.
/// </summary>
/// <param name="serviceProvider">The service provider.</param>
public ChatServerHostedServiceWrapper(IServiceProvider serviceProvider)
{
this._serviceProvider = serviceProvider;
}

/// <inheritdoc/>
public Task StartingAsync(CancellationToken cancellationToken) => Task.CompletedTask;

/// <inheritdoc/>
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

/// <inheritdoc/>
public async Task StartedAsync(CancellationToken cancellationToken)
{
await this._serviceProvider.WaitForDatabaseInitializationAsync(cancellationToken).ConfigureAwait(false);

if (this._serviceProvider.GetService<ICollection<PlugInConfiguration>>() is { } plugInCollection)
{
if (plugInCollection is not List<PlugInConfiguration> plugInConfigurations)
{
throw new InvalidOperationException($"The registered {nameof(ICollection<PlugInConfiguration>)} must be a {nameof(List<PlugInConfiguration>)} to be able to load plugin configurations.");
}

await this._serviceProvider.TryLoadPlugInConfigurationsAsync(plugInConfigurations).ConfigureAwait(false);
}

var settings = this._serviceProvider.GetRequiredService<ChatServerDefinition>().ConvertToSettings();
this._chatServer = this._serviceProvider.GetRequiredService<ChatServer>();
this._chatServer.Initialize(settings);
await this._chatServer.StartAsync(cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public Task StoppingAsync(CancellationToken cancellationToken) => Task.CompletedTask;

/// <inheritdoc/>
public Task StopAsync(CancellationToken cancellationToken)
{
return this._chatServer?.StopAsync(cancellationToken) ?? Task.CompletedTask;
}

/// <inheritdoc/>
public Task StoppedAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
16 changes: 3 additions & 13 deletions src/Dapr/ChatServer.Host/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@

using Microsoft.Extensions.DependencyInjection;

using MUnique.OpenMU.ChatServer;
using MUnique.OpenMU.ChatServer.Host;
using MUnique.OpenMU.Dapr.Common;
using MUnique.OpenMU.DataModel.Configuration;
using MUnique.OpenMU.Network;
using MUnique.OpenMU.PlugIns;
using ChatServer = MUnique.OpenMU.ChatServer.ChatServer;

var plugInConfigurations = new List<PlugInConfiguration>();
var builder = DaprService.CreateBuilder("ChatServer", args);
Expand All @@ -22,13 +21,7 @@
.AddIpResolver(args)
.AddPersistentSingleton<ChatServerDefinition>();

services.AddHostedService(p =>
{
var settings = p.GetService<ChatServerDefinition>()?.ConvertToSettings() ?? throw new Exception($"{nameof(ChatServerSettings)} not registered.");
var chatServer = p.GetService<ChatServer>()!;
chatServer.Initialize(settings);
return chatServer;
});
services.AddHostedService<ChatServerHostedServiceWrapper>();

services.PublishManageableServer<ChatServer>();

Expand All @@ -37,9 +30,6 @@
builder.AddOpenTelemetryMetrics(metricsRegistry);

var app = builder.BuildAndConfigure();

await app.WaitForUpdatedDatabaseAsync().ConfigureAwait(false);

await app.Services.TryLoadPlugInConfigurationsAsync(plugInConfigurations).ConfigureAwait(false);
await app.WaitForDatabaseConnectionInitializationAsync().ConfigureAwait(false);

app.Run();
24 changes: 23 additions & 1 deletion src/Dapr/Common/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

namespace MUnique.OpenMU.Dapr.Common;

using System.Threading;
using System.Text.Json.Serialization;
using System.Threading;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -295,6 +295,28 @@ await app.Services.GetService<IDatabaseConnectionSettingProvider>()!
.ConfigureAwait(false);
}

/// <summary>
/// Waits for the database secrets to be loaded and then for the database to be up-to-date.
/// This is intended to be called from <see cref="Microsoft.Extensions.Hosting.IHostedLifecycleService.StartedAsync"/>
/// which is called after the web server has already started, breaking the circular dependency where:
/// the Dapr sidecar needs the app HTTP API to be up before it initializes its secret store,
/// but the app needs Dapr secrets to connect to the database.
/// </summary>
/// <param name="serviceProvider">The service provider.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task WaitForDatabaseInitializationAsync(this IServiceProvider serviceProvider, CancellationToken cancellationToken = default)
{
var dbConnectionProvider = serviceProvider.GetRequiredService<IDatabaseConnectionSettingProvider>();
if (dbConnectionProvider.Initialization is { } initTask)
{
await initTask.WaitAsync(cancellationToken).ConfigureAwait(false);
}

await serviceProvider.GetRequiredService<PersistenceContextProvider>()
.WaitForUpdatedDatabaseAsync(cancellationToken)
.ConfigureAwait(false);
}

/// <summary>
/// Adds the ip resolver to the collection, depending on the command line arguments
/// and the <see cref="SystemConfiguration"/> in the database.
Expand Down
65 changes: 51 additions & 14 deletions src/Dapr/Common/ManagableServerStatePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@ namespace MUnique.OpenMU.Dapr.Common;

using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using global::Dapr.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Nito.AsyncEx;
using Nito.AsyncEx.Synchronous;
using MUnique.OpenMU.Interfaces;
using MUnique.OpenMU.PlugIns;
using Nito.AsyncEx;
using Nito.AsyncEx.Synchronous;

/// <summary>
/// A state publisher for a <see cref="IManageableServer"/>,
/// which can be handled with a corresponding <see cref="ManagableServerRegistry"/>.
/// The server registration is deferred to <see cref="StartedAsync"/> which is called after the web application
/// has started (i.e. the HTTP API is already available), breaking the circular startup dependency with the Dapr sidecar.
/// </summary>
public sealed class ManagableServerStatePublisher : IHostedService, IDisposable
public sealed class ManagableServerStatePublisher : IHostedLifecycleService, IDisposable
{
/// <summary>
/// The topic name for the state updates.
Expand All @@ -30,10 +32,11 @@ public sealed class ManagableServerStatePublisher : IHostedService, IDisposable

private readonly ILogger<ManagableServerStatePublisher> _logger;
private readonly DaprClient _daprClient;
private readonly IManageableServer _server;
private readonly IServiceProvider _serviceProvider;
private readonly AsyncLock _lock = new();

private readonly ServerStateData _data;
private IManageableServer? _server;
private ServerStateData? _data;

private Task? _heartbeatTask;
private CancellationTokenSource? _heartbeatCancellationTokenSource;
Expand All @@ -42,15 +45,13 @@ public sealed class ManagableServerStatePublisher : IHostedService, IDisposable
/// Initializes a new instance of the <see cref="ManagableServerStatePublisher"/> class.
/// </summary>
/// <param name="daprClient">The dapr client.</param>
/// <param name="server">The server.</param>
/// <param name="serviceProvider">The service provider used to lazily resolve <see cref="IManageableServer"/>.</param>
/// <param name="logger">The logger.</param>
public ManagableServerStatePublisher(DaprClient daprClient, IManageableServer server, ILogger<ManagableServerStatePublisher> logger)
public ManagableServerStatePublisher(DaprClient daprClient, IServiceProvider serviceProvider, ILogger<ManagableServerStatePublisher> logger)
{
this._daprClient = daprClient;
this._server = server;
this._serviceProvider = serviceProvider;
this._logger = logger;
this._server.PropertyChanged += this.OnPropertyChanged;
this._data = new ServerStateData(this._server);
}

/// <inheritdoc />
Expand All @@ -60,7 +61,13 @@ public void Dispose()
}

/// <inheritdoc />
public Task StartAsync(CancellationToken cancellationToken)
public Task StartingAsync(CancellationToken cancellationToken) => Task.CompletedTask;

/// <inheritdoc />
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

/// <inheritdoc />
public Task StartedAsync(CancellationToken cancellationToken)
{
this._heartbeatCancellationTokenSource = new CancellationTokenSource();

Expand All @@ -80,6 +87,9 @@ async Task RunHeartbeatTask()
return Task.CompletedTask;
}

/// <inheritdoc />
public Task StoppingAsync(CancellationToken cancellationToken) => Task.CompletedTask;

/// <inheritdoc />
public async Task StopAsync(CancellationToken cancellationToken)
{
Expand All @@ -92,10 +102,12 @@ public async Task StopAsync(CancellationToken cancellationToken)
}
}

/// <inheritdoc />
public Task StoppedAsync(CancellationToken cancellationToken) => Task.CompletedTask;

private async Task HeartbeatLoopAsync(CancellationToken cancellationToken)
{
var stopWatch = new Stopwatch();
stopWatch.Start();
await this.InitializeServerAsync(cancellationToken).ConfigureAwait(false);

while (!cancellationToken.IsCancellationRequested)
{
Expand All @@ -104,8 +116,33 @@ private async Task HeartbeatLoopAsync(CancellationToken cancellationToken)
}
}

private async Task InitializeServerAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested && this._server is null)
{
try
{
var server = this._serviceProvider.GetRequiredService<IManageableServer>();
server.PropertyChanged -= this.OnPropertyChanged; // Ensure single subscription in case of retry
server.PropertyChanged += this.OnPropertyChanged;
this._data = new ServerStateData(server);
this._server = server;
Comment on lines +125 to +129
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a potential issue here with multiple subscriptions to the PropertyChanged event. If an exception occurs after GetRequiredService but before this._server is set (e.g., in the ServerStateData constructor), the catch block will cause a retry. In the next iteration, GetRequiredService will return the same singleton IManageableServer instance, and another event handler will be subscribed. This leads to a memory leak and the handler being called multiple times for each event. It's safer to unsubscribe before subscribing to prevent this.

                var server = this._serviceProvider.GetRequiredService<IManageableServer>();
                server.PropertyChanged -= this.OnPropertyChanged; // Ensure single subscription
                server.PropertyChanged += this.OnPropertyChanged;
                this._data = new ServerStateData(server);
                this._server = server;

}
catch (Exception ex)
{
this._logger.LogWarning(ex, "Could not resolve IManageableServer yet, retrying...");
await Task.Delay(1000, cancellationToken).ConfigureAwait(false);
}
}
}

private async Task PublishCurrentStateAsync()
{
if (this._server is null || this._data is null)
{
return;
}

using var asyncLock = await this._lock.LockAsync(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
if (asyncLock is null)
{
Expand Down
35 changes: 27 additions & 8 deletions src/Dapr/ConnectServer.Host/ConnectServerHostedServiceWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,55 @@
namespace MUnique.OpenMU.ConnectServer.Host;

using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MUnique.OpenMU.Dapr.Common;

/// <summary>
/// A wrapper which takes a <see cref="Interfaces.IConnectServer"/> and wraps it as <see cref="IHostedService"/>,
/// A wrapper which takes a <see cref="Interfaces.IConnectServer"/> and wraps it as <see cref="IHostedLifecycleService"/>,
/// so that additional initialization can be done before actually starting it.
/// The actual server start is deferred to <see cref="StartedAsync"/> which is called after the web application
/// has started (i.e. the HTTP API is already available), breaking the circular startup dependency with the Dapr sidecar.
/// TODO: listen to configuration changes/database reinit.
/// See also: ServerContainerBase.
/// </summary>
public class ConnectServerHostedServiceWrapper : IHostedService
public class ConnectServerHostedServiceWrapper : IHostedLifecycleService
{
private readonly ConnectServer _connectServer;
private readonly IServiceProvider _serviceProvider;
private ConnectServer? _connectServer;

/// <summary>
/// Initializes a new instance of the <see cref="ConnectServerHostedServiceWrapper"/> class.
/// </summary>
/// <param name="connectServer">The connect server.</param>
public ConnectServerHostedServiceWrapper(ConnectServer connectServer)
/// <param name="serviceProvider">The service provider.</param>
public ConnectServerHostedServiceWrapper(IServiceProvider serviceProvider)
{
this._connectServer = connectServer;
this._serviceProvider = serviceProvider;
}

/// <inheritdoc/>
public async Task StartAsync(CancellationToken cancellationToken)
public Task StartingAsync(CancellationToken cancellationToken) => Task.CompletedTask;

/// <inheritdoc/>
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

/// <inheritdoc/>
public async Task StartedAsync(CancellationToken cancellationToken)
{
await this._serviceProvider.WaitForDatabaseInitializationAsync(cancellationToken).ConfigureAwait(false);
this._connectServer = this._serviceProvider.GetRequiredService<ConnectServer>();
await this._connectServer.StartAsync(cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public Task StoppingAsync(CancellationToken cancellationToken) => Task.CompletedTask;

/// <inheritdoc/>
public Task StopAsync(CancellationToken cancellationToken)
{
return this._connectServer.StopAsync(cancellationToken);
return this._connectServer?.StopAsync(cancellationToken) ?? Task.CompletedTask;
}

/// <inheritdoc/>
public Task StoppedAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
2 changes: 1 addition & 1 deletion src/Dapr/ConnectServer.Host/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@
builder.AddOpenTelemetryMetrics(metricsRegistry);

var app = builder.BuildAndConfigure();
await app.WaitForUpdatedDatabaseAsync().ConfigureAwait(false);
await app.WaitForDatabaseConnectionInitializationAsync().ConfigureAwait(false);

app.Run();
2 changes: 1 addition & 1 deletion src/Dapr/FriendServer.Host/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@

var app = builder.BuildAndConfigure();

await app.WaitForUpdatedDatabaseAsync().ConfigureAwait(false);
await app.WaitForDatabaseConnectionInitializationAsync().ConfigureAwait(false);

app.Run();
Loading
Loading