Using Azure Serverless products to perform file validation on a per-batch basis
This sample outlines multiple ways to accomplish the following set of requirements using Azure Serverless technologies. One way uses the “traditional” serverless approach, another Logic Apps, and another Azure Functions’ Durable Functions feature.
Given a set of customers, assume each customer uploads data to our backend for historical record keeping and analysis. This data arrives in the form of a set of .csv files, with each file containing different data. Think of them almost as SQL table dumps in CSV format.
When the customer uploads the files, we have two primary objectives:
To accomplish this sample, you’ll need to set up a few things:
For the Python version of this sample (folder AzureFunctions.Python), follow the instructions in its dedicated readme.
Pull down the code.
Copy sample.local.settings.json in the AzureFunctions.v3 project to a new file called local.settings.json.
This file will be used across the functions, durable or otherwise.
Next, run any of the Function apps in this solution. You can use either the v1 (.NET Framework) or the v3 (.NET Core) version; it’s only needed for Event Grid validation.
With the function running, add an Event Grid Subscription to the Blob Storage account (from step 2), pointing to the ngrok-piped endpoint you created in step 4. The URL should look something like this:
https://b3252cc3.ngrok.io/api/EnsureAllFiles
https://b3252cc3.ngrok.io/api/Orchestrator
Upon saving this subscription, you’ll see your locally-running Function get hit with a request and return HTTP OK, then the Subscription will go green in Azure and you’re set.
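The request you see at this point is Event Grid's subscription validation handshake: the endpoint receives a validation event and must echo its code back. The Functions in this repo already take care of this; the following is only a minimal Python sketch of the exchange, and the helper name handle_event_grid_post is made up for illustration.

```python
import json

# Event type Event Grid sends when a new webhook subscription is created.
VALIDATION_EVENT = "Microsoft.EventGrid.SubscriptionValidationEvent"


def handle_event_grid_post(body: str):
    """Return (status_code, response_body) for a raw Event Grid webhook POST.

    Hypothetical helper, not part of this repo. Event Grid delivers events
    as a JSON array, so we scan it for the validation event.
    """
    events = json.loads(body)
    for event in events:
        if event.get("eventType") == VALIDATION_EVENT:
            # Echo the validation code back to prove we own the endpoint.
            code = event["data"]["validationCode"]
            return 200, json.dumps({"validationResponse": code})
    # Regular events (for example blob-created notifications) would be
    # handed to the real processing logic here.
    return 200, ""
```

Once the handshake succeeds, later requests to the same URL carry the storage events themselves.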
Now, open Azure Storage Explorer and connect to the Blob Storage Account you’ve created. In here, create a container named cust1. Inside the container, create a new folder called inbound.
Take one of the .csv files from the sampledata folder of this repo, and drop it into the inbound folder.
You’ll see the endpoint you defined as your Event Grid webhook subscription get hit.
sampledata
defined as cust1_20171010_1112
RaiseEvent to pass the filename along to the instance.
In the EnsureAllFiles sub-orchestration, we look up what files we need for this customer (cust1) and check to see which files have come through thus far. As long as we do not have the files we need, we loop within the orchestration, each time waiting for an external newfile event to be thrown to let us know a new file has come through and should be processed.
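To make the loop easier to picture, here is a plain-Python simulation of the "wait until every required file has arrived" logic. It does not use the Durable Functions API; an ordinary iterable stands in for the stream of newfile events. The file-naming scheme (prefix_type.csv), the REQUIRED_TYPES set, and both function names are assumptions for illustration only.

```python
# Hypothetical: the file types that make up one complete batch for a customer.
REQUIRED_TYPES = {"type1", "type2", "type3"}


def split_name(filename):
    """Split 'cust1_20171010_1112_type1.csv' into (batch prefix, file type).

    Assumes the last underscore-separated token before the extension is
    the file type; the real sample data may be named differently.
    """
    stem = filename.rsplit(".", 1)[0]
    prefix, file_type = stem.rsplit("_", 1)
    return prefix, file_type


def ensure_all_files(prefix, new_file_events, required=REQUIRED_TYPES):
    """Consume 'newfile' events until the batch for `prefix` is complete.

    Returns the set of file types seen once every required type has
    arrived, or None if the events run out first.
    """
    seen = set()
    for filename in new_file_events:  # each item stands in for one event
        file_prefix, file_type = split_name(filename)
        if file_prefix != prefix:
            continue  # event belongs to a different batch
        seen.add(file_type)
        if required <= seen:
            return seen
    return None
```

In the real orchestration the loop suspends between events rather than iterating over a list, but the completion check is the same idea.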
When we find we have all the files that constitute a “batch” for the customer, we call the ValidateFileSet activity function to process each file in the set and validate the structure of them according to our rules.
When Validation completes successfully, all files from the batch are moved to a valid-set subfolder in the blob storage container. If validation fails (try removing a column in one of the lines in one of the files), the whole set gets moved to invalid-set.
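As a rough illustration of the kind of structural check that would catch a removed column, the sketch below verifies that every row in a CSV has the same number of fields as its header, then picks the destination folder for the whole batch. This is an assumed rule, not the sample's actual validation logic, and both function names are hypothetical.

```python
import csv
import io


def has_consistent_columns(csv_text):
    """True if every non-empty row has as many fields as the header row."""
    rows = [row for row in csv.reader(io.StringIO(csv_text)) if row]
    if not rows:
        return False  # an empty file cannot be a valid table dump
    expected = len(rows[0])
    return all(len(row) == expected for row in rows)


def target_folder(batch):
    """Pick the destination for a batch given {filename: csv_text}.

    One bad file sends the whole set to 'invalid-set', mirroring the
    all-or-nothing behavior described above.
    """
    ok = all(has_consistent_columns(text) for text in batch.values())
    return "valid-set" if ok else "invalid-set"
```

For example, a batch containing a file with the row `2` under the header `id,name` would be routed to invalid-set.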
Because Durable Functions persist their state, resetting the execution after something goes wrong is not as simple as just re-running the function. To do this properly, you must:
Delete the DurableFunctionsHubHistory Table in the “General Purpose” Storage Account you created in Step 1 above.
Delete any files from the /inbound directory of the blob storage container triggering the Functions.
Note: after doing these steps, you’ll have to wait a minute or so before running either of the Durable Function implementations, as the storage table creation will error with 409 CONFLICT while deletion takes place.
sampledata
defined as cust1_20171010_1112
FileProcessingLocks table of the General Purpose Storage Account containing this prefix. If so, bail. If not, create one, then call the ValidateFunctionUrl endpoint with the batch prefix as payload.
When Validation completes successfully, all files from the batch are moved to a valid-set subfolder in the blob storage container. If validation fails (try removing a column in one of the lines in one of the files), the whole set gets moved to invalid-set.
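The "check for a lock, bail if present, otherwise create one and validate" step in this flow can be sketched as follows. An in-memory set stands in for the FileProcessingLocks table, and the class and function names are hypothetical; a real implementation would need the table insert itself to be atomic so that two concurrent callers cannot both succeed.

```python
class InMemoryLockTable:
    """Stand-in for the FileProcessingLocks table (illustration only)."""

    def __init__(self):
        self._locks = set()

    def try_acquire(self, prefix):
        """Create a lock for `prefix`; return False if one already exists."""
        if prefix in self._locks:
            return False
        self._locks.add(prefix)
        return True


def on_batch_complete(prefix, locks, validate):
    """Run `validate(prefix)` at most once per batch prefix.

    `validate` stands in for the call to the ValidateFunctionUrl endpoint.
    Returns True if validation was triggered, False if we bailed.
    """
    if not locks.try_acquire(prefix):
        return False  # another invocation already owns this batch
    validate(prefix)
    return True
```

Calling on_batch_complete twice with the same prefix triggers validation only the first time.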
Delete the FileProcessingLocks table from the General Purpose Storage Account.
Delete any files from the /inbound directory of the blob storage container triggering the Functions.
Note: after doing these steps, you’ll have to wait a minute or so before running either of the Durable Function implementations, as the storage table creation will error with 409 CONFLICT while deletion takes place.
While not identically behaved, this repo also contains deployment scripts for two Logic App instances which perform roughly the same flow.
This Logic App gets Storage Events from Event Grid, pulls off the full prefix of the file (also containing the URL), and sends this on to…
This receives events from the Processor and waits for 3 events containing the same prefix to arrive before sending the batch on to the next step (you can change this to be whatever you want after deployment).
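The two Logic Apps are defined declaratively, but their combined behavior can be approximated in a few lines of Python: derive the full prefix (URL included) from each blob URL, then release a batch once three items share that prefix. The naming scheme, the example URL shape, and the names full_prefix and Batcher are assumptions for illustration, not taken from the deployment scripts.

```python
from collections import defaultdict

BATCH_SIZE = 3  # matches the default of waiting for 3 events


def full_prefix(blob_url):
    """Return the blob URL up to (not including) the final '_type.csv' part.

    Assumes file names look like '<prefix>_<type>.csv'.
    """
    return blob_url.rsplit("_", 1)[0]


class Batcher:
    """Collect blob URLs per prefix and release them in groups."""

    def __init__(self, size=BATCH_SIZE):
        self.size = size
        self._pending = defaultdict(list)

    def add(self, blob_url):
        """Record one event; return the full batch once it is complete."""
        prefix = full_prefix(blob_url)
        self._pending[prefix].append(blob_url)
        if len(self._pending[prefix]) >= self.size:
            return self._pending.pop(prefix)  # hand the batch onward
        return None
```

Unlike the Function-based versions, this counts arrivals rather than checking which specific files are present, which is one of the ways the Logic App flow is not identically behaved.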
StartNewAsync fast enough and instead of one instance per batch, you’ll end up with multiple instances for the same prefix (even though we desire one instance per, acting like a singleton).
This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.
When you submit a pull request, a CLA bot will automatically determine whether you need to provide
a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions
provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct.
For more information see the Code of Conduct FAQ or
contact opencode@microsoft.com with any additional questions or comments.