Athena - Part 3 - Data Transformation
Transforming the data into something useful



Back to Part 2 - Logging | Forward to Part 4 - Athena Analytics API
Scheduling the transformation process
Now that we've decoupled the log processing from the web app, we can run a regular job to transform the data into something more useful. Using CloudWatch Events we set up a scheduler that fires every 10 minutes and triggers a Lambda to do the processing. This is really cheap and the cost is predictable: it only runs 6 times an hour.
This is easy to do with Terraform:
resource "aws_cloudwatch_event_rule" "scheduler" {
  name                = "analytics_transform_scheduler"
  description         = "Transform the analytics data every 10 minutes"
  schedule_expression = "rate(10 minutes)"
}

resource "aws_cloudwatch_event_target" "scheduler" {
  arn  = aws_lambda_function.transformer.arn
  rule = aws_cloudwatch_event_rule.scheduler.name
}
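One piece not shown above: EventBridge also needs permission to invoke the Lambda, or the rule will fire but nothing will happen. A sketch of that resource, assuming the function is declared as aws_lambda_function.transformer as in the target above:

```hcl
resource "aws_lambda_permission" "scheduler" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.transformer.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.scheduler.arn
}
```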
Reading the logs
The Lambda reads the CloudWatch logs generated in the last 10 minutes by the logging process and extracts the analytics entries using a filter pattern. The code uses version 3 of the AWS SDK for JavaScript. Here is the full function:
import {
  CloudWatchLogsClient,
  FilterLogEventsCommand,
  FilterLogEventsCommandInput,
  FilteredLogEvent,
} from '@aws-sdk/client-cloudwatch-logs'
import pino from 'pino'

const logger = pino()

const cloudwatchLogsClient = new CloudWatchLogsClient({
  apiVersion: '2014-03-28',
})

const getLogs = async (params: FilterLogEventsCommandInput) =>
  cloudwatchLogsClient.send(new FilterLogEventsCommand(params))

export const readCloudwatchLog = async (
  filterPattern: string,
  endTime: number,
  sourceLogGroupName: string,
  nextToken?: string
): Promise<FilteredLogEvent[]> => {
  // Look back over the last 10 minutes
  const startTime = endTime - 10 * 60 * 1000
  const params: FilterLogEventsCommandInput = {
    filterPattern,
    startTime,
    endTime,
    logGroupName: sourceLogGroupName,
    nextToken,
  }
  const logs = await getLogs(params)
  // Recurse while CloudWatch returns a pagination token,
  // accumulating the events from every page
  if (logs.nextToken) {
    logs.events = [
      ...(logs.events as FilteredLogEvent[]),
      ...(await readCloudwatchLog(
        filterPattern,
        endTime,
        sourceLogGroupName,
        logs.nextToken
      )),
    ]
  }
  logger.info(`returning ${logs.events?.length} logs`)
  return logs.events as FilteredLogEvent[]
}
and this is how it is invoked:
const data = await readCloudwatchLog(
  '{ $.logtype = "analytics" }',
  Date.now(),
  '/aws/lambda/analytics'
)
Transforming the data
The data is now in a shape that can be transformed into something useful. Because we want to query it with Athena it needs to be in a format Athena understands: newline-delimited JSON, where each record is a complete JSON object terminated by a line ending. For example:
{"data": "something"}
{"data": "something else"}
(Note that this is not one valid JSON document - there are no commas between the lines; each line is a standalone JSON object)
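As a sketch, a hypothetical helper (the name toJsonLines is mine, not from the original code) that takes the message strings pulled out of the filtered log events and joins them into newline-delimited JSON might look like this:

```typescript
// Hypothetical helper: convert raw log message strings into
// newline-delimited JSON suitable for Athena.
// Each message is expected to be a JSON string; lines that fail
// to parse are dropped rather than corrupting the output file.
const toJsonLines = (messages: string[]): string =>
  messages
    .map((m) => {
      try {
        // Parse and re-serialise to guarantee one compact object per line
        return JSON.stringify(JSON.parse(m))
      } catch {
        return null
      }
    })
    .filter((line): line is string => line !== null)
    .join('\n') + '\n'
```

Re-serialising each record also guards against multi-line pretty-printed JSON sneaking into the output, which would break Athena's one-object-per-line expectation.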
Writing the data
Using the AWS SDK we can write the data to S3:
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'

const s3 = new S3Client({})

await s3.send(
  new PutObjectCommand({
    Bucket: bucket,
    Key: key,
    Body: data,
  })
)
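One detail worth deciding here is the object key. A hypothetical scheme (the buildKey helper and the dt= partition prefix are my assumptions, not from the original code) that prefixes each object with a date so an Athena table partitioned on that column can prune scans:

```typescript
// Hypothetical key scheme: prefix each object with a date partition
// (assumes an Athena table partitioned on a `dt` column).
const buildKey = (now: Date): string => {
  const dt = now.toISOString().slice(0, 10) // YYYY-MM-DD
  // Epoch millis in the file name keeps keys unique per run
  return `dt=${dt}/analytics-${now.getTime()}.json`
}
```

With keys laid out like this, queries that filter on the partition column only read the days they touch instead of the whole bucket.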