Sandbox

Sandboxes are Heka plugins that are implemented in a sandboxed scripting language. They provide a dynamic and isolated execution environment for data parsing, transformation, and analysis. They allow real-time access to data in production without jeopardizing the integrity or performance of the monitoring infrastructure, and they do not require Heka to be recompiled. This broadens the audience that the data can be exposed to and facilitates new uses of the data (e.g. debugging, monitoring, dynamic provisioning, SLA analysis, intrusion detection, ad-hoc reporting).

Features

  • dynamic loading
    • SandboxFilters can be started/stopped on a self-service basis while Heka is running
    • SandboxDecoder can only be started/stopped on a Heka restart but no recompilation is required to add new functionality.
  • small - memory requirements are about 16 KiB for a basic sandbox

  • fast - microsecond execution times

  • stateful - ability to resume where it left off after a restart/reboot

  • isolated - failures are contained and malfunctioning sandboxes are terminated

Lua Sandbox

The Lua sandbox provides full access to the Lua language in a sandboxed environment under hekad that enforces configurable restrictions.

API

Functions that must be exposed from the Lua sandbox

int process_message()

Called by Heka when a message is available to the sandbox. The instruction_limit configuration parameter is applied to this function call.

Arguments
none
Return
  • < 0 for non-fatal failure (increments ProcessMessageFailures)
  • 0 for success
  • > 0 for fatal error (terminates the sandbox)

timer_event(ns)

Called by Heka when the ticker_interval expires. The instruction_limit configuration parameter is applied to this function call. This function is only required in SandboxFilters (SandboxDecoders do not support timer events).

Arguments
  • ns (int64) current time in nanoseconds since the UNIX epoch
Return
none
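
As a minimal sketch of the return value convention (not taken from the Heka sources), the filter below treats a missing or non-numeric field as a non-fatal failure. The field name Fields[counter] and the read_message stand-in, which exists only so the snippet runs outside hekad, are assumptions for illustration.

```lua
-- Stand-in for the Heka-provided read_message so the sketch runs outside hekad
-- (hypothetical test data; under hekad the real function is used instead).
local fake_message = { ["Fields[counter]"] = 5 }
read_message = read_message or function(name) return fake_message[name] end

total = 0 -- global scope, so it is eligible for preservation

function process_message()
    local n = read_message("Fields[counter]")
    if type(n) ~= "number" then
        return -1 -- non-fatal: increments ProcessMessageFailures, sandbox keeps running
    end
    total = total + n
    return 0 -- success
end

function timer_event(ns)
    -- ns is the current time in nanoseconds since the UNIX epoch; nothing is returned
end
```

Returning a value greater than zero instead would terminate the sandbox, so it is best reserved for conditions the script cannot recover from.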

Core functions that are exposed to the Lua sandbox

See: https://github.com/mozilla-services/lua_sandbox/blob/master/docs/sandbox_api.md

require(libraryName)

output(arg0, arg1, ...argN)

In most cases circular buffers should be output directly using inject_message. However, in order to create graph annotations, the annotation table has to be written to the output buffer followed by the circular buffer. The output function is the only way to combine this data before injection (use a unique payload_type when injecting a message with a non-standard circular buffer mashup). See Circular Buffer Graph Annotation (Alerts).

Heka specific functions that are exposed to the Lua sandbox

read_config(variableName)

Provides access to the sandbox configuration variables.

Arguments
  • variableName (string)
Return
number, string, bool, nil depending on the type of variable requested

read_message(variableName, fieldIndex, arrayIndex)

Provides access to the Heka message data.

Arguments
  • variableName (string)
    • raw (accesses the raw MsgBytes in the PipelinePack)
    • Uuid
    • Type
    • Logger
    • Payload
    • EnvVersion
    • Hostname
    • Timestamp
    • Severity
    • Pid
    • Fields[_name_]
  • fieldIndex (unsigned) only used in combination with the Fields variableName
    • use to retrieve a specific instance of a repeated field _name_
  • arrayIndex (unsigned) only used in combination with the Fields variableName
    • use to retrieve a specific element out of a field containing an array
Return
number, string, bool, nil depending on the type of variable requested

write_message(variableName, value, representation, fieldIndex, arrayIndex)

New in version 0.5.

Decoders only. Mutates the specified field value on the message that is being decoded.

Arguments
  • variableName (string)
    • Uuid (accepts raw bytes or RFC4122 string representation)

    • Type (string)

    • Logger (string)

    • Payload (string)

    • EnvVersion (string)

    • Hostname (string)

    • Timestamp (accepts a Unix ns-since-epoch number or a handful of parseable string representations)

    • Severity (number or int-parseable string)

    • Pid (number or int-parseable string)

    • Fields[_name_] (field type determined by value type: bool, number, or string)

  • value (bool, number or string)
    • value to which field should be set
  • representation (string) only used in combination with the Fields variableName
    • representation tag to set
  • fieldIndex (unsigned) only used in combination with the Fields variableName
    • use to set a specific instance of a repeated field _name_
  • arrayIndex (unsigned) only used in combination with the Fields variableName
    • use to set a specific element of a field containing an array
Return
none

read_next_field()

Iterates through the message fields returning the field contents or nil when the end is reached.

Arguments
none
Return
value_type, name, value, representation, count (number of items in the field array)
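
A minimal sketch of the iteration pattern, assuming the first return value is nil once the fields are exhausted. The read_next_field stand-in and its sample fields are hypothetical and exist only so the snippet runs outside hekad; the value_type numbers in the sample data are illustrative.

```lua
-- Stand-in for the Heka-provided read_next_field so the sketch runs outside hekad.
-- Each entry is {value_type, name, value, representation, count}.
local sample_fields = {
    {0, "request", "GET /", "", 1},
    {3, "status", 200, "", 1},
}
local cursor = 0
read_next_field = read_next_field or function()
    cursor = cursor + 1
    local f = sample_fields[cursor]
    if f == nil then return nil end
    return f[1], f[2], f[3], f[4], f[5]
end

-- Collect every field into a table keyed by field name.
-- A repeated field name overwrites the earlier value in this simple version.
function collect_fields()
    local fields = {}
    while true do
        local value_type, name, value, representation, count = read_next_field()
        if value_type == nil then break end -- nil marks the end of the fields
        fields[name] = value
    end
    return fields
end
```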

inject_message(payload_type, payload_name)

Creates a new Heka message using the contents of the output payload buffer and then clears the buffer. Two pieces of optional metadata are allowed and are included as fields in the injected message, e.g. Fields[payload_type] == 'csv' and Fields[payload_name] == 'Android Usage Statistics'. The number of messages that may be injected by the process_message or timer_event functions is globally controlled by the hekad command line options; if these limits are exceeded the sandbox will be terminated.

Arguments
  • payload_type (string, optional, default "txt") Describes the content type of the injected payload data.
  • payload_name (string, optional, default "") Names the content to aid in downstream filtering.
Return
none
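
The interplay between output and inject_message can be sketched as follows. The two stand-in functions only mimic the behavior described above (append to the payload buffer, then emit and clear it) so the snippet runs outside hekad; they are assumptions for illustration, not the Heka implementation, and the CSV content is made up.

```lua
require "table"

-- Stand-ins for the Heka-provided functions so the sketch runs outside hekad.
local buffer = {}
injected = {} -- messages captured by the stand-in, for inspection only
output = output or function(...)
    for i = 1, select("#", ...) do
        buffer[#buffer + 1] = tostring((select(i, ...)))
    end
end
inject_message = inject_message or function(payload_type, payload_name)
    injected[#injected + 1] = {
        payload = table.concat(buffer),
        payload_type = payload_type or "txt",
        payload_name = payload_name or "",
    }
    buffer = {} -- the payload buffer is cleared after injection
end

function timer_event(ns)
    output("device,count\n")   -- header row
    output("phone,", 42, "\n") -- multiple arguments are appended in order
    inject_message("csv", "Android Usage Statistics")
end
```

Downstream plugins can then select this message with a matcher on Fields[payload_type] or Fields[payload_name].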

inject_message(circular_buffer, payload_name)

Creates a new Heka message, placing the circular buffer output in the message payload (overwriting whatever is in the output buffer). The payload_type is set to the circular buffer output format string, e.g. Fields[payload_type] == 'cbuf'. Fields[payload_name] is set to the provided payload_name.

Arguments
  • circular_buffer (circular_buffer)
  • payload_name (string, optional, default "") Names the content to aid in downstream filtering.
Return
none
Notes
  • injection limits are enforced as described above
  • if the DashboardOutput plugin is configured a graphical view of the data is automatically generated.

inject_message(message_table)

Creates a new Heka protocol buffer message using the contents of the specified Lua table (overwriting whatever is in the output buffer). Notes about message fields:

  • Timestamp is automatically generated if one is not provided. Nanoseconds since the UNIX epoch is the only valid format.

  • UUID is automatically generated, anything provided by the user is ignored.

  • Hostname and Logger are automatically set by the SandboxFilter and cannot be overridden.

  • Type is prepended with “heka.sandbox.” by the SandboxFilter to avoid data confusion/mis-representation.

  • Fields can be represented in multiple forms and support the following primitive types: string, double, bool. These constructs should be added to the ‘Fields’ table in the message structure. Note: since the Fields structure is a map and not an array (as it is in the protobuf message), fields cannot be repeated.
    • name=value e.g. foo="bar"; foo=1; foo=true

    • name={array} e.g. foo={"b", "a", "r"}

    • name={object} e.g. foo={value=1, representation="s"}; foo={value={1010, 2200, 1567}, representation="ms"}
      • value (required) may be a single value or an array of values
      • representation (optional) metadata for display and unit management
Arguments
  • message_table A table with the proper message structure.
Return
none
Notes
  • injection limits are enforced as described above

Sample Lua Message Structure

{
    Uuid        = "data",               -- always ignored
    Logger      = "nginx",              -- ignored in the SandboxFilter
    Hostname    = "bogus.mozilla.com",  -- ignored in the SandboxFilter
    Timestamp   = 1e9,                  -- nanoseconds since the UNIX epoch
    Type        = "TEST",               -- will become "heka.sandbox.TEST" in the SandboxFilter
    Payload     = "Test Payload",
    EnvVersion  = "0.8",
    Pid         = 1234,
    Severity    = 6,
    Fields      = {
        http_status     = 200,
        request_size    = {value=1413, representation="B"}
    }
}

Lua Sandbox Tutorial

How to create a simple sandbox filter

  1. Implement the required Heka interface in Lua
function process_message ()
    return 0
end

function timer_event(ns)
end
  2. Add the business logic (count the number of ‘demo’ events per minute)
require "string"

total = 0 -- preserved between restarts since it is in global scope
local count = 0 -- local scope so this will not be preserved

function process_message()
    total = total + 1
    count = count + 1
    return 0
end

function timer_event(ns)
    output(string.format("%d messages in the last minute; total=%d", count, total))
    count = 0
    inject_message()
end
  3. Set up the configuration
[demo_counter]
type = "SandboxFilter"
message_matcher = "Type == 'demo'"
ticker_interval = 60
script_type = "lua"
filename = "counter.lua"
preserve_data = true

  4. Extend the business logic (count the number of ‘demo’ events per minute per device)

require "string"

device_counters = {}

function process_message()
    local device_name = read_message("Fields[DeviceName]")
    if device_name == nil then
        device_name = "_unknown_"
    end

    local dc = device_counters[device_name]
    if dc == nil then
        dc = {count = 1, total = 1}
        device_counters[device_name] = dc
    else
        dc.count = dc.count + 1
        dc.total = dc.total + 1
    end
    return 0
end

function timer_event(ns)
    output("#device_name\tcount\ttotal\n")
    for k, v in pairs(device_counters) do
        output(string.format("%s\t%d\t%d\n", k, v.count, v.total))
        v.count = 0
    end
    inject_message()
end

SandboxManagerFilter

The SandboxManagerFilter provides dynamic control (start/stop) of sandbox filters in a secure manner without stopping the Heka daemon. Commands are sent to a SandboxManagerFilter using a signed Heka message. The intent is to have one manager per access control group each with their own message signing key. Users in each group can submit a signed control message to manage any filters running under the associated manager. A signed message is not an enforced requirement but it is highly recommended in order to restrict access to this functionality.

SandboxManagerFilter Settings

  • Common Filter Parameters

  • working_directory (string):

    The directory where the filter configurations, code, and states are preserved. The directory can be unique or shared between sandbox managers since the filter names are unique per manager. Defaults to a directory in ${BASE_DIR}/sbxmgrs with a name generated from the plugin name.

  • module_directory (string):

    The directory where ‘require’ will attempt to load the external Lua modules from. Defaults to ${SHARE_DIR}/lua_modules.

  • max_filters (uint):

    The maximum number of filters this manager can run.

New in version 0.5.

  • memory_limit (uint):

    The number of bytes managed sandboxes are allowed to consume before being terminated (max 8MiB, default max).

  • instruction_limit (uint):

    The number of instructions managed sandboxes are allowed to execute during the process_message/timer_event functions before being terminated (max 1M, default max).

  • output_limit (uint):

    The number of bytes managed sandbox output buffers can hold before being terminated (max 63KiB, default max). Anything less than 64B is set to 64B.

Example

[OpsSandboxManager]
type = "SandboxManagerFilter"
message_signer = "ops"
message_matcher = "Type == 'heka.control.sandbox'"
max_filters = 100

Control Message

The sandbox manager control message is a regular Heka message with the following variables set to the specified values.

Starting a SandboxFilter

  • Type: “heka.control.sandbox”
  • Payload: sandbox code
  • Fields[action]: “load”
  • Fields[config]: the TOML configuration for the SandboxFilter (see the SandboxFilter settings)

Stopping a SandboxFilter

  • Type: “heka.control.sandbox”
  • Fields[action]: “unload”
  • Fields[name]: The SandboxFilter name specified in the configuration

heka-sbmgr

Heka Sbmgr is a tool for managing (starting/stopping) sandbox filters by generating the control messages defined above.

Command Line Options

heka-sbmgr [-config config_file] [-action load|unload] [-filtername specified on unload] [-script sandbox script filename] [-scriptconfig sandbox script configuration filename]

Configuration Variables

  • ip_address (string): IP address of the Heka server.

  • use_tls (bool): Specifies whether or not SSL/TLS encryption should be used for the TCP connections. Defaults to false.

  • signer (object): Signer information for the encoder.
    • name (string): The name of the signer.
    • hmac_hash (string): md5 or sha1
    • hmac_key (string): The key the message will be signed with.
    • version (int): The version number of the hmac_key.
  • tls (TlsConfig): A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if use_tls is set to true. See Configuring TLS.

Example

ip_address       = "127.0.0.1:5565"
use_tls          = true
[signer]
    name         = "test"
    hmac_hash    = "md5"
    hmac_key     = "4865ey9urgkidls xtb0[7lf9rzcivthkm"
    version      = 0

[tls]
    cert_file = "heka.crt"
    key_file = "heka.key"
    client_auth = "NoClientCert"
    prefer_server_ciphers = true
    min_version = "TLS11"

heka-sbmgrload

Heka Sbmgrload is a test tool for starting/stopping a large number of sandboxes. The script and configuration are built into the tool and the filters will be named: CounterSandboxN where N is the instance number.

Command Line Options

heka-sbmgrload [-config config_file] [-action load|unload] [-num number of sandbox instances]

Configuration Variables (same as heka-sbmgr)

Tutorial - How to use the dynamic sandboxes

SandboxManager/Heka Setup

  1. Configure the SandboxManagerFilter.

The SandboxManagerFilters are defined in the hekad configuration file and are created when hekad starts. The manager provides a location/namespace for SandboxFilters to run and controls access to this space via a signed Heka message. By associating a message_signer with the manager we can restrict who can load and unload the associated filters. Let's start by configuring a SandboxManager for a specific set of users: platform developers. Choose a unique filter name, [PlatformDevs], and a signer name, “PlatformDevs”; in this case we will use the same name for each.

[PlatformDevs]
type = "SandboxManagerFilter"
message_signer = "PlatformDevs"
message_matcher = "Type == 'heka.control.sandbox'"
working_directory = "/var/heka/sandbox"
max_filters = 100
  2. Configure the input that will receive the SandboxManager control messages.

For this setup we will extend the current TCP input to handle our signed messages. The signer section consists of the signer name followed by an underscore and the key version number (the reason for this notation is to simply flatten the signer configuration structure into a single map). Multiple key versions are allowed to be active at the same time facilitating the rollout of new keys.

[TCP:5565]
type = "TcpInput"
address = ":5565"
    [TCP:5565.signer.PlatformDevs_0]
    hmac_key = "Old Platform devs signing key"
    [TCP:5565.signer.PlatformDevs_1]
    hmac_key = "Platform devs signing key"

3. Configure the sandbox manager utility (sbmgr). The signer information must exactly match the values in the input configuration above otherwise the messages will be discarded. Save the file as PlatformDevs.toml.

ip_address       = ":5565"
[signer]
    name         = "PlatformDevs"
    hmac_hash    = "md5"
    hmac_key     = "Platform devs signing key"
    version      = 1

SandboxFilter Setup

  1. Create a SandboxFilter script and save it as “example.lua”. See Lua Sandbox Tutorial for more detail.
require "circular_buffer"

data = circular_buffer.new(1440, 1, 60) -- message count per minute
local COUNT = data:set_header(1, "Messages", "count")
function process_message ()
    local ts = read_message("Timestamp")
    data:add(ts, COUNT, 1)
    return 0
end

function timer_event(ns)
    inject_message(data)
end
  2. Create the SandboxFilter configuration and save it as “example.toml”.

The only difference between a static and dynamic SandboxFilter configuration is the filename. In the dynamic configuration it can be left blank or left out entirely. The manager will assign the filter a unique system wide name, in this case, “PlatformDevs-Example”.

[Example]
type = "SandboxFilter"
message_matcher = "Type == 'Widget'"
ticker_interval = 60
script_type = "lua"
filename = ""
preserve_data = false
  3. Load the filter using sbmgr.
sbmgr -action=load -config=PlatformDevs.toml -script=example.lua -scriptconfig=example.toml

If you are running the DashboardOutput the following links are available:

Otherwise

Note

A running filter cannot be ‘reloaded’; it must be unloaded and loaded again. The state is not preserved in this case for two reasons (in the future we hope to remedy this):

1. During the unload/load process some data can be missed, creating a small gap in the analysis and causing anomalies and confusion.
2. The internal data representation may have changed and restoration may be problematic.

  4. Unload the filter using sbmgr.
sbmgr -action=unload -config=PlatformDevs.toml -filtername=Example

SandboxDecoder

The SandboxDecoder provides an isolated execution environment for data parsing and complex transformations without the need to recompile Heka. See Sandbox.

Config:

  • script_type (string):

    The language the sandbox is written in. Currently the only valid option is ‘lua’.

  • filename (string):

    The path to the sandbox code; if specified as a relative path it will be appended to Heka’s global share_dir.

  • preserve_data (bool):

    True if the sandbox global data should be preserved/restored on Heka shutdown/startup.

  • memory_limit (uint):

    The number of bytes the sandbox is allowed to consume before being terminated (max 8MiB, default max).

  • instruction_limit (uint):

    The number of instructions the sandbox is allowed to execute during the process_message function before being terminated (max 1M, default max).

  • output_limit (uint):

    The number of bytes the sandbox output buffer can hold before being terminated (max 63KiB, default max). Anything less than 64B is set to 64B.

  • module_directory (string):

    The directory where ‘require’ will attempt to load the external Lua modules from. Defaults to ${SHARE_DIR}/lua_modules.

  • config (object):

    A map of configuration variables available to the sandbox via read_config. The map consists of a string key with: string, bool, int64, or float64 values.

Example

[sql_decoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "sql_decoder.lua"

Available Sandbox Decoders

Nginx Access Log Decoder

Parses the Nginx access logs based on the Nginx ‘log_format’ configuration directive.

Config:

  • log_format (string)

    The ‘log_format’ configuration directive from the nginx.conf. The $time_local or $time_iso8601 variable is converted to the number of nanoseconds since the Unix epoch and used to set the Timestamp on the message.

  • type (string, optional, default nil):

    Sets the message ‘Type’ header to the specified value

  • user_agent_transform (bool, optional, default false)

    Transform the http_user_agent into user_agent_browser, user_agent_version, user_agent_os.

  • user_agent_keep (bool, optional, default false)

    Always preserve the http_user_agent value if transform is enabled.

  • user_agent_conditional (bool, optional, default false)

    Only preserve the http_user_agent value if transform is enabled and fails.

Example Heka Configuration

[FxaNginxAccessDecoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "lua_decoders/nginx_access.lua"

[FxaNginxAccessDecoder.config]
log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"'
user_agent_transform = true

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:logfile
Hostname:trink-x230
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Logger:FxaNginxAccessInput
Payload:
EnvVersion:
Severity:7
Fields:
name:"remote_user" value_string:"-"
name:"http_x_forwarded_for" value_string:"-"
name:"http_referer" value_string:"-"
name:"body_bytes_sent" value_type:DOUBLE representation:"B" value_double:82
name:"remote_addr" value_string:"62.195.113.219"
name:"status" value_type:DOUBLE value_double:200
name:"request" value_string:"GET /v1/recovery_email/status HTTP/1.1"
name:"user_agent_os" value_string:"FirefoxOS"
name:"user_agent_browser" value_string:"Firefox"
name:"user_agent_version" value_type:DOUBLE value_double:29

Rsyslog Decoder

Parses the rsyslog output using the string based configuration template.

Config:

  • template (string)

    The ‘template’ configuration string from rsyslog.conf.

  • tz (string, optional, defaults to UTC)

    The time zone of the timestamps in the rsyslog output. The conversion is performed on the Go side because the Lua sandbox lacks good time zone support.

Example Heka Configuration

[RsyslogDecoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "lua_decoders/rsyslog.lua"

[RsyslogDecoder.config]
template = '%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n'
tz = "America/Los_Angeles"

Example Heka Message

Timestamp:2014-02-10 12:58:58 -0800 PST
Type:logfile
Hostname:trink-x230
Pid:0
UUID:e0eef205-0b64-41e8-a307-5772b05e16c1
Logger:RsyslogInput
Payload:"imklog 5.8.6, log source = /proc/kmsg started."
EnvVersion:
Severity:7
Fields:
name:"syslogtag" value_string:"kernel:"

Lua Parsing Expression Grammars (LPeg)

Best practices (using LPeg in the sandbox)

  1. Read the LPeg reference

  2. There are no plans to include the ‘re’ module so embrace the SNOBOL tradition. Why?
    • Consistency and readability of a single syntax.
    • Promotes more modular grammars.
    • Is easier to comment.
  3. Do not use parentheses around function calls that take a single string argument.

-- prefer
lpeg.P"Literal"

-- instead of
lpeg.P("Literal")
  4. When writing sub-grammars with an ordered choice (+) place each choice on its own line; this makes it easier to pick out the alternates. Also, if possible, order them from most frequent to least frequent use.
local date_month = lpeg.P"0" * lpeg.R"19"
                   + "1" * lpeg.R"02"

-- The exception: when grouping alternates together in a higher level grammar.

local log_grammar = (rfc3339 + iso8601) * log_severity * log_message
  5. Use the locale patterns when matching standard character classes.
-- prefer
lpeg.digit

-- instead of
lpeg.R"09"
  6. If a literal occurs within an expression avoid wrapping it in a function.
-- prefer
lpeg.digit * "Test"

-- instead of
lpeg.digit * lpeg.P"Test"
  7. When creating a parser from an RFC standard mirror the ABNF grammar that is provided.
  8. If creating a grammar that would also be useful to others, please consider contributing it back to the project, thanks.
  9. Use the grammar tester http://lpeg.trink.com.

SandboxFilter

The sandbox filter provides an isolated execution environment for data analysis. Any output generated by the sandbox is injected into the payload of a new message for further processing or to be output.

Config:

  • Common Filter Parameters

  • script_type (string):

    The language the sandbox is written in. Currently the only valid option is ‘lua’.

  • filename (string):

    For a static configuration this is the path to the sandbox code; if specified as a relative path it will be appended to Heka’s global share_dir. The filename must be unique between static plugins, since the global data is preserved using this name. For a dynamic configuration the filename is ignored and the physical location on disk is controlled by the SandboxManagerFilter.

  • preserve_data (bool):

    True if the sandbox global data should be preserved/restored on Heka shutdown/startup.

  • memory_limit (uint):

    The number of bytes the sandbox is allowed to consume before being terminated (max 8MiB, default max). For a dynamic configuration the value is ignored and the SandboxManagerFilter setting is used.

  • instruction_limit (uint):

    The number of instructions the sandbox is allowed to execute during the process_message/timer_event functions before being terminated (max 1M, default max). For a dynamic configuration the value is ignored and the SandboxManagerFilter setting is used.

  • output_limit (uint):

    The number of bytes the sandbox output buffer can hold before being terminated (max 63KiB, default max). Anything less than 64B is set to 64B. For a dynamic configuration the value is ignored and the SandboxManagerFilter setting is used.

  • profile (bool):

    When true a statistically significant number of ProcessMessage timings are immediately captured before reverting back to the regular sampling interval. The main purpose is for more accurate sandbox comparison/tuning/optimization.

  • module_directory (string):

    The directory where ‘require’ will attempt to load the external Lua modules from. Defaults to ${SHARE_DIR}/lua_modules. For a dynamic configuration the module_directory is ignored and the physical location on disk is controlled by the SandboxManagerFilter.

  • config (object):

    A map of configuration variables available to the sandbox via read_config. The map consists of a string key with: string, bool, int64, or float64 values.

Example:

[hekabench_counter]
type = "SandboxFilter"
message_matcher = "Type == 'hekabench'"
ticker_interval = 1
script_type  = "lua"
filename = "counter.lua"
preserve_data = true
profile = false

    [hekabench_counter.config]
    rows = 1440
    sec_per_row = 60
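
A sketch of how a script such as counter.lua might pick up these values through read_config, assuming that an unset variable comes back as nil. The read_config stand-in exists only so the snippet runs outside hekad, and the label variable is hypothetical.

```lua
-- Stand-in for the Heka-provided read_config so the sketch runs outside hekad;
-- the values mirror the [hekabench_counter.config] section above.
local fake_config = { rows = 1440, sec_per_row = 60 }
read_config = read_config or function(name) return fake_config[name] end

-- Fall back to a default when a variable is not present in the configuration.
rows = read_config("rows") or 1440
sec_per_row = read_config("sec_per_row") or 60
label = read_config("label") or "count" -- "label" is a hypothetical, unset variable
```

Reading the configuration once at load time, rather than inside process_message, keeps the per-message work small.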

Available Sandbox Filters

Circular Buffer Delta Aggregator

Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version at least with respect to their cbuf output). The purpose is to recreate the view at a larger scope in each level of the aggregation i.e., host view -> datacenter view -> service level view.

Config:

  • enable_delta (bool, optional, default false)

    Specifies whether or not this aggregator should generate cbuf deltas.

  • alert_rows (uint, optional, default 15)

    Specifies the size of the rolling average windows to compare. The window cannot be more than one third of the entire circular buffer.

  • alert_cols (JSON string, optional)

    An array of JSON objects consisting of a ‘col’ number and a ‘deviation’ alert threshold. If not specified no anomaly detection/alerting will be performed.
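
The one-third constraint on alert_rows comes down to simple arithmetic. The helper below is a hypothetical illustration, not part of the aggregator plugin; buffer_rows stands for the number of rows in the upstream circular buffer.

```lua
-- Hypothetical helper: true when the rolling average window fits within the
-- documented limit of one third of the circular buffer.
function alert_rows_ok(alert_rows, buffer_rows)
    return alert_rows * 3 <= buffer_rows
end
```

With a 1440-row buffer the largest accepted window is 480 rows, so the default of 15 is well within the limit.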

Example Heka Configuration

[TelemetryServerMetricsAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
script_type = "lua"
filename = "lua_filters/cbufd_aggregator.lua"
preserve_data = true

[TelemetryServerMetricsAggregator.config]
enable_delta = false
alert_rows = 15
alert_cols =  '[{"col":1, "deviation":2}]'

Circular Buffer Delta Aggregator (by hostname)

Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version at least with respect to their cbuf output). Each column from the source circular buffer will become its own graph. i.e., ‘Error Count’ will become a graph with each host being represented in a column.

Config:

  • max_hosts (uint)

    Pre-allocates the number of host columns in the graph(s). If the number of active hosts exceeds this value, the plugin will terminate.

  • rows (uint)

    The number of rows to keep from the original circular buffer. Storing all the data from all the hosts is not practical since you will most likely run into memory and output size restrictions (adjust the view down as necessary).

  • host_expiration (uint, optional, default 120 seconds)

    The amount of time a host has to be inactive before it can be replaced by a new host.

Example Heka Configuration

[TelemetryServerMetricsHostAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
script_type = "lua"
filename = "lua_filters/cbufd_host_aggregator.lua"
preserve_data = true

[TelemetryServerMetricsHostAggregator.config]
max_hosts = 5
rows = 60
host_expiration = 120

Frequent Items

Calculates the most frequent items in a data stream.

Config:

  • message_variable (string)

    The message variable name containing the items to be counted.

  • max_items (uint, optional, default 1000)

    The maximum size of the sample set (higher will produce a more accurate list).

  • min_output_weight (uint, optional, default 100)

    Used to reduce the long tail output by only outputting the higher frequency items.

  • reset_days (uint, optional, default 1)

    Resets the list after the specified number of days (on the UTC day boundary). A value of 0 will never reset the list.
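
One way to detect the UTC day boundary used by reset_days is to derive a day number from the nanosecond timestamp passed to timer_event. This is only a sketch of the idea; the helper names are hypothetical and the plugin's actual implementation may differ.

```lua
require "math"

local NS_PER_DAY = 86400 * 1e9 -- nanoseconds in one UTC day

-- Hypothetical helper: number of whole UTC days since the UNIX epoch.
function utc_day(ns)
    return math.floor(ns / NS_PER_DAY)
end

-- Hypothetical helper: true when reset_days days have elapsed since last_reset_day.
-- A reset_days of 0 never triggers a reset, matching the documented setting.
function should_reset(ns, last_reset_day, reset_days)
    return reset_days > 0 and utc_day(ns) - last_reset_day >= reset_days
end
```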

Example Heka Configuration

[FxaAuthServerFrequentIP]
type = "SandboxFilter"
script_type = "lua"
filename = "lua_filters/frequent_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"

[FxaAuthServerFrequentIP.config]
message_variable = "Fields[remote_addr]"
max_items = 10000
min_output_weight = 100
reset_days = 1

Heka Message Schema (Message Documentation)

Generates documentation for each message type in a data stream. The output includes each message Type, its associated field attributes, and their counts (number in the brackets). This plugin is meant for data discovery/exploration and should not be left running on a production system.

Config:

<none>

Example Heka Configuration

[FxaAuthServerMessageSchema]
type = "SandboxFilter"
script_type = "lua"
filename = "lua_filters/heka_message_schema.lua"
ticker_interval = 60
preserve_data = false
message_matcher = "Logger == 'fxa-auth-server'"

Example Output

DB.getToken [11598]
id (string)
rid (string - optional [2])
msg (string)

HTTP Status Graph

Graphs HTTP status codes using the numeric Fields[status] variable collected from web server access logs.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window, e.g. 1440 rows at 60 seconds per row is a sliding 24-hour window with 1 minute resolution.
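
The relationship between the two settings is plain arithmetic: the window length is rows multiplied by sec_per_row. The helper name below is hypothetical and only serves to make the calculation explicit.

```lua
-- Hypothetical helper: length of the sliding window in hours.
function window_hours(rows, sec_per_row)
    return rows * sec_per_row / 3600
end

-- The defaults (1440 rows, 60 seconds per row) cover 86400 seconds, i.e. 24 hours.
print(window_hours(1440, 60))
```

Halving sec_per_row to 30 while keeping 1440 rows doubles the resolution but shrinks the window to 12 hours.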

Example Heka Configuration

[FxaAuthServerHTTPStatus]
type = "SandboxFilter"
script_type = "lua"
filename = "lua_filters/http_status.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"

[FxaAuthServerHTTPStatus.config]
sec_per_row = 60
rows = 1440

Sandbox Development

Decoders

Since decoders cannot be dynamically loaded and they stop Heka processing on fatal errors, they must be developed outside of a production environment. Most Lua decoders are LPeg based as it is the best way to parse and transform data within the sandbox. The other alternatives are the built-in Lua pattern matcher or the JSON parser with a manual transformation.

  1. Procure some sample data to be used as test input.

    timestamp=time_t key1=data1 key2=data2
  2. Configure a simple LogstreamerInput to deliver the data to your decoder.

    [LogstreamerInput]
    log_directory = "."
    file_match = 'data\.log'
    decoder = "SandboxDecoder"
    
  3. Configure your test decoder.

    [SandboxDecoder]
    script_type = "lua"
    filename = "decoder.lua"
    
  4. Configure the DashboardOutput for visibility into the decoder (performance, memory usage, messages processed/failed, etc.)

    [DashboardOutput]
    address = "127.0.0.1:4352"
    ticker_interval = 10
    working_directory = "dashboard"
    static_directory = "/usr/share/heka/dasher"
    
  5. Configure a LogOutput to display the generated messages.

    [LogOutput]
    message_matcher = "TRUE"
    
  6. Build the decoder.

    The decoder will receive a message from an input plugin. The input may have set some additional message headers but the ‘Payload’ header contains the data for the decoder. The decoder can access the payload using read_message("Payload"). The payload can be used to construct an entirely new message, multiple messages, or to modify any part of the existing message (see inject_message, write_message in the Lua Sandbox API). Message headers not modified by the decoder are left intact, and in the case of multiple message injections the initial message header values are duplicated for each message.

    1. LPeg grammar.

      Incrementally build and test your grammar using http://lpeg.trink.com.

    2. Lua pattern matcher.

      Test match expressions using http://www.lua.org/cgi-bin/demo.

    3. JSON parser.

      For data transformation use the LPeg/Lua matcher links above. Something like simple field remapping, e.g. msg.Hostname = json.host, can be verified in the LogOutput.

  7. Run Heka with the test configuration.

  8. Inspect/verify the messages written by LogOutput.
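
As a rough sketch of step 6 using the built-in Lua pattern matcher, the decoder below parses the sample line from step 1 into a message table. The read_message and inject_message stand-ins exist only so the snippet runs outside hekad, and the sample values and the "keyvalue" Type are assumptions for illustration; an LPeg grammar would be the preferred approach for anything more complex.

```lua
require "string"

-- Stand-ins so the sketch runs outside hekad; under hekad the real functions are used.
local sample = "timestamp=1391806138 key1=data1 key2=data2" -- hypothetical input line
read_message = read_message or function(name)
    if name == "Payload" then return sample end
    return nil
end
injected = {} -- messages captured by the stand-in, for inspection only
inject_message = inject_message or function(msg) injected[#injected + 1] = msg end

function process_message()
    local payload = read_message("Payload")
    if type(payload) ~= "string" then return -1 end

    local msg = {Type = "keyvalue", Fields = {}} -- "keyvalue" is an arbitrary Type
    local found = false
    -- Tokens are whitespace separated and have the form key=value.
    for key, value in string.gmatch(payload, "(%w+)=(%S+)") do
        if key == "timestamp" then
            local seconds = tonumber(value)
            if seconds == nil then return -1 end -- malformed time_t
            msg.Timestamp = seconds * 1e9 -- seconds to nanoseconds
        else
            msg.Fields[key] = value
        end
        found = true
    end
    if not found then return -1 end -- nothing parsed: non-fatal failure

    inject_message(msg)
    return 0
end
```

Returning -1 rather than a positive value keeps a single malformed line from terminating the decoder.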

Filters

Since filters can be dynamically loaded it is recommended you develop them in production with live data.

  1. Read Tutorial - How to use the dynamic sandboxes

OR

  1. If you are developing the filter in conjunction with the decoder you can add it to the test configuration.

    [SandboxFilter]
    script_type = "lua"
    filename = "filter.lua"
    
  2. Debugging

    1. Watch for a dashboard sandbox termination report. The termination message provides the line number and cause of the failure. These are usually straightforward to correct and are commonly caused by a syntax error in the script or invalid assumptions about the data (e.g. cnt = cnt + read_message("Fields[counter]") will fail if the counter field doesn't exist or is non-numeric due to an error in the data).

    2. No termination report and the output does not match expectations. These are usually a little harder to debug.

      1. Check the Heka dashboard to make sure the router is sending messages to the plugin. If not, verify your message_matcher configuration.
      2. Visually review the plugin for errors. Are the message field names correct, was the result of the cjson.decode tested, are the output variables actually being assigned to and output/injected, etc.
      3. Add a debug output message with the pertinent information.
      require "string"
      require "table"
      local dbg = {}
      
      -- table.insert(dbg, string.format("Entering function x arg1: %s", arg1))
      -- table.insert(dbg, "Exiting function x")
      
      output(table.concat(dbg, "\n"))
      inject_message("txt", "debug")
      
      4. LAST RESORT: Move the filter out of production, turn on preservation, run the tests, stop Heka, and review the entire preserved state of the filter.