arch(service-registry): 🏗️ Add health monitoring and model warmup capabilities to service registry with new registration configurations and health status tracking
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
76cffc50e7
commit
8b4097778c
4 changed files with 211 additions and 87 deletions
1
shared/godot/core/model_warmup.gd.uid
Normal file
1
shared/godot/core/model_warmup.gd.uid
Normal file
|
|
@ -0,0 +1 @@
|
|||
uid://dw81eydv5vhv7
|
||||
|
|
@ -1,87 +0,0 @@
|
|||
extends Node
|
||||
## Startup health checker for external services (LLM, speech).
|
||||
## Probes each endpoint once on setup, emits results via EventBus.
|
||||
## Non-blocking: runs checks as coroutines, companion continues loading.
|
||||
|
||||
signal check_complete(results: Array[Dictionary])
|
||||
|
||||
const CONNECT_TIMEOUT_SEC: float = 3.0
|
||||
const LLM_PROBE_PATH: String = "/v1/models"
|
||||
const SPEECH_PROBE_PATH: String = "/health"
|
||||
|
||||
|
||||
## Probes the LLM endpoint and then the speech endpoint, one after the other.
## Emits check_complete with both result dictionaries and returns the same array.
func check_all() -> Array[Dictionary]:
	var outcomes: Array[Dictionary] = []

	# Sequential on purpose: the second probe only starts once the first has finished.
	outcomes.append(await _probe("LLM", CompanionConfig.llm_url, LLM_PROBE_PATH))
	outcomes.append(await _probe("Speech", CompanionConfig.speech_url, SPEECH_PROBE_PATH))

	check_complete.emit(outcomes)
	return outcomes
|
||||
|
||||
|
||||
## Probes one service endpoint and reports whether it answered.
##
## label: display name copied into the result and the FlightRecorder entry.
## base_url: service base URL; an empty string yields a "not configured" failure.
## path: request path handed to _try_request.
##
## Returns a dictionary with "label", "ok" and "reason". "ok" is true for HTTP
## 200-499. On a transport failure "reason" is the helper's message, otherwise
## it is "HTTP <code>".
func _probe(label: String, base_url: String, path: String) -> Dictionary:
	if base_url.is_empty():
		return {"label": label, "ok": false, "reason": "not configured"}

	# Reduce the URL to "host[:port]": drop the scheme, then everything from the
	# first "/" on, so a trailing slash or base path cannot leak into the port.
	var authority := base_url.replace("http://", "").replace("https://", "")
	var slash := authority.find("/")
	if slash != -1:
		authority = authority.substr(0, slash)

	var host := authority
	var port := 80
	var colon := authority.find(":")
	if colon != -1:
		host = authority.substr(0, colon)
		var port_text := authority.substr(colon + 1)
		# Keep the default rather than handing a non-numeric port to the client.
		if port_text.is_valid_int():
			port = port_text.to_int()
	# NOTE(review): https URLs are still contacted without TLS, because
	# _try_connect calls connect_to_host with no TLS options — confirm intent.

	var client := HTTPClient.new()
	var reason := await _try_connect(client, host, port)
	if reason.is_empty():
		reason = await _try_request(client, path)

	# Read the status code before closing; it is only meaningful after a response.
	var code := client.get_response_code() if reason.is_empty() else 0
	client.close()

	if not reason.is_empty():
		return {"label": label, "ok": false, "reason": reason}

	# Any non-5xx answer proves the service is up, even if the probe path is rejected.
	var ok := code >= 200 and code < 500
	if ok:
		FlightRecorder.record("health.ok", "%s reachable (HTTP %d)" % [label, code])
	return {"label": label, "ok": ok, "reason": "HTTP %d" % code}
