Athena - Part 3 - Data Transformation

Transforming the data into something useful

Steve Clements

Back to Part 2 - Logging | Forward to Part 4 - Athena Analytics API

Scheduling the transformation process

Now that we've decoupled the log processing from the web app, we can run a regular process to transform the data into something more useful. Using CloudWatch Events we set up a scheduler that runs every 10 minutes and triggers a Lambda to do the processing. This is really cheap and the cost of running it is predictable: it only runs 6 times an hour.

This is easy to do with Terraform:

resource "aws_cloudwatch_event_rule" "scheduler" {
  name                = "analytics_transform_scheduler"
  description         = "Transform the analytics data every 10 minutes"
  schedule_expression = "rate(10 minutes)"
}
resource "aws_cloudwatch_event_target" "scheduler" {
  arn  = aws_lambda_function.transformer.arn
  rule = aws_cloudwatch_event_rule.scheduler.name
}

Reading the logs

The Lambda reads the CloudWatch logs generated in the last 10 minutes by the previous process, selecting the analytics entries with a filter pattern. The code uses version 3 of the AWS SDK. Here is the full function:

import {
  CloudWatchLogsClient,
  FilterLogEventsCommand,
  FilterLogEventsCommandInput,
  FilteredLogEvent,
} from '@aws-sdk/client-cloudwatch-logs'
import pino from 'pino'

const logger = pino()

const cloudwatchLogsClient = new CloudWatchLogsClient({})

const getLogs = async (params: FilterLogEventsCommandInput) =>
  cloudwatchLogsClient.send(new FilterLogEventsCommand(params))

// Read all log events matching the filter pattern from the 10 minutes
// before endTime, following pagination tokens recursively until the
// log group is exhausted.
export const readCloudwatchLog = async (
  filterPattern: string,
  endTime: number,
  sourceLogGroupName: string,
  nextToken?: string
): Promise<FilteredLogEvent[]> => {
  // Look back 10 minutes to match the scheduler interval
  const startTime = endTime - 10 * 60 * 1000
  const params: FilterLogEventsCommandInput = {
    filterPattern,
    startTime,
    endTime,
    logGroupName: sourceLogGroupName,
    nextToken,
  }
  const logs = await getLogs(params)
  // FilterLogEvents returns one page at a time; if there is a nextToken,
  // fetch the remaining pages and append their events to this one
  if (logs.nextToken) {
    logs.events = [
      ...(logs.events ?? []),
      ...(await readCloudwatchLog(
        filterPattern,
        endTime,
        sourceLogGroupName,
        logs.nextToken
      )),
    ]
  }
  logger.info(`returning ${logs.events?.length} logs`)
  return logs.events ?? []
}

and this is how it is invoked, using a CloudWatch Logs JSON filter pattern to match only the analytics entries:

const data = await readCloudwatchLog(
  '{ $.logtype = "analytics" }',
  Date.now(),
  '/aws/lambda/analytics'
)

Transforming the data

The data is now in a format that can be transformed into something useful. Because we want to query it with Athena, it needs to be stored in a format Athena understands. For JSON data that is a funny format: lines of JSON, where each record is a complete JSON object on its own line, terminated with a line ending. For example:

{"data": "something"}
{"data": "something else"}

(Note the non-standard format of the JSON: no commas between the lines and no enclosing array)
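
The transformation step is simple enough to sketch. This is a minimal version, assuming each event's message field holds the JSON analytics entry written by the logger in Part 2 (toJsonLines is a hypothetical helper name, not from the original code):

import { FilteredLogEvent } from '@aws-sdk/client-cloudwatch-logs'

// Convert a batch of log events into newline-delimited JSON for Athena.
// Assumes each event's message is a single JSON analytics entry.
export const toJsonLines = (events: FilteredLogEvent[]): string =>
  events
    .map((event) => JSON.parse(event.message ?? '{}')) // parse the logged JSON
    .map((record) => JSON.stringify(record)) // re-serialise compactly, one object per line
    .join('\n') // newlines only: no commas, no enclosing array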

Writing the data

Using the AWS SDK we can write the data to S3:

import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'

const s3 = new S3Client({})

await s3.send(
  new PutObjectCommand({
    Bucket: bucket,
    Key: key,
    Body: data,
  })
)
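
Putting it together, the scheduled Lambda's handler could look something like this sketch. The handler shape, the ANALYTICS_BUCKET environment variable, and the key scheme are illustrative assumptions; readCloudwatchLog and toJsonLines are the functions above:

import { ScheduledEvent } from 'aws-lambda'
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'

const s3 = new S3Client({})

export const handler = async (_event: ScheduledEvent): Promise<void> => {
  const endTime = Date.now()
  const events = await readCloudwatchLog(
    '{ $.logtype = "analytics" }',
    endTime,
    '/aws/lambda/analytics'
  )
  if (events.length === 0) return // nothing logged in this 10-minute window

  await s3.send(
    new PutObjectCommand({
      Bucket: process.env.ANALYTICS_BUCKET, // assumed env var for the target bucket
      Key: `analytics/${endTime}.json`, // assumed key: one object per batch
      Body: toJsonLines(events),
    })
  )
}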

Continue to Part 4 to see how to query the stored data >>