diff --git a/airflow/plugins/custom_operators/load_to_stage_no_psc_operator.py b/airflow/plugins/custom_operators/load_to_stage_no_psc_operator.py
index c0ba056a7d9cf650fa45c1045df2d64af6b80925..2c7b2c5914a9a0642a55062ed17d8f6b49284867 100644
--- a/airflow/plugins/custom_operators/load_to_stage_no_psc_operator.py
+++ b/airflow/plugins/custom_operators/load_to_stage_no_psc_operator.py
@@ -32,6 +32,9 @@ class LoadToStageNoPSCOperator(BaseOperator):
         self.stg_table = stg_table
         self.stg_columns = stg_columns
 
+    def execute(self, context):
+        self.load_table()
+
 #===============================================================================
 
     def md5_string(self, string):
@@ -63,12 +66,12 @@ class LoadToStageNoPSCOperator(BaseOperator):
 
 #===============================================================================
 
-    def alter_owner_to_big(self, schema_name, table_name, cursor):
+    def alter_owner_to_big(self, cursor):
         from psycopg2 import sql
         alter_stmt = sql.SQL("ALTER TABLE {schema_name}.{table_name} \
             OWNER TO big;").format(
-                schema_name=sql.Identifier(schema_name),
-                table_name=sql.Identifier(table_name)
+                schema_name=sql.Identifier(self.stg_schema),
+                table_name=sql.Identifier(self.stg_table)
             )
         cursor.execute(alter_stmt)
 
@@ -83,24 +86,15 @@ class LoadToStageNoPSCOperator(BaseOperator):
         is_loading = False
         chunk_size = 10**5
 
-        # src_stmt = sql.SQL("SELECT {src_columns} FROM {src_schema}.{src_table}").format(
-        #     src_columns=sql.SQL(',').join(sql.Identifier(n) for n in kwargs['src_columns']),
-        #     src_schema=sql.Identifier(kwargs['src_schema']),
-        #     src_table=sql.Identifier(kwargs['src_table'])
-        # ).as_string(kwargs['src_conn'])
-        #
-        # print(src_stmt)
-        # raise
-
-        src_stmt = f"SELECT {', '.join(kwargs['src_columns'])} FROM \
-            {kwargs['src_schema']}.{kwargs['src_table']}"
+        src_stmt = f"SELECT {', '.join(self.src_columns)} FROM \
+            {self.src_schema}.{self.src_table};"
 
         copy_to_stage_stmt = sql.SQL("""COPY {stg_schema}.{stg_table} \
             ({columns}) FROM STDIN WITH (FORMAT csv, DELIMITER E'\x1f', NULL 'None')""").format(
-                stg_schema=sql.Identifier(kwargs['stg_schema']),
-                stg_table=sql.Identifier(kwargs['stg_table']),
-                columns=sql.SQL(',').join(sql.Identifier(n) for n in kwargs['stg_columns']))
+                stg_schema=sql.Identifier(self.stg_schema),
+                stg_table=sql.Identifier(self.stg_table),
+                columns=sql.SQL(',').join(sql.Identifier(n) for n in self.stg_columns))
 
         str_cols = []
 
@@ -111,18 +105,16 @@ class LoadToStageNoPSCOperator(BaseOperator):
             src_chunk[str_cols] = src_chunk[str_cols].replace(to_replace=r'\n', value=' ', regex=True)
             src_chunk[str_cols] = src_chunk[str_cols].replace(to_replace=r'\r', value='', regex=True)
 
-            src_chunk.replace({np.nan: None}, inplace=True)
             # src_chunk = src_chunk.replace({pd.NaT: None}).replace({np.NaN: None})
 
-            print(1)
             if is_loading:
                 data_iterator = self.get_data_iterator(self.chunk_with_md5(src_chunk))
                 kwargs['stg_cursor'].copy_expert(copy_to_stage_stmt, data_iterator)
             else:
-                trunc_stmt = sql.SQL("TRUNCATE TABLE {stg_schema}.{stg_table}").format(
-                    stg_schema=sql.Identifier(kwargs['stg_schema']),
-                    stg_table=sql.Identifier(kwargs['stg_table'])
+                trunc_stmt = sql.SQL("TRUNCATE TABLE {stg_schema}.{stg_table};").format(
+                    stg_schema=sql.Identifier(self.stg_schema),
+                    stg_table=sql.Identifier(self.stg_table)
                 )
                 kwargs['stg_cursor'].execute(trunc_stmt)
                 data_iterator = self.get_data_iterator(self.chunk_with_md5(src_chunk))
@@ -134,23 +126,17 @@ class LoadToStageNoPSCOperator(BaseOperator):
 
     def load_table(self, **kwargs):  # source_table_name, source_schema_name,
                                     # source_columns, stage_table_name, stage_schema_name
         # with create_connector_pg(kwargs['stg_conn_id'], kwargs['stg_db']) as stg_conn:
-        with create_connector_pg(kwargs['stg_conn_id'], kwargs['stg_db']) as stg_conn:
+        with create_connector_pg(self.stg_conn_id, self.stg_db) as stg_conn:
             try:
-                src_conn = create_connection_pg(kwargs['src_conn_id'], kwargs['src_db'])
-
+                src_conn = create_connection_pg(self.src_conn_id, self.src_db)
+                stg_cursor = stg_conn.cursor()
                 self.load_table_in_chunks(
                     src_conn=src_conn,
-                    src_schema=kwargs['src_schema'],
-                    src_table=kwargs['src_table'],
-                    src_columns=kwargs['src_columns'],
-                    stg_cursor=stg_conn.cursor(),
-                    stg_schema=kwargs['stg_schema'],
-                    stg_table=kwargs['stg_table'],
-                    stg_columns=kwargs['stg_columns'])
+                    stg_cursor=stg_cursor)
 
-                self.alter_owner_to_big(kwargs['stg_schema'],
-                                        kwargs['stg_table'], stg_conn.cursor())
+                self.alter_owner_to_big(stg_cursor)
+                stg_cursor.close()
                 stg_conn.commit()
             except:
                 stg_conn.rollback()
                 raise
@@ -160,17 +146,3 @@ class LoadToStageNoPSCOperator(BaseOperator):
             finally:
                 src_conn.close()
 
 #===============================================================================
-
-    def execute(self, context):
-        self.load_table(
-            src_conn_id=self.src_conn_id,
-            src_db=self.src_db,
-            stg_conn_id=self.stg_conn_id,
-            stg_db=self.stg_db,
-            src_schema=self.src_schema,
-            src_table=self.src_table,
-            src_columns=self.src_columns,
-            stg_schema=self.stg_schema,
-            stg_table=self.stg_table,
-            stg_columns=self.stg_columns
-        )
diff --git a/airflow/plugins/custom_operators/load_to_target_operator.py b/airflow/plugins/custom_operators/load_to_target_operator.py
index 887279f9a2dde41a7d2ab41bde81cee86bb31b78..dae4c26d54a023c26ad4c6f3660c123398adfe96 100644
--- a/airflow/plugins/custom_operators/load_to_target_operator.py
+++ b/airflow/plugins/custom_operators/load_to_target_operator.py
@@ -7,16 +7,28 @@ class LoadToTargetOperator(BaseOperator):
     @apply_defaults
     def __init__(
             self,
-            tg_conn_id,
-            tg_db,
-            stg_conn_id,
-            stg_db,
-            src_columns,
+            #===connections===
             stg_conn_id,
             stg_db,
-            stg_schema,
-            stg_table,
-            stg_columns,
+            tg_conn_id,
+            tg_db,
+            #===table info===
+            si_schema,
+            si_table,
+            si_columns,
+            bk_en_cz,
+            tg_schema,
+            tg_table,
+            tg_columns,
+            #===info for transformations===
+            columns_to_concat,
+            tg_dtypes,
+            str_columns,
+            fk_columns,
+            timestamp_columns,
+            lookups,
             *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
         self.src_conn_id = src_conn_id
@@ -29,3 +41,178 @@ class LoadToTargetOperator(BaseOperator):
-        self.stg_schema = stg_schema
-        self.stg_table = stg_table
-        self.stg_columns = stg_columns
+        # store the constructor arguments used by the methods below
+        self.si_schema = si_schema
+        self.si_table = si_table
+        self.si_columns = si_columns
+        self.bk_en_cz = bk_en_cz
+        self.tg_schema = tg_schema
+        self.tg_table = tg_table
+        self.tg_columns = tg_columns
+        self.columns_to_concat = columns_to_concat
+        self.tg_dtypes = tg_dtypes
+        self.str_columns = str_columns
+        self.fk_columns = fk_columns
+        self.timestamp_columns = timestamp_columns
+        self.lookups = lookups
+
+    def execute(self, context):
+        self.load_table_to_tg()
+
+#===============================================================================
+
+    def load_table_to_tg(self, **kwargs):
+        with create_connector_pg(self.tg_conn_id, self.tg_db) as tg_conn:
+            try:
+                stg_conn = create_connection_pg(self.stg_conn_id, self.stg_db)
+                tg_cursor = tg_conn.cursor()
+                self.load_to_target(
+                    stg_conn=stg_conn,
+                    tg_cursor=tg_cursor
+                )
+                tg_cursor.close()
+                tg_conn.commit()
+            except:
+                tg_conn.rollback()
+                raise
+            finally:
+                stg_conn.close()
+
+#===============================================================================
+
+    def load_to_target(self, **kwargs):
+        chunk_size = 10**5
+        si_column_aliases_lst = [f"{key} as {value}" for key, value in self.si_columns.items()]
+        table_input_stmt = f"SELECT {', '.join(si_column_aliases_lst)} FROM \
+            {self.si_schema}.{self.si_table} WHERE state IS NOT NULL"
+
+        for chunk in pd.read_sql_query(table_input_stmt,
+                                       con=kwargs['stg_conn'], chunksize=chunk_size):
+
+            for lookup in self.lookups:
+                # assumes every entry in self.lookups is a dict of keyword arguments
+                # understood by database_lookup (lookup_schema, lookup_table, ...)
+                chunk = self.database_lookup(data_chunk=chunk, **lookup)
+
+            processed_data = self.process_data(data=chunk)
+
+            self.load_data(data=processed_data, tg_cursor=kwargs['tg_cursor'])
+
+#===============================================================================
+
+    def load_data(self, data, tg_cursor):
+        # divide data into N, M, D groups by state
+        grouped_by_state = data.groupby('state')
+        states = set(data['state'].unique())
+
+        if 'N' in states:
+            new = grouped_by_state.get_group('N')
+            self.load_new(new_data=new, tg_cursor=tg_cursor)
+
+        if 'M' in states:
+            modified = grouped_by_state.get_group('M')
+            self.load_modified(modified_data=modified, tg_cursor=tg_cursor)
+
+        if 'D' in states:
+            deleted = grouped_by_state.get_group('D')
+            self.load_deleted(deleted, self.bk_en_cz[1], tg_cursor)
+
+#===============================================================================
+
+    def load_new(self, new_data, tg_cursor):
+
+        new_data['last_update'] = new_data['last_update'].dt.floor('D')
+        new_data.loc[:, 'version'] = 1
+
+        # init_load is not set in __init__; assume an incremental load when the
+        # attribute is absent
+        if getattr(self, 'init_load', False):
+            new_data['date_from'] = '1900-01-01 00:00:00'
+        else:
+            new_data.loc[:, 'date_from'] = new_data['last_update'].copy()
+
+        new_data['date_to'] = '2199-12-31 00:00:00'
+        last_update_col = new_data.pop('last_update')
+        new_data['last_update'] = last_update_col
+        new_to_load = new_data[self.tg_columns].copy()
+
+        float_cols = new_to_load.select_dtypes(include=['float64']).columns
+        for col in float_cols:
+            new_to_load[col] = new_to_load[col].apply(lambda x: '{:.0f}'.format(x) if x.is_integer() else 'None')
+
+        new_to_load = new_to_load.replace(r'\n', ' ', regex=True)
+
+        for ts_col in self.timestamp_columns:
+            new_to_load[ts_col] = new_to_load[ts_col].astype(object).where(new_to_load[ts_col].notnull(), None)
+
+        copy_to_target_stmt = sql.SQL("""COPY {tg_schema}.{tg_table} \
+            ({columns}) FROM STDIN WITH (FORMAT csv, DELIMITER E'\x1f', NULL 'None')""").format(
+                tg_schema=sql.Identifier(self.tg_schema),
+                tg_table=sql.Identifier(self.tg_table),
+                columns=sql.SQL(',').join(sql.Identifier(n) for n in self.tg_columns))
+
+        data_iterator = self.get_data_iterator(new_to_load)
+        tg_cursor.copy_expert(copy_to_target_stmt, data_iterator)
+
+#===============================================================================
+
+    def load_modified(self, modified_data, tg_cursor):
+        modified_data = self.concat_fields(modified_data)
+        # modified_data.to_csv('/home/zolockri/Documents/suuuka.csv')
+        modified_data.apply(lambda row: self.call_inc_hist_m(row['concated'], row['active'],
+                                                             row['last_update'], tg_cursor), axis=1)
+
+#===============================================================================
+
+    def concat_fields(self, data):
+        delimiter = '\\x1f'
+        float_cols = data.select_dtypes(include=['float64']).columns
+        for col in float_cols:
+            data[col] = data[col].apply(lambda x: '{:.0f}'.format(x) if x.is_integer() else 'None')
+
+        data.fillna('', inplace=True)
+        data = data.astype(str)
+        data[self.str_columns] = data[self.str_columns].apply(lambda x: x.str.strip())
+        data['concated'] = data[self.columns_to_concat].apply(
+            lambda x: (delimiter.join(x).replace("'", "''")).replace("%", "%%"),
+            axis=1)
+        return data
+
+#===============================================================================
+
+    def call_inc_hist_m(self, concated, active, last_update, tg_cursor):
+        tg_cursor.callproc("public.inc_historize_case_m", [concated, self.tg_table, active, last_update])
+
+#===============================================================================
+
+    def call_delete_update(self, bk_str, bk, tg_cursor):
+        delete_update_stmt = sql.SQL("UPDATE dwh.{tg_table} \
+            SET date_to = CURRENT_TIMESTAMP \
+            WHERE {bk_str} = {bk} \
+            AND (date_to = '2199-12-31 00:00:00');").format(
+                tg_table=sql.Identifier(self.tg_table),
+                bk_str=sql.Identifier(bk_str),
+                # bk is a data value, not a column name, so render it as a literal
+                bk=sql.Literal(bk)
+            )
+        tg_cursor.execute(delete_update_stmt)
+
+#===============================================================================
+
+    def load_deleted(self, deleted_data, bk_str, tg_cursor):
+        print("start load_del")
+        deleted_data.apply(lambda row: self.call_delete_update(bk_str, row[bk_str],
+                                                               tg_cursor), axis=1)
+        print("end load_del")
+
+#===============================================================================
+
+    def database_lookup(self, **kwargs):  # TODO: add documentation
+        with create_connection_pg(self.stg_conn_id, self.stg_db) as stage_conn:
+            try:
+                bk_lst = list(set([str(i) for i in kwargs['data_chunk'][self.bk_en_cz[1]].tolist()]))
+                columns = [f"{key} as {value}" for key, value in kwargs['cols_to_select'].items()]
+
+                lookup_query = f"SELECT {', '.join(columns)} \
+                    FROM {kwargs['lookup_schema']}.{kwargs['lookup_table']} \
+                    WHERE {kwargs['bk_en_cz'][0]} IN ({', '.join(bk_lst)}) \
+                    AND {kwargs['constraint']}"
+
+                lookup_res = pd.read_sql_query(lookup_query, con=stage_conn)
+                res_chunk = pd.merge(kwargs['data_chunk'], lookup_res, how='left', on=kwargs['bk_en_cz'][1])
+                # looked_up_col = res_chunk.pop(kwargs['col_to_add'])
+                # res_chunk.insert(kwargs['order_in_table'], kwargs['col_to_add'], looked_up_col)
+                return res_chunk
+
+            except Exception as e:
+                email_sub = f"Lookup in table {kwargs['lookup_table']} FAILED"
+                email_msg = str(e)
+                send_email(email_sub, email_msg)
+                raise Exception(f"Lookup in table {kwargs['lookup_table']} FAILED")
+
+            finally:
+                stage_conn.close()
diff --git a/airflow/plugins/custom_operators/make_increment_no_psc_operator.py b/airflow/plugins/custom_operators/make_increment_no_psc_operator.py
index 5c81a22a94fbcba9ffcb04c96ce061c8aa2dfb4d..a2a9954ab78b2a74d2fec64a1b7870ab77056abc 100644
--- a/airflow/plugins/custom_operators/make_increment_no_psc_operator.py
+++ b/airflow/plugins/custom_operators/make_increment_no_psc_operator.py
@@ -19,34 +19,23 @@ class MakeIncrementNoPSCOperator(BaseOperator):
         self.db = db
 
     def execute(self, context):
-        self.make_increment(
-            source_name=self.source_name,
-            table_name=self.table_name,
-            conn_id=self.conn_id,
-            db=self.db
-        )
+        self.make_increment()
 
 #===============================================================================
 
     def make_increment(self, **kwargs):
-        self.inc_clear_state_flag(schema_name='si_' + kwargs['source_name'],
-                                  table_name=kwargs['table_name'],
-                                  conn_id=kwargs['conn_id'],
-                                  db=kwargs['db'])
-        self.get_M_N_D_flags(schema_name='ps_' + kwargs['source_name'],
-                             table_name=kwargs['table_name'],
-                             conn_id=kwargs['conn_id'],
-                             db=kwargs['db'])
+        self.inc_clear_state_flag(schema_name='si_' + self.source_name)
+        self.get_M_N_D_flags(schema_name='ps_' + self.source_name)
 
 #===============================================================================
 
-    def inc_clear_state_flag(self, schema_name: str, table_name: str, conn_id: str, db: str):
-        with create_connector_pg(conn_id, db) as conn:
+    def inc_clear_state_flag(self, **kwargs):
+        with create_connector_pg(self.conn_id, self.db) as conn:
             try:
                 cursor = conn.cursor()
                 cursor.callproc("public.inc_clear_state_flag_schema_table_args",
-                                [schema_name, table_name])
-                print(f"State flags for table {schema_name}.{table_name} cleared")
+                                [kwargs['schema_name'], self.table_name])
+                print(f"State flags for table {kwargs['schema_name']}.{self.table_name} cleared")
                 cursor.close()
                 conn.commit()
 
@@ -57,22 +46,22 @@ class MakeIncrementNoPSCOperator(BaseOperator):
 
 #===============================================================================
 
-    def get_M_N_D_flags(self, schema_name: str, table_name: str, conn_id: str, db: str):
-        with create_connector_pg(conn_id, db) as conn:
+    def get_M_N_D_flags(self, **kwargs):
+        with create_connector_pg(self.conn_id, self.db) as conn:
             try:
                 cursor = conn.cursor()
                 cursor.callproc("public.inc_find_modified_in_pre_stage_schema_table_args",
-                                [schema_name, table_name])
-                print(f"State flags M for table {schema_name}.{table_name} got")
+                                [kwargs['schema_name'], self.table_name])
+                print(f"State flags M for table {kwargs['schema_name']}.{self.table_name} got")
 
                 cursor.callproc("public.inc_find_new_in_pre_stage_schema_table_args",
-                                [schema_name, table_name])
-                print(f"State flags N for table {schema_name}.{table_name} got")
+                                [kwargs['schema_name'], self.table_name])
+                print(f"State flags N for table {kwargs['schema_name']}.{self.table_name} got")
 
                 cursor.callproc("public.inc_find_deleted_in_pre_stage_schema_table_args",
-                                [schema_name, table_name])
-                print(f"State flags D for table {schema_name}.{table_name} got")
+                                [kwargs['schema_name'], self.table_name])
+                print(f"State flags D for table {kwargs['schema_name']}.{self.table_name} got")
 
                 cursor.close()
                 conn.commit()
diff --git a/airflow/target_transformations/__init__.py b/airflow/target_transformations/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/airflow/target_transformations/process_data_klasifikace.py b/airflow/target_transformations/process_data_klasifikace.py
new file mode 100644
index 0000000000000000000000000000000000000000..b5f7618ddc260f8c21b9b6813ed23734d7e5059a
--- /dev/null
+++ b/airflow/target_transformations/process_data_klasifikace.py
@@ -0,0 +1,10 @@
+
+def process_data(data, **kwargs):  # should be adjusted manually for every table
+    # other transformations; in this case replace boolean values with varchar Y/N
+    # boolean_to_YN = lambda x: 'Y' if x else 'N'
+    boolean_to_YN = lambda x: 'Y' if x is True else ('N' if x is False else None)
+
+    boolean_columns = data.select_dtypes(include='bool').columns
+    data[boolean_columns] = data[boolean_columns].apply(lambda x: x.apply(boolean_to_YN))
+    # print(processed_data.dtypes)
+    return data
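Below is a minimal usage sketch (not part of the patch) showing how the refactored operators could be wired into a DAG once all connection and table metadata is passed through the constructors and execute() takes no extra arguments. The DAG id, connection ids, schema/table/column names, and import paths are illustrative assumptions, not values taken from this repository.

# Illustrative DAG wiring only; all ids, names, and columns below are assumptions.
from datetime import datetime

from airflow import DAG

from custom_operators.load_to_stage_no_psc_operator import LoadToStageNoPSCOperator
from custom_operators.make_increment_no_psc_operator import MakeIncrementNoPSCOperator
from custom_operators.load_to_target_operator import LoadToTargetOperator

with DAG(dag_id="example_klasifikace_dwh_load",        # hypothetical DAG id
         start_date=datetime(2023, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:

    load_to_stage = LoadToStageNoPSCOperator(
        task_id="load_klasifikace_to_stage",
        src_conn_id="erp_postgres",                     # hypothetical connection ids and names
        src_db="erp",
        src_schema="public",
        src_table="klasifikace",
        src_columns=["id", "nazev", "aktivni"],
        stg_conn_id="dwh_postgres",
        stg_db="stage",
        stg_schema="ps_erp",
        stg_table="klasifikace",
        stg_columns=["id", "nazev", "aktivni", "md5_hash"])

    make_increment = MakeIncrementNoPSCOperator(
        task_id="make_increment_klasifikace",
        source_name="erp",
        table_name="klasifikace",
        conn_id="dwh_postgres",
        db="stage")

    load_to_target = LoadToTargetOperator(
        task_id="load_klasifikace_to_target",
        # ===connections===
        stg_conn_id="dwh_postgres",
        stg_db="stage",
        tg_conn_id="dwh_postgres",
        tg_db="dwh",
        # ===table info===
        si_schema="si_erp",
        si_table="klasifikace",
        si_columns={"id": "id", "nazev": "name", "aktivni": "active"},
        bk_en_cz=("id", "id"),
        tg_schema="dwh",
        tg_table="klasifikace",
        tg_columns=["id", "name", "active", "version", "date_from", "date_to", "last_update"],
        # ===info for transformations===
        columns_to_concat=["id", "name", "active"],
        tg_dtypes={},
        str_columns=["name"],
        fk_columns=[],
        timestamp_columns=["last_update"],
        lookups=[])

    load_to_stage >> make_increment >> load_to_target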