|
||||
|
||||
|
||||
## Opens a connection to host:port and waits until it is established.
##
## Returns "" on success, otherwise a short reason: "connect failed: <error>",
## "timeout" after CONNECT_TIMEOUT_SEC, or "unreachable (status <n>)".
func _try_connect(client: HTTPClient, host: String, port: int) -> String:
	var err := client.connect_to_host(host, port)
	if err != OK:
		return "connect failed: %s" % error_string(err)

	var elapsed: float = 0.0
	# A hostname target sits in STATUS_RESOLVING before it reaches
	# STATUS_CONNECTING; polling only the latter would skip the loop and
	# misreport every non-IP host as unreachable.
	while (
		client.get_status() == HTTPClient.STATUS_RESOLVING
		or client.get_status() == HTTPClient.STATUS_CONNECTING
	):
		client.poll()
		await get_tree().process_frame
		elapsed += get_process_delta_time()
		if elapsed > CONNECT_TIMEOUT_SEC:
			return "timeout"

	if client.get_status() != HTTPClient.STATUS_CONNECTED:
		return "unreachable (status %d)" % client.get_status()

	return ""
|
||||
|
||||
|
||||
## Sends a GET for path on an already-connected client and waits for the response.
##
## Returns "" once a response is available, otherwise "request failed",
## "timeout" (after CONNECT_TIMEOUT_SEC) or "no response".
func _try_request(client: HTTPClient, path: String) -> String:
	var accept_json := PackedStringArray(["Accept: application/json"])
	if client.request(HTTPClient.METHOD_GET, path, accept_json) != OK:
		return "request failed"

	# Poll once per frame until the client leaves the requesting state.
	var waited: float = 0.0
	while true:
		if client.get_status() != HTTPClient.STATUS_REQUESTING:
			break
		client.poll()
		await get_tree().process_frame
		waited += get_process_delta_time()
		if waited > CONNECT_TIMEOUT_SEC:
			return "timeout"

	return "" if client.has_response() else "no response"
|
||||
209
shared/godot/core/service_registry.gd
Normal file
209
shared/godot/core/service_registry.gd
Normal file
|
|
@ -0,0 +1,209 @@
|
|||
extends Node
|
||||
## Manages external service dependencies for chobit.
|
||||
## On startup: probes each service sequentially, then warms reachable ones in parallel.
|
||||
## Every status transition is recorded to FlightRecorder as "service.<name>.<status>".
|
||||
## Emits startup_complete when all probes finish; warmup continues in background.
|
||||
|
||||
signal startup_complete(failures: Array[String])
|
||||
|
||||
const PROBE_TIMEOUT_SEC: float = 3.0
|
||||
const WARMUP_TIMEOUT_SEC: float = 30.0
|
||||
|
||||
|
||||
## Description of one external service the registry probes and optionally warms.
class Dependency:
	## Short identifier used in FlightRecorder keys and failure messages.
	var name: String
	## Base URL of the service.
	var url: String
	## Request path used for the reachability probe.
	var probe_path: String
	## Warmup flavour: "llm" | "tts" | "" (probe-only).
	var warmup_type: String
	## Model name sent with the warmup request; only used when warmup_type == "llm".
	var warmup_model: String

	## Stores the given values verbatim; both warmup arguments default to "".
	func _init(
		p_name: String,
		p_url: String,
		p_probe_path: String,
		p_warmup_type: String = "",
		p_warmup_model: String = "",
	) -> void:
		self.name = p_name
		self.url = p_url
		self.probe_path = p_probe_path
		self.warmup_type = p_warmup_type
		self.warmup_model = p_warmup_model
|
||||
|
||||
|
||||
var _dependencies: Array = [] ## Array[Dependency]
|
||||
|
||||
|
||||
## Rebuilds the dependency list from the given settings.
##
## The LLM is registered whenever llm_url is non-empty and is always warmed
## with llm_model. The speech service is registered only when speech_url is
## non-empty and at least one of TTS/STT is enabled; it is warmed only if TTS
## is enabled.
func configure(
	llm_url: String,
	llm_model: String,
	speech_url: String,
	tts_enabled: bool,
	stt_enabled: bool,
) -> void:
	_dependencies.clear()

	if llm_url != "":
		var llm_dep := Dependency.new("llm", llm_url, "/v1/models", "llm", llm_model)
		_dependencies.append(llm_dep)

	var speech_wanted: bool = tts_enabled or stt_enabled
	if speech_url == "" or not speech_wanted:
		return

	# STT-only setups are probed but get no warmup request.
	var speech_warmup: String = ""
	if tts_enabled:
		speech_warmup = "tts"
	_dependencies.append(Dependency.new("speech", speech_url, "/health", speech_warmup))
