Problem Description
The S3 connector uses aioboto3 to list the files in a bucket by prefix. Beyond that, the only filtering currently available is on file suffix (extension).
It would be very useful to be able to select files by matching their keys against a regex.
Proposed Solution
Implement a regex filter in class S3DataSource(BaseDataSource):

    async def get_bucket_objects(self, bucket, **kwargs):
        try:
            bucket_obj = await s3.Bucket(bucket)
            await asyncio.sleep(0)
            if kwargs.get("prefix"):
                objects = bucket_obj.objects.filter(
                    Prefix=kwargs["prefix"]
                ).page_size(page_size)
            else:
                objects = bucket_obj.objects.page_size(page_size)
            async for obj_summary in objects:
                # >> implement a regex filter on the found objects here
                yield obj_summary, s3_client
        except Exception as exception:
            self._logger.warning(
                f"Something went wrong while fetching documents from {bucket}. Error: {exception}"
            )
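To make the idea concrete, here is a minimal, standalone sketch of what the filter at the marked spot might look like. It is not the connector's code: `filter_by_key_regex`, `FakeObjectSummary`, `fake_objects` and the `key_regex` parameter are hypothetical names made up for illustration, and the only thing assumed about the real object summaries is that they expose a `key` attribute, as in the snippet above.

```python
import asyncio
import re
from dataclasses import dataclass


@dataclass
class FakeObjectSummary:
    """Stand-in for an S3 object summary; only `key` is used here."""

    key: str


async def fake_objects(keys):
    """Hypothetical stand-in for iterating over `bucket_obj.objects`."""
    for key in keys:
        yield FakeObjectSummary(key)


async def filter_by_key_regex(objects, key_regex=None):
    """Yield only the objects whose key matches `key_regex`.

    With no regex configured, every object is passed through unchanged,
    so existing behaviour would be preserved.
    """
    pattern = re.compile(key_regex) if key_regex else None
    async for obj_summary in objects:
        if pattern is None or pattern.search(obj_summary.key):
            yield obj_summary


async def matching_keys(keys, key_regex=None):
    """Collect the keys that survive the filter (helper for the demo)."""
    filtered = filter_by_key_regex(fake_objects(keys), key_regex)
    return [obj.key async for obj in filtered]


if __name__ == "__main__":
    keys = ["reports/2023/q1.csv", "reports/2023/notes.txt", "logs/app.log"]
    print(asyncio.run(matching_keys(keys, r"^reports/\d{4}/.*\.csv$")))
```

Compiling the pattern once outside the loop keeps the per-object cost low, and using `search` rather than `match` leaves anchoring to the person writing the regex. Note that this kind of filtering happens client-side, after listing, so a prefix is still worth setting to limit how many objects are listed.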
Alternatives
Implement the filtering in class S3DataSource(BaseDataSource), in async def advanced_sync(self, rule), next to the existing extension check:

    elif self.get_file_extension(obj_summary.key) in rule.get("extension", []):
        yield await process_object(obj_summary, s3_client)
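As a rough sketch of this alternative, the per-rule check could combine the existing extension test with a regex on the key. Everything here is an assumption for illustration: `matches_rule` is a hypothetical helper, `key_regex` is a hypothetical rule field that does not exist today, the extension is taken with `os.path.splitext` rather than the connector's own `get_file_extension`, and an empty extension list is assumed to mean "no extension restriction".

```python
import os
import re


def matches_rule(key, rule):
    """Return True if an object key satisfies a sync rule.

    `rule` is a dict such as {"extension": [".pdf"], "key_regex": "^docs/"}.
    Both fields are optional; "key_regex" is a hypothetical addition.
    """
    extensions = rule.get("extension", [])
    # Assumption: an empty or missing extension list accepts any extension.
    if extensions and os.path.splitext(key)[1] not in extensions:
        return False

    key_regex = rule.get("key_regex")
    if key_regex is None:
        return True
    return re.search(key_regex, key) is not None


if __name__ == "__main__":
    rule = {"extension": [".pdf"], "key_regex": r"^docs/"}
    for key in ["docs/guide.pdf", "archive/guide.pdf", "docs/guide.txt"]:
        print(key, matches_rule(key, rule))
```

Keeping the check in a small pure function like this would make it easy to unit-test separately from the S3 calls.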
Or use a higher-level library function such as awswrangler.s3.list_objects.
Additional Context
N/A