Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 174 additions & 21 deletions src/runtime/multi_mode/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import logging
import os
import time
from typing import List, Optional, Union
from typing import Any, Dict, List, Optional, Set, Union

import json5
from actions.orchestrator import ActionOrchestrator
from backgrounds.orchestrator import BackgroundOrchestrator
from fuser import Fuser
Expand Down Expand Up @@ -74,13 +75,16 @@ def __init__(
self.check_interval = check_interval
self.config_watcher_task: Optional[asyncio.Task] = None
self.last_modified: Optional[float] = None
self.config_path = self.mode_manager._get_runtime_config_path()
self.source_config_path = self._get_source_config_path()
self.watched_config_mtimes: Dict[str, float] = {}

# Initialize hot-reload if enabled
if self.hot_reload:
self.config_path = self.mode_manager._get_runtime_config_path()
self.last_modified = self._get_file_mtime()
self._refresh_watched_config_mtimes()
self.last_modified = self.watched_config_mtimes.get(self.config_path, 0.0)
logging.info(
f"Hot-reload enabled for runtime config: {self.config_path} (check interval: {check_interval}s)"
f"Hot-reload enabled for {len(self.watched_config_mtimes)} config file(s) (check interval: {check_interval}s)"
)

# Current runtime components
Expand Down Expand Up @@ -607,17 +611,132 @@ def get_available_modes(self) -> dict:
for name, config in self.mode_config.modes.items()
}

def _get_file_mtime(self) -> float:
def _get_source_config_path(self) -> str:
"""
Resolve the source configuration path from mode_config_name.

Returns
-------
str
Absolute path to the mode-aware source configuration file.
"""
config_name = self.mode_config_name
if not config_name.endswith((".json", ".json5")):
config_name = f"{config_name}.json5"

if os.path.isabs(config_name):
return os.path.normpath(config_name)

if os.path.exists(config_name):
return os.path.abspath(config_name)

return os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../../config", config_name)
)

def _extract_config_references(self, config_file_path: str) -> Set[str]:
"""
Parse a config file and return referenced .json/.json5 paths.

Parameters
----------
config_file_path : str
Path to the config file to parse.

Returns
-------
Set[str]
Absolute referenced config paths discovered in the file.
"""
if not os.path.exists(config_file_path):
return set()

try:
with open(config_file_path, "r") as f:
raw_config = json5.load(f)
except Exception as e:
logging.debug(
f"Unable to parse config references from '{config_file_path}': {e}"
)
return set()

base_dir = os.path.dirname(config_file_path)
references: Set[str] = set()

def walk(value: Any) -> None:
if isinstance(value, dict):
for nested in value.values():
walk(nested)
return

if isinstance(value, list):
for nested in value:
walk(nested)
return

if not isinstance(value, str):
return

candidate = value.strip()
if "://" in candidate:
return

if not candidate.lower().endswith((".json", ".json5")):
return

resolved = (
candidate
if os.path.isabs(candidate)
else os.path.abspath(os.path.join(base_dir, candidate))
)
references.add(os.path.normpath(resolved))

walk(raw_config)
references.discard(os.path.normpath(os.path.abspath(config_file_path)))
return references

def _get_watched_config_paths(self) -> Set[str]:
"""
Build the set of config files to watch for hot-reload.

Returns
-------
Set[str]
Absolute config paths to monitor.
"""
watched_paths = {os.path.normpath(os.path.abspath(self.config_path))}

source_path = os.path.normpath(os.path.abspath(self.source_config_path))
watched_paths.add(source_path)
watched_paths.update(self._extract_config_references(source_path))

return watched_paths

def _refresh_watched_config_mtimes(self) -> None:
"""
Refresh the watched config file map and their mtimes.
"""
self.watched_config_mtimes = {
path: self._get_file_mtime(path) for path in self._get_watched_config_paths()
}

