Skip to content
This repository has been archived by the owner on Apr 17, 2024. It is now read-only.

Commit

Permalink
Custom sql Change
Browse files Browse the repository at this point in the history
  • Loading branch information
vishreddy01 committed Feb 14, 2024
1 parent 3ac006d commit 1d37a3b
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions data_replication_parametrized_audit_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def get_active_tables(mstr_schema,app_name):
postgres_connection = PgresPool.getconn()
postgres_cursor = postgres_connection.cursor()
list_sql = f"""
SELECT application_name,source_schema_name,source_table_name,target_schema_name,target_table_name,truncate_flag,cdc_flag,full_inc_flag,cdc_column,replication_order,where_clause,customsql_ind,customsql_query
SELECT application_name,source_schema_name,source_table_name,target_schema_name,target_table_name,truncate_flag,cdc_flag,full_inc_flag,cdc_column,replication_order,customsql_ind,customsql_query
from {mstr_schema}.cdc_master_table_list c
where active_ind = 'Y' and application_name='{app_name}'
order by replication_order, source_table_name
Expand All @@ -121,14 +121,15 @@ def extract_from_oracle(table_name,source_schema,customsql_ind,customsql_query):
oracle_cursor.execute(sql_query)
rows = oracle_cursor.fetchall()
OrcPool.release(oracle_connection)
return rows
return rows
else:
sql_query = f'SELECT * FROM {source_schema}.{table_name}'
print(sql_query)
oracle_cursor.execute(sql_query)
rows = oracle_cursor.fetchall()
OrcPool.release(oracle_connection)
return rows
return rows

except Exception as e:
audit_batch_status_insert(table_name,'failed')
print(f"Error extracting data from Oracle: {str(e)}")
Expand Down Expand Up @@ -171,7 +172,7 @@ def load_into_postgres(table_name, data,target_schema):
def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind,customsql_query):
# Extract data from Oracle
print(f'Source: Thread {table_name} started at ' + datetime.now().strftime("%H:%M:%S"))
oracle_data = extract_from_oracle(table_name,customsql_ind,customsql_query) # Ensure table name is in uppercase
oracle_data = extract_from_oracle(table_name,source_schema,customsql_ind,customsql_query) # Ensure table name is in uppercase
print(f'Source: Extraction for {table_name} completed at ' + datetime.now().strftime("%H:%M:%S"))

if oracle_data:
Expand All @@ -184,8 +185,8 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind,
if __name__ == '__main__':
# Main ETL process
active_tables_rows =get_active_tables(mstr_schema,app_name)
print(active_tables_rows)
tables_to_extract = [(row[2],row[1],row[3],row[11],row[12]) for row in active_tables_rows]
#print(active_tables_rows)
tables_to_extract = [(row[2],row[1],row[3],row[10],row[11]) for row in active_tables_rows]

print(f"tables to extract are {tables_to_extract}")
print(f'No of concurrent tasks:{concurrent_tasks}')
Expand Down

0 comments on commit 1d37a3b

Please sign in to comment.