Sandbox

Sandboxes are Heka plugins that are implemented in a sandboxed scripting language. They provide a dynamic and isolated execution environment for data parsing, transformation, and analysis. They allow real-time access to data in production without jeopardizing the integrity or performance of the monitoring infrastructure, and they do not require Heka to be recompiled. This broadens the audience that the data can be exposed to and facilitates new uses of the data (e.g. debugging, monitoring, dynamic provisioning, SLA analysis, intrusion detection, ad-hoc reporting).

Features

  • dynamic loading
    • SandboxFilters can be started/stopped on a self-service basis while Heka is running
    • SandboxDecoders can only be started/stopped on a Heka restart, but no recompilation is required to add new functionality.
  • small - memory requirements are about 16 KiB for a basic sandbox

  • fast - microsecond execution times

  • stateful - ability to resume where it left off after a restart/reboot

  • isolated - failures are contained and malfunctioning sandboxes are terminated

Lua Sandbox

The Lua sandbox provides full access to the Lua language in a sandboxed environment under hekad that enforces configurable restrictions.

API

Functions that must be exposed from the Lua sandbox

int, string process_message()

This is the entry point for input plugins to start creating messages. For all other plugin types it is called by Heka when a message is available to the sandbox. The instruction_limit configuration parameter is applied to this function call for non-input plugins.

Arguments
none
Return
  • int
    • < 0 for non-fatal failure (increments ProcessMessageFailures)
    • -2 for no output, but no error (encoders only)
    • 0 for success
    • > 0 for fatal error (terminates the sandbox)
  • string optional error message
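
For example, a decoder- or filter-style process_message might report a recoverable problem with a negative status and an error string (a minimal sketch):

function process_message()
    local payload = read_message("Payload")
    if payload == nil or payload == "" then
        return -1, "empty payload" -- non-fatal; increments ProcessMessageFailures
    end
    return 0 -- success
end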

timer_event(ns)

Called by Heka when the ticker_interval expires. The instruction_limit configuration parameter is applied to this function call. This function is only required in SandboxFilters that have a ticker_interval configuration greater than zero.

Arguments
  • ns (int64) current time in nanoseconds since the UNIX epoch
Return
none

Core functions that are exposed to the Lua sandbox

See: https://github.com/mozilla-services/lua_sandbox/

require(libraryName)

Available In
All plugin types
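
Loads the named library into the sandbox, subject to the sandbox restrictions. For example (a sketch; cjson availability depends on which modules your sandbox build enables):

require "string" -- Lua standard library module
require "cjson"  -- JSON module shipped with lua_sandbox, if enabled

function process_message()
    local ok, doc = pcall(cjson.decode, read_message("Payload"))
    if not ok then
        return -1, "invalid JSON payload"
    end
    return 0
end
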
add_to_payload(arg1, arg2, ...argN)

Appends the arguments to the payload buffer for incremental construction of the final payload output (inject_payload finalizes the buffer and sends the message to Heka). This function is simply a rename of the generic sandbox output function to improve the readability of the plugin code.

Arguments
  • arg (number, string, bool, nil, circular_buffer)
Return
none
Available In
Decoders, filters, encoders

Heka specific functions that are exposed to the Lua sandbox

read_config(variableName)

Provides access to the sandbox configuration variables.

Arguments
  • variableName (string)
Return
number, string, bool, nil depending on the type of variable requested
Available In
All plugin types
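
For example, with a [my_plugin.config] section containing path and rows settings (hypothetical names), a sandbox can read them and fall back to defaults:

local path = read_config("path") or "/proc/meminfo" -- string setting with a default
local rows = read_config("rows") or 1440            -- numeric setting with a default
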
read_message(variableName, fieldIndex, arrayIndex)

Provides access to the Heka message data. Note that both fieldIndex and arrayIndex are zero-based (i.e. the first element is 0) as opposed to Lua’s standard indexing, which is one-based.

Arguments
  • variableName (string)
    • raw (accesses the raw MsgBytes in the PipelinePack)
    • Uuid
    • Type
    • Logger
    • Payload
    • EnvVersion
    • Hostname
    • Timestamp
    • Severity
    • Pid
    • Fields[_name_]
  • fieldIndex (unsigned) only used in combination with the Fields variableName
    • use to retrieve a specific instance of a repeated field _name_; zero indexed
  • arrayIndex (unsigned) only used in combination with the Fields variableName
    • use to retrieve a specific element out of a field containing an array; zero indexed
Return
number, string, bool, nil depending on the type of variable requested
Available In
Decoders, filters, encoders, outputs
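
For example (a sketch; the field names are illustrative):

local payload  = read_message("Payload")
local status   = read_message("Fields[status]")     -- first value of the first "status" field
local second   = read_message("Fields[tags]", 0, 1) -- second element of an array field
local repeated = read_message("Fields[tags]", 1, 0) -- first element of the second "tags" field
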
write_message(variableName, value, representation, fieldIndex, arrayIndex)

New in version 0.5.

Mutates specified field value on the message that is being decoded.

Arguments
  • variableName (string)
    • Uuid (accepts raw bytes or RFC4122 string representation)

    • Type (string)

    • Logger (string)

    • Payload (string)

    • EnvVersion (string)

    • Hostname (string)

    • Timestamp (accepts Unix ns-since-epoch number or a handful of parseable string representations)

    • Severity (number or int-parseable string)

    • Pid (number or int-parseable string)

    • Fields[_name_] (field type determined by value type: bool, number, or string)

  • value (bool, number or string)
    • value to which field should be set
  • representation (string) only used in combination with the Fields variableName
    • representation tag to set
  • fieldIndex (unsigned) only used in combination with the Fields variableName
    • use to set a specific instance of a repeated field _name_
  • arrayIndex (unsigned) only used in combination with the Fields variableName
    • use to set a specific element of a field containing an array
Return
none
Available In
Decoders, encoders
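
For example, a decoder might normalize a header and set some fields (a sketch; the field names are illustrative):

write_message("Type", "nginx.access")          -- overwrite a message header
write_message("Fields[status]", 404)           -- create or overwrite a field
write_message("Fields[sizes]", 512, "B", 0, 1) -- set the second element of an array field
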
read_next_field()

Deprecated since version 0.10.0: Use read_message("raw") instead, e.g.:

local msg = decode_message(read_message("raw"))
if msg.Fields then
    for i, f in ipairs(msg.Fields) do
        -- process fields
    end
end

Iterates through the message fields returning the field contents or nil when the end is reached.

Arguments
none
Return
value_type, name, value, representation, count (number of items in the field array)
Available In
Decoders, filters, encoders, outputs

inject_payload(payload_type, payload_name, arg3, ..., argN)

Creates a new Heka message using the contents of the payload buffer (pre-populated with add_to_payload) combined with any additional payload_args passed to this function. The output buffer is cleared after the injection. The payload_type and payload_name arguments are two pieces of optional metadata. If specified, they will be included as fields in the injected message, e.g., Fields[payload_type] == ‘csv’, Fields[payload_name] == ‘Android Usage Statistics’. The number of messages that may be injected by the process_message or timer_event functions is globally controlled by the hekad global configuration options; if these values are exceeded the sandbox will be terminated.

Arguments
  • payload_type (optional, default “txt” string) Describes the content type of the injected payload data.
  • payload_name (optional, default “” string) Names the content to aid in downstream filtering.
  • arg3 (optional) Same type restrictions as add_to_payload.
  • ...
  • argN
Return
none
Available In
Decoders, filters, encoders
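
For example, a filter can accumulate counts in process_message, build the payload incrementally, and emit it from timer_event (a minimal sketch; Fields[name] is illustrative):

counts = {} -- global so it is preserved across restarts when preserve_data = true

function process_message()
    local name = read_message("Fields[name]") or "unknown"
    counts[name] = (counts[name] or 0) + 1
    return 0
end

function timer_event(ns)
    add_to_payload("# item\tcount\n")
    for name, count in pairs(counts) do
        add_to_payload(name, "\t", count, "\n")
    end
    inject_payload("tsv", "item_counts") -- finalizes the buffer and emits the message
end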
inject_message(message)

Creates a new Heka protocol buffer message using the contents of the specified Lua table (overwriting whatever is in the output buffer). Notes about message fields:

  • Timestamp is automatically generated if one is not provided. Nanoseconds since the UNIX epoch is the only valid format.

  • UUID is automatically generated if a 16 byte binary UUID is not provided.

  • Hostname and Logger are automatically set by the SandboxFilter and cannot be overridden.

  • Type is prepended with “heka.sandbox.” by the SandboxFilter to avoid data confusion/mis-representation.

  • Fields (hash structure) can be represented in multiple forms and support the following primitive types: string, double, bool. These constructs can be added to the ‘Fields’ table in the message structure.

    • name=value e.g., foo="bar"; foo=1; foo=true

    • name={array} e.g., foo={"b", "a", "r"}

    • name={object} e.g., foo={value=1, representation="s"}; foo={value={1010, 2200, 1567}, value_type=2, representation="ms"}

      • value (required) may be a single value or an array of values
      • value_type (optional) value_type enum. This is most useful for specifying that numbers should be treated as integers as opposed to defaulting to doubles.
      • representation (optional) metadata for display and unit management
  • Fields (array structure)
    • same as above but the hash key name is moved into the object as 'name' e.g., Fields = {{name="foo", value="bar"}}
Arguments
  • message (table or string) A table with the message structure documented below or a string with a Heka protobuf encoded message.
Return
none
Available In
Inputs, decoders, filters, encoders
Notes
Injection limits are only enforced on filter plugins. See max_*_inject in the global configuration options.
decode_message(heka_protobuf_string)

Converts a Heka protobuf encoded message string into a Lua table.

Arguments
  • heka_message (string) Lua variable containing a Heka protobuf encoded message
Return
  • message (table) The array based version of the message structure with the value member always being an array (even if there is only a single item). This format makes working with the output more consistent. The wide variation in the inject table format is to ease the construction of the message especially when using an LPeg grammar transformation.
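
For example, every field value in the decoded table is accessed through an array, even when the field holds a single item (a short sketch):

local msg = decode_message(read_message("raw"))
if msg.Fields then
    for _, f in ipairs(msg.Fields) do
        -- f.name, f.value_type, f.representation, and f.value (always an array)
        local first_value = f.value[1]
    end
end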

Lua Message Hash Based Field Structure

{
Uuid        = "data",               -- ignored if not 16 byte raw binary UUID
Logger      = "nginx",              -- ignored in the SandboxFilter
Hostname    = "bogus.mozilla.com",  -- ignored in the SandboxFilter

Timestamp   = 1e9,
Type        = "TEST",               -- will become "heka.sandbox.TEST" in the SandboxFilter
Payload     = "Test Payload",
EnvVersion  = "0.8",
Pid         = 1234,
Severity    = 6,
Fields      = {
            http_status     = 200, -- encoded as a double
            request_size    = {value=1413, value_type=2, representation="B"} -- encoded as an integer
            }
}

Lua Message Array Based Field Structure

{
-- same as above
Fields      = {
            {name="http_status", value=200}, -- encoded as a double
            {name="request_size", value=1413, value_type=2, representation="B"} -- encoded as an integer
            }
}
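
For example, a decoder or filter might assemble and emit a message like this (a minimal sketch using the hash based field structure):

function process_message()
    local msg = {
        Type    = "demo.event", -- prefixed with "heka.sandbox." in a SandboxFilter
        Payload = read_message("Payload"),
        Fields  = {
            status       = 200,
            request_size = {value = 1413, value_type = 2, representation = "B"}
        }
    }
    inject_message(msg)
    return 0
end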

Lua Sandbox Tutorial

How to create a simple sandbox filter

  1. Implement the required Heka interface in Lua
function process_message ()
    return 0
end

function timer_event(ns)
end
  2. Add the business logic (count the number of 'demo' events per minute)
require "string"

total = 0 -- preserved between restarts since it is in global scope
local count = 0 -- local scope so this will not be preserved

function process_message()
    total = total + 1
    count = count + 1
    return 0
end

function timer_event(ns)
    inject_payload("txt", "",
                   string.format("%d messages in the last minute; total=%d", count, total))
    count = 0
end
  3. Set up the configuration
[demo_counter]
type = "SandboxFilter"
message_matcher = "Type == 'demo'"
ticker_interval = 60
filename = "counter.lua"
preserve_data = true

  4. Extend the business logic (count the number of 'demo' events per minute per device)

require "string"

device_counters = {}

function process_message()
    local device_name = read_message("Fields[DeviceName]")
    if device_name == nil then
        device_name = "_unknown_"
    end

    local dc = device_counters[device_name]
    if dc == nil then
        dc = {count = 1, total = 1}
        device_counters[device_name] = dc
    else
        dc.count = dc.count + 1
        dc.total = dc.total + 1
    end
    return 0
end

function timer_event(ns)
    add_to_payload("#device_name\tcount\ttotal\n")
    for k, v in pairs(device_counters) do
        add_to_payload(string.format("%s\t%d\t%d\n", k, v.count, v.total))
        v.count = 0
    end
    inject_payload()
end

Sandbox Input

New in version 0.9.

Plugin Name: SandboxInput

The SandboxInput provides a flexible execution environment for data ingestion and transformation without the need to recompile Heka. Like all other sandboxes it needs to implement a process_message function. However, it doesn’t have to return until shutdown. If you would like to implement a polling interface, process_message can return zero when complete and it will be called again the next time TickerInterval fires (if ticker_interval is set to zero it will simply exit after running once). See Sandbox.

Config:

  • All of the common input configuration parameters are ignored since the data processing (splitting and decoding) should happen in the plugin.

  • Common Sandbox Parameters
    • instruction_limit is always set to zero for SandboxInputs

Example

[MemInfo]
type = "SandboxInput"
filename = "meminfo.lua"

[MemInfo.config]
path = "/proc/meminfo"

Available Sandbox Inputs

  • none

Sandbox Decoder

Plugin Name: SandboxDecoder

The SandboxDecoder provides an isolated execution environment for data parsing and complex transformations without the need to recompile Heka. See Sandbox.

Config:

Example

[sql_decoder]
type = "SandboxDecoder"
filename = "sql_decoder.lua"

Available Sandbox Decoders

Apache Access Log Decoder

Parses the Apache access logs based on the Apache ‘LogFormat’ configuration directive. The Apache format specifiers are mapped onto the Nginx variable names where applicable e.g. %a -> remote_addr. This allows generic web filters and outputs to work with any HTTP server input.

Config:

  • log_format (string)

    The ‘LogFormat’ configuration directive from the apache2.conf. %t variables are converted to the number of nanoseconds since the Unix epoch and used to set the Timestamp on the message. http://httpd.apache.org/docs/2.4/mod/mod_log_config.html

  • type (string, optional, default nil):

    Sets the message ‘Type’ header to the specified value

  • user_agent_transform (bool, optional, default false)

    Transform the http_user_agent into user_agent_browser, user_agent_version, user_agent_os.

  • user_agent_keep (bool, optional, default false)

    Always preserve the http_user_agent value if transform is enabled.

  • user_agent_conditional (bool, optional, default false)

    Only preserve the http_user_agent value if transform is enabled and fails.

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[TestWebserver]
type = "LogstreamerInput"
log_directory = "/var/log/apache"
file_match = 'access\.log'
decoder = "CombinedLogDecoder"

[CombinedLogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/apache_access.lua"

[CombinedLogDecoder.config]
type = "combined"
user_agent_transform = true
# combined log format
log_format = '%h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"'

# common log format
# log_format = '%h %l %u %t \"%r\" %>s %O'

# vhost_combined log format
# log_format = '%v:%p %h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"'

# referer log format
# log_format = '%{Referer}i -> %U'

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:combined
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Logger:TestWebserver
Payload:
EnvVersion:
Severity:7
Fields:
name:”remote_user” value_string:”-“
name:”http_x_forwarded_for” value_string:”-“
name:”http_referer” value_string:”-“
name:”body_bytes_sent” value_type:DOUBLE representation:”B” value_double:82
name:”remote_addr” value_string:”62.195.113.219” representation:”ipv4”
name:”status” value_type:DOUBLE value_double:200
name:”request” value_string:”GET /v1/recovery_email/status HTTP/1.1”
name:”user_agent_os” value_string:”FirefoxOS”
name:”user_agent_browser” value_string:”Firefox”
name:”user_agent_version” value_type:DOUBLE value_double:29

Graylog Extended Log Format Decoder

Parses a payload containing JSON in the Graylog2 Extended Log Format specification. http://graylog2.org/resources/gelf/specification

Config:

  • type (string, optional, default nil):

    Sets the message ‘Type’ header to the specified value

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example of Graylog2 Extended Format Log

{
  "version": "1.1",
  "host": "rogueethic.com",
  "short_message": "This is a short message to identify what is going on.",
  "full_message": "An entire backtrace\ncould\ngo\nhere",
  "timestamp": 1385053862.3072,
  "level": 1,
  "_user_id": 9001,
  "_some_info": "foo",
  "_some_env_var": "bar"
}

Example Heka Configuration

[GELFLogInput]
type = "LogstreamerInput"
log_directory = "/var/log"
file_match = 'application\.gelf'
decoder = "GraylogDecoder"

[GraylogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/graylog_decoder.lua"

    [GraylogDecoder.config]
    type = "gelf"
    payload_keep = true

Linux CPU Stats Decoder

Parses a payload containing the contents of the /proc/stat file.

Config:

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[ProcStats]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/stat"
decoder = "ProcStatDecoder"

[ProcStatDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_procstat.lua"

Example Heka Message

Timestamp:2014-12-10 22:38:24 +0000 UTC
Type:stats.proc
Hostname:yourhost.net
Pid:0
Uuid:d2546942-7c36-4042-ad2e-f6bfdac11cdb
Logger:
Payload:
EnvVersion:
Severity:7
Fields:
name:”cpu” type:double value:[14384,125,3330,946000,333,0,356,0,0,0]
name:”cpu[1-#]” type:double value:[14384,125,3330,946000,333,0,356,0,0,0]
name:”ctxt” type:double value:2808304
name:”btime” type:double value:1423004780
name:”intr” type:double value:[14384,125,3330,0,0,0,0,0,0,0...0]
name:”processes” type:double value:3811
name:”procs_running” type:double value:1
name:”procs_blocked” type:double value:0
name:”softirq” type:double value:[288977,23,101952,19,13046,19217,7,...]
Cpu fields (columns 1-10, in order):
user nice system idle [iowait] [irq] [softirq] [steal] [guest] [guestnice]

Note: all systems provide user, nice, system, and idle; the remaining fields depend on the kernel version.

intr

This line shows counts of interrupts serviced since boot time, for each of the possible system interrupts. The first column is the total of all interrupts serviced including unnumbered architecture specific interrupts; each subsequent column is the total for that particular numbered interrupt. Unnumbered interrupts are not shown, only summed into the total.

Linux Disk Stats Decoder

Parses a payload containing the contents of a /sys/block/$DISK/stat file (where $DISK is a disk identifier such as sda) into a Heka message struct. This also tries to obtain the TickerInterval of the input it received the data from, by extracting it from a message field named TickerInterval.

Config:

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[DiskStats]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/sys/block/sda1/stat"
decoder = "DiskStatsDecoder"

[DiskStatsDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_diskstats.lua"

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:stats.diskstats
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Payload:
EnvVersion:
Severity:7
Fields:
name:”ReadsCompleted” value_type:DOUBLE value_double:”20123”
name:”ReadsMerged” value_type:DOUBLE value_double:”11267”
name:”SectorsRead” value_type:DOUBLE value_double:”1.094968e+06”
name:”TimeReading” value_type:DOUBLE value_double:”45148”
name:”WritesCompleted” value_type:DOUBLE value_double:”1278”
name:”WritesMerged” value_type:DOUBLE value_double:”1278”
name:”SectorsWritten” value_type:DOUBLE value_double:”206504”
name:”TimeWriting” value_type:DOUBLE value_double:”3348”
name:”TimeDoingIO” value_type:DOUBLE value_double:”4876”
name:”WeightedTimeDoingIO” value_type:DOUBLE value_double:”48356”
name:”NumIOInProgress” value_type:DOUBLE value_double:”3”
name:”TickerInterval” value_type:DOUBLE value_double:”2”
name:”FilePath” value_string:”/sys/block/sda/stat”

Linux Load Average Decoder

Parses a payload containing the contents of a /proc/loadavg file into a Heka message.

Config:

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[LoadAvg]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/loadavg"
decoder = "LoadAvgDecoder"

[LoadAvgDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_loadavg.lua"

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:stats.loadavg
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Payload:
EnvVersion:
Severity:7
Fields:
name:”1MinAvg” value_type:DOUBLE value_double:”3.05”
name:”5MinAvg” value_type:DOUBLE value_double:”1.21”
name:”15MinAvg” value_type:DOUBLE value_double:”0.44”
name:”NumProcesses” value_type:DOUBLE value_double:”11”
name:”FilePath” value_string:”/proc/loadavg”

Linux Memory Stats Decoder

Parses a payload containing the contents of a /proc/meminfo file into a Heka message.

Config:

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[MemStats]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/meminfo"
decoder = "MemStatsDecoder"

[MemStatsDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_memstats.lua"

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:stats.memstats
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Payload:
EnvVersion:
Severity:7
Fields:
name:”MemTotal” value_type:DOUBLE representation:”kB” value_double:”4047616”
name:”MemFree” value_type:DOUBLE representation:”kB” value_double:”3432216”
name:”Buffers” value_type:DOUBLE representation:”kB” value_double:”82028”
name:”Cached” value_type:DOUBLE representation:”kB” value_double:”368636”
name:”FilePath” value_string:”/proc/meminfo”

The total available fields can be found in man procfs. All fields are of type double, and the representation is in kB (except for the HugePages fields). Here is a full list of fields available:

MemTotal, MemFree, Buffers, Cached, SwapCached, Active, Inactive, Active(anon), Inactive(anon), Active(file), Inactive(file), Unevictable, Mlocked, SwapTotal, SwapFree, Dirty, Writeback, AnonPages, Mapped, Shmem, Slab, SReclaimable, SUnreclaim, KernelStack, PageTables, NFS_Unstable, Bounce, WritebackTmp, CommitLimit, Committed_AS, VmallocTotal, VmallocUsed, VmallocChunk, HardwareCorrupted, AnonHugePages, HugePages_Total, HugePages_Free, HugePages_Rsvd, HugePages_Surp, Hugepagesize, DirectMap4k, DirectMap2M, DirectMap1G.

Note that your available fields may have a slight variance depending on the system’s kernel version.

MySQL Slow Query Log Decoder

Parses and transforms the MySQL slow query logs. Use mariadb_slow_query.lua to parse the MariaDB variant of the MySQL slow query logs.

Config:

  • truncate_sql (int, optional, default nil)

    Truncates the SQL payload to the specified number of bytes (not UTF-8 aware) and appends ”...”. If the value is nil no truncation is performed. A negative value will truncate the specified number of bytes from the end.

Example Heka Configuration

[Sync-1_5-SlowQuery]
type = "LogstreamerInput"
log_directory = "/var/log/mysql"
file_match = 'mysql-slow\.log'
parser_type = "regexp"
delimiter = "\n(# User@Host:)"
delimiter_location = "start"
decoder = "MySqlSlowQueryDecoder"

[MySqlSlowQueryDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/mysql_slow_query.lua"

    [MySqlSlowQueryDecoder.config]
    truncate_sql = 64

Example Heka Message

Timestamp:2014-05-07 15:51:28 -0700 PDT
Type:mysql.slow-query
Hostname:127.0.0.1
Pid:0
UUID:5324dd93-47df-485b-a88e-429f0fcd57d6
Logger:Sync-1_5-SlowQuery
Payload:/* [queryName=FIND_ITEMS] */ SELECT bso.userid, bso.collection, ...
EnvVersion:
Severity:7
Fields:
name:”Rows_examined” value_type:DOUBLE value_double:16458
name:”Query_time” value_type:DOUBLE representation:”s” value_double:7.24966
name:”Rows_sent” value_type:DOUBLE value_double:5001
name:”Lock_time” value_type:DOUBLE representation:”s” value_double:0.047038

Nginx Access Log Decoder

Parses the Nginx access logs based on the Nginx ‘log_format’ configuration directive.

Config:

  • log_format (string)

    The ‘log_format’ configuration directive from the nginx.conf. The $time_local or $time_iso8601 variable is converted to the number of nanoseconds since the Unix epoch and used to set the Timestamp on the message. http://nginx.org/en/docs/http/ngx_http_log_module.html

  • type (string, optional, default nil):

    Sets the message ‘Type’ header to the specified value

  • user_agent_transform (bool, optional, default false)

    Transform the http_user_agent into user_agent_browser, user_agent_version, user_agent_os.

  • user_agent_keep (bool, optional, default false)

    Always preserve the http_user_agent value if transform is enabled.

  • user_agent_conditional (bool, optional, default false)

    Only preserve the http_user_agent value if transform is enabled and fails.

  • payload_keep (bool, optional, default false)

    Always preserve the original log line in the message payload.

Example Heka Configuration

[TestWebserver]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'access\.log'
decoder = "CombinedLogDecoder"

[CombinedLogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_access.lua"

[CombinedLogDecoder.config]
type = "combined"
user_agent_transform = true
# combined log format
log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:combined
Hostname:test.example.com
Pid:0
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Logger:TestWebserver
Payload:
EnvVersion:
Severity:7
Fields:
name:”remote_user” value_string:”-“
name:”http_x_forwarded_for” value_string:”-“
name:”http_referer” value_string:”-“
name:”body_bytes_sent” value_type:DOUBLE representation:”B” value_double:82
name:”remote_addr” value_string:”62.195.113.219” representation:”ipv4”
name:”status” value_type:DOUBLE value_double:200
name:”request” value_string:”GET /v1/recovery_email/status HTTP/1.1”
name:”user_agent_os” value_string:”FirefoxOS”
name:”user_agent_browser” value_string:”Firefox”
name:”user_agent_version” value_type:DOUBLE value_double:29

Nginx Error Log Decoder

Parses the Nginx error logs based on the Nginx hard coded internal format.

Config:

  • tz (string, optional, defaults to UTC)

    The time zone of the log timestamps. The conversion actually happens on the Go side since there isn’t good TZ support in Lua.

  • type (string, optional, defaults to “nginx.error”):

    Sets the message ‘Type’ header to the specified value

Example Heka Configuration

[TestWebserverError]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'error\.log'
decoder = "NginxErrorDecoder"

[NginxErrorDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_error.lua"

[NginxErrorDecoder.config]
tz = "America/Los_Angeles"

Example Heka Message

Timestamp:2014-01-10 07:04:56 -0800 PST
Type:nginx.error
Hostname:trink-x230
Pid:16842
UUID:8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Logger:TestWebserverError
Payload:using inherited sockets from “6;”
EnvVersion:
Severity:5
Fields:
name:”tid” value_type:DOUBLE value_double:0
name:”connection” value_type:DOUBLE value_double:8878

Rsyslog Decoder

Parses the rsyslog output using the string based configuration template.

Config:

  • hostname_keep (boolean, defaults to false)

    Always preserve the original ‘Hostname’ field set by Logstreamer’s ‘hostname’ configuration setting.

  • template (string)

    The ‘template’ configuration string from rsyslog.conf. http://rsyslog-5-8-6-doc.neocities.org/rsyslog_conf_templates.html

  • tz (string, optional, defaults to UTC)

    If your rsyslog timestamp field in the template does not carry zone offset information, you may set an offset to be applied to your events here. Typically this would be used with the “Traditional” rsyslog formats.

    Parsing is done by Go, supports values of “UTC”, “Local”, or a location name corresponding to a file in the IANA Time Zone database, e.g. “America/New_York”.

Example Heka Configuration

[RsyslogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/rsyslog.lua"

[RsyslogDecoder.config]
type = "RSYSLOG_TraditionalFileFormat"
template = '%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n'
tz = "America/Los_Angeles"

Example Heka Message

Timestamp:2014-02-10 12:58:58 -0800 PST
Type:RSYSLOG_TraditionalFileFormat
Hostname:trink-x230
Pid:0
UUID:e0eef205-0b64-41e8-a307-5772b05e16c1
Logger:RsyslogInput
Payload:“imklog 5.8.6, log source = /proc/kmsg started.”
EnvVersion:
Severity:7
Fields:
name:”programname” value_string:”kernel”

Externally Available Sandbox Decoders

Sandbox Filter

Plugin Name: SandboxFilter

The sandbox filter provides an isolated execution environment for data analysis. Any output generated by the sandbox is injected into the payload of a new message for further processing or to be output.

Config:

Example:

[hekabench_counter]
type = "SandboxFilter"
message_matcher = "Type == 'hekabench'"
ticker_interval = 1
filename = "counter.lua"
preserve_data = true
profile = false

    [hekabench_counter.config]
    rows = 1440
    sec_per_row = 60

Available Sandbox Filters

Circular Buffer Delta Aggregator

Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version at least with respect to their cbuf output). The purpose is to recreate the view at a larger scope in each level of the aggregation i.e., host view -> datacenter view -> service level view.

Config:

  • enable_delta (bool, optional, default false)

    Specifies whether or not this aggregator should generate cbuf deltas.

  • anomaly_config (string) - (see Anomaly Detection Module)

    A list of anomaly detection specifications. If not specified no anomaly detection/alerting will be performed.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the enable_delta configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[TelemetryServerMetricsAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
filename = "lua_filters/cbufd_aggregator.lua"
preserve_data = true

[TelemetryServerMetricsAggregator.config]
enable_delta = false
anomaly_config = 'roc("Request Statistics", 1, 15, 0, 1.5, true, false)'
preservation_version = 0

Circular Buffer Delta Aggregator (by hostname)

Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version at least with respect to their cbuf output). Each column from the source circular buffer will become its own graph. i.e., ‘Error Count’ will become a graph with each host being represented in a column.

Config:

  • max_hosts (uint)

    Pre-allocates the number of host columns in the graph(s). If the number of active hosts exceed this value, the plugin will terminate.

  • rows (uint)

    The number of rows to keep from the original circular buffer. Storing all the data from all the hosts is not practical since you will most likely run into memory and output size restrictions (adjust the view down as necessary).

  • host_expiration (uint, optional, default 120 seconds)

    The amount of time a host has to be inactive before it can be replaced by a new host.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the max_hosts or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[TelemetryServerMetricsHostAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
filename = "lua_filters/cbufd_host_aggregator.lua"
preserve_data = true

[TelemetryServerMetricsHostAggregator.config]
max_hosts = 5
rows = 60
host_expiration = 120
preservation_version = 0

CPU Stats Filter

Calculates deltas in /proc/stat data. Also emits CPU percentage utilization information.

Config:

  • whitelist (string, optional, default “”)

    Only process fields that fit the pattern, defaults to match all.

  • extras (boolean, optional, default false)

    Process extra fields like ctxt, softirq, cpu fields.

  • percent_integer (boolean, optional, default true)

    Process percentage as whole number.

Example Heka Configuration

[ProcStats]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/proc/stat"
decoder = "ProcStatDecoder"

[ProcStatDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_procstat.lua"

[ProcStatFilter]
type = "SandboxFilter"
filename = "lua_filters/procstat.lua"
preserve_data = true
message_matcher = "Type == 'stats.procstat'"
[ProcStatFilter.config]
    whitelist = "cpu$"
    extras = false
    percent_integer = true

Cpu fields (columns 1-10, in order):
user nice system idle [iowait] [irq] [softirq] [steal] [guest] [guestnice]

Note: all systems provide user, nice, system, and idle; the remaining fields depend on the kernel version.

user: Time spent executing user applications (user mode).
nice: Time spent executing user applications with low priority (nice).
system: Time spent executing system calls (system mode).
idle: Idle time.
iowait: Time waiting for I/O operations to complete.
irq: Time spent servicing interrupts.
softirq: Time spent servicing soft-interrupts.
steal: Ticks spent executing other virtual hosts (virtualization setups).
guest: Time spent running a guest (virtualization setups).
guestnice: Time spent running a niced guest.

intr
This line shows counts of interrupts serviced since boot time, for each of the possible system interrupts. The first column is the total of all interrupts serviced including unnumbered architecture specific interrupts; each subsequent column is the total for that particular numbered interrupt. Unnumbered interrupts are not shown, only summed into the total.
ctxt 115315
The number of context switches that the system underwent.
btime 769041601
Boot time, in seconds since the Epoch, 1970-01-01 00:00:00 +0000 (UTC).
processes 86031
Number of forks since boot.
procs_running 6
Number of processes in runnable state. (Linux 2.5.45 onward.)
procs_blocked 2
Number of processes blocked waiting for I/O to complete. (Linux 2.5.45 onward.)
softirq 288977 23 101952 19 13046 19217 7 19125 92077 389 43122
Time spent servicing soft-interrupts.

Disk Stats Filter

Graphs disk IO stats. It automatically converts the running totals of Writes and Reads into rates of the values. The time based fields are left as running totals of the amount of time doing IO. Expects to receive messages with disk IO data embedded in a particular set of message fields which matches what is generated by Linux Disk Stats Decoder: WritesCompleted, ReadsCompleted, SectorsWritten, SectorsRead, WritesMerged, ReadsMerged, TimeWriting, TimeReading, TimeDoingIO, WeightedTimeDoingIO, TickerInterval.

Config:

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window i.e., 1440 rows representing 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • anomaly_config (string) - (see Anomaly Detection Module)

Example Heka Configuration

[DiskStatsFilter]
type = "SandboxFilter"
filename = "lua_filters/diskstats.lua"
preserve_data = true
message_matcher = "Type == 'stats.diskstats'"
ticker_interval = 10

Frequent Items

Calculates the most frequent items in a data stream.

Config:

  • message_variable (string)

    The message variable name containing the items to be counted.

  • max_items (uint, optional, default 1000)

    The maximum size of the sample set (higher will produce a more accurate list).

  • min_output_weight (uint, optional, default 100)

    Used to reduce the long tail output by only outputting the higher frequency items.

  • reset_days (uint, optional, default 1)

    Resets the list after the specified number of days (on the UTC day boundary). A value of 0 will never reset the list.

Example Heka Configuration

[FxaAuthServerFrequentIP]
type = "SandboxFilter"
filename = "lua_filters/frequent_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"

[FxaAuthServerFrequentIP.config]
message_variable = "Fields[remote_addr]"
max_items = 10000
min_output_weight = 100
reset_days = 1

Heka Memory Statistics (self monitoring)

Graphs the Heka memory statistics using the heka.memstat message generated by pipeline/report.go.

Config:

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window i.e., 1440 rows representing 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the rows or sec_per_row configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[HekaMemstat]
type = "SandboxFilter"
filename = "lua_filters/heka_memstat.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'heka.memstat'"

Heka Message Schema (Message Documentation)

Generates documentation for each unique message in a data stream. The output is a hierarchy of Logger, Type, EnvVersion, and a list of associated message field attributes including their counts (number in the brackets). This plugin is meant for data discovery/exploration and should not be left running on a production system.

Config:

<none>

Example Heka Configuration

[SyncMessageSchema]
type = "SandboxFilter"
filename = "lua_filters/heka_message_schema.lua"
ticker_interval = 60
preserve_data = false
message_matcher = "Logger != 'SyncMessageSchema' && Logger =~ /^Sync/"

Example Output

Sync-1_5-Webserver [54600]
slf [54600]
-no version- [54600]
upstream_response_time (mismatch)
http_user_agent (string)
body_bytes_sent (number)
remote_addr (string)
request (string)
upstream_status (mismatch)
status (number)
request_time (number)
request_length (number)
Sync-1_5-SlowQuery [37]
mysql.slow-query [37]
-no version- [37]
Query_time (number)
Rows_examined (number)
Rows_sent (number)
Lock_time (number)

Heka Process Message Failures (self monitoring)

Monitors Heka’s process message failures by plugin.

Config:

  • anomaly_config (string) - (see Anomaly Detection Module)

    A list of anomaly detection specifications. If not specified a default of ‘mww_nonparametric(“DEFAULT”, 1, 5, 10, 0.7)’ is used. The “DEFAULT” settings are applied to any plugin without an explicit specification.

Example Heka Configuration

[HekaProcessMessageFailures]
type = "SandboxFilter"
filename = "lua_filters/heka_process_message_failures.lua"
ticker_interval = 60
preserve_data = false # the counts are reset on Heka restarts and the monitoring should be too.
message_matcher = "Type == 'heka.all-report'"

HTTP Status Graph

Graphs HTTP status codes using the numeric Fields[status] variable collected from web server access logs.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window i.e., 1440 rows representing 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • alert_throttle (uint, optional, default 3600)

    Sets the throttle for the anomaly alert, in seconds.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[FxaAuthServerHTTPStatus]
type = "SandboxFilter"
filename = "lua_filters/http_status.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"

[FxaAuthServerHTTPStatus.config]
sec_per_row = 60
rows = 1440
anomaly_config = 'roc("HTTP Status", 2, 15, 0, 1.5, true, false) roc("HTTP Status", 4, 15, 0, 1.5, true, false) mww_nonparametric("HTTP Status", 5, 15, 10, 0.8)'
alert_throttle = 300
preservation_version = 0

InfluxDB Batch

Converts Heka message contents to InfluxDB v0.9.0 API format and periodically emits batch messages with a payload ready for delivery via HTTP.

Optionally includes all standard message fields as tags or fields and iterates through all of the dynamic fields to add as points (series), skipping any fields explicitly omitted using the skip_fields config option. It can also map any Heka message fields as tags in the request sent to the InfluxDB write API, using the tag_fields config option. All dynamic fields in the Heka message are converted to separate points separated by newlines that are submitted to InfluxDB.

Note

This filter is intended for use with InfluxDB versions 0.9 or greater.

Config:

  • decimal_precision (string, optional, default “6”)

    String that is used in the string.format function to define the number of digits printed after the decimal in number values. The string formatting of numbers is forced to print with floating points because InfluxDB will reject values that change from integers to floats and vice-versa. By forcing all numbers to floats, we ensure that InfluxDB will always accept our numerical values, regardless of the initial format.

  • name_prefix (string, optional, default nil)

    String to use as the name key’s prefix value in the generated line. Supports message field interpolation, %{fieldname}. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be extracted from the base message schema; any other values will be assumed to refer to a dynamic message field. Only the first value of the first instance of a dynamic message field can be used for name interpolation. If the dynamic field doesn’t exist, the uninterpolated value will be left in the name. Note that it is not possible to interpolate either the “Timestamp” or the “Uuid” message fields into the name; those values will be interpreted as referring to dynamic message fields.

  • name_prefix_delimiter (string, optional, default nil)

    String to use as the delimiter between the name_prefix and the field name. This defaults to a blank string but can be anything else instead (such as ”.” to use Graphite-like naming).

  • skip_fields (string, optional, default nil)

    Space delimited set of fields that should not be included in the InfluxDB measurements being generated. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be assumed to refer to the corresponding field from the base message schema. Any other values will be assumed to refer to a dynamic message field. The magic value “all_base” can be used to exclude base fields from being mapped to the event altogether (useful if you don’t want to use tags and embed them in the name_prefix instead).

  • source_value_field (string, optional, default nil)

    If the desired behavior of this encoder is to extract one field from the Heka message and feed it as a single line to InfluxDB, then use this option to define which field to find the value from. Be careful to set the name_prefix field if this option is present or no measurement name will be present when trying to send to InfluxDB. When this option is present, no other fields besides this one will be sent to InfluxDB as a measurement whatsoever.

  • tag_fields (string, optional, default “all_base”)

    Take fields defined and add them as tags of the measurement(s) sent to InfluxDB for the message. The magic values “all” and “all_base” are used to map all fields (including taggable base fields) to tags and only base fields to tags, respectively. If those magic values aren’t used, then only those fields defined will map to tags of the measurement sent to InfluxDB. The tag_fields values are independent of the skip_fields values and have no effect on each other. You can skip fields from being sent to InfluxDB as measurements, but still include them as tags.

  • timestamp_precision (string, optional, default “ms”)

    Specify the timestamp precision that you want the event sent with. The default is to use milliseconds by dividing the Heka message timestamp by 1e6, but this math can be altered by specifying one of the precision values supported by the InfluxDB write API (ms, s, m, h). The n and u precisions supported by InfluxDB are not yet supported here.

  • value_field_key (string, optional, default “value”)

    This defines the name of the InfluxDB measurement. We default this to “value” to match the examples in the InfluxDB documentation, but you can replace that with anything else that you prefer.

  • flush_count (string, optional, default 0)

    Specifies a number of messages that will trigger a batch flush, if received before a timer tick. Values of zero or lower mean to never flush on message count, only on ticker intervals.

Example Heka Configuration

[LoadAvgPoller]
type = "FilePollingInput"
ticker_interval = 5
file_path = "/proc/loadavg"
decoder = "LinuxStatsDecoder"

[LoadAvgDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_loadavg.lua"

[LinuxStatsDecoder]
type = "MultiDecoder"
subs = ["LoadAvgDecoder", "AddStaticFields"]
cascade_strategy = "all"
log_sub_errors = false

[AddStaticFields]
type = "ScribbleDecoder"

    [AddStaticFields.message_fields]
    Environment = "dev"

[InfluxdbLineFilter]
type = "SandboxFilter"
message_matcher = "Type =~ /stats.*/"
filename = "lua_filters/influx_batch.lua"

    [InfluxdbLineFilter.config]
    skip_fields = "**all_base** FilePath NumProcesses Environment TickerInterval"
    tag_fields = "Hostname Environment"
    timestamp_precision= "s"
    flush_count = 10000

[PayloadEncoder]

[InfluxdbOutput]
type = "HttpOutput"
message_matcher = "Fields[payload_name] == 'influx_line'"
encoder = "PayloadEncoder"
address = "http://influxdbserver.example.com:8086/write?db=mydb&rp=mypolicy&precision=s"
username = "influx_username"
password = "influx_password"

Load Average Filter

Graphs the load average and process count data. Expects to receive messages containing fields entitled 1MinAvg, 5MinAvg, 15MinAvg, and NumProcesses, such as those generated by the Linux Load Average Decoder.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window i.e., 1440 rows representing 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[LoadAvgFilter]
type = "SandboxFilter"
filename = "lua_filters/loadavg.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'stats.loadavg'"

Memory Stats Filter

Graphs memory usage statistics. Expects to receive messages with memory usage data embedded in a specific set of message fields, which matches the messages generated by Linux Memory Stats Decoder: MemFree, Cached, Active, Inactive, VmallocUsed, Shmem, SwapCached.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window i.e., 1440 rows representing 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[MemoryStatsFilter]
type = "SandboxFilter"
filename = "lua_filters/memstats.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'stats.memstats'"

MySQL Slow Query

Graphs MySQL slow query data produced by the MySQL Slow Query Log Decoder.

Config:

  • sec_per_row (uint, optional, default 60)

    Sets the size of each bucket (resolution in seconds) in the sliding window.

  • rows (uint, optional, default 1440)

    Sets the size of the sliding window i.e., 1440 rows representing 60 seconds per row is a 24 hour sliding window with 1 minute resolution.

  • anomaly_config (string, optional)

    See Anomaly Detection Module.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[Sync-1_5-SlowQueries]
type = "SandboxFilter"
message_matcher = "Logger == 'Sync-1_5-SlowQuery'"
ticker_interval = 60
filename = "lua_filters/mysql_slow_query.lua"

    [Sync-1_5-SlowQueries.config]
    anomaly_config = 'mww_nonparametric("Statistics", 5, 15, 10, 0.8)'
    preservation_version = 0

Stats Graph

Converts stat values extracted from statmetric messages (see Stat Accumulator Input) to circular buffer data and periodically emits messages containing this data to be graphed by a DashboardOutput. Note that this filter expects the stats data to be available in the message fields, so the StatAccumInput must be configured with emit_in_fields set to true for this filter to work correctly.

Config:

  • title (string, optional, default “Stats”):

    Title for the graph output generated by this filter.

  • rows (uint, optional, default 300):

    The number of rows to store in our circular buffer. Each row represents one time interval.

  • sec_per_row (uint, optional, default 1):

    The number of seconds in each circular buffer time interval.

  • stats (string):

    Space separated list of stat names. Each specified stat will be expected to be found in the fields of the received statmetric messages, and will be extracted and inserted into its own column in the accumulated circular buffer.

  • stat_labels (string):

    Space separated list of header label names to use for the extracted stats. Must be in the same order as the specified stats. Any label longer than 15 characters will be truncated.

  • anomaly_config (string, optional):

    Anomaly detection configuration, see Anomaly Detection Module.

  • preservation_version (uint, optional, default 0):

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time any edits are made to your rows, sec_per_row, stats, or stat_labels values, or else Heka will fail to start because the preserved data will no longer match the filter’s data structure.

  • stat_aggregation (string, optional, default “sum”):
    Controls how the column data is aggregated when combining multiple circular buffers.

    “sum” - The total is computed for the time/column (default).
    “min” - The smallest value is retained for the time/column.
    “max” - The largest value is retained for the time/column.
    “none” - No aggregation will be performed on the column.

  • stat_unit (string, optional, default “count”):

    The unit of measure (maximum 7 characters). Alphanumeric, ‘/’, and ‘*’ characters are allowed; everything else will be converted to underscores, e.g. KiB, Hz, m/s (default: count).

Example Heka Configuration

[stat-graph]
type = "SandboxFilter"
filename = "lua_filters/stat_graph.lua"
ticker_interval = 10
preserve_data = true
message_matcher = "Type == 'heka.statmetric'"

  [stat-graph.config]
  title = "Hits and Misses"
  rows = 1440
  stat_aggregation = "none"
  stat_unit = "count"
  sec_per_row = 10
  stats = "stats.counters.hits.count stats.counters.misses.count"
  stat_labels = "hits misses"
  anomaly_config = 'roc("Hits and Misses", 1, 15, 0, 1.5, true, false) roc("Hits and Misses", 2, 15, 0, 1.5, true, false)'
  preservation_version = 0

Unique Items

Counts the number of unique items per day e.g. active daily users by uid.

Config:

  • message_variable (string, required)

    The Heka message variable containing the item to be counted.

  • title (string, optional, default “Estimated Unique Daily message_variable”)

    The graph title for the cbuf output.

  • enable_delta (bool, optional, default false)

    Specifies whether or not this plugin should generate cbuf deltas. Deltas should be enabled when sharding is used; see: Circular Buffer Delta Aggregator.

  • preservation_version (uint, optional, default 0)

    If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the enable_delta configuration is changed to prevent the plugin from failing to start during data restoration.

Example Heka Configuration

[FxaActiveDailyUsers]
type = "SandboxFilter"
filename = "lua_filters/unique_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'FxaAuth' && Type == 'request.summary' && Fields[path] == '/v1/certificate/sign' && Fields[errno] == 0"

    [FxaActiveDailyUsers.config]
    message_variable = "Fields[uid]"
    title = "Estimated Active Daily Users"
    preservation_version = 0

Sandbox Encoder

Plugin Name: SandboxEncoder

The SandboxEncoder provides an isolated execution environment for converting messages into binary data without the need to recompile Heka. See Sandbox.

Config:

Example

[custom_json_encoder]
type = "SandboxEncoder"
filename = "path/to/custom_json_encoder.lua"

    [custom_json_encoder.config]
    msg_fields = ["field1", "field2"]

Available Sandbox Encoders

Alert Encoder

Produces more human readable alert messages.

Config:

<none>

Example Heka Configuration

[FxaAlert]
type = "SmtpOutput"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'alert' && Logger =~ /^Fxa/" || Type == 'heka.sandbox-terminated' && Fields[plugin] =~ /^Fxa/"
send_from = "heka@example.com"
send_to = ["alert@example.com"]
auth = "Plain"
user = "test"
password = "testpw"
host = "localhost:25"
encoder = "AlertEncoder"

[AlertEncoder]
type = "SandboxEncoder"
filename = "lua_encoders/alert.lua"

Example Output

Timestamp:2014-05-14T14:20:18Z
Hostname:ip-10-226-204-51
Plugin:FxaBrowserIdHTTPStatus
Alert:HTTP Status - algorithm: roc col: 1 msg: detected anomaly, standard deviation exceeds 1.5

CBUF Librato Encoder

Extracts data from SandboxFilter circular buffer output messages and uses it to generate time series JSON structures that will be accepted by Librato’s POST API. It will keep track of the last time it’s seen a particular message, keyed by filter name and output name. The first time it sees a new message, it will send data from all of the rows except the last one, which is possibly incomplete. For subsequent messages, the encoder will automatically extract data from all of the rows that have elapsed since the last message was received.

The SandboxEncoder preserve_data setting should be set to true when using this encoder, or else the list of received messages will be lost whenever Heka is restarted, possibly causing the same data rows to be sent to Librato multiple times.

Config:

  • message_key (string, optional, default “%{Logger}:%{payload_name}”)

    String to use as the key to differentiate separate cbuf messages from each other. Supports message field interpolation.

Example Heka Configuration

[cbuf_librato_encoder]
type = "SandboxEncoder"
filename = "lua_encoders/cbuf_librato.lua"
preserve_data = true
  [cbuf_librato_encoder.config]
  message_key = "%{Logger}:%{Hostname}:%{payload_name}"

[librato]
type = "HttpOutput"
message_matcher = "Type == 'heka.sandbox-output' && Fields[payload_type] == 'cbuf'"
encoder = "cbuf_librato_encoder"
address = "https://metrics-api.librato.com/v1/metrics"
username = "username@example.com"
password = "SECRET"
    [librato.headers]
    Content-Type = ["application/json"]

Example Output

{"gauges":[{"value":12,"measure_time":1410824950,"name":"HTTP_200","source":"thor"},{"value":1,"measure_time":1410824950,"name":"HTTP_300","source":"thor"},{"value":1,"measure_time":1410824950,"name":"HTTP_400","source":"thor"}]}

ESPayloadEncoder

Prepends ElasticSearch BulkAPI index JSON to a message payload.

Config:

  • index (string, optional, default “heka-%{%Y.%m.%d}”)

    String to use as the _index key’s value in the generated JSON. Supports field interpolation as described below.

  • type_name (string, optional, default “message”)

    String to use as the _type key’s value in the generated JSON. Supports field interpolation as described below.

  • id (string, optional)

    String to use as the _id key’s value in the generated JSON. Supports field interpolation as described below.

  • es_index_from_timestamp (boolean, optional)

    If true, then any time interpolation (often used to generate the ElasticSearch index) will use the timestamp from the processed message rather than the system time.

Field interpolation:

All of the string config settings listed above support message field interpolation.

Example Heka Configuration

[es_payload]
type = "SandboxEncoder"
filename = "lua_encoders/es_payload.lua"
    [es_payload.config]
    es_index_from_timestamp = true
    index = "%{Logger}-%{%Y.%m.%d}"
    type_name = "%{Type}-%{Hostname}"

[ElasticSearchOutput]
message_matcher = "Type == 'mytype'"
encoder = "es_payload"

Example Output

{"index":{"_index":"mylogger-2014.06.05","_type":"mytype-host.domain.com"}}
{"json":"data","extracted":"from","message":"payload"}

Schema Carbon Line Encoder

Converts full Heka message contents to line protocol for the Carbon plaintext API. Iterates through all of the dynamic fields to add as points (series), skipping any fields explicitly omitted using the skip_fields config option. All dynamic fields in the Heka message are converted to separate points separated by newlines that are submitted to Carbon.

Config:

  • name_prefix (string, optional, default nil)

    String to use as the name key’s prefix value in the generated line. Supports message field interpolation, e.g. %{fieldname}. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be extracted from the base message schema; any other values will be assumed to refer to a dynamic message field. Only the first value of the first instance of a dynamic message field can be used for name interpolation. If the dynamic field doesn’t exist, the uninterpolated value will be left in the name. Note that it is not possible to interpolate either the “Timestamp” or the “Uuid” message fields into the name; those values will be interpreted as referring to dynamic message fields.

  • name_prefix_delimiter (string, optional, default ”.”)

    String to use as the delimiter between the name_prefix and the field name. This defaults to a ”.” to use Graphite naming convention.

  • skip_fields (string, optional, default nil)

    Space delimited set of fields that should not be included in the Carbon records being generated. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be assumed to refer to the corresponding field from the base message schema. Any other values will be assumed to refer to a dynamic message field. The magic value “all_base” can be used to exclude base fields from being mapped to the event altogether.

  • source_value_field (string, optional, default nil)

    If the desired behavior of this encoder is to extract one field from the Heka message and feed it as a single line to Carbon, then use this option to define which field to find the value from. Make sure to set the name_prefix value to use fields from the message with field interpolation so the full metric path in Graphite is populated. When this option is present, no other fields besides this one will be sent to Carbon whatsoever.

Example Heka Configuration

[LinuxStatsDecoder]
type = "MultiDecoder"
subs = ["LoadAvgDecoder", "AddStaticFields"]
cascade_strategy = "all"
log_sub_errors = false

[LoadAvgPoller]
type = "FilePollingInput"
ticker_interval = 5
file_path = "/proc/loadavg"
decoder = "LinuxStatsDecoder"

[LoadAvgDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_loadavg.lua"

[AddStaticFields]
type = "ScribbleDecoder"

    [AddStaticFields.message_fields]
    Environment = "dev"

[CarbonLineEncoder]
type = "SandboxEncoder"
filename = "lua_encoders/schema_carbon_line.lua"

    [CarbonLineEncoder.config]
    name_prefix = "%{Environment}.%{Hostname}.%{Type}"
    skip_fields = "**all_base** FilePath NumProcesses Environment TickerInterval"

[CarbonOutput]
type = "TcpOutput"
message_matcher = "Type =~ /stats.*/"
encoder = "CarbonLineEncoder"
address = "127.0.0.1:2003"

Example Output

dev.myhost.stats.loadavg.1MinAvg 0.12 1434932023
dev.myhost.stats.loadavg.15MinAvg 0.18 1434932023
dev.myhost.stats.loadavg.5MinAvg 0.11 1434932023

Schema InfluxDB Encoder

Converts full Heka message contents to JSON for InfluxDB HTTP API. Includes all standard message fields and iterates through all of the dynamically specified fields, skipping any bytes fields or any fields explicitly omitted using the skip_fields config option.

Note

This encoder is intended for use with InfluxDB versions prior to 0.9. If you’re working with InfluxDB v0.9 or greater, you’ll want to use the Schema InfluxDB Line Encoder instead.

Config:

  • series (string, optional, default “series”)

    String to use as the series key’s value in the generated JSON. Supports interpolation of field values from the processed message, using %{fieldname}. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be extracted from the base message schema; any other values will be assumed to refer to a dynamic message field. Only the first value of the first instance of a dynamic message field can be used for series name interpolation. If the dynamic field doesn’t exist, the uninterpolated value will be left in the series name. Note that it is not possible to interpolate either the “Timestamp” or the “Uuid” message fields into the series name; those values will be interpreted as referring to dynamic message fields.

  • skip_fields (string, optional, default “”)

    Space delimited set of fields that should not be included in the InfluxDB records being generated. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be assumed to refer to the corresponding field from the base message schema. Any other values will be assumed to refer to a dynamic message field.

  • multi_series (boolean, optional, default false)

    Instead of submitting all fields to InfluxDB as attributes of a single series, submit a series for each field that sets a “value” attribute to the value of the field. This also sets the name attribute to the series value with the field name appended to it by a ”.”. This is recommended by InfluxDB for v0.9 onwards, as it has been found to provide better performance when querying and aggregating across multiple series.

  • exclude_base_fields (boolean, optional, default false)

    Don’t send the base fields to InfluxDB. This saves storage space by not including base fields that are mostly redundant and unused data. If skip_fields includes base fields, this overrides it and will only be relevant for skipping dynamic fields.

Example Heka Configuration

[influxdb]
type = "SandboxEncoder"
filename = "lua_encoders/schema_influx.lua"
    [influxdb.config]
    series = "heka.%{Logger}"
    skip_fields = "Pid EnvVersion"

[InfluxOutput]
message_matcher = "Type == 'influxdb'"
encoder = "influxdb"
type = "HttpOutput"
address = "http://influxdbserver.example.com:8086/db/databasename/series"
username = "influx_username"
password = "influx_password"

Example Output

[{"points":[[1.409378221e+21,"log","test","systemName","TcpInput",5,1,"test"]],"name":"heka.MyLogger","columns":["Time","Type","Payload","Hostname","Logger","Severity","syslogfacility","programname"]}]

Schema InfluxDB Line Encoder

Converts full Heka message contents to line protocol for InfluxDB HTTP write API (new in InfluxDB v0.9.0). Optionally includes all standard message fields as tags or fields and iterates through all of the dynamic fields to add as points (series), skipping any fields explicitly omitted using the skip_fields config option. It can also map any Heka message fields as tags in the request sent to the InfluxDB write API, using the tag_fields config option. All dynamic fields in the Heka message are converted to separate points separated by newlines that are submitted to InfluxDB.

Note

This encoder is intended for use with InfluxDB versions 0.9 or greater. If you’re working with InfluxDB versions prior to 0.9, you’ll want to use the Schema InfluxDB Encoder instead.

Config:

  • decimal_precision (string, optional, default “6”)

    String that is used in the string.format function to define the number of digits printed after the decimal in number values. The string formatting of numbers is forced to print with floating points because InfluxDB will reject values that change from integers to floats and vice-versa. By forcing all numbers to floats, we ensure that InfluxDB will always accept our numerical values, regardless of the initial format.

  • name_prefix (string, optional, default nil)

    String to use as the name key’s prefix value in the generated line. Supports message field interpolation, e.g. %{fieldname}. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be extracted from the base message schema; any other values will be assumed to refer to a dynamic message field. Only the first value of the first instance of a dynamic message field can be used for name interpolation. If the dynamic field doesn’t exist, the uninterpolated value will be left in the name. Note that it is not possible to interpolate either the “Timestamp” or the “Uuid” message fields into the name; those values will be interpreted as referring to dynamic message fields.

  • name_prefix_delimiter (string, optional, default nil)

    String to use as the delimiter between the name_prefix and the field name. This defaults to a blank string but can be anything else instead (such as ”.” to use Graphite-like naming).

  • skip_fields (string, optional, default nil)

    Space delimited set of fields that should not be included in the InfluxDB measurements being generated. Any fieldname values of “Type”, “Payload”, “Hostname”, “Pid”, “Logger”, “Severity”, or “EnvVersion” will be assumed to refer to the corresponding field from the base message schema. Any other values will be assumed to refer to a dynamic message field. The magic value “all_base” can be used to exclude base fields from being mapped to the event altogether (useful if you don’t want to use tags and embed them in the name_prefix instead).

  • source_value_field (string, optional, default nil)

    If the desired behavior of this encoder is to extract one field from the Heka message and feed it as a single line to InfluxDB, then use this option to define which field to find the value from. Be careful to set the name_prefix field if this option is present or no measurement name will be present when trying to send to InfluxDB. When this option is present, no other fields besides this one will be sent to InfluxDB as a measurement whatsoever.

  • tag_fields (string, optional, default “all_base”)

    Take fields defined and add them as tags of the measurement(s) sent to InfluxDB for the message. The magic values “all” and “all_base” are used to map all fields (including taggable base fields) to tags and only base fields to tags, respectively. If those magic values aren’t used, then only those fields defined will map to tags of the measurement sent to InfluxDB. The tag_fields values are independent of the skip_fields values and have no effect on each other. You can skip fields from being sent to InfluxDB as measurements, but still include them as tags.

  • timestamp_precision (string, optional, default “ms”)

    Specify the timestamp precision that you want the event sent with. The default is to use milliseconds by dividing the Heka message timestamp by 1e6, but this math can be altered by specifying one of the precision values supported by the InfluxDB write API (ms, s, m, h). The n and u precisions supported by InfluxDB are not yet supported by this encoder.

  • value_field_key (string, optional, default “value”)

    This defines the name of the InfluxDB measurement. We default this to “value” to match the examples in the InfluxDB documentation, but you can replace that with anything else that you prefer.

Example Heka Configuration

[LoadAvgPoller]
type = "FilePollingInput"
ticker_interval = 5
file_path = "/proc/loadavg"
decoder = "LinuxStatsDecoder"

[LoadAvgDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/linux_loadavg.lua"

[LinuxStatsDecoder]
type = "MultiDecoder"
subs = ["LoadAvgDecoder", "AddStaticFields"]
cascade_strategy = "all"
log_sub_errors = false

[AddStaticFields]
type = "ScribbleDecoder"

    [AddStaticFields.message_fields]
    Environment = "dev"

[InfluxdbLineEncoder]
type = "SandboxEncoder"
filename = "lua_encoders/schema_influx_line.lua"

    [InfluxdbLineEncoder.config]
    skip_fields = "**all_base** FilePath NumProcesses Environment TickerInterval"
    tag_fields = "Hostname Environment"
    timestamp_precision= "s"

[InfluxdbOutput]
type = "HttpOutput"
message_matcher = "Type =~ /stats.*/"
encoder = "InfluxdbLineEncoder"
address = "http://influxdbserver.example.com:8086/write?db=mydb&rp=mypolicy&precision=s"
username = "influx_username"
password = "influx_password"

Example Output

5MinAvg,Hostname=myhost,Environment=dev value=0.110000 1434932024
1MinAvg,Hostname=myhost,Environment=dev value=0.110000 1434932024
15MinAvg,Hostname=myhost,Environment=dev value=0.170000 1434932024

Statmetric Influx Encoder

Extracts data from message fields in heka.statmetric messages generated by a Stat Accumulator Input and generates JSON suitable for use with InfluxDB’s HTTP API. StatAccumInput must be configured with emit_in_fields = true for this encoder to work correctly.

Config:

<none>

Example Heka Configuration

[statmetric-influx-encoder]
type = "SandboxEncoder"
filename = "lua_encoders/statmetric_influx.lua"

[influx]
type = "HttpOutput"
message_matcher = "Type == 'heka.statmetric'"
address = "http://myinfluxserver.example.com:8086/db/stats/series"
encoder = "statmetric-influx-encoder"
username = "influx_username"
password = "influx_password"

Example Output

[{"points":[[1408404848,78271]],"name":"stats.counters.000000.rate","columns":["time","value"]},{"points":[[1408404848,78271]],"name":"stats.counters.000000.count","columns":["time","value"]},{"points":[[1408404848,17420]],"name":"stats.timers.000001.count","columns":["time","value"]},{"points":[[1408404848,17420]],"name":"stats.timers.000001.count_ps","columns":["time","value"]},{"points":[[1408404848,1]],"name":"stats.timers.000001.lower","columns":["time","value"]},{"points":[[1408404848,1024]],"name":"stats.timers.000001.upper","columns":["time","value"]},{"points":[[1408404848,8937851]],"name":"stats.timers.000001.sum","columns":["time","value"]},{"points":[[1408404848,513.07985074627]],"name":"stats.timers.000001.mean","columns":["time","value"]},{"points":[[1408404848,461.72356167879]],"name":"stats.timers.000001.mean_90","columns":["time","value"]},{"points":[[1408404848,925]],"name":"stats.timers.000001.upper_90","columns":["time","value"]},{"points":[[1408404848,2]],"name":"stats.statsd.numStats","columns":["time","value"]}]

Sandbox Output

New in version 0.9.

Plugin Name: SandboxOutput

The SandboxOutput provides a flexible execution environment for data encoding and transmission without the need to recompile Heka. See Sandbox.

Config:

  • The common output configuration parameter ‘encoder’ is ignored since all data transformation should happen in the plugin.

  • Common Sandbox Parameters

  • timer_event_on_shutdown (bool):

    True if the sandbox should have its timer_event function called on shutdown.

Example

[SandboxFileOutput]
type = "SandboxOutput"
filename = "fileoutput.lua"

[SandboxFileOutput.config]
path = "mylog.txt"
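
The contents of fileoutput.lua are not shown here; the following is a minimal sketch of what such a script might look like, assuming the Lua io library is made available to the sandbox and that the path setting above points to a writable file. Error handling is elided.

require "io"

local path = read_config("path") or error("path configuration required")
local fh = assert(io.open(path, "a"))  -- append to the configured file

function process_message()
    -- write the raw payload of each matched message followed by a newline
    fh:write(read_message("Payload") or "", "\n")
    fh:flush()
    return 0
end

function timer_event(ns)
    -- nothing to do on the timer for this simple output
end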

Available Sandbox Outputs

  • none

Externally Available Sandbox Outputs

Available Sandbox Modules

Alert Module

API

Stores the last alert time in the global _LAST_ALERT so alert throttling will persist between restarts.

queue(ns, msg)

Queue an alert message to be sent.

Arguments
  • ns (int64) current time in nanoseconds since the UNIX epoch.
  • msg (string) alert payload.
Return
  • true if the message is queued, false if it would be throttled.
send(ns, msg)

Send an alert message.

Arguments
  • ns (int64) current time in nanoseconds since the UNIX epoch.
  • msg (string) alert payload.
Return
  • true if the message is sent, false if it is throttled.
send_queue(ns)

Sends all queued alert messages as a single message.

Arguments
  • ns (int64) current time in nanoseconds since the UNIX epoch.
Return
  • true if the queued messages are sent, false if they are throttled.
set_throttle(ns_duration)

Sets the minimum duration between alert event outputs.

Arguments
  • ns_duration (int64) minimum duration in nanoseconds between alerts.
Return
  • none
throttled(ns)

Test to see if sending an alert at this time would be throttled.

Arguments
  • ns (int64) current time in nanoseconds since the UNIX epoch.
Return
  • true if a message would be throttled, false if it would be sent.

Note

Use a zero timestamp to override message throttling.
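
A minimal sketch of how these functions might be used together in a SandboxFilter, assuming the module is loaded with require "alert"; the Fields[status] field and the alert text are illustrative only.

require "string"
local alert = require "alert"

alert.set_throttle(60 * 60 * 1e9)  -- at most one alert message per hour

function process_message()
    local status = read_message("Fields[status]")
    if status and status >= 500 then
        -- queue the alert; queued alerts are flushed from timer_event
        alert.queue(read_message("Timestamp"),
                    string.format("server error: status %d", status))
    end
    return 0
end

function timer_event(ns)
    alert.send_queue(ns)  -- sends all queued alerts as a single message, unless throttled
end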

Annotation Module

API

add(name, ns, col, stext, text)

Create an annotation in the global _ANNOTATIONS table.

Arguments
  • name (string) circular buffer payload name.
  • ns (int64) current time in nanoseconds since the UNIX epoch.
  • col (uint) circular buffer column to annotate.
  • stext (string) short text to display on the graph.
  • text (string) long text to display in the rollover.
Return
  • none
create(ns, col, stext, text)

Helper function to create an annotation table but not add it to the global list of annotations.

Arguments
  • ns (int64) current time in nanoseconds since the UNIX epoch.
  • col (uint) circular buffer column to annotate.
  • stext (string) short text to display on the graph.
  • text (string) long text to display in the rollover.
Return
  • annotation table
concat(name, annotations)

Concatenates an array of annotation tables to the specified key in the global _ANNOTATIONS table.

Arguments
  • name (string) circular buffer payload name.
  • annotations (array) annotation tables.
Return
  • none

prune(name, ns)

Arguments
  • name (string) circular buffer payload name.
  • ns (int64) current time in nanoseconds since the UNIX epoch.
Return
  • The json encoded list of annotations.
remove(name)

Entirely remove the payload name from the global _ANNOTATIONS table.

Arguments
  • name (string) circular buffer payload name.
Return
  • none

set_prune(name, ns_duration)

Arguments
  • name (string) circular buffer payload name.
  • ns_duration (int64) time in nanoseconds the annotation should remain in the list.
Return
  • none
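
A minimal sketch of annotating a circular buffer graph, assuming the module is loaded with require "annotation"; the payload name "Status" and column 1 are illustrative only.

local annotation = require "annotation"

-- keep annotations for this payload name for one day (duration in nanoseconds)
annotation.set_prune("Status", 24 * 60 * 60 * 1e9)

-- mark a spike on column 1 of the "Status" graph
function mark_spike(ns)
    annotation.add("Status", ns, 1, "spike", "traffic spike detected in column 1")
end

-- prune expired entries and return the remaining annotations as JSON,
-- ready to be injected alongside the circular buffer payload
function current_annotations(ns)
    return annotation.prune("Status", ns)
end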

Anomaly Detection Module

API

parse_config(anomaly_config)

Parses the anomaly_config into a Lua table. If the configuration is invalid an error is thrown.

Arguments
  • anomaly_config (string or nil)

The configuration can specify any number of algorithm function calls (space delimited if desired, but they will also work back to back with no delimiter). This allows for analysis of multiple graphs, columns, and even specification of multiple algorithms per column.

Rate of change test

Only use this test on data with a normal (Gaussian http://en.wikipedia.org/wiki/Normal_distribution) distribution. It identifies rapid changes (spikes) in the data (increasing and decreasing) but ignores cyclic data that has a more gradual rise and fall. It is typically used for something like HTTP 200 status code analysis to detect a sudden increase/decrease in web traffic.

roc(“payload_name”, col, win, hwin, sd, loss_of_data, start_of_data)
  • payload_name (string)

    Quoted string containing the payload_name value used in the inject_payload function call. If the payload name contains a double quote it should be escaped as two double quotes in a row.

  • col (uint)

    The circular buffer column to perform the analysis on.

  • win (uint)

    The number of intervals in an analysis window.

  • hwin (uint)

    The number of intervals in the historical analysis window (0 uses the full history). Must be greater than or equal to ‘win’.

  • sd (double)

    The standard deviation threshold to trigger the anomaly.

  • loss_of_data (bool)

    Alert if data stops.

  • start_of_data (bool)

    Alert if data starts.

e.g. roc(“Output1”, 1, 15, 0, 2, true, false)

Mann-Whitney-Wilcoxon test http://en.wikipedia.org/wiki/Mann-Whitney

Parametric

Only use this test on data with a normal (Gaussian http://en.wikipedia.org/wiki/Normal_distribution) distribution. It identifies more gradual changes in the data (increasing, decreasing, or any). It is typically used with something like server memory analysis where the values are more stable and gradual changes are interesting (e.g., memory leak).

mww(“payload_name”, col, win, nwin, pvalue, trend)
  • payload_name (string)

    Quoted string containing the payload_name value used in the inject_payload function call. If the payload name contains a double quote it should be escaped as two double quotes in a row.

  • col (uint)

    The circular buffer column to perform the analysis on.

  • win (uint)

    The number of intervals in an analysis window (should be at least 20).

  • nwin (uint)

    The number of analysis windows to compare.

  • pvalue (double)

    The pvalue threshold to trigger the prediction. http://en.wikipedia.org/wiki/P_value

  • trend (string)

    (decreasing|increasing|any)

e.g. mww(“Output1”, 2, 60, 10, 0.0001, decreasing)

Non-parametric

This test can be used on data with a normal (Gaussian http://en.wikipedia.org/wiki/Normal_distribution) or non-normal (nonparametric http://en.wikipedia.org/wiki/Nonparametric_statistics) distribution. It identifies overlap/similarities between two data sets. It is typically used for something like detecting an increase in HTTP 500 status code errors.

mww_nonparametric(“payload_name”, col, win, nwin, pstat)
  • payload_name (string)

    Quoted string containing the payload_name value used in the inject_payload function call. If the payload name contains a double quote it should be escaped as two double quotes in a row.

  • col (uint)

    The circular buffer column to perform the analysis on.

  • win (uint)

    The number of intervals in an analysis window.

  • nwin (uint)

    The number of analysis windows to compare.

  • pstat (double)

    Value between 0 and 1. Anything above 0.5 is an increasing trend; anything below 0.5 is a decreasing trend. http://en.wikipedia.org/wiki/Mann-Whitney#.CF.81_statistic

e.g. mww_nonparametric(“Output1”, 2, 15, 10, 0.55)

Return
Configuration table if parsing was successful, or nil if nil was passed in.
detect(ns, name, cbuf, anomaly_config)

Detects anomalies in the circular buffer data returning any error messages for alert generation and array of annotations for the graph.

Arguments
  • ns (int64) current time in nanoseconds since the UNIX epoch. It is used to advance the circular buffer if necessary (i.e., if no data is being received). The anomaly detection is always performed on the newest data (ignoring the current interval since it is incomplete).
  • name (string) circular buffer payload name
  • cbuf (userdata) circular buffer
  • anomaly_config (table) returned from the parse_config() function
Return
  • string if an anomaly was detected, otherwise nil.
  • array of annotation tables
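
A minimal sketch of wiring parse_config and detect into a SandboxFilter, together with the alert and annotation modules described above (assumed to be loadable as "alert", "annotation", and "anomaly"); the circular buffer layout, the payload name "Output1", and the anomaly_config value are illustrative only.

require "circular_buffer"
local alert      = require "alert"
local annotation = require "annotation"
local anomaly    = require "anomaly"

local title = "Output1"
local data  = circular_buffer.new(1440, 1, 60)  -- one day of 1 minute rows

-- e.g. anomaly_config = 'roc("Output1", 1, 15, 0, 2, true, false)' in the TOML
local anomaly_config = anomaly.parse_config(read_config("anomaly_config"))

function process_message()
    data:add(read_message("Timestamp"), 1, 1)  -- count messages per minute
    return 0
end

function timer_event(ns)
    if anomaly_config then
        if not alert.throttled(ns) then
            local msg, annos = anomaly.detect(ns, title, data, anomaly_config)
            if msg then
                annotation.concat(title, annos)
                alert.send(ns, msg)
            end
        end
        inject_payload("cbuf", title, annotation.prune(title, ns), data)
    else
        inject_payload("cbuf", title, data)
    end
end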

Message Interpolation Module

New in version 0.9.

API

interpolate_from_msg(value, secs)

Interpolates values from the currently processed message into the provided string value. A %{} enclosed field name will be replaced by the field value from the current message. Supported default field names are “Type”, “Hostname”, “Pid”, “UUID”, “Logger”, “EnvVersion”, and “Severity”. Any other values will be checked against the defined dynamic message fields. If no field matches, then a C strftime (on non-Windows platforms) or C89 strftime (on Windows) time substitution will be attempted. The time used for time substitution will be the seconds-from-epoch timestamp passed in as the secs argument, if provided. If secs is nil, local system time is used. Note that the message timestamp is not automatically used; if you want to use the message timestamp for time substitutions, then you need to manually extract it and convert it from nanoseconds to seconds (i.e. divide by 1e9).

Arguments
  • value (string)

    String into which message values should be interpolated.

  • secs (number or nil)

    Timestamp (in seconds since epoch) to use for time substitutions. If nil, system time will be used.

Return
  • Original string value with any interpolated message values.
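
A minimal sketch of interpolating message fields and the message timestamp into a string, assuming the module is available as "msg_interpolate" (verify the module name in your lua_modules directory); the interpolation string is illustrative only.

local interp = require "msg_interpolate"

function process_message()
    -- use the message timestamp for the strftime substitution, converting
    -- from nanoseconds to seconds as described above
    local secs = read_message("Timestamp") / 1e9
    local out = interp.interpolate_from_msg("%{Logger}-%{%Y.%m.%d}", secs)
    inject_payload("txt", "interpolated", out)
    return 0
end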

ElasticSearch Module

API

bulkapi_index_json(index, type_name, id, ns)

Returns a simple JSON ‘index’ structure satisfying the ElasticSearch BulkAPI

Arguments
  • index (string or nil)

    String to use as the _index key’s value in the generated JSON, or nil to omit the key. Supports field interpolation as described below.

  • type_name (string or nil)

    String to use as the _type key’s value in the generated JSON, or nil to omit the key. Supports field interpolation as described below.

  • id (string or nil)

    String to use as the _id key’s value in the generated JSON, or nil to omit the key. Supports field interpolation as described below.

  • ns (number or nil)

    Nanosecond timestamp to use for any strftime field interpolation into the above fields. Current system time will be used if nil.

Field interpolation

All of the string arguments listed above support message field interpolation.

Return
  • JSON string suitable for use as ElasticSearch BulkAPI index directive.
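
A minimal sketch of a SandboxEncoder that uses this function to prepend a BulkAPI index directive to the message payload, assuming the module is available as "elasticsearch"; the index and type strings are illustrative only.

local elasticsearch = require "elasticsearch"

function process_message()
    local ns = read_message("Timestamp")
    -- build the index directive using the message timestamp for strftime substitution
    local idx = elasticsearch.bulkapi_index_json("heka-%{%Y.%m.%d}", "%{Type}", nil, ns)
    add_to_payload(idx, "\n")
    add_to_payload(read_message("Payload"), "\n")
    inject_payload()
    return 0
end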

Field Utilities Module

Module contains utility functions for setting up fields for various purposes.

API

field_map(fields_str or nil)

Returns a table of fields that match the space delimited input string of fields. This can be used to provide input to other functions such as a list of fields to skip or use for tags.

Arguments
  • fields_str (string or nil)

    Space delimited list of fields. If this is empty or nil, all base fields will be returned.

Return
Table with the fields found in the space delimited input string, boolean indicating all base fields are to be used, boolean indicating all fields are to be used.
message_timestamp(timestamp_precision)

Maps the timestamp_precision value to a divisor and returns the message timestamp divided by it, converting it from the Heka default precision of ns to a lower precision that works better with output endpoints.

Arguments
  • timestamp_precision (string or nil)

    String that can have a value of “ms”, “s”, “m” or “h”.

Return
The timestamp value after converting it from ns to the indicated timestamp_precision.
used_base_fields(skip_fields)

Returns a table of base fields that are not found in the input table. This is useful to provide a lookup table that is used to decide whether or not a field should be included in an output by performing a simple lookup against it.

Arguments
  • skip_fields (table)

    Table of fields to be skipped from use.

Return
A table of base fields that are not found in the input table.
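
A minimal sketch of how these helpers might be combined, assuming the module is loaded with require "field_util" and that the returned lookup tables are keyed by field name; the skip list is illustrative only.

local field_util = require "field_util"

-- table of the named fields plus flags for the "all_base"/"all" magic values
local skip_fields, all_base, all = field_util.field_map("Pid EnvVersion")

-- base fields that are not in the skip list, usable as a lookup table
local base_fields = field_util.used_base_fields(skip_fields)

function process_message()
    local ts = field_util.message_timestamp("s")  -- ns converted to seconds
    if base_fields["Hostname"] then
        add_to_payload(read_message("Hostname"), " ")
    end
    add_to_payload(tostring(ts))
    inject_payload()
    return 0
end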

Graphite Module

New in version 0.10.

Module contains various utility functions for metrics stored in Graphite.

Currently the module focuses on generating metrics which can later be passed to Graphite.

API

count_rate(bucket, count, ticker_interval, now_sec)

Generates a string with count and rate metrics for Graphite.

Arguments
  • bucket - node name in which metric will be stored.
  • count - value of count.
  • ticker_interval - base interval for calculation of rate.
  • now_sec - timestamp (float) for metric.
Return
String with count and rate metrics for the stats.counters.<bucket>.count and stats.counters.<bucket>.rate buckets, with the time given in <now_sec>.
multi_counts_rates(counts, ticker_interval, now_sec)

Generates multiline Graphite count metrics with their rates.

Arguments
  • counts - table, indices will be mapped to buckets and values to their specific counts, e.g. {bucket1 = 1, bucket2 = 2}

  • ticker_interval - base interval for calculation of rate.

  • now_sec - timestamp (float) for metric.

Return
String with multiple counts and rates, in the same format as returned by the count_rate function.
timeseries_metrics(bucket, times, ticker_interval, percent_thresh, now_sec)

Generates a string with metrics for the given timeseries data to pass to Graphite.

Arguments
  • bucket - node name in which metric will be stored.
  • times - a table with times (float values).
  • ticker_interval - base interval for calculation of rate.
  • percent_thresh - base threshold for percentiles.
  • now_sec - timestamp (float) for metric.
Return
Returns a multiline graphite string with the following metrics:
  • stats.timers.<bucket>.count
  • stats.timers.<bucket>.rate
  • stats.timers.<bucket>.min
  • stats.timers.<bucket>.max
  • stats.timers.<bucket>.mean
  • stats.timers.<bucket>.mean_percentile
  • stats.timers.<bucket>.upper_percentile
multi_timeseries_metrics(timers, ticker_interval, percent_thresh, now_sec)

Returns a multiline string with stats calculated for the timeseries in their respective buckets.

Arguments
  • timers - tables with bucket names and tables of times inside e.g {‘bucket’: [1,2,3,4]}
  • ticker_interval - base interval for calculation of rate.
  • percent_thresh - base threshold for percentiles.
  • now_sec - timestamp (float) for metric.
Return
String with the corresponding metric series for the respective buckets defined in the timers table. Metrics are the same as for the timeseries_metrics function.
ns_to_sec(ns)

Converts nanoseconds into seconds.

Arguments
  • ns - nanoseconds
Return
Seconds in float value.
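
A minimal sketch of emitting Graphite plaintext metrics from a SandboxFilter, assuming the module is loaded with require "graphite"; the counter names and Fields[status] field are illustrative only.

local graphite = require "graphite"

local ticker_interval = 60  -- should match the plugin's ticker_interval
local counts = { requests = 0, errors = 0 }

function process_message()
    counts.requests = counts.requests + 1
    local status = read_message("Fields[status]")
    if status and status >= 500 then
        counts.errors = counts.errors + 1
    end
    return 0
end

function timer_event(ns)
    local now_sec = graphite.ns_to_sec(ns)
    -- one stats.counters.<bucket>.count/.rate pair per entry in counts
    inject_payload("txt", "graphite",
                   graphite.multi_counts_rates(counts, ticker_interval, now_sec))
    counts.requests = 0
    counts.errors = 0
end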

Time Series Line Protocol Module

Provides functions to convert full Heka message contents to line protocol for InfluxDB HTTP write API (new in InfluxDB v0.9.0) or Graphite/Carbon and any other time series data store that this functionality makes sense for. Optionally includes all standard message fields as tags or fields and iterates through all of the dynamic fields to add as measurements, skipping any fields explicitly omitted using the skip_fields config option. It can also map any Heka message fields as tags in the request sent to the InfluxDB write API, using the tag_fields config option. All dynamic fields in the Heka message are converted to separate points separated by newlines that are submitted to InfluxDB, unless source_value_field is defined.

API

carbon_line_msg(config)

Wrapper function that calls others within this module and the field_util module to generate a table of Carbon line protocol messages that are derived from the dynamic fields in a Heka message. Base fields or dynamic fields can be used in the metric name portion of the message. Configuration is implemented in the encoders that utilize this module.

Arguments
  • config (table or nil)

    Table of config option overrides that come from the client of this API. Defaults for this module are defined within the set_config function and clients implementing this API can reference it for available options.

Return
A table of Carbon line protocol messages ready to be sent to a Carbon server after being looped through in an encoder implementing this API.
influxdb_line_msg(config)

Wrapper function that calls others within this module and the field_util module to generate a table of InfluxDB line protocol messages that are derived from the base or dynamic fields in a Heka message. Base fields or dynamic fields can be used in the metric name portion of the message and included as tags if desired. Configuration is implemented in the encoders that utilize this module.

Arguments
  • config (table or nil)

    Table of config option overrides that come from the client of this API. Defaults for this module are defined within the set_config function and clients implementing this API can reference it for available options.

Return
A table of InfluxDB line protocol messages ready to be sent to an InfluxDB server after being looped through in an encoder implementing this API.
set_config(client_config)

This function takes a table of configuration options as input that can override the defaults that are set within it. The table is then used to compare with the default module_config, updates that table and then returns it to the client calling this API. Calls are then made out to public functions exposed by the field_util module to populate the tables and variables related to base fields, tagging fields and skipping fields kept within the module_config table. Clients utilizing this API must call this function first to set the configuration for subsequent calls to public functions that it exposes.

Arguments
  • client_config (table or nil)

    Table of configuration option overrides with the keys being the option names and the values being the option values.

Return
A table of configuration options that can then be passed to other public functions that this API exposes when calling them.
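
A minimal sketch of a SandboxEncoder built on this module, assuming it is loaded with require "ts_line_protocol"; the configuration keys mirror the encoder options shown earlier, but the exact option names accepted by set_config are assumptions.

require "table"
local ts_line_protocol = require "ts_line_protocol"

-- merge our overrides with the module defaults; must be called first
local config = ts_line_protocol.set_config({
    name_prefix = "%{Hostname}.%{Type}",
    skip_fields = "**all_base**",
})

function process_message()
    -- one Carbon plaintext line per remaining dynamic field
    local lines = ts_line_protocol.carbon_line_msg(config)
    add_to_payload(table.concat(lines, "\n"), "\n")
    inject_payload()
    return 0
end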

Lua Parsing Expression Grammars (LPeg)

Best practices (using Lpeg in the sandbox)

  1. Read the LPeg reference
  2. The ‘re’ module is now available in the sandbox but the best practice is to use the LPeg syntax whenever possible (i.e., in Lua code). Why?
    • Consistency and readability of a single syntax.
    • Promotes more modular grammars.
    • Is easier to comment.
  3. Do not use parentheses around function calls that take a single string argument.
-- prefer
lpeg.P"Literal"

-- instead of
lpeg.P("Literal")
  4. When writing sub-grammars with an ordered choice (+) place each choice on its own line; this makes it easier to pick out the alternates. Also, if possible, order them from most frequent to least frequent use.
local date_month = lpeg.P"0" * lpeg.R"19"
                   + "1" * lpeg.R"02"

-- The exception: when grouping alternates together in a higher level grammar.

local log_grammar = (rfc3339 + iso8601) * log_severity * log_message
  5. Use the locale patterns when matching standard character classes.
-- prefer
lpeg.digit

-- instead of
lpeg.R"09"
  6. If a literal occurs within an expression avoid wrapping it in a function.
-- prefer
lpeg.digit * "Test"

-- instead of
lpeg.digit * lpeg.P"Test"
  7. When creating a parser from an RFC standard mirror the ABNF grammar that is provided.
  8. If creating a grammar that would also be useful to others, please consider contributing it back to the project, thanks.
  9. Use the grammar tester http://lpeg.trink.com.
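
The following small grammar sketch pulls several of these practices together (locale patterns, no parentheses around single string literals, composable sub-patterns); it parses space delimited key=value pairs into a Lua table.

local lpeg = require "lpeg"
lpeg.locale(lpeg)

local space = lpeg.P" "
local key   = lpeg.C((lpeg.alnum + "_")^1)
local value = lpeg.C((1 - space)^1)
local pair  = lpeg.Cg(key * "=" * value)
local kv    = lpeg.Cf(lpeg.Ct"" * pair * (space * pair)^0, rawset)

-- kv:match("key1=data1 key2=data2") --> {key1 = "data1", key2 = "data2"}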

Sandbox Manager Filter

Plugin Name: SandboxManagerFilter

The SandboxManagerFilter provides dynamic control (start/stop) of sandbox filters in a secure manner without stopping the Heka daemon. Commands are sent to a SandboxManagerFilter using a signed Heka message. The intent is to have one manager per access control group each with their own message signing key. Users in each group can submit a signed control message to manage any filters running under the associated manager. A signed message is not an enforced requirement but it is highly recommended in order to restrict access to this functionality.

SandboxManagerFilter Settings

  • Common Filter Parameters

  • working_directory (string):

    The directory where the filter configurations, code, and states are preserved. The directory can be unique or shared between sandbox managers since the filter names are unique per manager. Defaults to a directory in ${BASE_DIR}/sbxmgrs with a name generated from the plugin name.

  • module_directory (string):

    The directory where ‘require’ will attempt to load the external Lua modules from. Defaults to ${SHARE_DIR}/lua_modules.

  • max_filters (uint):

    The maximum number of filters this manager can run.

New in version 0.5.

  • memory_limit (uint):

    The number of bytes managed sandboxes are allowed to consume before being terminated (default 8MiB).

  • instruction_limit (uint):

    The number of instructions managed sandboxes are allowed to execute during the process_message/timer_event functions before being terminated (default 1M).

  • output_limit (uint):

    The number of bytes managed sandbox output buffers can hold before being terminated (default 63KiB). Warning: messages exceeding 64KiB will generate an error and be discarded by the standard output plugins (File, TCP, UDP) since they exceed the maximum message size.

Example

[OpsSandboxManager]
type = "SandboxManagerFilter"
message_signer = "ops"
# message_matcher = "Type == 'heka.control.sandbox'" # automatic default setting
max_filters = 100

Control Message

The sandbox manager control message is a regular Heka message with the following variables set to the specified values.

Starting a SandboxFilter

  • Type: “heka.control.sandbox”
  • Payload: sandbox code
  • Fields[action]: “load”
  • Fields[config]: the TOML configuration for the Sandbox Filter

Stopping a SandboxFilter

  • Type: “heka.control.sandbox”
  • Fields[action]: “unload”
  • Fields[name]: The SandboxFilter name specified in the configuration

heka-sbmgr

Heka Sbmgr is a tool for managing (starting/stopping) sandbox filters by generating the control messages defined above.

Command Line Options

heka-sbmgr [-config config_file] [-action load|unload] [-filtername specified on unload] [-script sandbox script filename] [-scriptconfig sandbox script configuration filename]

Configuration Variables

  • ip_address (string): IP address of the Heka server.

  • use_tls (bool): Specifies whether or not SSL/TLS encryption should be used for the TCP connections. Defaults to false.

  • signer (object): Signer information for the encoder.
    • name (string): The name of the signer.
    • hmac_hash (string): md5 or sha1
    • hmac_key (string): The key the message will be signed with.
    • version (int): The version number of the hmac_key.
  • tls (TlsConfig): A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if use_tls is set to true. See Configuring TLS.

Example

ip_address       = "127.0.0.1:5565"
use_tls          = true
[signer]
    name         = "test"
    hmac_hash    = "md5"
    hmac_key     = "4865ey9urgkidls xtb0[7lf9rzcivthkm"
    version      = 0

[tls]
    cert_file = "heka.crt"
    key_file = "heka.key"
    client_auth = "NoClientCert"
    prefer_server_ciphers = true
    min_version = "TLS11"

heka-sbmgrload

Heka Sbmgrload is a test tool for starting/stopping a large number of sandboxes. The script and configuration are built into the tool and the filters will be named: CounterSandboxN where N is the instance number.

Command Line Options

heka-sbmgrload [-config config_file] [-action load|unload] [-num number of sandbox instances]

Configuration Variables (same as heka-sbmgr)

Tutorial - How to use the dynamic sandboxes

SandboxManager/Heka Setup

  1. Configure the SandboxManagerFilter.

The SandboxManagerFilters are defined in the hekad configuration file and are created when hekad starts. The manager provides a location/namespace for SandboxFilters to run and controls access to this space via a signed Heka message. By associating a message_signer with the manager we can restrict who can load and unload the associated filters. Let’s start by configuring a SandboxManager for a specific set of users: platform developers. Choose a unique filter name [PlatformDevs] and a signer name “PlatformDevs”; in this case we will use the same name for each.

[PlatformDevs]
type = "SandboxManagerFilter"
message_signer = "PlatformDevs"
working_directory = "/var/heka/sandbox"
max_filters = 100
  2. Configure the input that will receive the SandboxManager control messages.

For this setup we will extend the current TCP input to handle our signed messages. The signer section consists of the signer name followed by an underscore and the key version number (the reason for this notation is to simply flatten the signer configuration structure into a single map). Multiple key versions are allowed to be active at the same time facilitating the rollout of new keys.

[TCP:5565]
type = "TcpInput"
address = ":5565"
    [TCP:5565.signer.PlatformDevs_0]
    hmac_key = "Old Platform devs signing key"
    [TCP:5565.signer.PlatformDevs_1]
    hmac_key = "Platform devs signing key"

  3. Configure the sandbox manager utility (sbmgr). The signer information must exactly match the values in the input configuration above, otherwise the messages will be discarded. Save the file as PlatformDevs.toml.

ip_address       = ":5565"
[signer]
    name         = "PlatformDevs"
    hmac_hash    = "md5"
    hmac_key     = "Platform devs signing key"
    version      = 1

SandboxFilter Setup

  1. Create a SandboxFilter script and save it as “example.lua”. See Lua Sandbox Tutorial for more detail.
require "circular_buffer"

data = circular_buffer.new(1440, 1, 60) -- message count per minute
local COUNT = data:set_header(1, "Messages", "count")
function process_message ()
    local ts = read_message("Timestamp")
    data:add(ts, COUNT, 1)
    return 0
end

function timer_event(ns)
    inject_payload("cbuf", "", data)
end
  2. Create the SandboxFilter configuration and save it as “example.toml”.

The only difference between a static and dynamic SandboxFilter configuration is the filename. In the dynamic configuration it can be left blank or left out entirely. The manager will assign the filter a unique system wide name, in this case, “PlatformDevs-Example”.

[Example]
type = "SandboxFilter"
message_matcher = "Type == 'Widget'"
ticker_interval = 60
filename = ""
preserve_data = false
  3. Load the filter using sbmgr.
sbmgr -action=load -config=PlatformDevs.toml -script=example.lua -scriptconfig=example.toml

If you are running the Dashboard Output, links to the new filter’s status and output are available from the dashboard.

Note

A running filter cannot be ‘reloaded’; it must be unloaded and loaded again. During the unload/load process some data can be missed and gaps will be created. In the future we hope to remedy this, but for now it is a limitation of the dynamic sandbox.

  4. Unload the filter using sbmgr.
sbmgr -action=unload -config=PlatformDevs.toml -filtername=Example

Sandbox Development

Decoders

Since decoders cannot be dynamically loaded and they stop Heka processing on fatal errors, they must be developed outside of a production environment. Most Lua decoders are LPeg based, as it is the best way to parse and transform data within the sandbox. The other alternatives are the built-in Lua pattern matcher or the JSON parser with a manual transformation.

  1. Procure some sample data to be used as test input.

    timestamp=time_t key1=data1 key2=data2
    
  2. Configure a simple LogstreamerInput to deliver the data to your decoder.

    [LogstreamerInput]
    log_directory = "."
    file_match = 'data\.log'
    decoder = "SandboxDecoder"
    
  3. Configure your test decoder.

    [SandboxDecoder]
    filename = "decoder.lua"
    
  4. Configure the DashboardOutput for visibility into the decoder (performance, memory usage, messages processed/failed, etc.)

    [DashboardOutput]
    address = "127.0.0.1:4352"
    ticker_interval = 10
    working_directory = "dashboard"
    static_directory = "/usr/share/heka/dasher"
    
  5. Configure a LogOutput to display the generated messages.

    [LogOutput]
    message_matcher = "TRUE"
    
  6. Build the decoder.

    The decoder will receive a message from an input plugin. The input may have set some additional message headers, but the ‘Payload’ header contains the data for the decoder. The decoder can access the payload using read_message(“Payload”). The payload can be used to construct an entirely new message, multiple messages, or to modify any part of the existing message (see inject_message and write_message in the Lua Sandbox API); a minimal example decoder for the sample data above is shown after these steps. Message headers not modified by the decoder are left intact, and in the case of multiple message injections the initial message header values are duplicated for each message.

    1. LPeg grammar.

      Incrementally build and test your grammar using http://lpeg.trink.com.

    2. Lua pattern matcher.

      Test match expressions using http://www.lua.org/cgi-bin/demo.

    3. JSON parser.

      For data transformation use the LPeg/Lua matcher links above. Something like simple field remapping, e.g. msg.Hostname = json.host, can be verified in the LogOutput.

  7. Run Heka with the test configuration.

  8. Inspect/verify the messages written by LogOutput.
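
A minimal sketch of decoder.lua for the sample data above: it splits the space delimited key=value pairs into dynamic message fields and converts the timestamp from seconds to nanoseconds. The grammar and the message Type are illustrative only.

local lpeg = require "lpeg"
lpeg.locale(lpeg)

local space   = lpeg.P" "
local key     = lpeg.C((lpeg.alnum + "_")^1)
local value   = lpeg.C((1 - space)^1)
local pair    = lpeg.Cg(key * "=" * value)
local grammar = lpeg.Cf(lpeg.Ct"" * pair * (space * pair)^0, rawset)

local msg = { Type = "demo.kv", Fields = nil }

function process_message()
    local fields = grammar:match(read_message("Payload"))
    if not fields then return -1, "parse failed" end

    local ts = tonumber(fields.timestamp)
    if ts then
        msg.Timestamp = ts * 1e9  -- seconds since epoch to nanoseconds
        fields.timestamp = nil
    end
    msg.Fields = fields
    inject_message(msg)
    return 0
end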

Filters

Since filters can be dynamically loaded, it is recommended that you develop them in production with live data.

  1. Read Tutorial - How to use the dynamic sandboxes

OR

  1. If you are developing the filter in conjunction with the decoder you can add it to the test configuration.

    [SandboxFilter]
    filename = "filter.lua"
    
  2. Debugging

    1. Watch for a dashboard sandbox termination report. The termination message provides the line number and cause of the failure. These are usually straightforward to correct and commonly caused by a syntax error in the script or invalid assumptions about the data (e.g. cnt = cnt + read_message(“Fields[counter]”) will fail if the counter field doesn’t exist or is non-numeric due to an error in the data).

    2. No termination report and the output does not match expectations. These are usually a little harder to debug.

      1. Check the Heka dashboard to make sure the router is sending messages to the plugin. If not, verify your message_matcher configuration.
      2. Visually review the plugin for errors. Are the message field names correct, was the result of the cjson.decode tested, are the output variables actually being assigned to and output/injected, etc.
      3. Add a debug output message with the pertinent information.
      require "string"
      require "table"
      local dbg = {}
      
      -- table.insert(dbg, string.format("Entering function x arg1: %s", arg1))
      -- table.insert(dbg, "Exiting function x")
      
      inject_payload("txt", "debug", table.concat(dbg, "\n"))
      
      4. LAST RESORT: Move the filter out of production, turn on preservation, run the tests, stop Heka, and review the entire preserved state of the filter.

Lua Sandbox Cookbooks