
Filters

Common Filter Parameters

There are some configuration options that are universally available to all Heka filter plugins. These will be consumed by Heka itself when Heka initializes the plugin and do not need to be handled by the plugin-specific initialization code.

  • message_matcher (string, optional):

    Boolean expression; when it evaluates to true, the message is passed to the filter for processing. Defaults to matching nothing. See: Message Matcher Syntax

  • message_signer (string, optional):

    The name of the message signer. If specified, only messages with this signer are passed to the filter for processing.

  • ticker_interval (uint, optional):

    Frequency (in seconds) that a timer event will be sent to the filter. Defaults to not sending timer events.

  • can_exit (bool, optional):

    New in version 0.7.

    Whether or not this plugin can exit without causing Heka to shut down. Defaults to false for non-sandbox filters, and true for sandbox filters.

Circular Buffer Delta Aggregator

New in version 0.5.

Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version, at least with respect to their cbuf output). The purpose is to recreate the view at a larger scope at each level of the aggregation, i.e., host view -> datacenter view -> service level view.

Config:

  • enable_delta (bool, optional, default false)

    Specifies whether or not this aggregator should generate cbuf deltas.

  • anomaly_config (string, optional) - (see Anomaly Detection Module)

    A list of anomaly detection specifications. If not specified, no anomaly detection/alerting will be performed.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the enable_delta configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[TelemetryServerMetricsAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
filename = "lua_filters/cbufd_aggregator.lua"
preserve_data = true

[TelemetryServerMetricsAggregator.config]
enable_delta = false
anomaly_config = 'roc("Request Statistics", 1, 15, 0, 1.5, true, false)'
preservation_version = 0

CBuf Delta Aggregator By Hostname

New in version 0.5.

Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version, at least with respect to their cbuf output). Each column from the source circular buffer will become its own graph. For example, ‘Error Count’ will become a graph with each host represented by a column.

Config:

  • max_hosts (uint)

    Pre-allocates the number of host columns in the graph(s). If the number of active hosts exceeds this value, the plugin will terminate.

  • rows (uint)

    The number of rows to keep from the original circular buffer. Storing all the data from all the hosts is not practical since you will most likely run into memory and output size restrictions (adjust the view down as necessary).

  • host_expiration (uint, optional, default 120 seconds)

    The amount of time a host has to be inactive before it can be replaced by a new host.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the max_hosts or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[TelemetryServerMetricsHostAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
filename = "lua_filters/cbufd_host_aggregator.lua"
preserve_data = true

[TelemetryServerMetricsHostAggregator.config]
max_hosts = 5
rows = 60
host_expiration = 120
preservation_version = 0
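
The interaction between max_hosts and host_expiration described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not the plugin's Lua code: the HostColumns class and its column_for method are hypothetical names, a host is treated as expired once it has been inactive for at least host_expiration seconds, and raising an exception stands in for the plugin terminating.

```python
class HostColumns:
    """Hypothetical model of host-to-column assignment (not Heka's implementation)."""

    def __init__(self, max_hosts, host_expiration=120):
        self.max_hosts = max_hosts
        self.host_expiration = host_expiration  # seconds of inactivity
        self.columns = {}    # host -> column index
        self.last_seen = {}  # host -> timestamp of last message, in seconds

    def column_for(self, host, now):
        """Return the column for host, reusing an expired column if the graph is full."""
        if host not in self.columns:
            if len(self.columns) < self.max_hosts:
                # A pre-allocated column is still free.
                self.columns[host] = len(self.columns)
            else:
                expired = [h for h, ts in self.last_seen.items()
                           if now - ts >= self.host_expiration]
                if not expired:
                    # Assumption: this models the plugin terminating.
                    raise RuntimeError("active hosts exceed max_hosts")
                # Replace the host that has been inactive the longest.
                oldest = min(expired, key=self.last_seen.get)
                self.columns[host] = self.columns.pop(oldest)
                del self.last_seen[oldest]
        self.last_seen[host] = now
        return self.columns[host]
```

With max_hosts = 2 and the default expiration, a third host can only take over a column after one of the first two hosts has been silent for 120 seconds; before that, the sketch raises.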

CounterFilter

Once per ticker interval, a CounterFilter will generate a message of type heka.counter-output. The payload will contain text indicating the number of messages that matched the filter’s message_matcher value during that interval (i.e., it counts the messages the plugin received). Every ten intervals an extra message (also of type heka.counter-output) goes out, containing an aggregate count and the average per-second throughput of messages received.

Config:

  • ticker_interval (int, optional):

    Interval between generated counter messages, in seconds. Defaults to 5.

Example:

[CounterFilter]
message_matcher = "Type != 'heka.counter-output'"
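
The reporting cadence described above can be illustrated with a short Python sketch. It is not the plugin's Go code; the function name counter_reports and the tuple layout are hypothetical, and it assumes that the aggregate message covers the ten intervals since the previous aggregate.

```python
def counter_reports(counts_per_interval, ticker_interval=5):
    """Return the reports a counter would emit for a list of per-interval counts.

    Each report is a (kind, count, rate) tuple; rate is None for the
    per-interval reports. All names here are illustrative only.
    """
    reports = []
    window_total = 0
    for i, count in enumerate(counts_per_interval, start=1):
        # One report per ticker interval with that interval's count.
        reports.append(("interval", count, None))
        window_total += count
        if i % 10 == 0:
            # Assumption: the aggregate spans the last ten intervals.
            rate = window_total / (10 * ticker_interval)
            reports.append(("aggregate", window_total, rate))
            window_total = 0
    return reports
```

For example, ten intervals of 50 messages each with the default 5 second ticker would produce ten interval reports followed by one aggregate report of 500 messages at 10 messages per second.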

Disk Stats Filter

New in version 0.7.

Graphs disk IO stats. It automatically converts the running totals of Writes and Reads into rates of the values. The time based fields are left as running totals of the amount of time doing IO. Expects to receive messages with disk IO data embedded in a particular set of message fields which matches what is generated by Linux Disk Stats Decoder: WritesCompleted, ReadsCompleted, SectorsWritten, SectorsRead, WritesMerged, ReadsMerged, TimeWriting, TimeReading, TimeDoingIO, WeightedTimeDoingIO, TickerInterval.
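
Converting a running total into a rate amounts to differencing consecutive samples. The Python sketch below illustrates the idea; it is not the filter's Lua code. The function name to_rate is hypothetical, and dividing the delta by the TickerInterval field to get a per-second value is an assumption about the normalization.

```python
def to_rate(previous_total, current_total, ticker_interval):
    """Turn two samples of a running total into a per-second rate.

    Returns None when the counter went backwards (for example after a
    reboot), since no meaningful rate can be derived from that pair.
    """
    if ticker_interval <= 0:
        raise ValueError("ticker_interval must be positive")
    delta = current_total - previous_total
    if delta < 0:
        return None
    # Assumption: the rate is normalized to one second.
    return delta / ticker_interval
```

If WritesCompleted went from 1000 to 1600 over a 60 second ticker interval, this sketch reports 10 writes per second.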

Config:

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows representing 60 seconds per row is a sliding 24 hour window with 1 minute resolution.

  • anomaly_config (string, optional) - (see Anomaly Detection Module)

Example Heka Configuration

[DiskStatsFilter]
type = "SandboxFilter"
filename = "lua_filters/diskstats.lua"
preserve_data = true
message_matcher = "Type == 'stats.diskstats'"

Frequent Items

New in version 0.5.

Calculates the most frequent items in a data stream.

Config:

  • message_variable (string)

    The message variable name containing the items to be counted.

  • max_items (uint, optional, default 1000)

    The maximum size of the sample set (higher will produce a more accurate list).

  • min_output_weight (uint, optional, default 100)

    Used to reduce the long tail output by only outputting the higher frequency items.

  • reset_days (uint, optional, default 1)

    Resets the list after the specified number of days (on the UTC day boundary). A value of 0 will never reset the list.

Example Heka Configuration

[FxaAuthServerFrequentIP]
type = "SandboxFilter"
filename = "lua_filters/frequent_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"

[FxaAuthServerFrequentIP.config]
message_variable = "Fields[remote_addr]"
max_items = 10000
min_output_weight = 100
reset_days = 1

Heka Memory Statistics

New in version 0.6.

Graphs the Heka memory statistics using the heka.memstat message generated by pipeline/report.go.

Config:

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows representing 60 seconds per row is a sliding 24 hour window with 1 minute resolution.

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the rows or sec_per_row configuration is changed to prevent the plugin from failing to start during data restoration.
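
The relationship between rows and sec_per_row is simple multiplication, shown here as a small Python helper for checking a configuration before deploying it. The function name window_duration is hypothetical and is not part of Heka.

```python
from datetime import timedelta


def window_duration(rows, sec_per_row):
    """Total time span covered by a circular buffer of the given shape."""
    return timedelta(seconds=rows * sec_per_row)
```

The defaults (1440 rows, 60 seconds per row) cover 24 hours; keeping 1440 rows but lowering sec_per_row to 10 shrinks the window to 4 hours at a finer resolution.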

Example Heka Configuration

[HekaMemstat]
type = "SandboxFilter"
filename = "lua_filters/heka_memstat.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'heka.memstat'"

Heka Message Schema

New in version 0.5.

Generates documentation for each unique message in a data stream. The output is a hierarchy of Logger, Type, EnvVersion, and a list of associated message field attributes including their counts (number in the brackets). This plugin is meant for data discovery/exploration and should not be left running on a production system.

Config:

<none>

Example Heka Configuration

[SyncMessageSchema]
type = "SandboxFilter"
filename = "lua_filters/heka_message_schema.lua"
ticker_interval = 60
preserve_data = false
message_matcher = "Logger =~ /^Sync/"

Example Output

Sync-1_5-Webserver [54600]
    slf [54600]
        -no version- [54600]
            upstream_response_time (mismatch)
            http_user_agent (string)
            body_bytes_sent (number)
            remote_addr (string)
            request (string)
            upstream_status (mismatch)
            status (number)
            request_time (number)
            request_length (number)
Sync-1_5-SlowQuery [37]
    mysql.slow-query [37]
        -no version- [37]
            Query_time (number)
            Rows_examined (number)
            Rows_sent (number)
            Lock_time (number)
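
The Logger, Type, EnvVersion hierarchy in the output above can be modeled with nested dictionaries. The Python sketch below is an illustration only, not the plugin's Lua code: build_schema and the dictionary-shaped messages are hypothetical, and it assumes that "mismatch" marks a field seen with more than one value type.

```python
def build_schema(messages):
    """Group messages by Logger, Type and EnvVersion, counting each level."""
    schema = {}
    for msg in messages:
        logger = schema.setdefault(msg["Logger"], {"count": 0, "types": {}})
        logger["count"] += 1
        mtype = logger["types"].setdefault(
            msg["Type"], {"count": 0, "versions": {}})
        mtype["count"] += 1
        version_name = msg.get("EnvVersion") or "-no version-"
        version = mtype["versions"].setdefault(
            version_name, {"count": 0, "fields": {}})
        version["count"] += 1
        for name, value in msg.get("Fields", {}).items():
            is_number = (isinstance(value, (int, float))
                         and not isinstance(value, bool))
            kind = "number" if is_number else "string"
            seen = version["fields"].get(name)
            # Assumption: a field observed with differing types is a mismatch.
            version["fields"][name] = (
                kind if seen in (None, kind) else "mismatch")
    return schema
```

Feeding it two messages from the same logger where status is 200 in one and "-" in the other would record status as a mismatch, while a field that is always numeric stays "number".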

Heka Process Message Failures

New in version 0.7.

Monitors Heka’s process message failures by plugin.

Config:

  • anomaly_config (string, optional) - (see Anomaly Detection Module)

    A list of anomaly detection specifications. If not specified, a default of ‘mww_nonparametric(“DEFAULT”, 1, 5, 10, 0.7)’ is used. The “DEFAULT” settings are applied to any plugin without an explicit specification.

Example Heka Configuration

[HekaProcessMessageFailures]
type = "SandboxFilter"
filename = "lua_filters/heka_process_message_failures.lua"
ticker_interval = 60
preserve_data = false # the counts are reset on Heka restarts and the monitoring should be too.
message_matcher = "Type == 'heka.all-report'"

HTTP Status Graph

New in version 0.5.

Graphs HTTP status codes using the numeric Fields[status] variable collected from web server access logs.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows representing 60 seconds per row is a sliding 24 hour window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[FxaAuthServerHTTPStatus]
type = "SandboxFilter"
filename = "lua_filters/http_status.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"

[FxaAuthServerHTTPStatus.config]
sec_per_row = 60
rows = 1440
anomaly_config = 'roc("HTTP Status", 2, 15, 0, 1.5, true, false) roc("HTTP Status", 4, 15, 0, 1.5, true, false) mww_nonparametric("HTTP Status", 5, 15, 10, 0.8)'
preservation_version = 0

Load Average Filter

New in version 0.7.

Graphs the load average and process count data. Expects to receive messages containing fields entitled 1MinAvg, 5MinAvg, 15MinAvg, and NumProcesses, such as those generated by the Linux Load Average Decoder.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows representing 60 seconds per row is a sliding 24 hour window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[LoadAvgFilter]
type = "SandboxFilter"
filename = "lua_filters/loadavg.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'stats.loadavg'"

Memory Stats Filter

New in version 0.7.

Graphs memory usage statistics. Expects to receive messages with memory usage data embedded in a specific set of message fields, which matches the messages generated by Linux Memory Stats Decoder: MemFree, Cached, Active, Inactive, VmallocUsed, Shmem, SwapCached.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows representing 60 seconds per row is a sliding 24 hour window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[MemoryStatsFilter]
type = "SandboxFilter"
filename = "lua_filters/memstats.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'stats.memstats'"

MySQL Slow Query

New in version 0.6.

Graphs MySQL slow query data produced by the MySQL Slow Query Log Decoder.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, i.e., 1440 rows representing 60 seconds per row is a sliding 24 hour window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[Sync-1_5-SlowQueries]
type = "SandboxFilter"
message_matcher = "Logger == 'Sync-1_5-SlowQuery'"
ticker_interval = 60
filename = "lua_filters/mysql_slow_query.lua"

    [Sync-1_5-SlowQueries.config]
    anomaly_config = 'mww_nonparametric("Statistics", 5, 15, 10, 0.8)'
    preservation_version = 0

StatFilter

Filter plugin that accepts messages of a specified form and uses extracted message data to generate statsd-style numerical metrics in the form of Stat objects that can be consumed by a StatAccumulator.

Config:

  • Metric:

    Subsection defining a single metric to be generated:

    • type (string):

      Metric type, supports “Counter”, “Timer”, “Gauge”.

    • name (string):

      Metric name, must be unique.

    • value (string):

      Expression representing the (possibly dynamic) value that the StatFilter should emit for each received message.

  • stat_accum_name (string):

    Name of a StatAccumInput instance that this StatFilter will use as its StatAccumulator for submitting generated stat values. Defaults to “StatAccumInput”.

Example (Assuming you had TransformFilter inserting messages as above):

[StatAccumInput]
ticker_interval = 5

[StatsdInput]
address = "127.0.0.1:29301"

[Hits]
type = "StatFilter"
message_matcher = 'Type == "ApacheLogfile"'

[Hits.Metric.bandwidth]
type = "Counter"
name = "httpd.bytes.%Hostname%"
value = "%Bytes%"

[Hits.Metric.method_counts]
type = "Counter"
name = "httpd.hits.%Method%.%Hostname%"
value = "1"

Note

StatFilter requires an available StatAccumInput to be running.
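
The %Name% placeholders in the metric name and value settings above suggest a per-message substitution. The Python sketch below illustrates that idea only; it is not the StatFilter's Go code. The expand function is hypothetical, and it assumes each placeholder is replaced by the value of the same-named field on the matched message.

```python
import re

# Matches placeholders such as %Hostname% or %Bytes%.
PLACEHOLDER = re.compile(r"%(\w+)%")


def expand(template, fields):
    """Replace each %Name% in template with the string form of fields[Name]."""
    return PLACEHOLDER.sub(lambda m: str(fields[m.group(1)]), template)
```

Under that assumption, a GET request logged on host web1 would increment a counter named httpd.hits.GET.web1, and a static value such as "1" passes through unchanged.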

SandboxFilter

The sandbox filter provides an isolated execution environment for data analysis. Any output generated by the sandbox is injected into the payload of a new message for further processing or to be output.

Config:

Example:

[hekabench_counter]
type = "SandboxFilter"
message_matcher = "Type == 'hekabench'"
ticker_interval = 1
filename = "counter.lua"
preserve_data = true
profile = false

    [hekabench_counter.config]
    rows = 1440
    sec_per_row = 60

SandboxManagerFilter

The SandboxManagerFilter provides dynamic control (start/stop) of sandbox filters in a secure manner without stopping the Heka daemon. Commands are sent to a SandboxManagerFilter using a signed Heka message. The intent is to have one manager per access control group each with their own message signing key. Users in each group can submit a signed control message to manage any filters running under the associated manager. A signed message is not an enforced requirement but it is highly recommended in order to restrict access to this functionality.

SandboxManagerFilter Settings

  • Common Filter Parameters

  • working_directory (string):

    The directory where the filter configurations, code, and states are preserved. The directory can be unique or shared between sandbox managers since the filter names are unique per manager. Defaults to a directory in ${BASE_DIR}/sbxmgrs with a name generated from the plugin name.

  • module_directory (string):

    The directory where ‘require’ will attempt to load the external Lua modules from. Defaults to ${SHARE_DIR}/lua_modules.

  • max_filters (uint):

    The maximum number of filters this manager can run.

New in version 0.5.

  • memory_limit (uint):

    The number of bytes managed sandboxes are allowed to consume before being terminated (default 8MiB).

  • instruction_limit (uint):

    The number of instructions managed sandboxes are allowed to execute during the process_message/timer_event functions before being terminated (default 1M).

  • output_limit (uint):

    The number of bytes managed sandbox output buffers can hold before being terminated (default 63KiB). Warning: messages exceeding 64KiB will generate an error and be discarded by the standard output plugins (File, TCP, UDP) since they exceed the maximum message size.

Example

[OpsSandboxManager]
type = "SandboxManagerFilter"
message_signer = "ops"
# message_matcher = "Type == 'heka.control.sandbox'" # automatic default setting
max_filters = 100

Stats Graph

New in version 0.7.

Converts stat values extracted from statmetric messages (see StatAccumInput) to circular buffer data and periodically emits messages containing this data to be graphed by a DashboardOutput. Note that this filter expects the stats data to be available in the message fields, so the StatAccumInput must be configured with emit_in_fields set to true for this filter to work correctly.

Config:

  • title (string, optional, default “Stats”):

    Title for the graph output generated by this filter.

  • rows (uint, optional, default 300):

    The number of rows to store in our circular buffer. Each row represents one time interval.

  • sec_per_row (uint, optional, default 1):

    The number of seconds in each circular buffer time interval.

  • stats (string):

    Space separated list of stat names. Each specified stat will be expected to be found in the fields of the received statmetric messages, and will be extracted and inserted into its own column in the accumulated circular buffer.

  • stat_labels (string):

    Space separated list of header label names to use for the extracted stats. Must be in the same order as the specified stats. Any label longer than 15 characters will be truncated.

  • anomaly_config (string, optional):

    Anomaly detection configuration, see Anomaly Detection Module.

  • preservation_version (uint, optional, default 0):

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time any edits are made to your rows, sec_per_row, stats, or stat_labels values, or else Heka will fail to start because the preserved data will no longer match the filter’s data structure.

Example Heka Configuration

[stat-graph]
type = "SandboxFilter"
filename = "lua_filters/stat_graph.lua"
ticker_interval = 10
preserve_data = true
message_matcher = "Type == 'heka.statmetric'"

  [stat-graph.config]
  title = "Hits and Misses"
  rows = 1440
  sec_per_row = 10
  stats = "stats.counters.hits.count stats.counters.misses.count"
  stat_labels = "hits misses"
  anomaly_config = 'roc("Hits and Misses", 1, 15, 0, 1.5, true, false) roc("Hits and Misses", 2, 15, 0, 1.5, true, false)'
  preservation_version = 0
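
Because stats and stat_labels are matched by position, it can help to check the pairing before deploying a configuration. The Python sketch below is a hypothetical helper, not part of Heka; column_labels is an illustrative name, and rejecting lists of different lengths is an assumption about what a sensible check would do.

```python
def column_labels(stats, stat_labels, max_len=15):
    """Pair each stat name with its label, truncating labels to max_len characters."""
    names = stats.split()
    labels = stat_labels.split()
    if len(names) != len(labels):
        # Assumption: a length mismatch is treated as a configuration error.
        raise ValueError("stats and stat_labels must have the same number of entries")
    return {name: label[:max_len] for name, label in zip(names, labels)}
```

Applied to the example configuration, stats.counters.hits.count maps to the label hits and stats.counters.misses.count to misses.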

Unique Items

New in version 0.6.

Counts the number of unique items per day, e.g., active daily users by uid.

Config:

  • message_variable (string, required)

    The Heka message variable containing the item to be counted.

  • title (string, optional, default “Estimated Unique Daily message_variable”)

    The graph title for the cbuf output.

  • enable_delta (bool, optional, default false)

    Specifies whether or not this plugin should generate cbuf deltas. Deltas should be enabled when sharding is used; see: Circular Buffer Delta Aggregator.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the enable_delta configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[FxaActiveDailyUsers]
type = "SandboxFilter"
filename = "lua_filters/unique_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'FxaAuth' && Type == 'request.summary' && Fields[path] == '/v1/certificate/sign' && Fields[errno] == 0"

    [FxaActiveDailyUsers.config]
    message_variable = "Fields[uid]"
    title = "Estimated Active Daily Users"
    preservation_version = 0
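
What the filter reports can be illustrated with an exact count in Python. This is a sketch under stated assumptions, not the plugin's Lua code: the default title says "Estimated", so the real filter presumably uses an approximate counter, whereas this stand-in uses exact sets. The unique_per_day function is hypothetical, and splitting days on the UTC boundary is an assumption.

```python
from collections import defaultdict
from datetime import datetime, timezone


def unique_per_day(events):
    """Count distinct items per UTC day.

    events is an iterable of (unix_timestamp_seconds, item) pairs.
    """
    seen = defaultdict(set)
    for ts, item in events:
        # Assumption: days are split on the UTC boundary.
        day = datetime.fromtimestamp(ts, tz=timezone.utc).date().isoformat()
        seen[day].add(item)
    return {day: len(items) for day, items in seen.items()}
```

A uid that signs several certificates on the same day is counted once for that day, and counted again if it reappears the next day.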