From b6db4097ec2d468bf4277b9100a4969399627d53 Mon Sep 17 00:00:00 2001
From: Kristina Zolochevskaia <zolockri@fit.cvut.cz>
Date: Sun, 5 Mar 2023 19:49:28 +0100
Subject: [PATCH] refactor, load_deleted - needs to be tested

---
 airflow/dags/grades/grades_increment.py       |  71 +----
 airflow/dags/grades/grades_source_to_stage.py |   2 +-
 airflow/dags/grades/grades_stage_to_target.py | 147 +++++----
 airflow/dags/grades/testing/concat_np.py      |   8 +
 airflow/dags/grades/{ => testing}/cursor.py   |   0
 .../dags/grades/{ => testing}/grades_to_ps.py |   0
 .../{ => testing}/load_t_klas_klasifikace.py  |   0
 .../stage_to_target_grades_klasifikace (1).py | 281 ++++++++++++++++++
 .../stage_to_target_grades_klasifikace.py     |   0
 .../dags/grades/{ => testing}/test_stage.py   |   0
 airflow/{ => dags}/helpers/__init__.py        |   0
 airflow/{ => dags}/helpers/connections.py     |   0
 .../email.py => dags/helpers/dwh_email.py}    |   0
 airflow/dags/helpers/increment.py             |  56 ++++
 airflow/dags/helpers/increment_no_psc.py      |  62 ++++
 airflow/{ => dags}/helpers/stage_functions.py |   0
 .../helpers/stage_functions_no_psc.py         |   4 +-
 .../{ => dags}/helpers/stringiteratorio.py    |   0
 airflow/dags/helpers/target_functions.py      | 132 ++++++++
 airflow/helpers/increment.py                  |   0
 airflow/helpers/target_functions.py           |  75 +++--
 airflow/logs/scheduler/latest                 |   2 +-
 22 files changed, 704 insertions(+), 136 deletions(-)
 create mode 100644 airflow/dags/grades/testing/concat_np.py
 rename airflow/dags/grades/{ => testing}/cursor.py (100%)
 rename airflow/dags/grades/{ => testing}/grades_to_ps.py (100%)
 rename airflow/dags/grades/{ => testing}/load_t_klas_klasifikace.py (100%)
 create mode 100644 airflow/dags/grades/testing/stage_to_target_grades_klasifikace (1).py
 rename airflow/dags/grades/{ => testing}/stage_to_target_grades_klasifikace.py (100%)
 rename airflow/dags/grades/{ => testing}/test_stage.py (100%)
 rename airflow/{ => dags}/helpers/__init__.py (100%)
 rename airflow/{ => dags}/helpers/connections.py (100%)
 rename airflow/{helpers/email.py => dags/helpers/dwh_email.py} (100%)
 create mode 100644 airflow/dags/helpers/increment.py
 create mode 100644 airflow/dags/helpers/increment_no_psc.py
 rename airflow/{ => dags}/helpers/stage_functions.py (100%)
 rename airflow/{ => dags}/helpers/stage_functions_no_psc.py (97%)
 rename airflow/{ => dags}/helpers/stringiteratorio.py (100%)
 create mode 100644 airflow/dags/helpers/target_functions.py
 delete mode 100644 airflow/helpers/increment.py

diff --git a/airflow/dags/grades/grades_increment.py b/airflow/dags/grades/grades_increment.py
index 726ed8c..5724711 100644
--- a/airflow/dags/grades/grades_increment.py
+++ b/airflow/dags/grades/grades_increment.py
@@ -1,56 +1,15 @@
-from airflow.providers.postgres.hooks.postgres import PostgresHook
-import pandas as pd
-
-def create_connection_to_stage():
-    pg_hook = PostgresHook(
-        postgres_conn_id='dv5',
-        database='dwh3_test_stage'
-    )
-    pg_engine = pg_hook.get_sqlalchemy_engine()
-    return pg_engine.connect()
-#===============================================================================
-
-def make_increment(**kwargs):
-    inc_clear_state_flag('si_' + kwargs['source_name'], kwargs['table_name'])
-    get_M_N_D_flags('ps_' + kwargs['source_name'], kwargs['table_name'])
-#===============================================================================
-
-def inc_clear_state_flag(schema_name: str, table_name: str):
-    with create_connection_to_stage() as conn:
-        trans = conn.begin()
-        try:
-            clear_state_flag_stmt = f"SELECT public.inc_clear_state_flag_schema_table_args('{schema_name}', '{table_name}');"
-            conn.execute(clear_state_flag_stmt)
-            print(f"State flags for table {schema_name}.{table_name} cleared")
-            trans.commit()
-        except:
-            trans.rollback()
-            print(f"Error: flags for {schema_name}.{table_name} were not cleared")
-            raise
-        finally:
-            conn.close()
-#===============================================================================
-
-def get_M_N_D_flags(schema_name: str, table_name: str):
-    with create_connection_to_stage() as conn:
-        trans = conn.begin()
-        try:
-            get_M_flags_stmt = f"SELECT public.inc_find_modified_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');"
-            conn.execute(get_M_flags_stmt)
-            print(f"State flags M for table {schema_name}.{table_name} got")
-
-            get_N_flags_stmt = f"SELECT public.inc_find_new_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');"
-            conn.execute(get_N_flags_stmt)
-            print(f"State flags N for table {schema_name}.{table_name} got")
-
-            get_D_flags_stmt = f"SELECT public.inc_find_deleted_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');"
-            conn.execute(get_D_flags_stmt)
-            print(f"State flags D for table {schema_name}.{table_name} got")
-
-            trans.commit()
-        except:
-            trans.rollback()
-            print(f"Error: flags for {schema_name}.{table_name} were not generated")
-            raise
-        finally:
-            conn.close()
+import sys
+sys.path.append('../helpers')
+from increment_no_psc import *
+
+source_name='grades'
+table_name='boolean_student_classification_airflow_orig'
+conn_id='dv5'
+db='dwh3_test_stage'
+
+make_increment(
+    source_name=source_name,
+    table_name=table_name,
+    conn_id=conn_id,
+    db=db
+)
diff --git a/airflow/dags/grades/grades_source_to_stage.py b/airflow/dags/grades/grades_source_to_stage.py
index 00db506..86d6375 100644
--- a/airflow/dags/grades/grades_source_to_stage.py
+++ b/airflow/dags/grades/grades_source_to_stage.py
@@ -5,7 +5,7 @@ import sys
 import os
 import time
 
-sys.path.append('../../helpers')
+sys.path.append('../helpers')
 from stage_functions_no_psc import load_table
 
 from airflow.providers.postgres.hooks.postgres import PostgresHook
diff --git a/airflow/dags/grades/grades_stage_to_target.py b/airflow/dags/grades/grades_stage_to_target.py
index 953c4a3..ab0e60f 100644
--- a/airflow/dags/grades/grades_stage_to_target.py
+++ b/airflow/dags/grades/grades_stage_to_target.py
@@ -1,9 +1,22 @@
 import pandas as pd
 from sqlalchemy import text
 from sqlalchemy.sql.sqltypes import *
+import sys
+sys.path.append('../helpers')
 
-#===============================================================================
-table_input_columns = {
+from target_functions import *
+
+#=======================CONNECTION CREDENTIALS==================================
+
+stg_conn_id = 'dv5'
+stg_db = 'dwh3_test_stage'
+
+tg_conn_id = 'dv5'
+tg_db = 'dwh3_test_target'
+
+#========================TABLE INFORMATION======================================
+
+si_columns = {
     'classification_id': 'klasifikace_bk',
     'identifier': 'identifikator',
     'course_code': 'predmet_kod',
@@ -51,8 +64,9 @@ columns_to_concat = [
     'typ_vyhodnocovani',
     'fk_agregovana_klasifikace',
     'agregovana_funkce',
-    'rozsah_agregace']
-dtypes_tg = {
+    'rozsah_agregace'
+    ]
+tg_dtypes = {
     'klasifikace_bk': BigInteger,
     'identifikator': TEXT,
     'nazev_cs': TEXT,
@@ -102,7 +116,8 @@ columns_to_load_new_tg = [
     'version',
     'date_from',
     'date_to',
-    'last_update']
+    'last_update'
+    ]
 str_columns = [
     'identifikator',
     'nazev_cs',
@@ -116,76 +131,75 @@ str_columns = [
     'povinny',
     'typ_vyhodnocovani',
     'agregovana_funkce',
-    'rozsah_agregace'] # TODO: comment what each of those variables are for
+    'rozsah_agregace'
+    ]
+fk_columns = [
+    'fk_nadrazena_klasifikace',
+    'fk_agregovana_klasifikace'
+] # TODO: comment what each of those variables are for
 #===============================================================================
 
-def load_table_to_tg(**kwargs):
-    with create_connection_to_target() as tg_conn:
-        tg_trans = tg_conn.begin()
-        try:
-            stage_conn = create_connection_to_stage()
-
-            # TODO: divide the function, add args and documentation
-            load_to_target(table_input_stmt=table_input_stmt,
-                stage_conn=stage_conn, tg_conn=tg_conn)
-
-            tg_trans.commit()
-        except:
-            tg_trans.rollback()
-            raise
-        finally:
-            stage_conn.close()
-            tg_conn.close()
-
 def load_to_target(**kwargs):
     chunk_size = 10**5
-    column_aliases_lst = [f"{key} as {value}" for key, value in kwargs['columns'].items()]
-    table_input_stmt = f"SELECT {', '.join(column_aliases_lst)} FROM \
-        {kwargs['stage_schema']}.{kwargs['stage_table']}"
+    si_column_aliases_lst = [f"{key} as {value}" for key, value in kwargs['si_columns'].items()]
+    table_input_stmt = f"SELECT {', '.join(si_column_aliases_lst)} FROM \
+        {kwargs['si_schema']}.{kwargs['si_table']}"
 
-    for chunk in pd.read_sql_query(kwargs['table_input_stmt'],
-        con=kwargs['stage_conn'], chunksize=kwargs['chunk_size']):
+    for chunk in pd.read_sql_query(table_input_stmt,
+        con=kwargs['stg_conn'], chunksize=chunk_size):
 
-        processed_data = process_data(chunk)
-        load_data(process_data)
+        processed_data = process_data(chunk=chunk)
+        load_data(data=processed_data, tg_conn=kwargs['tg_conn'])
 
 def process_data(**kwargs): # should be manually adjusted for every table
     cols_to_select1 = {
         'classification_id': 'klasifikace_bk',
         'name': 'nazev_cs'
     }
-    processed_data = database_lookup(data_chunk=kwargs['chunk'],
-        bk_en_cz=bk_en_cz, cols_to_select=cols_to_select1, lookup_schema='si_grades',
-        lookup_table='classification_text_airflow_orig',
-        constraint='language_tag = \'cs\'', col_to_add=cols_to_select1['name'],
-        order_in_table=2)
+    processed_data = database_lookup(stg_conn_id=stg_conn_id,
+        stg_db=stg_db,
+        data_chunk=kwargs['chunk'],
+        bk_en_cz=bk_en_cz,
+        cols_to_select=cols_to_select1,
+        lookup_schema='ps_grades',
+        lookup_table='classification_text_airflow_orig',
+        constraint='language_tag = \'cs\'',
+        col_to_add=cols_to_select1['name'],
+        order_in_table=2)
 
     cols_to_select2 = {
         'classification_id': 'klasifikace_bk',
         'name': 'nazev_en'
     }
-    processed_data = database_lookup(data_chunk=processed_data, bk_en_cz=bk_en_cz,
-        cols_to_select=cols_to_select2, lookup_schema='ps_grades',
-        lookup_table='classification_text_airflow_orig',
-        constraint='language_tag = \'en\'', col_to_add=cols_to_select2['name'],
-        order_in_table=3)
+    processed_data = database_lookup(stg_conn_id=stg_conn_id,
+        stg_db=stg_db,
+        data_chunk=processed_data,
+        bk_en_cz=bk_en_cz,
+        cols_to_select=cols_to_select2,
+        lookup_schema='ps_grades',
+        lookup_table='classification_text_airflow_orig',
+        constraint='language_tag = \'en\'',
+        col_to_add=cols_to_select2['name'],
+        order_in_table=3)
 
     # other transformations; in this case replace boolean values with varchar Y/N
-    boolean_to_YN = lambda x: 'Y' if x else 'N'
+    # boolean_to_YN = lambda x: 'Y' if x else 'N'
+    boolean_to_YN = lambda x: 'Y' if x is True else ('N' if x is False else None)
+
     boolean_columns = processed_data.select_dtypes(include='bool').columns
     processed_data[boolean_columns] = processed_data[boolean_columns].apply(
         lambda x: x.apply(boolean_to_YN))
 
     return processed_data
 
-def fix_dtypes_for_modified(data):
-    data['fk_nadrazena_klasifikace'] =
-    data['fk_nadrazena_klasifikace'].apply(
-        lambda x: '{:.0f}'.format(x) if x.is_integer() else '')
-    data['fk_agregovana_klasifikace'] =
-    data['fk_agregovana_klasifikace'].apply(
-        lambda x: '{:.0f}'.format(x) if x.is_integer() else '')
-    data.fillna('', inplace=True)
-    return data
+# def fix_dtypes_for_modified(data):
+#     data['fk_nadrazena_klasifikace'] =
+#     data['fk_nadrazena_klasifikace'].apply(
+#         lambda x: '{:.0f}'.format(x) if x.is_integer() else '')
+#     data['fk_agregovana_klasifikace'] =
+#     data['fk_agregovana_klasifikace'].apply(
+#         lambda x: '{:.0f}'.format(x) if x.is_integer() else '')
+#     data.fillna('', inplace=True)
+#     return data
 
 def load_data(data, tg_conn):
     # divide data into N, M, D groups by state
@@ -194,7 +208,30 @@ def load_data(data, tg_conn):
     modified = grouped_by_state.get_group('M')
     deleted = grouped_by_state.get_group('D')
 
-    load_new(new, tg_conn, columns_to_load_new_tg, dtypes_tg, tg_table)
-    modified = fix_dtypes_for_modified(modified)
-    load_modified(modified, tg_conn, tg_table)
-    load_deleted(deleted, tg_conn, tg_table, bk_en_cz[1])
+    load_new(new_data=new,
+        tg_conn=tg_conn,
+        tg_columns=columns_to_load_new_tg,
+        tg_dtypes=tg_dtypes,
+        tg_schema=tg_schema,
+        tg_table=tg_table,
+        fk_columns=fk_columns)
+    load_modified(modified_data=modified,
+        columns_to_concat=columns_to_concat,
+        str_columns=str_columns,
+        fk_columns=fk_columns,
+        tg_table=tg_table,
+        tg_conn=tg_conn)
+    load_deleted(deleted, bk_en_cz[1], tg_table, tg_conn)
+
+#===============================================================================
+
+load_table_to_tg(
+    tg_conn_id=tg_conn_id,
+    tg_db=tg_db,
+    stg_conn_id=stg_conn_id,
+    stg_db=stg_db,
+    si_schema=si_schema,
+    si_table=si_table,
+    si_columns=si_columns,
+    load_to_target=load_to_target
+)
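A note on the N/M/D dispatch in load_data above: pandas raises KeyError from get_group() when a chunk contains no rows for one of the states, so a chunk without deletions, for instance, would abort the whole load. A minimal, self-contained sketch of a membership guard (toy data; the real load_new/load_modified/load_deleted calls would keep their arguments from the file above):

    import pandas as pd

    # Toy chunk: two new rows, one modified row, and no deletions at all.
    chunk = pd.DataFrame({
        'klasifikace_bk': [1, 2, 3],
        'state': ['N', 'N', 'M'],
    })

    grouped_by_state = chunk.groupby('state')

    # get_group('D') would raise KeyError here, so check membership first.
    for state in ('N', 'M', 'D'):
        if state in grouped_by_state.groups:
            group = grouped_by_state.get_group(state)
            print(state, len(group), 'rows')  # dispatch to the matching load_* call
        else:
            print(state, 'absent in this chunk, skipped')
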
diff --git a/airflow/dags/grades/testing/concat_np.py b/airflow/dags/grades/testing/concat_np.py
new file mode 100644
index 0000000..30b1fd0
--- /dev/null
+++ b/airflow/dags/grades/testing/concat_np.py
@@ -0,0 +1,8 @@
+import numpy as np
+
+def concat_row(chunk, row):
+    new_row = np.array([row])
+    new_chunk = np.concatenate([chunk, new_row])
+    return new_chunk
+
+new_chunk = concat_row(np.array([['a', 'b', 'c']]), ['value1', 'value2', 'value3'])  # dummy 1x3 chunk
diff --git a/airflow/dags/grades/cursor.py b/airflow/dags/grades/testing/cursor.py
similarity index 100%
rename from airflow/dags/grades/cursor.py
rename to airflow/dags/grades/testing/cursor.py
diff --git a/airflow/dags/grades/grades_to_ps.py b/airflow/dags/grades/testing/grades_to_ps.py
similarity index 100%
rename from airflow/dags/grades/grades_to_ps.py
rename to airflow/dags/grades/testing/grades_to_ps.py
diff --git a/airflow/dags/grades/load_t_klas_klasifikace.py b/airflow/dags/grades/testing/load_t_klas_klasifikace.py
similarity index 100%
rename from airflow/dags/grades/load_t_klas_klasifikace.py
rename to airflow/dags/grades/testing/load_t_klas_klasifikace.py
diff --git a/airflow/dags/grades/testing/stage_to_target_grades_klasifikace (1).py b/airflow/dags/grades/testing/stage_to_target_grades_klasifikace (1).py
new file mode 100644
index 0000000..542dcf5
--- /dev/null
+++ b/airflow/dags/grades/testing/stage_to_target_grades_klasifikace (1).py
@@ -0,0 +1,281 @@
+import pandas as pd
+from sqlalchemy import text
+from sqlalchemy.sql.sqltypes import *
+import sys
+
+from common_functions.connections import create_connection_pg
+
+def create_connection_to_stage():
+    return create_connection_pg('dv5', 'dwh3_test_stage')
+
+def create_connection_to_target():
+    return create_connection_pg('dv5', 'dwh3_test_target')
+
+def database_lookup(df, bk, cols_to_select, lookup_schema, lookup_table, constraint, added_col_name, order_in_table):
+    stage_conn = create_connection_to_stage()
+    bk_list = [str(i) for i in df[bk[1]].tolist()] #bk - tuple (bk_en, bk_cs)
+    # e.g. constraint = "language_tag = 'cs'"
+    columns = [f"{key} as {value}" for key, value in cols_to_select.items()]
+
+    lookup_query = f"SELECT {', '.join(columns)} \
+        FROM {lookup_schema}.{lookup_table} \
+        WHERE {bk[0]} IN ({', '.join(bk_list)}) \
+        AND {constraint}"
+    df_res = pd.read_sql_query(lookup_query, con=stage_conn)
+    merged = pd.merge(df, df_res, how='left', on=bk[1])
+    col = merged.pop(added_col_name)
+    merged.insert(order_in_table, added_col_name, col)
+    stage_conn.close()
+    return merged
+
+def load_table_to_tg():
+    columns = {
+        'classification_id': 'klasifikace_bk',
+        'identifier': 'identifikator',
+        'course_code': 'predmet_kod',
+        'semester_code': 'fk_semestr',
+        'classification_type': 'typ',
+        'expression': 'rovnice_vypoctu',
+        'value_type': 'datovy_typ',
+        'minimum_value': 'minimalni_hodnota',
+        'minimum_required_value': 'minimalni_vyzadovana_hodnota',
+        'maximum_value': 'maximalni_hodnota',
+        'hidden': 'skryta',
+        'index': 'poradi',
+        'mandatory': 'povinny',
+        'parent_id': 'fk_nadrazena_klasifikace',
+        'evaluation_type': 'typ_vyhodnocovani',
+        'aggregated_classification_id': 'fk_agregovana_klasifikace',
+        'aggregation_function': 'agregovana_funkce',
+        'aggregation_scope': 'rozsah_agregace',
+        # '\'cs\'': 'cs',
+        # '\'en\'': 'en',
+        'active': 'active',
+        'last_update': 'last_update',
+        '\"state\"': 'state'
+    }
+
+    columns_aliases_lst = [f"{key} as {value}" for key, value in columns.items()]
+    table_input_stmt = f"SELECT {', '.join(columns_aliases_lst)} FROM \
+        si_grades.classification_airflow_orig"
+
+    with create_connection_to_target() as tg_conn:
+        tg_trans = tg_conn.begin()
+        try:
+            stage_conn = create_connection_to_stage()
+
+            load_to_tg_in_chunks(table_input_stmt=table_input_stmt,
+                stage_conn=stage_conn, tg_conn=tg_conn)
+
+            tg_trans.commit()
+        except:
+            tg_trans.rollback()
+            raise
+        finally:
+            stage_conn.close()
+            tg_conn.close()
+
+
+    # for stage_chunk in pd.read_sql_query()
+def load_to_tg_in_chunks(**kwargs):
+    chunk_size = 10**5
+    for chunk in pd.read_sql_query(kwargs['table_input_stmt'],
+        con=kwargs['stage_conn'], chunksize=chunk_size):
+        cols_to_select1 = {
+            'classification_id': 'klasifikace_bk',
+            'name': 'nazev_cs'
+        }
+        chunk = database_lookup(chunk, ('classification_id', 'klasifikace_bk'),
+            cols_to_select1, 'ps_grades', 'classification_text_airflow_orig',
+            'language_tag = \'cs\'', cols_to_select1['name'], 2)
+
+        cols_to_select2 = {
+            'classification_id': 'klasifikace_bk',
+            'name': 'nazev_en'
+        }
+        chunk = database_lookup(chunk, ('classification_id', 'klasifikace_bk'),
+            cols_to_select2, 'ps_grades', 'classification_text_airflow_orig',
+            'language_tag = \'en\'', cols_to_select2['name'], 3)
+
+        # print(chunk.dtypes)
+
+        #fix boolean to varchar Y/N
+        bool_to_yn = lambda x: 'Y' if x else 'N'
+        bool_cols = chunk.select_dtypes(include='bool').columns
+        chunk[bool_cols] = chunk[bool_cols].apply(lambda x: x.apply(bool_to_yn))
+
+        # chunk.to_csv('/home/zolockri/Documents/res.csv')
+        grouped_by_state = chunk.groupby('state')
+
+        new = grouped_by_state.get_group('N')
+        modified = grouped_by_state.get_group('M')
+        # print(modified.dtypes)
+        deleted = grouped_by_state.get_group('D')
+
+        # todo dtypes
+        #
+        load_modified(modified, kwargs['tg_conn'])
+        load_new(new, kwargs['tg_conn'])
+        load_deleted(deleted, kwargs['tg_conn'])
+
+def concat_fields(df, cols_to_concat, str_cols):
+    delimiter = '\\x1f'
+    new_df = df.copy()
+    # new_df['klasifikace_bk'] = pd.to_numeric(new_df['klasifikace_bk'], errors='coerce')
+    # non_numeric_rows = new_df[new_df['klasifikace_bk'].isna()]
+    # print(non_numeric_rows['klasifikace_bk'])
+    #
+    # new_df.astype({'klasifikace_bk': 'int64',
+    #     'poradi': 'int64',
+    #     'fk_nadrazena_klasifikace' : 'int64',
+    #     'fk_agregovana_klasifikace': 'int64'}, errors='ignore')
+
+    print(new_df.dtypes)
+    new_df['fk_nadrazena_klasifikace'] = new_df['fk_nadrazena_klasifikace'].apply(lambda x: '{:.0f}'.format(x) if x.is_integer() else '')
+    new_df['fk_agregovana_klasifikace'] = new_df['fk_agregovana_klasifikace'].apply(lambda x: '{:.0f}'.format(x) if x.is_integer() else '')
+    print(new_df.dtypes)
+    new_df.fillna('', inplace=True)
+
+    new_df = new_df.astype(str)
+    new_df[str_cols] = new_df[str_cols].apply(lambda x: x.str.strip())
+    #
+    # print(new_df.dtypes)
+
+    new_df['concated'] = new_df[cols_to_concat].apply(lambda x: (delimiter.join(x).replace("'", "''")).replace("%", "%%"), axis=1) #.replace("'", "\\'")
+    # new_df['concated'] = new_df[cols_to_concat].apply(lambda x: text(delimiter.join(x)), axis=1) #.replace("'", "\\'")
+    new_df.to_csv('/home/zolockri/Documents/conc.csv')
+    return new_df
+
+def load_modified(modified, tg_conn):
+    # columns to concat, ORDER IS IMPORTANT
+    cols_to_concat = ['klasifikace_bk',
+        'identifikator',
+        'nazev_cs',
+        'nazev_en',
+        'predmet_kod',
+        'fk_semestr',
+        'typ',
+        'rovnice_vypoctu',
+        'datovy_typ',
+        'minimalni_hodnota',
+        'minimalni_vyzadovana_hodnota',
+        'maximalni_hodnota',
+        'skryta',
+        'poradi',
+        'povinny',
+        'fk_nadrazena_klasifikace',
+        'typ_vyhodnocovani',
+        'fk_agregovana_klasifikace',
+        'agregovana_funkce',
+        'rozsah_agregace']
+
+    str_cols = [
+        'identifikator',
+        'nazev_cs',
+        'nazev_en',
+        'predmet_kod',
+        'fk_semestr',
+        'typ',
+        'rovnice_vypoctu',
+        'datovy_typ',
+        'skryta',
+        'povinny',
+        'typ_vyhodnocovani',
+        'agregovana_funkce',
+        'rozsah_agregace']
+
+    ready_data = concat_fields(modified, cols_to_concat, str_cols)
+    # print(ready_data.columns)
+    # ready_data.apply(lambda row: print(row))
+    ready_data.to_csv('/home/zolockri/Documents/ready_data.csv')
+
+    ready_data.apply(lambda row: load_modified_to_db(row['concated'],
+        't_klas_klasifikace_airflow', row['active'], row['last_update'], tg_conn), axis=1)
+
+def load_modified_to_db(concated, table_name, active, last_update, tg_conn):
+    modified_stmt = f"SELECT public.inc_historize_case_m('{concated}', \
+        '{table_name}', {active}, '{last_update}')"
+    tg_conn.execute(modified_stmt)
+
+
+def load_deleted(deleted, tg_conn):
+    deleted.apply(lambda row: load_deleted_to_db(row['klasifikace_bk'],
+        't_klas_klasifikace_airflow', tg_conn), axis=1)
+
+def load_deleted_to_db(bk, table_name, tg_conn):
+    delete_stmt = f"update dwh.{table_name} \
+        set date_to = CURRENT_TIMESTAMP \
+        where klasifikace_bk = {bk} \
+        and (date_to = '2199-12-31 00:00:00');"
+    print(delete_stmt)
+    tg_conn.execute(delete_stmt)
+
+def load_new(new, tg_conn):
+    # todo - add init_load to dag
+    init_load = False
+
+    if init_load:
+        new['version'] = 1
+        new['date_to'] = '2199-12-31 00:00:00'
+        new['date_from'] = '1900-01-01 00:00:00'
+    else:
+        new['version'] = 1
+        new['date_to'] = '2199-12-31 00:00:00'
+        new['last_update'] = new['last_update'].dt.floor('D')
+        new['date_from'] = new['last_update'].copy()
+
+    dtypes = {
+        'klasifikace_bk': BigInteger,
+        'identifikator': TEXT,
+        'nazev_cs': TEXT,
+        'nazev_en': TEXT,
+        'predmet_kod': TEXT,
+        'fk_semestr': TEXT,
+        'typ': TEXT,
+        'rovnice_vypoctu': TEXT,
+        'datovy_typ': TEXT,
+        'minimalni_hodnota': Float,
+        'minimalni_vyzadovana_hodnota': Float,
+        'maximalni_hodnota': Float,
+        'skryta': VARCHAR(1),
+        'poradi': BigInteger,
+        'povinny': VARCHAR(1),
+        'fk_nadrazena_klasifikace': BigInteger,
+        'typ_vyhodnocovani': TEXT,
+        'fk_agregovana_klasifikace': BigInteger,
+        'agregovana_funkce': TEXT,
+        'rozsah_agregace': TEXT,
+        'last_update': TIMESTAMP,
+        'version': BigInteger,
+        'date_to': TIMESTAMP,
+        'date_from': TIMESTAMP
+    }
+    cols_to_load = ['klasifikace_bk',
+        'identifikator',
+        'nazev_cs',
+        'nazev_en',
+        'predmet_kod',
+        'fk_semestr',
+        'typ',
+        'rovnice_vypoctu',
+        'datovy_typ',
+        'minimalni_hodnota',
+        'minimalni_vyzadovana_hodnota',
+        'maximalni_hodnota',
+        'skryta',
+        'poradi',
+        'povinny',
+        'fk_nadrazena_klasifikace',
+        'typ_vyhodnocovani',
+        'fk_agregovana_klasifikace',
+        'agregovana_funkce',
+        'rozsah_agregace',
+        'last_update',
+        'version',
+        'date_to',
+        'date_from']
+    new_to_load = new[cols_to_load].copy()
+    new_to_load.to_sql('t_klas_klasifikace_airflow', con=tg_conn, index=False,
+        schema='dwh', if_exists='append', dtype=dtypes)
+
+load_table_to_tg()
diff --git a/airflow/dags/grades/stage_to_target_grades_klasifikace.py b/airflow/dags/grades/testing/stage_to_target_grades_klasifikace.py
similarity index 100%
rename from airflow/dags/grades/stage_to_target_grades_klasifikace.py
rename to airflow/dags/grades/testing/stage_to_target_grades_klasifikace.py
diff --git a/airflow/dags/grades/test_stage.py b/airflow/dags/grades/testing/test_stage.py
similarity index 100%
rename from airflow/dags/grades/test_stage.py
rename to airflow/dags/grades/testing/test_stage.py
diff --git a/airflow/helpers/__init__.py b/airflow/dags/helpers/__init__.py
similarity index 100%
rename from airflow/helpers/__init__.py
rename to airflow/dags/helpers/__init__.py
diff --git a/airflow/helpers/connections.py b/airflow/dags/helpers/connections.py
similarity index 100%
rename from airflow/helpers/connections.py
rename to airflow/dags/helpers/connections.py
diff --git a/airflow/helpers/email.py b/airflow/dags/helpers/dwh_email.py
similarity index 100%
rename from airflow/helpers/email.py
rename to airflow/dags/helpers/dwh_email.py
diff --git a/airflow/dags/helpers/increment.py b/airflow/dags/helpers/increment.py
new file mode 100644
index 0000000..a9f94ad
--- /dev/null
+++ b/airflow/dags/helpers/increment.py
@@ -0,0 +1,56 @@
+import pandas as pd
+from connections import *
+
+def create_connection_to_stage():
+    pg_hook = PostgresHook(
+        postgres_conn_id='dv5',
+        database='dwh3_test_stage'
+    )
+    pg_engine = pg_hook.get_sqlalchemy_engine()
+    return pg_engine.connect()
+#===============================================================================
+
+def make_increment(**kwargs):
+    inc_clear_state_flag('si_' + kwargs['source_name'], kwargs['table_name'], kwargs['conn_id'], kwargs['db'])
+    get_M_N_D_flags('ps_' + kwargs['source_name'], kwargs['table_name'], kwargs['conn_id'], kwargs['db'])
+#===============================================================================
+
+def inc_clear_state_flag(schema_name: str, table_name: str, conn_id: str, db: str):
+    with create_connection_pg(conn_id, db) as conn:
+        trans = conn.begin()
+        try:
+            clear_state_flag_stmt = f"SELECT public.inc_clear_state_flag_schema_table_args('{schema_name}', '{table_name}');"
+            conn.execute(clear_state_flag_stmt)
+            print(f"State flags for table {schema_name}.{table_name} cleared")
+            trans.commit()
+        except:
+            trans.rollback()
+            print(f"Error: flags for {schema_name}.{table_name} were not cleared")
+            raise
+        finally:
+            conn.close()
+#===============================================================================
+
+def get_M_N_D_flags(schema_name: str, table_name: str, conn_id: str, db: str):
+    with create_connection_pg(conn_id, db) as conn:
+        trans = conn.begin()
+        try:
+            get_M_flags_stmt = f"SELECT public.inc_find_modified_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');"
+            conn.execute(get_M_flags_stmt)
+            print(f"State flags M for table {schema_name}.{table_name} got")
+
+            get_N_flags_stmt = f"SELECT public.inc_find_new_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');"
+            conn.execute(get_N_flags_stmt)
+            print(f"State flags N for table {schema_name}.{table_name} got")
+
+            get_D_flags_stmt = f"SELECT public.inc_find_deleted_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');"
+            conn.execute(get_D_flags_stmt)
+            print(f"State flags D for table {schema_name}.{table_name} got")
+
+            trans.commit()
+        except:
+            trans.rollback()
+            print(f"Error: flags for {schema_name}.{table_name} were not generated")
+            raise
+        finally:
+            conn.close()
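grades_increment.py above calls make_increment() at module level, so in an Airflow dags folder the increment would run every time the scheduler parses the file. A hedged sketch of deferring it to task execution instead (the dag_id and schedule are assumptions; the op_kwargs mirror the values in grades_increment.py):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # Assumes dags/helpers is importable, as in the sys.path.append() calls above.
    from increment_no_psc import make_increment

    with DAG(
        dag_id='grades_increment',          # hypothetical name
        start_date=datetime(2023, 3, 1),
        schedule_interval=None,             # trigger manually while testing
        catchup=False,
    ) as dag:
        increment_task = PythonOperator(
            task_id='make_increment',
            python_callable=make_increment,
            op_kwargs={
                'source_name': 'grades',
                'table_name': 'boolean_student_classification_airflow_orig',
                'conn_id': 'dv5',
                'db': 'dwh3_test_stage',
            },
        )
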
cleared") + raise + finally: + conn.close() +#=============================================================================== + +def get_M_N_D_flags(schema_name: str, table_name: str, conn_id: str, db: str): + with create_connection_pg(conn_id, db) as conn: + trans = conn.begin() + try: + get_M_flags_stmt = f"SELECT public.inc_find_modified_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');" + conn.execute(get_M_flags_stmt) + print(f"State flags M for table {schema_name}.{table_name} got") + + get_N_flags_stmt = f"SELECT public.inc_find_new_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');" + conn.execute(get_N_flags_stmt) + print(f"State flags N for table {schema_name}.{table_name} got") + + get_D_flags_stmt = f"SELECT public.inc_find_deleted_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');" + conn.execute(get_D_flags_stmt) + print(f"State flags D for table {schema_name}.{table_name} got") + + trans.commit() + except: + trans.rollback() + print(f"Error: flags for {schema_name}.{table_name} were not generated") + raise + finally: + conn.close() diff --git a/airflow/dags/helpers/increment_no_psc.py b/airflow/dags/helpers/increment_no_psc.py new file mode 100644 index 0000000..12abed3 --- /dev/null +++ b/airflow/dags/helpers/increment_no_psc.py @@ -0,0 +1,62 @@ +import pandas as pd +from connections import * + +def create_connection_to_stage(): + pg_hook = PostgresHook( + postgres_conn_id='dv5', + database='dwh3_test_stage' + ) + pg_engine = pg_hook.get_sqlalchemy_engine() + return pg_engine.connect() +#=============================================================================== + +def make_increment(**kwargs): + inc_clear_state_flag(schema_name='si_' + kwargs['source_name'], + table_name=kwargs['table_name'], + conn_id=kwargs['conn_id'], + db=kwargs['db']) + get_M_N_D_flags(schema_name='ps_' + kwargs['source_name'], + table_name=kwargs['table_name'], + conn_id=kwargs['conn_id'], + db=kwargs['db']) +#=============================================================================== + +def inc_clear_state_flag(schema_name: str, table_name: str, conn_id: str, db: str): + with create_connection_pg(conn_id, db) as conn: + trans = conn.begin() + try: + clear_state_flag_stmt = f"SELECT public.inc_clear_state_flag_schema_table_args('{schema_name}', '{table_name}');" + conn.execute(clear_state_flag_stmt) + print(f"State flags for table {schema_name}.{table_name} cleared") + trans.commit() + except: + trans.rollback() + print(f"Error: flags for {schema_name}.{table_name} were not cleared") + raise + finally: + conn.close() +#=============================================================================== + +def get_M_N_D_flags(schema_name: str, table_name: str, conn_id: str, db: str): + with create_connection_pg(conn_id, db) as conn: + trans = conn.begin() + try: + get_M_flags_stmt = f"SELECT public.inc_find_modified_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');" + conn.execute(get_M_flags_stmt) + print(f"State flags M for table {schema_name}.{table_name} got") + + get_N_flags_stmt = f"SELECT public.inc_find_new_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');" + conn.execute(get_N_flags_stmt) + print(f"State flags N for table {schema_name}.{table_name} got") + + get_D_flags_stmt = f"SELECT public.inc_find_deleted_in_pre_stage_schema_table_args('{schema_name}', '{table_name}');" + conn.execute(get_D_flags_stmt) + print(f"State flags D for table {schema_name}.{table_name} got") + + trans.commit() + except: + 
diff --git a/airflow/helpers/stage_functions.py b/airflow/dags/helpers/stage_functions.py
similarity index 100%
rename from airflow/helpers/stage_functions.py
rename to airflow/dags/helpers/stage_functions.py
diff --git a/airflow/helpers/stage_functions_no_psc.py b/airflow/dags/helpers/stage_functions_no_psc.py
similarity index 97%
rename from airflow/helpers/stage_functions_no_psc.py
rename to airflow/dags/helpers/stage_functions_no_psc.py
index dceac3b..f21bf61 100644
--- a/airflow/helpers/stage_functions_no_psc.py
+++ b/airflow/dags/helpers/stage_functions_no_psc.py
@@ -91,7 +91,7 @@ def load_table_in_chunks_fast(**kwargs): # TODO: add arg cols needed to stage +
     copy_to_stage_stmt = f"COPY {kwargs['stg_schema']}.{kwargs['stg_table']} \
         ({', '.join(kwargs['stg_columns'])}) \
-        FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL 'None')"
+        FROM STDIN WITH (FORMAT csv, DELIMITER E'\x1f', NULL 'None')"
 
     for src_chunk in pd.read_sql_query(src_stmt, con=kwargs['src_conn'],
         chunksize=chunk_size):
@@ -114,6 +114,6 @@ def get_data_iterator(data):
     data = [tuple(row) for row in data.itertuples(index=False)]
     return StringIteratorIO((
-        '|'.join(map(str, row)) + '\n'
+        '\x1f'.join(map(str, row)) + '\n'
         for row in data
     ))
diff --git a/airflow/helpers/stringiteratorio.py b/airflow/dags/helpers/stringiteratorio.py
similarity index 100%
rename from airflow/helpers/stringiteratorio.py
rename to airflow/dags/helpers/stringiteratorio.py
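Both delimiter sites in stage_functions_no_psc.py must agree on the actual 0x1F unit-separator character: inside the f-string, Python's \x1f escape already embeds the control character into the E'...' literal, so the row iterator has to join fields with '\x1f' (one character), not the two-character escape text '\\x1f'. A self-contained round-trip check of that assumption:

    import csv
    import io

    US = '\x1f'  # 0x1F unit separator, matching DELIMITER E'\x1f' in the COPY
    assert len(US) == 1 and ord(US) == 0x1F

    row = ['42', 'klasifikace', 'None']
    line = US.join(row) + '\n'  # what get_data_iterator emits per row

    # COPY ... (FORMAT csv) splits the stream like a csv reader with this delimiter.
    parsed = next(csv.reader(io.StringIO(line), delimiter=US))
    assert parsed == row

    # Joining with the literal text '\\x1f' instead would leave each line as a
    # single unsplit column and make COPY fail on the column-count mismatch.
    unsplit = next(csv.reader(io.StringIO('\\x1f'.join(row) + '\n'), delimiter=US))
    assert len(unsplit) == 1
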
diff --git a/airflow/dags/helpers/target_functions.py b/airflow/dags/helpers/target_functions.py
new file mode 100644
index 0000000..67bc92e
--- /dev/null
+++ b/airflow/dags/helpers/target_functions.py
@@ -0,0 +1,132 @@
+import pandas as pd
+import numpy as np
+from connections import *
+from dwh_email import send_email
+from stringiteratorio import StringIteratorIO
+from sqlalchemy import text
+
+def database_lookup(**kwargs): # TODO: add documentation
+    with create_connection_pg(kwargs['stg_conn_id'], kwargs['stg_db']) as stage_conn:
+        try:
+            bk_lst = [str(i) for i in kwargs['data_chunk'][kwargs['bk_en_cz'][1]].tolist()]
+            columns = [f"{key} as {value}" for key, value in kwargs['cols_to_select'].items()]
+
+            lookup_query = f"SELECT {', '.join(columns)} \
+                FROM {kwargs['lookup_schema']}.{kwargs['lookup_table']} \
+                WHERE {kwargs['bk_en_cz'][0]} IN ({', '.join(bk_lst)}) \
+                AND {kwargs['constraint']}"
+
+            lookup_res = pd.read_sql_query(lookup_query, con=stage_conn)
+            res_chunk = pd.merge(kwargs['data_chunk'], lookup_res, how='left', on=kwargs['bk_en_cz'][1])
+            looked_up_col = res_chunk.pop(kwargs['col_to_add'])
+            res_chunk.insert(kwargs['order_in_table'], kwargs['col_to_add'], looked_up_col)
+            return res_chunk
+
+        except Exception as e:
+            email_sub = f"Lookup in table {kwargs['lookup_table']} FAILED"
+            email_msg = str(e)
+            send_email(email_sub, email_msg)
+            raise Exception(f"Lookup in table {kwargs['lookup_table']} FAILED")
+
+        finally:
+            stage_conn.close()
+
+
+def load_table_to_tg(**kwargs):
+    with create_connector_pg(kwargs['tg_conn_id'], kwargs['tg_db']) as tg_conn:
+        try:
+            stg_conn = create_connection_pg(kwargs['stg_conn_id'], kwargs['stg_db'])
+            load_to_target = kwargs['load_to_target']
+            if load_to_target:
+                load_to_target(
+                    stg_conn=stg_conn,
+                    tg_conn=tg_conn,
+                    si_schema=kwargs['si_schema'],
+                    si_table=kwargs['si_table'],
+                    si_columns=kwargs['si_columns']
+                )
+
+            tg_conn.commit()
+        except:
+            tg_conn.rollback()
+            raise
+        finally:
+            stg_conn.close()
+
+
+def load_new(new_data, tg_conn, tg_columns, tg_dtypes, tg_schema, tg_table, fk_columns): # TODO: change to tg_cursor in input params
+    init_load = False # TODO: add this to dag variables
+
+    new_data['last_update'] = new_data['last_update'].dt.floor('D')
+    new_data.loc[:, 'version'] = 1
+
+    if init_load:
+        new_data['date_from'] = '1900-01-01 00:00:00'
+    else:
+        new_data.loc[:, 'date_from'] = new_data['last_update'].copy()
+
+    new_data['date_to'] = '2199-12-31 00:00:00'
+    last_update_col = new_data.pop('last_update')
+    new_data['last_update'] = last_update_col
+    new_to_load = new_data[tg_columns].copy()
+
+    for col in fk_columns:
+        new_to_load[col] = new_to_load[col].apply(lambda x: '{:.0f}'.format(x) if x.is_integer() else 'None')
+
+    all_columns = list(new_to_load.columns)
+    nan_columns = [col for col in all_columns if col not in fk_columns]
+    new_to_load[nan_columns] = new_to_load[nan_columns].replace({np.nan: None})
+    new_to_load = new_to_load.replace(r'\n', ' ', regex=True)
+
+    copy_to_target_stmt = f"COPY {tg_schema}.{tg_table} \
+        ({', '.join(tg_columns)}) \
+        FROM STDIN WITH (FORMAT csv, DELIMITER E'\x1f', NULL 'None')"
+    data_iterator = get_data_iterator(new_to_load)
+    tg_cursor = tg_conn.cursor()
+    tg_cursor.copy_expert(copy_to_target_stmt, data_iterator)
+
+
+def get_data_iterator(data):
+    data = [tuple(row) for row in data.itertuples(index=False)]
+    return StringIteratorIO((
+        "\x1f".join(map(str, row)) + '\n'
+        for row in data
+    ))
+
+def concat_fields(data, columns_to_concat, str_columns, fk_columns):
+    delimiter = '\\x1f'
+    # fk columns may contain NULL, so pandas dataframe sets their data type as float,
+    # which does not correspond with postgres bigint, so this transformation fixes it
+    for col in fk_columns:
+        data[col] = data[col].apply(lambda x: '{:.0f}'.format(x) if x.is_integer() else '')
+
+    data.fillna('', inplace=True)
+    data = data.astype(str)
+    data[str_columns] = data[str_columns].apply(lambda x: x.str.strip())
+    data['concated'] = data[columns_to_concat].apply(
+        lambda x: (delimiter.join(x).replace("'", "''")).replace("%", "%%"),
+        axis=1)
+    return data
+
+def call_inc_hist_m(concated, tg_table, active, last_update, tg_conn):
+    inc_hist_m_stmt = f"SELECT public.inc_historize_case_m('{concated}', \
+        '{tg_table}', {active}, '{last_update}')"
+    tg_conn.cursor().execute(inc_hist_m_stmt)
+
+def load_modified(modified_data, columns_to_concat, str_columns, fk_columns, tg_table, tg_conn):
+    modified_data = concat_fields(modified_data, columns_to_concat,
+        str_columns, fk_columns)
+    # modified_data.to_csv('/home/zolockri/Documents/suuuka.csv')
+    modified_data.apply(lambda row: call_inc_hist_m(row['concated'], tg_table,
+        row['active'], row['last_update'], tg_conn), axis=1)
+
+def call_delete_update(bk_str, bk, tg_table, tg_conn):
+    delete_update_stmt = f"update dwh.{tg_table} \
+        set date_to = CURRENT_TIMESTAMP \
+        where {bk_str} = {bk} \
+        and (date_to = '2199-12-31 00:00:00');"
+    tg_conn.cursor().execute(delete_update_stmt)
+
+def load_deleted(deleted_data, bk_str, tg_table, tg_conn):
+    deleted_data.apply(lambda row: call_delete_update(bk_str, row[bk_str],
+        tg_table, tg_conn), axis=1)
diff --git a/airflow/helpers/increment.py b/airflow/helpers/increment.py
deleted file mode 100644
index e69de29..0000000
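target_functions.py implements a slowly-changing-dimension pattern: load_new stamps version, date_from and an open-ended date_to of 2199-12-31, and load_deleted closes the open interval rather than deleting the row. A toy, DB-free illustration of that row lifecycle (column names taken from the diff, the data itself invented):

    import pandas as pd

    OPEN_END = pd.Timestamp('2199-12-31')  # sentinel used by load_new/load_deleted

    # A freshly loaded 'N' row, stamped as load_new does:
    row = pd.DataFrame({
        'klasifikace_bk': [42],
        'last_update': [pd.Timestamp('2023-03-05 19:49:28')],
    })
    row['last_update'] = row['last_update'].dt.floor('D')
    row['version'] = 1
    row['date_from'] = row['last_update']
    row['date_to'] = OPEN_END

    # A later 'D' state does not remove the row; it closes the interval, as in
    # UPDATE dwh.<tg_table> SET date_to = CURRENT_TIMESTAMP
    # WHERE <bk> = 42 AND date_to = '2199-12-31 00:00:00'
    open_mask = (row['klasifikace_bk'] == 42) & (row['date_to'] == OPEN_END)
    row.loc[open_mask, 'date_to'] = pd.Timestamp.now()

    print(row[['klasifikace_bk', 'version', 'date_from', 'date_to']])
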
diff --git a/airflow/helpers/target_functions.py b/airflow/helpers/target_functions.py
index 52893f6..a368050 100644
--- a/airflow/helpers/target_functions.py
+++ b/airflow/helpers/target_functions.py
@@ -1,7 +1,8 @@
 import pandas as pd
+from connections import *
 
 def database_lookup(**kwargs): # TODO: add documentation
-    with create_connection_to_stage() as stage_conn: # TODO: add import
+    with create_connection_pg(kwargs['stg_conn_id'], kwargs['stg_db']) as stage_conn: # TODO: add import
         try:
             bk_lst = [str(i) for i in kwargs['data_chunk'][kwargs['bk_en_cz'][1]].tolist()]
             columns = [f"{key} as {value}" for key, value in kwargs['cols_to_select'].items()]
@@ -28,29 +29,28 @@ def database_lookup(**kwargs): # TODO: add documentation
 
 
 def load_table_to_tg(**kwargs):
-    column_aliases_lst = [f"{key} as {value}" for key, value in kwargs['columns'].items()]
-    table_input_stmt = f"SELECT {', '.join(column_aliases_lst)} FROM \
-        {kwargs['stage_schema']}.{kwargs['stage_table']}"
-
-    with create_connection_to_target() as tg_conn:
-        tg_trans = tg_conn.begin()
+    with create_connector_pg(kwargs['tg_conn_id'], kwargs['tg_db']) as tg_conn:
         try:
-            stage_conn = create_connection_to_stage()
+            stg_conn = create_connection_pg(kwargs['stg_conn_id'], kwargs['stg_db'])
             load_to_target = kwargs['load_to_target']
             if load_to_target:
-                load_to_target(table_input_stmt=table_input_stmt,
-                    stage_conn=stage_conn, tg_conn=tg_conn)
-
-            tg_trans.commit()
+                load_to_target(
+                    stg_conn=stg_conn,
+                    tg_conn=tg_conn,
+                    si_schema=kwargs['si_schema'],
+                    si_table=kwargs['si_table'],
+                    si_columns=kwargs['si_columns']
+                )
+
+            tg_conn.commit()
         except:
-            tg_trans.rollback()
+            tg_conn.rollback()
             raise
         finally:
-            stage_conn.close()
-            tg_conn.close()
+            stg_conn.close()
 
-def load_new(new_data, tg_cursor, tg_columns, tg_dtypes, tg_schema, tg_table): # TODO: change to tg_cursor in input params
+def load_new(new_data, tg_conn, tg_columns, tg_dtypes, tg_schema, tg_table): # TODO: change to tg_cursor in input params
     init_load = False # TODO: add this to dag variables
     new_data['last_update'] = new_data['last_update'].dt.floor()
     new_data['version'] = 1
@@ -69,6 +69,7 @@ def load_new(new_data, tg_cursor, tg_columns, tg_dtypes, tg_schema, tg_table): #
         ({', '.join(tg_columns)}) \
         FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL 'None')"
     data_iterator = get_data_iterator(new_to_load)
+    tg_cursor = tg_conn.cursor()
     tg_cursor.copy_expert(copy_to_target_stmt, data_iterator)
 
 
@@ -79,8 +80,40 @@ def get_data_iterator(data):
         for row in data
     ))
 
-def load_modified():
-    pass
-
-def load_deleted():
-    pass
+def concat_fields(data, columns_to_concat, str_columns, fk_columns):
+    delimiter = '\\x1f'
+    # fk columns may contain NULL, so pandas dataframe sets their data type as float,
+    # which does not correspond with postgres bigint, so this transformation fixes it
+    for col in fk_columns:
+        data[col] = data[col].apply(lambda x: '{:.0f}'.format(x) if x.is_integer() else '')
+
+    data.fillna('', inplace=True)
+    data = data.astype(str)
+    data[str_columns] = data[str_columns].apply(lambda x: x.str.strip())
+    data['concated'] = data[columns_to_concat].apply(
+        lambda x: (delimiter.join(x).replace("'", "''")).replace("%", "%%"),
+        axis=1)
+    return data
+
+def call_inc_hist_m(concated, tg_table, active, last_update, tg_conn):
+    inc_hist_m_stmt = f"SELECT public.inc_historize_case_m('{concated}', \
+        '{tg_table}', {active}, '{last_update}')"
+    tg_conn.execute(inc_hist_m_stmt)
+
+def load_modified(modified_data, columns_to_concat, str_columns, fk_columns, tg_table, tg_conn):
+    modified_data = concat_fields(modified_data, columns_to_concat,
+        str_columns, fk_columns)
+
+    modified_data.apply(lambda row: call_inc_hist_m(row['concated'], tg_table,
+        row['active'], row['last_update'], tg_conn), axis=1)
+
+def call_delete_update(bk_str, bk, tg_table, tg_conn):
+    delete_update_stmt = f"update dwh.{tg_table} \
+        set date_to = CURRENT_TIMESTAMP \
+        where {bk_str} = {bk} \
+        and (date_to = '2199-12-31 00:00:00');"
+    tg_conn.execute(delete_update_stmt)
+
+def load_deleted(deleted_data, bk_str, tg_table, tg_conn):
+    deleted_data.apply(lambda row: call_delete_update(bk_str, row[bk_str],
+        tg_table, tg_conn), axis=1)
diff --git a/airflow/logs/scheduler/latest b/airflow/logs/scheduler/latest
index 24827bd..af3f3c6 120000
--- a/airflow/logs/scheduler/latest
+++ b/airflow/logs/scheduler/latest
@@ -1 +1 @@
-/home/zolockri/Documents/FIT/BP/airflow_workspace/airflow/logs/scheduler/2023-03-04
\ No newline at end of file
+/home/zolockri/Documents/FIT/BP/airflow_workspace/airflow/logs/scheduler/2023-03-05
\ No newline at end of file
-- 
GitLab
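connections.py itself is not part of this patch, so the split between create_connection_pg and create_connector_pg can only be inferred from usage: database_lookup feeds pd.read_sql_query (a SQLAlchemy connection), while load_table_to_tg calls .commit()/.rollback() and load_new needs .cursor().copy_expert() (a raw DBAPI connection). A speculative sketch of shapes consistent with that usage, built on the PostgresHook pattern the old grades_increment.py used; both bodies are assumptions, not the actual file:

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    def create_connection_pg(conn_id, db):
        # SQLAlchemy connection, suitable for pd.read_sql_query / conn.execute
        pg_hook = PostgresHook(postgres_conn_id=conn_id, database=db)
        return pg_hook.get_sqlalchemy_engine().connect()

    def create_connector_pg(conn_id, db):
        # Raw psycopg2 connection, exposing cursor(), commit() and rollback(),
        # which copy_expert() in load_new requires
        pg_hook = PostgresHook(postgres_conn_id=conn_id, database=db)
        return pg_hook.get_conn()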