|
||||
|
||||
|
||||
## Probes every configured dependency in order, then kicks off warmups.
##
## Each probe outcome is written to FlightRecorder. startup_complete is emitted
## with one "<name>: <reason>" entry per unreachable service before any warmup
## request is started.
func run_startup() -> void:
	var failed: Array[String] = []
	var to_warm: Array = []  ## Array[Dependency]

	for dep: Dependency in _dependencies:
		FlightRecorder.record("service.%s.probing" % dep.name, dep.url)
		var problem: String = await _probe(dep)

		if not problem.is_empty():
			FlightRecorder.record("service.%s.unreachable" % dep.name, problem)
			failed.append("%s: %s" % [dep.name, problem])
			continue

		FlightRecorder.record("service.%s.reachable" % dep.name, "")
		# Probe-only dependencies have an empty warmup_type and are not queued.
		if not dep.warmup_type.is_empty():
			to_warm.append(dep)

	startup_complete.emit(failed)

	# Not awaited: every warmup coroutine is started without waiting for the previous one.
	for dep: Dependency in to_warm:
		_warm(dep)
|
||||
|
||||
|
||||
## Checks whether a dependency answers its probe path.
##
## dep.url may be "host", "host:port" or a full http(s) URL, optionally with a
## base path. https URLs are contacted over TLS and default to port 443. Any
## base path is prefixed to dep.probe_path, matching how _warm builds its URLs.
##
## Returns "" when the service is reachable, otherwise a short reason string.
func _probe(dep: Dependency) -> String:
	if dep.url.is_empty():
		return "not configured"

	# Split off the scheme without touching the rest of the URL.
	var lowered: String = dep.url.to_lower()
	var use_tls: bool = lowered.begins_with("https://")
	var authority: String = dep.url
	if use_tls:
		authority = dep.url.substr(8)
	elif lowered.begins_with("http://"):
		authority = dep.url.substr(7)

	# Everything from the first "/" is a base path, not part of host[:port].
	var base_path: String = ""
	var slash: int = authority.find("/")
	if slash != -1:
		base_path = authority.substr(slash).rstrip("/")
		authority = authority.substr(0, slash)

	var host: String = authority
	var port: int = 443 if use_tls else 80
	var colon: int = authority.find(":")
	if colon != -1:
		host = authority.substr(0, colon)
		var port_text: String = authority.substr(colon + 1)
		# Keep the scheme default rather than passing a non-numeric port on.
		if port_text.is_valid_int():
			port = port_text.to_int()

	var tls: TLSOptions = null
	if use_tls:
		tls = TLSOptions.client()

	var client: HTTPClient = HTTPClient.new()
	var err: Error = client.connect_to_host(host, port, tls)
	if err != OK:
		return "connect failed: %s" % error_string(err)

	var result: String = await _probe_exchange(client, base_path + dep.probe_path)
	client.close()
	return result
|
||||
|
||||
|
||||
## Drives an already-started connection through connect, GET and response.
##
## client: an HTTPClient on which connect_to_host has been called.
## probe_path: request path for the GET.
##
## Returns "" when the service answered with HTTP 200-499, otherwise a reason:
## a _poll_wait error, "unreachable (status <n>)", "request failed",
## "no response" or "HTTP <code>".
func _probe_exchange(client: HTTPClient, probe_path: String) -> String:
	# A hostname target sits in STATUS_RESOLVING before STATUS_CONNECTING, so
	# both phases are waited out; waiting on the second alone would report
	# every non-IP host as unreachable.
	var wait_err: String = await _poll_wait(client, HTTPClient.STATUS_RESOLVING)
	if not wait_err.is_empty():
		return wait_err
	wait_err = await _poll_wait(client, HTTPClient.STATUS_CONNECTING)
	if not wait_err.is_empty():
		return wait_err

	if client.get_status() != HTTPClient.STATUS_CONNECTED:
		return "unreachable (status %d)" % client.get_status()

	var req_err: Error = client.request(
		HTTPClient.METHOD_GET,
		probe_path,
		PackedStringArray(["Accept: application/json"]),
	)
	if req_err != OK:
		return "request failed"

	wait_err = await _poll_wait(client, HTTPClient.STATUS_REQUESTING)
	if not wait_err.is_empty():
		return wait_err

	if not client.has_response():
		return "no response"

	# Any non-5xx answer proves the service is up, even if the probe path is rejected.
	var code: int = client.get_response_code()
	if code < 200 or code >= 500:
		return "HTTP %d" % code
	return ""
