One of the tasks I recently came across at my job was to ingest large files with the following requirements:
- Do some processing (like generating a hash for each row)
- Upload the file to S3 for audit purposes
- Insert the data into Postgres
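The per-row processing step could look something like this minimal sketch. It assumes SHA-256 over each row's values joined with `|`. The hash algorithm, the separator and the `add_row_hash` helper are my own illustrative choices, not taken from the original pipeline.

```python
import hashlib

import pandas as pd


def add_row_hash(df: pd.DataFrame, column: str = "row_hash") -> pd.DataFrame:
    """Return a copy of df with a SHA-256 hex digest of each row's values."""
    # Turn every value into text and join each row with "|" so that the
    # same row always produces the same string to hash.
    joined = df.astype(str).apply("|".join, axis=1)
    out = df.copy()
    out[column] = joined.map(lambda s: hashlib.sha256(s.encode("utf-8")).hexdigest())
    return out


df = pd.DataFrame({"id": [1, 2], "name": ["Ann", "Bob"]})
print(add_row_hash(df))
```

For 1 million rows a row-wise `apply` is not the fastest option, but it keeps the hashing logic easy to read.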
Keep in mind that your Postgres database needs to support importing from S3, and an S3 bucket policy needs to exist to allow the data to be copied over.
My setup is an RDS database with an S3 bucket in the same region, with the proper policies and IAM roles already created.
Read more on that here: AWS documentation
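For the purpose of this post I will be using dummy data from eforexcel (1 million records).
The most straightforward way to do this would be to just call df.to_sql, like this:
import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection string; replace with your own database details
connection_detail = create_engine("postgresql://user:password@host:5432/dbname")

df = pd.read_csv("records.csv")
df.to_sql(
    name="test_table",
    con=connection_detail,
    schema="schema",
    if_exists="replace",
)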
Something like this would take more than an hour! Let's do it in less than 5 minutes.
Of course, there are several ways to make this faster, such as using copy_expert with the psycopg2 driver (maybe a separate blog post on these), but that's not the use case I have been tasked with. Since we need to upload the file to S3 for audit purposes anyway, I will ingest the data from S3 into the database.
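For comparison, the copy_expert route mentioned above could look roughly like this minimal sketch. It assumes a psycopg2 cursor, a CSV file with a header row and a trusted table name; the helper name is hypothetical.

```python
def copy_csv_with_copy_expert(cursor, csv_file, table="schema.test_table"):
    """Stream an open CSV file into Postgres using psycopg2's copy_expert."""
    # COPY ... FROM STDIN sends the whole file in one command instead of
    # issuing INSERT statements. The table name is formatted straight into
    # the SQL, so it must come from trusted code, never from user input.
    sql = f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)"
    cursor.copy_expert(sql, csv_file)
    return sql


# Example usage (placeholder connection string, needs a live database):
# import psycopg2
# with psycopg2.connect("postgresql://user:password@host:5432/dbname") as conn:
#     with conn.cursor() as cur, open("records.csv", newline="") as f:
#         copy_csv_with_copy_expert(cur, f)
```

This streams the file from the client machine, which is exactly what the S3-based approach avoids by letting the database pull the data itself.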
Generate table metadata
Before we can ingest the data from S3, we need to create the table it will be inserted into. There are two ways I can think of:
- Create every column in the database with a generous fixed maximum, like varchar(2000)
- Create each column with the maximum length of its values across all rows
I will be going with option 2 here.
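Option 2 needs the longest value in each column. A minimal sketch with pandas, assuming every column is measured as text; the `max_lengths` helper is hypothetical, not from the original code.

```python
import pandas as pd


def max_lengths(df: pd.DataFrame) -> dict:
    """Map each column to the length of its longest value, as text."""
    # Measure values as strings, the way they appear in the CSV. Missing
    # values become text such as "nan", which can only overstate a length.
    as_text = df.astype(str)
    lengths = {}
    for col in df.columns:
        longest = as_text[col].str.len().max() if len(as_text) else 0
        # Postgres rejects varchar(0), so never go below 1.
        lengths[col] = max(int(longest), 1)
    return lengths


df = pd.DataFrame({"id": [1, 200], "name": ["Ann", "Roberta"]})
print(max_lengths(df))
```

The trade-off versus option 1 is that a later file with longer values would no longer fit, so this suits one-off or fully replaced loads.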
This entire process took around 210 seconds, instead of more than an hour like the last run.
Let's go over the code step by step.
Read the CSV
1. We can either pass the file directly to pandas, or stream it into an in-memory buffer, gzip it and upload it to S3, something like this:
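import csv
import gzip
import io

import boto3

s3 = boto3.client("s3")
mem_file = io.BytesIO()

with open("records.csv", newline="") as f:
    csv_rdr = csv.reader(f, delimiter=",")
    header = next(csv_rdr)
    with gzip.GzipFile(fileobj=mem_file, mode="wb", compresslevel=6) as gz:
        buff = io.StringIO()
        writer = csv.writer(buff)
        writer.writerow(header)
        for row in csv_rdr:
            writer.writerow(row)
        # Compress the buffered CSV text once, after all rows are written
        gz.write(buff.getvalue().encode("utf-8", "replace"))

# The GzipFile is closed here, so the gzip trailer has been written;
# rewind so put_object reads from the start of the compressed bytes
mem_file.seek(0)
s3.put_object(Bucket="mybucket", Key="folder/file.gz", Body=mem_file)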
2. Since the file is less than 50 MB, I'll go ahead and load it directly with pandas.
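If the file is loaded into pandas first, the gzip-and-upload step can also be done from the DataFrame. A minimal sketch, assuming the CSV keeps its header row and drops the pandas index; `dataframe_to_gzip_bytes` is a hypothetical helper, and the S3 call is commented out because it needs AWS credentials.

```python
import gzip

import pandas as pd


def dataframe_to_gzip_bytes(df: pd.DataFrame) -> bytes:
    """Serialise df to CSV (header included, index dropped) and gzip it."""
    csv_text = df.to_csv(index=False)
    return gzip.compress(csv_text.encode("utf-8"), compresslevel=6)


# Usage (bucket and key reuse the names from the streaming example):
# import boto3
# df = pd.read_csv("records.csv")
# body = dataframe_to_gzip_bytes(df)
# boto3.client("s3").put_object(Bucket="mybucket", Key="folder/file.gz", Body=body)
```

This holds both the DataFrame and the compressed bytes in memory, which is fine for a file under 50 MB but is the reason the streaming version exists for larger ones.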
Create the table
We get the maximum length of each column and use it to generate the table. We use the pandas to_sql() function for this, passing the column types through its dtype argument.
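A minimal sketch of that step, assuming every column is stored as VARCHAR sized to its longest value and that the table is created empty (`df.head(0)`) so the rows can come from S3 afterwards. The helper name is hypothetical, and the demo uses in-memory SQLite only so it runs without Postgres; against RDS you would pass your own SQLAlchemy engine and schema.

```python
import pandas as pd
from sqlalchemy import create_engine, inspect, types


def create_table_from_frame(df, con, name, schema=None):
    """Create an empty table with one VARCHAR(n) column per DataFrame column."""
    as_text = df.astype(str)
    dtype = {}
    for col in df.columns:
        longest = as_text[col].str.len().max() if len(as_text) else 0
        # n is the longest value in the column (at least 1 for Postgres).
        dtype[col] = types.VARCHAR(max(int(longest), 1))
    # head(0) keeps the columns but drops the rows, so to_sql only creates
    # the table; the data is loaded separately.
    df.head(0).to_sql(
        name=name, con=con, schema=schema, if_exists="replace", index=False, dtype=dtype
    )
    return dtype


# Demo against in-memory SQLite; for Postgres use your own engine.
engine = create_engine("sqlite://")
demo = pd.DataFrame({"id": [1, 200], "name": ["Ann", "Roberta"]})
create_table_from_frame(demo, engine, "test_table")
print([c["name"] for c in inspect(engine).get_columns("test_table")])
```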
Copy data from the gzipped S3 file to Postgres
Finally, we use –
to copy the file over into the Postgres table.
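The exact function used here was lost from the original text. As an assumption, one way to do this on RDS for PostgreSQL is the `aws_s3` extension (installed with `CREATE EXTENSION aws_s3 CASCADE;`), whose `aws_s3.table_import_from_s3` function reads the object from S3 inside the database. The sketch below calls it through a psycopg2-style cursor; the helper name and the region in the usage line are hypothetical.

```python
def import_gzip_csv_from_s3(cursor, table, bucket, key, region, columns=""):
    """Ask RDS PostgreSQL's aws_s3 extension to load a CSV object from S3."""
    # The import runs inside the database, so the data goes S3 -> RDS
    # directly instead of through this client. An empty column list means
    # all columns of the table are used.
    cursor.execute(
        "SELECT aws_s3.table_import_from_s3("
        "%s, %s, %s, aws_commons.create_s3_uri(%s, %s, %s))",
        (table, columns, "(format csv, header true)", bucket, key, region),
    )
    # The function returns a short text summary of how many rows were imported.
    return cursor.fetchone()


# Usage (needs a live RDS connection; "us-east-1" is a placeholder region):
# import_gzip_csv_from_s3(cur, "schema.test_table", "mybucket", "folder/file.gz", "us-east-1")
```

The RDS documentation notes that a gzipped object is only decompressed on import if it carries the S3 metadata `Content-Encoding: gzip`; with boto3 that can be set by passing `ContentEncoding="gzip"` to `put_object`.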