AMQP Input

Plugin Name: AMQPInput

Connects to a remote AMQP broker (RabbitMQ) and retrieves messages from the specified queue. As AMQP is dynamically programmable, the broker topology needs to be specified in the plugin configuration.

Config:

  • url (string):

    An AMQP connection string formatted per the RabbitMQ URI Spec.

  • exchange (string):

    AMQP exchange name

  • exchange_type (string):

    AMQP exchange type (fanout, direct, topic, or headers).

  • exchange_durability (bool):

    Whether the exchange should be configured as a durable exchange. Defaults to non-durable.

  • exchange_auto_delete (bool):

    Whether the exchange is deleted when all queues have finished and there is no publishing. Defaults to auto-delete.

  • routing_key (string):

    The message routing key used to bind the queue to the exchange. Defaults to empty string.

  • prefetch_count (int):

    How many messages to fetch at once before message acks are sent. See RabbitMQ performance measurements for help in tuning this number. Defaults to 2.

  • queue (string):

    Name of the queue to consume from. If this is an empty string, the broker generates a name for the queue. Defaults to empty string.

  • bind_queue (bool):

    Whether the queue should be explicitly bound to the exchange. Not all exchanges require the consumer to define and bind their own queue. Defaults to true.

  • queue_durability (bool):

    Whether the queue is durable or not. Defaults to non-durable.

  • queue_exclusive (bool):

    Whether the queue is exclusive (only one consumer allowed) or not. Defaults to non-exclusive.

  • queue_auto_delete (bool):

    Whether the queue is deleted when the last consumer un-subscribes. Defaults to auto-delete.

  • queue_ttl (int):

    TTL, in milliseconds, set on queue declaration so that messages expire. Defaults to undefined/infinite.

  • retries (RetryOptions, optional):

    A sub-section that specifies the settings to be used for restart behavior. See Configuring Restarting Behavior.

New in version 0.6.

  • tls (TlsConfig):

    An optional sub-section that specifies the settings to be used for any SSL/TLS encryption. This only has an effect if the URL uses the AMQPS URI scheme. See Configuring TLS.

New in version 0.9.

  • read_only (bool):

    Whether the AMQP user is read-only. If this is true, the exchange, queue, and binding must be declared before starting Heka. Defaults to false.
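
To illustrate how read_only and tls interact with the other options, here is a rough sketch of a consumer that connects over AMQPS as a read-only user. The host, credentials, exchange, queue, and file path are placeholders. The root_cafile setting is assumed from Heka's Configuring TLS section and should be checked against it:

```toml
# Sketch only: host, credentials, names, and paths are placeholders.
[AMQPInput]
url = "amqps://heka:secret@rabbitmq.example.com:5671/"
exchange = "app-logs"
exchange_type = "topic"
routing_key = "app.#"
queue = "heka-app-logs"
# A read-only AMQP user cannot declare anything, so the exchange, queue,
# and binding named above must already exist on the broker.
read_only = true

    # Only takes effect because the URL uses the amqps scheme.
    # root_cafile is an assumed TlsConfig setting; verify before use.
    [AMQPInput.tls]
    root_cafile = "/etc/heka/ca.pem"
```

With read_only set, the topology has to be declared by an administrator (or another client) before Heka starts.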

Since many of these parameters have sane defaults, a minimal configuration to consume serialized messages would look like:

[AMQPInput]
url = "amqp://guest:guest@rabbitmq/"
exchange = "testout"
exchange_type = "fanout"

Or you might use a PayloadRegexDecoder to parse OSX syslog messages with the following:

[AMQPInput]
url = "amqp://guest:guest@rabbitmq/"
exchange = "testout"
exchange_type = "fanout"
decoder = "logparser"

[logparser]
type = "MultiDecoder"
subs = ["logline", "leftovers"]

[logline]
type = "PayloadRegexDecoder"
MatchRegex = '\w+ \d+ \d+:\d+:\d+ \S+ (?P<Reporter>[^\[]+)\[(?P<Pid>\d+)](?P<Sandbox>[^:]+)?: (?P<Remaining>.*)'

    [logline.MessageFields]
    Type = "amqplogline"
    Hostname = "myhost"
    Reporter = "%Reporter%"
    Remaining = "%Remaining%"
    Logger = "%Logger%"
    Payload = "%Remaining%"

[leftovers]
type = "PayloadRegexDecoder"
MatchRegex = '.*'

    [leftovers.MessageFields]
    Type = "drop"
    Payload = ""

If the downstream Heka messages are delimited by Heka’s Stream Framing, you will need to specify “HekaFramingSplitter” as the AMQPInput splitter. For example:

[rsyslog-mq-input]
type = "AMQPInput"
url = "amqp://guest:guest@rabbitmq/"
exchange = "system-logs"
exchange_type = "topic"
exchange_durability = true
exchange_auto_delete = false
routing_key = "system.rsyslog"
queue = "rsyslog-logs"
queue_durability = true
queue_auto_delete = false
prefetch_count = 20
decoder = "rsyslog-multidecoder"
splitter = "HekaFramingSplitter"

    [rsyslog-mq-input.retries]
    max_delay = "180s"
    delay = "30s"
    max_retries = -1

[rsyslog-multidecoder]
type = "MultiDecoder"
subs = ["ProtobufDecoder", "rsyslog-decoder"]
cascade_strategy = "all"
log_sub_errors = true

[ProtobufDecoder]

[rsyslog-decoder]
type = "SandboxDecoder"
filename = "/usr/share/heka/lua_decoders/rsyslog.lua"

    [rsyslog-decoder.config]
    hostname_keep = true
    template = '%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n'
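
The retries sub-section in the example above sets max_retries = -1, which, as far as Heka's restart options go, means the plugin keeps retrying indefinitely. As a hedged variation, the sketch below bounds the number of attempts instead. The specific values are illustrative assumptions, not recommendations:

```toml
    # Variation on the retries sub-section above; values are illustrative.
    [rsyslog-mq-input.retries]
    # Delay between restart attempts.
    delay = "5s"
    # Longest delay allowed between attempts.
    max_delay = "60s"
    # Give up after 10 attempts instead of retrying forever (-1).
    max_retries = 10
```

A bounded count may suit deployments where a persistently unreachable broker should surface as a plugin failure rather than be retried silently.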