Project author: outrauk

Project description: DSS plugin for fast loading between Snowflake and HDFS
Language: Python
Repository: git://github.com/outrauk/dataiku-plugin-snowflake-hdfs.git
Created: 2019-08-08T08:09:50Z
Project community: https://github.com/outrauk/dataiku-plugin-snowflake-hdfs

License: MIT License


Snowflake Tools

This is a Dataiku plugin that enables fast loading of files between Snowflake and S3.

Supported file formats include JSON Lines and Parquet (as an HDFS dataset).

Prerequisites

  • An HDFS connection in Dataiku that points to an S3 bucket
  • A Snowflake S3 STAGE that points to the same S3 bucket and path as DSS’s managed HDFS connection

For JSON:

  • A managed folder in Dataiku that points to an S3 bucket
  • A Snowflake S3 STORAGE_INTEGRATION that points to the same S3 bucket and path as DSS’s managed S3 connection (a sketch appears at the end of this section)

Your HDFS connection here:

(screenshot of the HDFS connection settings in DSS)

Should match the URL here:

  CREATE OR REPLACE STAGE YOUR_ACCOUNT_DATAIKU_EMR_MANAGED_STAGE
    URL = 's3://your-account-dataiku-emr/data'
    CREDENTIALS = (AWS_ROLE = 'arn:aws:iam::123456:role/SnowflakeCrossAccountRole');
  GRANT USAGE ON STAGE YOUR_ACCOUNT_DATAIKU_EMR_MANAGED_STAGE TO ROLE DSS_SF_ROLE_NAME;

Note that this example uses an AWS IAM role for securing the stage’s connection to your S3 bucket. There’s no reason a stage secured using AWS access keys wouldn’t work, but it has not been tested.
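
For the JSON path, the STORAGE_INTEGRATION should likewise point at the bucket and path behind DSS’s managed S3 connection. A minimal sketch, assuming an IAM role; the integration name, role ARN, and bucket are placeholders, not values defined by the plugin:

  CREATE OR REPLACE STORAGE INTEGRATION YOUR_ACCOUNT_DATAIKU_S3_INTEGRATION
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456:role/SnowflakeCrossAccountRole'
    STORAGE_ALLOWED_LOCATIONS = ('s3://your-account-dataiku-s3/data');
  GRANT USAGE ON INTEGRATION YOUR_ACCOUNT_DATAIKU_S3_INTEGRATION TO ROLE DSS_SF_ROLE_NAME;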

Installing

You can install the plugin by referencing this GitHub repository and following these instructions.

Or, you can create a zip file and follow these instructions. To build the zip file:

  1. Ensure you have json_pp and node installed locally
  2. make plugin
  3. Upload the zip file from the /dist directory

Configuring

You can (optionally) configure a Default Snowflake Stage in the plugin’s settings. For example, the STAGE created above would be entered as @PUBLIC.YOUR_ACCOUNT_DATAIKU_EMR_MANAGED_STAGE.

When using the recipe, you can override the default stage in the Snowflake Stage setting.

Usage

Snowflake → HDFS

  1. In a Dataiku flow, select an existing Snowflake dataset (note that the dataset must point to a Snowflake table; it can’t be a query)
  2. From the Plugin recipes section on the right, click Snowfl… (it uses exchange arrows as the icon)
  3. From the popup, pick Sync Snowflake to HDFS
  4. Make sure your existing Snowflake table is the Input and set a new or existing HDFS dataset as the Output. Ensure Store into is a connection with the same path as the STAGE described above, and that the Format is Parquet.
  5. Click Create
  6. Set the Snowflake Stage to the stage created above (note that if you’ve set a default stage in the plugin’s settings, you can skip this step)
  7. Click Run
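
Conceptually, the Snowflake → HDFS sync is similar to unloading the table to the stage with a COPY INTO <location> statement. A rough, hedged sketch (the stage path, dataset folder, and table names are placeholders; the exact statement the plugin issues may differ):

  COPY INTO @PUBLIC.YOUR_ACCOUNT_DATAIKU_EMR_MANAGED_STAGE/my_output_dataset/
  FROM MY_SCHEMA.MY_TABLE
  FILE_FORMAT = (TYPE = PARQUET)  -- Snowflake writes Snappy-compressed Parquet by default
  HEADER = TRUE;                  -- keep the real column names in the Parquet files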

HDFS → Snowflake

  1. In a Dataiku flow, select an existing HDFS dataset. (Note that the dataset’s connection must point to the same S3 bucket as is configured in the STAGE, as described above. Additionally, the dataset must be Parquet format with Snappy compression.)
  2. From the Plugin recipes section on the right, click Snowfl… (it uses exchange arrows as the icon)
  3. From the popup, pick Sync HDFS to Snowflake
  4. Make sure your existing HDFS dataset is the Input and set a new or existing Snowflake dataset
  5. Click Create
  6. Set the Snowflake Stage to the stage created above (note that if you’ve set a default stage in the plugin’s settings, you can skip this step)
  7. Click Run
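
The load direction is conceptually a COPY INTO <table> from the same stage. A hedged sketch with placeholder names (not necessarily the statement the plugin generates):

  COPY INTO MY_SCHEMA.MY_TABLE
  FROM @PUBLIC.YOUR_ACCOUNT_DATAIKU_EMR_MANAGED_STAGE/my_input_dataset/
  FILE_FORMAT = (TYPE = PARQUET)
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;  -- map Parquet columns to table columns by name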

JSON → Snowflake

  1. In a Dataiku flow, click + Recipe and select Snowflake Tools
  2. From the popup, pick Sync JSON to Snowflake
  3. Select an S3-based managed folder that contains JSON Lines files as the Input
  4. Select an existing or create a new Snowflake table as the Output
  5. Click Create
  6. Set the Snowflake Storage Integration to the storage integration described above (note that if you’ve set a default storage integration in the plugin’s settings, you can skip this step)
  7. Optionally, specify a subdirectory in the managed folder
  8. Click Run
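
For the JSON recipe, the load goes through the storage integration rather than a stage. A hedged sketch, reusing the placeholder integration from the Prerequisites section with a hypothetical bucket path and table:

  COPY INTO MY_SCHEMA.MY_JSON_TABLE
  FROM 's3://your-account-dataiku-s3/data/my_managed_folder/subdir/'
  STORAGE_INTEGRATION = YOUR_ACCOUNT_DATAIKU_S3_INTEGRATION
  FILE_FORMAT = (TYPE = JSON)
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;  -- one row per JSON Lines record, matched by key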

Building in PyCharm

Make sure that wget is installed. For macOS, you can install via brew install wget.

Custom Recipe libraries aren’t included in DSS’s dataiku-internal-client package so we need to fake it ‘til we make it.

First, create the package by executing ./make_dss_pip.sh. If successful, the last line it prints is a pip command.

Second, use the pip command from the previous step to install the package in the library’s virtual environment. (If you’re using PyCharm, open View → Tool Windows → Terminal and paste the pip command in.)

Running Tests in PyCharm

  1. Make sure you can build the project by doing all of the pip steps above
  2. In PyCharm, run the Unit Tests configuration.

Known Issues

  • Neither Snowflake nor Dataiku uses Logical Type annotations for timezone offsets (Dataiku doesn’t use Logical Types at all). When syncing from Snowflake to HDFS, the plugin casts any TIMESTAMP_TZ or TIMESTAMP_LTZ columns to TIMESTAMP_NTZ, which simply drops the timezone offset. For greater control of this behaviour, transform your Snowflake table before passing it to this plugin, for example with CONVERT_TIMEZONE('UTC', t."date")::TIMESTAMP_NTZ (see the sketch after this list). See Date & Time Data Types in Snowflake for more details.
  • Snowflake stores all TIMESTAMP_NTZ and DATE columns as annotated logical types in Parquet. Because Dataiku does not support logical types, these appear in Dataiku as bigint (int64 as datetime) and int (int32 as date). More details and workarounds are described in #12.
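
For the first issue, a minimal example of normalizing a TIMESTAMP_TZ column to UTC before handing the table to the plugin (table and column names are hypothetical; a DSS SQL recipe could produce the same result):

  CREATE OR REPLACE TABLE MY_SCHEMA.MY_TABLE_UTC AS
  SELECT
    t."id",
    CONVERT_TIMEZONE('UTC', t."created_at")::TIMESTAMP_NTZ AS "created_at"
  FROM MY_SCHEMA.MY_TABLE t;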

TODO

  • When Snowflake is an input, check that it’s a table and not a query
  • Add support for Snowflake input being a query rather than a table
  • Check that the HDFS input or output is Parquet and Snappy
  • Add support for partitioned datasources
  • For Snowflake to HDFS, delete the *.parquet.snappy file in the output path (it’s the one created when inserting the empty dataframe)
  • Support more types than Parquet (e.g., CSV, AVRO, etc.)
  • Verify that the HDFS connection is actually an S3 location (possibly the best way to enforce the STAGE lining up to HDFS)
  • Implement workarounds for #12
  • Convert TS columns to UTC using CONVERT_TIMEZONE