Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Foot traffic data import and aggregations #479

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
2 changes: 2 additions & 0 deletions postgres_10/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ COPY docker-entrypoint.sh /usr/local/bin/
RUN chmod 755 /usr/local/bin/docker-entrypoint.sh
RUN ln -s /usr/local/bin/docker-entrypoint.sh /docker-entrypoint.sh

# PL/Python is required by `CREATE EXTENSION plpythonu`.
# `apt-get update` and `install` share one layer so the package index is never
# stale, `-y` is needed because `docker build` has no TTY to answer the
# confirmation prompt, and the index is removed afterwards to keep the layer small.
RUN apt-get update \
    && apt-get install -y --no-install-recommends postgresql-plpython-10 \
    && rm -rf /var/lib/apt/lists/*

EXPOSE 5432
ENTRYPOINT ["docker-entrypoint.sh"]
5 changes: 5 additions & 0 deletions postgres_10/docker-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ if [ ! -s "$PGDATA/PG_VERSION" ]; then

# The extension must be created while the temporary init server is still
# running, i.e. before the `pg_ctl ... stop` below.
# NOTE(review): assumes the server was started earlier in this `if` branch
# (the `stop` below implies it) -- confirm against the full script.
# NOTE(review): the extension is created in psql's default database only;
# use template1 if every new database should get it.
echo
echo 'Creating extension plpythonu'
echo
# `su - postgres -c "<command>"` runs the command as the postgres OS user; the
# statement itself has to be handed to psql, not to su.
su - postgres -c "PGUSER=${PGUSER:-postgres} /usr/lib/postgresql/10/bin/psql -v ON_ERROR_STOP=1 -c 'CREATE EXTENSION IF NOT EXISTS plpythonu'"

su - postgres -c "PGUSER=${PGUSER:-postgres} /usr/lib/postgresql/10/bin/pg_ctl -D $PGDATA -m fast -w stop"

echo
echo 'PostgreSQL init process complete; ready for start up.'
echo
Expand Down
51 changes: 51 additions & 0 deletions tasks/foot_traffic/data_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import os
import urllib.request
from luigi import Task, LocalTarget
from lib.logger import get_logger
from tasks.util import (classpath, unqualified_task_id)
from tasks.foot_traffic.quadkey import tile2bounds, quadkey2tile

LOGGER = get_logger(__name__)


class DownloadData(Task):
    """Fetch the raw foot traffic CSV from ``URL`` into a local file."""

    # Placeholder: must be replaced with the real location of the data file.
    URL = 'YOUR_URL_HERE'

    def run(self):
        """Create the output directory and download ``URL`` into the target."""
        target = self.output()
        target.makedirs()
        urllib.request.urlretrieve(self.URL, target.path)

    def output(self):
        """Return the local CSV target under ``tmp/<classpath>/``."""
        filename = unqualified_task_id(self.task_id) + '.csv'
        directory = os.path.join('tmp', classpath(self))
        return LocalTarget(os.path.join(directory, filename))


class AddLatLngData(Task):
    """Add the centre point of each row's quadkey tile to the downloaded CSV.

    Every input row is expected to have at least four comma-separated
    columns, the first being a quadkey. The output rows are
    ``quadkey,lon,lat,<col 2>,<col 3>,<col 4>``.
    """

    def requires(self):
        return DownloadData()

    def _point_position(self, z, x, y):
        """Return the midpoint of the bounds of tile ``z/x/y``.

        The four values returned by ``tile2bounds`` are unpacked as
        ``lon0, lat0, lon1, lat1`` and averaged pairwise.
        """
        lon0, lat0, lon1, lat1 = tile2bounds(z, x, y)
        return (lon0 + lon1) / 2, (lat0 + lat1) / 2

    def run(self):
        """Write the enriched CSV to this task's output target.

        Raises:
            IndexError: if a non-blank row has fewer than four columns.
        """
        # LocalTarget.open('w') creates the parent directory and writes to a
        # temporary file that is only moved into place when the block exits
        # cleanly, so a crash half-way never leaves a partial output that
        # luigi would treat as a completed task.
        with self.output().open('w') as outfile, \
                open(self.input().path, 'r', encoding='utf-8') as infile:
            written = 0
            for raw_line in infile:
                # Strip the line terminator up front so it cannot end up
                # embedded in the last field; it is re-added explicitly below.
                row = raw_line.rstrip('\n')
                if not row:
                    # Tolerate blank lines (e.g. a trailing one at EOF).
                    continue

                fields = row.split(',')
                quadkey = fields[0]
                z, x, y = quadkey2tile(quadkey)
                lon, lat = self._point_position(z, x, y)

                outline = [quadkey, lon, lat, fields[1], fields[2], fields[3]]
                outfile.write(','.join(str(c) for c in outline) + '\n')

                written += 1
                if written % 10000 == 0:
                    LOGGER.info('Written {i} lines'.format(i=written))

    def output(self):
        return LocalTarget(os.path.join('tmp', classpath(self),
                                        unqualified_task_id(self.task_id) + '.csv'))
72 changes: 72 additions & 0 deletions tasks/foot_traffic/foot_traffic_ch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import requests

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'requests' imported but unused

from luigi import Task
from lib.logger import get_logger
from tasks.foot_traffic.data_file import AddLatLngData

LOGGER = get_logger(__name__)
CLICKHOUSE_HOST = '172.17.0.1'
CLICKHOUSE_PORT = 8123
DATABASE_NAME = 'foot_traffic'
TABLE_NAME = 'foot_traffic'
QUADKEY_FIELD = 'quadkey'
LONGITUDE_FIELD = 'lon'
LATITUDE_FIELD = 'lat'
DATE_FIELD = 'ftdate'
HOUR_FIELD = 'fthour'
VALUE_FIELD = 'val'


def execute_query(query):
    """Run ``query`` on the ClickHouse server and return the response body.

    The query is sent as the body of a POST request. ClickHouse treats GET
    requests on its HTTP interface as read-only, so DDL such as
    ``CREATE TABLE`` would be rejected if sent with GET. Using the body also
    avoids having to URL-encode multi-line SQL.

    Args:
        query: SQL text to execute (a single statement).

    Returns:
        The raw response text, exactly as sent by the server (including any
        trailing newline).

    Raises:
        RuntimeError: if the server answers with a non-200 status code.
    """
    # NOTE(review): 8123 is ClickHouse's default *plain HTTP* port (HTTPS
    # defaults to 8443); confirm that 'https' is really intended here.
    uri = 'https://{host}:{port}/'.format(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT)
    response = requests.post(uri, data=query.encode('utf-8'))
    if response.status_code != requests.codes.ok:
        raise RuntimeError('Received status code {code}'.format(code=response.status_code))
    return response.text


class ImportData(Task):
    """Prepare the ClickHouse database and table for the foot traffic data.

    The task has no output target; completeness is determined by asking
    ClickHouse whether the destination table exists.
    NOTE(review): ``run`` currently only creates the schema -- no rows from
    ``AddLatLngData`` are loaded yet.
    """

    def requires(self):
        return AddLatLngData()

    def _create_table(self):
        """Create the database and the table if they do not exist yet."""
        query = '''
                CREATE DATABASE IF NOT EXISTS "{database}";
                '''.format(
                    database=DATABASE_NAME
                )
        execute_query(query)

        query = '''
                CREATE TABLE IF NOT EXISTS {database}.{table} (
                    {quadkey} String,
                    {longitude} Float64,
                    {latitude} Float64,
                    {date} Date,
                    {hour} UInt8,
                    {value} UInt16
                ) ENGINE = MergeTree({date}, ({quadkey}, {longitude}, {latitude}, {date}, {hour}), 8192)
                '''.format(
                    database=DATABASE_NAME,
                    table=TABLE_NAME,
                    quadkey=QUADKEY_FIELD,
                    longitude=LONGITUDE_FIELD,
                    latitude=LATITUDE_FIELD,
                    date=DATE_FIELD,
                    hour=HOUR_FIELD,
                    value=VALUE_FIELD,
                )
        execute_query(query)

    def run(self):
        self._create_table()

    def complete(self):
        """Return True when the destination table exists in ClickHouse."""
        query = '''
                EXISTS TABLE {database}.{table}
                '''.format(
                    database=DATABASE_NAME,
                    table=TABLE_NAME,
                )
        exists = execute_query(query)
        # Diagnostic only, so it does not belong at ERROR level.
        LOGGER.debug('EXISTS TABLE returned {result!r}'.format(result=exists))
        # The HTTP response body is newline-terminated ('1\n' / '0\n'), so
        # compare the stripped value; a raw comparison with '1' never matches.
        return exists.strip() == '1'
Loading