In this example, we will learn how to write a simple socket-based UDF.
## What is a user-defined function (UDF)?
A UDF is a user-defined function that communicates with Kapacitor to process data.
Kapacitor sends it data, and the UDF can respond with new or modified data.
A UDF can be written in any language that has [protocol buffer](https://developers.google.com/protocol-buffers/) support.
## What is the difference between a socket UDF and a process UDF?
* A process UDF is a child process of Kapacitor that communicates with Kapacitor over STDIN/STDOUT and is completely managed by Kapacitor.
* A socket UDF is a process external to Kapacitor that communicates over a configured Unix domain socket. The process itself is not managed by Kapacitor.
Using a process UDF can be simpler than a socket UDF because Kapacitor will spawn the process and manage everything for you.
On the other hand, you may want more control over the UDF process itself and expose only a socket to Kapacitor.
A common use case is running Kapacitor in one Docker container and the UDF in another container that exposes the socket via a Docker volume.
In both cases the protocol is the same; the only difference is the transport mechanism.
Also note that multiple Kapacitor tasks can use the same UDF. For a process-based UDF, a new child process is spawned for each use of the UDF.
In contrast, for a socket-based UDF, a new connection is made to the socket for each use of the UDF.
If you have many uses of the same UDF, it may be better to use a socket UDF to keep the number of running processes low.
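To illustrate why the transport can change while the logic stays the same, here is a minimal sketch that mirrors raw bytes instead of the real protocol buffer messages. The names `mirror` and `mirrorString` are hypothetical and are not part of the Kapacitor agent library.

```go
package main

import (
	"bytes"
	"io"
	"strings"
)

// mirror copies every byte it reads from in to out, unchanged.
// It only needs a reader and a writer, so the same function could be given
// os.Stdin/os.Stdout (process style) or a net.Conn (socket style).
func mirror(in io.Reader, out io.Writer) (int64, error) {
	return io.Copy(out, in)
}

// mirrorString runs mirror over in-memory buffers so the behavior can be
// checked without a real process or socket.
func mirrorString(s string) (string, error) {
	var out bytes.Buffer
	if _, err := mirror(strings.NewReader(s), &out); err != nil {
		return "", err
	}
	return out.String(), nil
}
```

Because `mirror` depends only on `io.Reader` and `io.Writer`, switching between the two styles would only change the values passed in.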
## Writing a UDF
A UDF communicates with Kapacitor via a protocol buffer request/response system.
We provide implementations of that communication layer in both Go and Python.
Since the other example used Python, we will use the Go version here.
Our example is going to implement a `mirror` UDF which simply reflects all data it receives back to the Kapacitor server.
This example is part of the test suite, and both Python and Go implementations can be found [here](https://github.com/influxdata/kapacitor/tree/master/udf/agent/examples/mirror).
### Lifecycle
Before we write any code, let's look at the lifecycle of a socket UDF:
1. The UDF process is started, independently from Kapacitor.
2. The process listens on a Unix domain socket.
3. Kapacitor connects to the socket and queries basic information about the UDF's options.
4. A Kapacitor task is enabled that uses the UDF and Kapacitor makes a new connection to the socket.
5. The task reads and writes data over the socket connection.
6. If the task is stopped for any reason the socket connection is closed.
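The lifecycle above can be sketched with nothing but the Go standard library. This is only an illustration: it exchanges plain bytes rather than protocol buffer messages, the client side stands in for Kapacitor, and `lifecycleDemo` and the socket file name are hypothetical.

```go
package main

import (
	"io"
	"net"
	"os"
	"path/filepath"
)

// lifecycleDemo sends msg through a Unix domain socket and returns the echo.
func lifecycleDemo(msg string) (string, error) {
	dir, err := os.MkdirTemp("", "udf-lifecycle")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	// Steps 1-2: the UDF process starts and listens on a Unix domain socket.
	l, err := net.Listen("unix", filepath.Join(dir, "demo.sock"))
	if err != nil {
		return "", err
	}
	defer l.Close()

	// UDF side: accept one connection and echo its data back.
	done := make(chan error, 1)
	go func() {
		conn, err := l.Accept()
		if err != nil {
			done <- err
			return
		}
		defer conn.Close()
		_, err = io.Copy(conn, conn)
		done <- err
	}()

	// Steps 3-4: the client (Kapacitor's role) connects to the socket.
	c, err := net.Dial("unix", l.Addr().String())
	if err != nil {
		return "", err
	}

	// Step 5: data flows in both directions over the connection.
	if _, err := c.Write([]byte(msg)); err != nil {
		c.Close()
		return "", err
	}
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(c, buf); err != nil {
		c.Close()
		return "", err
	}

	// Step 6: closing the connection ends the session on the UDF side.
	c.Close()
	return string(buf), <-done
}
```

Calling `lifecycleDemo("ping")` from a `main` function should return the same string that was sent, since the UDF side simply echoes it.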
### The Main method
We need to write a program that starts up and listens on a socket.
The following code is a `main` function that listens on a socket at
a default path, or on a custom path specified with the `-socket` flag.
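```go
package main
import (
	"flag"
	"log"
	"net"
)
var socketPath = flag.String("socket", "/tmp/mirror.sock", "Where to create the unix socket")
func main() {
	flag.Parse()
	// Listen on the unix domain socket.
	l, err := net.Listen("unix", *socketPath)
	if err != nil {
		log.Fatal(err)
	}
	defer l.Close()
}
```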
Notice that the `agent` has a channel for responses. This is because your UDF can send data to Kapacitor
at any time; it does not need to be in response to receiving a point.
As a result, we need to close the channel to let the `agent` know
that we will not be sending any more data, which we do in the `Stop` method.
Once the `agent` calls `Stop` on the `handler`, no other methods will be called, and the `agent` won't stop until
the channel is closed.
This gives the UDF the chance to flush any remaining data before it shuts down:
```go
// Stop the handler gracefully.
func (h *mirrorHandler) Stop() {
	// Close the channel since we won't be sending any more data to Kapacitor
	close(h.agent.Responses)
}
```
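The close-to-signal pattern can be tried in isolation. The sketch below is an assumption-laden stand-in, not the agent's implementation: `handler`, `pending`, and `drainUntilClosed` are hypothetical, and strings take the place of response messages.

```go
package main

import "sync"

// handler is a hypothetical stand-in for a UDF handler that still holds
// buffered results when it is asked to stop.
type handler struct {
	responses chan string
	pending   []string
}

// Stop flushes anything still buffered, then closes the channel to signal
// that no more responses will follow.
func (h *handler) Stop() {
	for _, p := range h.pending {
		h.responses <- p
	}
	h.pending = nil
	close(h.responses)
}

// drainUntilClosed plays the role of whatever reads the responses channel:
// it keeps receiving until the channel is closed, so it cannot finish
// before Stop has flushed the remaining data.
func drainUntilClosed(h *handler) []string {
	var got []string
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for r := range h.responses {
			got = append(got, r)
		}
	}()
	h.Stop()
	wg.Wait()
	return got
}
```

The `range` loop only ends when the channel is closed, which is why forgetting the `close` call would leave the reader waiting forever.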
Although we have implemented most of the handler, a few methods are still missing.
Specifically, the methods for batching and snapshot/restore are missing. Since we don't need them, we will just give them trivial implementations:
```go
// Create a snapshot of the running state of the process.
```
At this point we have a complete implementation of the `Handler` interface.
In step #4 of the Lifecycle above, Kapacitor makes a new connection to the UDF for each use in a task. Because our UDF process may handle multiple connections simultaneously, we need a mechanism for creating a new `agent` and `handler` per connection.
A `server` is provided for this purpose, which expects an implementation of the `Accepter` interface:
```go
type Accepter interface {
	// Accept new connections from the listener and handle them accordingly.
	// The typical action is to create a new Agent with the connection as both its in and out objects.
	Accept(net.Conn)
}
```
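To see how a server might drive an `Accepter`, here is a minimal sketch built only on the standard library. It is not the provided `server`: `serve`, `notifyAccepter`, and `serveDemo` are hypothetical names, and the accepter simply closes each connection instead of starting an agent.

```go
package main

import (
	"net"
	"os"
	"path/filepath"
)

// Accepter has the same shape as the interface shown above.
type Accepter interface {
	Accept(net.Conn)
}

// serve is a hypothetical stand-in for a server: it hands every new
// connection to the Accepter until the listener is closed.
func serve(l net.Listener, a Accepter) error {
	for {
		conn, err := l.Accept()
		if err != nil {
			return err
		}
		a.Accept(conn)
	}
}

// notifyAccepter closes each connection and reports it on a channel.
type notifyAccepter struct {
	accepted chan struct{}
}

func (n *notifyAccepter) Accept(conn net.Conn) {
	conn.Close()
	n.accepted <- struct{}{}
}

// serveDemo dials the socket n times and returns how many connections the
// Accepter saw.
func serveDemo(n int) (int, error) {
	dir, err := os.MkdirTemp("", "udf-serve")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	l, err := net.Listen("unix", filepath.Join(dir, "demo.sock"))
	if err != nil {
		return 0, err
	}
	acc := &notifyAccepter{accepted: make(chan struct{}, n)}
	stopped := make(chan error, 1)
	go func() { stopped <- serve(l, acc) }()

	seen := 0
	for i := 0; i < n; i++ {
		c, err := net.Dial("unix", l.Addr().String())
		if err != nil {
			l.Close()
			return seen, err
		}
		c.Close()
		<-acc.accepted // wait until the Accepter has handled this connection
		seen++
	}

	l.Close() // makes l.Accept fail, which ends serve
	<-stopped
	return seen, nil
}
```

Each dial results in exactly one `Accept` call, which matches the one-connection-per-use behavior described in the lifecycle.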
Here is a simple `accepter` that creates a new `agent` and `mirrorHandler`
for each new connection. Add this to the `main.go` file:
```go
type accepter struct {
	count int64
}
// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *accepter) Accept(conn net.Conn) {
	count := acc.count
	acc.count++
	a := agent.New(conn, conn)
	h := newMirrorHandler(a)
	a.Handler = h
	log.Println("Starting agent for connection", count)
	a.Start()
	go func() {
		err := a.Wait()
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("Agent for connection %d finished", count)
	}()
}
```
Now with all the pieces in place, we can update our `main` function to
start up the `server`. Replace the previously provided `main` function with:
Instead of always overwriting a field, only set it if the field is absent.
Also add support for setting tags as well as fields.
* Change the mirror UDF to work on batches instead of streams.
This requires changing the edge type in the `Info` method as well as implementing the `BeginBatch` and `EndBatch` methods.
* Take a look at the other [examples](https://github.com/influxdata/kapacitor/tree/master/udf/agent/examples) and modify one to do something similar to one of your existing requirements.
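As a starting point for the exercise about setting a value only when it is missing, the core check could look like the sketch below. It uses plain maps as stand-ins for a point's tags and fields, and `setDefaults` is a hypothetical helper, not part of the Kapacitor agent library.

```go
package main

// setDefaults writes each default into values only when the key is missing,
// leaving existing entries untouched. The type parameter lets the same
// helper serve string tags and numeric fields.
func setDefaults[V any](values, defaults map[string]V) {
	for k, v := range defaults {
		if _, ok := values[k]; !ok {
			values[k] = v
		}
	}
}
```

In a real UDF this check would run inside the handler for each incoming point before the point is sent back to Kapacitor.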
{{< influxdbu title="User Defined Functions in Kapacitor" summary="Learn how to create User Defined Functions and Tasks with Kapacitor in this **free** InfluxDB University course" action="Take the course." link="https://university.influxdata.com/courses/user-defined-functions-in-kapacitor-tutorial/" >}}