Write a User Defined Function (UDF)#

User Defined Functions (UDFs) are custom Python scripts that allow you to implement domain-specific analytics and anomaly detection algorithms in the Time Series Analytics Microservice. UDFs process streaming data from Kapacitor and can perform complex computations, machine learning inference, or custom business logic on time-series data.

This guide provides step-by-step instructions for writing your own UDFs for the Time Series Analytics Microservice.

Overview#

Kapacitor spawns a UDF process (called an agent) that communicates with Kapacitor over STDIN/STDOUT using protocol buffers. The agent:

  1. Describes its capabilities and configuration options.

  2. Initializes itself with provided options.

  3. Processes incoming data points or batches.

  4. Returns results back to Kapacitor.

The Python Kapacitor agent handles all communication details and exposes a simple Handler interface for implementing your custom logic.

UDF Architecture#

┌─────────────┐          Protocol Buffers         ┌──────────────────┐
│             │  ◄────── (STDIN/STDOUT) ────────► │                  │
│  Kapacitor  │                                   │   UDF Agent      │
│             │          Data Points              │   (Your Python   │
│             │   ─────────────────────────────►  │    Handler)      │
└─────────────┘                                   └──────────────────┘

The Handler Interface#

All UDFs must implement the Handler interface from kapacitor.udf.agent. The Handler receives callbacks as Kapacitor sends data and requests.

Basic Implementation Template:

#!/usr/bin/env python3
import os
import logging
import time
from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2

log_level = os.getenv('KAPACITOR_LOGGING_LEVEL', 'INFO').upper()
logging_level = getattr(logging, log_level, logging.INFO)

# Configure logging
logging.basicConfig(
    level=logging_level,  # Set the log level to DEBUG
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',  # Log format
)

# Create a logger instance for this module
# This logger will be used throughout the UDF to record events, errors, and debug information
# All log messages will be captured by Kapacitor and written to its log files
logger = logging.getLogger()

class MyHandler(Handler):
    """ Handler for the UDF. It processes incoming points
    and uses the loaded model to analyze and process the point.
    """
    def __init__(self, agent):
        self._agent = agent
        # Need to enable after model training
        model_name = (os.path.basename(__file__)).replace('.py', '<model_type_extension>')
        model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                   "../models/" + model_name)
        model_path = os.path.abspath(model_path)

        # Initialize model for analysis.
        # Load the model using your algorithm's import method and assign it to self.model.
        # Example: self.model = cb.CatBoostClassifier(...), then self.model.load_model(model_path)
        self.model = None  # Replace None with your actual model instance

        self.model.load_model(model_path)

    def info(self):
        """Describe the UDF's capabilities and configuration options
        Note: Time Series Analytics only supports STREAM processing.
        """
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.STREAM
        response.info.provides = udf_pb2.STREAM
        return response

    def init(self, init_req):
        """ Initialize the Handler with the provided options"""
        response = udf_pb2.Response()
        response.init.success = True
        return response

    def snapshot(self):
        """Create a snapshot of the current state"""
        response = udf_pb2.Response()
        response.snapshot.snapshot = b''
        return response

    def restore(self, restore_req):
        """Restore from a previous snapshot"""
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def point(self, point):
        """Process a single data point"""
        # Your custom logic goes here
        # Extract field values, perform analysis, and emit results
        self._agent.write_response(udf_pb2.Response(point=point))

    def begin_batch(self, begin_req):
        """ A batch has begun.
        """
        raise Exception("not supported")

    def end_batch(self, end_req):
        """ The batch is complete.
        """
        raise Exception("not supported")

if __name__ == '__main__':
    # Create an agent
    agent = Agent()

    # Create a handler and pass it an agent so it can write points
    h = MyHandler(agent)

    # Set the handler on the agent
    agent.handler = h

    # Anything printed to STDERR from a UDF process gets captured
    # into the Kapacitor logs.
    logger.info("Starting UDF agent")
    agent.start()
    agent.wait()
    logger.info("UDF agent finished")

