The tutorial of awswrangler can be found here
I wasn’t able to run it immediately, so here are some more details to get it running.
import awswrangler as wr
import pandas as pd
import boto3
import pytz
from datetime import datetime
Create a bucket¶
Using boto3 to create a bucket,
# Create an S3 client from explicit credentials and create the tutorial bucket.
# NOTE(review): hard-coded credentials in source are a security risk — prefer the
# commented-out profile_name approach or environment variables in real code.
bucket='buckaz3'
aws_access_key_id ='XXXXXXXXXXXXXXXXXXXX'
aws_secret_access_key ='YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY'
# session = boto3.Session(profile_name='default')
session = boto3.Session(aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key)
dev_s3_client = session.client('s3')
# NOTE(review): create_bucket without a CreateBucketConfiguration presumably
# targets us-east-1 — confirm the region if reusing this snippet elsewhere.
response = dev_s3_client.create_bucket(Bucket=bucket)
print(response)
{'ResponseMetadata': {'RequestId': 'RMEBABT3F3EKH3T9', 'HostId': 'DXHM/mEKrAZ7+z1hC98BveWg3GiVAfzGQd8MBoBR/+Yixq4z9KGjsEkOEpQ9i9bFNTWSw8Ei8jo=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'DXHM/mEKrAZ7+z1hC98BveWg3GiVAfzGQd8MBoBR/+Yixq4z9KGjsEkOEpQ9i9bFNTWSw8Ei8jo=', 'x-amz-request-id': 'RMEBABT3F3EKH3T9', 'date': 'Thu, 05 May 2022 20:39:40 GMT', 'location': '/buckaz3', 'server': 'AmazonS3', 'content-length': '0'}, 'RetryAttempts': 0}, 'Location': '/buckaz3'}
Create 2 Pandas dataframes and save them into the bucket as CSV files.
# Two small example dataframes to upload as CSV files.
rows_first = [(1, "foo"), (2, "boo")]
df1 = pd.DataFrame(rows_first, columns=["id", "name"])

rows_second = [(3, "bar")]
df2 = pd.DataFrame(rows_second, columns=["id", "name"])
# Target keys inside the bucket, then upload both dataframes as CSV.
path1 = f"s3://{bucket}/csv/file1.csv"
path2 = f"s3://{bucket}/csv/file2.csv"
for frame, target in ((df1, path1), (df2, path2)):
    wr.s3.to_csv(frame, target, index=False)
....
ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
So I need to add a policy that allows the PutObject action.
Set policy for the bucket¶
We need to set PutObject policy to Allow
import json
# Create a bucket policy allowing PutObject on every object in the bucket.
# NOTE(review): Principal '*' with s3:PutObject makes the bucket writable by
# anyone — acceptable only for a throwaway tutorial bucket.
bucket_policy = {
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Principal': '*',
'Action': ['s3:PutObject'],
'Resource': f'arn:aws:s3:::{bucket}/*'
},
]
}
# Convert the policy from JSON dict to string (put_bucket_policy takes a string)
bucket_policy = json.dumps(bucket_policy)
# Set the new policy — put_bucket_policy replaces any existing bucket policy
dev_s3_client.put_bucket_policy(Bucket=bucket, Policy=bucket_policy)
{'ResponseMetadata': {'RequestId': 'M310MCM5GKV3CY1Q', 'HostId': 'eK1trCakIEQ2nxewaGq4ZwXhSVtknuNGxcvKuqZy3BX0qtnc9qe6B/m9pxsCVuMhlB6epeRp6HY=', 'HTTPStatusCode': 204, 'HTTPHeaders': {'x-amz-id-2': 'eK1trCakIEQ2nxewaGq4ZwXhSVtknuNGxcvKuqZy3BX0qtnc9qe6B/m9pxsCVuMhlB6epeRp6HY=', 'x-amz-request-id': 'M310MCM5GKV3CY1Q', 'date': 'Thu, 05 May 2022 20:41:55 GMT', 'server': 'AmazonS3'}, 'RetryAttempts': 0}}
Writing the files into the bucket — retry!
# Retry the CSV uploads now that PutObject is allowed.
path1 = f"s3://{bucket}/csv/file1.csv"
path2 = f"s3://{bucket}/csv/file2.csv"
for frame, target in ((df1, path1), (df2, path2)):
    wr.s3.to_csv(frame, target, index=False)
