diff --git a/airflow/airflow-webserver.pid b/airflow/airflow-webserver.pid new file mode 100644 index 0000000000000000000000000000000000000000..946a9fb5d59466a588e135aa6c5bbad287238e64 --- /dev/null +++ b/airflow/airflow-webserver.pid @@ -0,0 +1 @@ +7110 diff --git a/airflow/dags/grades_klasifikace_dag.py b/airflow/dags/grades_klasifikace_dag.py index d706244e02ad3b282b1ee095fb005e7808f22787..a8aa73ef21751e4427299b30348dedc8d31de897 100644 --- a/airflow/dags/grades_klasifikace_dag.py +++ b/airflow/dags/grades_klasifikace_dag.py @@ -68,167 +68,167 @@ with DAG( #==============TASK LOAD BOOLEAN_STUDENT_CLASSIFICATION========================= -# task_load_boolean_student_classification = LoadToStageOperator( -# task_id="task_load_boolean_student_classification", -# src_conn_id=GRADES_SRC_CONN_ID, -# src_db=GRADES_SRC_DB, -# src_schema=GRADES_SRC_SCHEMA, -# src_table=GRADES_SRC_TABLE_BOOLEAN_STUDENT_CLASSIFICATION, -# src_columns=GRADES_SRC_COLUMNS_BOOLEAN_STUDENT_CLASSIFICATION, -# stg_conn_id=GRADES_STG_CONN_ID, -# stg_db=GRADES_STG_DB, -# stg_schema=GRADES_STG_SCHEMA, -# stg_table=GRADES_STG_TABLE_BOOLEAN_STUDENT_CLASSIFICATION, -# stg_columns=GRADES_STG_COLUMNS_BOOLEAN_STUDENT_CLASSIFICATION -# ) -# -# #================TASK LOAD CLASSIFICATION======================================= -# -# task_load_classification = LoadToStageOperator( -# task_id="task_load_classification", -# src_conn_id=GRADES_SRC_CONN_ID, -# src_db=GRADES_SRC_DB, -# src_schema=GRADES_SRC_SCHEMA, -# src_table=GRADES_SRC_TABLE_CLASSIFICATION, -# src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION, -# stg_conn_id=GRADES_STG_CONN_ID, -# stg_db=GRADES_STG_DB, -# stg_schema=GRADES_STG_SCHEMA, -# stg_table=GRADES_STG_TABLE_CLASSIFICATION, -# stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION -# ) -# #================TASK LOAD CLASSIFICATION_TEXT================================== -# -# task_load_classification_text = LoadToStageOperator( -# task_id="task_load_classification_text", -# src_conn_id=GRADES_SRC_CONN_ID, -# src_db=GRADES_SRC_DB, -# src_schema=GRADES_SRC_SCHEMA, -# src_table=GRADES_SRC_TABLE_CLASSIFICATION_TEXT, -# src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION_TEXT, -# stg_conn_id=GRADES_STG_CONN_ID, -# stg_db=GRADES_STG_DB, -# stg_schema=GRADES_STG_SCHEMA, -# stg_table=GRADES_STG_TABLE_CLASSIFICATION_TEXT, -# stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION_TEXT -# ) -# #================TASK LOAD CLASSIFICATION_USER================================== -# -# task_load_classification_user = LoadToStageOperator( -# task_id="task_load_classification_user", -# src_conn_id=GRADES_SRC_CONN_ID, -# src_db=GRADES_SRC_DB, -# src_schema=GRADES_SRC_SCHEMA, -# src_table=GRADES_SRC_TABLE_CLASSIFICATION_USER, -# src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION_USER, -# stg_conn_id=GRADES_STG_CONN_ID, -# stg_db=GRADES_STG_DB, -# stg_schema=GRADES_STG_SCHEMA, -# stg_table=GRADES_STG_TABLE_CLASSIFICATION_USER, -# stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION_USER -# ) -# #================TASK LOAD NUMBER_STUDENT_CLASSIFICATION======================== -# -# task_load_number_student_classification = LoadToStageOperator( -# task_id="task_load_number_student_classification", -# src_conn_id=GRADES_SRC_CONN_ID, -# src_db=GRADES_SRC_DB, -# src_schema=GRADES_SRC_SCHEMA, -# src_table=GRADES_SRC_TABLE_NUMBER_STUDENT_CLASSIFICATION, -# src_columns=GRADES_SRC_COLUMNS_NUMBER_STUDENT_CLASSIFICATION, -# stg_conn_id=GRADES_STG_CONN_ID, -# stg_db=GRADES_STG_DB, -# stg_schema=GRADES_STG_SCHEMA, -# stg_table=GRADES_STG_TABLE_NUMBER_STUDENT_CLASSIFICATION, -# stg_columns=GRADES_STG_COLUMNS_NUMBER_STUDENT_CLASSIFICATION -# ) -# #================TASK LOAD STRING_STUDENT_CLASSIFICATION======================== -# -# task_load_string_student_classification = LoadToStageOperator( -# task_id="task_load_string_student_classification", -# src_conn_id=GRADES_SRC_CONN_ID, -# src_db=GRADES_SRC_DB, -# src_schema=GRADES_SRC_SCHEMA, -# src_table=GRADES_SRC_TABLE_STRING_STUDENT_CLASSIFICATION, -# src_columns=GRADES_SRC_COLUMNS_STRING_STUDENT_CLASSIFICATION, -# stg_conn_id=GRADES_STG_CONN_ID, -# stg_db=GRADES_STG_DB, -# stg_schema=GRADES_STG_SCHEMA, -# stg_table=GRADES_STG_TABLE_STRING_STUDENT_CLASSIFICATION, -# stg_columns=GRADES_STG_COLUMNS_STRING_STUDENT_CLASSIFICATION -# ) -# -# #================TASK LOAD STUDENT_CLASSIFICATION=============================== -# -# task_load_student_classification = LoadToStageOperator( -# task_id="task_load_student_classification", -# src_conn_id=GRADES_SRC_CONN_ID, -# src_db=GRADES_SRC_DB, -# src_schema=GRADES_SRC_SCHEMA, -# src_table=GRADES_SRC_TABLE_STUDENT_CLASSIFICATION, -# src_columns=GRADES_SRC_COLUMNS_STUDENT_CLASSIFICATION, -# stg_conn_id=GRADES_STG_CONN_ID, -# stg_db=GRADES_STG_DB, -# stg_schema=GRADES_STG_SCHEMA, -# stg_table=GRADES_STG_TABLE_STUDENT_CLASSIFICATION, -# stg_columns=GRADES_STG_COLUMNS_STUDENT_CLASSIFICATION -# ) -# -# # =============================================================================== -# # ================================INCREMENT====================================== -# # =============================================================================== -# -# # ================TASK MAKE INCREMENT CLASSIFICATION============================= -# -# task_make_increment_classification = MakeIncrementOperator( -# task_id="task_make_increment_classification", -# source_name=GRADES_SRC_NAME, -# table_name=GRADES_STG_TABLE_CLASSIFICATION, -# conn_id=GRADES_STG_CONN_ID, -# db=GRADES_STG_DB, -# is_psc=False -# ) -# -# # ================TASK MAKE INCREMENT STUDENT_CLASSIFICATION===================== -# -# task_make_increment_student_classification = MakeIncrementOperator( -# task_id="task_make_increment_student_classification", -# source_name=GRADES_SRC_NAME, -# table_name=GRADES_STG_TABLE_STUDENT_CLASSIFICATION, -# conn_id=GRADES_STG_CONN_ID, -# db=GRADES_STG_DB, -# is_psc=False -# ) -# -# # =============================================================================== -# # =================================TARGET======================================== -# # =============================================================================== -# -# # ====================TASK LOAD T_KLAS_KLASIFIKACE=============================== -# -# task_load_t_klas_klasifikace = LoadToTargetOperator( -# task_id="task_load_t_klas_klasifikace", -# #===conections=== -# stg_conn_id=GRADES_STG_CONN_ID, -# stg_db=GRADES_STG_DB, -# tg_conn_id=GRADES_TG_CONN_ID, -# tg_db=GRADES_TG_DB, -# #===table info=== -# si_schema=GRADES_SI_SCHEMA, -# si_table=GRADES_SI_TABLE_CLASSIFICATION, -# si_columns=GRADES_SI_COLUMNS_CLASSIFICATION, -# tg_schema=GRADES_TG_SCHEMA, -# tg_table=GRADES_TG_TABLE_KLASIFIKACE, -# tg_columns=GRADES_TG_COLUMNS_KLASIFIKACE, -# #===info for transforamtions=== -# columns_to_concat=GRADES_TG_COLUMNS_TO_CONCAT_KLASIFIKACE, -# tg_dtypes=GRADES_TG_COLUMNS_DTYPES_KLASIFIKACE, -# str_columns=GRADES_TG_COLUMNS_STRING_KLASIFIKACE, -# timestamp_columns=GRADES_TG_COLUMNS_TIMESTAMP_KLASIFIKACE, -# init_load=False, -# bk_en_cz=GRADES_TG_BK_EN_CZ_KLASIFIKACE, -# process_data=process_data_klasifikace.process_data -# ) + task_load_boolean_student_classification = LoadToStageOperator( + task_id="task_load_boolean_student_classification", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_BOOLEAN_STUDENT_CLASSIFICATION, + src_columns=GRADES_SRC_COLUMNS_BOOLEAN_STUDENT_CLASSIFICATION, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_BOOLEAN_STUDENT_CLASSIFICATION, + stg_columns=GRADES_STG_COLUMNS_BOOLEAN_STUDENT_CLASSIFICATION + ) + +#================TASK LOAD CLASSIFICATION======================================= + + task_load_classification = LoadToStageOperator( + task_id="task_load_classification", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_CLASSIFICATION, + src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_CLASSIFICATION, + stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION + ) +#================TASK LOAD CLASSIFICATION_TEXT================================== + + task_load_classification_text = LoadToStageOperator( + task_id="task_load_classification_text", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_CLASSIFICATION_TEXT, + src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION_TEXT, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_CLASSIFICATION_TEXT, + stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION_TEXT + ) +#================TASK LOAD CLASSIFICATION_USER================================== + + task_load_classification_user = LoadToStageOperator( + task_id="task_load_classification_user", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_CLASSIFICATION_USER, + src_columns=GRADES_SRC_COLUMNS_CLASSIFICATION_USER, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_CLASSIFICATION_USER, + stg_columns=GRADES_STG_COLUMNS_CLASSIFICATION_USER + ) +#================TASK LOAD NUMBER_STUDENT_CLASSIFICATION======================== + + task_load_number_student_classification = LoadToStageOperator( + task_id="task_load_number_student_classification", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_NUMBER_STUDENT_CLASSIFICATION, + src_columns=GRADES_SRC_COLUMNS_NUMBER_STUDENT_CLASSIFICATION, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_NUMBER_STUDENT_CLASSIFICATION, + stg_columns=GRADES_STG_COLUMNS_NUMBER_STUDENT_CLASSIFICATION + ) +#================TASK LOAD STRING_STUDENT_CLASSIFICATION======================== + + task_load_string_student_classification = LoadToStageOperator( + task_id="task_load_string_student_classification", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_STRING_STUDENT_CLASSIFICATION, + src_columns=GRADES_SRC_COLUMNS_STRING_STUDENT_CLASSIFICATION, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_STRING_STUDENT_CLASSIFICATION, + stg_columns=GRADES_STG_COLUMNS_STRING_STUDENT_CLASSIFICATION + ) + +#================TASK LOAD STUDENT_CLASSIFICATION=============================== + + task_load_student_classification = LoadToStageOperator( + task_id="task_load_student_classification", + src_conn_id=GRADES_SRC_CONN_ID, + src_db=GRADES_SRC_DB, + src_schema=GRADES_SRC_SCHEMA, + src_table=GRADES_SRC_TABLE_STUDENT_CLASSIFICATION, + src_columns=GRADES_SRC_COLUMNS_STUDENT_CLASSIFICATION, + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + stg_schema=GRADES_STG_SCHEMA, + stg_table=GRADES_STG_TABLE_STUDENT_CLASSIFICATION, + stg_columns=GRADES_STG_COLUMNS_STUDENT_CLASSIFICATION + ) + +# =============================================================================== +# ================================INCREMENT====================================== +# =============================================================================== + +# ================TASK MAKE INCREMENT CLASSIFICATION============================= + + task_make_increment_classification = MakeIncrementOperator( + task_id="task_make_increment_classification", + source_name=GRADES_SRC_NAME, + table_name=GRADES_STG_TABLE_CLASSIFICATION, + conn_id=GRADES_STG_CONN_ID, + db=GRADES_STG_DB, + is_psc=False + ) + +# ================TASK MAKE INCREMENT STUDENT_CLASSIFICATION===================== + + task_make_increment_student_classification = MakeIncrementOperator( + task_id="task_make_increment_student_classification", + source_name=GRADES_SRC_NAME, + table_name=GRADES_STG_TABLE_STUDENT_CLASSIFICATION, + conn_id=GRADES_STG_CONN_ID, + db=GRADES_STG_DB, + is_psc=False + ) + +# =============================================================================== +# =================================TARGET======================================== +# =============================================================================== + +# ====================TASK LOAD T_KLAS_KLASIFIKACE=============================== + + task_load_t_klas_klasifikace = LoadToTargetOperator( + task_id="task_load_t_klas_klasifikace", + #===conections=== + stg_conn_id=GRADES_STG_CONN_ID, + stg_db=GRADES_STG_DB, + tg_conn_id=GRADES_TG_CONN_ID, + tg_db=GRADES_TG_DB, + #===table info=== + si_schema=GRADES_SI_SCHEMA, + si_table=GRADES_SI_TABLE_CLASSIFICATION, + si_columns=GRADES_SI_COLUMNS_CLASSIFICATION, + tg_schema=GRADES_TG_SCHEMA, + tg_table=GRADES_TG_TABLE_KLASIFIKACE, + tg_columns=GRADES_TG_COLUMNS_KLASIFIKACE, + #===info for transforamtions=== + columns_to_concat=GRADES_TG_COLUMNS_TO_CONCAT_KLASIFIKACE, + tg_dtypes=GRADES_TG_COLUMNS_DTYPES_KLASIFIKACE, + str_columns=GRADES_TG_COLUMNS_STRING_KLASIFIKACE, + timestamp_columns=GRADES_TG_COLUMNS_TIMESTAMP_KLASIFIKACE, + init_load=False, + bk_en_cz=GRADES_TG_BK_EN_CZ_KLASIFIKACE, + process_data=process_data_klasifikace.process_data + ) #=================TASK LOAD T_KLAS_KLASIFIKACE_STUDENT========================== @@ -258,20 +258,24 @@ with DAG( #=============================================================================== - # task_start_grades >> [task_load_boolean_student_classification, - # task_load_classification, - # task_load_classification_text, - # task_load_student_classification, - # task_load_classification_user, - # task_load_number_student_classification, - # task_load_string_student_classification - # ] - # task_load_classification >> task_make_increment_classification - # task_load_student_classification >> task_make_increment_student_classification - # task_make_increment_student_classification >> task_load_t_klas_klasifikace_student - # task_make_increment_classification >> task_load_t_klas_klasifikace - - task_start_grades >> task_load_t_klas_klasifikace_student + task_start_grades >> [task_load_boolean_student_classification, + task_load_classification, + task_load_classification_text, + task_load_student_classification, + task_load_classification_user, + task_load_number_student_classification, + task_load_string_student_classification + ] + task_load_classification >> task_make_increment_classification + task_load_student_classification >> task_make_increment_student_classification + [task_make_increment_student_classification, + task_load_boolean_student_classification, + task_load_classification, + task_load_classification_text, + task_load_student_classification, + task_load_classification_user] >> task_load_t_klas_klasifikace_student + [task_make_increment_classification, + task_load_classification_text] >> task_load_t_klas_klasifikace # task_start_grades >> [task_load_student_classification, task_load_t_klas_klasifikace] # task_load_student_classification >> task_make_increment_student_classification >> task_load_t_klas_klasifikace_student diff --git a/airflow/logs/scheduler/latest b/airflow/logs/scheduler/latest index ae3045cf99dbb24429fc9f809dcfa3bdc72dab0e..0f91b9c3d109ffcc6e67e2b293ebad9c8e92146a 120000 --- a/airflow/logs/scheduler/latest +++ b/airflow/logs/scheduler/latest @@ -1 +1 @@ -/home/zolockri/Documents/FIT/BP/airflow_workspace/airflow/logs/scheduler/2023-03-22 \ No newline at end of file +/home/zolockri/Documents/FIT/BP/airflow_workspace/airflow/logs/scheduler/2023-04-21 \ No newline at end of file diff --git a/airflow/plugins/custom_operators/helpers/connections.py b/airflow/plugins/custom_operators/helpers/connections.py index 8ba5d3f94bf4c9d3109ab0a0b23322a91a781b77..8d2a65c3acc6f9320834f5156389b511daaa2052 100644 --- a/airflow/plugins/custom_operators/helpers/connections.py +++ b/airflow/plugins/custom_operators/helpers/connections.py @@ -7,7 +7,7 @@ def create_connector(conn_id: str, db: str): if db: create_connector_pg(pg_conn_id=conn_id, db=db) - else + else: create_connector_oracle(oracle_conn_id=conn_id) @@ -17,7 +17,7 @@ def create_connection(conn_id: str, db: str): if conn_id in pg_id: create_connection_pg(pg_conn_id=conn_id, db=db) - else + else: create_connection_oracle(oracle_conn_id=conn_id) #=============================================================================== diff --git a/airflow/plugins/custom_operators/load_to_target_operator.py b/airflow/plugins/custom_operators/load_to_target_operator.py index 553c7862c759cb7b18d99cfb9ca738a9cff92165..c38bb41665a8713c6cb3bf7e07648bf373ee1856 100644 --- a/airflow/plugins/custom_operators/load_to_target_operator.py +++ b/airflow/plugins/custom_operators/load_to_target_operator.py @@ -188,17 +188,39 @@ class LoadToTargetOperator(BaseOperator): row['active'], row['last_update'], tg_cursor), axis=1) - # process_list = [] - # - # for row in modified_data.itertuples(index=False): - # proc_args = (row[0], row[1], row[2], tg_cursor) - # p = Process(target=self.call_inc_hist_m, args=proc_args) - # process_list.append(p) - # - # - # for p in process_list: - # p.start() - # p.join() +# from multiprocessing import Pool, cpu_count +# +# def process_row(row_data): +# with create_connection(self.src_conn_id, self.src_db) as conn: +# cursor = conn.cursor() +# call_inc_hist_m(row_data['concated'], self.tg_table, row_data['active'], row_data['last_update'], cursor) +# +# def parallel_process_rows(modified_data): +# with Pool(processes=cpu_count()) as pool: +# pool.map(process_row, modified_data.to_dict(orient='records')) + +#----------------------- +# import multiprocessing as mp +# +# # Define a wrapper function that takes a single argument (row) and calls call_inc_hist_m +# def process_row(row): +# return call_inc_hist_m(row['concated'], row['tg_table'], row['active'], row['last_update']) +# +# # Prepare your data +# modified_data = self.concat_fields(modified_data, self.columns_to_concat, self.str_columns) +# +# # Define a function to process the data in parallel +# def parallel_processing(data, func, num_cores=None): +# if num_cores is None: +# num_cores = mp.cpu_count() +# +# with mp.Pool(num_cores) as pool: +# results = pool.map(func, data.iterrows()) +# return results +# +# # Call the parallel_processing function +# parallel_processing(modified_data, process_row) + #===============================================================================