from frictionless import Loader
class S3Loader(Loader):
    """Loader whose byte streams are backed by a single S3 object.

    The target object is derived from ``self.resource.fullpath``: the URL's
    network-location component is used as the bucket name and the URL path,
    minus its first character, as the object key. The boto3 S3 resource is
    created with ``endpoint_url`` taken from ``self.resource.control``.
    """

    # NOTE(review): presumably read by the base Loader to treat this source
    # as remote rather than local -- confirm against Loader.
    remote = True

    def _resolve_s3_object(self):
        """Return the boto3 ``Object`` addressed by the resource's fullpath.

        Shared by the read and write paths so that both resolve the bucket
        and key in exactly the same way.
        """
        # boto3 is obtained through the plugin helper at call time instead of
        # being imported at module level.
        boto3 = helpers.import_from_plugin("boto3", plugin="s3")
        control = self.resource.control
        # allow_fragments=False keeps a "#" inside the key as part of the
        # path instead of splitting it off as a URL fragment.
        parts = urlparse(self.resource.fullpath, allow_fragments=False)
        s3 = boto3.resource("s3", endpoint_url=control.endpoint_url)
        # path[1:] drops the first character of the path (the leading "/"
        # for URLs shaped like scheme://bucket/key).
        return s3.Object(bucket_name=parts.netloc, key=parts.path[1:])

    def read_byte_stream_create(self):
        """Create the byte stream used for reading.

        Returns:
            An ``S3ByteStream`` wrapping the resolved S3 object.
        """
        return S3ByteStream(self._resolve_s3_object())

    def write_byte_stream_save(self, byte_stream):
        """Upload ``byte_stream`` as the body of the resolved S3 object.

        Args:
            byte_stream: Passed unchanged as ``Body`` to the object's
                ``put`` call.
        """
        self._resolve_s3_object().put(Body=byte_stream)