Conversation
…anage only a UUID of the player when this UUID is generated by another service, for example the authentication service
Pull request overview
This PR implements significant performance optimizations and architectural improvements to the GORC (Game Object Replication Channels) system, focusing on reducing lock contention, improving distance calculations, and fixing runtime lifecycle management issues.
Key Changes:
- Migrated from `tokio::sync::RwLock` to `DashMap` for lock-free concurrent access to GORC objects and positions
- Introduced a `distance_squared()` method to eliminate expensive `sqrt()` operations in distance comparisons
- Fixed plugin runtime lifecycle by storing runtime instances instead of just handles, preventing premature task termination
- Replaced broadcast channels with per-connection mpsc channels for O(1) message delivery in the connection manager
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 11 comments.
Summary per file:
| File | Description |
|---|---|
| crates/plugin_system/src/manager.rs | Stores luminal runtime and tokio handle in server context to prevent async task orphaning across DLL boundaries |
| crates/horizon_event_system/src/types.rs | Adds distance_squared() optimization method for faster distance comparisons without sqrt() |
| crates/horizon_event_system/src/system/handlers.rs | Improves runtime handling with fallback to create runtime when none exists |
| crates/horizon_event_system/src/system/emitters.rs | Adds GorcEvent wrapping, subscriber diagnostics, and new player subscription method |
| crates/horizon_event_system/src/gorc/instance.rs | Major refactor replacing RwLock with DashMap for lock-free access and uses distance_squared throughout |
| crates/horizon_event_system/src/context.rs | Adds tokio_handle() trait method for cross-DLL async task spawning |
| crates/horizon_event_system/benches/distance_benchmark.rs | New benchmark demonstrating distance_squared performance improvements |
| crates/horizon_event_system/Cargo.toml | Adds async-lock and criterion dependencies for optimization and benchmarking |
| crates/game_server/src/server/handlers.rs | Migrates to per-connection message channels for O(1) delivery |
| crates/game_server/src/server/core.rs | Adds update_player_id event handler for player authentication flow |
| crates/game_server/src/connection/manager.rs | Replaces broadcast channel with per-connection mpsc channels for efficient message routing |
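For readers unfamiliar with the squared-distance trick the table mentions: squaring is order-preserving for non-negative values, so comparing squared distances against a squared threshold gives the same answer as comparing true distances, without the `sqrt()`. A minimal sketch — this `Vec3` and its fields are illustrative stand-ins, not the crate's actual types:

```rust
/// Illustrative stand-in for the crate's Vec3; the field layout is an assumption.
#[derive(Clone, Copy)]
pub struct Vec3 { pub x: f64, pub y: f64, pub z: f64 }

impl Vec3 {
    /// Squared distance: cheap (no sqrt) and monotonic in the true distance.
    pub fn distance_squared(&self, other: &Vec3) -> f64 {
        let (dx, dy, dz) = (self.x - other.x, self.y - other.y, self.z - other.z);
        dx * dx + dy * dy + dz * dz
    }

    pub fn distance(&self, other: &Vec3) -> f64 {
        self.distance_squared(other).sqrt()
    }
}

/// Threshold check without sqrt: compare against radius * radius instead.
pub fn within_radius(a: &Vec3, b: &Vec3, radius: f64) -> bool {
    a.distance_squared(b) <= radius * radius
}

fn main() {
    let a = Vec3 { x: 0.0, y: 0.0, z: 0.0 };
    let b = Vec3 { x: 3.0, y: 4.0, z: 0.0 };
    assert_eq!(a.distance_squared(&b), 25.0);
    assert_eq!(a.distance(&b), 5.0);
    assert!(within_radius(&a, &b, 5.0));
    assert!(!within_radius(&a, &b, 4.9));
    println!("squared-distance checks passed");
}
```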
```rust
let mut stored_context = self.server_context.write().expect("Failed to lock server_context for write");
*stored_context = Some(context.clone());
info!("🔧 Server context stored - luminal runtime will persist for plugin tasks");
}
```
Using expect() on a RwLock can cause panics if the lock is poisoned. While lock poisoning is rare, in a plugin system where third-party code runs, it's safer to handle this error gracefully with a Result return or a logged warning rather than panicking, which could crash the entire server.
Suggested change:

```rust
// Before:
let mut stored_context = self.server_context.write().expect("Failed to lock server_context for write");
*stored_context = Some(context.clone());
info!("🔧 Server context stored - luminal runtime will persist for plugin tasks");
}

// After:
match self.server_context.write() {
    Ok(mut stored_context) => {
        *stored_context = Some(context.clone());
        info!("🔧 Server context stored - luminal runtime will persist for plugin tasks");
    }
    Err(poisoned) => {
        error!("❌ Failed to lock server_context for write: poisoned lock");
        return Err(PluginSystemError::LockPoisoned("server_context".to_string()));
    }
}
```
```rust
match tokio::runtime::Builder::new_current_thread()
    .enable_all()
    .build()
{
```
Creating a new single-threaded runtime in the fallback path (when no runtime context exists) is inefficient. This can happen frequently if handlers are called from non-async contexts. Consider maintaining a global runtime pool or requiring callers to provide a runtime handle, as repeatedly creating runtimes has significant overhead.
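One hedged way to implement the "create it once, reuse it" idea from this comment is `std::sync::OnceLock`. The sketch below uses a stand-in `ExpensiveResource` type instead of an actual `tokio::runtime::Runtime` (so the snippet has no external dependencies); the pattern is the same — the builder closure runs at most once no matter how many callers hit the fallback path:

```rust
use std::sync::OnceLock;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Stand-in for an expensive-to-build object such as a tokio Runtime.
struct ExpensiveResource;

static BUILD_COUNT: AtomicUsize = AtomicUsize::new(0);
static RESOURCE: OnceLock<ExpensiveResource> = OnceLock::new();

/// Fallback path: build the shared resource on first use, then reuse it.
fn fallback_resource() -> &'static ExpensiveResource {
    RESOURCE.get_or_init(|| {
        BUILD_COUNT.fetch_add(1, Ordering::SeqCst);
        ExpensiveResource // in real code: tokio::runtime::Builder::...build()
    })
}

fn main() {
    // Even if the fallback is hit many times, construction happens once.
    for _ in 0..100 {
        let _ = fallback_resource();
    }
    assert_eq!(BUILD_COUNT.load(Ordering::SeqCst), 1);
    println!("resource built exactly once");
}
```

Whether a cached runtime or a caller-supplied handle is the right fix here depends on the crate's threading model, which this sketch does not assume.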
```rust
for (connection_id, sender) in senders.iter() {
    if let Err(e) = sender.send(message.clone()) {
        tracing::error!("Failed to broadcast message to connection {}: {:?}", connection_id, e);
    } else {
        sent_count += 1;
    }
}
```
The broadcast_to_all method holds a read lock on message_senders while sending to all channels. If any channel's buffer is full or slow to process, this will block other operations from acquiring the lock. Consider collecting the senders first, then releasing the lock before sending, or clone the senders to avoid holding the lock during I/O operations.
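The clone-then-send pattern this comment suggests can be sketched with std types (the real code uses async primitives; the names and types here are illustrative):

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::sync::RwLock;

/// Broadcast without holding the lock during sends: snapshot the senders
/// under a short read lock, drop the guard, then do the (possibly slow) sends.
fn broadcast_to_all(senders: &RwLock<HashMap<u64, Sender<Vec<u8>>>>, message: &[u8]) -> usize {
    // Senders are cheap to clone (they are handles), so snapshot them.
    let snapshot: Vec<(u64, Sender<Vec<u8>>)> = senders
        .read()
        .expect("senders lock poisoned")
        .iter()
        .map(|(id, tx)| (*id, tx.clone()))
        .collect();
    // The read guard is dropped here; slow consumers can no longer block
    // other threads that need the lock.
    let mut sent = 0;
    for (id, tx) in snapshot {
        match tx.send(message.to_vec()) {
            Ok(()) => sent += 1,
            Err(e) => eprintln!("failed to send to connection {id}: {e:?}"),
        }
    }
    sent
}

fn main() {
    let senders = RwLock::new(HashMap::new());
    let (tx, rx) = mpsc::channel();
    senders.write().unwrap().insert(1u64, tx);
    assert_eq!(broadcast_to_all(&senders, b"hello"), 1);
    assert_eq!(rx.recv().unwrap(), b"hello".to_vec());
    println!("broadcast delivered without holding the lock during send");
}
```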
```rust
/// # Returns
///
/// An unbounded receiver for messages targeted to this connection.
pub async fn register_message_channel(&self, connection_id: ConnectionId) -> mpsc::UnboundedReceiver<Vec<u8>> {
```
Using an unbounded channel can lead to memory exhaustion if messages are produced faster than they can be consumed. Consider using a bounded channel with an appropriate capacity and handling the backpressure scenario (e.g., dropping old messages or disconnecting slow clients).
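A sketch of the bounded-channel backpressure handling this comment describes, using std's `sync_channel` as a stand-in for a bounded tokio mpsc (the capacity and the drop policy are illustrative choices, not the project's):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Bounded channel: capacity 2 instead of unbounded growth.
    let (tx, rx) = sync_channel::<Vec<u8>>(2);

    let mut dropped = 0;
    for i in 0u8..5 {
        match tx.try_send(vec![i]) {
            Ok(()) => {}
            // Buffer full: here we drop the new message; a server might
            // instead disconnect the slow client or evict the oldest entry.
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    assert_eq!(dropped, 3); // only 2 of the 5 messages fit in the buffer
    assert_eq!(rx.recv().unwrap(), vec![0]);
    println!("backpressure handled: {dropped} messages dropped");
}
```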
```rust
}

/// Get the tracked position of an object (lock-free, single source of truth for spatial queries)
pub fn get_object_position(&self, object_id: GorcObjectId) -> Option<Vec3> {
```
The method signature changed from async to sync but still internally accesses a DashMap which provides interior mutability. While this is technically safe, making this method synchronous when all other similar methods are async creates an inconsistency in the API. Consider keeping this async for API consistency, especially since callers may already be in async contexts.
Suggested change:

```rust
// Before:
pub fn get_object_position(&self, object_id: GorcObjectId) -> Option<Vec3> {
// After:
pub async fn get_object_position(&self, object_id: GorcObjectId) -> Option<Vec3> {
```
```rust
std::thread::spawn(move || {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("Failed to build runtime for update_player_id");

    rt.block_on(async move {
        // Get the connection_id for this player
        if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {
            // Update the player_id stored in the connection
            conn_mgr.set_player_id(connection_id, new_player_id).await;
            info!(
                "🔄 Updated player_id in connection {} from {} to {}",
                connection_id, old_player_id, new_player_id
            );
        } else {
            warn!("⚠️ Failed to find connection for player {} when updating player_id", old_player_id);
        }
    });
```
Creating a new runtime for each update_player_id event is inefficient and can lead to resource exhaustion under load. Each runtime spawns OS threads, and creating them repeatedly is expensive. Consider using the existing tokio runtime handle from the ServerContext instead, or spawn the task on the existing runtime using tokio::spawn.
Suggested change:

```rust
// Before:
std::thread::spawn(move || {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("Failed to build runtime for update_player_id");
    rt.block_on(async move {
        // Get the connection_id for this player
        if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {
            // Update the player_id stored in the connection
            conn_mgr.set_player_id(connection_id, new_player_id).await;
            info!(
                "🔄 Updated player_id in connection {} from {} to {}",
                connection_id, old_player_id, new_player_id
            );
        } else {
            warn!("⚠️ Failed to find connection for player {} when updating player_id", old_player_id);
        }
    });

// After:
tokio::spawn(async move {
    // Get the connection_id for this player
    if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {
        // Update the player_id stored in the connection
        conn_mgr.set_player_id(connection_id, new_player_id).await;
        info!(
            "🔄 Updated player_id in connection {} from {} to {}",
            connection_id, old_player_id, new_player_id
        );
    } else {
        warn!("⚠️ Failed to find connection for player {} when updating player_id", old_player_id);
    }
```
```rust
debug!("🎮 GORC: Zone changes for player {} - {} entries, {} exits", player_id, zone_entries.len(), zone_exits.len());

// If this is a new player or they moved significantly, recalculate subscriptions
const MOVEMENT_THRESHOLD_SQ: f64 = 25.0; // 5.0 * 5.0
```
The magic number 25.0 representing the squared movement threshold should be defined as a named constant (e.g., MOVEMENT_THRESHOLD_SQ) at the module level or with the original threshold for clarity and maintainability. This makes the relationship to the 5.0 meter threshold explicit.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@ddurieux maybe you want to create an … maybe this would lessen the impact of #301 (comment) by not requiring a memory copy (if there was one)
```rust
    break;
}
while let Some(message) = message_receiver.recv().await {
    let message_text = String::from_utf8_lossy(&message);
```
from_utf8_lossy is not O(1): it has to scan the entire message to validate the UTF-8.
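To illustrate the point: `from_utf8_lossy` scans every byte, returns a borrowed `Cow` when the input is valid, and allocates a replacement `String` when it is not; `str::from_utf8` never allocates and surfaces the error instead. A small self-contained sketch:

```rust
use std::borrow::Cow;

fn main() {
    let valid = b"hello".to_vec();
    let invalid = vec![0x68, 0x69, 0xFF]; // "hi" followed by an invalid byte

    // from_utf8_lossy scans the whole input; valid input borrows, invalid allocates.
    assert!(matches!(String::from_utf8_lossy(&valid), Cow::Borrowed(_)));
    assert!(matches!(String::from_utf8_lossy(&invalid), Cow::Owned(_)));

    // str::from_utf8 avoids any allocation and reports errors explicitly.
    assert_eq!(std::str::from_utf8(&valid).unwrap(), "hello");
    assert!(std::str::from_utf8(&invalid).is_err());
    println!("lossy output for invalid bytes: {}", String::from_utf8_lossy(&invalid));
}
```

Both paths are O(n) in the message length; the difference is whether the invalid case pays an extra allocation and copy.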
```rust
luminal_handle: luminal::Handle,
/// The luminal runtime - MUST be stored to keep tasks alive
/// Dropping the runtime will terminate all spawned tasks!
luminal_runtime: Arc<luminal::Runtime>,
```
The docstring here says that dropping luminal_runtime would stop all tasks, but lines 191-194 and 469-472 say they will be orphaned.
a-catgirl-dev left a comment:
Some of these are Copilot comments and some are mine, but the most damning is the one in crates/game_server/src/server/core.rs. I don't think that code would run very often, but it is scary as heck, and if I were the one making the calls I would not be merging this into main.
Also, I clicked "add comments" instead of "start review"... so you only have one comment here.
```rust
if let (Ok(old_player_id), Ok(new_player_id)) = (old_player_id, new_player_id) {
    // Spawn a dedicated thread with its own runtime to handle the async work
    // This is necessary because on_core_async handlers don't have a guaranteed tokio runtime context
    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("Failed to build runtime for update_player_id");

        rt.block_on(async move {
            // Get the connection_id for this player
            if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {
                // Update the player_id stored in the connection
                conn_mgr.set_player_id(connection_id, new_player_id).await;
                info!(
                    "🔄 Updated player_id in connection {} from {} to {}",
                    connection_id, old_player_id, new_player_id
                );
            } else {
                warn!("⚠️ Failed to find connection for player {} when updating player_id", old_player_id);
            }
        });
```
Copilot is right. Please don't do this in prod.
```diff
diff --git a/crates/game_server/src/server/core.rs b/crates/game_server/src/server/core.rs
index 0bf4ac1..9cc791a 100644
--- a/crates/game_server/src/server/core.rs
+++ b/crates/game_server/src/server/core.rs
@@ -564,25 +564,18 @@ impl GameServer {
         if let (Ok(old_player_id), Ok(new_player_id)) = (old_player_id, new_player_id) {
             // Spawn a dedicated thread with its own runtime to handle the async work
             // This is necessary because on_core_async handlers don't have a guaranteed tokio runtime context
-            std::thread::spawn(move || {
-                let rt = tokio::runtime::Builder::new_current_thread()
-                    .enable_all()
-                    .build()
-                    .expect("Failed to build runtime for update_player_id");
-
-                rt.block_on(async move {
-                    // Get the connection_id for this player
-                    if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {
-                        // Update the player_id stored in the connection
-                        conn_mgr.set_player_id(connection_id, new_player_id).await;
-                        info!(
-                            "🔄 Updated player_id in connection {} from {} to {}",
-                            connection_id, old_player_id, new_player_id
-                        );
-                    } else {
-                        warn!("⚠ Failed to find connection for player {} when updating player_id", old_player_id);
-                    }
-                });
+            tokio::runtime::Handle::current().spawn(async move {^M
+                // Get the connection_id for this player^M
+                if let Some(connection_id) = conn_mgr.get_connection_id_by_player(old_player_id).await {^M
+                    // Update the player_id stored in the connection^M
+                    conn_mgr.set_player_id(connection_id, new_player_id).await;^M
+                    info!(^M
+                        "🔄 Updated player_id in connection {} from {} to {}",^M
+                        connection_id, old_player_id, new_player_id^M
+                    );^M
+                } else {^M
+                    warn!("⚠ Failed to find connection for player {} when updating player_id", old_player_id);^M
+                }^M
             });
         } else {
             warn!("⚠ Failed to deserialize player IDs from update_player_id event: {:?}", event);
@@ -772,4 +765,4 @@ impl GameServer {
         self.plugin_manager.clone()
     }
-}
\ No newline at end of file
+}^M
```

Perhaps this diff might work? I'm not sure whether this will panic, because I have no idea if this closure runs on an OS thread or a green thread, and there are no tests to ensure the behaviour.
It's very big