Implementing UDF with Basic Implementation Template#

Import and initialize the model in the __init__(self, agent)#

Load ML models, thresholds, or external resources only once during initialization.

Implement custom logic in the point() function#

The point() method is called for every incoming data point. Use it to extract fields, run inference or business logic, and emit results.

    def point(self, point):

        # Extract field values from the incoming data point
        # Double fields (numeric values) - floating-point numbers like temperature, pressure
        temperature = point.fieldsDouble.get("temperature", 0.0)
        pressure = point.fieldsDouble.get("pressure", 0.0)
        humidity = point.fieldsDouble.get("humidity", 0.0)

        # String fields (text values) - textual data like IDs, locations, status messages
        sensor_id = point.fieldsString.get("sensor_id", "unknown")
        location = point.fieldsString.get("location", "default")
        status = point.fieldsString.get("status", "normal")

        # Boolean fields (true/false values) - binary states like active/inactive, enabled/disabled
        is_active = point.fieldsBool.get("is_active", False)
        is_calibrated = point.fieldsBool.get("is_calibrated", True)

        # Integer fields (whole numbers) - counts, codes, or discrete numeric values
        sample_count = point.fieldsInt.get("sample_count", 0)
        error_code = point.fieldsInt.get("error_code", 0)

        # Tags (metadata - always strings) - used for grouping and filtering, not for computation
        device_name = point.tags.get("device", "")
        site = point.tags.get("site", "")

        # Log the extracted values for debugging and monitoring
        logger.debug(f"Processing point - Sensor: {sensor_id}, Temp: {temperature}, Active: {is_active}")

        # Perform your analysis using the extracted values
        # Example: Run model inference, apply thresholds, calculate derived metrics
        # Add your custom logic here

        # Example: Set output fields based on analysis results
        # point.fieldsDouble["anomaly_score"] = calculated_score
        # point.fieldsString["analysis_result"] = "normal" or "anomaly"

        # Write the processed point back to Kapacitor
        self._agent.write_response(udf_pb2.Response(point=point))

Best Practices#

  1. Error Handling: Always validate input data and handle missing fields gracefully.

    if field_name not in point.fieldsDouble:
        logger.warning(f"Field {field_name} not found")
        return
    
  2. Logging: Use appropriate log levels (DEBUG, INFO, WARNING, ERROR)

    logger.info("Processing point for source %s", source)
    logger.debug("Detailed debug information")
    logger.error("Critical error occurred")
    
  3. Default Values: Always set default values for output fields.

    if "anomaly_status" not in point.fieldsDouble:
        point.fieldsDouble["anomaly_status"] = 0.0
    
  4. Environment Variables: Use environment variables for configuration.

    model_path = os.getenv('MODEL_PATH', '/default/path')
    log_level = os.getenv('KAPACITOR_LOGGING_LEVEL', 'INFO').upper()
    
  5. Model Files: Store models in the models/ directory alongside UDF scripts.

    model_name = os.path.basename(__file__).replace('.py', '.pkl')
    model_path = os.path.join(os.path.dirname(__file__), "../models/", model_name)
    
  6. Performance: Minimize processing time in the point() method

    • Load models and resources in init(), not point()

    • Use vectorized operations when possible

    • Consider batching for heavy computations

  7. Dependencies: List all Python dependencies in requirements.txt with pinned versions.

    numpy==1.24.0
    scikit-learn==1.3.0
    pandas==2.0.0
    catboost==1.2.0
    

Using Your UDF in TICKscripts#

Once your UDF is written, reference it in TICKscripts:

// Stream processing example
var data = stream
    |from()
        .measurement('sensor_data')
    @my_udf()
        .field('temperature')
        .threshold(50.0)
    |alert()
        .crit(lambda: "anomaly_status" > 0)
        .message('Anomaly detected: {{ index .Fields "temperature" }}')
        .mqtt('my_mqtt_broker')
        .topic('alerts/sensor')
    |influxDBOut()
        .database('results')
        .measurement('sensor_data')

References#