POI MQTT Pipeline Design#

Topics Subscribed#

#

Topic

Source Consumer

Purpose

1

scenescape/data/camera/+

EventConsumer

Person detection + face reid embeddings (PRIMARY — used for FAISS matching)

2

scenescape/regulated/scene/+

ScenescapeRegionConsumer

Scene tracking + region entry/exit

3

scenescape/external/+/person

EventConsumer

Global UUID observability + movement event storage (monitoring only — NOT used for FAISS)

Topic 1: scenescape/data/camera/{camera_id}#

Published by: DL Streamer Pipeline Server (runs person + face detection + reid models)

Raw payload structure:

{
  "timestamp": "2026-04-27T07:42:17.916Z",
  "objects": {
    "person": [
      {
        "id": 1,
        "bounding_box_px": [200, 150, 280, 380],
        "confidence": 0.91,
        "metadata": {
          "reid": {
            "embedding_vector": "AABAP...base64string...1368chars"
          }
        },
        "sub_objects": {
          "face": [
            {
              "id": "face-001",
              "bounding_box_px": [210, 155, 260, 210],
              "metadata": {
                "reid": {
                  "embedding_vector": "AABAP...base64string...1368chars"
                }
              }
            }
          ]
        }
      }
    ],
    "face": []
  }
}

What we extract:

  1. camera_id — from the MQTT topic itself

  2. person[].id — SceneScape tracking ID

  3. person[].sub_objects.face[].metadata.reid.embedding_vectorface embedding (only source used for FAISS)

  4. Embedding decode: base64.b64decode(str) → 1024 bytes → struct.unpack("256f", raw)256-dim float vector

Important: Body-level embeddings (person[].metadata.reid.embedding_vector) come from person-reidentification-retail-0277 — a DIFFERENT embedding space from the face model. They are not used for FAISS matching. Only face sub-object embeddings from face-reidentification-retail-0095 are sent to FAISS.

What we do with it:

embedding vector → DetectionIndex.claim_track(track_id, vector, frame)
                   ├── if first time (SETNX gate)
                   │     → store entry in FAISS + detection:frame:{faiss_id}
                   └── always → update rolling exit vector + frame
                → FAISS POI search (top_k=10, threshold=SIMILARITY_THRESHOLD)
                → if match found → create alert → store in Redis
                → always → store MovementEvent in Redis

Topic 2: scenescape/regulated/scene/{scene_id}#

Published by: SceneScape scene controller (regulated/smoothed tracking)

Raw payload structure:

{
  "timestamp": "2026-04-27T07:42:20.100Z",
  "id": "db68a737-92db-4477-880b-07bc7d658ab9",
  "objects": [
    {
      "id": 1,
      "category": "person",
      "visibility": ["Camera_01"],
      "metadata": {
        "reid": { "embedding_vector": [[0.12, -0.05, "... 256 floats"]] }
      },
      "regions": {
        "entrance-zone": { "entered": "2026-04-27T07:41:55.000Z" },
        "aisle-3":       { "entered": "2026-04-27T07:42:10.000Z" }
      },
      "sub_objects": { "face": [] }
    }
  ]
}

What we extract (stateful diff per person):

  • current_regions = set(person["regions"].keys())

  • previous_regions = in-memory state from last message for same person.id

  • current - previousregion ENTEREDstore_region_entry()

  • previous - currentregion EXITEDstore_region_exit() + compute dwell time

  • If person disappears from objects entirely → forced exit for all their regions


Redis Data Model#

1. Movement Events — event:{object_id}:{timestamp}#

Written every detection frame. TTL = 7 days.

KEY:   event:1:2026-04-27T07:42:22.753Z
TYPE:  string (JSON)
TTL:   604800 sec (7 days)

VALUE:
{
  "object_id": 1,
  "timestamp": "2026-04-27T07:42:22.753Z",
  "camera_id": "Camera_02",
  "region":    "Camera_02",
  "poi_id":    null,              ← filled when POI match found
  "embedding_reference": null
}

