How It Works#
The Live Video Alert Agent is a multi-layered agentic application that ingests RTSP video streams, applies VLM-based scene understanding, and dispatches configurable actions through an agentic tool-calling pipeline.
Architecture Overview#

Data Flow#
RTSP Sources (N cameras)
│
▼
LiveStreamManager × N grab()/retrieve() throttled decode
│ exponential-backoff reconnection
│ frame (latest)
▼
AgentManager one asyncio.Task per stream (concurrent)
├─ VlmClient ──────────────► OVMS / OpenAI-compatible VLM
│ └─ retry + backoff Phi-3.5-Vision | InternVL2-2B ...
│
├─ AlertStateManager per-stream × per-alert runtime state
│ ├─ cooldown gate suppresses repeat firings
│ ├─ consecutive counter detects persistent conditions
│ └─ escalation trigger promotes alert tier after N consecutives
│
├─ AlertActionAgent decides WHICH tools to call
│ ├─ ADK mode Google ADK LlmAgent + FunctionTool (default) with OVMS-hosted text model endpoint
│ └─ Rule-based mode direct tool execution — no LLM needed
│
├─ MCP Client (optional) Model Context Protocol integration
│ └─ External MCP servers discover and invoke remote tools
│
└─ Action Tools (async)
├─ log_alert structured logging
├─ capture_snapshot JPEG frame to disk / named volume
├─ trigger_webhook HMAC-signed HTTP POST
└─ publish_mqtt paho-mqtt 2.x MQTTv5 publish
│
▼
EventManager (SSE pub/sub) alerts fan-out to all connected browsers
│
▼
Dashboard UI real-time stream tiles, alert feed
Key Components#
LiveStreamManager#
Each registered camera has its own LiveStreamManager running in a daemon thread.
Uses
cv2.VideoCapture.grab()followed byretrieve()to skip deep-decode on unused frames, reducing CPU usage proportionally to the gap between capture FPS and analysis FPS.Frame interval is controlled by
CAPTURE_FPS(default: auto-derived fromANALYSIS_INTERVAL).Reconnects on drop-out with exponential back-off (2 s → 30 s).
Exposes a
get_health()method returning connection status, actual FPS, resolution, and buffer fill level.
AgentManager#
The central orchestrator. Instead of a single serial loop across all cameras, each
stream gets an independent asyncio.Task:
add_stream("cam1", ...) → _launch_stream_task("cam1")
add_stream("cam2", ...) → _launch_stream_task("cam2")
cam1-task: _stream_analysis_loop() running every ANALYSIS_INTERVAL seconds
cam2-task: _stream_analysis_loop() running every ANALYSIS_INTERVAL seconds
Failed or cancelled tasks are automatically restarted via an add_done_callback.
VlmClient#
Thin async wrapper around openai.AsyncOpenAI, targeting OVMS (OpenVINO Model
Server) via its OpenAI-compatible REST API.
Sends a
systemrole message (VLM system instruction) plus ausermessage containing the base64-encoded frame and the structured alert prompt.Retries failed calls up to
VLM_MAX_RETRIEStimes with exponential back-off.Alert prompts are serialised with
json.dumps— not f-strings — to prevent prompt-injection from user-supplied alert names or text.
AlertStateManager#
Maintains per-stream × per-alert runtime state without any database dependency:
State field |
Purpose |
|---|---|
|
Timestamp of last tool execution |
|
Counts unbroken YES detections; triggers escalation |
|
Detects state transitions (NO→YES, YES→NO) |
process() returns (should_act, is_escalation, is_transition) so the manager
can decide whether to invoke tools and which tier of tools to use.
AlertActionAgent#
Decides which tools to invoke for a fired alert. Operates in one of three modes, selected automatically at startup:
Mode 1 — Google ADK (USE_ADK=true, default)#
Uses Google’s Agent Development Kit with a LlmAgent that receives
structured alert context and calls FunctionTool-wrapped async tool functions.
Best for dynamic, LLM-reasoned escalation logic that can be adjusted without code
changes. The LLM is served locally via OVMS (ovms-llm service) using an
OpenAI-compatible API endpoint.
Mode 2 — Rule-based (USE_ADK=false)#
Directly executes the tool list from AlertConfig.tools in order. No external LLM
required — works fully offline and air-gapped. Escalation tools from
AlertConfig.escalation.additional_tools are appended when the consecutive
threshold is reached.
Action Tools#
All four tools are async functions registered in _TOOL_MAP:
Tool |
Trigger condition |
Configuration |
|---|---|---|
|
Always |
Built-in, always active |
|
Alert fires |
|
|
Alert fires |
|
|
Alert fires |
|
Tools are configured per-alert in AlertConfig.tools and are silently skipped
if their required env var is not set.
Alert Configuration Schema#
Each alert is described by an AlertConfig Pydantic model:
{
"name": "Fire Detection",
"prompt": "Is there fire or smoke visible?",
"enabled": true,
"tools": ["log_alert", "capture_snapshot"],
"tool_arguments": {
"trigger_webhook": {"stream_id": "{{stream_id}}", "severity": "{{severity}}"}
},
"escalation": {
"threshold_consecutive": 3,
"additional_tools": ["trigger_webhook", "publish_mqtt"]
}
}
Field |
Values |
Description |
|---|---|---|
|
list of tool names |
Tools invoked when alert fires |
|
object |
Per-tool keyword argument overrides; supports |
|
integer ≥ 2 |
Consecutive YES count before escalation |
|
list of tool names |
Extra tools added on escalation |
Event Types#
The SSE stream (GET /events) emits four event types:
Event |
When |
|---|---|
|
On SSE connect — current streams + latest results |
|
Each VLM analysis cycle completes |
|
Alert fired and tools were invoked |
|
Every 15 s to prevent proxy timeouts |
MCP Integration#
The agent supports connecting to external Model Context Protocol (MCP) servers, allowing alerts to invoke tools hosted on remote services (e.g., Prometheus for metrics queries, custom REST APIs, etc.).
MCPClient#
The MCPClient module manages lifecycle for one or more MCP servers configured in
resources/mcp_servers.json. Supported transports:
Transport |
When to use |
|---|---|
|
Remote HTTP MCP server (MCP Streamable HTTP protocol) |
|
Remote SSE-based MCP server |
|
Local subprocess MCP server |
At startup, if MCP_ENABLED=true, the agent:
Reads
resources/mcp_servers.jsonConnects to each enabled server and performs the MCP
initializehandshakeCalls
tools/listto discover available toolsRegisters discovered tools with the
AlertActionAgentunder prefixed names (mcp_{server_name}_{tool_name})If ADK mode is active, reinitialises the agent so the new tools appear in the LLM’s tool list
MCP tools can be referenced in AlertConfig.tools and AlertConfig.escalation.additional_tools
by their prefixed names, and are invocable via the /mcp/tools/{tool_name}/invoke API endpoint.