Run CSV parsing in a separate thread, refs #38
simonw committed Jan 25, 2024
1 parent 4c19d5e commit 8461488
Showing 2 changed files with 95 additions and 42 deletions.
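
The core of the change is a standard asyncio hand-off: the blocking CSV parse moves onto an executor thread, and each database write is marshalled back to the event loop with asyncio.run_coroutine_threadsafe(). A minimal, runnable sketch of that pattern, independent of the plugin (write_row, parse_in_thread and the sample rows are illustrative stand-ins, not names from this codebase):

import asyncio

async def write_row(row):
    # Stand-in for an async write API such as Datasette's db.execute_write_fn()
    print("writing", row)

def parse_in_thread(event_loop, rows):
    # Runs on a worker thread: it cannot await, so it hands each coroutine
    # to the event loop and blocks on the returned concurrent.futures.Future
    for row in rows:
        future = asyncio.run_coroutine_threadsafe(write_row(row), event_loop)
        future.result()  # back-pressure: wait for this write to land

async def main():
    loop = asyncio.get_running_loop()
    # None picks the default ThreadPoolExecutor; awaiting is optional if the
    # work should keep running in the background, as it does in this commit
    await loop.run_in_executor(None, parse_in_thread, loop, [{"id": 1}, {"id": 2}])

asyncio.run(main())

Blocking on future.result() is what keeps the parser from racing ahead of the write queue, the same reason the commit waits on each 100-row batch.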
1 change: 1 addition & 0 deletions .gitignore
@@ -3,6 +3,7 @@ __pycache__/
 *.py[cod]
 *$py.class
 venv
+venv-1
 .eggs
 .pytest_cache
 *.egg-info
136 changes: 94 additions & 42 deletions datasette_upload_csvs/__init__.py
@@ -1,3 +1,4 @@
+import asyncio
 from datasette import hookimpl
 from datasette.utils.asgi import Response, Forbidden
 from charset_normalizer import detect
@@ -10,6 +11,7 @@
 import os
 import sqlite_utils
 from sqlite_utils.utils import TypeTracker
+import threading
 import uuid


@@ -124,57 +126,107 @@ def insert_initial_record(conn):

     await db.execute_write_fn(insert_initial_record)
 
-    def insert_docs(database):
-        reader = csv_std.reader(codecs.iterdecode(csv.file, encoding))
-        headers = next(reader)
-
-        tracker = TypeTracker()
-
-        docs = tracker.wrap(dict(zip(headers, row)) for row in reader)
+    # We run the CSV parser in a thread, sending 100 rows at a time to the DB
+    def parse_csv_in_thread(event_loop, csv_file, db, table_name, task_id):
+        try:
+            reader = csv_std.reader(codecs.iterdecode(csv_file, encoding))
+            headers = next(reader)
+
+            tracker = TypeTracker()
+
+            docs = tracker.wrap(dict(zip(headers, row)) for row in reader)
+
+            i = 0
+
+            def docs_with_progress():
+                nonlocal i
+                for doc in docs:
+                    i += 1
+                    yield doc
+                    if i % 10 == 0:
+
+                        def update_progress(conn):
+                            database = sqlite_utils.Database(conn)
+                            database["_csv_progress_"].update(
+                                task_id,
+                                {
+                                    "rows_done": i,
+                                    "bytes_done": csv_file.tell(),
+                                },
+                            )
+
+                        asyncio.run_coroutine_threadsafe(
+                            db.execute_write_fn(update_progress), event_loop
+                        )
+
+            def write_batch(batch):
+                def insert_batch(conn):
+                    database = sqlite_utils.Database(conn)
+                    database[table_name].insert_all(batch, alter=True)
+
+                future = asyncio.run_coroutine_threadsafe(
+                    db.execute_write_fn(insert_batch), event_loop
+                )
+                # Wait for it to finish so we don't overwhelm write queue
+                future.result()
+
+            batch = []
+            batch_size = 0
+            for doc in docs_with_progress():
+                batch.append(doc)
+                batch_size += 1
+                if batch_size > 100:
+                    write_batch(batch)
+                    batch = []
+                    batch_size = 0
+
+            if batch:
+                write_batch(batch)
+
+            # Mark progress as complete
+            def mark_complete(conn):
+                nonlocal i
+                database = sqlite_utils.Database(conn)
+                database["_csv_progress_"].update(
+                    task_id,
+                    {
+                        "rows_done": i,
+                        "bytes_done": total_size,
+                        "completed": str(datetime.datetime.utcnow()),
+                    },
+                )
-        i = 0
+
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(mark_complete), event_loop
+            )
-
-        def docs_with_progress():
-            nonlocal i
-            for doc in docs:
-                i += 1
-                yield doc
-                if i % 10 == 0:
-                    database["_csv_progress_"].update(
-                        task_id,
-                        {
-                            "rows_done": i,
-                            "bytes_done": csv.file.tell(),
-                        },
-                    )
+
+            # Transform columns to detected types
+            def transform_columns(conn):
+                database = sqlite_utils.Database(conn)
+                database[table_name].transform(types=tracker.types)
-
-        database[table_name].insert_all(
-            docs_with_progress(), alter=True, batch_size=100
-        )
-        database["_csv_progress_"].update(
-            task_id,
-            {
-                "rows_done": i,
-                "bytes_done": total_size,
-                "completed": str(datetime.datetime.utcnow()),
-            },
-        )
-        # Transform columns to detected types
-        database[table_name].transform(types=tracker.types)
-        return database[table_name].count
+
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(transform_columns), event_loop
+            )
+        except Exception as error:
-
-    def insert_docs_catch_errors(conn):
-        database = sqlite_utils.Database(conn)
-        with conn:
-            try:
-                insert_docs(database)
-            except Exception as error:
-                database["_csv_progress_"].update(
-                    task_id,
-                    {"error": str(error)},
-                )
+
+            def insert_error(conn):
+                database = sqlite_utils.Database(conn)
+                database["_csv_progress_"].update(
+                    task_id,
+                    {"error": str(error)},
+                )
-
-    await db.execute_write_fn(insert_docs_catch_errors, block=False)
+
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(insert_error), event_loop
+            )
+
+    loop = asyncio.get_running_loop()
+
+    # Start that thread running in the default executor in the background
+    loop.run_in_executor(
+        None, parse_csv_in_thread, loop, csv.file, db, table_name, task_id
+    )
 
     if formdata.get("xhr"):
         return Response.json(
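The transform_columns() step added above leans on sqlite-utils' documented TypeTracker helper: rows are inserted as text while the tracker watches their values, then a single transform() converts each column to its detected type. A short standalone sketch of that API (the in-memory database and sample rows here are illustrative):

import sqlite_utils
from sqlite_utils.utils import TypeTracker

db = sqlite_utils.Database(memory=True)
rows = [{"id": "1", "name": "Cleo", "score": "3.5"}]

tracker = TypeTracker()
# wrap() yields each row unchanged while recording what type its values resemble
db["demo"].insert_all(tracker.wrap(rows))
# tracker.types maps column names to detected types, e.g. {"id": "integer", ...}
db["demo"].transform(types=tracker.types)

print(db["demo"].schema)  # id is now INTEGER, score FLOAT, name stays TEXT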
