-
Notifications
You must be signed in to change notification settings - Fork 0
/
app_dask.py
44 lines (34 loc) · 935 Bytes
/
app_dask.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import os
from functools import wraps
from pathlib import Path
import dask.dataframe as dd
import typer
from dask.distributed import Client
from app_pandas import pandas_types
from utils import cols, with_res_logger
# Typer CLI application; each function decorated with @app.command() below
# is registered as a subcommand of this app.
app = typer.Typer()
def with_client(f):
    """Decorate *f* so that every call runs inside a dask distributed Client.

    A new ``Client()`` is created for each call and used as a context manager,
    so it is closed when *f* returns or raises. The decorated function keeps
    *f*'s name, docstring and signature metadata via ``functools.wraps``.

    Args:
        f: The callable to wrap.

    Returns:
        A wrapper that forwards all positional and keyword arguments to *f*
        and returns its result.
    """

    @wraps(f)
    def run_with_client(*args, **kwargs):
        # Client() with no address: per the dask docs this starts a local
        # cluster for the duration of the call.
        client_ctx = Client()
        with client_ctx:
            result = f(*args, **kwargs)
        return result

    return run_with_client
def load(year):
    """Lazily read the CSV file for *year* into a dask DataFrame.

    The file is ``$DATA/<year>.csv``. Column types come from the ``cols``
    mapping of column name to type tag:

    - columns tagged ``"date"`` are passed to ``parse_dates``;
    - every other column gets the dtype ``pandas_types[tag]``.

    Malformed lines are handled with ``on_bad_lines="warn"``, which is
    forwarded to the underlying pandas CSV reader.

    Args:
        year: Value interpolated into the file name (e.g. ``"2021"``).

    Returns:
        A lazy dask DataFrame; nothing is read until it is computed.

    Raises:
        KeyError: If the ``DATA`` environment variable is not set, or a
            non-date tag in ``cols`` is missing from ``pandas_types``.
    """
    csv_path = Path(os.environ["DATA"]) / f"{year}.csv"

    dtypes = {}
    date_columns = []
    for name, kind in cols.items():
        if kind == "date":
            date_columns.append(name)
        else:
            dtypes[name] = pandas_types[kind]

    return dd.read_csv(
        csv_path,
        dtype=dtypes,
        parse_dates=date_columns,
        on_bad_lines="warn",
    )
@app.command()
@with_res_logger
@with_client
def top_flop(year: str):
    """Print the postal codes with the most and the fewest distinct mutations.

    Loads the data for *year* and counts distinct ``id_mutation`` values per
    ``code_postal``. It then prints up to the 10 largest counts, followed by
    up to the 10 smallest.

    ``with_client`` is the innermost decorator, so the dask Client exists
    only while this body (including ``.compute()``) runs.
    """
    frame = load(year)
    mutations_by_postcode = frame.groupby(
        ["code_postal"], group_keys=False
    )["id_mutation"]
    # .compute() turns the lazy dask result into an in-memory pandas Series.
    counts = mutations_by_postcode.nunique().compute()

    for pick in (counts.nlargest, counts.nsmallest):
        print(pick(10))