Skip to content

Commit

Permalink
Interpolate DA from CD in Canada (#528)
Browse files Browse the repository at this point in the history
* Interpolate DA from CD in Canada
  • Loading branch information
ethervoid authored Jul 12, 2018
1 parent ccd41b6 commit 7ad904f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 10 deletions.
3 changes: 3 additions & 0 deletions tasks/base_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,10 @@ def run(self):
LOGGER.info('time: %s', after - before)
else:
LOGGER.info('populate')
before = time.time()
self.populate()
after = time.time()
LOGGER.info('time: %s', after - before)

before = time.time()
LOGGER.info('update_or_create_metadata')
Expand Down
67 changes: 58 additions & 9 deletions tasks/ca/statcan/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ def output(self):
return LocalTarget(os.path.join('tmp', classpath(self), self.task_id))


#####################################
# IMPORT TO TEMP TABLES
#####################################
class CopyDataToTable(BaseParams, TempTableTask):

topic = Parameter()
Expand Down Expand Up @@ -180,9 +177,17 @@ def requires(self):
yield ImportData(resolution=resolution, survey=survey)


#####################################
# COPY TO OBSERVATORY
#####################################
class InterpolateNHSDAFromCD(Task):
    '''
    Declare the inputs used to interpolate NHS data at DA resolution from
    CD resolution.

    Only ``requires`` is defined in this class: the CD-level NHS table for
    ``topic`` plus the CD and DA geographies.
    '''

    topic = Parameter()
    # Declared as a parameter but not read by requires(), which always asks
    # for the fixed CD and DA resolutions.
    resolution = Parameter()

    def requires(self):
        '''
        Return the upstream tasks keyed by role: ``nhs`` (CD-level NHS
        table), ``geo_cd`` and ``geo_da`` (geographies).
        '''
        return {
            'nhs': NHS(resolution=GEO_CD, topic=self.topic, survey=SURVEY_NHS),
            # Pass the resolution constants themselves, not their names as
            # string literals, so the values match what NHS/Geography use.
            'geo_cd': Geography(resolution=GEO_CD),
            'geo_da': Geography(resolution=GEO_DA),
        }

class Survey(BaseParams, TableTask):

topic = Parameter(default='t001')
Expand Down Expand Up @@ -212,6 +217,42 @@ def columns(self):
return cols

def populate(self):
    '''
    Fill the output table, picking the strategy from survey and resolution.

    NHS at DA resolution is handled by populate_da_from_cd(); every other
    survey/resolution combination is handled by populate_general().
    '''
    interpolate_from_cd = (self.survey == SURVEY_NHS and
                           self.resolution == GEO_DA)
    if interpolate_from_cd:
        self.populate_da_from_cd()
        return
    self.populate_general()

def populate_da_from_cd(self):
    '''
    Populate the DA-resolution output table from CD-level values.

    Every DA row takes the value of the CD whose ``geom_id`` equals the
    first 4 characters of the DA ``geom_id``, scaled by the ratio of the DA
    geometry area to the CD geometry area. The ``geo_code`` output column is
    filled with the DA ``geom_id``.

    Reads the ``geo``, ``geo_cd`` and ``data_cd`` inputs and runs a single
    INSERT ... SELECT through the current session.
    '''
    session = current_session()
    out_colnames = [name for name in self.columns().keys() if name is not None]

    # We round to 2 decimals to keep the row size below the 8Kb limit.
    # More info https://github.com/CartoDB/bigmetadata/issues/527
    weighted_value = 'round(cast(float8 ({colname} * (ST_Area(da.the_geom)/ST_Area(cd.the_geom))) as numeric), 2) {colname}'

    # INSERT ... SELECT maps columns by position, so the SELECT list is
    # built in exactly the same order as the INSERT column list instead of
    # assuming 'geo_code' is the first output column.
    in_colnames = [
        'da.geom_id' if colname == 'geo_code'
        else weighted_value.format(colname=colname)
        for colname in out_colnames
    ]

    insert_query = '''
        INSERT INTO {output} ({out_colnames})
        SELECT {in_colnames} FROM {da_geom} da
        INNER JOIN {cd_geom} cd ON (cd.geom_id = left(da.geom_id,4))
        INNER JOIN {cd_data} data ON (cd.geom_id = data.geo_code)
        '''.format(output=self.output().table,
                   da_geom=self.input()['geo'].table,
                   cd_geom=self.input()['geo_cd'].table,
                   cd_data=self.input()['data_cd'].table,
                   in_colnames=', '.join(in_colnames),
                   out_colnames=', '.join(out_colnames))

    LOGGER.debug(insert_query)
    session.execute(insert_query)

def populate_general(self):
session = current_session()
columns = self.columns()
out_colnames = list(columns.keys())
Expand Down Expand Up @@ -276,12 +317,20 @@ def requires(self):
class NHS(Survey):

def requires(self):
    '''
    Return the upstream tasks for this NHS table, keyed by role.

    ``geo``, ``geometa`` and ``meta`` are always required. For DA
    resolution the CD geography (``geo_cd``) and the CD-level NHS table
    (``data_cd``) are added; for any other resolution the imported data
    table (``data``) is added instead.
    '''
    reqs = {
        'geo': Geography(resolution=self.resolution),
        'geometa': GeographyColumns(resolution=self.resolution),
        'meta': NHSColumns(),
    }
    # There is no NHS data at DA resolution, so DA values are interpolated
    # from CD: depend on the CD geography and CD-level table and skip the
    # data import step for DA.
    if self.resolution == GEO_DA:
        reqs['geo_cd'] = Geography(resolution=GEO_CD)
        reqs['data_cd'] = NHS(resolution=GEO_CD, survey=self.survey, topic=self.topic)
    else:
        reqs['data'] = CopyDataToTable(resolution=self.resolution, survey=SURVEY_NHS, topic=self.topic)

    return reqs

def targets(self):
return {
Expand All @@ -296,7 +345,7 @@ class AllNHSTopics(BaseParams, WrapperTask):
def requires(self):
topic_range = list(range(1, 30)) # 1-29

for resolution in (GEO_CT, GEO_PR, GEO_CD, GEO_CSD, GEO_CMA):
for resolution in (GEO_CT, GEO_PR, GEO_CD, GEO_CSD, GEO_CMA, GEO_DA):
for count in topic_range:
topic = 't{:03d}'.format(count)
yield NHS(resolution=resolution, survey=SURVEY_NHS, topic=topic)
Expand Down
1 change: 0 additions & 1 deletion tasks/ca/statcan/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ def requires(self):
def download(self):
copyfile(self.input().path, '{output}.zip'.format(output=self.output().path))


class ImportGeography(Shp2TempTableTask):
'''
Import geographies into postgres by geography level
Expand Down

0 comments on commit 7ad904f

Please sign in to comment.