ETL Parallelization - Apache Airflow / Commits

Commit 6efd403b
Authored 1 year ago by Kristina Zolochevskaia
usermap to stage ready pizdec
Parent: 6d07dc90
Showing 2 changed files with 43 additions and 15 deletions:

airflow/plugins/custom_operators/helpers/pd_pg_dtypes.py (+11, −11)
airflow/plugins/custom_operators/load_to_stage_operator.py (+32, −4)
airflow/plugins/custom_operators/helpers/pd_pg_dtypes.py (+11, −11) @ 6efd403b

...
@@ -2,14 +2,14 @@ import pandas as pd
import numpy as np
from sqlalchemy.sql.sqltypes import *

def transform_value(value):
    if pd.isna(value) or value is None:
        return "NULL"
    elif value == int(value):
        return int(value)

def pandas_to_postgres(df, dtypes):
    for col, dtype in dtypes.items():
        if dtype == 'int32':
            df[col] = df[col].apply(transform_value).astype('object')
    return df
# def transform_value(value):
#     if pd.isna(value) or value is None:
#         return "NULL"
#     elif value == int(value):
#         return int(value)
#
# def pandas_to_postgres(df, dtypes):
#     for col, dtype in dtypes.items():
#         if dtype == 'int32':
#             df[col] = df[col].apply(transform_value).astype('object')
#     return df
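For orientation, a minimal sketch of what the active pair of helpers does on a toy DataFrame. The column name, values, and dtype map are invented for illustration, and the import assumes the Airflow plugins directory is importable:

import pandas as pd
import numpy as np
from custom_operators.helpers.pd_pg_dtypes import pandas_to_postgres  # module above

df = pd.DataFrame({"user_id": [1.0, np.nan, 3.0]})  # hypothetical column and values
dtypes = {"user_id": "int32"}                        # hypothetical staging dtype map

out = pandas_to_postgres(df, dtypes)
print(out["user_id"].tolist())  # [1, 'NULL', 3]
print(out["user_id"].dtype)     # object

Whole floats come back as plain Python ints, missing values become the literal string "NULL", and the column is cast to object so the mixed types survive.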
airflow/plugins/custom_operators/load_to_stage_operator.py (+32, −4) @ 6efd403b

...
@@ -3,13 +3,14 @@ from airflow.utils.decorators import apply_defaults
from custom_operators.helpers.connections import *
from custom_operators.helpers.dwh_email import *
from custom_operators.helpers.pd_pg_dtypes import *
# from custom_operators.helpers.pd_pg_dtypes import *
# import os
# os.environ["MODIN_ENGINE"] = "dask"
#
# import modin.pandas as pd
import pandas as pd
import numpy as np


class LoadToStageOperator(BaseOperator):
    @apply_defaults
...
@@ -85,6 +86,7 @@ class LoadToStageOperator(BaseOperator):
    #===============================================================================
    def load_table_in_chunks(self, **kwargs):
        # TODO: add arg cols needed to stage + add getting coursor from connections
        # import modin.pandas as pd
        import numpy as np
...
@@ -113,12 +115,38 @@ class LoadToStageOperator(BaseOperator):
        src_chunk[str_cols] = src_chunk[str_cols].replace(to_replace=r'\n', value=' ', regex=True)
        src_chunk[str_cols] = src_chunk[str_cols].replace(to_replace=r'\r', value='', regex=True)
        src_chunk.replace({np.nan: None}, inplace=True)
        #TODO conversion float64 to object with NULL a ints
        src_chunk = pandas_to_postgres(src_chunk, self.stg_columns_dtype)
        # src_chunk.replace({np.nan: None}, inplace=True)!!!!!!!!!!!!!
        # TODO conversion float64 to object with NULL a ints

        def convert_to_int(value):
            if value is None:
                return None
            elif value == int(value):
                return int(value)
            else:
                return value

        int_dtype = 'int32'
        int_cols = [col for col, dtype in self.stg_columns_dtype.items() if dtype == int_dtype]
        print(int_cols)
        # src_chunk[int_cols] = src_chunk[int_cols].fillna(value=None)
        src_chunk[int_cols].to_csv("/home/zolockri/Documents/FIT/BP/chunkc1.csv")
        # raise
        src_chunk = src_chunk.fillna(np.nan).replace([np.nan], [None])
        for col in int_cols:
            src_chunk[col] = src_chunk[col].astype('Int64')
            src_chunk[col] = src_chunk[col].replace({pd.NA: None})
            # src_chunk[col] = src_chunk[col].apply(lambda x: int(x) if pd.notna(x) else x)
        # src_chunk[int_cols] = src_chunk[int_cols].applymap(transform_value)
        # .apply(lambda x: int(x) if pd.notna(x) and x == int(x) else x)
        # src_chunk[int_cols] = src_chunk[int_cols].applymap(convert_to_int)
        print(src_chunk.info())
        # src_chunk = src_chunk.replace({pd.NaT: None}).replace({np.NaN: None})
        (self.chunk_with_md5(src_chunk)).to_csv("/home/zolockri/Documents/FIT/BP/chunkc.csv")
        # print(self.chunk_with_md5(src_chunk[int_cols]))
        # print((self.chunk_with_md5(src_chunk[int_cols]))['stat_narozeni'].dtype.unique())
        # raise
        print(f"Inserting data to {self.stg_schema}.{self.stg_table}")
        if is_loading:
...
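Most of the new hunk deals with integer columns that arrive as float64 because they contain NaN. A standalone sketch of the core of that conversion, with a hypothetical chunk and dtype map standing in for src_chunk and self.stg_columns_dtype:

import pandas as pd
import numpy as np

src_chunk = pd.DataFrame({"account_id": [10.0, np.nan, 42.0]})  # hypothetical chunk
stg_columns_dtype = {"account_id": "int32"}                      # hypothetical dtype map

int_cols = [col for col, dtype in stg_columns_dtype.items() if dtype == "int32"]

for col in int_cols:
    # float64 cannot represent a missing integer; pandas' nullable Int64 can,
    # so 10.0 and 42.0 become real ints and the gap stays a missing value.
    src_chunk[col] = src_chunk[col].astype("Int64")

print(src_chunk["account_id"].tolist())  # [10, <NA>, 42]

The diff then swaps pd.NA for None via src_chunk[col].replace({pd.NA: None}) so that the subsequent insert into the staging table sends NULL rather than a pandas missing-value sentinel.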