Run Amazon Athena queries with AWS Lambda
We introduce how to run Amazon Athena queries using AWS Lambda (Python 3.6).
Overview
- Receive key data when an event is published and the AWS Lambda function is invoked.
- Run a query on Amazon Athena and get the result of the execution.
- Delete the S3 objects created by the Athena execution.
s3 data
S3 data sample is as follows:
{"name":"user1","email":"[email protected]"}
{"name":"user2","email":"[email protected]"}
{"name":"user3","email":"[email protected]"}
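For context, the query this article runs is effectively a lookup of email by name over those JSON Lines. A minimal local sketch of that lookup (the addresses here are placeholder values of our own, since the sample above masks them):

```python
import json

# JSON Lines sample mirroring the S3 data (emails are made-up placeholders)
DATA = """\
{"name":"user1","email":"user1@example.com"}
{"name":"user2","email":"user2@example.com"}
{"name":"user3","email":"user3@example.com"}
"""

def lookup_email(data, keyword):
    # equivalent of: SELECT email FROM table WHERE name = keyword
    for line in data.splitlines():
        row = json.loads(line)
        if row['name'] == keyword:
            return row['email']
    return None

print(lookup_email(DATA, 'user1'))  # -> user1@example.com
```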
AWS Lambda
Sample Code
import time
import boto3

# athena constant
DATABASE = 'your_athena_database_name'
TABLE = 'your_athena_table_name'

# S3 constant
S3_OUTPUT = 's3://your_athena_query_output_bucket_name'
S3_BUCKET = 'your_athena_query_output_bucket_name'

# number of retries
RETRY_COUNT = 10

# query constant
COLUMN = 'your_column_name'

def lambda_handler(event, context):
    # get keyword
    keyword = event['name']

    # create query
    query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)

    # athena client
    client = boto3.client('athena')

    # Execution
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': S3_OUTPUT,
        }
    )

    # get query execution id
    query_execution_id = response['QueryExecutionId']
    print(query_execution_id)

    # get execution status
    for i in range(1, 1 + RETRY_COUNT):
        # get query execution
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']

        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break

        if query_execution_status == 'FAILED':
            raise Exception("STATUS:" + query_execution_status)

        else:
            print("STATUS:" + query_execution_status)
            time.sleep(i)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')

    # get query results
    result = client.get_query_results(QueryExecutionId=query_execution_id)
    print(result)

    # get data
    if len(result['ResultSet']['Rows']) == 2:
        email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']
        return email
    else:
        return None
When the Lambda function is invoked, it gets the name value from the event data, runs the query on Athena, and returns the response.

- Get name data
# get keyword
keyword = event['name']
- Create query
# create query
query = "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, keyword)
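Note that keyword comes straight from the event payload, so a value containing a single quote would produce an invalid (or injectable) query string. A minimal sketch of escaping it before interpolation — build_query is a hypothetical helper of ours, and for production use, Athena's parameterized queries are worth considering:

```python
# same placeholders as the sample code
DATABASE = 'your_athena_database_name'
TABLE = 'your_athena_table_name'
COLUMN = 'your_column_name'

def build_query(keyword):
    # double up single quotes so the string literal stays valid SQL
    safe = keyword.replace("'", "''")
    return "SELECT * FROM %s.%s where %s = '%s';" % (DATABASE, TABLE, COLUMN, safe)

print(build_query("o'brien"))
```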
- Run query with Athena
# Execution
response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': DATABASE
    },
    ResultConfiguration={
        'OutputLocation': S3_OUTPUT,
    }
)
- Get execution id
# get query execution id
query_execution_id = response['QueryExecutionId']
- Check the status of QueryExecution
Assuming the query does not take long to run, we poll in a for loop with a fixed retry count instead of an unbounded while loop.
# get execution status
for i in range(1, 1 + RETRY_COUNT):
    # get query execution
    query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
    query_execution_status = query_status['QueryExecution']['Status']['State']

    if query_execution_status == 'SUCCEEDED':
        print("STATUS:" + query_execution_status)
        break

    if query_execution_status == 'FAILED':
        raise Exception("STATUS:" + query_execution_status)

    else:
        print("STATUS:" + query_execution_status)
        time.sleep(i)
else:
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise Exception('TIME OVER')
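The same polling logic can be factored into a small helper that takes the status lookup as a callable, which makes it testable without AWS. A sketch under that assumption (wait_for_query is our own name, not a boto3 API):

```python
import time

def wait_for_query(get_state, retry_count=10, sleep=time.sleep):
    """Poll until the query finishes, sleeping 1s, 2s, ... between attempts."""
    for i in range(1, 1 + retry_count):
        state = get_state()
        if state == 'SUCCEEDED':
            return state
        if state in ('FAILED', 'CANCELLED'):
            raise Exception('STATUS:' + state)
        sleep(i)
    raise Exception('TIME OVER')

# In the handler this could be called as, e.g.:
# wait_for_query(lambda: client.get_query_execution(
#     QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State'])
```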
- Get Response
# get query results
result = client.get_query_results(QueryExecutionId=query_execution_id)
- Example Response
{
    'ResultSet': {
        'Rows': [
            {'Data': [{'VarCharValue': 'name'}, {'VarCharValue': 'email'}]},
            {'Data': [{'VarCharValue': 'user1'}, {'VarCharValue': '[email protected]'}]}
        ],
        'ResultSetMetadata': {
            'ColumnInfo': [
                {'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'name',
                 'Label': 'name', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0,
                 'Nullable': 'UNKNOWN', 'CaseSensitive': True},
                {'CatalogName': 'hive', 'SchemaName': '', 'TableName': '', 'Name': 'email',
                 'Label': 'email', 'Type': 'varchar', 'Precision': 0123456789, 'Scale': 0,
                 'Nullable': 'UNKNOWN', 'CaseSensitive': True}
            ]
        }
    },
    'ResponseMetadata': {
        'RequestId': 'e0c1ec4c-09a1-11e8-97a4-************',
        'HTTPStatusCode': 200,
        'HTTPHeaders': {
            'content-type': 'application/x-amz-json-1.1',
            'date': 'Sun, 04 Feb 2018 11:52:30 GMT',
            'x-amzn-requestid': 'e0c1ec4c-09a1-11e8-97a4-************',
            'content-length': '1026',
            'connection': 'keep-alive'
        },
        'RetryAttempts': 0
    }
}
We get email address data from the response in the following example code:
# get data
if len(result['ResultSet']['Rows']) == 2:
    email = result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue']
    return email
else:
    return None
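The index-based access above works for this two-column result, but the header row makes a generic conversion possible: the first row returned by get_query_results holds the column names. A sketch of such a converter (rows_to_dicts is a name of our own):

```python
def rows_to_dicts(result):
    # first row is the header; remaining rows are data.
    # .get() is used because cells for NULL values may lack 'VarCharValue'.
    rows = result['ResultSet']['Rows']
    header = [cell.get('VarCharValue') for cell in rows[0]['Data']]
    return [
        dict(zip(header, [cell.get('VarCharValue') for cell in row['Data']]))
        for row in rows[1:]
    ]
```

With the example response above, rows_to_dicts(result)[0]['email'] yields the same value as result['ResultSet']['Rows'][1]['Data'][1]['VarCharValue'].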
Execution
- Execution test event
{ "name": "user1" }
- Execution Result
We successfully obtained the email address.
Created file in the S3 bucket
When Athena runs a query, the following files are created in the S3 bucket set as the output destination. We recommend deleting them regularly using the S3 bucket's lifecycle policy.
- file example
  e0c1ec4c-09a1-11e8-97a4-************.csv
  e0c1ec4c-09a1-11e8-97a4-************.csv.metadata
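As a sketch of the lifecycle-policy approach recommended above, the following configuration expires result files after 7 days (the rule ID, prefix, and 7-day window are arbitrary choices of ours; the boto3 call is shown commented out because it needs credentials and lifecycle permissions on the bucket):

```python
# Expire Athena query-result objects automatically (Days=7 is an arbitrary choice)
LIFECYCLE_CONFIGURATION = {
    'Rules': [
        {
            'ID': 'expire-athena-query-results',
            'Filter': {'Prefix': ''},   # apply to every object in the output bucket
            'Status': 'Enabled',
            'Expiration': {'Days': 7},
        }
    ]
}

# To apply it with boto3 (requires credentials and bucket permissions):
# import boto3
# boto3.client('s3').put_bucket_lifecycle_configuration(
#     Bucket='your_athena_query_output_bucket_name',
#     LifecycleConfiguration=LIFECYCLE_CONFIGURATION,
# )
```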
Alternatively, if you want to delete the files on each invocation, you can delete them in code as follows.
# If you want to delete output file
# s3 client
client = boto3.client('s3')

# created s3 object
s3_objects_key = []
s3_object_key_csv = query_execution_id + '.csv'
s3_objects_key.append({'Key': s3_object_key_csv})
s3_object_key_metadata = query_execution_id + '.csv.metadata'
s3_objects_key.append({'Key': s3_object_key_metadata})

# delete s3 object
for i in range(1, 1 + RETRY_COUNT):
    response = client.delete_objects(
        Bucket=S3_BUCKET,
        Delete={
            'Objects': s3_objects_key
        }
    )
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        print("delete %s complete" % s3_objects_key)
        break
    else:
        print(response['ResponseMetadata']['HTTPStatusCode'])
        time.sleep(i)
else:
    raise Exception('object %s delete failed' % s3_objects_key)
IAM Role
We created an IAM role as follows, based on AmazonAthenaFullAccess. Note that if you delete the result files in code as shown above, you will also need s3:DeleteObject on the output bucket.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "athena:*"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase",
                "glue:DeleteDatabase",
                "glue:GetDatabase",
                "glue:GetDatabases",
                "glue:UpdateDatabase",
                "glue:CreateTable",
                "glue:DeleteTable",
                "glue:BatchDeleteTable",
                "glue:UpdateTable",
                "glue:GetTable",
                "glue:GetTables",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::your_athena_query_results_bucket",
                "arn:aws:s3:::your_athena_query_results_bucket/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::your_athena_bucket",
                "arn:aws:s3:::your_athena_bucket/*"
            ]
        }
    ]
}
Conclusion
We explained how to run Amazon Athena queries from AWS Lambda (Python 3.6) based on sample code.