Fluent Bit’s processors were introduced in version 3, and the capability has continued to advance, with the introduction of the ability to execute processors conditionally. Until conditionality could be defined directly on a processor, this control had to be fudged (for example, by putting processors further downstream and steering records to them with filters and routing).
In the Logs and Telemetry book, we go through the basic Processor capabilities and constraints, such as:
- Configuration file constraints
- Performance differences compared to using filter plugins
So we’re not going to revisit those points here.
We can chain processors to run in sequence simply by defining them in the order in which they need to be executed. You can see the chaining in the example below.
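As a minimal sketch of the idea (the full scenario configuration appears later), two processors defined in order run one after the other against the same input’s logs:

pipeline:
  inputs:
    - name: dummy
      dummy: '{"msg": "hello"}'
      processors:
        logs:
          - name: content_modifier   # runs first
            action: insert
            key: first
            value: true
          - name: content_modifier   # runs second, and sees the key added above
            action: insert
            key: second
            value: true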
Scenario
In the following configuration, we have created several input plugins using both the Dummy plugin and the HTTP plugin. Using the HTTP plugin makes it easy for us to control execution speed and change test data values, which helps us see different filter behaviours. To make life easy, we’ve provided several payloads and a simple caller.[bat|sh] script, which takes a single parameter [1, 2, 3, …] identifying the payload file to send.
All of these resources are available in the Book’s GitHub repository as part of the extras folder, which can be found here. This saves us from embedding everything into this blog.
Filters as Processors and Chaining
Filters can be used as processors, as well as the dedicated processor types such as SQL and content modifier. The filter just needs to be referenced using the name attribute, e.g. name: grep would use the Grep filter.
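A minimal sketch of that, assuming the Grep filter’s regex property (which takes a "KEY PATTERN" pair), referenced by name inside a processors block:

pipeline:
  inputs:
    - name: dummy
      dummy: '{"level": "info", "msg": "hello"}'
      processors:
        logs:
          - name: grep          # the Grep filter plugin, used as a processor
            regex: level info   # keep only records whose level field matches "info"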
Processor Conditions
If you’re familiar with Kubernetes selector syntax, then the way conditions are defined for a processor will feel familiar. A processor takes a condition block (#–A–), and the condition contains one or more rules (#–B–). Each rule defines an expression that yields a Boolean outcome by identifying:
- An element of the payload (for example, a field/element in the log structure, or a metric, etc).
- The value to evaluate the field against.
- The evaluation operator, which can be one of the typical comparison operators, e.g. eq (equals); see below for the full list.
Since the rules are a list, you can include as many as needed. The condition, in addition to its rules, has its own operator (#–C–), which tells the condition how to combine the results of the individual rules. As the outcome must be a Boolean, we can only use a logical and or a logical or. When there is only a single rule, the operator simply yields that rule’s result.
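Put together, a condition attached to a processor looks like this (a minimal sketch; $status is a hypothetical field used purely for illustration):

          - name: content_modifier
            action: insert
            key: flagged
            value: true
            condition:             # --A--
              op: and              # --C-- how the rule results are combined
              rules:               # --B--
                - field: "$status"
                  op: eq
                  value: "error"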
In the following example, we have two inputs with processors to help demonstrate the different behaviours. In the dummy source, we can see how a nested element can be accessed (i.e. $<element name>['<child element name>']) to perform a string comparison.
With our HTTP source, we’re demonstrating that we can have multiple processors, each with its own condition, and here we’re using a normal filter plugin (Lua) as a processor. The first processor is interesting, as it illustrates an exception to the convention: we can express conditionality within the Lua code, but the processor ignores the condition construct (#–D–). The value of a condition on the Lua processor is obviously debatable, but it is worth considering, as there is an overhead in calling LuaJIT that could be avoided if the condition could be resolved quickly beforehand.
service:
  flush: 1
  log_level: debug
pipeline:
  inputs:
    - name: dummy
      dummy: '{"request": {"method": "GET", "path": "/api/v1/resource"}}'
      tag: request.log
      interval_sec: 60
      processors:
        logs:
          - name: content_modifier
            action: insert
            key: content_modifier_processor
            value: true
            condition: #--A--
              op: and #--C--
              rules: #--B--
                - field: "$request['method']"
                  op: eq
                  value: "GET"
    - name: http
      port: 9881
      listen: 0.0.0.0
      successful_response_code: 201
      success_header: x-fluent-bit received
      tag: http
      tag_key: token
      processors:
        logs:
          - name: lua
            call: modify
            code: |
              function modify(tag, timestamp, record)
                  new_record = record
                  new_record["conditional"] = "condition-triggered"
                  return 1, timestamp, new_record
              end
            condition: #--D--
              op: and
              rules:
                - field: "$classifier"
                  op: eq
                  value: "1"
          - name: content_modifier
            action: insert
            key: content_modifier_processor2
            value: true
            condition:
              op: and
              rules:
                - field: "$classifier"
                  op: eq
                  value: "2"
          - name: sql
            query: "SELECT token, classifier FROM STREAM;"
            condition:
              op: and
              rules:
                - field: "$classifier"
                  op: eq
                  value: "3"
  outputs:
    - name: stdout
      match: "*"
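For comparison, here is a hedged sketch of expressing the conditionality inside the Lua code itself rather than relying on the condition construct, assuming the same classifier field used in the test payloads:

function modify(tag, timestamp, record)
    -- only act when the classifier field matches; otherwise leave the record untouched
    if tostring(record["classifier"]) == "1" then
        record["conditional"] = "condition-triggered"
        return 1, timestamp, record   -- 1 = record (and timestamp) replaced
    end
    return 0, timestamp, record       -- 0 = keep the original record unchanged
end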
To run the demonstration, we’ve provided several test payloads and a simple script that will call the Fluent Bit HTTP input plugin with the correct file. We just need to pass the number associated with the log file, e.g. log1.json is sent with caller.[bat|sh] 1, and so on. The Windows batch version is a variation of:
set fn=log%1.json
echo %fn%
curl -X POST --location 127.0.0.1:9881 --header "Content-Type: application/json" --data @%fn%
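The shell equivalent (caller.sh) might look something like this (a sketch; the version in the repository may differ slightly):

#!/bin/sh
fn="log$1.json"
echo "$fn"
curl -X POST --location 127.0.0.1:9881 \
     --header "Content-Type: application/json" \
     --data @"$fn"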
An example of one of the test payloads:
{"msg" : "dynamic tag", "helloTo" : "the World", "classifier" : 1, "token": "token1"}
Conclusion
Once you’ve got a measure of the condition structure, making the processors conditional is very easy.
Operators available
| Operator | Meaning |
|----------|---------|
| eq | Equals |
| neq | Not equals |
| gt | Greater than ( > ) |
| gte | Greater than or equal to ( >= ) |
| lt | Less than ( < ) |
| lte | Less than or equal to ( <= ) |
| in | Is the value in a defined set (array), e.g. op: in value: ["a", "b", "c"] |
| not_in | Is the value not in a defined set (array), e.g. op: not_in value: ["a", "b", "c"] |
| regex | Matches the regular expression defined, e.g. op: regex value: ^a*z |
| not_regex | Does not match the regular expression provided, e.g. op: not_regex value: ^a*z |
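As a quick illustration of combining several of these operators, here is a hedged sketch of a condition that reuses the classifier and token fields from the test payloads, combining its rules with a logical or:

            condition:
              op: or
              rules:
                - field: "$classifier"
                  op: in
                  value: ["1", "2"]
                - field: "$token"
                  op: regex
                  value: "^token"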