Reading the csv file¶
# Read one CSV object back into a dataframe (takes a list of paths).
wr.s3.read_csv([path1])
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
Reading all the files in the directory¶
# Read every CSV object under the csv/ prefix — this fails below with
# AccessDenied on ListObjectsV2 because the bucket lacks s3:ListBucket.
wr.s3.read_csv(f"s3://{bucket}/csv/")
--------------------------------------------------------------------------- ClientError Traceback (most recent call last) Input In [137], in <cell line: 1>() ----> 1 wr.s3.read_csv(f"s3://{bucket}/csv/") .... ClientError: An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied
ClientError: An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied
I need to set the ListBucket policy for the bucket
# Create a bucket policy granting read access: GetObject applies to the
# objects (Resource .../*), ListBucket applies to the bucket ARN itself.
# NOTE(review): both statements reuse the Sid 'AddPerm' — AWS expects Sid
# values to be unique within a policy; consider distinct Sids. Also note
# put_bucket_policy REPLACES the whole policy, so the earlier PutObject
# grant is lost here (this causes the AccessDenied seen further below).
bucket_policy = {
'Version': '2012-10-17',
'Statement': [{
'Sid': 'AddPerm',
'Effect': 'Allow',
'Principal': '*',
'Action': ['s3:GetObject'],
'Resource': f'arn:aws:s3:::{bucket}/*'
},
{
'Sid': 'AddPerm',
'Effect': 'Allow',
'Principal': '*',
'Action': ['s3:ListBucket'],
'Resource': f'arn:aws:s3:::{bucket}'
},
]
}
# Convert the policy from JSON dict to string
bucket_policy = json.dumps(bucket_policy)
# Set the new policy
dev_s3_client.put_bucket_policy(Bucket=bucket, Policy=bucket_policy)
{'ResponseMetadata': {'RequestId': 'T3KER146GNQTM2KT', 'HostId': 'wsCulBQ2BDwvmNk0cmvSSbW4+crCiKTyjmBDutQ5bUgi6sOLa+lRmcyEk7ovSJUoG7c3xh6CtBA=', 'HTTPStatusCode': 204, 'HTTPHeaders': {'x-amz-id-2': 'wsCulBQ2BDwvmNk0cmvSSbW4+crCiKTyjmBDutQ5bUgi6sOLa+lRmcyEk7ovSJUoG7c3xh6CtBA=', 'x-amz-request-id': 'T3KER146GNQTM2KT', 'date': 'Thu, 05 May 2022 20:42:24 GMT', 'server': 'AmazonS3'}, 'RetryAttempts': 0}}
Retry:
# Retry: the prefix read now succeeds because ListBucket is granted.
wr.s3.read_csv(f"s3://{bucket}/csv/")
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
2 | 3 | bar |
Writing JSON¶
# Write the dataframes as JSON — this fails below with AccessDenied:
# the read-only policy above replaced the earlier PutObject grant.
path1 = f"s3://{bucket}/json/file1.json"
path2 = f"s3://{bucket}/json/file2.json"
wr.s3.to_json(df1, path1)
wr.s3.to_json(df2, path2)
--------------------------------------------------------------------------- ClientError Traceback (most recent call last) .... ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
This happens because the previous PutObject policy was overwritten.
Append to the existing policy¶
Ideally I should read the current policy and append the new policy to it.
To read the current policy of the bucket
# Fetch the bucket's current policy; the document is returned under the
# 'Policy' key as a JSON string.
policy = dev_s3_client.get_bucket_policy(Bucket=bucket)
To print the policy
import pprint
# Pretty-print the raw get_bucket_policy response.
pprint.pprint(policy)
{'Policy': '{"Version":"2012-10-17","Statement":[{"Sid":"AddPerm","Effect":"Allow","Principal":"*","Action":"s3:GetObject","Resource":"arn:aws:s3:::buckaz3/*"},{"Sid":"AddPerm","Effect":"Allow","Principal":"*","Action":"s3:ListBucket","Resource":"arn:aws:s3:::buckaz3"}]}', 'ResponseMetadata': {'HTTPHeaders': {'content-length': '259', 'content-type': 'application/json', 'date': 'Thu, 05 May 2022 20:44:02 GMT', 'server': 'AmazonS3', 'x-amz-id-2': 'MBnhJgb91HGq7mYn32Lch3euDbPElAufeGp0HVw134+M3LhqY8dPuvPHIvEBLvrnIYBbY7mUHoA=', 'x-amz-request-id': 'WD7GV68WTG0PWTP2'}, 'HTTPStatusCode': 200, 'HostId': 'MBnhJgb91HGq7mYn32Lch3euDbPElAufeGp0HVw134+M3LhqY8dPuvPHIvEBLvrnIYBbY7mUHoA=', 'RequestId': 'WD7GV68WTG0PWTP2', 'RetryAttempts': 0}}
The new policy must be added as a new element of the Statement field
# Parse the policy JSON string and keep its list of statements.
statement = json.loads(policy['Policy'])['Statement']
print(statement)
[{'Sid': 'AddPerm', 'Effect': 'Allow', 'Principal': '*', 'Action': 's3:GetObject', 'Resource': 'arn:aws:s3:::buckaz3/*'}, {'Sid': 'AddPerm', 'Effect': 'Allow', 'Principal': '*', 'Action': 's3:ListBucket', 'Resource': 'arn:aws:s3:::buckaz3'}]
Define the new policy as json
# New statement granting PutObject on all objects in the bucket; it will be
# appended to the existing statements so the earlier grants are preserved.
putObjPolicy = {
'Effect': 'Allow',
'Principal': '*',
'Action': ['s3:PutObject'],
'Resource': f'arn:aws:s3:::{bucket}/*'
}
print(putObjPolicy)
{'Effect': 'Allow', 'Principal': '*', 'Action': ['s3:PutObject'], 'Resource': 'arn:aws:s3:::buckaz3/*'}
Let’s append the putObjPolicy to the existing policy:
# Append the new statement in place to the existing statement list.
statement.append(putObjPolicy)
print(statement)
[{'Sid': 'AddPerm', 'Effect': 'Allow', 'Principal': '*', 'Action': 's3:GetObject', 'Resource': 'arn:aws:s3:::buckaz3/*'}, {'Sid': 'AddPerm', 'Effect': 'Allow', 'Principal': '*', 'Action': 's3:ListBucket', 'Resource': 'arn:aws:s3:::buckaz3'}, {'Effect': 'Allow', 'Principal': '*', 'Action': ['s3:PutObject'], 'Resource': 'arn:aws:s3:::buckaz3/*'}]
Create the bucket policy¶
# Rebuild the full policy document around the combined statement list.
bucket_policy = {
'Version': '2012-10-17',
'Statement': statement
}
Convert the policy from JSON dict to string
# Serialize the policy dict to the JSON string that put_bucket_policy expects.
bucket_policy = json.dumps(bucket_policy)
Set the new policy
# Upload the merged policy — it still replaces the old one, but now it
# contains all three statements (Get, List, Put).
dev_s3_client.put_bucket_policy(Bucket=bucket, Policy=bucket_policy)
{'ResponseMetadata': {'RequestId': 'Q09KFVEWB5VJ7FGN', 'HostId': 'WF+LHx9YiciBVJB9nSnWP4snYabXb3SkIEWqZNg03llHfhlNKLwD0K+mh2EiEWsKP2W6T/cyG88=', 'HTTPStatusCode': 204, 'HTTPHeaders': {'x-amz-id-2': 'WF+LHx9YiciBVJB9nSnWP4snYabXb3SkIEWqZNg03llHfhlNKLwD0K+mh2EiEWsKP2W6T/cyG88=', 'x-amz-request-id': 'Q09KFVEWB5VJ7FGN', 'date': 'Thu, 05 May 2022 20:46:42 GMT', 'server': 'AmazonS3'}, 'RetryAttempts': 0}}
Retry writing the JSON files
# Retry: the JSON uploads succeed now that PutObject is back in the policy.
path1 = f"s3://{bucket}/json/file1.json"
path2 = f"s3://{bucket}/json/file2.json"
wr.s3.to_json(df1, path1)
wr.s3.to_json(df2, path2)
['s3://buckaz3/json/file2.json']