How It Works

The Live Video Alert Agent is a multi-layered agentic application that ingests RTSP video streams, applies VLM-based scene understanding, and dispatches configurable actions through a tool-calling pipeline.

Architecture Overview

[Figure: System Architecture]

Data Flow

RTSP Sources (N cameras)
     │
     ▼
LiveStreamManager × N          grab()/retrieve() throttled decode
     │                         exponential-backoff reconnection
     │  frame (latest)
     ▼
AgentManager                   one asyncio.Task per stream (concurrent)
  ├─ VlmClient ──────────────► OVMS / OpenAI-compatible VLM
  │   └─ retry + backoff       Phi-3.5-Vision | InternVL2-2B ...
  │
  ├─ AlertStateManager         per-stream × per-alert runtime state
  │   ├─ cooldown gate         suppresses repeat firings
  │   ├─ consecutive counter   detects persistent conditions
  │   └─ escalation trigger    promotes alert tier after N consecutives
  │
  ├─ AlertActionAgent          decides WHICH tools to call
  │   ├─ ADK mode              Google ADK LlmAgent + FunctionTool (default) with OVMS-hosted text model endpoint
  │   └─ Rule-based mode       direct tool execution — no LLM needed
  │
  ├─ MCP Client (optional)     Model Context Protocol integration
  │   └─ External MCP servers  discover and invoke remote tools
  │
  └─ Action Tools (async)
      ├─ log_alert              structured logging
      ├─ capture_snapshot       JPEG frame to disk / named volume
      ├─ trigger_webhook        HMAC-signed HTTP POST
      └─ publish_mqtt           paho-mqtt 2.x MQTTv5 publish
     │
     ▼
EventManager (SSE pub/sub)     alerts fan-out to all connected browsers
     │
     ▼
Dashboard UI                   real-time stream tiles, alert feed

Key Components

LiveStreamManager

Each registered camera has its own LiveStreamManager running in a daemon thread.

  • Calls cv2.VideoCapture.grab() to advance past every frame but calls retrieve() only for frames that will be analysed, so unused frames are never fully decoded. This reduces CPU usage roughly in proportion to the gap between capture FPS and analysis FPS.

  • Frame interval is controlled by CAPTURE_FPS (default: auto-derived from ANALYSIS_INTERVAL).

  • Reconnects on drop-out with exponential back-off (2 s → 30 s).

  • Exposes a get_health() method returning connection status, actual FPS, resolution, and buffer fill level.
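A minimal sketch of the throttled read loop and the reconnection schedule described above. It assumes the capture object only needs the grab()/retrieve() pair that cv2.VideoCapture provides, so a fake object can stand in during tests. The names throttled_frames, backoff_delays, and decode_every are hypothetical, and the doubling factor is an assumption; only the 2 s start and 30 s ceiling come from the text.

```python
from typing import Any, Iterator, Protocol, Tuple


class FrameSource(Protocol):
    """The subset of cv2.VideoCapture this sketch relies on."""

    def grab(self) -> bool: ...
    def retrieve(self) -> Tuple[bool, Any]: ...


def throttled_frames(cap: FrameSource, decode_every: int) -> Iterator[Any]:
    """Yield one decoded frame out of every `decode_every` frames.

    grab() is called for every frame so the stream keeps advancing, while
    retrieve() (which decodes the grabbed frame) runs only for frames that
    will actually be analysed.
    """
    if decode_every < 1:
        raise ValueError("decode_every must be >= 1")
    count = 0
    while cap.grab():  # False signals a drop-out or end of stream
        count += 1
        if count % decode_every:
            continue  # skipped frame: never retrieved
        ok, frame = cap.retrieve()
        if ok:
            yield frame


def backoff_delays(start: float = 2.0, ceiling: float = 30.0) -> Iterator[float]:
    """Reconnect delays that double per attempt (assumed) and saturate at `ceiling`."""
    delay = start
    while True:
        yield delay
        delay = min(delay * 2, ceiling)
```

In a reader thread, the outer loop would open cv2.VideoCapture(url), drain throttled_frames(), and when the generator ends sleep for next(delays) before reopening, creating a fresh backoff_delays() after each successful reconnect.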

AgentManager

AgentManager is the central orchestrator. Rather than running a single serial loop across all cameras, it gives each stream an independent asyncio.Task:

add_stream("cam1", ...) → _launch_stream_task("cam1")
add_stream("cam2", ...) → _launch_stream_task("cam2")

cam1-task: _stream_analysis_loop() running every ANALYSIS_INTERVAL seconds
cam2-task: _stream_analysis_loop() running every ANALYSIS_INTERVAL seconds

Failed or cancelled tasks are automatically restarted via an add_done_callback.
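The per-stream task model and restart-on-completion behaviour can be sketched with plain asyncio. StreamTaskSupervisor, launch, and remove are hypothetical names. The sketch assumes a stream removed on purpose should not be restarted, which it detects by checking whether the finished task is still the registered one; a production version would likely also add a restart delay so a loop that fails immediately does not spin.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Dict

log = logging.getLogger("agent_manager_sketch")


class StreamTaskSupervisor:
    """One asyncio.Task per stream; a task that finishes for any reason is relaunched."""

    def __init__(self, loop_factory: Callable[[str], Awaitable[None]]) -> None:
        # loop_factory plays the role of _stream_analysis_loop(stream_id)
        self._loop_factory = loop_factory
        self._tasks: Dict[str, asyncio.Task] = {}
        self.restarts: Dict[str, int] = {}

    def launch(self, stream_id: str) -> None:
        task = asyncio.create_task(self._loop_factory(stream_id), name=f"{stream_id}-task")
        task.add_done_callback(lambda t, sid=stream_id: self._on_done(sid, t))
        self._tasks[stream_id] = task

    def _on_done(self, stream_id: str, task: asyncio.Task) -> None:
        if not task.cancelled():
            exc = task.exception()  # also marks the exception as retrieved
            if exc is not None:
                log.warning("stream %s loop failed: %r", stream_id, exc)
        if self._tasks.get(stream_id) is not task:
            return  # stream was removed (or replaced): do not restart
        self.restarts[stream_id] = self.restarts.get(stream_id, 0) + 1
        self.launch(stream_id)

    async def remove(self, stream_id: str) -> None:
        task = self._tasks.pop(stream_id, None)  # unregister first so no restart happens
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # expected: we cancelled it
        except Exception:
            pass  # the failure was already logged by _on_done
```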

VlmClient

Thin async wrapper around openai.AsyncOpenAI, targeting OVMS (OpenVINO Model Server) via its OpenAI-compatible REST API.

  • Sends a system role message (VLM system instruction) plus a user message containing the base64-encoded frame and the structured alert prompt.

  • Retries failed calls up to VLM_MAX_RETRIES times with exponential back-off.

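  • Alert prompts are serialised with json.dumps rather than interpolated with f-strings, so quotes, backslashes, and newlines in user-supplied alert names or text are escaped and stay inside JSON string values. This reduces the risk of prompt injection through those fields.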
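A minimal sketch of the request construction and retry policy described above. It assumes the OpenAI chat-completions message format, with an image_url content part carrying a base64 data URL, which OpenAI-compatible VLM endpoints commonly accept. The helper names, the instruction wording, and the delay values are hypothetical. With openai.AsyncOpenAI, the retried call would be something like `lambda: client.chat.completions.create(model=<your VLM name>, messages=messages)`.

```python
import asyncio
import base64
import json
from typing import Any, Awaitable, Callable, Dict, List, TypeVar

T = TypeVar("T")


def build_messages(
    system_instruction: str, jpeg_bytes: bytes, alerts: List[Dict[str, str]]
) -> List[Dict[str, Any]]:
    """System message plus one user message carrying the alert prompts and the frame."""
    # json.dumps escapes quotes, backslashes and control characters (including
    # newlines), so user-supplied names and prompts stay inside JSON string values.
    alert_block = json.dumps(
        [{"name": a["name"], "prompt": a["prompt"]} for a in alerts], ensure_ascii=False
    )
    image_b64 = base64.b64encode(jpeg_bytes).decode("ascii")
    return [
        {"role": "system", "content": system_instruction},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Answer each alert as JSON:\n" + alert_block},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}},
            ],
        },
    ]


async def call_with_retries(
    call: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
) -> T:
    """Await call(); on failure wait base_delay * 2**attempt (capped) and try again."""
    attempt = 0
    while True:
        try:
            return await call()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            await asyncio.sleep(min(base_delay * 2 ** attempt, max_delay))
            attempt += 1
```

Escaping keeps user text inside string values, which limits structural injection; it does not stop a model from being influenced by what that text says.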

AlertStateManager

Maintains per-stream × per-alert runtime state without any database dependency:

State field         Purpose
last_action_time    Timestamp of last tool execution
consecutive_yes     Counts unbroken YES detections; triggers escalation
last_answer         Detects state transitions (NO→YES, YES→NO)

process() returns (should_act, is_escalation, is_transition) so the manager can decide whether to invoke tools and which tier of tools to use.
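The sketch below shows one way process() could combine the three state fields. It is an illustration under assumptions, not the project's code: the cooldown length, the injectable clock, treating an unseen alert as if its last answer were NO, and reporting is_escalation once consecutive_yes reaches the threshold are all choices made here.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple


@dataclass
class AlertState:
    last_action_time: float = float("-inf")  # no tool run yet
    consecutive_yes: int = 0
    last_answer: Optional[str] = None


class AlertStateManager:
    """In-memory state keyed by (stream_id, alert_name); no database involved."""

    def __init__(self, cooldown_s: float = 60.0, clock: Callable[[], float] = time.monotonic) -> None:
        self._cooldown_s = cooldown_s
        self._clock = clock
        self._states: Dict[Tuple[str, str], AlertState] = {}

    def process(
        self,
        stream_id: str,
        alert_name: str,
        answer: str,
        threshold_consecutive: Optional[int] = None,
    ) -> Tuple[bool, bool, bool]:
        """Return (should_act, is_escalation, is_transition) for one VLM answer."""
        state = self._states.setdefault((stream_id, alert_name), AlertState())
        answer = answer.strip().upper()
        previous = state.last_answer or "NO"  # assumption: unseen alert counts as NO
        is_transition = answer != previous
        state.last_answer = answer

        if answer != "YES":
            state.consecutive_yes = 0  # streak broken
            return False, False, is_transition

        state.consecutive_yes += 1
        is_escalation = (
            threshold_consecutive is not None
            and state.consecutive_yes >= threshold_consecutive
        )
        now = self._clock()
        should_act = now - state.last_action_time >= self._cooldown_s  # cooldown gate
        if should_act:
            state.last_action_time = now
        return should_act, is_escalation, is_transition
```

Injecting the clock keeps the cooldown logic deterministic in tests; a fake clock can step time forward without sleeping.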

AlertActionAgent

Decides which tools to invoke for a fired alert. Operates in one of two modes, selected automatically at startup:

Mode 1: Google ADK (USE_ADK=true, default)

Uses Google’s Agent Development Kit with an LlmAgent that receives structured alert context and calls FunctionTool-wrapped async tool functions. Best suited to dynamic, LLM-reasoned escalation logic that can be adjusted without code changes. The LLM is served locally by OVMS (the ovms-llm service) through an OpenAI-compatible API endpoint.

Mode 2: Rule-based (USE_ADK=false)

Directly executes the tool list from AlertConfig.tools in order. No external LLM required — works fully offline and air-gapped. Escalation tools from AlertConfig.escalation.additional_tools are appended when the consecutive threshold is reached.
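In rule-based mode the tool list can be assembled and executed without any LLM. This sketch assumes tools are looked up in a _TOOL_MAP-style dict of async callables, that escalation tools already in the base list are not run twice, that unknown names are skipped, and that each tool gets a per-call timeout; select_tools, run_rule_based, and timeout_s are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Mapping, Sequence

ToolFn = Callable[..., Awaitable[Any]]


def select_tools(tools: Sequence[str], additional_tools: Sequence[str], escalated: bool) -> List[str]:
    """Base tools in order, plus escalation tools appended (without duplicates) when escalated."""
    selected = list(tools)
    if escalated:
        selected += [name for name in additional_tools if name not in selected]
    return selected


async def run_rule_based(
    tool_map: Mapping[str, ToolFn],
    tools: Sequence[str],
    additional_tools: Sequence[str],
    escalated: bool,
    context: Dict[str, Any],
    timeout_s: float = 10.0,
) -> Dict[str, Any]:
    """Execute the selected tools sequentially and collect their results by name."""
    results: Dict[str, Any] = {}
    for name in select_tools(tools, additional_tools, escalated):
        fn = tool_map.get(name)
        if fn is None:
            continue  # unknown or unconfigured tool: skip it
        # A hung tool (e.g. an unreachable webhook) cannot stall the whole sequence.
        results[name] = await asyncio.wait_for(fn(**context), timeout=timeout_s)
    return results
```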

Action Tools

All four tools are async functions registered in _TOOL_MAP:

Tool               Trigger condition    Configuration
log_alert          Always               Built-in, always active
capture_snapshot   Alert fires          SNAPSHOT_DIR writable
trigger_webhook    Alert fires          WEBHOOK_URL set
publish_mqtt       Alert fires          MQTT_BROKER set

Tools are configured per alert in AlertConfig.tools and are silently skipped if their required environment variable is not set.
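As an illustration of the HMAC-signed webhook and the skip-if-unset rule, the sketch below builds (but does not send) a signed POST with the standard library. WEBHOOK_URL comes from the table above; WEBHOOK_SECRET, the X-Signature-256 header name, the sha256= prefix, and the choice of HMAC-SHA256 are assumptions, not the project's documented wire format.

```python
import hashlib
import hmac
import json
import os
import urllib.request
from typing import Any, Mapping, Optional


def sign_payload(secret: bytes, body: bytes) -> str:
    """Hex HMAC-SHA256 of the exact bytes that will be sent."""
    return "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_signature(secret: bytes, body: bytes, header_value: str) -> bool:
    """Receiver-side check using a constant-time comparison."""
    return hmac.compare_digest(sign_payload(secret, body), header_value)


def build_webhook_request(
    payload: Mapping[str, Any], env: Mapping[str, str] = os.environ
) -> Optional[urllib.request.Request]:
    """Return a signed POST request, or None when WEBHOOK_URL is unset (tool skipped)."""
    url = env.get("WEBHOOK_URL")
    if not url:
        return None
    # Compact, key-sorted JSON so the signed bytes are stable for a given payload.
    body = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    secret = env.get("WEBHOOK_SECRET")  # hypothetical variable name
    if secret:
        headers["X-Signature-256"] = sign_payload(secret.encode("utf-8"), body)
    return urllib.request.Request(url, data=body, headers=headers, method="POST")
```

A receiver recomputes the HMAC over the raw request body, as verify_signature does; sending could use urllib.request.urlopen(req, timeout=5) in a worker thread or an async HTTP client.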

Alert Configuration Schema

Each alert is described by an AlertConfig Pydantic model:

{
  "name": "Fire Detection",
  "prompt": "Is there fire or smoke visible?",
  "enabled": true,
  "tools": ["log_alert", "capture_snapshot"],
  "tool_arguments": {
    "trigger_webhook": {"stream_id": "{{stream_id}}", "severity": "{{severity}}"}
  },
  "escalation": {
    "threshold_consecutive": 3,
    "additional_tools": ["trigger_webhook", "publish_mqtt"]
  }
}

Field                              Values               Description
tools                              list of tool names   Tools invoked when alert fires
tool_arguments                     object               Per-tool keyword argument overrides; supports {{variable}} placeholders rendered from alert context (stream_id, alert_name, answer, reason, consecutive_count, escalated, snapshot_path)
escalation.threshold_consecutive   integer ≥ 2          Consecutive YES count before escalation
escalation.additional_tools        list of tool names   Extra tools added on escalation
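The {{variable}} substitution in tool_arguments can be sketched with a single regular expression. Assumptions: placeholders may contain surrounding spaces, substituted values are converted with str(), nested dicts and lists are rendered recursively, and placeholders with no matching context key are left unchanged; render_value and tool_kwargs are hypothetical helpers.

```python
import re
from typing import Any, Mapping

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_value(value: Any, context: Mapping[str, Any]) -> Any:
    """Recursively substitute {{name}} placeholders in strings, dicts and lists."""
    if isinstance(value, str):
        def substitute(match: "re.Match[str]") -> str:
            key = match.group(1)
            return str(context[key]) if key in context else match.group(0)

        return _PLACEHOLDER.sub(substitute, value)
    if isinstance(value, dict):
        return {k: render_value(v, context) for k, v in value.items()}
    if isinstance(value, list):
        return [render_value(v, context) for v in value]
    return value  # numbers, booleans, None pass through unchanged


def tool_kwargs(alert_config: Mapping[str, Any], tool_name: str, context: Mapping[str, Any]) -> dict:
    """Rendered keyword-argument overrides for one tool (empty if none configured)."""
    overrides = alert_config.get("tool_arguments", {}).get(tool_name, {})
    return render_value(overrides, context)
```

Because every substitution goes through str(), a value such as "{{escalated}}" renders as the string "True" rather than a boolean.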

Event Types

The SSE stream (GET /events) emits four event types:

Event          When
init           On SSE connect: current streams + latest results
analysis       Each VLM analysis cycle completes
alert_action   Alert fired and tools were invoked
keepalive      Every 15 s to prevent proxy timeouts
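The event types above map onto the SSE wire format: an event: line, a data: line, and a blank line. This sketch of the EventManager fan-out gives each connected client its own bounded asyncio.Queue, emits init on connect, and emits keepalive after 15 s of silence; the class layout, the queue bound, and dropping events for a slow client are assumptions. The async generator could be returned from the GET /events handler of whichever web framework serves the dashboard.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Set


def format_sse(event: str, data: Any) -> str:
    """One SSE message: event name, single-line JSON data, blank-line terminator."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


class EventManager:
    """Pub/sub fan-out: every subscriber (browser connection) gets its own queue."""

    def __init__(self, max_queued: int = 100) -> None:
        self._subscribers: Set["asyncio.Queue[str]"] = set()
        self._max_queued = max_queued

    def subscribe(self) -> "asyncio.Queue[str]":
        queue: "asyncio.Queue[str]" = asyncio.Queue(maxsize=self._max_queued)
        self._subscribers.add(queue)
        return queue

    def unsubscribe(self, queue: "asyncio.Queue[str]") -> None:
        self._subscribers.discard(queue)

    def publish(self, event: str, data: Any) -> None:
        frame = format_sse(event, data)
        for queue in list(self._subscribers):
            try:
                queue.put_nowait(frame)
            except asyncio.QueueFull:
                pass  # slow client: drop this event rather than block the publisher


async def sse_stream(
    manager: EventManager, initial_state: Any, keepalive_s: float = 15.0
) -> AsyncIterator[str]:
    """Yield SSE frames for one client: init first, then events, keepalive when idle."""
    queue = manager.subscribe()
    try:
        yield format_sse("init", initial_state)
        while True:
            try:
                yield await asyncio.wait_for(queue.get(), timeout=keepalive_s)
            except asyncio.TimeoutError:
                yield format_sse("keepalive", {})
    finally:
        manager.unsubscribe(queue)  # runs when the client disconnects
```

Keeping json.dumps at its default (no indent) guarantees a single-line data: field; a multi-line payload would need one data: line per line.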

MCP Integration

The agent can connect to external Model Context Protocol (MCP) servers, allowing alerts to invoke tools hosted on remote services (e.g., a Prometheus server for metrics queries or a custom REST API).

MCPClient

The MCPClient module manages the lifecycle of one or more MCP servers configured in resources/mcp_servers.json. Supported transports:

Transport   When to use
http        Remote HTTP MCP server (MCP Streamable HTTP protocol)
sse         Remote SSE-based MCP server
stdio       Local subprocess MCP server

At startup, if MCP_ENABLED=true, the agent:

  1. Reads resources/mcp_servers.json

  2. Connects to each enabled server and performs the MCP initialize handshake

  3. Calls tools/list to discover available tools

  4. Registers discovered tools with the AlertActionAgent under prefixed names (mcp_{server_name}_{tool_name})

  5. If ADK mode is active, reinitialises the agent so the new tools appear in the LLM’s tool list
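Steps 1, 3, and 4 above (reading the config, discovering tools, and registering prefixed names) can be sketched independently of any MCP SDK by injecting the discovery call. The {"servers": [...]} layout of mcp_servers.json, the name/transport/enabled keys, skipping an unreachable server, and the helper names are all hypothetical. In a real client, list_tools would wrap a session that has completed the initialize handshake and then issued tools/list.

```python
import asyncio
import json
from typing import Awaitable, Callable, Dict, List, Tuple

VALID_TRANSPORTS = {"http", "sse", "stdio"}
ListTools = Callable[[dict], Awaitable[List[str]]]


def load_enabled_servers(config_text: str) -> List[dict]:
    """Parse the (assumed) config layout and keep enabled servers with a known transport."""
    servers = json.loads(config_text).get("servers", [])
    enabled = [s for s in servers if s.get("enabled", True)]
    for server in enabled:
        if server.get("transport") not in VALID_TRANSPORTS:
            raise ValueError(
                f"{server.get('name')}: unsupported transport {server.get('transport')!r}"
            )
    return enabled


def prefixed_name(server_name: str, tool_name: str) -> str:
    return f"mcp_{server_name}_{tool_name}"


async def register_mcp_tools(
    servers: List[dict], list_tools: ListTools, timeout_s: float = 10.0
) -> Dict[str, Tuple[str, str]]:
    """Map each prefixed tool name to (server_name, original_tool_name)."""
    registry: Dict[str, Tuple[str, str]] = {}
    for server in servers:
        try:
            tool_names = await asyncio.wait_for(list_tools(server), timeout=timeout_s)
        except (asyncio.TimeoutError, OSError):
            continue  # assumption: an unreachable server is skipped, not fatal
        for tool in tool_names:
            registry[prefixed_name(server["name"], tool)] = (server["name"], tool)
    return registry
```

Keeping the original (server, tool) pair next to each prefixed name lets an invocation such as /mcp/tools/{tool_name}/invoke route back to the right server without parsing the prefix, which would be ambiguous if server names contain underscores.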

MCP tools can be referenced in AlertConfig.tools and AlertConfig.escalation.additional_tools by their prefixed names, and are invocable via the /mcp/tools/{tool_name}/invoke API endpoint.