Currently 3,425+ movement event keys live from Camera_01 and Camera_02.


2. POI Registry — poi:{poi_id} + poi:index#

Created when an operator enrolls a POI via the UI.

KEY:   poi:{poi_id}              → full POI JSON  (no TTL — permanent)
KEY:   poi:index                 → Redis SET of all poi_ids

VALUE of poi:{poi_id}:
{
  "event_type": "poi_enrollment",
  "poi_id": "poi-a3f2c1b0",
  "enrolled_by": "operator",
  "severity": "high",            ← low | medium | high
  "notes": "Shoplifter suspect",
  "reference_images": [
    {
      "source": "uploaded_image",
      "embedding_id": "emb-0001",
      "vector_dim": 256,
      "image_path": "/data/images/poi-a3f2c1b0.jpg"
    }
  ],
  "status": "active",
  "timestamp": "2026-04-27T08:00:00Z"
}

3. FAISS ID Mappings — faiss2poi:{int} + poi2faiss:{poi_id}#

Created alongside POI enrollment to bridge FAISS integer indices to POI string IDs.

KEY:  faiss2poi:{int}            → "poi-a3f2c1b0"   (FAISS internal index → POI ID)
KEY:  poi2faiss:{poi_id}         → SET{0, 1, ...}   (POI ID → all its FAISS vector indices)

4. Alert Cache & Dedup#

KEY:  alert:sent:{object_id}:{poi_id}  → "1"    TTL=configurable  (dedup: 1 alert per window per person+POI pair)
KEY:  alert:{alert_id}                 → full alert JSON   TTL=7 days
KEY:  alerts:recent                    → LIST of last 1000 alert JSONs  (no TTL)

VALUE of alert:{alert_id}:
{
  "event_type": "poi_match_alert",
  "alert_id": "alert-uuid",
  "timestamp": "2026-04-27T08:05:12Z",
  "poi_id": "poi-a3f2c1b0",
  "severity": "high",
  "status": "New",
  "match": {
    "camera_id": "Camera_02",
    "confidence": 0.91,
    "similarity_score": 0.87,
    "bbox": [200, 150, 280, 380],
    "frame_number": 0,
    "thumbnail_path": "/api/v1/thumbnail/cam:Camera_02:1"
  },
  "poi_metadata": {
    "notes": "Shoplifter suspect",
    "enrollment_date": "2026-04-20T08:00:00Z",
    "total_previous_matches": 3
  }
}

5. Object → POI Cache — object:{object_id}#

Cache-Aside pattern: avoids hitting FAISS on every video frame for the same tracked person.

KEY:  object:{object_id}  → JSON {"poi_id": "poi-a3f2c1b0", "similarity": 0.87}   TTL=configurable

Once matched, subsequent frames for the same object_id skip FAISS entirely for the cache TTL period (compose default: 5 seconds).


6. Region Presence & Dwell — region:presence:* + region:dwell:*#

Written when regions are configured as polygons in the SceneScape scene.

KEY:  region:presence:{scene_id}:{region_id}:{object_id}    TTL=1h
VALUE:
{
  "first_seen": "2026-04-27T07:41:55Z",
  "region_name": "entrance-zone",
  "camera_id": "Camera_01"
}

KEY:  region:dwell:{object_id}:{scene_id}:{region_id}:{date}  TTL=7 days
VALUE:
{
  "object_id": 1,
  "scene_id": "db68a737-92db-4477-880b-07bc7d658ab9",
  "region_id": "entrance-zone",
  "region_name": "entrance-zone",
  "exit_time": "2026-04-27T07:42:30Z",
  "dwell_sec": 35.0
}

7. Detection Index — detection:*#

Stores per-detection data for the all-detections FAISS index.

KEY:  detection:meta:{faiss_id}     → JSON metadata (track_id, camera_id, timestamp, bbox)
KEY:  detection:frame:{faiss_id}    → JPEG binary (entry frame)   TTL=7 days
KEY:  detection:track_count:{track} → integer (embedding count for track)
KEY:  detection:track_last:{track}  → timestamp (last embedding stored)

