mozilla

Circular Buffer Graph Annotation (Alerts)ΒΆ

This plugin detects anomalies in the data. When an anomaly is detected an alert is generated and the graph is visually annotated at the time of the alert. See dygraphs Annotations for the available annotation properties.

-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

--[[
Collects the circular buffer delta output from multiple instances of an upstream
sandbox filter (the filters should all be the same version at least with respect
to their cbuf output). The purpose is to recreate the view at a larger scope in
each level of the aggregation i.e., host view -> datacenter view -> service
level view.

Config:

- enable_delta (bool, optional, default false)
    Specifies whether or not this aggregator should generate cbuf deltas.

- alert_rows (uint, optional, default 15)
    Specifies the size of the rolling average windows to compare. The
    window cannot be more than one third of the entire circular buffer.

- alert_cols (JSON string, optional)
    An array of JSON objects consisting of a 'col' number and a 'deviation'
    alert threshold.  If not specified no anomaly detection/alerting will
    be performed.

*Example Heka Configuration*

.. code-block:: ini

    [TelemetryServerMetricsAggregator]
    type = "SandboxFilter"
    message_matcher = "Logger == 'TelemetryServerMetrics' && Fields[payload_type] == 'cbufd'"
    ticker_interval = 60
    script_type = "lua"
    filename = "lua_filters/cbufd_aggregator.lua"
    preserve_data = true

    [TelemetryServerMetricsAggregator.config]
    enable_delta = false
    alert_rows = 15
    alert_cols =  '[{"col":1, "deviation":2}]'
--]]

local cbufd = require "cbufd"
require "circular_buffer"
require "cjson"
require "math"
require "string"
require "table"

local enable_delta = read_config("enable_delta") or false
local alert_rows = read_config("alert_rows") or 15
local alert_cols = read_config("alert_cols")
if alert_cols then
    alert_cols = cjson.decode(alert_cols)
end

cbufs = {}

local function init_cbuf(payload_name, data)
    local h = cjson.decode(data.header)
    if not h then
        return nil
    end

    local cb = circular_buffer.new(h.rows, h.columns, h.seconds_per_row, enable_delta)
    for i,v in ipairs(h.column_info) do
        cb:set_header(i, v.name, v.unit, v.aggregation)
    end

    cbufs[payload_name] = {header = h, cbuf = cb, annotations = {}, last_alert = 0}
    return cbufs[payload_name]
end

local alert_message = ""

-- at the moment this is only useful for non-sparse data
local function detect_anomaly(ns, k, v, cols)
    if alert_rows * 3 >= v.header.rows then
        error("alert_rows cannot be more than one third of the circular buffer")
    end
    v.cbuf:add(ns, 1, 0) -- always advance the buffer/graph

    local interval = 1e9 * v.header.seconds_per_row
    local sliding_window = interval * alert_rows
    local previous_window = ns - sliding_window * 2
    local current_window = ns - sliding_window

    for i, c in ipairs(cols) do
        -- Anomaly detection
        -- Compute the average of the last X intervals and the X intervals before
        -- that and compare the difference against the historical standard deviation.
        -- The current interval is not included since it is incomplete and can skew
        -- the stats.
        local historical_sd, hsamples = v.cbuf:compute("sd" , c.col, nil, previous_window - interval)
        if hsamples >= alert_rows then
            local previous_avg, psamples  = v.cbuf:compute("avg", c.col, previous_window, current_window - interval)
            local current_avg, csamples   = v.cbuf:compute("avg", c.col, current_window , ns - interval)

            -- if any sample window doesn't have data an anomaly will not be detected
            -- todo we need to add support for sparse data i.e. failure counts

            -- special case the loss of data anomaly for now
            local loss_of_data = (psamples > 0 and csamples == 0)

            local delta = math.abs(current_avg - previous_avg)
            if delta > historical_sd * c.deviation -- anomaly detected
            and ns - v.last_alert > sliding_window -- hasn't already alerted in the sliding window
            or loss_of_data then
                for m, n in ipairs(v.annotations) do -- clean out old alerts
                    if n.x < (ns - interval * v.header.rows)/1e6 then
                        v.annotations[m] = nil
                    else
                        break
                    end
                end

                v.last_alert = ns - ns % interval
                local msg
                if loss_of_data then
                    msg = string.format("Column %d hasn't received any new data", c.col)
                else
                    msg = string.format("Column %d has fluctuated more than %G standard deviations", c.col, c.deviation)
                end

                table.insert(v.annotations, {x = math.floor(v.last_alert/1e6),
                    col        = c.col,
                    shortText  = "A",
                    text       = msg})

                -- consolidate all alerts into a single message
                alert_message = alert_message .. string.format("%s - %s\n", k, msg)
            end
        end
    end

    output({annotations = v.annotations}, v.cbuf)
    inject_message("cbuf", k)
end

function process_message ()
    local payload = read_message("Payload")
    local payload_name = read_message("Fields[payload_name]") or ""
    local data = cbufd.grammar:match(payload)
    if not data then
        return -1
    end

    local cb = cbufs[payload_name]
    if not cb then
        cb = init_cbuf(payload_name, data)
        if not cb then
            return -1
        end
    end

    for i,v in ipairs(data) do
        for col, value in ipairs(v) do
            if value == value then -- only aggregrate numbers
                local agg = cb.header.column_info[col].aggregation
                if  agg == "sum" then
                    cb.cbuf:add(v.time, col, value)
                elseif agg == "min" or agg == "max" then
                    cb.cbuf:set(v.time, col, value)
                end
            end
        end
    end
    return 0
end

function timer_event(ns)
    for k,v in pairs(cbufs) do
        if alert_cols then
            alert_message = ""
            detect_anomaly(ns, k, v, alert_cols)
        else
            inject_message(v.cbuf, k)
        end
        if enable_delta then
            inject_message(v.cbuf:format("cbufd"), k)
        end
    end

    if alert_message ~= "" then
        output(alert_message)
        inject_message("alert")
    end
end