```{eval-rst}
:orphan:
```
# MQTT Publishing via gvapython
- [Overview](#overview)
- [Prerequisites](#prerequisites)
- [Verify MQTT broker has started](#verify-mqtt-broker-has-started)
- [Start MQTT Subscriber](#start-mqtt-subscriber)
- [Configure DL Streamer Pipeline Server for MQTT Publishing](#configure-dl-streamer-pipeline-server-for-mqtt-publishing)
- [Configuration options](#configuration-options)
- [Sample REST Request](#sample-rest-request)
- [Secure MQTT Publishing](#secure-publishing)
- [Error handling](#error-handling)
## Overview
The processed frames and metadata can be published over to a MQTT message broker. Prior to publishing, MQTT broker/subscriber needs to be configured and started. Here is an overview of the flow,
- DL Streamer Pipeline Server will be the MQTT publisher and publishes to MQTT broker.
- Broker receives messages from DL Streamer Pipeline Server and forwards the messages to MQTT subscribers.
- Subscriber receives messages from broker on the subscribed topic.
The python script `[WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/user_scripts/gvapython/mqtt_publisher.py` supports publishing frames and metadata to specified MQTT broker.
## Prerequisites
Prior to DL Streamer Pipeline Server publishing, MQTT broker and subscriber needs to be configured and started.
### Verify MQTT broker has started
When bringing up DL Streamer Pipeline Server containers in standalone mode using `docker compose up`, MQTT broker is also started listening on port `1883`.
Verify MQTT broker is up and running using `docker ps`.
To configure and start MQTT broker refer [here](./eis_mqtt_publish_doc.md#configure-and-start-mqtt-broker)
### Start MQTT Subscriber
For starting MQTT subscriber, refer [here](./eis_mqtt_publish_doc.md#start-mqtt-subscriber)
## Configure DL Streamer Pipeline Server for MQTT Publishing
### Configuration options
Here is a sample configuration which performs Pallet Defect Detection and publishes the inference results to mqtt broker using gvapython script.
```bash
"pipeline": "{auto_source} name=source ! decodebin ! videoconvert ! gvadetect name=detection ! queue ! gvawatermark ! gvametaconvert name=metaconvert ! gvapython class=MQTTPublisher function=process module=/home/pipeline-server/gvapython/mqtt_publisher/mqtt_publisher.py name=mqtt_publisher ! gvametapublish name=destination ! appsink name=appsink",
```
```bash
"parameters": {
"type": "object",
"properties": {
"detection-properties": {
"element": {
"name": "detection",
"format": "element-properties"
}
},
"mqtt_publisher": {
"element": {
"name": "mqtt_publisher",
"property": "kwarg",
"format": "json"
},
"type": "object"
}
}
}
```
Modify the config to change the pipeline configuration as needed.
Add below configuration as part of REST request to enable publishing to the mqtt broker.
- `topic` topic to which message will be published. Defaults to `dlstreamer_pipeline_results`. *(optional)*
- `publish_frame` whether to publish only metadata or both frames and metadata can be published to the mqtt broker.
Defaults to `false`. *(optional)*
- When `publish_frame` is false, only metadata will be published.
- When `publish_frame` is true, both metadata and frame will be published.
| Payload | Type |
| :------------------- | :------------------------- |
| Only metadata | JSON (metadata)
| Both frame and metadata| JSON (metadata, frame) where frames are Base64 encoded UTF-8 string |
- `qos` quality of service level to use which defaults to 0. Values can be 0, 1, 2. *(optional)*
More details on the QoS levels can be found [here](https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels)
- `protocol` protocol version to use which defaults to 4 i.e. MQTTv311. Values can be 3, 4, 5 based on the versions MQTTv3, MQTTv311, MQTTv5 respectively *(optional)*
- When publishing frames, the frames are always JPEG encoded.
### Sample REST request
```sh
curl localhost:8080/pipelines/user_defined_pipelines/pallet_defect_detection -X POST -H 'Content-Type: application/json' -d '{
"source": {
"uri": "file:///home/pipeline-server/resources/videos/warehouse.avi",
"type": "uri"
},
"destination": {
"frame": {
"type": "rtsp",
"path": "pallet_defect_detection"
}
},
"parameters": {
"detection-properties": {
"model": "/home/pipeline-server/resources/models/geti/pallet_defect_detection/deployment/Detection/model/model.xml",
"device": "CPU"
},
"mqtt_publisher": {
"publish_frame": false
}
}
}'
```
`NOTE` `"host"` and `"port"` of mqtt publisher needs to be updated in `[WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/docker/.env` file
```sh
MQTT_HOST=
MQTT_PORT=1883
```
## Secure publishing
Publishing to MQTT broker could be over a secure communication channel providing encryption and authentication over TLS.
Follow the steps 1, 2 and 3 from [here](./eis_mqtt_publish_doc.md#secure-publishing).
- Generate certificates
- Configure and start MQTT broker
- Configure and start MQTT subscriber
Upon completing the broker and subscriber setup, refer to the below steps to configuring DL Streamer Pipeline Server for secure connection.
- Add values to following parameters present in `[WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/docker/.env` file
```sh
MQTT_HOST=
MQTT_PORT=8883
```
- Modify `docker-compose.yml`. The port number should match with the value specified in the `[WORKDIR]/edge-ai-libraries/microservices/dlstreamer-pipeline-server/docker/.env` file. In the above example we have used port `8883`, hence we need to add the ports with same value in `docker-compose.yml` file as shown below
```yaml
ports:
- "8883:8883"
```
- Sample REST request with configuration for secure connection
```sh
curl localhost:8080/pipelines/user_defined_pipelines/pallet_defect_detection -X POST -H 'Content-Type: application/json' -d '{
"source": {
"uri": "file:///home/pipeline-server/resources/videos/warehouse.avi",
"type": "uri"
},
"destination": {
"frame": {
"type": "rtsp",
"path": "pallet_defect_detection"
}
},
"parameters": {
"detection-properties": {
"model": "/home/pipeline-server/resources/models/geti/pallet_defect_detection/deployment/Detection/model/model.xml",
"device": "CPU"
},
"mqtt_publisher": {
"tls": {
"ca_cert": "/MqttCerts/ca.crt",
"client_key": "/MqttCerts/client/client.key",
"client_cert": "/MqttCerts/client/client.crt"
}
}
}
}'
```
## Error Handling
Refer to the section [here](./eis_mqtt_publish_doc.md#error-handling) for error handling details.