Merge pull request #20 from unicef/staging

Staging

vikashkum05 authored Aug 12, 2024
2 parents 0bc241f + 321b6ba commit 611a387
Showing 7 changed files with 464 additions and 69 deletions.
3 changes: 3 additions & 0 deletions proco/connection_statistics/api.py
@@ -481,6 +481,7 @@ def calculate_country_download_data(self, start_date, end_date, week_number, yea
realtime_registration_status__rt_registered=True,
realtime_registration_status__rt_registration_date__date__lte=end_date,
realtime_registration_status__deleted__isnull=True,
t__deleted__isnull=True,
).annotate(
dummy_group_by=Value(1)).values('dummy_group_by').annotate(
good=Count(Case(When(t__connectivity_speed__gt=speed_benchmark, then='id')), distinct=True),
@@ -570,8 +571,10 @@ def generate_country_graph_data(self, start_date, end_date):
avg_daily_connectivity_speed = self.queryset.filter(
realtime_registration_status__rt_registered=True,
realtime_registration_status__rt_registration_date__date__lte=end_date,
realtime_registration_status__deleted__isnull=True,
daily_status__date__range=[start_date, end_date],
daily_status__connectivity_speed__isnull=False,
daily_status__deleted__isnull=True,
).values('daily_status__date').annotate(
avg_speed=Avg('daily_status__connectivity_speed'),
).order_by('daily_status__date')
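For readers of this hunk: both additions follow the common Django soft-delete convention, where a related row counts as live only while its `deleted` timestamp is NULL, so soft-deleted registration-status and daily-status rows no longer skew the aggregates. A minimal sketch of the same pattern (relation names mirror the diff but the helper itself is illustrative, not an excerpt of the project code):

from django.db.models import Avg

def avg_daily_connectivity_speed(schools_qs, start_date, end_date):
    # Exclude soft-deleted registration-status and daily-status rows so they
    # do not contribute to the per-day average speed.
    return (
        schools_qs.filter(
            realtime_registration_status__rt_registered=True,
            realtime_registration_status__rt_registration_date__date__lte=end_date,
            realtime_registration_status__deleted__isnull=True,
            daily_status__date__range=[start_date, end_date],
            daily_status__connectivity_speed__isnull=False,
            daily_status__deleted__isnull=True,
        )
        .values('daily_status__date')
        .annotate(avg_speed=Avg('daily_status__connectivity_speed'))
        .order_by('daily_status__date')
    )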
31 changes: 31 additions & 0 deletions proco/core/management/commands/data_cleanup.py
@@ -475,6 +475,29 @@ def add_arguments(self, parser):
help='Pass the School ID in case want to control the update.'
)

parser.add_argument(
'--data_loss_recovery_for_pcdc_weekly_with_schedular', action='store_true',
dest='data_loss_recovery_for_pcdc_weekly_with_schedular', default=False,
help='If provided, run the data_loss_recovery_for_pcdc_weekly utility through Schedular in real time.'
)

parser.add_argument(
'-start_week_no', dest='start_week_no', type=int,
required=False,
help='Start week no from which we need to pull the data and then do aggregation.'
)

parser.add_argument(
'-end_week_no', dest='end_week_no', type=int,
required=False,
help='End week no from which we need to pull the data and then do aggregation.'
)

parser.add_argument(
'--pull_data', action='store_true', dest='pull_data', default=False,
help='Pull the PCDC live data from API for specified date.'
)

def handle(self, **options):
logger.info('Executing data cleanup utility.\n')
logger.debug('Options: {}\n\n'.format(options))
@@ -583,4 +606,12 @@ def handle(self, **options):
# redo_aggregations_task(country_year[0], country_year[1], None)
redo_aggregations_task.delay(country_year[0], country_year[1], week_no)

if options.get('data_loss_recovery_for_pcdc_weekly_with_schedular'):
start_week_no = options.get('start_week_no', None)
end_week_no = options.get('end_week_no', None)
year = options.get('year', None)
pull_data = options.get('pull_data', False)

sources_tasks.data_loss_recovery_for_pcdc_weekly_task.delay(start_week_no, end_week_no, year, pull_data)

logger.info('Completed data cleanup successfully.\n')
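For orientation, the new flags wire a recovery run into the scheduler path: when --data_loss_recovery_for_pcdc_weekly_with_schedular is passed, handle() forwards the week range, year, and pull flag to sources_tasks.data_loss_recovery_for_pcdc_weekly_task.delay(...). A hedged usage sketch (the week numbers are illustrative, and it assumes the command also defines a year option elsewhere, since handle() reads options.get('year')):

# Equivalent CLI form, built from the parser definitions above:
#   python manage.py data_cleanup --data_loss_recovery_for_pcdc_weekly_with_schedular \
#       -start_week_no 25 -end_week_no 30 --pull_data
from django.core.management import call_command

call_command(
    'data_cleanup',
    data_loss_recovery_for_pcdc_weekly_with_schedular=True,
    start_week_no=25,   # illustrative week range
    end_week_no=30,
    pull_data=True,     # also re-pull PCDC live data before aggregating
)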
@@ -31,22 +31,22 @@ def check_missing_dates_to_table(date_list):
timestamp__date__lte=date_list[-1],
).values_list('timestamp__date', flat=True).distinct('timestamp__date').order_by('timestamp__date')

logger.debug('Missing dates are between {0} - {1}: '.format(date_list[0], date_list[-1]))
logger.info('Missing dates are between {0} - {1}: '.format(date_list[0], date_list[-1]))
missing_dates = list(set(date_list) - set(list(pcdc_timestamp_qry)))

for missing_date in sorted(missing_dates):
# print missing date in string format
logger.debug(date_utilities.format_date(missing_date))
logger.info(date_utilities.format_date(missing_date))


def delete_dailycheckapp_realtime_data(date):
logger.debug('Deleting all the PCDC rows only, from "RealTimeConnectivity" data table for date: {0}'.format(date))
logger.info('Deleting all the PCDC rows only, from "RealTimeConnectivity" data table for date: {0}'.format(date))
RealTimeConnectivity.objects.filter(
created__date=date,
live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE,
).delete()

logger.debug('Deleting all the rows from "DailyCheckAppMeasurementData" data table for date: {0}'.format(date))
logger.info('Deleting all the rows from "DailyCheckAppMeasurementData" data table for date: {0}'.format(date))
DailyCheckAppMeasurementData.objects.filter(timestamp__date=date).delete()


@@ -123,16 +123,16 @@ def handle(self, **options):
if pull_data and pull_data_date:
pull_data_date = pull_data_date.date()

logger.debug('Deleting PCDC data for date: {}'.format(pull_data_date))
logger.info('Deleting PCDC data for date: {}'.format(pull_data_date))
delete_dailycheckapp_realtime_data(pull_data_date)
logger.debug('Data deleted successfully.\n\n')
logger.info('Data deleted successfully.\n\n')

logger.debug('Syncing the PCDC api data to proco PCDC table for date: {}'.format(pull_data_date))
logger.info('Syncing the PCDC api data to proco PCDC table for date: {}'.format(pull_data_date))
sync_dailycheckapp_realtime_data(pull_data_date)
logger.debug('Data synced successfully.\n\n')
logger.info('Data synced successfully.\n\n')

logger.debug('Aggregating the pulled data by giga_id_school + country_code and '
'storing in RealTimeConnectivity table.')
logger.info('Aggregating the pulled data by giga_id_school + country_code and '
'storing in RealTimeConnectivity table.')
dailycheckapp_measurements = DailyCheckAppMeasurementData.objects.filter(
timestamp__date=pull_data_date,
).filter(
@@ -158,16 +158,12 @@ def handle(self, **options):
'country_code', flat=True,
).order_by('country_code'))
for country_code in countries:
logger.debug('Current country code: {}'.format(country_code))
logger.info('Current country code: {}'.format(country_code))
if country_code:
country = Country.objects.filter(code=country_code).first()
else:
country = None

schools_qs = School.objects
if country:
schools_qs = schools_qs.filter(country=country)

dcm_giga_ids = set(dailycheckapp_measurements.filter(
country_code=country_code,
source__iexact='DailyCheckApp',
@@ -177,9 +173,9 @@

dcm_schools = {
school.giga_id_school: school
for school in schools_qs.filter(giga_id_school__in=dcm_giga_ids)
for school in School.objects.filter(giga_id_school__in=dcm_giga_ids)
}
logger.debug('Total schools in dailycheckapp: {0}, Successfully mapped schools: {1}'.format(
logger.info('Total schools in dailycheckapp: {0}, Successfully mapped schools: {1}'.format(
len(dcm_giga_ids), len(dcm_schools)))

mlab_school_ids = set(dailycheckapp_measurements.filter(
@@ -189,11 +185,15 @@
'school_id', flat=True,
).order_by('school_id'))

schools_qs = School.objects
if country:
schools_qs = schools_qs.filter(country=country)

mlab_schools = {
school.external_id: school
for school in schools_qs.filter(external_id__in=mlab_school_ids)
}
logger.debug('Total schools in MLab: {0}, Successfully mapped schools: {1}'.format(
logger.info('Total schools in MLab: {0}, Successfully mapped schools: {1}'.format(
len(mlab_school_ids), len(mlab_schools)))

unknown_schools = []
@@ -232,22 +232,22 @@ def handle(self, **options):
))

if len(realtime) == 5000:
logger.debug(
logger.info(
'Loading the data to "RealTimeConnectivity" table as it has reached 5000 benchmark.')
RealTimeConnectivity.objects.bulk_create(realtime)
realtime = []

if len(unknown_schools) > 0:
logger.debug('Skipped dailycheckapp measurement for country: "{0}" unknown school: {1}'.format(
logger.info('Skipped dailycheckapp measurement for country: "{0}" unknown school: {1}'.format(
country_code, unknown_schools))

logger.debug('Loading the remaining ({0}) data to "RealTimeConnectivity" table.'.format(len(realtime)))
logger.info('Loading the remaining ({0}) data to "RealTimeConnectivity" table.'.format(len(realtime)))
if len(realtime) > 0:
RealTimeConnectivity.objects.bulk_create(realtime)

logger.debug('Aggregated successfully to RealTimeConnectivity table.\n\n')
logger.info('Aggregated successfully to RealTimeConnectivity table.\n\n')

logger.debug('Starting finalizing the records to actual proco tables.')
logger.info('Starting finalizing the records to actual proco tables.')
countries_ids = RealTimeConnectivity.objects.filter(
created__date=pull_data_date,
live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE,
@@ -261,12 +261,12 @@
monday_week_no = date_utilities.get_week_from_date(monday_date)
monday_year = date_utilities.get_year_from_date(monday_date)

logger.debug('Weekly record details. \tWeek No: {0}\tYear: {1}'.format(monday_week_no, monday_year))
logger.info('Weekly record details. \tWeek No: {0}\tYear: {1}'.format(monday_week_no, monday_year))

for country_id in countries_ids:
logger.debug('Finalizing the records for Country ID: {0}'.format(country_id))
logger.info('Finalizing the records for Country ID: {0}'.format(country_id))
finalize_previous_day_data(None, country_id, pull_data_date)

logger.info('Finalized records successfully to actual proco tables.\n\n')

logger.info('Completed dataloss recovery for pcdc successfully.\n')
logger.info('Completed data loss recovery utility for pcdc successfully.\n')
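The loop above buffers RealTimeConnectivity rows and flushes them with bulk_create once 5,000 records accumulate, then writes whatever remains at the end. A standalone sketch of that batching pattern (hypothetical helper, not code from the repository):

BATCH_SIZE = 5000

def bulk_create_in_batches(model, rows):
    # Buffer rows and write them in fixed-size batches to keep memory use and
    # per-query size bounded; flush the remainder after the loop.
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == BATCH_SIZE:
            model.objects.bulk_create(batch)
            batch = []
    if batch:
        model.objects.bulk_create(batch)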