diff --git a/proco/connection_statistics/api.py b/proco/connection_statistics/api.py index 5fac616..5cfd173 100644 --- a/proco/connection_statistics/api.py +++ b/proco/connection_statistics/api.py @@ -481,6 +481,7 @@ def calculate_country_download_data(self, start_date, end_date, week_number, yea realtime_registration_status__rt_registered=True, realtime_registration_status__rt_registration_date__date__lte=end_date, realtime_registration_status__deleted__isnull=True, + t__deleted__isnull=True, ).annotate( dummy_group_by=Value(1)).values('dummy_group_by').annotate( good=Count(Case(When(t__connectivity_speed__gt=speed_benchmark, then='id')), distinct=True), @@ -570,8 +571,10 @@ def generate_country_graph_data(self, start_date, end_date): avg_daily_connectivity_speed = self.queryset.filter( realtime_registration_status__rt_registered=True, realtime_registration_status__rt_registration_date__date__lte=end_date, + realtime_registration_status__deleted__isnull=True, daily_status__date__range=[start_date, end_date], daily_status__connectivity_speed__isnull=False, + daily_status__deleted__isnull=True, ).values('daily_status__date').annotate( avg_speed=Avg('daily_status__connectivity_speed'), ).order_by('daily_status__date') diff --git a/proco/core/management/commands/data_cleanup.py b/proco/core/management/commands/data_cleanup.py index ff11998..4868408 100644 --- a/proco/core/management/commands/data_cleanup.py +++ b/proco/core/management/commands/data_cleanup.py @@ -475,6 +475,29 @@ def add_arguments(self, parser): help='Pass the School ID in case want to control the update.' ) + parser.add_argument( + '--data_loss_recovery_for_pcdc_weekly_with_schedular', action='store_true', + dest='data_loss_recovery_for_pcdc_weekly_with_schedular', default=False, + help='If provided, run the data_loss_recovery_for_pcdc_weekly utility through the scheduler in real time.' + ) + + parser.add_argument( + '-start_week_no', dest='start_week_no', type=int, + required=False, + help='Start week number from which to pull the data and run the aggregation.' + ) + + parser.add_argument( + '-end_week_no', dest='end_week_no', type=int, + required=False, + help='End week number up to which to pull the data and run the aggregation.' + ) + + parser.add_argument( + '--pull_data', action='store_true', dest='pull_data', default=False, + help='Pull the PCDC live data from the API for the specified dates.'
+ ) + def handle(self, **options): logger.info('Executing data cleanup utility.\n') logger.debug('Options: {}\n\n'.format(options)) @@ -583,4 +606,12 @@ def handle(self, **options): # redo_aggregations_task(country_year[0], country_year[1], None) redo_aggregations_task.delay(country_year[0], country_year[1], week_no) + if options.get('data_loss_recovery_for_pcdc_weekly_with_schedular'): + start_week_no = options.get('start_week_no', None) + end_week_no = options.get('end_week_no', None) + year = options.get('year', None) + pull_data = options.get('pull_data', False) + + sources_tasks.data_loss_recovery_for_pcdc_weekly_task.delay(start_week_no, end_week_no, year, pull_data) + logger.info('Completed data cleanup successfully.\n') diff --git a/proco/data_sources/management/commands/data_loss_recovery_for_pcdc.py b/proco/data_sources/management/commands/data_loss_recovery_for_pcdc.py index cae118b..733bea2 100644 --- a/proco/data_sources/management/commands/data_loss_recovery_for_pcdc.py +++ b/proco/data_sources/management/commands/data_loss_recovery_for_pcdc.py @@ -31,22 +31,22 @@ def check_missing_dates_to_table(date_list): timestamp__date__lte=date_list[-1], ).values_list('timestamp__date', flat=True).distinct('timestamp__date').order_by('timestamp__date') - logger.debug('Missing dates are between {0} - {1}: '.format(date_list[0], date_list[-1])) + logger.info('Missing dates are between {0} - {1}: '.format(date_list[0], date_list[-1])) missing_dates = list(set(date_list) - set(list(pcdc_timestamp_qry))) for missing_date in sorted(missing_dates): # print missing date in string format - logger.debug(date_utilities.format_date(missing_date)) + logger.info(date_utilities.format_date(missing_date)) def delete_dailycheckapp_realtime_data(date): - logger.debug('Deleting all the PCDC rows only, from "RealTimeConnectivity" data table for date: {0}'.format(date)) + logger.info('Deleting all the PCDC rows only, from "RealTimeConnectivity" data table for date: {0}'.format(date)) RealTimeConnectivity.objects.filter( created__date=date, live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE, ).delete() - logger.debug('Deleting all the rows from "DailyCheckAppMeasurementData" data table for date: {0}'.format(date)) + logger.info('Deleting all the rows from "DailyCheckAppMeasurementData" data table for date: {0}'.format(date)) DailyCheckAppMeasurementData.objects.filter(timestamp__date=date).delete() @@ -123,16 +123,16 @@ def handle(self, **options): if pull_data and pull_data_date: pull_data_date = pull_data_date.date() - logger.debug('Deleting PCDC data for date: {}'.format(pull_data_date)) + logger.info('Deleting PCDC data for date: {}'.format(pull_data_date)) delete_dailycheckapp_realtime_data(pull_data_date) - logger.debug('Data deleted successfully.\n\n') + logger.info('Data deleted successfully.\n\n') - logger.debug('Syncing the PCDC api data to proco PCDC table for date: {}'.format(pull_data_date)) + logger.info('Syncing the PCDC api data to proco PCDC table for date: {}'.format(pull_data_date)) sync_dailycheckapp_realtime_data(pull_data_date) - logger.debug('Data synced successfully.\n\n') + logger.info('Data synced successfully.\n\n') - logger.debug('Aggregating the pulled data by giga_id_school + country_code and ' - 'storing in RealTimeConnectivity table.') + logger.info('Aggregating the pulled data by giga_id_school + country_code and ' + 'storing in RealTimeConnectivity table.') dailycheckapp_measurements = DailyCheckAppMeasurementData.objects.filter( timestamp__date=pull_data_date, 
).filter( @@ -158,16 +158,12 @@ def handle(self, **options): 'country_code', flat=True, ).order_by('country_code')) for country_code in countries: - logger.debug('Current country code: {}'.format(country_code)) + logger.info('Current country code: {}'.format(country_code)) if country_code: country = Country.objects.filter(code=country_code).first() else: country = None - schools_qs = School.objects - if country: - schools_qs = schools_qs.filter(country=country) - dcm_giga_ids = set(dailycheckapp_measurements.filter( country_code=country_code, source__iexact='DailyCheckApp', @@ -177,9 +173,9 @@ def handle(self, **options): dcm_schools = { school.giga_id_school: school - for school in schools_qs.filter(giga_id_school__in=dcm_giga_ids) + for school in School.objects.filter(giga_id_school__in=dcm_giga_ids) } - logger.debug('Total schools in dailycheckapp: {0}, Successfully mapped schools: {1}'.format( + logger.info('Total schools in dailycheckapp: {0}, Successfully mapped schools: {1}'.format( len(dcm_giga_ids), len(dcm_schools))) mlab_school_ids = set(dailycheckapp_measurements.filter( @@ -189,11 +185,15 @@ def handle(self, **options): 'school_id', flat=True, ).order_by('school_id')) + schools_qs = School.objects + if country: + schools_qs = schools_qs.filter(country=country) + mlab_schools = { school.external_id: school for school in schools_qs.filter(external_id__in=mlab_school_ids) } - logger.debug('Total schools in MLab: {0}, Successfully mapped schools: {1}'.format( + logger.info('Total schools in MLab: {0}, Successfully mapped schools: {1}'.format( len(mlab_school_ids), len(mlab_schools))) unknown_schools = [] @@ -232,22 +232,22 @@ def handle(self, **options): )) if len(realtime) == 5000: - logger.debug( + logger.info( 'Loading the data to "RealTimeConnectivity" table as it has reached 5000 benchmark.') RealTimeConnectivity.objects.bulk_create(realtime) realtime = [] if len(unknown_schools) > 0: - logger.debug('Skipped dailycheckapp measurement for country: "{0}" unknown school: {1}'.format( + logger.info('Skipped dailycheckapp measurement for country: "{0}" unknown school: {1}'.format( country_code, unknown_schools)) - logger.debug('Loading the remaining ({0}) data to "RealTimeConnectivity" table.'.format(len(realtime))) + logger.info('Loading the remaining ({0}) data to "RealTimeConnectivity" table.'.format(len(realtime))) if len(realtime) > 0: RealTimeConnectivity.objects.bulk_create(realtime) - logger.debug('Aggregated successfully to RealTimeConnectivity table.\n\n') + logger.info('Aggregated successfully to RealTimeConnectivity table.\n\n') - logger.debug('Starting finalizing the records to actual proco tables.') + logger.info('Starting finalizing the records to actual proco tables.') countries_ids = RealTimeConnectivity.objects.filter( created__date=pull_data_date, live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE, @@ -261,12 +261,12 @@ def handle(self, **options): monday_week_no = date_utilities.get_week_from_date(monday_date) monday_year = date_utilities.get_year_from_date(monday_date) - logger.debug('Weekly record details. \tWeek No: {0}\tYear: {1}'.format(monday_week_no, monday_year)) + logger.info('Weekly record details. 
\tWeek No: {0}\tYear: {1}'.format(monday_week_no, monday_year)) for country_id in countries_ids: - logger.debug('Finalizing the records for Country ID: {0}'.format(country_id)) + logger.info('Finalizing the records for Country ID: {0}'.format(country_id)) finalize_previous_day_data(None, country_id, pull_data_date) logger.info('Finalized records successfully to actual proco tables.\n\n') - logger.info('Completed dataloss recovery for pcdc successfully.\n') + logger.info('Completed data loss recovery utility for pcdc successfully.\n') diff --git a/proco/data_sources/management/commands/data_loss_recovery_for_pcdc_weekly.py b/proco/data_sources/management/commands/data_loss_recovery_for_pcdc_weekly.py new file mode 100644 index 0000000..f9b70aa --- /dev/null +++ b/proco/data_sources/management/commands/data_loss_recovery_for_pcdc_weekly.py @@ -0,0 +1,324 @@ +import logging +from datetime import timedelta + +from django.conf import settings +from django.core.management import call_command +from django.core.management.base import BaseCommand +from django.db.models import Avg +from django.db.models import F +from django.db.models import Q + +from proco.connection_statistics import models as statistics_models +from proco.connection_statistics.config import app_config as statistics_configs +from proco.core.utils import get_current_datetime_object +from proco.data_sources.models import DailyCheckAppMeasurementData +from proco.data_sources.tasks import finalize_previous_day_data +from proco.data_sources.utils import load_daily_check_app_data_source_response_to_model +from proco.locations.models import Country +from proco.schools.models import School +from proco.utils import dates as date_utilities + +logger = logging.getLogger('gigamaps.' + __name__) + +ds_settings = settings.DATA_SOURCE_CONFIG + + +def delete_dailycheckapp_realtime_data(start_date, end_date, week_no, year): + logger.info('Deleting all the PCDC rows only, from "RealTimeConnectivity" data table for dates: {0} - {1}'.format( + start_date, end_date)) + statistics_models.RealTimeConnectivity.objects.filter( + created__date__gte=start_date, + created__date__lte=end_date, + live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE, + ).delete() + + logger.info('Deleting all the rows from "DailyCheckAppMeasurementData" data table for dates: {0} - {1}'.format( + start_date, end_date)) + DailyCheckAppMeasurementData.objects.filter( + timestamp__date__gte=start_date, + timestamp__date__lte=end_date, + ).delete() + + logger.info('Updating PCDC live fields from "SchoolWeeklyStatus" data table for year - week: {0} - {1}'.format( + year, week_no)) + statistics_models.SchoolWeeklyStatus.objects.filter( + week=week_no, + year=year, + school_id__in=list(statistics_models.SchoolDailyStatus.objects.filter( + date__gte=start_date, + date__lte=end_date, + live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE, + ).values_list('school', flat=True).order_by('school_id').distinct('school_id')) + ).update( + connectivity_speed=None, + connectivity_upload_speed=None, + connectivity_latency=None, + connectivity=False + ) + + logger.info('Deleting all the rows from "SchoolDailyStatus" data table for dates: {0} - {1}'.format( + start_date, end_date)) + statistics_models.SchoolDailyStatus.objects.filter( + date__gte=start_date, + date__lte=end_date, + live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE, + ).update( + deleted=get_current_datetime_object() + ) + + impacted_country_ids =
(statistics_models.CountryDailyStatus.objects.filter( + date__gte=start_date, + date__lte=end_date, + live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE, + ).values_list('country', flat=True).order_by('country_id').distinct('country_id')) + + logger.info( + 'Updating PCDC live fields from "CountryWeeklyStatus" data table for year - week: {0} - {1}'.format( + year, week_no)) + statistics_models.CountryWeeklyStatus.objects.filter( + week=week_no, + year=year, + country_id__in=impacted_country_ids, + ).update( + connectivity_availability='no_connectivity', + schools_connectivity_good=0, + schools_connectivity_moderate=0, + schools_connectivity_no=0, + schools_connected=0, + schools_with_data_percentage=0, + connectivity_speed=None, + connectivity_latency=None, + connectivity_upload_speed=None, + ) + + logger.info('Deleting all the rows from "CountryDailyStatus" data table for dates: {0} - {1}'.format( + start_date, end_date)) + statistics_models.CountryDailyStatus.objects.filter( + date__gte=start_date, + date__lte=end_date, + live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE, + ).update( + deleted=get_current_datetime_object() + ) + + return impacted_country_ids + + +def sync_dailycheckapp_realtime_data(date): + request_configs = { + 'url': '{0}/measurements/v2'.format(ds_settings.get('DAILY_CHECK_APP').get('BASE_URL')), + 'method': 'GET', + 'data_limit': 1000, + 'query_params': { + 'page': '{page_no}', + 'size': '{page_size}', + 'orderBy': 'timestamp', + 'filterBy': 'timestamp', + 'filterCondition': 'eq', + 'filterValue': '{0}'.format(date), + }, + 'auth_token_required': True, + 'headers': { + 'Content-Type': 'application/json' + } + } + load_daily_check_app_data_source_response_to_model(DailyCheckAppMeasurementData, request_configs) + + +class Command(BaseCommand): + def add_arguments(self, parser): + + parser.add_argument( + '-start_week_no', dest='start_week_no', type=int, + required=True, + help='Start week number from which to pull the data and run the aggregation.' + ) + + parser.add_argument( + '-end_week_no', dest='end_week_no', type=int, + required=True, + help='End week number up to which to pull the data and run the aggregation.' + ) + + parser.add_argument( + '-year', dest='year', type=int, + required=True, + help='Year for which weeks are provided.' + ) + + parser.add_argument( + '--pull_data', action='store_true', dest='pull_data', default=False, + help='Pull the PCDC live data from the API for the specified dates.'
+ ) + + def handle(self, **options): + logger.info('Executing data loss recovery for pcdc.') + start_week_no = options.get('start_week_no') + end_week_no = options.get('end_week_no') + year = options.get('year') + pull_data = options.get('pull_data') + + impacted_country_ids = [] + + if start_week_no > end_week_no: + logger.error('Start week number cannot be greater than the end week number.') + exit(0) + + for week_no in range(start_week_no, end_week_no + 1): + start_date = date_utilities.get_first_date_of_week(year, week_no) + end_date = start_date + timedelta(days=6) + + if pull_data: + country_ids = delete_dailycheckapp_realtime_data(start_date, end_date, week_no, year) + impacted_country_ids.extend(country_ids) + + date_list = sorted( + [(start_date + timedelta(days=x)) for x in range((end_date - start_date).days)] + [end_date]) + logger.info('date_list: {}'.format(date_list)) + + for pull_data_date in date_list: + logger.info('Syncing the PCDC api data to proco PCDC table for date: {}'.format(pull_data_date)) + sync_dailycheckapp_realtime_data(pull_data_date) + logger.info('Data synced successfully.\n\n') + + logger.info('Aggregating the pulled data by giga_id_school + country_code and storing in ' + 'RealTimeConnectivity table.') + dailycheckapp_measurements = DailyCheckAppMeasurementData.objects.filter( + timestamp__date=pull_data_date, + ).filter( + (Q(download__isnull=True) | Q(download__gte=0)) & + (Q(upload__isnull=True) | Q(upload__gte=0)) & + (Q(latency__isnull=True) | Q(latency__gte=0)), + ).values( + 'giga_id_school', 'country_code', 'school_id', 'source', + ).annotate( + download_avg=Avg('download'), + latency_avg=Avg('latency'), + upload_avg=Avg('upload'), + ).order_by('country_code', 'giga_id_school', 'school_id', 'source') + + if not dailycheckapp_measurements.exists(): + logger.error('No records to aggregate on provided date: "{0}". 
' + 'Hence stopping the execution here.'.format(pull_data_date)) + return + + realtime = [] + + countries = set(dailycheckapp_measurements.values_list( + 'country_code', flat=True, + ).order_by('country_code')) + for country_code in countries: + logger.info('Current country code: {}'.format(country_code)) + if country_code: + country = Country.objects.filter(code=country_code).first() + else: + country = None + + dcm_giga_ids = set(dailycheckapp_measurements.filter( + country_code=country_code, + source__iexact='DailyCheckApp', + ).values_list( + 'giga_id_school', flat=True, + ).order_by('giga_id_school')) + + dcm_schools = { + school.giga_id_school: school + for school in School.objects.filter(giga_id_school__in=dcm_giga_ids) + } + logger.info('Total schools in dailycheckapp: {0}, Successfully mapped schools: {1}'.format( + len(dcm_giga_ids), len(dcm_schools))) + + mlab_school_ids = set(dailycheckapp_measurements.filter( + country_code=country_code, + source__iexact='MLab', + ).values_list( + 'school_id', flat=True, + ).order_by('school_id')) + + schools_qs = School.objects + if country: + schools_qs = schools_qs.filter(country=country) + + mlab_schools = { + school.external_id: school + for school in schools_qs.filter(external_id__in=mlab_school_ids) + } + logger.info('Total schools in MLab: {0}, Successfully mapped schools: {1}'.format( + len(mlab_school_ids), len(mlab_schools))) + + unknown_schools = [] + + for dailycheckapp_measurement in dailycheckapp_measurements.filter(country_code=country_code): + if str(dailycheckapp_measurement['source']).lower() == 'dailycheckapp': + giga_id_school = dailycheckapp_measurement.get('giga_id_school') + if giga_id_school not in dcm_schools: + unknown_schools.append(giga_id_school) + continue + school = dcm_schools[giga_id_school] + else: + school_id = dailycheckapp_measurement.get('school_id') + if school_id not in mlab_schools: + unknown_schools.append(school_id) + continue + school = mlab_schools[school_id] + + connectivity_speed = dailycheckapp_measurement.get('download_avg') + if connectivity_speed: + # kb/s -> b/s + connectivity_speed = connectivity_speed * 1000 + + connectivity_upload_speed = dailycheckapp_measurement.get('upload_avg') + if connectivity_upload_speed: + # kb/s -> b/s + connectivity_upload_speed = connectivity_upload_speed * 1000 + + realtime.append(statistics_models.RealTimeConnectivity( + created=pull_data_date, + connectivity_speed=connectivity_speed, + connectivity_upload_speed=connectivity_upload_speed, + connectivity_latency=dailycheckapp_measurement.get('latency_avg'), + school=school, + live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE, + )) + + if len(realtime) == 5000: + logger.info('Loading the data to "RealTimeConnectivity" table as it ' + 'has reached 5000 benchmark.') + statistics_models.RealTimeConnectivity.objects.bulk_create(realtime) + realtime = [] + + if len(unknown_schools) > 0: + logger.info('Skipped dailycheckapp measurement for country: "{0}" unknown school:' + ' {1}'.format(country_code, unknown_schools)) + + logger.info( + 'Loading the remaining ({0}) data to "RealTimeConnectivity" table.'.format(len(realtime))) + if len(realtime) > 0: + statistics_models.RealTimeConnectivity.objects.bulk_create(realtime) + + logger.info('Aggregated successfully to RealTimeConnectivity table.\n\n') + + logger.info('Starting finalizing the records to actual proco tables.') + countries_ids = statistics_models.RealTimeConnectivity.objects.filter( + created__date__gte=start_date, + 
created__date__lte=end_date, + live_data_source=statistics_configs.DAILY_CHECK_APP_MLAB_SOURCE, + school__deleted__isnull=True, + ).annotate( + country_id=F('school__country_id'), + ).order_by('country_id').values_list('country_id', flat=True).distinct('country_id') + + logger.info('Weekly record details. \tWeek No: {0}\tYear: {1}'.format(week_no, year)) + + for country_id in countries_ids: + logger.info('Finalizing the records for Country ID: {0}'.format(country_id)) + finalize_previous_day_data(None, country_id, end_date) + + impacted_country_ids.extend(countries_ids) + logger.info('Finalized records successfully to actual proco tables.\n\n') + + for impacted_country_id in set(impacted_country_ids): + cmd_args = ['--reset', f'-country_id={impacted_country_id}'] + call_command('populate_school_registration_data', *cmd_args) + + logger.info('Completed data loss recovery utility for pcdc successfully.\n') diff --git a/proco/data_sources/management/commands/data_loss_recovery_for_qos.py b/proco/data_sources/management/commands/data_loss_recovery_for_qos.py index 5ac8c31..cb95c70 100644 --- a/proco/data_sources/management/commands/data_loss_recovery_for_qos.py +++ b/proco/data_sources/management/commands/data_loss_recovery_for_qos.py @@ -65,7 +65,7 @@ def load_qos_data_source_response_to_model(version_number, country): if country.iso3_format != table_name: continue - logger.debug('#' * 10) + logger.info('#' * 10) try: if QoSData.objects.all().filter( country=country, @@ -89,7 +89,7 @@ def load_qos_data_source_response_to_model(version_number, country): ) api_current_version = delta_sharing.get_table_version(table_url) - logger.debug('Current version from api: {0}'.format(api_current_version)) + logger.info('Current version from api: {0}'.format(api_current_version)) if version_number > api_current_version: logger.error('Given version must not be higher then latest api version. ' @@ -103,7 +103,7 @@ def load_qos_data_source_response_to_model(version_number, country): None, None, ) - logger.debug('Total count of rows in the {0} version data: {1}'.format( + logger.info('Total count of rows in the {0} version data: {1}'.format( version_number, len(loaded_data_df))) loaded_data_df = loaded_data_df[loaded_data_df[DeltaSharingReader._change_type_col_name()].isin( @@ -121,8 +121,8 @@ def load_qos_data_source_response_to_model(version_number, country): 'modified', 'school_id', 'country_id', 'modified_by', ] - logger.debug('All QoS api response columns: {}'.format(df_columns)) - logger.debug('All QoS api response columns to delete: {}'.format( + logger.info('All QoS api response columns: {}'.format(df_columns)) + logger.info('All QoS api response columns to delete: {}'.format( list(set(df_columns) - set(qos_model_fields)))) loaded_data_df.drop(columns=cols_to_delete, inplace=True, errors='ignore', ) @@ -154,10 +154,10 @@ def load_qos_data_source_response_to_model(version_number, country): logger.error('Higher version for same school ID and timestamp already exists. 
' 'Hence skipping the update for current row.') qos_instance = duplicate_higher_version_records.first() - logger.debug('School ID: {0},\tTimestamp: {1},\tCurrent Version: {2},\t' - 'Higher Version: {3}'.format(qos_instance.school_id, - qos_instance.timestamp, - version_number, qos_instance.version)) + logger.info('School ID: {0},\tTimestamp: {1},\tCurrent Version: {2},\t' + 'Higher Version: {3}'.format(qos_instance.school_id, + qos_instance.timestamp, + version_number, qos_instance.version)) continue insert_entries.append(row_as_dict) @@ -166,13 +166,13 @@ def load_qos_data_source_response_to_model(version_number, country): logger.info('Loading the data to "QoSData" table as it has reached 5000 benchmark.') bulk_create_or_update(insert_entries, QoSData, ['school', 'timestamp']) insert_entries = [] - logger.debug('#\n' * 10) + logger.info('#\n' * 10) - logger.debug('Loading the remaining ({0}) data to "QoSData" table.'.format(len(insert_entries))) + logger.info('Loading the remaining ({0}) data to "QoSData" table.'.format(len(insert_entries))) if len(insert_entries) > 0: bulk_create_or_update(insert_entries, QoSData, ['school', 'timestamp']) else: - logger.debug('No data to update in current table: {0}.'.format(table_name)) + logger.info('No data to update in current table: {0}.'.format(table_name)) except Exception as ex: logger.error('Exception caught for "{0}": {1}'.format(schema_table.name, str(ex))) else: @@ -203,10 +203,10 @@ def sync_qos_realtime_data(date, country): ).order_by('school') if not qos_measurements.exists(): - logger.debug('No records to aggregate on provided date: "{0}". Hence skipping for the given date.'.format(date)) + logger.info('No records to aggregate on provided date: "{0}". Hence skipping for the given date.'.format(date)) return - logger.debug('Migrating the records from "QoSData" to "RealTimeConnectivity" with date: {0} '.format(date)) + logger.info('Migrating the records from "QoSData" to "RealTimeConnectivity" with date: {0} '.format(date)) realtime = [] @@ -248,11 +248,11 @@ def sync_qos_realtime_data(date, country): )) if len(realtime) == 5000: - logger.debug('Loading the data to "RealTimeConnectivity" table as it has reached 5000 benchmark.') + logger.info('Loading the data to "RealTimeConnectivity" table as it has reached 5000 benchmark.') RealTimeConnectivity.objects.bulk_create(realtime) realtime = [] - logger.debug('Loading the remaining ({0}) data to "RealTimeConnectivity" table.'.format(len(realtime))) + logger.info('Loading the remaining ({0}) data to "RealTimeConnectivity" table.'.format(len(realtime))) if len(realtime) > 0: RealTimeConnectivity.objects.bulk_create(realtime) @@ -269,7 +269,7 @@ def get_latest_api_version(country_code=None): if qos_schema: schema_tables = client.list_tables(qos_schema) - logger.debug('\nAll tables ready to access: {0}'.format(schema_tables)) + logger.info('\nAll tables ready to access: {0}'.format(schema_tables)) for schema_table in schema_tables: table_name = schema_table.name @@ -288,7 +288,7 @@ def get_latest_api_version(country_code=None): ) table_current_version = delta_sharing.get_table_version(table_url) - logger.debug( + logger.info( 'Country "{0}" current version from API: {1}\n'.format(table_name, table_current_version)) version_for_countries[table_name] = table_current_version @@ -325,10 +325,10 @@ def check_missing_versions_from_table(country_code=None): missing_version_list = list(set(must_version_list) - set(versions_list)) - logger.debug('Missing versions details for country "{0}" are: \n\tStart 
version from DB: {1}' - '\n\tEnd version from API: {2}' - '\n\tMissing versions: {3}\n'.format(country_iso_code, start_version, end_version, - missing_version_list)) + logger.info('Missing versions details for country "{0}" are: \n\tStart version from DB: {1}' + '\n\tEnd version from API: {2}' + '\n\tMissing versions: {3}\n'.format(country_iso_code, start_version, end_version, + missing_version_list)) class Command(BaseCommand): @@ -388,7 +388,7 @@ def handle(self, **options): country = None if country_iso3_format: country = Country.objects.filter(iso3_format=country_iso3_format).first() - logger.debug('Country object: {0}'.format(country)) + logger.info('Country object: {0}'.format(country)) if not country: logger.error('Country with ISO3 format ({0}) not found in proco db. ' @@ -398,7 +398,7 @@ def handle(self, **options): if check_missing_versions: logger.info('\nChecking the missing versions.') check_missing_versions_from_table(country_code=country_iso3_format) - logger.debug('Checking the missing versions action completed successfully.\n') + logger.info('Checking the missing versions action completed successfully.\n') pull_data = options.get('pull_data') if pull_data: @@ -411,7 +411,7 @@ def handle(self, **options): pull_end_version = options.get('pull_end_version') if pull_start_version and pull_end_version and pull_start_version <= pull_end_version: - logger.debug('\nLoading the api data to "data_sources_qosdata" table ***\n') + logger.info('\nLoading the api data to "data_sources_qosdata" table ***\n') for version_number in range(pull_start_version, pull_end_version + 1): load_qos_data_source_response_to_model(version_number, country) logger.info('\nData load completed successfully.\n') @@ -445,24 +445,24 @@ def handle(self, **options): date_list_from_versions = qos_queryset.order_by('timestamp__date').values_list( 'timestamp__date', flat=True).distinct('timestamp__date') - logger.debug('Date list from versions: {0}'.format(date_list_from_versions)) + logger.info('Date list from versions: {0}'.format(date_list_from_versions)) for pull_data_date in date_list_from_versions: - logger.debug( + logger.info( '\nSyncing the "data_sources_qosdata" data to "connection_statistics_realtimeconnectivity" ' 'for date: {0}'.format(pull_data_date)) sync_qos_realtime_data(pull_data_date, country) - logger.debug('Data synced successfully.\n\n') + logger.info('Data synced successfully.\n\n') - logger.debug('Starting finalizing the records to actual proco tables.') + logger.info('Starting finalizing the records to actual proco tables.') monday_date = pull_data_date - timedelta(days=pull_data_date.weekday()) monday_week_no = date_utilities.get_week_from_date(monday_date) monday_year = date_utilities.get_year_from_date(monday_date) - logger.debug('Weekly record details. \tWeek No: {0}\tYear: {1}'.format(monday_week_no, monday_year)) + logger.info('Weekly record details. 
\tWeek No: {0}\tYear: {1}'.format(monday_week_no, monday_year)) - logger.debug('\n\nFinalizing the records for country ID: {0}'.format(country.id)) + logger.info('\n\nFinalizing the records for country ID: {0}'.format(country.id)) finalize_previous_day_data(None, country.id, pull_data_date) - logger.debug('Finalized records successfully to actual proco tables.\n\n') + logger.info('Finalized records successfully to actual proco tables.\n\n') else: logger.error('Please pass required parameters as:' ' -pull_start_version= -pull_end_version=') diff --git a/proco/data_sources/tasks.py b/proco/data_sources/tasks.py index df89c97..eee749e 100644 --- a/proco/data_sources/tasks.py +++ b/proco/data_sources/tasks.py @@ -7,6 +7,7 @@ from celery import chain, chord, group, current_task from django.conf import settings from django.contrib.gis.geos import Point +from django.core.management import call_command from django.db.models import Count from django.db.utils import DataError from requests.exceptions import HTTPError @@ -819,3 +820,41 @@ def clean_old_live_data(): background_task_utilities.task_on_complete(task_instance) else: logger.error('Found running Job with "{0}" name so skipping current iteration'.format(task_key)) + + +@app.task(soft_time_limit=10 * 60 * 60, time_limit=10 * 60 * 60) +def data_loss_recovery_for_pcdc_weekly_task(start_week_no, end_week_no, year, pull_data, *args): + """ + data_loss_recovery_for_pcdc_weekly_task + Task to schedule manually from Console. + """ + if not start_week_no or not end_week_no or not year: + logger.error('Required args not provided: [start_week_no, end_week_no, year]') + return + + logger.info('Starting data loss recovery for pcdc task: start_week_no "{0}" - end_week_no "{1}" - ' + 'year "{2}"'.format(start_week_no, end_week_no, year)) + + task_key = 'data_loss_recovery_for_pcdc_weekly_task_start_week_no_{0}_end_week_no_{1}_year_{2}_on_{3}'.format( + start_week_no, end_week_no, year, format_date(core_utilities.get_current_datetime_object(), frmt='%d%m%Y_%H')) + + task_id = current_task.request.id or str(uuid.uuid4()) + task_instance = background_task_utilities.task_on_start( + task_id, task_key, 'Recover the data for PCDC live source') + + if task_instance: + logger.debug('Not found running job: {}'.format(task_key)) + cmd_args = [ + '-start_week_no={}'.format(start_week_no), + '-end_week_no={}'.format(end_week_no), + '-year={}'.format(year), + ] + + if pull_data: + cmd_args.append('--pull_data') + + call_command('data_loss_recovery_for_pcdc_weekly', *cmd_args) + + background_task_utilities.task_on_complete(task_instance) + else: + logger.error('Found running Job with "{0}" name so skipping current iteration'.format(task_key)) diff --git a/proco/data_sources/utils.py b/proco/data_sources/utils.py index 99b6065..d0de63a 100644 --- a/proco/data_sources/utils.py +++ b/proco/data_sources/utils.py @@ -404,6 +404,8 @@ def load_daily_check_app_data_source_response_to_model(model, request_configs): has_more_data = False else: for data in response_data: + if not data.get('created_at', None): + data['created_at'] = data.get('timestamp') insert_entries.append(model(**data)) if len(insert_entries) >= 5000: @@ -467,20 +469,14 @@ def sync_dailycheckapp_realtime_data(): else: country = None - schools_qs = School.objects - if country: - schools_qs = schools_qs.filter(country=country) - dcm_giga_ids = set(dailycheckapp_measurements.filter( country_code=country_code, source__iexact='DailyCheckApp', - ).values_list( - 'giga_id_school', flat=True, -
).order_by('giga_id_school')) + ).values_list('giga_id_school', flat=True).order_by('giga_id_school')) dcm_schools = { school.giga_id_school: school - for school in schools_qs.filter(giga_id_school__in=dcm_giga_ids) + for school in School.objects.filter(giga_id_school__in=dcm_giga_ids) } logger.debug('Total schools in DailyCheckApp: {0}, Successfully mapped schools: {1}'.format( len(dcm_giga_ids), len(dcm_schools))) @@ -488,9 +484,11 @@ def sync_dailycheckapp_realtime_data(): mlab_school_ids = set(dailycheckapp_measurements.filter( country_code=country_code, source__iexact='MLab', - ).values_list( - 'school_id', flat=True, - ).order_by('school_id')) + ).values_list('school_id', flat=True).order_by('school_id')) + + schools_qs = School.objects + if country: + schools_qs = schools_qs.filter(country=country) mlab_schools = { school.external_id: school
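
Usage sketch (not part of the diff): a minimal way to exercise the recovery flow added above, assuming the project's standard manage.py entry point and a running Celery worker; the week numbers and year are illustrative only.

# Asynchronously, via the new Celery task added in proco/data_sources/tasks.py
# (signature: start_week_no, end_week_no, year, pull_data):
from proco.data_sources import tasks as sources_tasks
sources_tasks.data_loss_recovery_for_pcdc_weekly_task.delay(10, 12, 2024, True)  # recover weeks 10-12 of 2024 and re-pull PCDC data

# Synchronously, via the new management command (the same arguments the task forwards through call_command):
# python manage.py data_loss_recovery_for_pcdc_weekly -start_week_no=10 -end_week_no=12 -year=2024 --pull_data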