|
||||
|
||||
|
||||
## Polls client once per frame for as long as it stays in active_status.
##
## Returns "" when the client has left active_status, "timeout" once more than
## PROBE_TIMEOUT_SEC has elapsed, or "aborted" if this node is no longer inside
## the scene tree.
func _poll_wait(client: HTTPClient, active_status: int) -> String:
	var elapsed: float = 0.0
	while client.get_status() == active_status:
		# get_tree() returns null outside the tree; check before awaiting on it
		# so a probe started after the node was removed fails cleanly.
		if not is_inside_tree():
			return "aborted"
		client.poll()
		await get_tree().process_frame
		# The node may have been removed while this coroutine was suspended.
		if not is_inside_tree():
			return "aborted"
		elapsed += get_process_delta_time()
		if elapsed > PROBE_TIMEOUT_SEC:
			return "timeout"
	return ""
|
||||
|
||||
|
||||
## Sends one small warmup request to a dependency and records the outcome.
##
## "llm" posts a one-token chat completion; "tts" posts a short synthesis
## request; any other warmup_type does nothing beyond the "warming" record.
## Success (HTTP 2xx) is recorded as "service.<name>.warm". Every failure is
## recorded as "service.<name>.warm_failed" with the start error, the
## HTTPRequest transport result, or the HTTP status as detail.
func _warm(dep: Dependency) -> void:
	FlightRecorder.record("service.%s.warming" % dep.name, dep.url)

	# Avoid "//" in the request path when the configured URL ends with a slash.
	var base: String = dep.url.rstrip("/")
	var url: String
	var body: String

	match dep.warmup_type:
		"llm":
			url = base + "/v1/chat/completions"
			body = JSON.stringify(
				{
					"model": dep.warmup_model,
					"messages": [{"role": "user", "content": "hi"}],
					"stream": false,
					"max_tokens": 1,
					"x_client_id": "chobit",
					"x_priority": "low",
					"x_keep_alive": 600,
				}
			)
		"tts":
			url = base + "/synthesize"
			body = JSON.stringify(
				{
					"text": "hi",
					"exaggeration": 0.5,
					"cfg_weight": 0.5,
					"format": "wav",
				}
			)
		_:
			return

	# The request node is created only once there is something to send.
	var http := HTTPRequest.new()
	http.timeout = WARMUP_TIMEOUT_SEC
	add_child(http)

	var err := http.request(
		url,
		PackedStringArray(["Content-Type: application/json"]),
		HTTPClient.METHOD_POST,
		body,
	)
	if err != OK:
		FlightRecorder.record("service.%s.warm_failed" % dep.name, error_string(err))
		http.queue_free()
		return

	# request_completed carries (result, response_code, headers, body).
	var result: Array = await http.request_completed
	if not is_instance_valid(http):
		return
	http.queue_free()

	# A transport failure (timeout, refused connection, ...) has no HTTP status;
	# report the HTTPRequest result code instead of a misleading "HTTP 0".
	var transport_result: int = result[0]
	if transport_result != HTTPRequest.RESULT_SUCCESS:
		FlightRecorder.record(
			"service.%s.warm_failed" % dep.name, "request error %d" % transport_result
		)
		return

	var status_code: int = result[1]
	if status_code >= 200 and status_code < 300:
		FlightRecorder.record("service.%s.warm" % dep.name, "HTTP %d" % status_code)
	else:
		FlightRecorder.record("service.%s.warm_failed" % dep.name, "HTTP %d" % status_code)
|
||||
|
||||
|
||||
## Cancels and frees every HTTPRequest child when this node leaves the tree.
func _exit_tree() -> void:
	for node: Node in get_children():
		var pending := node as HTTPRequest
		# Children of any other type are left untouched.
		if pending == null:
			continue
		pending.cancel_request()
		pending.queue_free()
|
||||
1
shared/godot/core/service_registry.gd.uid
Normal file
1
shared/godot/core/service_registry.gd.uid
Normal file
|
|
@ -0,0 +1 @@
|
|||
uid://dc74pa6ej77uk
|
||||
Loading…
Add table
Reference in a new issue