GCS/S3 Enumerator #50

Open
kirs opened this issue Mar 20, 2020 · 3 comments

Comments

@kirs
Contributor

kirs commented Mar 20, 2020

I've heard some people use CSV Enumerator combined with a file on GCS or S3. If that's a common pattern, we may consider exposing an enumerator that supports this out of the box.

cc @GustavoCaso @djmortonShopify @cardy31

@GabrielAlacchi

GabrielAlacchi commented May 23, 2021

Hey @kirs. I've implemented a similar use case with Azure Blob Storage.

My goal was to avoid downloading the whole file (imagine we're processing a 1 GB dataset or something along those lines). If every worker node that touches the long-running task during its many interrupt and resume cycles had to download the entire file, that would be a nightmare. The Blob Storage API allows loading a range of bytes from the file, which I imagine GCS and S3 also support. So I stream the file on demand during the long-running task, and every byte of the file only has to be downloaded roughly once. (A small percentage is downloaded more than once: I buffer 256 KB at a time, so an interrupted job leaves on average 128 KB of already-downloaded but unconsumed bytes that have to be fetched again on resume.) The necessary ingress still scales linearly with file size, which is acceptable.

So here was my approach:

First of all, I noticed that the CsvEnumerator provided by this gem uses the row number as its cursor value. This means that after an interrupt, the CSV enumerator re-reads and re-parses every line of the file it had previously consumed, as shown by the implementation of the rows method:

@csv.lazy.each_with_index.drop(count_of_processed_rows(cursor)).to_enum { count_of_rows_in_file }

This doesn't work for my purposes and assumes a relatively small file size, so I wrote my own CsvEnumerator that instead tracks the number of bytes preceding the line currently being processed. When you call rows(cursor:), it reads and parses the header, then seeks to the correct position in the file and starts ingesting rows.
https://gist.github.com/GabrielAlacchi/ab25ecd8359aa7107aae010c4ede97d0
It's not fully featured, as it implements only the bare minimum I needed for my use case, but if you'd like to mainline this into the gem I'd be happy to help.
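To give a concrete picture of the approach (a simplified sketch, not the gist's actual code; the class name ByteOffsetCsvEnumerator is made up here, and it assumes the [item, cursor] contract that job-iteration-style enumerators use):

```ruby
require "csv"

# Sketch: a CSV enumerator whose cursor is the byte offset of the next
# unprocessed line, so resuming never re-reads already-processed rows.
# Assumes each CSV record sits on a single line (no quoted embedded newlines).
class ByteOffsetCsvEnumerator
  def initialize(io, csv_options: {})
    @io = io
    @csv_options = csv_options
  end

  # Yields [row, cursor] pairs, where cursor is the byte offset at which the
  # next call should resume.
  def rows(cursor: nil)
    Enumerator.new do |yielder|
      @io.seek(0, IO::SEEK_SET)
      header_line = @io.gets
      headers = CSV.parse_line(header_line, **@csv_options)

      # Resume right after the last processed line, or right after the header.
      offset = cursor || header_line.bytesize
      @io.seek(offset, IO::SEEK_SET)

      until @io.eof?
        line = @io.gets
        offset += line.bytesize
        fields = CSV.parse_line(line, **@csv_options)
        yielder.yield(CSV::Row.new(headers, fields), offset)
      end
    end
  end
end
```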

Now this enumerator can accept any underlying IO object and parse the CSV much more efficiently by seeking to the correct place. As long as the IO object supports gets, eof? and seek(n, IO::SEEK_SET), it can parse the CSV with many of the same options as the CSV standard library, since it uses CSV.parse_line, which is a public part of the standard library API. You can therefore write independent readers for each of the three cloud APIs that act like an IO and provide those three methods.
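As an illustration of such a reader, here is a rough, untested sketch for S3 (the class name S3LineReader is made up; the 256 KB chunk size mirrors the buffering described above, and it relies on the ranged get_object call from aws-sdk-s3; encoding and multi-byte characters are glossed over):

```ruby
require "aws-sdk-s3"

# Sketch of an IO-like reader over an S3 object that supports only the three
# methods the enumerator needs: gets, eof? and seek(n, IO::SEEK_SET).
# Bytes are fetched lazily in 256 KB ranged GETs, so nothing is downloaded
# until it is actually consumed.
class S3LineReader
  CHUNK_SIZE = 256 * 1024

  def initialize(bucket:, key:, client: Aws::S3::Client.new)
    @bucket = bucket
    @key = key
    @client = client
    @size = @client.head_object(bucket: bucket, key: key).content_length
    @pos = 0
    @buffer = +""
  end

  def seek(offset, _whence = IO::SEEK_SET)
    @pos = offset
    @buffer = +""
    0
  end

  def eof?
    @pos >= @size && @buffer.empty?
  end

  def gets(separator = "\n")
    return nil if eof?

    # Fetch more data until we have a full line or we've read the whole object.
    fill_buffer until @buffer.include?(separator) || @pos >= @size

    if (idx = @buffer.index(separator))
      @buffer.slice!(0, idx + separator.length)
    else
      remainder = @buffer
      @buffer = +""
      remainder
    end
  end

  private

  def fill_buffer
    last = [@pos + CHUNK_SIZE - 1, @size - 1].min
    resp = @client.get_object(bucket: @bucket, key: @key, range: "bytes=#{@pos}-#{last}")
    @buffer << resp.body.read
    @pos = last + 1
  end
end
```

With something along those lines, ByteOffsetCsvEnumerator.new(S3LineReader.new(bucket: "my-bucket", key: "data.csv")).rows(cursor: cursor) would stream rows straight from S3, and a GCS or Azure reader would only need to implement the same three methods.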

Let me know what you think!

@kirs
Contributor Author

kirs commented May 25, 2021

This is great! I agree it's a much better approach. We would love to see this as a PR if you have some extra time.

@yyamanoi1222

Hi @kirs!
Any progress or plans regarding this feature?