8. Track Exit Frames — track:exit:*#

Rolling exit data, updated on each new detection for an active track.

KEY:  track:exit:vector:{track_id}   → float32 bytes (latest exit embedding)
KEY:  track:exit:frame:{track_id}    → JPEG binary (latest exit frame)
KEY:  track:exit:meta:{track_id}     → JSON (timestamp, camera_id, bbox)

9. Track Frames — track:frame:*#

KEY:  track:frame:{track_id}:entry      → JPEG binary (first frame seen)
KEY:  track:frame:{track_id}:last_seen  → JPEG binary (most recent frame)

End-to-End Data Flow#

Camera Feed
    │
    ▼
DL Streamer Pipeline Server
  (person-detection-retail-0013 +
   face-detection-retail-0004 +
   face-reidentification-retail-0095 +
   person-reidentification-retail-0277)
    │
    ├──► scenescape/data/camera/Camera_01
    └──► scenescape/data/camera/Camera_02
              │
              ▼
       poi-backend: EventConsumer
       ├── decode base64 → 256-dim float vector
       │     (face sub_object embedding only — body reid is NOT used for FAISS)
       ├── MatchingService.match_object(object_id, vector)
       │     ├── Cache hit (object:N)?  → skip FAISS, return cached poi_id
       │     └── Cache miss → FAISS.search(top_k=10, threshold=SIMILARITY_THRESHOLD)
       │                         │
       │                    match found?
       │                    ├── YES → AlertService
       │                    │           ├── dispatch to ALL strategies (must all succeed)
       │                    │           ├── store  alert:{id}  in Redis
       │                    │           ├── push   alerts:recent list
       │                    │           └── set    alert:sent:{obj_id}:{poi_id}  TTL=configurable
       │                    └── NO  → nothing
       └── store event:{object_id}:{timestamp} in Redis  (always, TTL=7d)

SceneScape Scene Controller
    │
    └──► scenescape/regulated/scene/{scene_id}
              │
              ▼
       poi-backend: ScenescapeRegionConsumer
       ├── diff regions per person (stateful in-memory)
       ├── entry detected → store region:presence:* (TTL=1h)
       └── exit  detected → compute dwell_sec
                          → store region:dwell:* (TTL=7d)
                          → delete region:presence:*

Redis Key Summary Table#

Key Pattern

Type

TTL

Purpose

event:{obj_id}:{timestamp}

string (JSON)

7 days

Movement event per detection frame

events:poi:{poi_id}

SET of event keys

7 days

Index: all events for a matched POI

poi:{poi_id}

string (JSON)

none

POI metadata (enrolled person)

poi:index

SET

none

All registered POI IDs

faiss2poi:{int}

string

none

FAISS index → POI ID mapping

poi2faiss:{poi_id}

SET

none

POI ID → FAISS indices

object:{obj_id}

string (JSON)

configurable

Cache-Aside: tracking ID → matched POI + similarity

alert:{alert_id}

string (JSON)

7 days

Full alert record

alerts:recent

LIST

none

Last 1000 alerts (ring buffer)

alert:sent:{obj_id}:{poi_id}

string

configurable

Dedup flag: alert already fired for this person+POI pair

region:presence:{scene}:{region}:{obj}

string (JSON)

1h

Region entry timestamp

region:dwell:{obj}:{scene}:{region}:{date}

string (JSON)

7 days

Completed region visit with dwell time

detection:meta:{faiss_id}

string (JSON)

7 days

Detection metadata (track, camera, bbox)

detection:frame:{faiss_id}

binary (JPEG)

7 days

Entry frame per detection

detection:track_count:{track}

integer

7 days

Embedding count per track

track:exit:vector:{track_id}

binary

7 days

Latest exit embedding

track:exit:frame:{track_id}

binary (JPEG)

7 days

Latest exit frame

track:frame:{track_id}:entry

binary (JPEG)

7 days

Track entry frame

track:frame:{track_id}:last_seen

binary (JPEG)

7 days

Track last-seen frame