Airflow Indexima Hook and Operator
Versions follow Semantic Versioning.
Indexima Airflow integration based on pyhive.
This project is used successfully in our production environment.
As it is a young project, be careful with changes; any help is welcome :)
Install this library directly into an activated virtual environment:
$ pip install airflow-indexima
or add it to your Poetry project:
$ poetry add airflow-indexima
or you could use it as an Airflow plugin
After installation, the package can be imported:
$ python
>>> import airflow_indexima
>>> airflow_indexima.__version__
from airflow_indexima.operators import IndeximaQueryRunnerOperator
...
with dag:
    ...
    op = IndeximaQueryRunnerOperator(
        task_id='my-task-id',
        sql_query='DELETE FROM Client WHERE GRPD = 1',
        indexima_conn_id='my-indexima-connection'
    )
    ...
from airflow_indexima.operators.indexima import IndeximaLoadDataOperator
...
with dag:
    ...
    op = IndeximaLoadDataOperator(
        task_id='my-task-id',
        indexima_conn_id='my-indexima-connection',
        target_table='Client',
        source_select_query='select * from dsi.client',
        truncate=True,
        load_path_uri='jdbc:redshift://my-private-instance.com:5439/db_client?ssl=true&user=airflow-user&password=XXXXXXXX'
    )
    ...
In order to get a JDBC URI from an Airflow Connection, you could use:
get_redshift_load_path_uri
get_postgresql_load_path_uri
from the module airflow_indexima.uri.
Both functions have this signature: Callable[[str, Optional[ConnectionDecorator]], str]
Example:
get_postgresql_load_path_uri(connection_id='my_conn')
>> 'jdbc:postgresql://my-db:5432/db_client?ssl=true&user=airflow-user&password=XXXXXXXX'
PyHive supported authentication modes:
You could set these parameters:
host, port, username, and password come from the Airflow Connection configuration;
timeout_seconds, socket_keepalive, auth, and kerberos_service_name can come from the Connection's extra field, like this:
'{"auth": "CUSTOM", "timeout_seconds": 90, "socket_keepalive": true}'
Any attribute set in extra overrides the Airflow connection configuration.
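For illustration, here is a minimal sketch of a Connection carrying such an extra payload, built programmatically with the standard airflow.models.Connection class (in practice you would usually define it through the Airflow UI, CLI, or an AIRFLOW_CONN_* environment variable, as in the script at the end of this README; the host, port, and credential values below are placeholders):

import json

from airflow.models import Connection

# placeholder Indexima connection; extra overrides the pyhive parameters described above
conn = Connection(
    conn_id='my-indexima-connection',
    conn_type='hive',  # assumption: a hive-style connection, as in the 'hive://' URI used later
    host='my-server',
    port=10000,
    login='my-user',
    password='my-password',
    extra=json.dumps({'auth': 'CUSTOM', 'timeout_seconds': 90, 'socket_keepalive': True}),
)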
You could add a decorator function in order to post-process the Connection before usage.
This decorator will be executed after the connection configuration is loaded (see next section).
If you use another backend to store your password (like AWS SSM), you could define a decorator
and use it as a function in your DAG.
from airflow.models import Connection
from airflow import DAG

from airflow_indexima.uri import define_load_path_factory, get_redshift_load_path_uri


def my_decorator(conn: Connection) -> Connection:
    # the conn instance is not shared; it is used only for this connection request
    # get_ssm_parameter is your own helper that retrieves the password (here from AWS SSM)
    conn.password = get_ssm_parameter(param_name=f'{conn.conn_id}.{conn.login}')
    return conn


dag = DAG(
    dag_id='my_dag',
    user_defined_macros={
        # we define a macro get_load_path_uri
        'get_load_path_uri': define_load_path_factory(
            conn_id='my-redshift-connection',
            decorator=my_decorator,
            factory=get_redshift_load_path_uri)
    },
    ...
)
with dag:
    ...
    op = IndeximaLoadDataOperator(
        task_id='my-task-id',
        indexima_conn_id='my-indexima-connection',
        target_table='Client',
        source_select_query='select * from dsi.client',
        truncate=True,
        load_path_uri='{{ get_load_path_uri() }}'
    )
    ...
A Connection decorator must follow this type: ConnectionDecorator = Callable[[Connection], Connection]
define_load_path_factory
is a function which takes:
a connection id
a ConnectionDecorator
a UriGeneratorFactory = Callable[[str, Optional[ConnectionDecorator]], str]
and returns a function with no argument which can be called as a macro in a DAG's operator.
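For reference, a minimal sketch of those two type aliases as described above (the actual definitions in airflow_indexima.uri may differ slightly):

from typing import Callable, Optional

from airflow.models import Connection

# post-processes a Connection before it is used
ConnectionDecorator = Callable[[Connection], Connection]

# takes a connection id and an optional decorator, returns a JDBC load path URI
UriGeneratorFactory = Callable[[str, Optional[ConnectionDecorator]], str]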
On each operator you could set this member (it can be an int or a timedelta).
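For example, standard Airflow BaseOperator members such as retries (an int) or execution_timeout and sla (timedeltas) can be passed to the operator, as in the full script at the end of this README:

import datetime

from airflow_indexima.operators.indexima import IndeximaLoadDataOperator

op = IndeximaLoadDataOperator(
    task_id='my-task-id',
    indexima_conn_id='my-indexima-connection',
    target_table='Client',
    source_select_query='select * from dsi.client',
    truncate=True,
    load_path_uri='{{ get_load_path_uri() }}',
    retries=2,                                      # int
    execution_timeout=datetime.timedelta(hours=3),  # timedelta
    sla=datetime.timedelta(hours=1, minutes=30),    # timedelta
)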
Note:
In production, you could encounter a few strange behaviours like the ones we have met.
You could hit this issue https://github.com/dropbox/PyHive/issues/240 on long-running load queries.
Try this in sequence:
Set the timeout_seconds member to 3600 seconds, for example. If you are facing a broken pipe after 300s, and you have an AWS NLB v2:
Read the network-load-balancers documentation again, and focus on this:
Elastic Load Balancing sets the idle timeout value for TCP flows to 350 seconds. You cannot modify this value. For TCP listeners, clients or targets can use TCP keepalive packets to reset the idle timeout. TCP keepalive packets are not supported for TLS listeners.
We have tried the "socket_keep_alive" option for you, and it did not work at all.
Our solution was to remove our NLB and use a simple DNS A record pointing to the Indexima master.
You are very welcome to add { "serialization.encoding": "utf-8" }
to the hive_configuration member of IndeximaHook.
This setting is set in IndeximaHook.__init__; you may override it.
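A minimal sketch of one way to override it, assuming IndeximaHook can be imported from airflow_indexima.hooks.indexima and exposes the hive_configuration member set in its __init__ (the exact module path and constructor signature may differ):

from airflow_indexima.hooks.indexima import IndeximaHook  # assumption: module path


class Utf8IndeximaHook(IndeximaHook):
    """IndeximaHook variant that forces utf-8 serialization encoding."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # hive_configuration is initialized in IndeximaHook.__init__; we extend it here
        self.hive_configuration = dict(self.hive_configuration or {})
        self.hive_configuration['serialization.encoding'] = 'utf-8'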
While I was trying many little things and dealing with Hive details, I wrote a single script that helped me a lot.
Feel free to use it (or not) to set up your DAG by yourself:
import os
import datetime

from airflow.hooks.base_hook import BaseHook
from airflow import DAG

from airflow_indexima.operators.indexima import IndeximaLoadDataOperator


# here we create our Airflow Connection
os.environ['AIRFLOW_CONN_INDEXIMA_ID'] = 'hive://my-user:my-password@my-server:10000/default'
conn = BaseHook.get_connection('indexima_id')

dag = DAG(
    dag_id='my_dag',
    default_args={
        'start_date': datetime.datetime(year=2019, month=12, day=1),
        'depends_on_past': False,
        'email_on_failure': False,
        'email': [],
    },
)

with dag:
    load_operator = IndeximaLoadDataOperator(
        task_id='my_task',
        indexima_conn_id='indexima_id',
        target_table='my_table',
        source_select_query=(
            "select * from source_table where "
            "creation_date_tms between '2019-11-30T00:00:00+00:00' and '2019-11-30T12:59:59.000999+00:00'"
        ),
        truncate=True,
        truncate_sql=(
            "DELETE FROM my_table WHERE "
            "creation_date_tms between '2019-11-30T00:00:00+00:00' and '2019-11-30T12:59:59.000999+00:00'"
        ),
        load_path_uri='jdbc:postgresql://myserver:5439/db_common?user=etl_user&password=a_strong_password&ssl=true',
        retries=2,
        execution_timeout=datetime.timedelta(hours=3),
        sla=datetime.timedelta(hours=1, minutes=30),
    )

# here we run the operator directly (useful for local testing)
load_operator.execute(context={})

del os.environ['AIRFLOW_CONN_INDEXIMA_ID']
See Contributing
Thanks to @bartosz25 for his help with hive connection details…