Sandboxes are Heka plugins implemented in a sandboxed scripting language. They provide a dynamic, isolated execution environment for data parsing, transformation, and analysis. They allow real-time access to data in production without jeopardizing the integrity or performance of the monitoring infrastructure, and they do not require Heka to be recompiled. This broadens the audience the data can be exposed to and facilitates new uses of the data (e.g., debugging, monitoring, dynamic provisioning, SLA analysis, intrusion detection, ad-hoc reporting).
small - memory requirements are about 16 KiB for a basic sandbox
fast - microsecond execution times
stateful - ability to resume where it left off after a restart/reboot
isolated - failures are contained and malfunctioning sandboxes are terminated
The Lua sandbox provides full access to the Lua language in a sandboxed environment under hekad that enforces configurable restrictions.
See also
Called by Heka when a message is available to the sandbox. The instruction_limit configuration parameter is applied to this function call.
Called by Heka when the ticker_interval expires. The instruction_limit configuration parameter is applied to this function call. This function is only required in SandboxFilters (SandboxDecoders do not support timer events).
See: https://github.com/mozilla-services/lua_sandbox/blob/master/docs/sandbox_api.md
require(libraryName)
Appends the arguments to the payload buffer for incremental construction of the final payload output (inject_payload finalizes the buffer and sends the message to Heka). This function is simply a rename of the generic sandbox output function to improve the readability of the plugin code.
Provides access to the sandbox configuration variables.
Provides access to the Heka message data.
New in version 0.5.
Decoders only. Mutates the specified field value on the message that is being decoded.
Uuid (accepts raw bytes or RFC4122 string representation)
Type (string)
Logger (string)
Payload (string)
EnvVersion (string)
Hostname (string)
Timestamp (accepts nanoseconds since the UNIX epoch or time-parseable string representations.)
Severity (number or int-parseable string)
Pid (number or int-parseable string)
Fields[_name_] (field type determined by value type: bool, number, or string)
Iterates through the message fields returning the field contents or nil when the end is reached.
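The iteration pattern can be sketched in plain Lua. read_next_field is provided by Heka inside the sandbox; a stub with a simplified signature stands in here so the loop runs standalone (the real function returns additional per-field details not shown in this sketch):

```lua
-- Stub standing in for Heka's read_next_field so the example is runnable;
-- it walks a fixed field list and returns nil at the end.
local fields = { {"status", 200}, {"request", "GET / HTTP/1.1"} }
local i = 0
local function read_next_field()  -- simplified, not Heka's real signature
    i = i + 1
    local f = fields[i]
    if not f then return nil end
    return f[1], f[2]
end

-- Typical usage: loop until the iterator returns nil.
seen = {}
while true do
    local name, value = read_next_field()
    if name == nil then break end
    seen[name] = value
end
```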
inject_payload(payload_type, payload_name, arg3, ..., argN)
Creates a new Heka message using the contents of the payload buffer (pre-populated with add_to_payload) combined with any additional arguments passed to this function. The output buffer is cleared after the injection. The payload_type and payload_name arguments are two pieces of optional metadata. If specified, they will be included as fields in the injected message, e.g., Fields[payload_type] == 'csv', Fields[payload_name] == 'Android Usage Statistics'. The number of messages that may be injected by the process_message or timer_event functions is globally controlled by the hekad global configuration options; if these limits are exceeded the sandbox will be terminated.
- Arguments
- payload_type (optional, default “txt” string) Describes the content type of the injected payload data.
- payload_name (optional, default “” string) Names the content to aid in downstream filtering.
- arg3 (optional) Same type restrictions as add_to_payload.
- ...
- argN
- Return
- none
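The buffer semantics described above can be sketched as follows; the stub implementations stand in for the Heka-provided functions so the example runs standalone, and are illustrations rather than Heka's code:

```lua
-- Stubs modeling the documented behavior: add_to_payload appends,
-- inject_payload finalizes, "sends", and clears the buffer.
local buffer = {}
injected = nil  -- holds the "sent" payload for illustration

local function add_to_payload(...)
    for _, v in ipairs({...}) do
        buffer[#buffer + 1] = tostring(v)
    end
end

local function inject_payload(payload_type, payload_name, ...)
    add_to_payload(...)               -- trailing args are appended too
    injected = table.concat(buffer)   -- finalize and "send" the message
    buffer = {}                       -- buffer is cleared after injection
end

add_to_payload("widget,count\n")
add_to_payload("sprocket,", 42, "\n")
inject_payload("csv", "Widget Report")
-- injected is now "widget,count\nsprocket,42\n"
```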
Creates a new Heka protocol buffer message using the contents of the specified Lua table (overwriting whatever is in the output buffer). Notes about message fields:
Timestamp is automatically generated if one is not provided. Nanoseconds since the UNIX epoch is the only valid format.
UUID is automatically generated, anything provided by the user is ignored.
Hostname and Logger are automatically set by the SandboxFilter and cannot be overridden.
Type is prepended with “heka.sandbox.” by the SandboxFilter to avoid data confusion/mis-representation.
name=value e.g., foo="bar"; foo=1; foo=true
name={array} e.g., foo={"b", "a", "r"}
{
Uuid = "data", -- always ignored
Logger = "nginx", -- ignored in the SandboxFilter
Hostname = "bogus.mozilla.com", -- ignored in the SandboxFilter
Timestamp = 1e9,
Type = "TEST", -- will become "heka.sandbox.TEST" in the SandboxFilter
Payload = "Test Payload",
EnvVersion = "0.8",
Pid = 1234,
Severity = 6,
Fields = {
http_status = 200,
request_size = {value=1413, representation="B"}
}
}
function process_message ()
return 0
end
function timer_event(ns)
end
require "string"
total = 0 -- preserved between restarts since it is in global scope
local count = 0 -- local scope so this will not be preserved
function process_message()
total = total + 1
count = count + 1
return 0
end
function timer_event(ns)
inject_payload("txt", "",
string.format("%d messages in the last minute; total=%d", count, total))
count = 0
end
[demo_counter]
type = "SandboxFilter"
message_matcher = "Type == 'demo'"
ticker_interval = 60
filename = "counter.lua"
preserve_data = true
4. Extending the business logic (count the number of ‘demo’ events per minute per device)
require "string"
device_counters = {}
function process_message()
local device_name = read_message("Fields[DeviceName]")
if device_name == nil then
device_name = "_unknown_"
end
local dc = device_counters[device_name]
if dc == nil then
dc = {count = 1, total = 1}
device_counters[device_name] = dc
else
dc.count = dc.count + 1
dc.total = dc.total + 1
end
return 0
end
function timer_event(ns)
add_to_payload("#device_name\tcount\ttotal\n")
for k, v in pairs(device_counters) do
add_to_payload(string.format("%s\t%d\t%d\n", k, v.count, v.total))
v.count = 0
end
inject_payload()
end
The SandboxManagerFilter provides dynamic control (start/stop) of sandbox filters in a secure manner without stopping the Heka daemon. Commands are sent to a SandboxManagerFilter using a signed Heka message. The intent is to have one manager per access control group, each with its own message signing key. Users in each group can submit a signed control message to manage any filters running under the associated manager. A signed message is not an enforced requirement, but it is highly recommended in order to restrict access to this functionality.
The directory where the filter configurations, code, and states are preserved. The directory can be unique or shared between sandbox managers since the filter names are unique per manager. Defaults to a directory in ${BASE_DIR}/sbxmgrs with a name generated from the plugin name.
The directory where ‘require’ will attempt to load the external Lua modules from. Defaults to ${SHARE_DIR}/lua_modules.
The maximum number of filters this manager can run.
New in version 0.5.
The number of bytes managed sandboxes are allowed to consume before being terminated (default 8MiB).
The number of instructions managed sandboxes are allowed to execute during the process_message/timer_event functions before being terminated (default 1M).
The number of bytes a managed sandbox's output buffer can hold before the sandbox is terminated (default 63KiB). Warning: messages exceeding 64KiB will generate an error and be discarded by the standard output plugins (File, TCP, UDP) since they exceed the maximum message size.
Example
[OpsSandboxManager]
type = "SandboxManagerFilter"
message_signer = "ops"
# message_matcher = "Type == 'heka.control.sandbox'" # automatic default setting
max_filters = 100
The sandbox manager control message is a regular Heka message with the following variables set to the specified values.
Starting a SandboxFilter
Stopping a SandboxFilter
Heka Sbmgr is a tool for managing (starting/stopping) sandbox filters by generating the control messages defined above.
Command Line Options
heka-sbmgr [-config config_file] [-action load|unload] [-filtername specified on unload] [-script sandbox script filename] [-scriptconfig sandbox script configuration filename]
Configuration Variables
ip_address (string): IP address of the Heka server.
use_tls (bool): Specifies whether or not SSL/TLS encryption should be used for the TCP connections. Defaults to false.
tls (TlsConfig): A sub-section that specifies the settings to be used for any SSL/TLS encryption. This will only have any impact if use_tls is set to true. See Configuring TLS.
Example
ip_address = "127.0.0.1:5565"
use_tls = true
[signer]
name = "test"
hmac_hash = "md5"
hmac_key = "4865ey9urgkidls xtb0[7lf9rzcivthkm"
version = 0
[tls]
cert_file = "heka.crt"
key_file = "heka.key"
client_auth = "NoClientCert"
prefer_server_ciphers = true
min_version = "TLS11"
Heka Sbmgrload is a test tool for starting/stopping a large number of sandboxes. The script and configuration are built into the tool and the filters will be named CounterSandboxN, where N is the instance number.
Command Line Options
heka-sbmgrload [-config config_file] [-action load|unload] [-num number of sandbox instances]
Configuration Variables (same as heka-sbmgr)
The SandboxManagerFilters are defined in the hekad configuration file and are created when hekad starts. The manager provides a location/namespace for SandboxFilters to run and controls access to this space via a signed Heka message. By associating a message_signer with the manager we can restrict who can load and unload the associated filters. Let's start by configuring a SandboxManager for a specific set of users: platform developers. Choose a unique filter name, [PlatformDevs], and a signer name, "PlatformDevs"; in this case we will use the same name for each.
[PlatformDevs]
type = "SandboxManagerFilter"
message_signer = "PlatformDevs"
working_directory = "/var/heka/sandbox"
max_filters = 100
For this setup we will extend the current TCP input to handle our signed messages. The signer section name consists of the signer name followed by an underscore and the key version number (this notation simply flattens the signer configuration structure into a single map). Multiple key versions can be active at the same time, facilitating the rollout of new keys.
[TCP:5565]
type = "TcpInput"
parser_type = "message.proto"
decoder = "ProtobufDecoder"
address = ":5565"
[TCP:5565.signer.PlatformDevs_0]
hmac_key = "Old Platform devs signing key"
[TCP:5565.signer.PlatformDevs_1]
hmac_key = "Platform devs signing key"
3. Configure the sandbox manager utility (sbmgr). The signer information must exactly match the values in the input configuration above otherwise the messages will be discarded. Save the file as PlatformDevs.toml.
ip_address = ":5565"
[signer]
name = "PlatformDevs"
hmac_hash = "md5"
hmac_key = "Platform devs signing key"
version = 1
require "circular_buffer"
data = circular_buffer.new(1440, 1, 60) -- message count per minute
local COUNT = data:set_header(1, "Messages", "count")
function process_message ()
local ts = read_message("Timestamp")
data:add(ts, COUNT, 1)
return 0
end
function timer_event(ns)
inject_payload("cbuf", "", data)
end
The only difference between a static and a dynamic SandboxFilter configuration is the filename. In the dynamic configuration it can be left blank or omitted entirely. The manager will assign the filter a unique system-wide name, in this case "PlatformDevs-Example".
[Example]
type = "SandboxFilter"
message_matcher = "Type == 'Widget'"
ticker_interval = 60
filename = ""
preserve_data = false
sbmgr -action=load -config=PlatformDevs.toml -script=example.lua -scriptconfig=example.toml
If you are running the DashboardOutput the following links are available:
Otherwise
Note
A running filter cannot be 'reloaded'; it must be unloaded and loaded again. During the unload/load process some data can be missed and gaps will be created. We hope to remedy this in the future, but for now it is a limitation of the dynamic sandbox.
sbmgr -action=unload -config=PlatformDevs.toml -filtername=Example
The SandboxDecoder provides an isolated execution environment for data parsing and complex transformations without the need to recompile Heka. See Sandbox.
Config:
Example
[sql_decoder]
type = "SandboxDecoder"
filename = "sql_decoder.lua"
Parses the Apache access logs based on the Apache 'LogFormat' configuration directive. The Apache format specifiers are mapped onto the Nginx variable names where applicable, e.g. %a -> remote_addr. This allows generic web filters and outputs to work with any HTTP server input.
Config:
The 'LogFormat' configuration directive from the apache2.conf. %t variables are converted to the number of nanoseconds since the Unix epoch and used to set the Timestamp on the message. http://httpd.apache.org/docs/2.4/mod/mod_log_config.html
Sets the message ‘Type’ header to the specified value
Transform the http_user_agent into user_agent_browser, user_agent_version, user_agent_os.
Always preserve the http_user_agent value if transform is enabled.
Only preserve the http_user_agent value if transform is enabled and fails.
Always preserve the original log line in the message payload.
Example Heka Configuration
[TestWebserver]
type = "LogstreamerInput"
log_directory = "/var/log/apache"
file_match = 'access\.log'
decoder = "CombinedLogDecoder"
[CombinedLogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/apache_access.lua"
[CombinedLogDecoder.config]
type = "combined"
user_agent_transform = true
# combined log format
log_format = '%h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"'
# common log format
# log_format = '%h %l %u %t \"%r\" %>s %O'
# vhost_combined log format
# log_format = '%v:%p %h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"'
# referer log format
# log_format = '%{Referer}i -> %U'
Example Heka Message
Timestamp: 2014-01-10 07:04:56 -0800 PST
Type: combined
Hostname: test.example.com
Pid: 0
UUID: 8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Logger: TestWebserver
Payload:
EnvVersion:
Severity: 7
Fields:
name:"remote_user" value_string:"-"
name:"http_x_forwarded_for" value_string:"-"
name:"http_referer" value_string:"-"
name:"body_bytes_sent" value_type:DOUBLE representation:"B" value_double:82
name:"remote_addr" value_string:"62.195.113.219" representation:"ipv4"
name:"status" value_type:DOUBLE value_double:200
name:"request" value_string:"GET /v1/recovery_email/status HTTP/1.1"
name:"user_agent_os" value_string:"FirefoxOS"
name:"user_agent_browser" value_string:"Firefox"
name:"user_agent_version" value_type:DOUBLE value_double:29
Parses and transforms the MySQL slow query logs. Use mariadb_slow_query.lua to parse the MariaDB variant of the MySQL slow query logs.
Config:
Truncates the SQL payload to the specified number of bytes (not UTF-8 aware) and appends "...". If the value is nil no truncation is performed. A negative value will truncate the specified number of bytes from the end.
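The truncation semantics can be sketched in plain Lua. This is an illustration of the documented behavior, not the decoder's actual code, and the negative-value branch reflects one reading of "truncate the specified number of bytes from the end":

```lua
-- Byte-based truncation (not UTF-8 aware), per the description above.
function truncate_sql(sql, n)
    if n == nil then return sql end
    if n < 0 then n = #sql + n end    -- negative: drop bytes from the end
    if n <= 0 then return "..." end
    if #sql <= n then return sql end
    return string.sub(sql, 1, n) .. "..."
end

-- truncate_sql("SELECT * FROM bso", 6) returns "SELECT..."
-- truncate_sql("SELECT 1", nil) returns "SELECT 1" (no truncation)
```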
Example Heka Configuration
[Sync-1_5-SlowQuery]
type = "LogstreamerInput"
log_directory = "/var/log/mysql"
file_match = 'mysql-slow\.log'
parser_type = "regexp"
delimiter = "\n(# User@Host:)"
delimiter_location = "start"
decoder = "MySqlSlowQueryDecoder"
[MySqlSlowQueryDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/mysql_slow_query.lua"
[MySqlSlowQueryDecoder.config]
truncate_sql = 64
Example Heka Message
Timestamp: 2014-05-07 15:51:28 -0700 PDT
Type: mysql.slow-query
Hostname: 127.0.0.1
Pid: 0
UUID: 5324dd93-47df-485b-a88e-429f0fcd57d6
Logger: Sync-1_5-SlowQuery
Payload: /* [queryName=FIND_ITEMS] */ SELECT bso.userid, bso.collection, ...
EnvVersion:
Severity: 7
Fields:
name:"Rows_examined" value_type:DOUBLE value_double:16458
name:"Query_time" value_type:DOUBLE representation:"s" value_double:7.24966
name:"Rows_sent" value_type:DOUBLE value_double:5001
name:"Lock_time" value_type:DOUBLE representation:"s" value_double:0.047038
Parses the Nginx access logs based on the Nginx ‘log_format’ configuration directive.
Config:
The 'log_format' configuration directive from the nginx.conf. The $time_local or $time_iso8601 variable is converted to the number of nanoseconds since the Unix epoch and used to set the Timestamp on the message. http://nginx.org/en/docs/http/ngx_http_log_module.html
Sets the message ‘Type’ header to the specified value
Transform the http_user_agent into user_agent_browser, user_agent_version, user_agent_os.
Always preserve the http_user_agent value if transform is enabled.
Only preserve the http_user_agent value if transform is enabled and fails.
Always preserve the original log line in the message payload.
Example Heka Configuration
[TestWebserver]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'access\.log'
decoder = "CombinedLogDecoder"
[CombinedLogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_access.lua"
[CombinedLogDecoder.config]
type = "combined"
user_agent_transform = true
# combined log format
log_format = '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"'
Example Heka Message
Timestamp: 2014-01-10 07:04:56 -0800 PST
Type: combined
Hostname: test.example.com
Pid: 0
UUID: 8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Logger: TestWebserver
Payload:
EnvVersion:
Severity: 7
Fields:
name:"remote_user" value_string:"-"
name:"http_x_forwarded_for" value_string:"-"
name:"http_referer" value_string:"-"
name:"body_bytes_sent" value_type:DOUBLE representation:"B" value_double:82
name:"remote_addr" value_string:"62.195.113.219" representation:"ipv4"
name:"status" value_type:DOUBLE value_double:200
name:"request" value_string:"GET /v1/recovery_email/status HTTP/1.1"
name:"user_agent_os" value_string:"FirefoxOS"
name:"user_agent_browser" value_string:"Firefox"
name:"user_agent_version" value_type:DOUBLE value_double:29
Parses the Nginx error logs based on the Nginx hard coded internal format.
Config:
The time zone conversion actually happens on the Go side, since there isn't good TZ support in Lua.
Example Heka Configuration
[TestWebserverError]
type = "LogstreamerInput"
log_directory = "/var/log/nginx"
file_match = 'error\.log'
decoder = "NginxErrorDecoder"
[NginxErrorDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_error.lua"
[NginxErrorDecoder.config]
tz = "America/Los_Angeles"
Example Heka Message
Timestamp: 2014-01-10 07:04:56 -0800 PST
Type: nginx.error
Hostname: trink-x230
Pid: 16842
UUID: 8e414f01-9d7f-4a48-a5e1-ae92e5954df5
Logger: TestWebserverError
Payload: using inherited sockets from "6;"
EnvVersion:
Severity: 5
Fields:
name:"tid" value_type:DOUBLE value_double:0
name:"connection" value_type:DOUBLE value_double:8878
Parses the rsyslog output using the string based configuration template.
Config:
The ‘template’ configuration string from rsyslog.conf. http://rsyslog-5-8-6-doc.neocities.org/rsyslog_conf_templates.html
If your rsyslog timestamp field in the template does not carry zone offset information, you may set an offset to be applied to your events here. Typically this would be used with the “Traditional” rsyslog formats.
Parsing is done by Go and supports values of "UTC", "Local", or a location name corresponding to a file in the IANA Time Zone database, e.g. "America/New_York".
Example Heka Configuration
[RsyslogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/rsyslog.lua"
[RsyslogDecoder.config]
type = "RSYSLOG_TraditionalFileFormat"
template = '%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n'
tz = "America/Los_Angeles"
Example Heka Message
Timestamp: 2014-02-10 12:58:58 -0800 PST
Type: RSYSLOG_TraditionalFileFormat
Hostname: trink-x230
Pid: 0
UUID: e0eef205-0b64-41e8-a307-5772b05e16c1
Logger: RsyslogInput
Payload: "imklog 5.8.6, log source = /proc/kmsg started."
EnvVersion:
Severity: 7
Fields:
name:"programname" value_string:"kernel"
Stores the last alert time in the global _LAST_ALERT so alert throttling will persist between restarts.
Queue an alert message to be sent.
Send an alert message.
Sends all queued alert messages as a single message.
Sets the minimum duration between alert event outputs.
Test to see if sending an alert at this time would be throttled.
Note
Use a zero timestamp to override message throttling.
Create an annotation in the global _ANNOTATIONS table.
Helper function to create an annotation table but not add it to the global list of annotations.
Concatenates an array of annotation tables to the specified key in the global _ANNOTATIONS table.
prune(name, ns)
- Arguments
- name (string) circular buffer payload name.
- ns (int64) current time in nanoseconds since the UNIX epoch.
- Return
- The json encoded list of annotations.
Entirely remove the payload name from the global _ANNOTATIONS table.
set_prune(name, ns_duration)
- Arguments
- name (string) circular buffer payload name.
- ns_duration (int64) time in nanoseconds the annotation should remain in the list.
- Return
- none
Parses the anomaly_config into a Lua table. If the configuration is invalid an error is thrown.
The configuration can specify any number of algorithm function calls (space delimited if desired, but they will also work back to back with no delimiter). This allows for analysis of multiple graphs, columns, and even specification of multiple algorithms per column.
Rate of change test
Only use this test on data with a normal (Gaussian http://en.wikipedia.org/wiki/Normal_distribution) distribution. It identifies rapid changes (spikes) in the data (increasing and decreasing) but ignores cyclic data that has a more gradual rise and fall. It is typically used for something like HTTP 200 status code analysis to detect a sudden increase/decrease in web traffic.
Quoted string containing the payload_name value used in the inject_payload function call. If the payload name contains a double quote it should be escaped as two double quotes in a row.
The circular buffer column to perform the analysis on.
The number of intervals in an analysis window.
The number of intervals in the historical analysis window (0 uses the full history). Must be greater than or equal to ‘win’.
The standard deviation threshold to trigger the anomaly.
Alert if data stops.
Alert if data starts.
e.g. roc("Output1", 1, 15, 0, 2, true, false)
Mann-Whitney-Wilcoxon test http://en.wikipedia.org/wiki/Mann-Whitney
Parametric
Only use this test on data with a normal (Gaussian http://en.wikipedia.org/wiki/Normal_distribution) distribution. It identifies more gradual changes in the data (increasing, decreasing, or any). It is typically used with something like server memory analysis where the values are more stable and gradual changes are interesting (e.g., memory leak).
Quoted string containing the payload_name value used in the inject_payload function call. If the payload name contains a double quote it should be escaped as two double quotes in a row.
The circular buffer column to perform the analysis on.
The number of intervals in an analysis window (should be at least 20).
The number of analysis windows to compare.
The pvalue threshold to trigger the prediction. http://en.wikipedia.org/wiki/P_value
(decreasing|increasing|any)
e.g. mww("Output1", 2, 60, 10, 0.0001, decreasing)
Non-parametric
This test can be used on data with a normal (Gaussian http://en.wikipedia.org/wiki/Normal_distribution) or non-normal (nonparametric http://en.wikipedia.org/wiki/Nonparametric_statistics) distribution. It identifies overlap/similarities between two data sets. It is typically used for something like detecting an increase in HTTP 500 status code errors.
Quoted string containing the payload_name value used in the inject_payload function call. If the payload name contains a double quote it should be escaped as two double quotes in a row.
The circular buffer column to perform the analysis on.
The number of intervals in an analysis window.
The number of analysis windows to compare.
Value between 0 and 1. Anything above 0.5 is an increasing trend; anything below 0.5 is a decreasing trend. http://en.wikipedia.org/wiki/Mann-Whitney#.CF.81_statistic
e.g. mww_nonparametric("Output1", 2, 15, 10, 0.55)
Detects anomalies in the circular buffer data returning any error messages for alert generation and array of annotations for the graph.
bulkapi_index_json(index, type_name, id, ns)
Returns a simple JSON 'index' structure satisfying the ElasticSearch BulkAPI.
- Arguments
- index (string or nil)
String to use as the _index key’s value in the generated JSON, or nil to omit the key. Supports field interpolation as described below.
- type_name (string or nil)
String to use as the _type key’s value in the generated JSON, or nil to omit the key. Supports field interpolation as described below.
- id (string or nil)
String to use as the _id key's value in the generated JSON, or nil to omit the key. Supports field interpolation as described below.
- ns (number or nil)
Nanosecond timestamp to use for any strftime field interpolation into the above fields. Current system time will be used if nil.
Field interpolation
Data from the current message can be interpolated into any of the string arguments listed above. A %{} enclosed field name will be replaced by the field value from the current message. Supported default field names are “Type”, “Hostname”, “Pid”, “UUID”, “Logger”, “EnvVersion”, and “Severity”. Any other values will be checked against the defined dynamic message fields. If no field matches, then a C strftime (on non-Windows platforms) or C89 strftime (on Windows) time substitution will be attempted, using the nanosecond timestamp (if provided) or the system clock (if not).
- Return
- JSON string suitable for use as ElasticSearch BulkAPI index directive.
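The %{} interpolation behavior can be sketched in plain Lua, assuming only what is described above; the msg table and the interpolate function are hypothetical stand-ins (the real module consults the full set of default headers and runs inside Heka):

```lua
-- Hypothetical stand-in for the current message.
local msg = {
    Type = "nginx.access",
    Hostname = "web1",
    Fields = { status = "200" },
}

-- Replace each %{name}: default headers first, then dynamic fields,
-- and finally a strftime substitution on the name itself.
function interpolate(s, ns)
    local t = math.floor((ns or os.time() * 1e9) / 1e9)
    return (s:gsub("%%{(.-)}", function(name)
        return msg[name]
            or msg.Fields[name]
            or os.date("!" .. name, t)   -- UTC strftime fallback
    end))
end

-- interpolate("%{Type}-%{status}", 0) returns "nginx.access-200"
-- interpolate("heka-%{%Y.%m.%d}", 0) returns "heka-1970.01.01"
```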
Read the LPeg reference
Do not use parentheses around function calls that take a single string argument.
-- prefer
lpeg.P"Literal"
-- instead of
lpeg.P("Literal")
local date_month = lpeg.P"0" * lpeg.R"19"
+ "1" * lpeg.R"02"
-- The exception: when grouping alternates together in a higher level grammar.
local log_grammar = (rfc3339 + iso8601) * log_severity * log_message
-- prefer
lpeg.digit
-- instead of
lpeg.R"09"
-- prefer
lpeg.digit * "Test"
-- instead of
lpeg.digit * lpeg.P"Test"
The sandbox filter provides an isolated execution environment for data analysis. Any output generated by the sandbox is injected into the payload of a new message for further processing or to be output.
Config:
Example:
[hekabench_counter]
type = "SandboxFilter"
message_matcher = "Type == 'hekabench'"
ticker_interval = 1
filename = "counter.lua"
preserve_data = true
profile = false
[hekabench_counter.config]
rows = 1440
sec_per_row = 60
Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version at least with respect to their cbuf output). The purpose is to recreate the view at a larger scope in each level of the aggregation i.e., host view -> datacenter view -> service level view.
Config:
Specifies whether or not this aggregator should generate cbuf deltas.
A list of anomaly detection specifications. If not specified no anomaly detection/alerting will be performed.
If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the enable_delta configuration is changed to prevent the plugin from failing to start during data restoration.
Example Heka Configuration
[TelemetryServerMetricsAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
filename = "lua_filters/cbufd_aggregator.lua"
preserve_data = true
[TelemetryServerMetricsAggregator.config]
enable_delta = false
anomaly_config = 'roc("Request Statistics", 1, 15, 0, 1.5, true, false)'
preservation_version = 0
Collects the circular buffer delta output from multiple instances of an upstream sandbox filter (the filters should all be the same version at least with respect to their cbuf output). Each column from the source circular buffer will become its own graph. i.e., ‘Error Count’ will become a graph with each host being represented in a column.
Config:
Pre-allocates the number of host columns in the graph(s). If the number of active hosts exceeds this value, the plugin will terminate.
The number of rows to keep from the original circular buffer. Storing all the data from all the hosts is not practical since you will most likely run into memory and output size restrictions (adjust the view down as necessary).
The amount of time a host has to be inactive before it can be replaced by a new host.
If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the max_hosts or rows configuration is changed to prevent the plugin from failing to start during data restoration.
Example Heka Configuration
[TelemetryServerMetricsHostAggregator]
type = "SandboxFilter"
message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
ticker_interval = 60
filename = "lua_filters/cbufd_host_aggregator.lua"
preserve_data = true
[TelemetryServerMetricsHostAggregator.config]
max_hosts = 5
rows = 60
host_expiration = 120
preservation_version = 0
Calculates the most frequent items in a data stream.
Config:
The message variable name containing the items to be counted.
The maximum size of the sample set (higher will produce a more accurate list).
Used to reduce the long tail output by only outputting the higher frequency items.
Resets the list after the specified number of days (on the UTC day boundary). A value of 0 will never reset the list.
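To illustrate the max_items trade-off, here is a sketch of one common capped-memory approach to "most frequent items" (Misra-Gries style). This is an illustration only, not the plugin's actual algorithm:

```lua
-- Capped-memory frequency counting: when the sample set is full,
-- every count decays instead of adding, evicting the long tail.
local function new_counter(max_items)
    local counts, size = {}, 0
    return function(item)
        if counts[item] then
            counts[item] = counts[item] + 1
        elseif size < max_items then
            counts[item], size = 1, size + 1
        else
            for k, v in pairs(counts) do
                if v == 1 then
                    counts[k], size = nil, size - 1  -- evicted
                else
                    counts[k] = v - 1
                end
            end
        end
        return counts
    end
end

add = new_counter(2)
add("a"); add("a"); add("b")
tallies = add("c")  -- set is full: counts decay, "b" evicted, "c" not added
```

With a larger max_items, rare items survive long enough for true heavy hitters to accumulate weight, which is why a higher value produces a more accurate list.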
Example Heka Configuration
[FxaAuthServerFrequentIP]
type = "SandboxFilter"
filename = "lua_filters/frequent_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"
[FxaAuthServerFrequentIP.config]
message_variable = "Fields[remote_addr]"
max_items = 10000
min_output_weight = 100
reset_days = 1
Graphs the Heka memory statistics using the heka.memstat message generated by pipeline/report.go.
Config:
Sets the size of the sliding window, i.e., 1440 rows at 60 seconds per row is a 24 hour sliding window with 1 minute resolution.
Sets the size of each bucket (resolution in seconds) in the sliding window.
If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the rows or sec_per_row configuration is changed to prevent the plugin from failing to start during data restoration.
Example Heka Configuration
[HekaMemstat]
type = "SandboxFilter"
filename = "lua_filters/heka_memstat.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Type == 'heka.memstat'"
Generates documentation for each unique message in a data stream. The output is a hierarchy of Logger, Type, EnvVersion, and a list of associated message field attributes including their counts (number in the brackets). This plugin is meant for data discovery/exploration and should not be left running on a production system.
Config:
<none>
Example Heka Configuration
[SyncMessageSchema]
type = "SandboxFilter"
filename = "lua_filters/heka_message_schema.lua"
ticker_interval = 60
preserve_data = false
message_matcher = "Logger =~ /^Sync/"
Example Output
Monitors Heka’s process message failures by plugin.
Config:
A list of anomaly detection specifications. If not specified a default of 'mww_nonparametric("DEFAULT", 1, 5, 10, 0.7)' is used. The "DEFAULT" settings are applied to any plugin without an explicit specification.
Example Heka Configuration
[HekaProcessMessageFailures]
type = "SandboxFilter"
filename = "lua_filters/heka_process_message_failures.lua"
ticker_interval = 60
preserve_data = false # the counts are reset on Heka restarts and the monitoring should be too.
message_matcher = "Type == 'heka.all-report'"
Graphs HTTP status codes using the numeric Fields[status] variable collected from web server access logs.
Config:
Sets the size of each bucket (resolution in seconds) in the sliding window.
Sets the size of the sliding window, i.e., 1440 rows at 60 seconds per row is a 24 hour sliding window with 1 minute resolution.
If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the sec_per_row or rows configuration is changed to prevent the plugin from failing to start during data restoration.
Example Heka Configuration
[FxaAuthServerHTTPStatus]
type = "SandboxFilter"
filename = "lua_filters/http_status.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'nginx.access' && Type == 'fxa-auth-server'"
[FxaAuthServerHTTPStatus.config]
sec_per_row = 60
rows = 1440
anomaly_config = 'roc("HTTP Status", 2, 15, 0, 1.5, true, false) roc("HTTP Status", 4, 15, 0, 1.5, true, false) mww_nonparametric("HTTP Status", 5, 15, 10, 0.8)'
preservation_version = 0
Converts stat values extracted from statmetric messages (see StatAccumInput) to circular buffer data and periodically emits messages containing this data to be graphed by a DashboardOutput. Note that this filter expects the stats data to be available in the message fields, so the StatAccumInput must be configured with emit_in_fields set to true for this filter to work correctly.
Config:
Title for the graph output generated by this filter.
The number of rows to store in our circular buffer. Each row represents one time interval.
The number of seconds in each circular buffer time interval.
Space separated list of stat names. Each specified stat will be expected to be found in the fields of the received statmetric messages, and will be extracted and inserted into its own column in the accumulated circular buffer.
Space separated list of header label names to use for the extracted stats. Must be in the same order as the specified stats. Any label longer than 15 characters will be truncated.
Anomaly detection configuration, see Anomaly Detection Module.
If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time any edits are made to your rows, sec_per_row, stats, or stat_labels values, or else Heka will fail to start because the preserved data will no longer match the filter’s data structure.
Example Heka Configuration
[stat-graph]
type = "SandboxFilter"
filename = "lua_filters/stat_graph.lua"
ticker_interval = 10
preserve_data = true
message_matcher = "Type == 'heka.statmetric'"
[stat-graph.config]
title = "Hits and Misses"
rows = 1440
sec_per_row = 10
stats = "stats.counters.hits.count stats.counters.misses.count"
stat_labels = "hits misses"
anomaly_config = 'roc("Hits and Misses", 1, 15, 0, 1.5, true, false) roc("Hits and Misses", 2, 15, 0, 1.5, true, false)'
preservation_version = 0
Counts the number of unique items per day, e.g., active daily users by uid.
Config:
The Heka message variable containing the item to be counted.
The graph title for the cbuf output.
Specifies whether or not this plugin should generate cbuf deltas. Deltas should be enabled when sharding is used; see: Circular Buffer Delta Aggregator.
If preserve_data = true is set in the SandboxFilter configuration, then this value should be incremented every time the enable_delta configuration is changed to prevent the plugin from failing to start during data restoration.
Example Heka Configuration
[FxaActiveDailyUsers]
type = "SandboxFilter"
filename = "lua_filters/unique_items.lua"
ticker_interval = 60
preserve_data = true
message_matcher = "Logger == 'FxaAuth' && Type == 'request.summary' && Fields[path] == '/v1/certificate/sign' && Fields[errno] == 0"
[FxaActiveDailyUsers.config]
message_variable = "Fields[uid]"
title = "Estimated Active Daily Users"
preservation_version = 0
The SandboxEncoder provides an isolated execution environment for converting messages into binary data without the need to recompile Heka. See Sandbox.
Config:
Example
[custom_json_encoder]
type = "SandboxEncoder"
filename = "path/to/custom_json_encoder.lua"
[custom_json_encoder.config]
msg_fields = ["field1", "field2"]
Produces more human readable alert messages.
Config:
<none>
Example Heka Configuration
[FxaAlert]
type = "SmtpOutput"
message_matcher = "((Type == 'heka.sandbox-output' && Fields[payload_type] == 'alert') || Type == 'heka.sandbox-terminated') && Logger =~ /^Fxa/"
send_from = "heka@example.com"
send_to = ["alert@example.com"]
auth = "Plain"
user = "test"
password = "testpw"
host = "localhost:25"
encoder = "AlertEncoder"
[AlertEncoder]
type = "SandboxEncoder"
filename = "lua_encoders/alert.lua"
Example Output
Timestamp: 2014-05-14T14:20:18Z
Hostname:  ip-10-226-204-51
Plugin:    FxaBrowserIdHTTPStatus
Alert:     HTTP Status - algorithm: roc col: 1 msg: detected anomaly, standard deviation exceeds 1.5
Prepends ElasticSearch BulkAPI index JSON to a message payload.
Config:
String to use as the _index key’s value in the generated JSON. Supports field interpolation as described below.
String to use as the _type key’s value in the generated JSON. Supports field interpolation as described below.
String to use as the _id key’s value in the generated JSON. Supports field interpolation as described below.
If true, then any time interpolation (often used to generate the ElasticSearch index) will use the timestamp from the processed message rather than the system time.
Field interpolation:
Data from the current message can be interpolated into any of the string arguments listed above. A %{} enclosed field name will be replaced by the field value from the current message. Supported default field names are “Type”, “Hostname”, “Pid”, “UUID”, “Logger”, “EnvVersion”, and “Severity”. Any other values will be checked against the defined dynamic message fields. If no field matches, then a C strftime (on non-Windows platforms) or C89 strftime (on Windows) time substitution will be attempted.
Example Heka Configuration
[es_payload]
type = "SandboxEncoder"
filename = "lua_encoders/es_payload.lua"
[es_payload.config]
es_index_from_timestamp = true
index = "%{Logger}-%{%Y.%m.%d}"
type_name = "%{Type}-%{Hostname}"
[ElasticSearchOutput]
message_matcher = "Type == 'mytype'"
encoder = "es_payload"
Example Output
{"index":{"_index":"mylogger-2014.06.05","_type":"mytype-host.domain.com"}}
{"json":"data","extracted":"from","message":"payload"}
Since decoders cannot be dynamically loaded, and since they stop Heka processing on fatal errors, they must be developed outside of a production environment. Most Lua decoders are LPeg based, as that is the best way to parse and transform data within the sandbox. The alternatives are the built-in Lua pattern matcher or the JSON parser with a manual transformation.
Procure some sample data to be used as test input.
timestamp=time_t key1=data1 key2=data2
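The sample data above can be parsed with a small LPeg grammar. The following is a sketch using the standard LPeg key/value fold idiom; the field names come from the sample line and are otherwise arbitrary.

```lua
-- Sketch: parse "key=value" pairs separated by spaces into a Lua table.
local l = require "lpeg"

local sep   = l.P" "
local key   = l.C((1 - l.S"= ")^1)
local value = l.C((1 - sep)^1)
local pair  = l.Cg(key * "=" * value)

-- Fold each captured (key, value) pair into an initially empty table.
local grammar = l.Cf(l.Ct"" * pair * (sep * pair)^0, rawset)

local fields = grammar:match("timestamp=time_t key1=data1 key2=data2")
-- fields.timestamp == "time_t", fields.key1 == "data1", fields.key2 == "data2"
```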
Configure a simple LogstreamerInput to deliver the data to your decoder.
[LogstreamerInput]
log_directory = "."
file_match = 'data\.log'
decoder = "SandboxDecoder"
Configure your test decoder.
[SandboxDecoder]
filename = "decoder.lua"
Configure the DashboardOutput for visibility into the decoder (performance, memory usage, messages processed/failed, etc.).
[DashboardOutput]
address = "127.0.0.1:4352"
ticker_interval = 10
working_directory = "dashboard"
static_directory = "/usr/share/heka/dasher"
Configure a LogOutput to display the generated messages.
[LogOutput]
message_matcher = "TRUE"
The decoder will receive a message from an input plugin. The input may have set some additional message headers, but the Payload header contains the data for the decoder, which it can access using read_message("Payload"). The payload can be used to construct an entirely new message, multiple messages, or to modify any part of the existing message (see inject_message and write_message in the Lua Sandbox API). Message headers not modified by the decoder are left intact, and in the case of multiple message injections the initial message header values are duplicated for each message.
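Putting this together, a minimal decoder for the sample data might look like the sketch below. It is illustrative only: the Type value and field names are made up, and it relies on the sandbox API functions (read_message, inject_message) described above.

```lua
-- decoder.lua: sketch of a SandboxDecoder for the sample key=value payload.
function process_message()
    local payload = read_message("Payload")
    -- Lua pattern matching is sufficient for this fixed three-field format.
    local ts, k1, k2 = payload:match("^timestamp=(%S+) key1=(%S+) key2=(%S+)")
    if not ts then
        return -1  -- signal a decode failure to Heka
    end

    local msg = {
        Type = "demo",  -- illustrative Type, not a Heka convention
        Fields = { timestamp = ts, key1 = k1, key2 = k2 },
    }
    inject_message(msg)
    return 0
end
```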
Incrementally build and test your grammar using http://lpeg.trink.com.
Test match expressions using http://www.lua.org/cgi-bin/demo.
For data transformation use the LPeg/Lua matcher links above. Simple field remapping, e.g., msg.Hostname = json.host, can be verified in the LogOutput.
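The remapping mentioned above can be sketched as a small JSON decoder. This assumes the payload is a JSON object with a "host" member; the member and Type names are hypothetical.

```lua
-- Sketch: decode a JSON payload and remap json.host onto msg.Hostname.
require "cjson"

function process_message()
    -- cjson.decode raises an error on invalid JSON, so guard it with pcall.
    local ok, json = pcall(cjson.decode, read_message("Payload"))
    if not ok or type(json) ~= "table" then
        return -1  -- reject unparseable payloads
    end

    local msg = {
        Type = "json.remapped",   -- illustrative Type
        Hostname = json.host,     -- simple field remapping
        Fields = json,
    }
    inject_message(msg)
    return 0
end
```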
Run Heka with the test configuration.
Inspect/verify the messages written by LogOutput.
Since filters can be dynamically loaded, it is recommended that you develop them in production against live data.
OR
If you are developing the filter in conjunction with the decoder you can add it to the test configuration.
[SandboxFilter]
filename = "filter.lua"
Debugging
Watch for a dashboard sandbox termination report. The termination message provides the line number and cause of the failure. These are usually straightforward to correct and are commonly caused by a syntax error in the script or invalid assumptions about the data (e.g., cnt = cnt + read_message("Fields[counter]") will fail if the counter field doesn't exist or is non-numeric due to an error in the data).
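A defensive version of that counter example guards the field before using it, rejecting the message instead of letting the sandbox terminate. This is a sketch; the field name comes from the example above.

```lua
-- Sketch: guard against a missing or non-numeric Fields[counter].
cnt = 0  -- global, so it survives restarts when preserve_data = true

function process_message()
    local c = read_message("Fields[counter]")
    if type(c) ~= "number" then
        return -1  -- reject bad data rather than crash the sandbox
    end
    cnt = cnt + c
    return 0
end
```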
No termination report, but the output does not match expectations. These cases are usually a little harder to debug.
- Check the Heka dashboard to make sure the router is sending messages to the plugin. If not, verify your message_matcher configuration.
- Visually review the plugin for errors. Are the message field names correct? Was the result of cjson.decode checked? Are the output variables actually being assigned and output/injected?
- Add a debug output message with the pertinent information.
require "string" require "table" local dbg = {} -- table.insert(dbg, string.format("Entering function x arg1: %s", arg1)) -- table.insert(dbg, "Exiting function x") inject_payload("txt", "debug", table.concat(dbg, "\n"))
- LAST RESORT: Move the filter out of production, turn on preservation, run the tests, stop Heka, and review the entire preserved state of the filter.