Using CloudWright to Automate File Ingestion

January 12, 2020

A common pattern at companies that perform file-based processing is to build manual workflows to ingest file deliveries from customers and partners. These workflows often include email alerts, database queries, and file transformations.

These workflows combine resources from across the company and are rarely (fully) automated. Worse, any automation that does exist is often a hacked-together script running on a laptop, failing silently — and silent errors are worse than no automation at all!

In this post, we'll work with a four-step file ingestion workflow:

  • Watch an S3 bucket for file uploads from customers
  • When a file arrives, look up the customer's account owner in an internal database
  • Transform the file from comma-delimited to tab-delimited and write it to a 'processed' directory
  • Send an email to the account owner to alert them about new files

Or, as we'd chart out the process for an account owner:

[Diagram: the manual S3 file ingestion workflow]

Sounds easy enough for a person — but not a trivial process to (reliably!) automate.

Well, at CloudWright, we hate manual workflows — we think customers should spend time solving business problems, not pretending to be robots. This post walks through how to script this ingestion pipeline as a simple CloudWright application — no complex ETL solution required.

Let's get started.

Modules

A Module is a configure-once, use-everywhere integration with a specific internal or external service — a Module packages client libraries alongside required configurations and secrets. To automate our workflow, we'll use three Modules:

  • The AWS Module gives us an authenticated S3 client for file operations
  • The Gmail Module lets us send emails as an authenticated service user
  • The MySQL Module lets us query our internal database for customer accounts

Setting up the modules

We'll show how to set these modules up from scratch, but remember — you'll usually be able to re-use Modules your team has already created.

S3

CloudWright exposes all AWS APIs through the 'AWS API' Module template. Configuring this module is easy — you'll just need your AWS Access Key ID and Secret Key (to learn how to create these credentials, check out the Amazon documentation).

[Screenshot: AWS API Module configuration]
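For context, the Module hands your code roughly what you'd otherwise build by hand with boto3 — a minimal sketch, using placeholder credentials (never hard-code real keys):

    import boto3

    # Roughly what the AWS API Module wires up from your configured credentials
    # (the key values below are placeholders, not real credentials)
    session = boto3.Session(
        aws_access_key_id="AKIA...",
        aws_secret_access_key="<your-secret-key>",
    )
    s3 = session.resource("s3")

The difference is that with the Module, the credentials live in CloudWright's configuration, not in your script.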

Gmail

To send emails from a service Gmail account, we'll need to use the 'Send Email with Gmail' template, and authenticate as a service user — here, we'll use example-user@cloudwright.io:

[Screenshot: Gmail Module configuration]

(feel free to authenticate as yourself for testing purposes)

MySQL

Connecting to a database is easy with the MySQL Module:

[Screenshot: MySQL Module configuration]

We'll connect to this database directly — if your database is behind a firewall, you'll need to set up a Deployment Zone with access to your resources.
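Once the Module is attached to an application (we'll do that below), it behaves like a live database handle — a quick smoke test, a sketch using the same execute pattern we'll rely on later:

    import sqlalchemy

    mysql = CloudWright.get_module("mysql")

    # Smoke test: confirm the Module can reach the database
    print(mysql.execute(sqlalchemy.text("SELECT 1")).fetchone())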

To automate this workflow in CloudWright, we'll need to create a new Application — the quickstart guide walks you through creating a new application (you'll want to read that tutorial first).

In our new Application, we'll attach the three Modules we need — MySQL, AWS, and Gmail:

[Screenshot: attaching the MySQL, AWS, and Gmail Modules]

With these Modules added, we're ready for the important part — building the tool.


For more details about how to create and use CloudWright Modules, check out the Modules section in the CloudWright docs.

Building the application

We'll step through the application in this post, but if you'd rather skip the explanation, you can find the whole script on GitHub.

To start off, we'll import our three Modules. Since the AWS API Module bundles the whole boto3 library, we'll pull the S3 resource out of it to interact with S3 buckets.

    import re
    import sqlalchemy

    # Pull the S3 resource out of the boto3 library exposed by the AWS API Module
    s3 = CloudWright.get_module("aws_api").resource("s3")
    email = CloudWright.get_module("gmail")
    mysql = CloudWright.get_module("mysql")

Once we have the S3 resource, listing files which match a filter is easy:

    bucket_name = 'cloudwright-examples-file-automation'
    bucket = s3.Bucket(bucket_name)

    files_processed = 0

    # Scan for files in the input directory of the S3 bucket
    for object_summary in bucket.objects.filter(Prefix='input'):

        if object_summary.size > 0:
            detected_file = object_summary.key

For this example, we'll assume that customer folders are scoped by ID — for example, customer ID '123' uploads files to s3://cloudwright-examples-file-automation/input/123/. We'll grab the customer ID and filename out of whatever we find in the bucket:

    match = re.match(r"input/([0-9]+)/([\w.]+)", detected_file)

    if match:
        customer_id = match.group(1)
        file_name = match.group(2)
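For example, a key like input/123/data.csv yields a customer_id of '123' and a file_name of 'data.csv'; anything under input/ that doesn't fit the pattern simply fails the match and is skipped.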

In this post, we'll do a simple file transformation which fits in memory. If we need to do transformations on files too large to fit in memory, we'd want to kick the job over to a cloud-native big data service like BigQuery (we'll show how to do that in a later post):

    target_key = f'processed/{customer_id}/{file_name}'
    print(f"Detected a new file for customer {customer_id}: {file_name}")        
    print(f"Copying file to {target_key}")
    
    # Lightweight reformatting -- split the file on comma, and join it on tabs 
    output = list(map(lambda l: "\t".join(l.decode().split(",")), object_summary.get()['Body'].iter_lines()))
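One caveat: splitting on every comma assumes no field contains an embedded (quoted) comma. If your files use quoting, Python's standard csv module is a safer parser — a sketch of an equivalent transformation under that assumption:

    import csv
    import io

    # Parse the body as CSV (handles quoted fields), then re-join each row on tabs
    body = object_summary.get()['Body'].read().decode()
    output = ["\t".join(row) for row in csv.reader(io.StringIO(body))]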

Writing the file back to S3 is just as simple as reading it:

    new_file = s3.Object(bucket_name, target_key)

    # Write the reformatted file to the processed directory.
    new_file.put(Body="\n".join(output))

Now that we're done with our file operations, we want to let the customer account owner know that the file is available for processing. We can use the ID we pulled out of the bucket to look up the customer account with a simple SQL query, using the MySQL Module we've attached:

    customer = mysql.execute(
        sqlalchemy.text("SELECT email, name FROM customers WHERE ID = :customer_id"),
        customer_id=customer_id
    ).fetchone()
    customer_email = customer[0]
    customer_name = customer[1]

Since our Gmail module is hooked up and ready to use, sending an email to the account owner is a one-liner:

    email.send_email(
        "We got your file!",
        f"We received your file {file_name} and are processing it now.  Stay tuned!",
        customer_email
    )

    # Count this file toward the run summary we report below
    files_processed += 1

Last, we can set our application's response value with a summary of what we've done. This isn't necessary, but it makes monitoring easier at a glance (we'll get to that in a minute):

    CloudWright.response.set_value({'processed_files': files_processed})

That's the whole script — about 40 lines of actual code. Because CloudWright Modules handle the authentication and configuration, our code focuses on the business problems, not the plumbing.

We can run our new script in the dry-run console and see that it works with a sample file:

[Screenshot: dry-run console output]
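A successful dry run returns the response value we set at the end of the script — for a single sample file, something like:

    {'processed_files': 1}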

That was satisfying, but the whole point of automating this workflow is that we don't have to run scripts by hand! To run this application on a schedule, we'll set up a cron trigger:

[Screenshot: cron trigger configuration]

We'll run this script every minute for this demo (in standard cron syntax, * * * * *) — in practice, you'll want to balance resource usage costs against your script cadence.

Once our code is done and the triggers are configured, we'll publish our application to make it active. We can keep an eye on the script's progress in the Run Console — running every minute, just like we asked:

[Screenshot: Run Console showing scheduled runs]

That's really all it takes to build a simple — but useful — CloudWright application. If you'd like to learn more, or start using CloudWright to automate your own business workflows, let's get in touch.