Java sample to backfill meters from Postgres

This sample illustrates how to backfill meters from historic data stored in Postgres using the JAVA SDK.

The sample performs the following tasks:

  1. Read historical data from a Postgres table of tasks. The sample uses tasks sent and accepted by customers as meters.
  2. For each row, ingest the task sent or accepted meters into Amberflo depending on the task status. The uniqueId is set during ingestion to prevent duplicate ingestion records.

Maven dependencies (check for latest versions):

<dependencies>
  <dependency>
    <groupId>io.amberflo</groupId>
    <artifactId>metering-java-client</artifactId>
    <version>1.3.6</version>
  </dependency>
  <dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.3.1</version>
  </dependency>
  <dependency>
    <groupId>com.stripe</groupId>
    <artifactId>stripe-java</artifactId>
    <version>20.52.0</version>
  </dependency>
</dependencies>

Java code:

import com.amberflo.metering.ingest.MeteringContext;
import com.amberflo.metering.ingest.meter_message.Domain;
import com.amberflo.metering.ingest.meter_message.MeterMessage;
import com.amberflo.metering.ingest.meter_message.MeterMessageBuilder;
import com.amberflo.metering.ingest.meter_message.Region;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import static com.amberflo.metering.ingest.MeteringContext.metering;

public class TasksPostgresBackfill {
    //TODO: set the database connection, queries, and Amberflo info
    final static String DB_HOST = "sample-host.us-west-2.rds.amazonaws.com";
    final static String DB_NAME = "sampleDb";
    final static String DB_PASS = "sample-pass";
    final static String DB_USER = "sample-user";
    final static String SCHEMA = "sampleSchema";
    final static String COUNT_QUERY = "SELECT count(*)  AS total FROM tasks where status in ('sent', 'accepted');";
    final static String SELECT_QUERY = "SELECT id, tasks_type, user_id, status, email_sent_date, accepted_date FROM tasks where status in ('sent', 'accepted');";
    final static String AMBERFLO_API_KEY = "amberflo-api-key";
    //END TODO

    final static String TASKS_SENT_METER = "tasks_sent";
    final static String TASKS_ACCEPTED_METER = "tasks_accepted";
    static int counter_sent = 0;
    static int counter_accepted = 0;

    public static void main(final String[] args) throws Exception {
        Statement stmt = null;
        Connection con = null;
        ResultSet rs = null;
        try {
            MeteringContext
                    .createOrReplaceContext(
                            AMBERFLO_API_KEY,
                            "tasksManager",
                            Domain.Prod,
                            Region.US_West,
                            1,
                            10
                    );

            final String url = String.format("jdbc:postgresql://%s:5432/%s?currentSchema=%s", DB_HOST, DB_NAME, SCHEMA);
            con = DriverManager.getConnection(url, DB_USER, DB_PASS);
            stmt = con.createStatement();
            rs = stmt.executeQuery(COUNT_QUERY);
            rs.next();
            int total = rs.getInt("total");
            System.out.println("Total tasks sent and accepted = " + total);

            rs = stmt.executeQuery(SELECT_QUERY);
            while (rs.next()) {
                final int tasksId = rs.getInt("id");
                final String customerId = rs.getString("user_id");
                final String tasksType = rs.getString("tasks_type");
                final String status = rs.getString("status");
                final Date emailSentDate = rs.getDate("email_sent_date");
                final Date acceptedDate = rs.getDate("accepted_date");
                System.out.println(String.format("tasksId=%s, status=%s, customerId=%s, tasksType=%s, emailSentDate=%s, acceptedDate=%s",
                        tasksId, status, customerId, tasksType, emailSentDate, acceptedDate));
                System.out.println();
                if ("sent".equals(status)) {
                    ingestMeterForTask(Integer.toString(tasksId), customerId, tasksType, TASKS_SENT_METER, emailSentDate);
                    counter_sent++;
                    if (counter_sent % 100 == 0 && counter_sent > 0) {
                        Thread.sleep(5000);
                    }
                } else if ("accepted".equals(status)) {
                    ingestMeterForTask(Integer.toString(tasksId), customerId, tasksType, TASKS_ACCEPTED_METER, acceptedDate);
                    counter_accepted++;
                    if (counter_accepted % 100 == 0 && counter_accepted > 0) {
                        Thread.sleep(5000);
                    }
                }
            }
            System.out.println("DONE!!! total tasks sent ingested = " + counter_sent);
            System.out.println("DONE!!! total tasks accepted ingested = " + counter_accepted);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (rs != null) rs.close();
            if (stmt != null) stmt.close();
            if (con != null) con.close();
            MeteringContext.flushAndClose();
        }
    }

    public static void ingestMeterForTask(
            final String tasksId,
            final String customerId,
            final String tasksType,
            final String meterName,
            final Date eventDate) {
        try {

            //java.sql.Date supports only Date components
            final LocalDateTime eventTime =
                    new java.util.Date(eventDate.getTime())
                            .toInstant()
                            .atZone(ZoneId.of("UTC"))
                            .toLocalDateTime();

            final Map<String, String> dimensions = new HashMap();
            dimensions.put("tasksType", tasksType);

            //set uniqueId to prevent duplicate ingested records in case this code
            //is run multiple times
            final MeterMessage meterMessage = MeterMessageBuilder
                    .createInstance(meterName, eventTime, customerId)
                    .setMeterValue(1)
                    .setUniqueId(tasksId)
                    .setDimensionsMap(dimensions)
                    .build();
            metering().meter(meterMessage);

            System.out.println(String.format("Amberflo ingestion: meter %s, task id %s, customerId %s, tasksType %s, eventTime %s",
                    meterName, tasksId, customerId,  dimensions.get("tasksType"), eventTime));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}