diff --git a/airflow/dags/grades/log.txt b/airflow/dags/grades/log.txt new file mode 100644 index 0000000000000000000000000000000000000000..fe06eab86faa1baaa431d90eb06c0df32dd06e4f --- /dev/null +++ b/airflow/dags/grades/log.txt @@ -0,0 +1,4 @@ +[[34m2023-03-10 14:43:22,911[0m] {[34mbase.py:[0m73} INFO[0m - Using connection ID 'dv5' for task execution.[0m +[[34m2023-03-10 14:43:22,938[0m] {[34mbase.py:[0m73} INFO[0m - Using connection ID 'grades' for task execution.[0m +COPY "ps_grades"."student_classification_airflow_orig" ("student_classification_id","note","classification_id","timestamp","classification_user_id","md5") FROM STDIN WITH (FORMAT csv, DELIMITER E'', NULL 'None') +1 diff --git a/airflow/dags/helpers/stage_functions_no_psc.py b/airflow/dags/helpers/stage_functions_no_psc.py index f21bf61a5d72229ff84e88de845dfa42f300a980..5535ce5467dc52926e1decafc4ff6853a4cf26eb 100644 --- a/airflow/dags/helpers/stage_functions_no_psc.py +++ b/airflow/dags/helpers/stage_functions_no_psc.py @@ -1,9 +1,13 @@ import pandas as pd import hashlib import numpy as np - +from sqlalchemy import text +from psycopg2 import sql +import io +import csv from connections import * from stringiteratorio import StringIteratorIO +from typing import Dict, Any, Iterator, Optional #=============================================================================== def md5_string(string): @@ -28,10 +32,12 @@ def load_table_in_chunks(**kwargs): for source_chunk in pd.read_sql_query(source_stmt, con=kwargs['src_conn'], chunksize=chunk_size): if is_loading: + print(2) chunk_with_md5(source_chunk).to_sql(kwargs['stg_table'], con=kwargs['stg_conn'], index=False, - schema=kwargs['stg_schema'], if_exists='append') + schema=kwargs['stg_schema'], if_exists='append', method='multi') else: + print(1) chunk_with_md5(source_chunk).to_sql(kwargs['stg_table'], con=kwargs['stg_conn'], index=False, schema=kwargs['stg_schema'], if_exists='replace', method='multi') @@ -89,12 +95,29 @@ def load_table_in_chunks_fast(**kwargs): # TODO: add arg cols needed to stage + src_stmt = f"SELECT {', '.join(kwargs['src_columns'])} FROM \ {kwargs['src_schema']}.{kwargs['src_table']}" - copy_to_stage_stmt = f"COPY {kwargs['stg_schema']}.{kwargs['stg_table']} \ - ({', '.join(kwargs['stg_columns'])}) \ - FROM STDIN WITH (FORMAT csv, DELIMITER E'\x1f', NULL 'None')" + # copy_to_stage_stmt1 = f"COPY {kwargs['stg_schema']}.{kwargs['stg_table']} \ + # ({', '.join(kwargs['stg_columns'])}) \ + # FROM STDIN WITH (FORMAT csv, DELIMITER E'\x1f', NULL 'None')" + + # print(copy_to_stage_stmt1) + + copy_to_stage_stmt = sql.SQL("""COPY {stg_schema}.{stg_table} \ + ({columns}) FROM STDIN WITH (FORMAT csv, DELIMITER E'\x1f', NULL 'None')""").format( + stg_schema=sql.Identifier(kwargs['stg_schema']), + stg_table=sql.Identifier(kwargs['stg_table']), + columns=sql.SQL(',').join(map(sql.Identifier, kwargs['stg_columns'])) + ) + print(copy_to_stage_stmt.as_string(kwargs['stg_cursor'])) for src_chunk in pd.read_sql_query(src_stmt, con=kwargs['src_conn'], chunksize=chunk_size): + src_chunk = src_chunk.replace(r'\n', ' ', regex=True) + src_chunk.replace(to_replace=r'\r', value='1', regex=True, inplace=True) + + + src_chunk.replace({np.nan: None}, inplace=True) + # print(src_chunk.loc['364057']) + print(1) if is_loading: print(2) @@ -111,9 +134,13 @@ def load_table_in_chunks_fast(**kwargs): # TODO: add arg cols needed to stage + #=============================================================================== +# def clean_csv_value(value: Optional[Any]) -> str: +# return repr(str(value).replace('\n', '\\n')) + def get_data_iterator(data): - data = [tuple(row) for row in data.itertuples(index=False)] + data = [tuple(row) for row in data.itertuples(index=False)] #todo try replace here + return StringIteratorIO(( - '\\x1f'.join(map(str, row)) + '\n' + "\x1f".join(map(str, row)) + '\n') for row in data - )) + ) diff --git a/airflow/dags/helpers/stringiteratorio.py b/airflow/dags/helpers/stringiteratorio.py index 3094297d30a105c63bb4788fd38e3412c589f8ee..48fcbc242ded6ec2124f4eccb993640e74604319 100644 --- a/airflow/dags/helpers/stringiteratorio.py +++ b/airflow/dags/helpers/stringiteratorio.py @@ -33,7 +33,7 @@ class StringIteratorIO(io.TextIOBase): break n -= len(m) line.append(m) - return ''.join(line) + return ''.join(line).replace('"', '""""').replace('"', '\"') def __enter__(self): return self diff --git a/airflow/dags/helpers/target_functions.py b/airflow/dags/helpers/target_functions.py index 67bc92eb99ef020965abd3dbf6a33f63c7c60437..6954a6892919ac39579680b0c3df749648e00d4c 100644 --- a/airflow/dags/helpers/target_functions.py +++ b/airflow/dags/helpers/target_functions.py @@ -89,7 +89,7 @@ def load_new(new_data, tg_conn, tg_columns, tg_dtypes, tg_schema, tg_table, fk_c def get_data_iterator(data): data = [tuple(row) for row in data.itertuples(index=False)] return StringIteratorIO(( - "\x1f".join(map(str, row)) + '\n' + "\x1f".join(map(str, row)) + '\r\n' for row in data )) diff --git a/airflow/dags/test_examples/uvozovka.py b/airflow/dags/test_examples/uvozovka.py new file mode 100644 index 0000000000000000000000000000000000000000..4541a4b599d13773e457bfc6c5afa4c1c7a827b0 --- /dev/null +++ b/airflow/dags/test_examples/uvozovka.py @@ -0,0 +1,71 @@ +# row = (23562, 'helooo erger', 2342) +# +# x = "\x1f".join(map(str, row)) +# +# print(x) +import pandas as pd + +from typing import Dict, Any, Iterator, Optional +import io +class StringIteratorIO(io.TextIOBase): + def __init__(self, iter: Iterator[str]): + self._iter = iter + self._buff = '' + + def readable(self) -> bool: + return True + + def _read1(self, n: Optional[int] = None) -> str: + while not self._buff: + try: + self._buff = next(self._iter) + except StopIteration: + break + ret = self._buff[:n] + self._buff = self._buff[len(ret):] + print(ret + "je to z fce _read1") + return ret + + def read(self, n: Optional[int] = None) -> str: + line = [] + print(f"zacatek n je {n}") + if n is None or n < 0: + print("ass") + while True: + m = self._read1() + if not m: + break + line.append(m) + else: + while n > 0: + m = self._read1(n) + if not m: + break + print(f"n je {n}") + print(f"m je {m}") + + n -= len(m) + print(n) + line.append(m) + print("line je ----") + print(repr(''.join(line))) + + print('bez pepru ---------------------------') + print(''.join(line)) + return ''.join(line) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._iter = None + + + +data = [(232627, 'blabla "erge', 4057, pd.Timestamp('2019-05-14 10:47:59.794000'), 503, '159895abd119aa1cd6cbdedf14cd55f5'), (232627, 'blabla "erge', 4057, pd.Timestamp('2019-05-14 10:47:59.794000'), 503, '159895abd119aa1cd6cbdedf14cd55f5')] + +f = StringIteratorIO(( + "\x1f".join(map(str, row)) + '\n') + for row in data + ) +f.read(200) diff --git a/airflow/logs/scheduler/latest b/airflow/logs/scheduler/latest index 7ed9629229c5c3fac0eb5ff9a715827b0dfda5a5..8bd04b87f9432f29b98acca29ede37438d80c9b6 120000 --- a/airflow/logs/scheduler/latest +++ b/airflow/logs/scheduler/latest @@ -1 +1 @@ -/home/zolockri/Documents/FIT/BP/airflow_workspace/airflow/logs/scheduler/2023-03-07 \ No newline at end of file +/home/zolockri/Documents/FIT/BP/airflow_workspace/airflow/logs/scheduler/2023-03-10 \ No newline at end of file