Skip to content

Commit

Permalink
v4.3.1: 🔧 Fix time series and vax collections update (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziomiano committed Jan 30, 2021
2 parents 22f24e3 + 95ab2ec commit cb091cc
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 45 deletions.
5 changes: 2 additions & 3 deletions app/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,11 @@ def get_total_administrations(area=None):
{'$group': {'_id': '{}', 'tot': {'$sum': '$totale'}}}
]
try:

cursor = VAX_SUMMARY_COLL.aggregate(pipeline=pipe)
tot_adms = next(cursor)['tot']
except Exception as e:
app.logger.error(f"While getting total admins: {e}")
return tot_adms
return int(tot_adms)


def get_age_chart_data(area=None):
Expand Down Expand Up @@ -326,7 +325,7 @@ def get_category_chart_data(area=None):
"""Return category series data"""
chart_data = []
if area is not None:
match = {'$match': {VAX_AREA_KEY: area}},
match = {'$match': {VAX_AREA_KEY: area}}
group = {
'$group': {
'_id': f'${VAX_AREA_KEY}',
Expand Down
21 changes: 16 additions & 5 deletions app/data/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,10 +455,21 @@ def augment_summary_vax_df(df):
- column population based on mapping of VAX_AREA_KEY
to config ITALY_POPULATION
"""
df['_id'] = (
df[VAX_DATE_KEY].apply(lambda x: x.strftime(VAX_DATE_FMT)) +
df[VAX_AREA_KEY]
out_df = pd.DataFrame()
for r in df[VAX_AREA_KEY].unique():
reg_df = df[df[VAX_AREA_KEY] == r]
reg_df = reg_df.set_index(VAX_DATE_KEY).resample('1D').asfreq()
for col in reg_df:
if isinstance(reg_df[col].values[-1], str):
reg_df[col].ffill(inplace=True)
else:
reg_df.fillna(0, inplace=True)
out_df = out_df.append(reg_df)
out_df.reset_index(inplace=True)
out_df['_id'] = (
out_df[VAX_DATE_KEY].apply(lambda x: x.strftime(VAX_DATE_FMT)) +
out_df[VAX_AREA_KEY]
)
df[POP_KEY] = df[VAX_AREA_KEY].apply(
out_df[POP_KEY] = out_df[VAX_AREA_KEY].apply(
lambda x: ITALY_POPULATION[OD_TO_PC_MAP[x]])
return df
return out_df
52 changes: 18 additions & 34 deletions app/db/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
import pandas as pd
from flask import current_app as app
from pymongo import UpdateOne
from pymongo import UpdateOne, InsertOne

from app.data import (
NAT_DATA_COLL, NAT_TRENDS_COLL, NAT_SERIES_COLL, REG_DATA_COLL,
Expand Down Expand Up @@ -353,7 +353,7 @@ def update_provincial_series_or_trends_collection(coll_type):
def update_vax_collection(summary=False):
"""Update vax / vax_summary collection"""
response = {"status": "ko", "n_inserted_docs": 0, "errors": []}
inserted_ids = []
operations = []
if not summary:
collection = VAX_COLL
url = URL_VAX_DATA
Expand All @@ -365,38 +365,22 @@ def update_vax_collection(summary=False):
df = pd.read_csv(url, parse_dates=[VAX_DATE_KEY])
df = augment_summary_vax_df(df)
try:
records_in_db = list(collection.find())
if records_in_db:
df_mongo = pd.DataFrame(records_in_db)
latest_dt_target = df_mongo[VAX_DATE_KEY].max()
latest_dt_source = df[VAX_DATE_KEY].max()
# latest_dt_target > latest_dt source is not an option
if latest_dt_target < latest_dt_source:
df_to_db = df[df[VAX_DATE_KEY] > latest_dt_target]
new_records = df_to_db.to_dict(orient='records')
r = collection.insert_many(new_records, ordered=True)
inserted_ids.extend(r.inserted_ids)
if latest_dt_target == latest_dt_source:
df = df[df[VAX_DATE_KEY] == latest_dt_source]
operations = []
for index, row in df.iterrows():
_id = row['_id']
new_value = row.to_dict()
operations.append(
UpdateOne({'_id': _id}, {'$set': new_value})
)
r = collection.bulk_write(operations)
response['bulk_update'] = r.bulk_api_result
response["status"] = "ok"
else:
msg = f"Filling empty {collection.name}"
app.logger.warning(msg)
r = collection.insert_many(df.to_dict(orient='records'))
inserted_ids.extend(r.inserted_ids)
response["status"] = "ok"
response["msg"] = msg
response["n_inserted_docs"] = len(inserted_ids)
msg = f"{len(inserted_ids)} docs updated in {collection.name}"
for index, row in df.iterrows():
_id = row['_id']
cursor = collection.find({'_id': _id})
new_value = row.to_dict()
try:
next(cursor)
operations.append(UpdateOne({'_id': _id}, {'$set': new_value}))
except StopIteration:
operations.append(InsertOne(new_value))
r = collection.bulk_write(operations)
bulk_result = r.bulk_api_result
response['bulk_update'] = bulk_result
response["status"] = "ok"
n_inserted = bulk_result['nInserted']
n_modified = bulk_result['nModified']
msg = f"{n_inserted} inserted and {n_modified} modified"
app.logger.warning(msg)
except Exception as e:
app.logger.error(f"While updating vax collection: {e}")
Expand Down
14 changes: 11 additions & 3 deletions app/static/js/vaccines-charts.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ if (!REGIONS.includes(area)) {
text: adminsTimeseriesyAxisTitle
}
},
tooltip: {
crosshairs: {
width: 1,
color: 'gray',
dashStyle: 'ShortDashDot'
},
shared: true,
split: false,
enabled: true
},
legend: {
enabled: false
},
Expand Down Expand Up @@ -178,8 +188,6 @@ let adminsPerCategory = {
data: adminsPerCategoryData
}],
subtitle: subtitle,
credits: {
enabled: false
}
credits: credits
}
Highcharts.chart('chart-pie-categories', adminsPerCategory);

0 comments on commit cb091cc

Please sign in to comment.