
Sandbox

Sandboxes are Heka plugins that are implemented in a sandboxed scripting language. They provide a dynamic and isolated execution environment for data parsing, transformation, and analysis. They allow real-time access to data in production without jeopardizing the integrity or performance of the monitoring infrastructure, and they do not require Heka to be recompiled. This broadens the audience that the data can be exposed to and facilitates new uses of the data (e.g. debugging, monitoring, dynamic provisioning, SLA analysis, intrusion detection, and ad-hoc reporting).

Features

  • dynamic loading
    • SandboxFilters can be started/stopped on a self-service basis while Heka is running
    • SandboxDecoders can only be started/stopped on a Heka restart, but no recompilation is required to add new functionality.
  • small - memory requirements are about 16 KiB for a basic sandbox

  • fast - microsecond execution times

  • stateful - ability to resume where it left off after a restart/reboot

  • isolated - failures are contained and malfunctioning sandboxes are terminated

Sandbox Decoder

The sandbox decoder provides an isolated execution environment for data parsing and complex transformations without the need to recompile Heka.

SandboxDecoder Settings

  • script_type (string):

    The language the sandbox is written in. Currently the only valid option is ‘lua’.

  • filename (string):

    The path to the sandbox code; if specified as a relative path it will be appended to Heka’s global base_dir.

  • memory_limit (uint):

    The number of bytes the sandbox is allowed to consume before being terminated (max 8MiB, default max).

  • instruction_limit (uint):

    The number of instructions the sandbox is allowed to execute during the process_message function before being terminated (max 1M, default max).

  • output_limit (uint):

    The number of bytes the sandbox output buffer can hold before the sandbox is terminated (max 63KiB, default max). Anything less than 1KiB will default to 1KiB.

  • config (object):

    A map of configuration variables available to the sandbox via read_config. The map consists of a string key with: string, bool, int64, or float64 values.

Example

[sql_decoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "sql_decoder.lua"

Sandbox Filter

The sandbox filter provides an isolated execution environment for data analysis. The output generated by the sandbox is injected into the payload of a new message for further processing or to be output.

SandboxFilter Settings

  • Common Filter / Output Parameters

  • script_type (string):

    The language the sandbox is written in. Currently the only valid option is ‘lua’.

  • filename (string):

    For a static configuration this is the path to the sandbox code; if specified as a relative path it will be appended to Heka’s global base_dir. The filename must be unique between static plugins, since the global data is preserved using this name. For a dynamic configuration the filename is ignored and the physical location on disk is controlled by the SandboxManagerFilter.

  • preserve_data (bool):

    True if the sandbox global data should be preserved/restored on Heka shutdown/startup. The preserved data is stored alongside the sandbox code (e.g. counter.lua.data), so Heka must have read/write permissions to that directory.

  • memory_limit (uint):

    The number of bytes the sandbox is allowed to consume before being terminated (max 8MiB, default 32767).

  • instruction_limit (uint):

    The number of instructions the sandbox is allowed to execute during the process_message/timer_event functions before being terminated (max 1M, default 1000).

  • output_limit (uint):

    The number of bytes the sandbox output buffer can hold before the sandbox is terminated (max 63KiB, default 1024). Anything less than 1KiB will default to 1KiB.

  • profile (bool):

    When true, a statistically significant number of ProcessMessage timings are captured immediately before reverting to the regular sampling interval. The main purpose is more accurate sandbox comparison/tuning/optimization.

  • config (object):

    A map of configuration variables available to the sandbox via read_config. The map consists of a string key with: string, bool, int64, or float64 values.

Example

[hekabench_counter]
type = "SandboxFilter"
message_matcher = "Type == 'hekabench'"
ticker_interval = 1
script_type  = "lua"
filename = "counter.lua"
preserve_data = true
memory_limit = 32767
instruction_limit = 1000
output_limit = 1024
profile = false

[hekabench_counter.config]
rows = 1440
sec_per_row = 60
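
The values under [hekabench_counter.config] reach the script through read_config. The following is a minimal sketch of that lookup, assuming read_config returns nil for a key that is not configured. The read_config stub and the fallback values are illustrative assumptions so the snippet can run with a plain Lua interpreter; inside hekad the function is provided by Heka.

```lua
-- Stub standing in for Heka's read_config (assumption: nil for unknown keys).
-- Inside a real sandbox this function is supplied by Heka; do not define it.
local config = {rows = 1440, sec_per_row = 60}  -- mirrors [hekabench_counter.config]
local function read_config(name)
    return config[name]
end

-- Read the settings once at load time. The fallbacks are illustrative only,
-- they are not Heka defaults.
local rows        = read_config("rows") or 60
local sec_per_row = read_config("sec_per_row") or 1

-- Total time span covered by a buffer of this shape, in seconds.
local window = rows * sec_per_row
print(rows, sec_per_row, window)
```

Reading the configuration at load time keeps the lookups out of process_message, which is the function the instruction_limit applies to.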

Sandbox Manager

The SandboxManagerFilter allows SandboxFilters to be dynamically started and stopped using a signed Heka message. The intent is to have one manager per access control group each with their own message signing key. Users in each group can submit a signed control message to manage any filters running under the associated manager. A signed message is not an enforced requirement but it is highly recommended in order to restrict access to this functionality.

SandboxManagerFilter Settings

  • Common Filter / Output Parameters

  • working_directory (string):

    The directory where the filter configurations, code, and states are preserved. The directory can be unique or shared between sandbox managers since the filter names are unique per manager. Defaults to a directory in ${BASE_DIR}/sbxmgrs with a name generated from the plugin name.

  • max_filters (uint):

    The maximum number of filters this manager can run.

Example

[OpsSandboxManager]
type = "SandboxManagerFilter"
message_signer = "ops"
message_matcher = "Type == 'heka.control.sandbox'"
max_filters = 100

Control Message

The sandbox manager control message is a regular Heka message with the following variables set to the specified values.

Starting a SandboxFilter

  • Type: “heka.control.sandbox”
  • Payload: sandbox code
  • Fields[action]: “load”
  • Fields[config]: the TOML configuration for the SandboxFilter (see SandboxFilter Settings)

Stopping a SandboxFilter

  • Type: “heka.control.sandbox”
  • Fields[action]: “unload”
  • Fields[name]: The SandboxFilter name specified in the configuration
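
For illustration only, the two control messages can be written out in the Lua message-table notation used by inject_message(message_table) later on this page. In practice these messages are generated and signed by a client such as sbmgr rather than from inside a sandbox; the script body, the TOML text, and the filter name below are placeholders.

```lua
-- Layout of the "load" control message. Payload and config are placeholders.
local load_msg = {
    Type    = "heka.control.sandbox",
    Payload = "function process_message() return 0 end",  -- sandbox code
    Fields  = {
        action = "load",
        config = '[Example]\ntype = "SandboxFilter"\n',    -- TOML for the filter
    },
}

-- Layout of the "unload" control message. No payload is needed.
local unload_msg = {
    Type   = "heka.control.sandbox",
    Fields = {action = "unload", name = "Example"},
}

print(load_msg.Fields.action, unload_msg.Fields.action, unload_msg.Fields.name)
```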

sbmgr

Sbmgr is a tool for managing (starting/stopping) sandbox filters by generating the control messages defined above.

Command Line Options

sbmgr [-config config_file] [-action load|unload] [-filtername specified on unload] [-script sandbox script filename] [-scriptconfig sandbox script configuration filename]

sbmgrload

Sbmgrload is a test tool for starting/stopping a large number of sandboxes. The script and configuration are built into the tool and the filters will be named: CounterSandboxN where N is the instance number.

Command Line Options

sbmgrload [-config config_file] [-action load|unload] [-num number of sandbox instances]

Configuration Variables

  • ip_address (string): IP address of the Heka server.

  • signer (object): Signer information for the encoder.
    • name (string): The name of the signer.
    • hmac_hash (string): md5 or sha1
    • hmac_key (string): The key the message will be signed with.
    • version (int): The version number of the hmac_key.

Example

ip_address          = "127.0.0.1:5565"
[signer]
    name         = "test"
    hmac_hash    = "md5"
    hmac_key     = "4865ey9urgkidls xtb0[7lf9rzcivthkm"
    version      = 0

Tutorial - How to use the dynamic sandboxes

SandboxManager/Heka Setup

1. The SandboxManagerFilters are defined in the hekad configuration file and are created when hekad starts. The manager provides a location/namespace for SandboxFilters to run and controls access to this space via a signed Heka message. By associating a message_signer with the manager we can restrict who can load and unload the associated filters. Let’s start by configuring a SandboxManager for a specific set of users: platform developers. Choose a unique filter name [PlatformDevs] and a signer name “PlatformDevs”; in this case we will use the same name for each.

[PlatformDevs]
type = "SandboxManagerFilter"
message_signer = "PlatformDevs"
message_matcher = "Type == 'heka.control.sandbox'"
working_directory = "/var/heka/sandbox"
max_filters = 100

2. Configure the input that will receive the SandboxManager control messages. For this setup we will extend the current TCP input to handle our signed messages. The signer section consists of the signer name followed by an underscore and the key version number (the reason for this notation is to simply flatten the signer configuration structure into a single map). Multiple key versions are allowed to be active at the same time facilitating the rollout of new keys.

[TCP:5565]
type = "TcpInput"
address = ":5565"
    [TCP:5565.signer.PlatformDevs_0]
    hmac_key = "Old Platform devs signing key"
    [TCP:5565.signer.PlatformDevs_1]
    hmac_key = "Platform devs signing key"
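
The naming rule for the signer sections (signer name, underscore, key version) can be sketched in a couple of lines of Lua. The helper name signer_section is hypothetical and exists only to illustrate the notation; Heka does not expose such a function.

```lua
-- Build the flattened signer section name: <signer name>_<key version>.
local function signer_section(name, version)
    return string.format("%s_%d", name, version)
end

-- The two key versions configured in the TcpInput example.
print(signer_section("PlatformDevs", 0))
print(signer_section("PlatformDevs", 1))
```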

3. Configure the sandbox manager utility (sbmgr). The signer information must exactly match the values in the input configuration above otherwise the messages will be discarded. Save the file as PlatformDevs.toml.

ip_address       = ":5565"
[signer]
    name         = "PlatformDevs"
    hmac_hash    = "md5"
    hmac_key     = "Platform devs signing key"
    version      = 1

SandboxFilter Setup

  1. Create a SandboxFilter script and save it as “example.lua”. See Lua Sandbox Tutorial for more detail.
data = circular_buffer.new(1440, 1, 60) -- message count per minute
local COUNT = data:set_header(1, "Messages", "count")
function process_message ()
    local ts = read_message("Timestamp")
    data:add(ts, COUNT, 1)
    return 0
end

function timer_event(ns)
    output(data)
    inject_message("cbuf")
end
  2. Create the SandboxFilter configuration and save it as “example.toml”.

The only difference between a static and dynamic SandboxFilter configuration is the filename. In the dynamic configuration it can be left blank or left out entirely. The manager will assign the filter a unique system-wide name, in this case “PlatformDevs-Example”.

[Example]
type = "SandboxFilter"
message_matcher = "Type == 'Widget'"
ticker_interval = 60
script_type = "lua"
filename = ""
preserve_data = false
memory_limit = 64000
instruction_limit = 100
output_limit = 64000
  3. Load the filter using sbmgr.
sbmgr -action=load -config=PlatformDevs.toml -script=example.lua -scriptconfig=example.toml

If you are running the DashboardOutput the following links are available:

Otherwise

Note

A running filter cannot be ‘reloaded’; it must be unloaded and loaded again. The state is not preserved in this case for two reasons (in the future we hope to remedy this):

  1. During the unload/load process some data can be missed creating a small gap in the analysis causing anomalies and confusion.
  2. The internal data representation may have changed and restoration may be problematic.
  4. Unload the filter using sbmgr.
sbmgr -action=unload -config=PlatformDevs.toml -filtername=Example

Lua Sandbox

The Lua sandbox provides full access to the Lua language in a sandboxed environment under hekad that enforces configurable restrictions.

API

Functions that must be exposed from the Lua sandbox

int process_message()

Called by Heka when a message is available to the sandbox. The instruction_limit configuration parameter is applied to this function call.

Arguments
none
Return
  • < 0 for non-fatal failure (increments ProcessMessageFailures)
  • 0 for success
  • > 0 for fatal error (terminates the sandbox)
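
A minimal sketch of the three return paths follows. The read_message stub and the `current` table stand in for Heka so the snippet runs on its own, and treating a missing payload as non-fatal and a non-string payload as fatal is an arbitrary choice made for illustration.

```lua
-- Stub for Heka's read_message so the sketch runs outside hekad.
-- `current` is a hypothetical table holding the message being processed.
local current = {}
local function read_message(name)
    return current[name]
end

function process_message()
    local payload = read_message("Payload")
    if payload == nil then
        return -1  -- non-fatal failure: increments ProcessMessageFailures
    end
    if type(payload) ~= "string" then
        return 1   -- fatal error: the sandbox is terminated
    end
    return 0       -- success
end

current = {Payload = "hello"}
print(process_message())
```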
timer_event(ns)

Called by Heka when the ticker_interval expires. The instruction_limit configuration parameter is applied to this function call. This function is only required in SandboxFilters (SandboxDecoders do not support timer events).

Arguments
  • ns (int64) current time in nanoseconds since the UNIX epoch
Return
none

Heka functions that are exposed to the Lua sandbox

read_config(variableName)

Provides access to the sandbox configuration variables.

Arguments
  • variableName (string)
Return
number, string, bool, nil depending on the type of variable requested
read_message(variableName, fieldIndex, arrayIndex)

Provides access to the Heka message data.

Arguments
  • variableName (string)
    • Uuid
    • Type
    • Logger
    • Payload
    • EnvVersion
    • Hostname
    • Timestamp
    • Severity
    • Pid
    • Fields[_name_]
  • fieldIndex (unsigned) only used in combination with the Fields variableName
    • use to retrieve a specific instance of a repeated field _name_
  • arrayIndex (unsigned) only used in combination with the Fields variableName
    • use to retrieve a specific element out of a field containing an array
Return
number, string, bool, nil depending on the type of variable requested
output(arg0, arg1, ...argN)

Appends data to the payload buffer, which cannot exceed the output_limit configuration parameter.

Arguments
  • arg (number, string, bool, nil, table, circular_buffer) Lua variable or literal to be appended to the output buffer
Return
none

Notes

Outputting a Lua table will serialize it to JSON according to the following guidelines/restrictions:
  • Tables cannot contain internal or circular references.

  • Keys starting with an underscore are considered private and will not be serialized.
    • ‘_name’ is a special private key that can be used to specify the name of the top level JSON object; if not provided the default is ‘table’.
  • Arrays only use contiguous numeric keys starting with an index of 1. Private keys are the exception, e.g. local a = {1,2,3,_name="my_name"} will be serialized as: {"my_name":[1,2,3]}\n

  • Hashes only use string keys (numeric keys will not be quoted and the JSON output will be invalid). Note: the hash keys are output in an arbitrary order, e.g. local a = {x = 1, y = 2} may be serialized as: {"table":{"y":2,"x":1}}\n.
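
The rules above can be modeled in plain Lua. This is a sketch of the documented behavior, not Heka's serializer: the names quote, encode and to_json are hypothetical, a table with any array part is treated as an array by assumption, and numeric hash keys are rejected here rather than emitted as invalid JSON.

```lua
-- Quote a string for JSON, escaping backslashes and double quotes only.
local function quote(s)
    return '"' .. s:gsub('[\\"]', '\\%0') .. '"'
end

local function encode(v, seen)
    local t = type(v)
    if t == "number" or t == "boolean" then return tostring(v) end
    if t == "string" then return quote(v) end
    if t ~= "table" then error("unsupported type: " .. t) end
    if seen[v] then error("internal or circular reference") end
    seen[v] = true

    local parts = {}
    if #v > 0 then
        -- Array: contiguous numeric keys starting at 1; other keys are skipped.
        for i = 1, #v do parts[#parts + 1] = encode(v[i], seen) end
        return "[" .. table.concat(parts, ",") .. "]"
    end
    -- Hash: string keys only; pairs() visits them in an arbitrary order.
    for k, val in pairs(v) do
        if type(k) ~= "string" then error("hash keys must be strings") end
        if k:sub(1, 1) ~= "_" then  -- a leading underscore marks a private key
            parts[#parts + 1] = quote(k) .. ":" .. encode(val, seen)
        end
    end
    return "{" .. table.concat(parts, ",") .. "}"
end

-- Wrap the value in a top level object named by _name (default "table").
local function to_json(tbl)
    return "{" .. quote(tbl._name or "table") .. ":" .. encode(tbl, {}) .. "}\n"
end

io.write(to_json({1, 2, 3, _name = "my_name"}))
io.write(to_json({x = 1}))
```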

In most cases circular buffers should be directly output using inject_message. However, in order to create graph annotations the annotation table has to be written to the output buffer followed by the circular buffer. The output function is the only way to combine this data before injection (use a unique payload_type when injecting a message with a non-standard circular buffer mashup).

inject_message(payload_type, payload_name)

Creates a new Heka message using the contents of the output payload buffer and then clears the buffer. Two pieces of optional metadata are allowed and included as fields in the injected message, e.g. Fields[payload_type] == ‘csv’ and Fields[payload_name] == ‘Android Usage Statistics’. The number of messages that may be injected by the process_message or timer_event functions is globally controlled by the hekad Command Line Options; if these values are exceeded the sandbox will be terminated.

Arguments
  • payload_type (optional, default “txt” string) Describes the content type of the injected payload data.
  • payload_name (optional, default “” string) Names the content to aid in downstream filtering.
Return
none
inject_message(circular_buffer, payload_name)

Creates a new Heka message placing the circular buffer output in the message payload (overwriting whatever is in the output buffer). The payload_type is set to the circular buffer output format string, e.g. Fields[payload_type] == ‘cbuf’. Fields[payload_name] is set to the provided payload_name.

Arguments
  • circular_buffer (circular_buffer)
  • payload_name (optional, default “” string) Names the content to aid in downstream filtering.
Return
none
Notes
  • injection limits are enforced as described above
inject_message(message_table)

Creates a new Heka protocol buffer message using the contents of the specified Lua table (overwriting whatever is in the output buffer). Notes about message fields:

  • Timestamp is automatically generated if one is not provided. Nanoseconds since the UNIX epoch is the only valid format.

  • UUID is automatically generated, anything provided by the user is ignored.

  • Hostname and Logger are automatically set by the SandboxFilter and cannot be overridden.

  • Type is prepended with “heka.sandbox.” by the SandboxFilter to avoid data confusion/mis-representation.

  • Fields can be represented in multiple forms and support the following primitive types: string, double, bool. These constructs should be added to the ‘Fields’ table in the message structure. Note: since the Fields structure is a map and not an array, like the protobuf message, fields cannot be repeated.
    • name=value e.g. foo="bar"; foo=1; foo=true

    • name={array} e.g. foo={"b", "a", "r"}

    • name={object} e.g. foo={value=1, representation="s"}; foo={value={1010, 2200, 1567}, representation="ms"}
      • value (required) may be a single value or an array of values
      • representation (optional) metadata for display and unit management
Arguments
  • message_table A table with the proper message structure.
Return
none
Notes
  • injection limits are enforced as described above
require(libraryName)

Loads optional sandbox libraries.

Arguments
  • libraryName (string) The name of the optional library to load.
Return
a table (which is also globally registered with the library name).

Sample Lua Message Structure

{
Uuid        = "data",               -- always ignored
Logger      = "nginx",              -- ignored in the SandboxFilter
Hostname    = "bogus.mozilla.com",  -- ignored in the SandboxFilter

Timestamp   = 1e9,
Type        = "TEST",               -- will become "heka.sandbox.TEST" in the SandboxFilter
Payload     = "Test Payload",
EnvVersion  = "0.8",
Pid         = 1234,
Severity    = 6,
Fields      = {
            http_status     = 200,
            request_size    = {value=1413, representation="B"}
            }
}
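
The field rules listed for inject_message(message_table) can be modeled with a small function. This is an illustration of the documented rules, not Heka's implementation: apply_filter_rules, host, logger and now_ns are hypothetical names, the values Heka actually assigns to Hostname and Logger are not given on this page, and the handling of a missing Type is an assumption.

```lua
-- Model of what a SandboxFilter does to a user supplied message table.
local function apply_filter_rules(msg, host, logger, now_ns)
    local out = {}
    for k, v in pairs(msg) do out[k] = v end          -- shallow copy
    out.Uuid      = nil                               -- user value ignored; Heka generates one
    out.Hostname  = host                              -- cannot be overridden
    out.Logger    = logger                            -- cannot be overridden
    out.Type      = "heka.sandbox." .. (msg.Type or "")  -- assumption: empty when missing
    out.Timestamp = msg.Timestamp or now_ns           -- generated only when not provided
    return out
end

local result = apply_filter_rules(
    {Uuid = "data", Logger = "nginx", Hostname = "bogus.mozilla.com",
     Timestamp = 1e9, Type = "TEST", Payload = "Test Payload"},
    "filter-host", "filter-logger", 2e9)
print(result.Type, result.Hostname, result.Logger, result.Timestamp)
```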

Lua Circular Buffer Library

The library is a sliding window time series data store and is implemented in the circular_buffer table.

Constructor

circular_buffer.new(rows, columns, seconds_per_row, enable_delta)

Arguments
  • rows (unsigned) The number of rows in the buffer (must be > 1)
  • columns (unsigned) The number of columns in the buffer (must be > 0)
  • seconds_per_row (unsigned) The number of seconds each row represents (must be > 0).
  • enable_delta (optional, default false bool) When true the changes made to the circular buffer between delta outputs are tracked.
Return
A circular buffer object.

Methods

Note

All column arguments are 1-based. If the column is out of range for the configured circular buffer a fatal error is generated.

double add(nanoseconds, column, value)

Arguments
  • nanoseconds (unsigned) The number of nanoseconds since the UNIX epoch. The value is used to determine which row is being operated on.
  • column (unsigned) The column within the specified row to perform an add operation on.
  • value (double) The value to be added to the specified row/column.
Return
The value of the updated row/column or nil if the time was outside the range of the buffer.

double set(nanoseconds, column, value)

Arguments
  • nanoseconds (unsigned) The number of nanoseconds since the UNIX epoch. The value is used to determine which row is being operated on.
  • column (unsigned) The column within the specified row to perform a set operation on.
  • value (double) The value to be overwritten at the specified row/column.
Return
The value passed in or nil if the time was outside the range of the buffer.

double get(nanoseconds, column)

Arguments
  • nanoseconds (unsigned) The number of nanoseconds since the UNIX epoch. The value is used to determine which row is being operated on.
  • column (unsigned) The column within the specified row to retrieve the data from.
Return
The value at the specified row/column or nil if the time was outside the range of the buffer.

int set_header(column, name, unit, aggregation_method)

Arguments
  • column (unsigned) The column number where the header information is applied.

  • name (string) Descriptive name of the column (maximum 15 characters). Any non-alphanumeric characters will be converted to underscores. (default: Column_N)

  • unit (string - optional) The unit of measure (maximum 7 characters). Alphanumeric, ‘/’, and ‘*’ characters are allowed; everything else will be converted to underscores. Examples: KiB, Hz, m/s (default: count)

  • aggregation_method (string - optional) Controls how the column data is aggregated when combining multiple circular buffers.
    • sum The total is computed for the time/column (default).
    • min The smallest value is retained for the time/column.
    • max The largest value is retained for the time/column.
    • avg The average is computed for the time/column.
    • none No aggregation will be performed on the column.
Return
The column number passed into the function.
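
The character rules for name and unit can be sketched with Lua patterns. The helpers sanitize_name and sanitize_unit are hypothetical and model only the character conversion; how the library treats values longer than the stated maximum is not described here, so length is left alone.

```lua
-- Replace every non-alphanumeric character with an underscore.
local function sanitize_name(name)
    return (name:gsub("[^%w]", "_"))
end

-- Same rule for units, except that '/' and '*' are also allowed.
local function sanitize_unit(unit)
    return (unit:gsub("[^%w/*]", "_"))
end

print(sanitize_name("HTTP 200!"), sanitize_unit("m/s"), sanitize_unit("req per s"))
```

The extra parentheses around each gsub call drop its second return value (the substitution count), so the helpers return a single string.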

double compute(function, column, start, end)

Arguments
  • function (string) The name of the compute function (sum|avg|sd|min|max).
  • column (unsigned) The column that the computation is performed against.
  • start (optional - unsigned) The number of nanoseconds since the UNIX epoch. Sets the start time of the computation range; if nil the buffer’s start time is used.
  • end (optional - unsigned) The number of nanoseconds since the UNIX epoch. Sets the end time of the computation range (inclusive); if nil the buffer’s end time is used. The end time must be greater than or equal to the start time.
Return
The result of the computation for the specified column over the given range or nil if the range fell outside of the buffer.
cbuf format(format)

Sets an internal flag to control the output format of the circular buffer data structure; if deltas are not enabled or there haven’t been any modifications, nothing is output.

Arguments
  • format (string)
    • cbuf The circular buffer full data set format.
    • cbufd The circular buffer delta data set format.
Return
The circular buffer object.

Output

The circular buffer can be passed to the output() function. The output format can be selected using the format() function.

The cbuf (full data set) output format consists of newline-delimited rows starting with a JSON header row followed by the data rows with tab-delimited columns. The time in the header corresponds to the time of the first data row; the time for each subsequent row is calculated using the seconds_per_row header value.

{json header}
row1_col1\trow1_col2\n
.
.
.
rowN_col1\trowN_col2\n

The cbufd (delta) output format consists of newline-delimited rows starting with a JSON header row followed by the data rows with tab-delimited columns. The first column is the timestamp for the row (time_t). The cbufd output will only contain the rows that have changed and the corresponding delta values for each column.

{json header}
row14_timestamp\trow14_col1\trow14_col2\n
row10_timestamp\trow10_col1\trow10_col2\n

Sample Cbuf Output

{"time":2,"rows":3,"columns":3,"seconds_per_row":60,"column_info":[{"name":"HTTP_200","unit":"count","aggregation":"sum"},{"name":"HTTP_400","unit":"count","aggregation":"sum"},{"name":"HTTP_500","unit":"count","aggregation":"sum"}]}
10002   0   0
11323   0   0
10685   0   0
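
A consumer of the cbuf format can recover the time of every row from the header. The sketch below parses the sample with Lua patterns instead of a JSON parser; parse_cbuf is a hypothetical helper, the column_info array is trimmed from the header for brevity, and the header time is assumed to be in seconds, the same unit as seconds_per_row.

```lua
-- Sample output from above with tab-delimited columns (column_info trimmed).
local sample = '{"time":2,"rows":3,"columns":3,"seconds_per_row":60}\n'
    .. "10002\t0\t0\n11323\t0\t0\n10685\t0\t0\n"

-- Returns a list of rows; each row holds its column values at indexes 1..N
-- and the computed row time in the `time` field.
local function parse_cbuf(text)
    local header, body = text:match("^(.-)\n(.*)$")
    -- Pull out the two header values needed to compute the row times.
    local start = tonumber(header:match('"time":(%d+)'))
    local step  = tonumber(header:match('"seconds_per_row":(%d+)'))
    local rows = {}
    for line in body:gmatch("[^\n]+") do
        -- Row i (1-based) starts at: header time + (i - 1) * seconds_per_row.
        local row = {time = start + #rows * step}
        for cell in line:gmatch("[^\t]+") do
            row[#row + 1] = tonumber(cell)
        end
        rows[#rows + 1] = row
    end
    return rows
end

for _, row in ipairs(parse_cbuf(sample)) do
    print(row.time, row[1], row[2], row[3])
end
```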

Example

-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

data = circular_buffer.new(1440, 5, 60) -- 1 day at 1 minute resolution
local HTTP_200      = data:set_header(1, "HTTP_200"     , "count")
local HTTP_300      = data:set_header(2, "HTTP_300"     , "count")
local HTTP_400      = data:set_header(3, "HTTP_400"     , "count")
local HTTP_500      = data:set_header(4, "HTTP_500"     , "count")
local HTTP_UNKNOWN  = data:set_header(5, "HTTP_UNKNOWN" , "count")

function process_message()
    local ts = read_message("Timestamp")
    local sc = read_message("Fields[http_status_code]")
    if sc == nil then return 0 end

    if sc >= 200 and sc < 300 then
        data:add(ts, HTTP_200, 1)
    elseif sc >= 300 and sc < 400 then
        data:add(ts, HTTP_300, 1)
    elseif sc >= 400 and sc < 500 then
        data:add(ts, HTTP_400, 1)
    elseif sc >= 500 and sc < 600 then
        data:add(ts, HTTP_500, 1)
    else
        data:add(ts, HTTP_UNKNOWN, 1)
    end
    return 0
end

function timer_event()
    output(data)
    inject_message("cbuf", "HTTP Status Code Statistics")
end

Setting the inject_message payload_type to “cbuf” will cause the DashboardOutput to automatically generate an HTML page containing a graphical view of the data.

Lua Sandbox Tutorial

How to create a simple sandbox filter

  1. Implement the required Heka interface in Lua
function process_message ()
    return 0
end

function timer_event(ns)
end
  2. Add the business logic (count the number of ‘demo’ events per minute)
total = 0 -- preserved between restarts since it is in global scope
local count = 0 -- local scope so this will not be preserved

function process_message()
    total = total + 1
    count = count + 1
    return 0
end

function timer_event(ns)
    output(string.format("%d messages in the last minute; total=%d", count, total))
    count = 0
    inject_message()
end
  3. Set up the configuration
[demo_counter]
type = "SandboxFilter"
message_matcher = "Type == 'demo'"
ticker_interval = 60
script_type = "lua"
filename = "counter.lua"
preserve_data = true
memory_limit = 32767
instruction_limit = 100
output_limit = 256

  4. Extend the business logic (count the number of ‘demo’ events per minute per device)

device_counters = {}

function process_message()
    local device_name = read_message("Fields[DeviceName]")
    if device_name == nil then
        device_name = "_unknown_"
    end

    local dc = device_counters[device_name]
    if dc == nil then
        dc = {count = 1, total = 1}
        device_counters[device_name] = dc
    else
        dc.count = dc.count + 1
        dc.total = dc.total + 1
    end
    return 0
end

function timer_event(ns)
    output("#device_name\tcount\ttotal\n")
    for k, v in pairs(device_counters) do
        output(string.format("%s\t%d\t%d\n", k, v.count, v.total))
        v.count = 0
    end
    inject_message()
end
  5. Depending on the number of devices being counted you will most likely want to update the configuration to account for the additional resource requirements.
memory_limit = 65536
instruction_limit = 20000
output_limit = 64512
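
Before loading a script into Heka it can be convenient to exercise it with a plain Lua interpreter. The harness below is a sketch under that assumption: it replaces read_message, output and inject_message with simple stubs (hypothetical stand-ins that ignore limits, payload types and message routing) and then drives the per-device counter from step 4.

```lua
-- Stubs standing in for the functions Heka provides.
local message, buffer, injected = {}, {}, {}
function read_message(name) return message[name] end
function output(...)
    for i = 1, select("#", ...) do
        buffer[#buffer + 1] = tostring((select(i, ...)))
    end
end
function inject_message()
    injected[#injected + 1] = table.concat(buffer)  -- keep the payload for inspection
    buffer = {}                                     -- injection clears the buffer
end

-- Script under test (step 4 of the tutorial).
device_counters = {}

function process_message()
    local device_name = read_message("Fields[DeviceName]")
    if device_name == nil then
        device_name = "_unknown_"
    end

    local dc = device_counters[device_name]
    if dc == nil then
        dc = {count = 1, total = 1}
        device_counters[device_name] = dc
    else
        dc.count = dc.count + 1
        dc.total = dc.total + 1
    end
    return 0
end

function timer_event(ns)
    output("#device_name\tcount\ttotal\n")
    for k, v in pairs(device_counters) do
        output(string.format("%s\t%d\t%d\n", k, v.count, v.total))
        v.count = 0
    end
    inject_message()
end

-- Drive the script: three messages followed by one timer tick.
for _, name in ipairs({"phone", "phone", "tablet"}) do
    message = {["Fields[DeviceName]"] = name}
    assert(process_message() == 0)
end
timer_event(0)
io.write(injected[1])
```

The device rows appear in an arbitrary order because timer_event iterates with pairs.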