Heka’s APIs for filter and output plugins have changed drastically between the v0.9.X series and v0.10.0. To ease the transition, however, a very slightly modified version of the older APIs will be supported throughout the v0.10.X release series. These should allow users with pre-existing custom filter and output plugins to upgrade their plugin code to work with the newly introduced disk buffering support with minimal effort.
These transitional APIs are documented in full below, but a brief summary is that the APIs are identical to the filter and output APIs supported by the v0.9.X series, with two exceptions:
The plugin code must call the runner’s UpdateCursor method as appropriate to advance the disk buffer’s cursor, exactly as required by the newer APIs.
The pack.Recycle method now requires an error argument which signals to Heka whether or not processing of the message in question was successful. Using nil for this argument implies that the message was successfully processed. Using a regular, non-nil error (such as is returned by errors.New or fmt.Errorf) implies that the message could not be processed but should not be tried again. Using a RetryMessageError (as is returned by pipeline.NewRetryMessageError) implies that the message should be retried.
In other words, the argument passed in to the Recycle call is used similarly to the return value of the MessageProcessor interface’s ProcessMessage function. The behavior is not identical, however. Use of a PluginExitError will not cause the plugin to exit; exiting is still signaled by returning from the plugin’s Run method. Also, a RetryMessageError will only cause a message to be retried when buffering is in use. If a plugin does not have buffering turned on, then the value passed to the Recycle method will have no impact.
Filter plugins are the message processing engine of the Heka system. They are used to examine and process message contents, and trigger events based on those contents in real time as messages are flowing through the Heka system.
The transitional filter plugin interface is just a single method:
type OldFilter interface {
    Run(r FilterRunner, h PluginHelper) (err error)
}
Like input plugins, old-style filters have a Run method which accepts a runner and a helper, and which should not return until shutdown unless there’s an error condition. The similarities end there, however.
Filters should call runner.InChan() to gain access to the plugin’s input channel. A filter’s input channel provides pointers to PipelinePack objects, defined in pipeline_runner.go, each of which contains what should be by now a fully populated Message struct from which the filter can extract any desired information.
Sometimes, while processing a message, a filter plugin will need to generate one or more new messages, which can be handed back to the router to be checked against all registered plugins’ message_matcher rules.
To generate new messages, your filter must call PluginHelper.PipelinePack(msgLoopCount int). The msgLoopCount value to be passed in should be obtained from the MsgLoopCount value on the pack that you’re already holding, or possibly zero if the new message is being triggered by a timed ticker instead of an incoming message. The PipelinePack method will either return a pack ready for you to populate or nil if the loop count is greater than the configured maximum value, as a safeguard against inadvertently creating infinite message loops.
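The loop-count safeguard can be sketched with stand-in types. Everything in the block below is hypothetical: pack and helper only imitate PipelinePack and PluginHelper, maxLoops plays the role of the configured maximum, and the assumption that the new pack carries the incoming count plus one is this sketch’s, not something stated above.

```go
package main

import "fmt"

// pack is a hypothetical stand-in for Heka's PipelinePack.
type pack struct {
	MsgLoopCount int
	Payload      string
}

// helper is a hypothetical stand-in for PluginHelper; maxLoops plays the
// role of the configured maximum loop count.
type helper struct{ maxLoops int }

// PipelinePack mimics the documented behavior: it returns a fresh pack, or
// nil once the loop count would exceed the maximum. Incrementing the count
// on the new pack is an assumption of this sketch.
func (h helper) PipelinePack(msgLoopCount int) *pack {
	next := msgLoopCount + 1
	if next > h.maxLoops {
		return nil
	}
	return &pack{MsgLoopCount: next}
}

// derive builds a new message from an incoming one, reporting false when
// the loop guard refuses to hand out a pack.
func derive(h helper, in *pack) (*pack, bool) {
	out := h.PipelinePack(in.MsgLoopCount)
	if out == nil {
		return nil, false
	}
	out.Payload = "derived from: " + in.Payload
	return out, true
}

func main() {
	h := helper{maxLoops: 2}
	p := &pack{Payload: "original"}
	for {
		next, ok := derive(h, p)
		if !ok {
			fmt.Println("loop guard tripped at count", p.MsgLoopCount)
			return
		}
		p = next
	}
}
```

The point for plugin authors is the nil check: a filter should always be prepared for PipelinePack to hand back nothing.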
Once a pack has been obtained, a filter plugin can populate its Message struct using the various provided mutator methods. The pack can then be injected into the Heka message router queue, where it will be checked against all plugin message matchers, by passing it to the FilterRunner.Inject(pack *PipelinePack) method. Note that, again as a precaution against message looping, a plugin will not be allowed to inject a message which would get a positive response from that plugin’s own matcher.
Sometimes a filter will take a specific action triggered by a single incoming message. There are many cases, however, when a filter is merely collecting or aggregating data from the incoming messages, and instead will be sending out reports on the data that has been collected at specific intervals. Heka has built-in support for this use case. Any old-style filter (or output) plugin can include a ticker_interval config setting (in seconds, integers only), which will automatically be extracted by Heka when the configuration is loaded. Then from within your plugin code you can call FilterRunner.Ticker() and you will get a channel (type <-chan time.Time) that will send a tick at the specified interval. Your plugin code can listen on the ticker channel and take action as needed.
Observant readers might have noticed that, unlike the Input interface, filters don’t need to implement a Stop method. Instead, Heka will communicate a shutdown event to filter plugins by closing the input channel from which the filter is receiving PipelinePacks. When this channel is closed, a filter should perform any necessary clean-up and then return from the Run method with a nil value to indicate a clean exit.
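The shape of such a Run method, listening on both the input channel and the ticker and exiting when the input channel closes, can be sketched as follows. This is a self-contained illustration under stated assumptions: pack, runner, countingFilter, and newRunner are hypothetical stand-ins for PipelinePack and FilterRunner, reduced to the methods the sketch needs, and the tick is fired by hand rather than by a ticker_interval setting.

```go
package main

import (
	"fmt"
	"time"
)

// pack and runner are hypothetical stand-ins for PipelinePack and
// FilterRunner, reduced to what this sketch needs.
type pack struct {
	Payload  string
	recycled bool
}

func (p *pack) Recycle(err error) { p.recycled = true }

type runner struct {
	in     chan *pack
	ticker <-chan time.Time
}

func (r *runner) InChan() chan *pack       { return r.in }
func (r *runner) Ticker() <-chan time.Time { return r.ticker }

// newRunner returns a runner plus a function that fires one manual tick.
func newRunner() (*runner, func()) {
	tick := make(chan time.Time)
	r := &runner{in: make(chan *pack), ticker: tick}
	return r, func() { tick <- time.Now() }
}

// countingFilter aggregates a message count and records it on every tick.
type countingFilter struct {
	seen    int
	reports []int
}

// Run blocks until the input channel is closed, then returns nil.
func (f *countingFilter) Run(r *runner) error {
	inChan, ticker := r.InChan(), r.Ticker()
	for {
		select {
		case p, ok := <-inChan:
			if !ok {
				// Channel closed: shutdown was requested, exit cleanly.
				return nil
			}
			f.seen++
			p.Recycle(nil) // finished with this pack
		case <-ticker:
			f.reports = append(f.reports, f.seen)
		}
	}
}

func main() {
	r, fireTick := newRunner()
	f := &countingFilter{}
	done := make(chan error)
	go func() { done <- f.Run(r) }()
	for i := 0; i < 3; i++ {
		r.in <- &pack{Payload: "msg"}
	}
	fireTick()
	close(r.in)
	err := <-done
	fmt.Println("exit error:", err, "reports:", f.reports)
}
```

A real filter would typically build and inject a report message on each tick instead of appending to a slice.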
Finally, there are two very important points that all authors of old-style filter plugins should keep in mind. The first has to do with updating the disk buffer’s queue cursor for cases when buffering is in use. Old-style filters should extract the QueueCursor values from packs and call FilterRunner.UpdateCursor as appropriate, just like filters that support the newer APIs.
The second important point is that an old-style filter’s code must call pack.Recycle(err error) to tell Heka that it is through with the pack. Failure to do so will cause Heka to not free up the packs for reuse, exhausting the supply and eventually causing the entire pipeline to freeze. The error value passed in to Recycle should be nil if the message was processed successfully. The error value should be non-nil if the message processing failed and the message should be dropped. The error value should be an instance of RetryMessageError if the processing failure was transient and message delivery should be tried again. Note that redelivery will only happen in cases where the filter’s configuration has use_buffering set to true.
Output plugins are responsible for receiving Heka messages and using them to generate interactions with the outside world. The OldOutput interface is nearly identical to the OldFilter interface:
type OldOutput interface {
    Run(or OutputRunner, h PluginHelper) (err error)
}
In fact, there are many ways in which old-style filter and output plugins are similar. Like filters, outputs should call the InChan method on the provided runner to get an input channel, which will feed PipelinePacks. Like filters, outputs should listen on this channel until it is closed, at which time they should perform any necessary clean-up and then return. And, like filters, any old-style output plugin with a ticker_interval value in the configuration will use that value to create a ticker channel that can be accessed using the runner’s Ticker method. And, finally, outputs should also be sure to call pack.Recycle (passing in an appropriate error value) when they finish with a pack so that Heka knows the pack is freed up for reuse.
The primary way that outputs differ from filters, of course, is that outputs need to serialize (or extract data from) the messages they receive and then send that data to an external destination. The serialization (or data extraction) should typically be performed by the output’s specified encoder plugin. The OutputRunner exposes the following methods to assist with this:
Encode(pack *PipelinePack) (output []byte, err error)
UsesFraming() bool
Encoder() (encoder Encoder)
The Encode method will use the specified encoder to convert the pack’s message to binary data, then if use_framing was set to true in the output’s configuration it will prepend Heka’s Stream Framing. The UsesFraming method will tell you whether or not use_framing was set to true. Finally, the Encoder method will return the actual encoder that was registered. This is useful to check to make sure that an encoder was actually registered, but generally you will want to use OutputRunner.Encode and not Encoder.Encode, since the latter will not honor the output’s use_framing specification.
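The difference between the two Encode calls can be illustrated with stand-in types. The sketch below is not Heka’s code: pack, Encoder, payloadEncoder, and outputRunner are hypothetical, and the 4-byte length prefix is only a placeholder header, not Heka’s actual Stream Framing format.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// pack is a hypothetical stand-in for PipelinePack.
type pack struct{ Payload string }

// Encoder is a hypothetical stand-in for Heka's encoder plugin interface.
type Encoder interface {
	Encode(p *pack) ([]byte, error)
}

// payloadEncoder emits the payload bytes unchanged.
type payloadEncoder struct{}

func (payloadEncoder) Encode(p *pack) ([]byte, error) {
	return []byte(p.Payload), nil
}

// outputRunner is a hypothetical stand-in for OutputRunner.
type outputRunner struct {
	encoder    Encoder
	useFraming bool
}

func (r *outputRunner) Encoder() Encoder  { return r.encoder }
func (r *outputRunner) UsesFraming() bool { return r.useFraming }

// Encode runs the encoder and, if framing is enabled, prepends a header.
// The 4-byte length prefix is a placeholder, NOT Heka's Stream Framing.
func (r *outputRunner) Encode(p *pack) ([]byte, error) {
	if r.encoder == nil {
		return nil, errors.New("no encoder registered")
	}
	body, err := r.encoder.Encode(p)
	if err != nil || !r.useFraming {
		return body, err
	}
	header := make([]byte, 4)
	binary.BigEndian.PutUint32(header, uint32(len(body)))
	return append(header, body...), nil
}

func main() {
	runner := &outputRunner{encoder: payloadEncoder{}, useFraming: true}
	p := &pack{Payload: "hello"}
	framed, _ := runner.Encode(p)          // honors use_framing
	raw, _ := runner.Encoder().Encode(p)   // bypasses framing
	fmt.Println(len(framed), len(raw))
}
```

Calling the encoder directly skips the header even though framing is enabled on the runner, which is why the runner’s Encode is usually the one to use.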