import boto3 import json import logging import re import time import urllib.parse from datetime import datetime, timezone BUCKET_LISTENER_ARN = <My_Bucket_Listener_ARN> URLS = [<My_File_URL_1>, <My_File_URL_2>, <My_File_URL_3>, ...] REGION = BUCKET_LISTENER_ARN.split(':')[3] logging.getLogger().setLevel(logging.INFO) lambda_client = boto3.client('lambda', region_name=REGION) def parse_url(url): s3_domain_pattern = 's3(\..+)?\.amazonaws.com' parsed_url = urllib.parse.urlparse(url) # check pre-signed URL type, path or virtual if re.fullmatch(s3_domain_pattern, parsed_url.netloc): bucket = parsed_url.path.split('/')[1] s3_object = '/'.join(parsed_url.path.split('/')[2:]) else: bucket = parsed_url.netloc.split('.')[0] s3_object = parsed_url.path[1:] object_key = urllib.parse.unquote_plus(s3_object) return (bucket, object_key) def get_s3_event(bucket, object_key): (bucket, object_key) = parse_url(url) return { 'eventTime': datetime.now().strftime('%Y-%m-%dT%H:%M:%S%Z'), 'responseElements': { 'x-amz-request-id': 'some-uuid' }, "awsRegion": REGION, 's3': { 'bucket': { 'name': bucket }, 'object': { 'key': object_key, 'eTag': '' # Use S3 HeadObject to get the object's eTag value } }, } def trigger_scans(urls): records = list(map(lambda url: get_s3_event(url), urls)) response = lambda_client.invoke( FunctionName=BUCKET_LISTENER_ARN, InvocationType='Event', Payload=bytes(json.dumps({'Records': records}), encoding='utf8') ) logging.debug(f'Trigger response: {response}') def main(): trigger_scans(URLS) if __name__ == '__main__': main()
# 前のページに戻るにはこちらをクリックしてください。 (Web-page navigation text: "Click here to return to the previous page.")