From b22565f5e892999d4ac6359f8c77f17741a33920 Mon Sep 17 00:00:00 2001 From: Kristina Zolochevskaia <zolockri@fit.cvut.cz> Date: Wed, 15 Mar 2023 18:48:00 +0100 Subject: [PATCH] minor fix --- ..._source_to_stage_student_classification.py | 61 ++++ airflow/dags/grades_klasifikace_dag.py | 292 +++++++++--------- .../load_to_stage_no_psc_operator.py | 10 +- .../load_to_target_operator.py | 4 +- .../make_increment_no_psc_operator.py | 2 +- 5 files changed, 217 insertions(+), 152 deletions(-) create mode 100644 airflow/dags/grades/grades_source_to_stage_student_classification.py diff --git a/airflow/dags/grades/grades_source_to_stage_student_classification.py b/airflow/dags/grades/grades_source_to_stage_student_classification.py new file mode 100644 index 0000000..beb2984 --- /dev/null +++ b/airflow/dags/grades/grades_source_to_stage_student_classification.py @@ -0,0 +1,61 @@ +import pandas as pd +import hashlib +import logging +import sys +import os +import time + +sys.path.append('./helpers') +from helpers.stage_functions_no_psc import load_table + +from airflow.providers.postgres.hooks.postgres import PostgresHook + +#==========================LOADING PARAMETERS=================================== +# connections +src_conn_id = 'grades' +src_db = 'classification' +stg_conn_id = 'dv5' +stg_db = 'dwh3_test_stage' + +# tables +src_schema = 'main' +src_table = 'student_classification' +src_columns = [ + 'student_classification_id', + 'note', + 'classification_id', + 'timestamp', + 'classification_user_id' +] +stg_schema = 'ps_grades' +stg_table = 'student_classification_airflow_orig' +stg_columns = [ + 'student_classification_id', + 'note', + 'classification_id', + 'timestamp', + 'classification_user_id', + 'md5' +] + +#=============================================================================== + +def load_student_classification(): + load_table( + src_conn_id=src_conn_id, + src_db=src_db, + stg_conn_id=stg_conn_id, + stg_db=stg_db, + src_schema=src_schema, + src_table=src_table, + src_columns=src_columns, + stg_schema=stg_schema, + stg_table=stg_table, + stg_columns=stg_columns, + fastLoad=True + ) + + +start_time = time.time() +load_student_classification() +print("--- %s seconds ---" % (time.time() - start_time)) diff --git a/airflow/dags/grades_klasifikace_dag.py b/airflow/dags/grades_klasifikace_dag.py index d9144df..614feb6 100644 --- a/airflow/dags/grades_klasifikace_dag.py +++ b/airflow/dags/grades_klasifikace_dag.py @@ -43,7 +43,7 @@ default_args = { "start_date": pendulum.datetime(2021, 1, 1, tz="UTC"), "email_on_failure": False, "email_on_retry": False, - "retries": 1, + "retries": 0, "retry_delay": timedelta(minutes=5), "execution_timeout": timedelta(minutes=60) } @@ -66,135 +66,135 @@ with DAG( #==============TASK LOAD BOOLEAN_STUDENT_CLASSIFICATION========================= - # task_load_boolean_student_classification = LoadToStageNoPSCOperator( - # task_id="task_load_boolean_student_classification", - # src_conn_id=GRADES_SRC_CONN_ID, - # src_db=GRADES_SRC_DB, - # src_schema=GRADES_SRC_SCHEMA, - # src_table=GRADES_SRC_TABLE_BOOLEAN_STUDENT_CLASSIFICATION, - # src_columns=GRADES_SRC_COLUMNS_BOOLEAN_STUDENT_CLASSIFICATION, - # stg_conn_id=GRADES_STG_CONN_ID, - # stg_db=GRADES_STG_DB, - # stg_schema=GRADES_STG_SCHEMA, - # stg_table=GRADES_STG_TABLE_BOOLEAN_STUDENT_CLASSIFICATION, - # stg_columns=GRADES_STG_COLUMNS_BOOLEAN_STUDENT_CLASSIFICATION - # ) + task_load_boolean_student_classification = LoadToStageNoPSCOperator( + task_id="task_load_boolean_student_classification", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_BOOLEAN_STUDENT_CLASSIFICATION, + src_columns=GRADES_SRC_COLUMNS_BOOLEAN_STUDENT_CLASSIFICATION, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_BOOLEAN_STUDENT_CLASSIFICATION, + stg_columns=GRADES_STG_COLUMNS_BOOLEAN_STUDENT_CLASSIFICATION + ) #================TASK LOAD CLASSIFICATION======================================= - # - # task_load_classification = LoadToStageNoPSCOperator( - # task_id="task_load_classification", - # src_conn_id=GRADES_SRC_CONN_ID, - # src_db=GRADES_SRC_DB, - # src_schema=GRADES_SRC_SCHEMA, - # src_table=GRADES_SRC_TABLE_CLASSIFICATION, - # src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION, - # stg_conn_id=GRADES_STG_CONN_ID, - # stg_db=GRADES_STG_DB, - # stg_schema=GRADES_STG_SCHEMA, - # stg_table=GRADES_STG_TABLE_CLASSIFICATION, - # stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION - # ) + + task_load_classification = LoadToStageNoPSCOperator( + task_id="task_load_classification", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_CLASSIFICATION, + src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_CLASSIFICATION, + stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION + ) #================TASK LOAD CLASSIFICATION_TEXT================================== - # task_load_classification_text = LoadToStageNoPSCOperator( - # task_id="task_load_classification_text", - # src_conn_id=GRADES_SRC_CONN_ID, - # src_db=GRADES_SRC_DB, - # src_schema=GRADES_SRC_SCHEMA, - # src_table=GRADES_SRC_TABLE_CLASSIFICATION_TEXT, - # src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION_TEXT, - # stg_conn_id=GRADES_STG_CONN_ID, - # stg_db=GRADES_STG_DB, - # stg_schema=GRADES_STG_SCHEMA, - # stg_table=GRADES_STG_TABLE_CLASSIFICATION_TEXT, - # stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION_TEXT - # ) + task_load_classification_text = LoadToStageNoPSCOperator( + task_id="task_load_classification_text", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_CLASSIFICATION_TEXT, + src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION_TEXT, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_CLASSIFICATION_TEXT, + stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION_TEXT + ) #================TASK LOAD CLASSIFICATION_USER================================== - # task_load_classification_user = LoadToStageNoPSCOperator( - # task_id="task_load_classification_user", - # src_conn_id=GRADES_SRC_CONN_ID, - # src_db=GRADES_SRC_DB, - # src_schema=GRADES_SRC_SCHEMA, - # src_table=GRADES_SRC_TABLE_CLASSIFICATION_USER, - # src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION_USER, - # stg_conn_id=GRADES_STG_CONN_ID, - # stg_db=GRADES_STG_DB, - # stg_schema=GRADES_STG_SCHEMA, - # stg_table=GRADES_STG_TABLE_CLASSIFICATION_USER, - # stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION_USER - # ) + task_load_classification_user = LoadToStageNoPSCOperator( + task_id="task_load_classification_user", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_CLASSIFICATION_USER, + src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION_USER, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_CLASSIFICATION_USER, + stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION_USER + ) #================TASK LOAD NUMBER_STUDENT_CLASSIFICATION======================== - # - # task_load_number_student_classification = LoadToStageNoPSCOperator( - # task_id="task_load_number_student_classification", - # src_conn_id=GRADES_SRC_CONN_ID, - # src_db=GRADES_SRC_DB, - # src_schema=GRADES_SRC_SCHEMA, - # src_table=GRADES_SRC_TABLE_NUMBER_STUDENT_CLASSIFICATION, - # src_columns=GRADES_SRC_COLUMNS_NUMBER_STUDENT_CLASSIFICATION, - # stg_conn_id=GRADES_STG_CONN_ID, - # stg_db=GRADES_STG_DB, - # stg_schema=GRADES_STG_SCHEMA, - # stg_table=GRADES_STG_TABLE_NUMBER_STUDENT_CLASSIFICATION, - # stg_columns=GRADES_STG_COLUMNS_NUMBER_STUDENT_CLASSIFICATION - # ) + + task_load_number_student_classification = LoadToStageNoPSCOperator( + task_id="task_load_number_student_classification", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_NUMBER_STUDENT_CLASSIFICATION, + src_columns=GRADES_SRC_COLUMNS_NUMBER_STUDENT_CLASSIFICATION, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_NUMBER_STUDENT_CLASSIFICATION, + stg_columns=GRADES_STG_COLUMNS_NUMBER_STUDENT_CLASSIFICATION + ) #================TASK LOAD STRING_STUDENT_CLASSIFICATION======================== - # - # task_load_string_student_classification = LoadToStageNoPSCOperator( - # task_id="task_load_string_student_classification", - # src_conn_id=GRADES_SRC_CONN_ID, - # src_db=GRADES_SRC_DB, - # src_schema=GRADES_SRC_SCHEMA, - # src_table=GRADES_SRC_TABLE_STRING_STUDENT_CLASSIFICATION, - # src_columns=GRADES_SRC_COLUMNS_STRING_STUDENT_CLASSIFICATION, - # stg_conn_id=GRADES_STG_CONN_ID, - # stg_db=GRADES_STG_DB, - # stg_schema=GRADES_STG_SCHEMA, - # stg_table=GRADES_STG_TABLE_STRING_STUDENT_CLASSIFICATION, - # stg_columns=GRADES_STG_COLUMNS_STRING_STUDENT_CLASSIFICATION - # ) + + task_load_string_student_classification = LoadToStageNoPSCOperator( + task_id="task_load_string_student_classification", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_STRING_STUDENT_CLASSIFICATION, + src_columns=GRADES_SRC_COLUMNS_STRING_STUDENT_CLASSIFICATION, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_STRING_STUDENT_CLASSIFICATION, + stg_columns=GRADES_STG_COLUMNS_STRING_STUDENT_CLASSIFICATION + ) #================TASK LOAD STUDENT_CLASSIFICATION=============================== - # - # task_load_student_classification = LoadToStageNoPSCOperator( - # task_id="task_load_student_classification", - # src_conn_id=GRADES_SRC_CONN_ID, - # src_db=GRADES_SRC_DB, - # src_schema=GRADES_SRC_SCHEMA, - # src_table=GRADES_SRC_TABLE_STUDENT_CLASSIFICATION, - # src_columns=GRADES_SRC_COLUMNS_STUDENT_CLASSIFICATION, - # stg_conn_id=GRADES_STG_CONN_ID, - # stg_db=GRADES_STG_DB, - # stg_schema=GRADES_STG_SCHEMA, - # stg_table=GRADES_STG_TABLE_STUDENT_CLASSIFICATION, - # stg_columns=GRADES_STG_COLUMNS_STUDENT_CLASSIFICATION - # ) + + task_load_student_classification = LoadToStageNoPSCOperator( + task_id="task_load_student_classification", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_STUDENT_CLASSIFICATION, + src_columns=GRADES_SRC_COLUMNS_STUDENT_CLASSIFICATION, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_STUDENT_CLASSIFICATION, + stg_columns=GRADES_STG_COLUMNS_STUDENT_CLASSIFICATION + ) #=============================================================================== #================================INCREMENT====================================== #=============================================================================== #================TASK MAKE INCREMENT CLASSIFICATION============================= - # - # task_make_increment_classification = MakeIncrementNoPSCOperator( - # task_id="task_make_increment_classification", - # source_name=GRADES_SRC_NAME, - # table_name=GRADES_STG_TABLE_CLASSIFICATION, - # conn_id=GRADES_STG_CONN_ID, - # db=GRADES_STG_DB, - # ) + + task_make_increment_classification = MakeIncrementNoPSCOperator( + task_id="task_make_increment_classification", + source_name=GRADES_SRC_NAME, + table_name=GRADES_STG_TABLE_CLASSIFICATION, + conn_id=GRADES_STG_CONN_ID, + db=GRADES_STG_DB, + ) #================TASK MAKE INCREMENT STUDENT_CLASSIFICATION===================== - # task_make_increment_student_classification = MakeIncrementNoPSCOperator( - # task_id="task_make_increment_student_classification", - # source_name=GRADES_SRC_NAME, - # table_name=GRADES_STG_TABLE_STUDENT_CLASSIFICATION, - # conn_id=GRADES_STG_CONN_ID, - # db=GRADES_STG_DB, - # ) + task_make_increment_student_classification = MakeIncrementNoPSCOperator( + task_id="task_make_increment_student_classification", + source_name=GRADES_SRC_NAME, + table_name=GRADES_STG_TABLE_STUDENT_CLASSIFICATION, + conn_id=GRADES_STG_CONN_ID, + db=GRADES_STG_DB, + ) #=============================================================================== #=================================TARGET======================================== @@ -205,29 +205,29 @@ with DAG( # task_id="task_test", # python_callable=test # ) - task_load_t_klas_klasifikace = LoadToTargetOperator( - task_id="task_load_t_klas_klasifikace", - #===conections=== - stg_conn_id=GRADES_STG_CONN_ID, - stg_db=GRADES_STG_DB, - tg_conn_id=GRADES_TG_CONN_ID, - tg_db=GRADES_TG_DB, - #===table info=== - si_schema=GRADES_SI_SCHEMA, - si_table=GRADES_SI_TABLE_CLASSIFICATION, - si_columns=GRADES_SI_COLUMNS_CLASSIFICATION, - tg_schema=GRADES_TG_SCHEMA, - tg_table=GRADES_TG_TABLE_KLASIFIKACE, - tg_columns=GRADES_TG_COLUMNS_KLASIFIKACE, - #===info for transforamtions=== - columns_to_concat=GRADES_TG_COLUMNS_TO_CONCAT_KLASIFIKACE, - tg_dtypes=GRADES_TG_COLUMNS_DTYPES_KLASIFIKACE, - str_columns=GRADES_TG_COLUMNS_STRING_KLASIFIKACE, - timestamp_columns=GRADES_TG_COLUMNS_TIMESTAMP_KLASIFIKACE, - init_load=False, - bk_en_cz=GRADES_TG_BK_EN_CZ_KLASIFIKACE, - process_data=process_data_klasifikace.process_data - ) + # task_load_t_klas_klasifikace = LoadToTargetOperator( + # task_id="task_load_t_klas_klasifikace", + # #===conections=== + # stg_conn_id=GRADES_STG_CONN_ID, + # stg_db=GRADES_STG_DB, + # tg_conn_id=GRADES_TG_CONN_ID, + # tg_db=GRADES_TG_DB, + # #===table info=== + # si_schema=GRADES_SI_SCHEMA, + # si_table=GRADES_SI_TABLE_CLASSIFICATION, + # si_columns=GRADES_SI_COLUMNS_CLASSIFICATION, + # tg_schema=GRADES_TG_SCHEMA, + # tg_table=GRADES_TG_TABLE_KLASIFIKACE, + # tg_columns=GRADES_TG_COLUMNS_KLASIFIKACE, + # #===info for transforamtions=== + # columns_to_concat=GRADES_TG_COLUMNS_TO_CONCAT_KLASIFIKACE, + # tg_dtypes=GRADES_TG_COLUMNS_DTYPES_KLASIFIKACE, + # str_columns=GRADES_TG_COLUMNS_STRING_KLASIFIKACE, + # timestamp_columns=GRADES_TG_COLUMNS_TIMESTAMP_KLASIFIKACE, + # init_load=False, + # bk_en_cz=GRADES_TG_BK_EN_CZ_KLASIFIKACE, + # process_data=process_data_klasifikace.process_data + # ) #=================TASK LOAD T_KLAS_KLASIFIKACE_STUDENT========================== @@ -257,19 +257,21 @@ with DAG( #=============================================================================== - task_start_grades >> task_load_t_klas_klasifikace + # task_start_grades >> task_load_t_klas_klasifikace + # task_start_grades >> task_make_increment_classification + # task_start_grades >> task_test - # task_start_grades >> [task_load_boolean_student_classification, - # task_load_classification, - # task_load_classification_text, - # task_load_student_classification, - # task_load_classification_user, - # task_load_number_student_classification, - # task_load_string_student_classification - # ] - # task_load_classification >> task_make_increment_classification - # task_load_student_classification >> task_make_increment_student_classification + task_start_grades >> [task_load_boolean_student_classification, + task_load_classification, + task_load_classification_text, + task_load_student_classification, + task_load_classification_user, + task_load_number_student_classification, + task_load_string_student_classification + ] + task_load_classification >> task_make_increment_classification + task_load_student_classification >> task_make_increment_student_classification # do we need those dependent tables to be incremented?? # [task_make_increment_classification, task_load_classification_text] >> task_load_t_klas_klasifikace diff --git a/airflow/plugins/custom_operators/load_to_stage_no_psc_operator.py b/airflow/plugins/custom_operators/load_to_stage_no_psc_operator.py index c705396..bd05b2f 100644 --- a/airflow/plugins/custom_operators/load_to_stage_no_psc_operator.py +++ b/airflow/plugins/custom_operators/load_to_stage_no_psc_operator.py @@ -30,8 +30,8 @@ class LoadToStageNoPSCOperator(BaseOperator): self.stg_table = stg_table self.stg_columns = stg_columns - def execute(self, context): - self.load_table() + def execute(self, context): + self.load_table() #=============================================================================== @@ -54,7 +54,7 @@ class LoadToStageNoPSCOperator(BaseOperator): #=============================================================================== def get_data_iterator(self, data): - from helpers.stringiteratorio import StringIteratorIO + from custom_operators.helpers.stringiteratorio import StringIteratorIO data = [tuple(row) for row in data.itertuples(index=False)] #todo try replace here return StringIteratorIO(( @@ -68,8 +68,8 @@ class LoadToStageNoPSCOperator(BaseOperator): from psycopg2 import sql alter_stmt = sql.SQL("ALTER TABLE {schema_name}.{table_name} \ OWNER TO big;").format( - schema_name=sql.Identifier(self.schema_name), - table_name=sql.Identifier(self.table_name) + schema_name=sql.Identifier(self.stg_schema), + table_name=sql.Identifier(self.stg_table) ) cursor.execute(alter_stmt) diff --git a/airflow/plugins/custom_operators/load_to_target_operator.py b/airflow/plugins/custom_operators/load_to_target_operator.py index 7f2ee8c..d3fe483 100644 --- a/airflow/plugins/custom_operators/load_to_target_operator.py +++ b/airflow/plugins/custom_operators/load_to_target_operator.py @@ -214,9 +214,11 @@ class LoadToTargetOperator(BaseOperator): import pandas as pd with create_connection_pg(self.stg_conn_id, self.stg_db) as stage_conn: try: + print(data[lookup_info['bk_en_cz'][1]]) bk_lst = list(set([str(i) for i in data[lookup_info['bk_en_cz'][1]].tolist()])) columns = [f"{key} as {value}" for key, value in lookup_info['cols_to_select'].items()] - + print(bk_lst) + raise lookup_query = f"SELECT {', '.join(columns)} \ FROM {lookup_info['lookup_schema']}.{lookup_info['lookup_table']} \ WHERE {self.bk_en_cz[0]} IN ({', '.join(bk_lst)}) \ diff --git a/airflow/plugins/custom_operators/make_increment_no_psc_operator.py b/airflow/plugins/custom_operators/make_increment_no_psc_operator.py index 1254fc2..bf2f526 100644 --- a/airflow/plugins/custom_operators/make_increment_no_psc_operator.py +++ b/airflow/plugins/custom_operators/make_increment_no_psc_operator.py @@ -47,7 +47,7 @@ class MakeIncrementNoPSCOperator(BaseOperator): #=============================================================================== def get_M_N_D_flags(self, **kwargs): - with create_connector_pg(conn_id, db) as conn: + with create_connector_pg(self.conn_id, self.db) as conn: try: cursor = conn.cursor() cursor.callproc("public.inc_find_modified_in_pre_stage_schema_table_args", -- GitLab