Elastic Logstash
Meter Ingestion via Logstash
You can integrate metering data into Amberflo using open-source Logstash.
The integration uses the Logstash S3 output plugin and writes the metering records to the Amberflo S3 bucket.
You can use Logstash's rich filter language to parse the metering data out of your logs or repositories.
Logstash can extract data from files, Elasticsearch, JDBC, S3, MongoDB, and other systems using its various input plugins: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
Example - Logstash configuration
The sample Logstash configuration below reads log lines from a file and writes the relevant metering records to Amberflo's S3 bucket.
We tail new files locally from /Users/demoaccount/input/*
The sample file format:
log line 1
log line 2
myMeter 2 myCustomerId amberflo_meter
log line 3
Logstash reads each new file and transforms its lines into metering records using the mutate logic below.
In this example we drop any line that does not contain the string amberflo_meter. We split the remaining lines on the space character " " and treat the first, second, and third tokens as the meterApiName, meterValue, and customerId respectively.
input {
  file {
    # tail any new files under this path
    path => "/Users/demoaccount/input/*"
  }
}
filter {
  if "amberflo_meter" in [message] {
    # record the meter time as epoch milliseconds
    ruby { code => "event.set('time', ((event.get('@timestamp').to_f*1000).to_i).to_s)" }
    mutate {
      # split the log line on spaces and map the tokens to meter fields
      split => { "message" => " " }
      add_field => { "meterApiName" => "%{[message][0]}" }
      add_field => { "meterValue" => "%{[message][1]}" }
      add_field => { "customerId" => "%{[message][2]}" }
      remove_field => ["host","message","@version","@timestamp"]
    }
  } else {
    # ignore lines that are not meter records
    drop {}
  }
}
output {
  s3 {
    access_key_id => "XXXXXX"
    secret_access_key => "YYYYYYYYYYYYYY"
    region => "us-west-2"
    bucket => "demo-amberflo"
    size_file => 2048    # rotate the file once it reaches 2048 bytes...
    time_file => 5       # ...or after 5 minutes, whichever comes first
    codec => "json"
    canned_acl => "bucket-owner-full-control"
  }
}
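To try the pipeline, save the configuration to a file and start Logstash with bin/logstash -f <path-to-config>. Logstash will begin tailing the input path and periodically upload the rotated files to the bucket.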
The resulting file (meter record) will be:
{
  "meterApiName": "myMeter",
  "meterValue": "2",
  "customerId": "myCustomerId",
  "time": "1621619742810",
  "path": "/Users/demoaccount/input/sample.txt"
}
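While developing the pipeline, it can help to inspect the generated events before they reach S3. A minimal sketch that uses the standard stdout output plugin alongside (or instead of) the s3 output:

output {
  # print each generated meter record to the console for debugging
  stdout { codec => rubydebug }
}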
JDBC source
You can use the Logstash JDBC input to extract the metering information from your database:
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
input {
  jdbc {
    # fetch only rows added since the last run, tracked via the id column
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    # run the query every minute
    schedule => "* * * * *"
    # ... other configuration bits
  }
}
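The rows returned by the query still need to be shaped into meter records before reaching the S3 output. A minimal filter sketch, assuming (illustratively) that mycolumn1 holds the meter value and mycolumn2 the customer id; adjust the meter name and column names to your schema:

filter {
  # derive the meter time (epoch milliseconds) from the event timestamp, as in the file example
  ruby { code => "event.set('time', ((event.get('@timestamp').to_f*1000).to_i).to_s)" }
  mutate {
    # illustrative mapping from query columns to meter record fields
    add_field => { "meterApiName" => "myMeter" }
    rename => { "mycolumn1" => "meterValue", "mycolumn2" => "customerId" }
    remove_field => ["id", "@version", "@timestamp"]
  }
}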
Elasticsearch input
You can use the Elasticsearch input plugin if your metering data lives in logs indexed in Elasticsearch.
An example input configuration looks like:
input {
  elasticsearch {
    hosts => "search-myes-cluster.us-west-2.es.amazonaws.com:443"
    index => "cwl--aws-lambda-meter-definition-api-lambda-2021.07"
    # match documents from the last day whose @message field contains amberflo_meter
    query => '{ "query": {"bool": {"filter": [{"range": {"@timestamp": {"from": "now-1d/d", "to": "now/d", "include_lower": true, "include_upper": true, "format": "epoch_millis", "boost": 1 } } }, {"query_string": {"query": "*amberflo_meter*", "default_field": "@message", "fields": [], "type": "best_fields", "default_operator": "or", "max_determinized_states": 10000, "enable_position_increments": true, "fuzziness": "AUTO", "fuzzy_prefix_length": 0, "fuzzy_max_expansions": 50, "phrase_slop": 0, "escape": false, "auto_generate_synonyms_phrase_query": true, "fuzzy_transpositions": true, "boost": 1 } } ], "adjust_pure_negative": true, "boost": 1 } }, "aggregations": {} }'
    size => 500
    scroll => "5m"
    docinfo => true
    ssl => true
    user => "myuser"
    password => "mypwd"
    #schedule => "*/1 * * * *"
  }
}
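The documents returned by the query can then be parsed with the same kind of filter logic as in the file example, assuming the raw log line lives in the @message field referenced by the query above:

filter {
  # record the meter time as epoch milliseconds
  ruby { code => "event.set('time', ((event.get('@timestamp').to_f*1000).to_i).to_s)" }
  mutate {
    # split the @message line on spaces and map the tokens to meter fields
    split => { "@message" => " " }
    add_field => { "meterApiName" => "%{[@message][0]}" }
    add_field => { "meterValue" => "%{[@message][1]}" }
    add_field => { "customerId" => "%{[@message][2]}" }
    remove_field => ["@message", "@version", "@timestamp"]
  }
}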