Sandboxes are Heka plugins that are implemented in a sandboxed scripting language. They provide a dynamic and isolated execution environment for data parsing, transformation, and analysis. They allow real-time access to data in production without jeopardizing the integrity or performance of the monitoring infrastructure and do not require Heka to be recompiled. This broadens the audience that the data can be exposed to and facilitates new uses of the data (e.g. debugging, monitoring, dynamic provisioning, SLA analysis, intrusion detection, and ad-hoc reporting).
small - memory requirements are about 16 KiB for a basic sandbox
fast - microsecond execution times
stateful - ability to resume where it left off after a restart/reboot
isolated - failures are contained and malfunctioning sandboxes are terminated
The sandbox decoder provides an isolated execution environment for data parsing and complex transformations without the need to recompile Heka.
script_type (string): The language the sandbox is written in. Currently the only valid option is ‘lua’.
filename (string): The path to the sandbox code; if specified as a relative path it will be appended to Heka’s global base_dir.
memory_limit (uint): The number of bytes the sandbox is allowed to consume before being terminated (max 8MiB, default max).
instruction_limit (uint): The number of instructions the sandbox is allowed to execute during the process_message function before being terminated (max 1M, default max).
output_limit (uint): The number of bytes the sandbox output buffer can hold before the sandbox is terminated (max 63KiB, default max). Anything less than 1KiB will default to 1KiB.
config (object): A map of configuration variables available to the sandbox via read_config. The map consists of a string key with: string, bool, int64, or float64 values.
Example
[sql_decoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "sql_decoder.lua"
The sandbox filter provides an isolated execution environment for data analysis. The output generated by the sandbox is injected into the payload of a new message for further processing or to be output.
script_type (string): The language the sandbox is written in. Currently the only valid option is ‘lua’.
filename (string): For a static configuration this is the path to the sandbox code; if specified as a relative path it will be appended to Heka’s global base_dir. The filename must be unique between static plugins, since the global data is preserved using this name. For a dynamic configuration the filename is ignored and the physical location on disk is controlled by the SandboxManagerFilter.
preserve_data (bool): True if the sandbox global data should be preserved/restored on Heka shutdown/startup. The preserved data is stored alongside the sandbox code, e.g. counter.lua.data, so Heka must have read/write permissions to that directory.
memory_limit (uint): The number of bytes the sandbox is allowed to consume before being terminated (max 8MiB, default 32767).
instruction_limit (uint): The number of instructions the sandbox is allowed to execute during the process_message/timer_event functions before being terminated (max 1M, default 1000).
output_limit (uint): The number of bytes the sandbox output buffer can hold before the sandbox is terminated (max 63KiB, default 1024). Anything less than 1KiB will default to 1KiB.
profile (bool): When true a statistically significant number of ProcessMessage timings are immediately captured before reverting back to the regular sampling interval. The main purpose is for more accurate sandbox comparison/tuning/optimization.
config (object): A map of configuration variables available to the sandbox via read_config. The map consists of a string key with: string, bool, int64, or float64 values.
Example
[hekabench_counter]
type = "SandboxFilter"
message_matcher = "Type == 'hekabench'"
ticker_interval = 1
script_type = "lua"
filename = "counter.lua"
preserve_data = true
memory_limit = 32767
instruction_limit = 1000
output_limit = 1024
profile = false
[hekabench_counter.config]
rows = 1440
sec_per_row = 60
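A script might pick up the values from the [hekabench_counter.config] section along the following lines. This is a minimal sketch rather than code shipped with Heka: the fallback stub for read_config exists only so the snippet can also run outside the sandbox, and the default values (and the assumption that a missing key yields nil) are illustrative.

```lua
-- Use Heka's read_config when running inside the sandbox; otherwise fall
-- back to a stand-in (hypothetical, only for running this sketch elsewhere).
local read_config = read_config or function(name)
    local cfg = {rows = 1440, sec_per_row = 60} -- mirrors the TOML above
    return cfg[name]
end

-- Assumed behaviour: a missing key yields nil, so `or` supplies a default.
local rows        = read_config("rows") or 1440
local sec_per_row = read_config("sec_per_row") or 60

-- Total time span covered by a buffer with these dimensions, in seconds.
local window_seconds = rows * sec_per_row
```

With the configuration shown above, window_seconds works out to one day (1440 rows of 60 seconds each).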
The SandboxManagerFilter allows SandboxFilters to be dynamically started and stopped using a signed Heka message. The intent is to have one manager per access control group each with their own message signing key. Users in each group can submit a signed control message to manage any filters running under the associated manager. A signed message is not an enforced requirement but it is highly recommended in order to restrict access to this functionality.
working_directory (string): The directory where the filter configurations, code, and states are preserved. The directory can be unique or shared between sandbox managers since the filter names are unique per manager. Defaults to a directory in ${BASE_DIR}/sbxmgrs with a name generated from the plugin name.
max_filters (uint): The maximum number of filters this manager can run.
Example
[OpsSandboxManager]
type = "SandboxManagerFilter"
message_signer = "ops"
message_matcher = "Type == 'heka.control.sandbox'"
max_filters = 100
The sandbox manager control message is a regular Heka message with the following variables set to the specified values.
Starting a SandboxFilter
Stopping a SandboxFilter
Sbmgr is a tool for managing (starting/stopping) sandbox filters by generating the control messages defined above.
Command Line Options
sbmgr [-config config_file] [-action load|unload] [-filtername specified on unload] [-script sandbox script filename] [-scriptconfig sandbox script configuration filename]
Sbmgrload is a test tool for starting/stopping a large number of sandboxes. The script and configuration are built into the tool and the filters will be named: CounterSandboxN where N is the instance number.
Command Line Options
sbmgrload [-config config_file] [-action load|unload] [-num number of sandbox instances]
Configuration Variables
ip_address (string): IP address of the Heka server.
Example
ip_address = "127.0.0.1:5565"
[signer]
name = "test"
hmac_hash = "md5"
hmac_key = "4865ey9urgkidls xtb0[7lf9rzcivthkm"
version = 0
1. The SandboxManagerFilters are defined in the hekad configuration file and are created when hekad starts. The manager provides a location/namespace for SandboxFilters to run and controls access to this space via a signed Heka message. By associating a message_signer with the manager we can restrict who can load and unload the associated filters. Let’s start by configuring a SandboxManager for a specific set of users: platform developers. Choose a unique filter name [PlatformDevs] and a signer name “PlatformDevs”; in this case we will use the same name for each.
[PlatformDevs]
type = "SandboxManagerFilter"
message_signer = "PlatformDevs"
message_matcher = "Type == 'heka.control.sandbox'"
working_directory = "/var/heka/sandbox"
max_filters = 100
2. Configure the input that will receive the SandboxManager control messages. For this setup we will extend the current TCP input to handle our signed messages. The signer section name consists of the signer name followed by an underscore and the key version number (this notation simply flattens the signer configuration structure into a single map). Multiple key versions can be active at the same time, which facilitates the rollout of new keys.
[TCP:5565]
type = "TcpInput"
address = ":5565"
[TCP:5565.signer.PlatformDevs_0]
hmac_key = "Old Platform devs signing key"
[TCP:5565.signer.PlatformDevs_1]
hmac_key = "Platform devs signing key"
3. Configure the sandbox manager utility (sbmgr). The signer information must exactly match the values in the input configuration above; otherwise the messages will be discarded. Save the file as PlatformDevs.toml.
ip_address = ":5565"
[signer]
name = "PlatformDevs"
hmac_hash = "md5"
hmac_key = "Platform devs signing key"
version = 1
data = circular_buffer.new(1440, 1, 60) -- message count per minute
local COUNT = data:set_header(1, "Messages", "count")
function process_message ()
    local ts = read_message("Timestamp")
    data:add(ts, COUNT, 1)
    return 0
end

function timer_event(ns)
    output(data)
    inject_message("cbuf")
end
The only difference between a static and dynamic SandboxFilter configuration is the filename. In the dynamic configuration it can be left blank or left out entirely. The manager will assign the filter a unique system wide name, in this case, “PlatformDevs-Example”.
[Example]
type = "SandboxFilter"
message_matcher = "Type == 'Widget'"
ticker_interval = 60
script_type = "lua"
filename = ""
preserve_data = false
memory_limit = 64000
instruction_limit = 100
output_limit = 64000
sbmgr -action=load -config=PlatformDevs.toml -script=example.lua -scriptconfig=example.toml
If you are running the DashboardOutput the following links are available:
Otherwise
Note
A running filter cannot be ‘reloaded’; it must be unloaded and loaded again. The state is not preserved in this case for two reasons (in the future we hope to remedy this):
- During the unload/load process some data can be missed creating a small gap in the analysis causing anomalies and confusion.
- The internal data representation may have changed and restoration may be problematic.
sbmgr -action=unload -config=PlatformDevs.toml -filtername=Example
The Lua sandbox provides full access to the Lua language in a sandboxed environment under hekad that enforces configurable restrictions.
process_message: Called by Heka when a message is available to the sandbox. The instruction_limit configuration parameter is applied to this function call.
timer_event: Called by Heka when the ticker_interval expires. The instruction_limit configuration parameter is applied to this function call. This function is only required in SandboxFilters (SandboxDecoders do not support timer events).
read_config: Provides access to the sandbox configuration variables.
read_message: Provides access to the Heka message data.
output: Appends data to the payload buffer, which cannot exceed the output_limit configuration parameter.
Notes
- Outputting a Lua table will serialize it to JSON according to the following guidelines/restrictions:
  - Tables cannot contain internal or circular references.
  - Keys starting with an underscore are considered private and will not be serialized.
  - ‘_name’ is a special private key that can be used to specify the name of the top level JSON object; if not provided the default is ‘table’.
  - Arrays only use contiguous numeric keys starting with an index of 1. Private keys are the exception, e.g. local a = {1,2,3,_name="my_name"} will be serialized as: {"my_name":[1,2,3]}\n
  - Hashes only use string keys (numeric keys will not be quoted and the JSON output will be invalid). Note: the hash keys are output in an arbitrary order, e.g. local a = {x = 1, y = 2} will be serialized as: {"table":{"y":2,"x":1}}\n.
- In most cases circular buffers should be directly output using inject_message. However, in order to create graph annotations the annotation table has to be written to the output buffer followed by the circular buffer. The output function is the only way to combine this data before injection (use a unique payload_type when injecting a message with non-standard circular buffer mashups).
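The table serialization guidelines above can be checked before a table is handed to output. The helper below is an illustrative sketch only; the names is_private and classify are hypothetical and this is not the sandbox's serializer. It classifies a table as an array, a hash, or a mix of the two, ignoring private keys.

```lua
-- Returns true when key is "private" (a string starting with an underscore).
local function is_private(key)
    return type(key) == "string" and key:sub(1, 1) == "_"
end

-- Classifies a table as "array", "hash", or "mixed" following the
-- guidelines above. Private keys are ignored.
local function classify(t)
    local numeric, strings, other = 0, 0, 0
    for k in pairs(t) do
        if not is_private(k) then
            if type(k) == "number" then
                numeric = numeric + 1
            elseif type(k) == "string" then
                strings = strings + 1
            else
                other = other + 1
            end
        end
    end
    if other > 0 then return "mixed" end
    if strings == 0 then
        -- every index 1..numeric must be present for the keys to be contiguous
        for i = 1, numeric do
            if t[i] == nil then return "mixed" end
        end
        return "array"
    end
    if numeric == 0 then return "hash" end
    return "mixed"
end
```

A table reported as "mixed" combines numeric and string keys (or has gaps in its numeric keys), which the guidelines above suggest will not serialize cleanly.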
inject_message(payload_type, payload_name): Creates a new Heka message using the contents of the output payload buffer and then clears the buffer. Two pieces of optional metadata are allowed and included as fields in the injected message, e.g. Fields[payload_type] == ‘csv’ and Fields[payload_name] == ‘Android Usage Statistics’. The number of messages that may be injected by the process_message or timer_event functions is globally controlled by the hekad Command Line Options; if these values are exceeded the sandbox will be terminated.
inject_message(circular_buffer, payload_name): Creates a new Heka message placing the circular buffer output in the message payload (overwriting whatever is in the output buffer). The payload_type is set to the circular buffer output format string, e.g. Fields[payload_type] == ‘cbuf’. The Fields[payload_name] is set to the provided payload_name.
inject_message(message_table): Creates a new Heka protocol buffer message using the contents of the specified Lua table (overwriting whatever is in the output buffer). Notes about message fields:
Timestamp is automatically generated if one is not provided. Nanoseconds since the UNIX epoch is the only valid format.
UUID is automatically generated, anything provided by the user is ignored.
Hostname and Logger are automatically set by the SandboxFilter and cannot be overridden.
Type is prepended with “heka.sandbox.” by the SandboxFilter to avoid data confusion/mis-representation.
name=value, e.g. foo="bar"; foo=1; foo=true
name={array}, e.g. foo={"b", "a", "r"}
Loads optional sandbox libraries
{
    Uuid = "data", -- always ignored
    Logger = "nginx", -- ignored in the SandboxFilter
    Hostname = "bogus.mozilla.com", -- ignored in the SandboxFilter
    Timestamp = 1e9,
    Type = "TEST", -- will become "heka.sandbox.TEST" in the SandboxFilter
    Payload = "Test Payload",
    EnvVersion = "0.8",
    Pid = 1234,
    Severity = 6,
    Fields = {
        http_status = 200,
        request_size = {value=1413, representation="B"}
    }
}
The library is a sliding window time series data store and is implemented in the circular_buffer table.
circular_buffer.new(rows, columns, seconds_per_row, enable_delta)
- Arguments
- rows (unsigned) The number of rows in the buffer (must be > 1).
- columns (unsigned) The number of columns in the buffer (must be > 0).
- seconds_per_row (unsigned) The number of seconds each row represents (must be > 0).
- enable_delta (bool, optional, default false) When true the changes made to the circular buffer between delta outputs are tracked.
- Return
- A circular buffer object.
Note
All column arguments are 1 based. If the column is out of range for the configured circular buffer a fatal error is generated.
double add(nanoseconds, column, value)
- Arguments
- nanoseconds (unsigned) The number of nanoseconds since the UNIX epoch. The value is used to determine which row is being operated on.
- column (unsigned) The column within the specified row to perform an add operation on.
- value (double) The value to be added to the specified row/column.
- Return
- The value of the updated row/column or nil if the time was outside the range of the buffer.
double set(nanoseconds, column, value)
- Arguments
- nanoseconds (unsigned) The number of nanoseconds since the UNIX epoch. The value is used to determine which row is being operated on.
- column (unsigned) The column within the specified row to perform a set operation on.
- value (double) The value to be overwritten at the specified row/column.
- Return
- The value passed in or nil if the time was outside the range of the buffer.
double get(nanoseconds, column)
- Arguments
- nanoseconds (unsigned) The number of nanoseconds since the UNIX epoch. The value is used to determine which row is being operated on.
- column (unsigned) The column within the specified row to retrieve the data from.
- Return
- The value at the specified row/column or nil if the time was outside the range of the buffer.
int set_header(column, name, unit, aggregation_method)
- Arguments
- column (unsigned) The column number where the header information is applied.
- name (string) Descriptive name of the column (maximum 15 characters). Any non-alphanumeric characters will be converted to underscores. (default: Column_N)
- unit (string - optional) The unit of measure (maximum 7 characters). Alphanumeric, ‘/’, and ‘*’ characters are allowed; everything else will be converted to underscores, e.g. KiB, Hz, m/s (default: count)
- aggregation_method (string - optional) Controls how the column data is aggregated when combining multiple circular buffers.
- sum The total is computed for the time/column (default).
- min The smallest value is retained for the time/column.
- max The largest value is retained for the time/column.
- avg The average is computed for the time/column.
- none No aggregation will be performed on the column.
- Return
- The column number passed into the function.
double compute(function, column, start, end)
- Arguments
- function (string) The name of the compute function (sum|avg|sd|min|max).
- column (unsigned) The column that the computation is performed against.
- start (optional - unsigned) The number of nanoseconds since the UNIX epoch. Sets the start time of the computation range; if nil the buffer’s start time is used.
- end (optional - unsigned) The number of nanoseconds since the UNIX epoch. Sets the end time of the computation range (inclusive); if nil the buffer’s end time is used. The end time must be greater than or equal to the start time.
- Return
- The result of the computation for the specified column over the given range or nil if the range fell outside of the buffer.
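Because every time argument in this API is expressed in nanoseconds, it can help to build the start/end pair for compute in one place. The following is a small sketch under stated assumptions: trailing_range and sum_trailing are hypothetical helper names, and the cbuf argument is expected to be an object exposing the compute method described above.

```lua
local NS_PER_SEC = 1e9

-- Returns start and end times (nanoseconds since the UNIX epoch) covering
-- the `seconds` leading up to `now_ns`, inclusive of `now_ns`.
local function trailing_range(now_ns, seconds)
    return now_ns - seconds * NS_PER_SEC, now_ns
end

-- Sums one column over the trailing window by delegating to compute().
-- Per the description above, the result is nil when the range falls
-- outside of the buffer, so callers should check for nil.
local function sum_trailing(cbuf, column, now_ns, seconds)
    local start_ns, end_ns = trailing_range(now_ns, seconds)
    return cbuf:compute("sum", column, start_ns, end_ns)
end
```

Inside a filter this might be called from timer_event as sum_trailing(data, 1, ns, 600), assuming the ns argument carries the current time in nanoseconds.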
Sets an internal flag to control the output format of the circular buffer data structure; if deltas are not enabled or there haven’t been any modifications, nothing is output.
The circular buffer can be passed to the output() function. The output format can be selected using the format() function.
The cbuf (full data set) output format consists of newline delimited rows starting with a JSON header row followed by the data rows with tab delimited columns. The time in the header corresponds to the time of the first data row; the time for each subsequent row is calculated using the seconds_per_row header value.
{json header}
row1_col1\trow1_col2\n
.
.
.
rowN_col1\trowN_col2\n
The cbufd (delta) output format consists of newline delimited rows starting with a JSON header row followed by the data rows with tab delimited columns. The first column is the timestamp for the row (time_t). The cbufd output will only contain the rows that have changed and the corresponding delta values for each column.
{json header}
row14_timestamp\trow14_col1\trow14_col2\n
row10_timestamp\trow10_col1\trow10_col2\n
{"time":2,"rows":3,"columns":3,"seconds_per_row":60,"column_info":[{"name":"HTTP_200","unit":"count","aggregation":"sum"},{"name":"HTTP_400","unit":"count","aggregation":"sum"},{"name":"HTTP_500","unit":"count","aggregation":"sum"}]}
10002 0 0
11323 0 0
10685 0 0
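A consumer of the cbuf format can recover each row's time from the header, as described above. The parser below is a rough sketch, not part of Heka: parse_cbuf is a hypothetical name, the header time is assumed to be in seconds, and only the two header values needed for timestamps are extracted with pattern matching instead of a JSON parser. The sample header is shortened (column_info omitted) for brevity.

```lua
-- Parses cbuf text into a list of rows: {time = <seconds>, values = {...}}.
local function parse_cbuf(text)
    -- split the JSON header line from the tab delimited data rows
    local header, body = text:match("^(.-)\n(.*)$")
    local time = tonumber(header:match('"time":(%d+)'))
    local sec_per_row = tonumber(header:match('"seconds_per_row":(%d+)'))
    local rows = {}
    for line in body:gmatch("[^\n]+") do
        local values = {}
        for cell in line:gmatch("[^\t]+") do
            -- keep non-numeric cells as strings so column positions hold
            values[#values + 1] = tonumber(cell) or cell
        end
        -- the first data row carries the header time; each later row
        -- advances by seconds_per_row
        rows[#rows + 1] = {time = time + #rows * sec_per_row, values = values}
    end
    return rows
end

local sample = '{"time":2,"rows":3,"columns":3,"seconds_per_row":60}\n'
    .. "10002\t0\t0\n11323\t0\t0\n10685\t0\t0\n"
local rows = parse_cbuf(sample)
```

For the sample data the three rows are assigned the times 2, 62, and 122.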
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
data = circular_buffer.new(1440, 5, 60) -- 1 day at 1 minute resolution
local HTTP_200 = data:set_header(1, "HTTP_200" , "count")
local HTTP_300 = data:set_header(2, "HTTP_300" , "count")
local HTTP_400 = data:set_header(3, "HTTP_400" , "count")
local HTTP_500 = data:set_header(4, "HTTP_500" , "count")
local HTTP_UNKNOWN = data:set_header(5, "HTTP_UNKNOWN" , "count")
function process_message()
    local ts = read_message("Timestamp")
    local sc = read_message("Fields[http_status_code]")
    if sc == nil then return 0 end

    if sc >= 200 and sc < 300 then
        data:add(ts, HTTP_200, 1)
    elseif sc >= 300 and sc < 400 then
        data:add(ts, HTTP_300, 1)
    elseif sc >= 400 and sc < 500 then
        data:add(ts, HTTP_400, 1)
    elseif sc >= 500 and sc < 600 then
        data:add(ts, HTTP_500, 1)
    else
        data:add(ts, HTTP_UNKNOWN, 1)
    end
    return 0
end

function timer_event()
    output(data)
    inject_message("cbuf", "HTTP Status Code Statistics")
end
Setting the inject_message payload_type to “cbuf” will cause the DashboardOutput to automatically generate an HTML page containing a graphical view of the data.
function process_message ()
    return 0
end

function timer_event(ns)
end
total = 0 -- preserved between restarts since it is in global scope
local count = 0 -- local scope so this will not be preserved
function process_message()
    total = total + 1
    count = count + 1
    return 0
end

function timer_event(ns)
    output(string.format("%d messages in the last minute; total=%d", count, total))
    count = 0
    inject_message()
end
[demo_counter]
type = "SandboxFilter"
message_matcher = "Type == 'demo'"
ticker_interval = 60
script_type = "lua"
filename = "counter.lua"
preserve_data = true
memory_limit = 32767
instruction_limit = 100
output_limit = 256
4. Extending the business logic (count the number of ‘demo’ events per minute per device)
device_counters = {}
function process_message()
    local device_name = read_message("Fields[DeviceName]")
    if device_name == nil then
        device_name = "_unknown_"
    end

    local dc = device_counters[device_name]
    if dc == nil then
        dc = {count = 1, total = 1}
        device_counters[device_name] = dc
    else
        dc.count = dc.count + 1
        dc.total = dc.total + 1
    end
    return 0
end

function timer_event(ns)
    output("#device_name\tcount\ttotal\n")
    for k, v in pairs(device_counters) do
        output(string.format("%s\t%d\t%d\n", k, v.count, v.total))
        v.count = 0
    end
    inject_message()
end
memory_limit = 65536
instruction_limit = 20000
output_limit = 64512