def _get_file_mtime(self, file_path: Optional[str] = None) -> float:
"""
Get the modification time of the config file.

Parameters
----------
file_path : Optional[str]
Path to the file. Defaults to runtime config path.

Returns
-------
float
The modification time as a timestamp
"""
if self.config_path and os.path.exists(self.config_path):
return os.path.getmtime(self.config_path)
target_path = self.config_path if file_path is None else file_path
if target_path and os.path.exists(target_path):
return os.path.getmtime(target_path)
return 0.0

async def _check_config_changes(self) -> None:
Expand All @@ -628,17 +747,35 @@ async def _check_config_changes(self) -> None:
try:
await asyncio.sleep(self.check_interval)

if not self.config_path or not os.path.exists(self.config_path):
continue
changed_paths: Set[str] = set()
current_watch_paths = self._get_watched_config_paths()
all_watch_paths = current_watch_paths | set(
self.watched_config_mtimes.keys()
)

current_mtimes: Dict[str, float] = {}
for path in all_watch_paths:
current_mtime = self._get_file_mtime(path)
current_mtimes[path] = current_mtime

current_mtime = self._get_file_mtime()
previous_mtime = self.watched_config_mtimes.get(path, 0.0)
if current_mtime != previous_mtime:
changed_paths.add(path)

if self.last_modified and current_mtime > self.last_modified:
if changed_paths:
logging.info(
f"Runtime config file changed, reloading: {self.config_path}"
"Configuration file changes detected: %s",
", ".join(sorted(changed_paths)),
)
await self._reload_config()
self.last_modified = current_mtime
reload_from_runtime = changed_paths == {self.config_path}
await self._reload_config(reload_from_runtime=reload_from_runtime)
self._refresh_watched_config_mtimes()
else:
self.watched_config_mtimes = {
path: current_mtimes[path] for path in current_watch_paths
}

self.last_modified = self.watched_config_mtimes.get(self.config_path, 0.0)

except asyncio.CancelledError:
logging.debug("Config watcher cancelled")
Expand All @@ -647,16 +784,28 @@ async def _check_config_changes(self) -> None:
logging.error(f"Error checking config changes: {e}")
await asyncio.sleep(10) # Wait before retrying

async def _reload_config(self) -> None:
async def _reload_config(self, reload_from_runtime: bool = False) -> None:
"""
Reload the mode configuration when runtime config file changes.
Reload the mode configuration after file changes are detected.

The runtime config file serves as a trigger - when it changes, we reload
from the original configuration source and then regenerate the runtime config.
Parameters
----------
reload_from_runtime : bool, optional
If True, reload from runtime config snapshot; otherwise reload from
the original source config file and any referenced mode files.
"""
try:
reload_path = self.config_path if reload_from_runtime else self.source_config_path
if not reload_path or not os.path.exists(reload_path):
logging.warning(
"Reload source file missing (%s), falling back to runtime config",
reload_path,
)
reload_path = self.config_path
reload_from_runtime = True

logging.info(
f"Runtime config file changed, triggering reload: {self.config_path}"
f"Configuration changed, triggering reload from: {reload_path}"
)

self._is_reloading = True
Expand All @@ -665,15 +814,19 @@ async def _reload_config(self) -> None:

await self._stop_current_orchestrators()

logging.info("Loading configuration from the new runtime file")
logging.info("Loading updated mode configuration")
new_mode_config = load_mode_config(
self.mode_config_name,
mode_source_path=self.mode_manager._get_runtime_config_path(),
mode_source_path=reload_path,
)

self.mode_config = new_mode_config
self.mode_manager.config = new_mode_config

# Keep runtime snapshot in sync when source files are edited directly.
if not reload_from_runtime:
self.mode_manager._create_runtime_config_file()

if current_mode not in new_mode_config.modes:
logging.warning(
f"Current mode '{current_mode}' not found in reloaded config, switching to default mode '{new_mode_config.default_mode}'"
Expand Down
Loading