Write a User Defined Function (UDF)#
User Defined Functions (UDFs) are custom Python scripts that allow you to implement domain-specific analytics and anomaly detection algorithms in the Time Series Analytics Microservice. UDFs process streaming data from Kapacitor and can perform complex computations, machine learning inference, or custom business logic on time-series data.
This guide provides step-by-step instructions for writing your own UDFs for the Time Series Analytics Microservice.
Overview#
Kapacitor spawns a UDF process (called an agent) that communicates with Kapacitor over STDIN/STDOUT using protocol buffers. The agent:
Describes its capabilities and configuration options.
Initializes itself with provided options.
Processes incoming data points or batches.
Returns results back to Kapacitor.
The Python Kapacitor agent handles all communication details and exposes a simple Handler interface for implementing your custom logic.
UDF Architecture#
┌─────────────┐ Protocol Buffers ┌──────────────────┐
│ │ ◄────── (STDIN/STDOUT) ────────► │ │
│ Kapacitor │ │ UDF Agent │
│ │ Data Points │ (Your Python │
│ │ ─────────────────────────────► │ Handler) │
└─────────────┘ └──────────────────┘
The Handler Interface#
All UDFs must implement the Handler interface from kapacitor.udf.agent. The Handler receives callbacks as Kapacitor sends data and requests.
Basic Implementation Template:
#!/usr/bin/env python3
import os
import logging
from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2

log_level = os.getenv('KAPACITOR_LOGGING_LEVEL', 'INFO').upper()
logging_level = getattr(logging, log_level, logging.INFO)

# Configure logging
logging.basicConfig(
    level=logging_level,  # Level comes from KAPACITOR_LOGGING_LEVEL (defaults to INFO)
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',  # Log format
)

# Create a logger instance for this module.
# This logger is used throughout the UDF to record events, errors, and debug
# information. All log messages are captured by Kapacitor and written to its
# log files.
logger = logging.getLogger()


class MyHandler(Handler):
    """Handler for the UDF. It processes incoming points
    and uses the loaded model to analyze and process each point.
    """
    def __init__(self, agent):
        self._agent = agent
        # Enable after model training: derive the model file name from the
        # script name (replace <model_type_extension> with your model's extension)
        model_name = os.path.basename(__file__).replace('.py', '<model_type_extension>')
        model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  "../models/" + model_name)
        model_path = os.path.abspath(model_path)
        # Initialize the model for analysis.
        # Load the model using your algorithm's import method and assign it to
        # self.model, e.g.:
        #   self.model = cb.CatBoostClassifier(...)
        #   self.model.load_model(model_path)
        self.model = None  # Replace None with your actual model instance

    def info(self):
        """Describe the UDF's capabilities and configuration options.

        Note: Time Series Analytics only supports STREAM processing.
        """
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.STREAM
        response.info.provides = udf_pb2.STREAM
        return response

    def init(self, init_req):
        """Initialize the Handler with the provided options."""
        response = udf_pb2.Response()
        response.init.success = True
        return response

    def snapshot(self):
        """Create a snapshot of the current state."""
        response = udf_pb2.Response()
        response.snapshot.snapshot = b''
        return response

    def restore(self, restore_req):
        """Restore from a previous snapshot."""
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def point(self, point):
        """Process a single data point"""
        # Your custom logic goes here.
        # Extract field values, perform analysis, and emit results.
        self._agent.write_response(udf_pb2.Response(point=point))

    def begin_batch(self, begin_req):
        """A batch has begun."""
        raise Exception("not supported")

    def end_batch(self, end_req):
        """The batch is complete."""
        raise Exception("not supported")


if __name__ == '__main__':
    # Create an agent
    agent = Agent()
    # Create a handler and pass it an agent so it can write points
    h = MyHandler(agent)
    # Set the handler on the agent
    agent.handler = h
    # Anything printed to STDERR from a UDF process gets captured
    # into the Kapacitor logs.
    logger.info("Starting UDF agent")
    agent.start()
    agent.wait()
    logger.info("UDF agent finished")
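The info() method in the template declares no options, so a TICKscript call such as @my_udf().field('temperature') would be rejected at startup. If your UDF accepts parameters, declare each option in info() and read its value in init(). A sketch using the Kapacitor Python agent API (the option names field and threshold are illustrative, not part of the service API):

```python
from kapacitor.udf.agent import Handler
from kapacitor.udf import udf_pb2


class ConfigurableHandler(Handler):
    """Handler sketch that accepts .field() and .threshold() options."""
    def __init__(self, agent):
        self._agent = agent
        self._field = None
        self._threshold = 0.0

    def info(self):
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.STREAM
        response.info.provides = udf_pb2.STREAM
        # Declare each option and the value type(s) it accepts
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['threshold'].valueTypes.append(udf_pb2.DOUBLE)
        return response

    def init(self, init_req):
        # Read the option values supplied by the TICKscript
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            elif opt.name == 'threshold':
                self._threshold = opt.values[0].doubleValue
        response = udf_pb2.Response()
        response.init.success = self._field is not None
        if not response.init.success:
            response.init.error = 'must supply a field name'
        return response
```

Any option left undeclared in info() causes Kapacitor to reject the TICKscript, so keep the two methods in sync.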
Implementing a UDF with the Basic Implementation Template#
Import and initialize the model in __init__(self, agent)#
Load ML models, thresholds, or external resources only once during initialization.
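For example, path resolution and loading can be factored into small helpers called once from __init__. A minimal sketch, assuming a pickled model; the .pkl extension and paths are illustrative:

```python
import os
import pickle


def resolve_model_path(script_file, extension=".pkl"):
    """Map a UDF script name to its model file in the sibling models/ directory.

    e.g. /app/udfs/my_udf.py -> /app/models/my_udf.pkl
    """
    base = os.path.splitext(os.path.basename(script_file))[0]
    model_dir = os.path.join(os.path.dirname(os.path.abspath(script_file)), "..", "models")
    return os.path.abspath(os.path.join(model_dir, base + extension))


def load_model(model_path):
    """Load a serialized model once at startup, failing fast if it is missing."""
    if not os.path.isfile(model_path):
        raise FileNotFoundError(f"Model file not found: {model_path}")
    with open(model_path, "rb") as f:
        return pickle.load(f)
```

Failing fast in __init__ surfaces a missing or misnamed model file in the Kapacitor logs at task startup instead of on the first data point.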
Implement custom logic in the point() function#
The point() method is called for every incoming data point. Use it to extract fields, run inference or business logic, and emit results.
def point(self, point):
    # Extract field values from the incoming data point

    # Double fields (numeric values) - floating-point numbers like temperature, pressure
    temperature = point.fieldsDouble.get("temperature", 0.0)
    pressure = point.fieldsDouble.get("pressure", 0.0)
    humidity = point.fieldsDouble.get("humidity", 0.0)

    # String fields (text values) - textual data like IDs, locations, status messages
    sensor_id = point.fieldsString.get("sensor_id", "unknown")
    location = point.fieldsString.get("location", "default")
    status = point.fieldsString.get("status", "normal")

    # Boolean fields (true/false values) - binary states like active/inactive, enabled/disabled
    is_active = point.fieldsBool.get("is_active", False)
    is_calibrated = point.fieldsBool.get("is_calibrated", True)

    # Integer fields (whole numbers) - counts, codes, or discrete numeric values
    sample_count = point.fieldsInt.get("sample_count", 0)
    error_code = point.fieldsInt.get("error_code", 0)

    # Tags (metadata - always strings) - used for grouping and filtering, not for computation
    device_name = point.tags.get("device", "")
    site = point.tags.get("site", "")

    # Log the extracted values for debugging and monitoring
    logger.debug(f"Processing point - Sensor: {sensor_id}, Temp: {temperature}, Active: {is_active}")

    # Perform your analysis using the extracted values
    # Example: Run model inference, apply thresholds, calculate derived metrics
    # Add your custom logic here

    # Example: Set output fields based on analysis results
    # point.fieldsDouble["anomaly_score"] = calculated_score
    # point.fieldsString["analysis_result"] = "normal" or "anomaly"

    # Write the processed point back to Kapacitor
    self._agent.write_response(udf_pb2.Response(point=point))
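The analysis step itself can be factored into a plain function, which keeps point() thin and makes the logic easy to unit-test outside Kapacitor. A minimal threshold-based sketch (the limits and the 0.5 weighting are illustrative, not part of the service API):

```python
def analyze(temperature, pressure, temp_limit=50.0, pressure_limit=101.5):
    """Score a reading against simple thresholds.

    Returns (anomaly_score, label); each breached limit adds 0.5 to the score.
    """
    score = 0.0
    if temperature > temp_limit:
        score += 0.5
    if pressure > pressure_limit:
        score += 0.5
    label = "anomaly" if score > 0.0 else "normal"
    return score, label
```

Inside point(), the results would then be written back onto the point before emitting it, e.g. point.fieldsDouble["anomaly_score"] = score and point.fieldsString["analysis_result"] = label.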
Best Practices#
Error Handling: Always validate input data and handle missing fields gracefully.
if field_name not in point.fieldsDouble:
    logger.warning(f"Field {field_name} not found")
    return
Logging: Use appropriate log levels (DEBUG, INFO, WARNING, ERROR).
logger.info("Processing point for source %s", source)
logger.debug("Detailed debug information")
logger.error("Critical error occurred")
Default Values: Always set default values for output fields.
if "anomaly_status" not in point.fieldsDouble:
    point.fieldsDouble["anomaly_status"] = 0.0
Environment Variables: Use environment variables for configuration.
model_path = os.getenv('MODEL_PATH', '/default/path')
log_level = os.getenv('KAPACITOR_LOGGING_LEVEL', 'INFO').upper()
Model Files: Store models in the models/ directory alongside UDF scripts.
model_name = os.path.basename(__file__).replace('.py', '.pkl')
model_path = os.path.join(os.path.dirname(__file__), "../models/", model_name)
Performance: Minimize processing time in the point() method.
Load models and resources in __init__(), not point().
Use vectorized operations when possible.
Consider batching for heavy computations.
Dependencies: List all Python dependencies in requirements.txt with pinned versions.
numpy==1.24.0
scikit-learn==1.3.0
pandas==2.0.0
catboost==1.2.0
Using Your UDF in TICKscripts#
Once your UDF is written, reference it in TICKscripts:
// Stream processing example
var data = stream
    |from()
        .measurement('sensor_data')
    @my_udf()
        .field('temperature')
        .threshold(50.0)
    |alert()
        .crit(lambda: "anomaly_status" > 0)
        .message('Anomaly detected: {{ index .Fields "temperature" }}')
        .mqtt('my_mqtt_broker')
        .topic('alerts/sensor')
    |influxDBOut()
        .database('results')
        .measurement('sensor_data')
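For Kapacitor to resolve @my_udf, the UDF process must also be registered. In a stock Kapacitor deployment this is done in kapacitor.conf; the Time Series Analytics Microservice may manage this configuration for you, so treat the fragment below as a reference sketch with illustrative paths:

```toml
[udf.functions.my_udf]
    prog = "/usr/bin/python3"
    args = ["/path/to/udfs/my_udf.py"]
    timeout = "10s"
    [udf.functions.my_udf.env]
        PYTHONPATH = "/path/to/kapacitor/udf/agent/py"
```

The PYTHONPATH entry must point at the directory containing the kapacitor.udf Python agent package so the script's imports resolve.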