Ingesting large files to postgres through S3

One of the tasks I recently came across at my job was to ingest large files, with the following requirements:

  1. Do some processing (like generating a hash for each row)
  2. Insert it into S3 for audit purposes
  3. Insert into postgres
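As an illustration of step 1, here is a minimal sketch of per-row hashing. The function name `add_row_hash`, the `row_hash` column name, the choice of SHA-256 and the separator are all my own assumptions for the example, not something the task prescribes.

```python
import csv
import hashlib
import io


def add_row_hash(rows, header):
    """Yield the header and each row with a SHA-256 hash column appended."""
    yield header + ["row_hash"]
    for row in rows:
        # Join with a separator that is unlikely to appear in the data so that
        # ["ab", "c"] and ["a", "bc"] do not hash to the same value.
        digest = hashlib.sha256("\x1f".join(row).encode("utf-8")).hexdigest()
        yield row + [digest]


# Tiny in-memory sample standing in for the real file.
sample = io.StringIO("id,name\n1,alice\n2,bob\n")
reader = csv.reader(sample)
header = next(reader)
hashed = list(add_row_hash(reader, header))
```

Because the function is a generator, it can sit between a csv reader and a csv writer without holding the whole file in memory.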

Note:

Keep in mind that your postgres database needs to support importing from S3, and an S3 bucket policy needs to exist in order to allow the data to be copied over.

The setup I am using is an RDS database with S3 in the same region, with the proper policies and IAM roles already created.

Read more on that here – AWS documentation

For the purpose of this post I will be using dummy data from – eforexcel (1 million records)

The most straightforward way to do this would be to just do a df.to_sql like this

import pandas as pd

# connection_detail is a SQLAlchemy engine/connection for the target database
df = pd.read_csv("records.csv")
df.to_sql(
    name="test_table",
    con=connection_detail,
    schema="schema",
    if_exists="replace",
)

Something like this would take more than an hour! Let's do it in less than 5 minutes.

Now of course there are several ways to make this faster – using copy_expert, the psycopg driver etc. (maybe a separate blog post on these), but that’s not the use case I have been tasked with. Since we need to upload the file to S3 in the end for audit purposes, I will ingest the data from S3 into the DB.

Generate table metadata

Before we can assign an S3 operator to ingest the data, we need to create the table into which this data will be inserted. There are two ways that I can think of:

  1. Each column in the file is created in the DB with a generous fixed upper limit, like varchar(2000)
  2. Each column is created with a length equal to the longest value found in that column

I will be going with option 2 here.
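To make the difference between the two options concrete, here is a small sketch that turns per-column lengths into Postgres type strings. The helper `varchar_types` and the sample column names and lengths are made up for illustration.

```python
def varchar_types(max_lengths, fixed_length=None):
    """Map column name -> Postgres type string.

    max_lengths: dict of column name -> longest value length seen in the file.
    fixed_length: if given, every column gets varchar(fixed_length) (option 1);
    otherwise each column is sized to its own longest value (option 2).
    """
    types = {}
    for column, length in max_lengths.items():
        # varchar needs a length of at least 1, so guard against empty columns.
        size = fixed_length if fixed_length is not None else max(length, 1)
        types[column] = f"varchar({size})"
    return types


lengths = {"emp_id": 6, "first_name": 11}  # made-up example values
option_1 = varchar_types(lengths, fixed_length=2000)
option_2 = varchar_types(lengths)
```

Option 2 gives a tighter schema, but a later file with a longer value will fail to load until the column is widened.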

This entire process took around 210 seconds instead of more than an hour like the last run.

Let’s go over the code one by one

Read the csv

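  1. We can pass the data directly to pandas, or stream it into an in-memory buffer, something like this:
import csv
import gzip
import io

import boto3

s3 = boto3.client("s3")
mem_file = io.BytesIO()  # in-memory buffer that receives the gzipped bytes

with open("records.csv", newline="") as f:
    csv_rdr = csv.reader(f, delimiter=",")
    header = next(csv_rdr)
    with gzip.GzipFile(fileobj=mem_file, mode="wb", compresslevel=6) as gz:
        buff = io.StringIO()
        writer = csv.writer(buff)
        writer.writerow(header)
        for row in csv_rdr:
            # any per-row processing (e.g. hashing) can happen here
            writer.writerow(row)
        gz.write(buff.getvalue().encode("utf-8", "replace"))
    # rewind so put_object reads the buffer from the start
    mem_file.seek(0)
    s3.put_object(Bucket="mybucket", Key="folder/file.gz", Body=mem_file)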

  2. Since the file is less than 50 MB, I’ll go ahead and load it directly.

Create the table

Get the max length of the values in each column and use that to generate the table. We use the pandas to_sql() function for this and pass the lengths in as dtypes.
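Here is a rough sketch of the length calculation. To keep it self-contained it uses only the standard library rather than pandas, so treat `max_column_lengths` and the sample data as illustrative and not as the exact code behind this post.

```python
import csv
import io


def max_column_lengths(lines):
    """Return {column name: length of the longest value} for CSV text lines."""
    reader = csv.reader(lines)
    header = next(reader)
    lengths = {name: 0 for name in header}
    for row in reader:
        for name, value in zip(header, row):
            lengths[name] = max(lengths[name], len(value))
    return lengths


# Tiny in-memory sample standing in for records.csv.
sample = io.StringIO("emp_id,first_name\n100001,Bartholomew\n7,Al\n")
lengths = max_column_lengths(sample)
```

With pandas, a dictionary like this can be turned into the `dtype` argument of to_sql(), for example by mapping each column name to `sqlalchemy.types.VARCHAR(n)`.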

Copy data from s3 gzipped file to postgres

Finally we use –

aws_s3.table_import_from_s3

to copy over the file to the postgres table.
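A minimal sketch of what that call can look like from Python. The helper `build_s3_import`, the region and the COPY options are assumptions for the example; the table, bucket and key reuse the names from earlier in the post. As far as I know from the AWS documentation, a gzipped object also needs `Content-Encoding: gzip` set as metadata on the S3 object for the import to decompress it.

```python
def build_s3_import(table, bucket, key, region, columns="",
                    options="(format csv, header true)"):
    """Return (sql, params) for a parameterised aws_s3.table_import_from_s3 call."""
    sql = (
        "SELECT aws_s3.table_import_from_s3("
        "%s, %s, %s, aws_commons.create_s3_uri(%s, %s, %s))"
    )
    # An empty column list tells the function to load all columns of the table.
    params = (table, columns, options, bucket, key, region)
    return sql, params


sql, params = build_s3_import(
    table="schema.test_table",
    bucket="mybucket",
    key="folder/file.gz",
    region="us-east-1",  # assumed region
)
# With a DB-API connection (e.g. psycopg2) this would run as:
# cursor.execute(sql, params); connection.commit()
```

Keeping the values as parameters instead of formatting them into the string leaves the quoting to the database driver.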

About Me

Hey, I am Thomas Ashish Cherian. What I cannot create, I do not understand.
