The steps in this hands-on tutorial about AWS Glue are the following:
Step 1. Enter credentials
Step 2. Create an S3 bucket and load the dataset into the bucket
Step 3. Create a database for the crawled data
Step 4. Create a service role that will be used to access S3 and the Glue features
Step 5. Create the crawler and run the crawl job
Step 6. Write a Pyspark file with the operations to be performed on the dataset
Step 7. Generate a file with ETL job and load it into the S3 bucket
Step 8. Configure the ETL Glue Job and run it manually
Enter credentials¶
import getpass
accessKeyID = getpass.getpass()
secretAccessKeyID = getpass.getpass()
Create an S3 bucket¶
import boto3
bucket='glue-test-az1'
session = boto3.Session(aws_access_key_id=accessKeyID,
aws_secret_access_key=secretAccessKeyID)
dev_s3_client = session.client('s3')
response = dev_s3_client.create_bucket(Bucket=bucket)
print(response)
{'ResponseMetadata': {'RequestId': '9NNB1CBY318GGHHJ', 'HostId': '134Xmjh49crigr5fEwwhHOyPYUzxgKxqq9B2drRFEwjHM1ACQx4PyYUkS2AqdN1KZee1buzky1g=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': '134Xmjh49crigr5fEwwhHOyPYUzxgKxqq9B2drRFEwjHM1ACQx4PyYUkS2AqdN1KZee1buzky1g=', 'x-amz-request-id': '9NNB1CBY318GGHHJ', 'date': 'Thu, 26 May 2022 20:59:33 GMT', 'location': '/glue-test-az1', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'Location': '/glue-test-az1'}
Upload local file to S3¶
fileToUpload = 'TSLA.csv'
dev_s3_client.upload_file(f'{fileToUpload}',
bucket,
f'raw_files/{fileToUpload}')
Verify the file was uploaded.
response = dev_s3_client.list_objects(Bucket=f'{bucket}')
for key in response["Contents"]:
    print(key['Key'])
raw_files/TSLA.csv
Create a database where the crawler can store the results¶
session = boto3.session.Session(aws_access_key_id=accessKeyID, aws_secret_access_key=secretAccessKeyID)
glue_client = session.client('glue', region_name='us-east-1')
dbName = 'mydbaz1'
glue_client.create_database(DatabaseInput={'Name': dbName})
{'ResponseMetadata': {'RequestId': 'ba1d465f-5731-4be0-bd0d-7fc8154bddd9', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Thu, 26 May 2022 20:59:36 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': 'ba1d465f-5731-4be0-bd0d-7fc8154bddd9'}, 'RetryAttempts': 0}}
Create a service role¶
import json
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": "glue.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
iam_client = session.client('iam')
iam_response = iam_client.create_role(RoleName = 'AWSGlueServiceRole-AZ1',
AssumeRolePolicyDocument = json.dumps(trust_policy))
print(iam_response)
{'Role': {'Path': '/', 'RoleName': 'AWSGlueServiceRole-AZ1', 'RoleId': 'AROAZ27X47FDQ6YJNAIRV', 'Arn': 'arn:aws:iam::676440373575:role/AWSGlueServiceRole-AZ1', 'CreateDate': datetime.datetime(2022, 5, 26, 20, 59, 39, tzinfo=tzutc()), 'AssumeRolePolicyDocument': {'Version': '2012-10-17', 'Statement': [{'Sid': '', 'Effect': 'Allow', 'Principal': {'Service': 'glue.amazonaws.com'}, 'Action': 'sts:AssumeRole'}]}}, 'ResponseMetadata': {'RequestId': '460be3dc-872b-4dee-84b6-e04058ed0cac', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '460be3dc-872b-4dee-84b6-e04058ed0cac', 'content-type': 'text/xml', 'content-length': '823', 'date': 'Thu, 26 May 2022 20:59:38 GMT'}, 'RetryAttempts': 0}}
Attach the policy AWSGlueServiceRole to the new role¶
iam_client.attach_role_policy(RoleName="AWSGlueServiceRole-AZ1",
PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole")
{'ResponseMetadata': {'RequestId': 'ecbe78a8-7649-4bb2-b518-d091726e6852', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'ecbe78a8-7649-4bb2-b518-d091726e6852', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Thu, 26 May 2022 20:59:39 GMT'}, 'RetryAttempts': 0}}
iam_client.attach_role_policy(RoleName="AWSGlueServiceRole-AZ1",
PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess")
{'ResponseMetadata': {'RequestId': '68525b7c-30e9-4727-b013-e73ea3b41012', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '68525b7c-30e9-4727-b013-e73ea3b41012', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Thu, 26 May 2022 20:59:39 GMT'}, 'RetryAttempts': 0}}
Create the crawler¶
The crawler needs to use the role AWSGlueServiceRole-AZ1 that was created before.
import json
response = glue_client.create_crawler(
    Name='CrawlerAZ1',
    Role='AWSGlueServiceRole-AZ1',
    DatabaseName=dbName,
    Targets={
        'S3Targets': [
            {
                'Path': f's3://{bucket}',
            },
        ],
    },
    SchemaChangePolicy={
        'UpdateBehavior': 'UPDATE_IN_DATABASE',
        'DeleteBehavior': 'DEPRECATE_IN_DATABASE'
    },
    RecrawlPolicy={
        'RecrawlBehavior': 'CRAWL_EVERYTHING'
    },
    LineageConfiguration={
        'CrawlerLineageSettings': 'DISABLE'
    })
print(response)
{'ResponseMetadata': {'RequestId': 'ccbf9af1-8146-44ff-88e3-bf3828e68c49', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Thu, 26 May 2022 20:59:47 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': 'ccbf9af1-8146-44ff-88e3-bf3828e68c49'}, 'RetryAttempts': 0}}
List crawlers¶
glue_client.list_crawlers()
{'CrawlerNames': ['CrawlerAZ1'], 'ResponseMetadata': {'RequestId': '353547ac-f607-4356-bd11-46e9adc2ea3e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Thu, 26 May 2022 20:59:49 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '31', 'connection': 'keep-alive', 'x-amzn-requestid': '353547ac-f607-4356-bd11-46e9adc2ea3e'}, 'RetryAttempts': 0}}
The crawler I just created is present in the list.
If the database doesn’t exist, it will be created.
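To double-check which databases exist in the Data Catalog, they can be listed in a similar way; a minimal sketch using the same glue_client:
response = glue_client.get_databases()
for db in response['DatabaseList']:
    print(db['Name'])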
Run the crawler manually¶
glue_client.start_crawler(Name = 'CrawlerAZ1')
{'ResponseMetadata': {'RequestId': '97c60380-7c8f-45cc-873e-bb9233c38706', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Thu, 26 May 2022 20:59:52 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': '97c60380-7c8f-45cc-873e-bb9233c38706'}, 'RetryAttempts': 0}}
Check crawler status¶
import time

crawler_state = ''
while crawler_state not in ('STOPPING', 'READY'):
    time.sleep(10)  # give the crawler some time between polls instead of hammering the API
    response = glue_client.get_crawler(Name='CrawlerAZ1')
    crawler_state = response['Crawler']['State']
crawler_state
'STOPPING'
Wait until it says STOPPING or READY.
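To see what the crawl actually did (tables created, tables updated, run time), the crawler metrics can be queried once the run is over; a small sketch:
metrics = glue_client.get_crawler_metrics(CrawlerNameList=['CrawlerAZ1'])
print(metrics['CrawlerMetricsList'][0])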
See the generated table¶
The table gets its name from the bucket it crawled (dashes are converted to underscores).
response = glue_client.get_tables(DatabaseName=dbName)
response['TableList'][0]
{'Name': 'glue_test_az1', 'DatabaseName': 'mydbaz1', 'Owner': 'owner', 'CreateTime': datetime.datetime(2022, 5, 26, 23, 0, 41, tzinfo=tzlocal()), 'UpdateTime': datetime.datetime(2022, 5, 26, 23, 0, 41, tzinfo=tzlocal()), 'LastAccessTime': datetime.datetime(2022, 5, 26, 23, 0, 41, tzinfo=tzlocal()), 'Retention': 0, 'StorageDescriptor': {'Columns': [{'Name': 'date', 'Type': 'string'}, {'Name': 'open', 'Type': 'double'}, {'Name': 'high', 'Type': 'double'}, {'Name': 'low', 'Type': 'double'}, {'Name': 'close', 'Type': 'double'}, {'Name': 'adj close', 'Type': 'double'}, {'Name': 'volume', 'Type': 'bigint'}], 'Location': 's3://glue-test-az1/', 'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat', 'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat', 'Compressed': False, 'NumberOfBuckets': -1, 'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe', 'Parameters': {'field.delim': ','}}, 'BucketColumns': [], 'SortColumns': [], 'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0', 'CrawlerSchemaSerializerVersion': '1.0', 'UPDATED_BY_CRAWLER': 'CrawlerAZ1', 'areColumnsQuoted': 'false', 'averageRecordSize': '56', 'classification': 'csv', 'columnsOrdered': 'true', 'compressionType': 'none', 'delimiter': ',', 'objectCount': '1', 'recordCount': '3457', 'sizeKey': '193624', 'skip.header.line.count': '1', 'typeOfData': 'file'}, 'StoredAsSubDirectories': False}, 'PartitionKeys': [{'Name': 'partition_0', 'Type': 'string'}], 'TableType': 'EXTERNAL_TABLE', 'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0', 'CrawlerSchemaSerializerVersion': '1.0', 'UPDATED_BY_CRAWLER': 'CrawlerAZ1', 'areColumnsQuoted': 'false', 'averageRecordSize': '56', 'classification': 'csv', 'columnsOrdered': 'true', 'compressionType': 'none', 'delimiter': ',', 'objectCount': '1', 'recordCount': '3457', 'sizeKey': '193624', 'skip.header.line.count': '1', 'typeOfData': 'file'}, 'CreatedBy': 'arn:aws:sts::676440373575:assumed-role/AWSGlueServiceRole-AZ1/AWS-Crawler', 'IsRegisteredWithLakeFormation': False, 'CatalogId': '676440373575'}
As can be seen above, the new table contains columns corresponding to the columns of the CSV file.
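To print just the schema instead of the whole table metadata, the column list can be pulled out of the same response:
for col in response['TableList'][0]['StorageDescriptor']['Columns']:
    print(col['Name'], col['Type'])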
Querying the database with AWS Athena¶
athena_client = session.client('athena')
# The crawler converts dashes in the bucket name to underscores for the table name
queryStart = athena_client.start_query_execution(
    QueryString = f'SELECT count(*) FROM {bucket.replace("-", "_")}',
    QueryExecutionContext = {
        'Database': f'{dbName}'
    },
    ResultConfiguration = { 'OutputLocation': f's3://{bucket}'}
)
queryStart
{'QueryExecutionId': 'b3b4e140-ee1d-4813-bcd4-713e8a1b493c', 'ResponseMetadata': {'RequestId': 'ad93a2e3-993f-4632-a7ce-c6a8b7cf609b', 'HTTPStatusCode': 200, 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1', 'date': 'Wed, 25 May 2022 21:07:24 GMT', 'x-amzn-requestid': 'ad93a2e3-993f-4632-a7ce-c6a8b7cf609b', 'content-length': '59', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
queryExecution = athena_client.get_query_execution(QueryExecutionId=queryStart['QueryExecutionId'])
queryExecution
{'QueryExecution': {'QueryExecutionId': 'b3b4e140-ee1d-4813-bcd4-713e8a1b493c', 'Query': 'SELECT count(*) FROM glue_test_az', 'StatementType': 'DML', 'ResultConfiguration': {'OutputLocation': 's3://glue-test-az/b3b4e140-ee1d-4813-bcd4-713e8a1b493c.csv'}, 'QueryExecutionContext': {'Database': 'mydbaz1'}, 'Status': {'State': 'QUEUED', 'SubmissionDateTime': datetime.datetime(2022, 5, 25, 23, 7, 25, 498000, tzinfo=tzlocal())}, 'Statistics': {'TotalExecutionTimeInMillis': 225, 'QueryQueueTimeInMillis': 225}, 'WorkGroup': 'primary', 'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 2', 'EffectiveEngineVersion': 'Athena engine version 2'}}, 'ResponseMetadata': {'RequestId': '24ae88c4-725f-4b03-b7a1-bd5b13463d75', 'HTTPStatusCode': 200, 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1', 'date': 'Wed, 25 May 2022 21:07:25 GMT', 'x-amzn-requestid': '24ae88c4-725f-4b03-b7a1-bd5b13463d75', 'content-length': '1031', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
results = athena_client.get_query_results(QueryExecutionId=queryStart['QueryExecutionId'])
print(json.dumps(results, indent=4, sort_keys=True))
--------------------------------------------------------------------------- InvalidRequestException Traceback (most recent call last) Input In [292], in <cell line: 1>() ----> 1 results = athena_client.get_query_results(QueryExecutionId=queryStart['QueryExecutionId']) 2 print(json.dumps(results, indent=4, sort_keys=True)) File ~\AppData\Roaming\Python\Python38\site-packages\botocore\client.py:357, in ClientCreator._create_api_method.<locals>._api_call(self, *args, **kwargs) 354 raise TypeError( 355 "%s() only accepts keyword arguments." % py_operation_name) 356 # The "self" in this scope is referring to the BaseClient. --> 357 return self._make_api_call(operation_name, kwargs) File ~\AppData\Roaming\Python\Python38\site-packages\botocore\client.py:676, in BaseClient._make_api_call(self, operation_name, api_params) 674 error_code = parsed_response.get("Error", {}).get("Code") 675 error_class = self.exceptions.from_code(error_code) --> 676 raise error_class(parsed_response, operation_name) 677 else: 678 return parsed_response InvalidRequestException: An error occurred (InvalidRequestException) when calling the GetQueryResults operation: Query has not yet finished. Current state: RUNNING
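The error above simply means the query was still running when the results were requested. A minimal polling sketch, reusing the same athena_client and queryStart, would be:
import time

state = 'QUEUED'
while state in ('QUEUED', 'RUNNING'):
    time.sleep(2)  # give Athena a moment between polls
    execution = athena_client.get_query_execution(
        QueryExecutionId=queryStart['QueryExecutionId'])
    state = execution['QueryExecution']['Status']['State']

if state == 'SUCCEEDED':
    results = athena_client.get_query_results(
        QueryExecutionId=queryStart['QueryExecutionId'])
    print(json.dumps(results, indent=4, sort_keys=True, default=str))
else:
    print(f'Query ended in state {state}')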
The PySpark ETL job¶
The file contains PySpark code that loads the CSV file into a DataFrame and creates a new column with a moving average of the specified column. Finally, it saves the new DataFrame to a new CSV file.
%%writefile sparkETL.py
from pyspark.sql.window import Window
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ETLjob1').getOrCreate()
bucket = 'glue-test-az1'
df = spark.read.option("header", "true").csv(f"s3://{bucket}/raw_files/TSLA.csv")
# Add a constant "Part" column so the window spans the whole dataset, then compute a
# 6-row moving average of "Adj Close" ordered by "Date".
df_new = df.withColumn("Part", f.lit(1)).withColumn(
    "movingAverage",
    f.avg(df["Adj Close"]).over(Window.partitionBy("Part").orderBy("Date").rowsBetween(-5, 0)))
# Coalesce to a single partition so only one output CSV part file is written.
df_new.coalesce(1).write.option("header", "true").csv(f"s3://{bucket}/processed/TSLA.csv", mode='overwrite')
Overwriting sparkETL.py
Load the file into the bucket¶
fileToUpload = 'sparkETL.py'
dev_s3_client.upload_file(f'{fileToUpload}',
bucket,
f'scripts/{fileToUpload}')
Verify the file was uploaded.
response = dev_s3_client.list_objects(Bucket=f'{bucket}')
for key in response["Contents"]:
    print(key['Key'])
raw_files/TSLA.csv
scripts/sparkETL.py
Get the user account ARN¶
iam_response = iam_client.get_user()
myAccount = iam_response['User']['Arn']
myAccount.split(':user/')[0]
'arn:aws:iam::676440373575'
Creating a Glue ETL job¶
glue_client = session.client('glue')
Notice how the account ARN is used to build the Role parameter.
import boto3
import json
response = glue_client.create_job(
    Name='AZJob',
    Description='Test',
    Role=f'{myAccount.split(":user/")[0]}:role/AWSGlueServiceRole-AZ1',
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'sparkETL',
        'ScriptLocation': f's3://{bucket}/scripts/sparkETL.py',
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--TempDir': f's3://{bucket}/temp_dir',
        '--job-bookmark-option': 'job-bookmark-disable'
    },
    MaxRetries=1,
    GlueVersion='3.0',
    NumberOfWorkers=2,
    WorkerType='G.1X'
)
print(json.dumps(response, indent=4, sort_keys=True, default=str))
{ "Name": "AZJob", "ResponseMetadata": { "HTTPHeaders": { "connection": "keep-alive", "content-length": "16", "content-type": "application/x-amz-json-1.1", "date": "Wed, 25 May 2022 21:21:44 GMT", "x-amzn-requestid": "3f171c9b-199f-433f-88fa-e559cc183ac7" }, "HTTPStatusCode": 200, "RequestId": "3f171c9b-199f-433f-88fa-e559cc183ac7", "RetryAttempts": 0 } }
As a side note, for the Role parameter the complete role ARN must be used… otherwise the parameter is not accepted, there is no error message, and the ETL job won’t run.
I spent a few hours figuring that out.
To find the role ARN, look at the response returned when the role was created, or use the following query:
response = iam_client.get_role(RoleName = 'AWSGlueServiceRole-AZ1')
response['Role']['Arn']
'arn:aws:iam::580234752977:role/AWSGlueServiceRole-AZ1'
Delete a Job¶
If something didn’t work, you can always delete the job with:
glue_client.delete_job(JobName='AZJob')
Update an ETL job¶
For example, to update the description:
response = glue_client.update_job(
    JobName='AZJob',
    JobUpdate={
        'Role': f'{myAccount.split(":user/")[0]}:role/AWSGlueServiceRole-AZ1',
        'Description': 'Testing AWS Glue ETL',
        'Command': {
            'Name': 'sparkETL',
            'ScriptLocation': f's3://{bucket}/scripts/sparkETL.py',
            'PythonVersion': '3'
        },
    }
)
print(response)
{'JobName': 'AZJob', 'ResponseMetadata': {'RequestId': '702f3faf-2b16-45d4-806c-8690599bbb90', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 25 May 2022 21:21:54 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '19', 'connection': 'keep-alive', 'x-amzn-requestid': '702f3faf-2b16-45d4-806c-8690599bbb90'}, 'RetryAttempts': 0}}
Start a Job¶
start_job_response = glue_client.start_job_run(JobName='AZJob')
start_job_response
{'JobRunId': 'jr_9d7a8e9679d8ae1a3ba7f4444aa859757bc5f24ae0a6c70ca535ac1a2a399c4c', 'ResponseMetadata': {'RequestId': 'f993a566-3bcb-49eb-a885-42c2c0f7f322', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 25 May 2022 21:22:05 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '82', 'connection': 'keep-alive', 'x-amzn-requestid': 'f993a566-3bcb-49eb-a885-42c2c0f7f322'}, 'RetryAttempts': 0}}
To see the status of the Job¶
response = glue_client.get_job_run(
JobName='AZJob',
RunId=start_job_response['JobRunId']
)
response['JobRun']['JobRunState']
'SUCCEEDED'
Wait until it says SUCCEEDED… or, in the worst case, FAILED. In the latter case, good luck with the debugging.
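A small polling sketch, following the same pattern used for the crawler, saves re-running the status cell by hand:
import time

while True:
    response = glue_client.get_job_run(JobName='AZJob',
                                       RunId=start_job_response['JobRunId'])
    state = response['JobRun']['JobRunState']
    if state in ('SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT'):
        break
    time.sleep(30)  # Glue job runs take minutes, so poll sparingly
print(state)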
List the files in S3¶
The listing snippet from earlier can be reused to confirm that the Spark job wrote the processed output; a minimal sketch, reusing dev_s3_client from above:
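response = dev_s3_client.list_objects(Bucket=bucket)
for key in response["Contents"]:
    print(key['Key'])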
Let’s have a look at the CSV file itself. Spark writes it as a part file under the processed/TSLA.csv/ prefix, so a small preview sketch (assuming pandas is available locally) could be:
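import pandas as pd

# Find the part file Spark wrote under processed/TSLA.csv/ and download it locally
# (the local filename is arbitrary).
response = dev_s3_client.list_objects(Bucket=bucket, Prefix='processed/')
partKey = next(key['Key'] for key in response['Contents'] if key['Key'].endswith('.csv'))
dev_s3_client.download_file(bucket, partKey, 'TSLA_processed.csv')
pd.read_csv('TSLA_processed.csv').head()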
Glue ETL job¶
In my previous ETL job I used plain Apache Spark; let’s now try some Glue features, like the GlueContext.
%%writefile GlueETL.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="mydbaz1", table_name="glue_test_az1", transformation_ctx="S3bucket_node1"
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("date", "string", "date", "string"),
        ("open", "double", "open", "double"),
        ("high", "double", "high", "double"),
        ("low", "double", "low", "double"),
        ("close", "double", "close", "double"),
        ("adj close", "double", "adj close", "double"),
        ("volume", "long", "volume", "long"),
        ("partition_0", "string", "partition_0", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping_node2,
    connection_type="s3",
    format="glueparquet",
    connection_options={"path": "s3://glue-test-az1/parquet/", "partitionKeys": []},
    transformation_ctx="S3bucket_node3",
)
job.commit()
Pro tip, to spare you some time: to re-run the Glue job on the same data, the job bookmark must be disabled; otherwise Glue considers the existing source files as already processed and will not process them again.
This can be done directly in the AWS Management Console or from the AWS CLI; it also works from boto3, as sketched below.
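A minimal boto3 sketch for both options, using the AZJob name from earlier as an example:
# Reset the bookmark so the next run reprocesses everything...
glue_client.reset_job_bookmark(JobName='AZJob')

# ...or disable bookmarking for a single run only.
glue_client.start_job_run(
    JobName='AZJob',
    Arguments={'--job-bookmark-option': 'job-bookmark-disable'})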
References¶
Boto3 IAM API: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html
JSON policy element reference: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Principal_specifying
AWS services that work with IAM: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-services-that-work-with-iam.html
Create a role: https://bobbyhadz.com/blog/aws-cli-create-role
AWS Glue – Web API reference: https://docs.aws.amazon.com/glue/latest/webapi/web-api.pdf#WebAPI_Welcome
AWS Glue hands-on: https://hands-on.cloud/working-with-aws-glue-in-python-using-boto3/#h-creating-an-aws-glue-crawler