Category: Amazon

Use AWS Machine Learning to Analyze Customer Calls from Contact Centers (Part 2): Automate, Deploy, and Visualize Analytics using Amazon Transcribe, Amazon Comprehend, AWS CloudFormation, and Amazon QuickSight

In the previous blog post, we showed you how to string together Amazon Transcribe and Amazon Comprehend to be able to conduct sentiment analysis on call conversations from contact centers. Here, we demonstrate how to leverage AWS CloudFormation to automate the process and deploy your solution at scale.

Solution Architecture

The following diagram illustrates an architecture that uses Amazon Transcribe to create text transcripts of call recordings from contact centers. In this example, we refer to Amazon Connect (a cloud-based contact center service), but the architecture could work for any contact center.

The following diagram describes the architecture for processing transcribed text by using Amazon Comprehend to conduct Entity, Sentiment and Key Phrases analysis. Finally, we can visualize the analysis using a combination of Athena and QuickSight.

Automate and Deploy using AWS CloudFormation

Here, we will use AWS CloudFormation to automate and deploy the above solution.

First, log in to the AWS Console and click this link to launch the template in CloudFormation.

In the console, provide the following parameters:

  • RecordingsPrefix: S3 prefix where split recordings will be stored
  • TranscriptsPrefix: S3 prefix where transcribed text will be stored
  • TranscriptionJobCheckWaitTime: Time in seconds to wait between transcription job status checks

Leave all other parameters at their default values. Select both “I acknowledge that AWS CloudFormation might create IAM resources” checkboxes, choose Create Change Set, and then choose Execute.

The solution follows these steps:

  1. Amazon Connect drops call recordings and CTR records into Amazon S3.
  2. The S3 Put request triggers an AWS Lambda function that splits the call recording into two media channels, one for the agent and one for the customer. It drops the two output audio files into different folders.
  3. The audio drop into the S3 folder triggers a Lambda function that invokes AWS Step Functions.
  4. Step Functions is used here to schedule the Lambda functions that invoke the Amazon Transcribe APIs.
    1. Step 1 starts the transcription of the audio files.
    2. Step 2 checks the status of the transcription job at regular intervals. Once the job status is complete, it moves on to Step 3.
    3. Step 3 writes the transcribed output into an S3 folder.
  5. The transcribed text drop into S3 triggers a Lambda function that invokes the Amazon Comprehend APIs and writes the entity, sentiment, key phrase, and language output into an S3 folder. If you need to write the output into an Amazon Redshift data warehouse, you can leverage Amazon Kinesis Data Firehose.
  6. AWS Glue maintains the database catalog and the table structure, and Amazon Athena queries the data in S3 using the Glue database catalog. This is the last step handled by the CloudFormation template.
  7. Amazon QuickSight is used to analyze the call recordings and to perform sentiment and key phrase analysis of the interactions between caller and agent.
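The three sub-steps of step 4 amount to a start-then-poll loop. The sketch below only illustrates that control flow and is not the code deployed by the template: the helper name `wait_for_transcription` and the injected `client` and `sleep` arguments are assumptions made so the loop can be exercised without AWS credentials. With boto3, `client` would be `boto3.client("transcribe")`, whose `get_transcription_job` call reports the job status under `TranscriptionJob.TranscriptionJobStatus`.

```python
import time


def wait_for_transcription(client, job_name, wait_seconds=5, max_checks=60, sleep=time.sleep):
    """Poll a transcription job until it completes or fails (hypothetical helper).

    wait_seconds plays the role of the TranscriptionJobCheckWaitTime parameter.
    """
    for _ in range(max_checks):
        response = client.get_transcription_job(TranscriptionJobName=job_name)
        job = response["TranscriptionJob"]
        status = job["TranscriptionJobStatus"]
        if status == "COMPLETED":
            # Location of the transcript produced by Amazon Transcribe.
            return job["Transcript"]["TranscriptFileUri"]
        if status == "FAILED":
            raise RuntimeError(
                "Transcription job {} failed: {}".format(job_name, job.get("FailureReason"))
            )
        # Any other status means the job is still running: wait, then check again.
        sleep(wait_seconds)
    raise TimeoutError("Transcription job {} did not finish in time".format(job_name))
```

In the deployed solution the waiting is presumably done by the state machine itself rather than by sleeping inside a Lambda function; the loop here only makes the sequence of checks explicit.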

Visualize Analysis using Amazon QuickSight

We can visualize Amazon Comprehend’s sentiment analysis by using Amazon QuickSight. First, we must grant Amazon QuickSight access to Amazon Athena and the associated S3 buckets in the account. For more information on doing this, see Managing Amazon QuickSight Permissions. We can then create a new data set in Amazon QuickSight based on the Athena table that was created during deployment.

After setting up permissions, we can create a new analysis in Amazon QuickSight by choosing New analysis.

Then we add a new data set.

We choose Athena as the source and give the data source a name such as connectcomprehend.

Choose the name of the database and then choose Use custom SQL.

Give the custom SQL a name such as “Sentiment_SQL” and enter the SQL below. Replace <YOUR DATABASE NAME> with the name of your database.

WITH sentiment AS (
  SELECT
    contactid
    ,talker
    ,text
    ,sentiment
  FROM
    "<YOUR DATABASE NAME>"."sentiment_analysis"
)
SELECT
  contactid
  ,talker
  ,transcript
  ,sentimentresult.sentiment
  ,sentimentresult.sentimentscore.positive
  ,sentimentresult.sentimentscore.negative
  ,sentimentresult.sentimentscore.mixed
FROM
  sentiment
  CROSS JOIN UNNEST(text) as t(transcript)
  CROSS JOIN UNNEST(sentiment) as t(sentimentresult)

Choose Confirm query.

Select the Import to SPICE option and then choose Visualize.

After that, we should see the following screen.

Now we can create some visualizations by adding Sentiment Analysis into visualization.

Similarly, you can analyze other Amazon Comprehend output such as entities, key phrases, and language. If you have Amazon Connect CTR records available on S3, you can blend the Comprehend output with the CTR records.

Conclusion

Amazon AI services such as Amazon Transcribe and Amazon Comprehend make it easy to analyze contact center recordings and to blend the results with other data sources such as CTR (call details), call flow logs, and business-specific attributes. Enterprises can reap significant benefits by realizing the hidden value in the massive amounts of caller-agent audio recordings from their contact centers. By deriving meaningful insights, enterprises can enhance both the efficiency and performance of call centers and improve their overall service quality to end customers. So far, we’ve used Amazon Transcribe to transform audio data into text transcripts and then used Amazon Comprehend to run text analysis. Along the way, we’ve also used Lambda and Step Functions to string together the solution. Finally, we used AWS Glue, Amazon Athena, and Amazon QuickSight to visualize the analysis.

 


About the Authors

Deenadayaalan Thirugnanasambandam is a Senior Cloud Architect in the Professional Services team in Australia.

 

 

 

 

Piyush Patel is a big data consultant with AWS.

 

 

 

 

Paul Zhao is a Sr. Product Manager at AWS Machine Learning. He manages the Amazon Transcribe service. Outside of work, Paul is a motorcycle enthusiast and avid woodworker.

 

 

 

 

Revanth Anireddy is a professional services consultant with AWS.

 

 

 

Loc Trinh is a Solutions Architect for AWS Database and Analytics services. In his spare time, he captures data from his eating and fitness habits and uses analytical modeling to determine why he is still out of shape.

 

Transcribe speech in three new languages: French, Italian, and Brazilian Portuguese

We’re excited to announce that Amazon Transcribe now supports automatic speech recognition in three new languages: French, Italian, and Brazilian Portuguese. These new languages expand upon the 5 languages already available in Amazon Transcribe: US English, US Spanish, Australian English, British English, and Canadian French.

Using the Amazon Transcribe API, you can analyze audio files stored in Amazon S3 and have the service return a text file of the transcribed speech. You can also send a live audio stream to Amazon Transcribe and receive a stream of transcripts in real time. Automatic transcription is proving to be an extremely useful tool for many developers, across many domains (such as subtitles for videos, contact center call analytics and compliance, court depositions, and generally improving accessibility to any application).

You can learn more about how to use transcription in contact centers (including Amazon Connect) from this recent re:Invent breakout session:

French, Italian, and Brazilian Portuguese transcription is available at the same price, and in the same Regions, as other languages in Amazon Transcribe. You can try the new set of languages through the AWS Management Console, the AWS Command Line Interface, and the AWS SDKs.
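As a rough illustration of trying one of the new languages through the AWS SDK for Python, the sketch below builds the keyword arguments for the `start_transcription_job` call. The helper name, job name, and S3 URI are hypothetical; the language codes `fr-FR`, `it-IT`, and `pt-BR` are the ones Amazon Transcribe uses for French, Italian, and Brazilian Portuguese.

```python
# Language codes for the three newly supported languages.
NEW_LANGUAGE_CODES = {
    "French": "fr-FR",
    "Italian": "it-IT",
    "Brazilian Portuguese": "pt-BR",
}


def build_transcription_request(job_name, media_uri, language, media_format="wav"):
    """Build keyword arguments for start_transcription_job (hypothetical helper)."""
    return {
        "TranscriptionJobName": job_name,
        "LanguageCode": NEW_LANGUAGE_CODES[language],
        "MediaFormat": media_format,
        "Media": {"MediaFileUri": media_uri},
    }


# Hypothetical job name and bucket, for illustration only.
request = build_transcription_request(
    "example-italian-job", "s3://example-bucket/calls/call-001.wav", "Italian"
)
```

With credentials configured, the request could then be submitted with `boto3.client("transcribe").start_transcription_job(**request)`.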

 


About the Author

Paul Zhao is a Sr. Product Manager at AWS Machine Learning. He manages the Amazon Transcribe service. Outside of work, Paul is a motorcycle enthusiast and avid woodworker.

 

 

Amazon SageMaker adds Scikit-Learn support

Amazon SageMaker now comes pre-configured with the Scikit-Learn machine learning library in a Docker container. Scikit-Learn is a popular choice for data scientists and developers because it provides efficient tools for data analysis and high-quality implementations of popular machine learning algorithms through a consistent Python interface and well-documented APIs. Scikit-Learn executes quickly and can scale to most data sets and problems, making it an ideal choice when you need to iterate quickly on your machine learning problems. Unlike deep learning frameworks such as TensorFlow or MXNet, Scikit-Learn is used for general-purpose machine learning and data analysis. You can select from a range of supervised and unsupervised learning algorithms for clustering, regression, classification, dimensionality reduction, feature preprocessing, and model selection.

The newly added Scikit-Learn library is available in the Amazon SageMaker Python SDK. You can write your Scikit-Learn script and use the Amazon SageMaker training capabilities, including automatic model tuning. Once your model is trained, you can deploy your Scikit-Learn models to highly available endpoints that auto-scale to make real-time predictions with low latency. You can also use the same models in large-scale batch transform jobs.

In this blog post, I show you how to use the pre-built Scikit-Learn library in Amazon SageMaker to build, train, and deploy a multi-class classification model.

Training and deploying a Scikit-Learn model

In this example, we’re going to train a decision tree classifier on the IRIS dataset. This example is based on the Scikit-Learn Decision Tree Classification example. The full Amazon SageMaker notebook is available to try out. We’ll highlight the most important pieces here. This dataset has 50 samples from each of three different species of Iris flower (150 samples in total), and is commonly used to demonstrate machine learning techniques. The goal is to predict which of the three species a flower belongs to, based on a number of different properties (petal length, petal width, and so on). While we’re using decision trees to solve this problem, Scikit-Learn offers a number of other algorithms that you can use.

Entry point script

The first step is to write the Scikit-Learn script. Starting with the main guard, we use a parser to read the hyperparameters that we pass to our Amazon SageMaker Estimator when creating the training job. These hyperparameters are made available as arguments to our input script in the training container. In this example, we look for the maximum number of leaf nodes. We also parse a number of Amazon SageMaker-specific environment variables to get information about the training environment, such as the location of input data and location where we want to save the model.

import argparse
import os

import joblib  # with older scikit-learn releases: from sklearn.externals import joblib
import pandas as pd
from sklearn import tree

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # Hyperparameters are described here. In this simple example we are just including one hyperparameter.
    # A default of None leaves the number of leaf nodes unlimited; -1 is rejected by scikit-learn.
    parser.add_argument('--max_leaf_nodes', type=int, default=None)

    # SageMaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

After we’ve defined our hyperparameters, we then load the dataset. For this example, we load all of the CSV files using the Pandas library.

    # Take the set of files and read them all into a single pandas dataframe
    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))
    raw_data = [ pd.read_csv(file, header=None, engine="python") for file in input_files ]
    train_data = pd.concat(raw_data)

In this example, we assume that the label (which species the flower belongs to) is stored in the first column. We separate our features and the label into two separate data frames.

    # labels are in the first column
    train_y = train_data.iloc[:, 0]
    train_X = train_data.iloc[:, 1:]

Now we’re ready to train our model. This is as simple as creating the right classifier and calling fit. A key benefit of Scikit-Learn is the simplicity and consistency of the interface that each algorithm exposes. If your model needs pre-processing of features or calculation of validation scores, you can also do those things in this step.

    # We determine the number of leaf nodes using the hyper-parameter above.
    max_leaf_nodes = args.max_leaf_nodes

    # Now use scikit-learn's decision tree classifier to train the model.
    clf = tree.DecisionTreeClassifier(max_leaf_nodes=max_leaf_nodes)
    clf = clf.fit(train_X, train_y)

Finally, we save our model.

    # Save the decision tree model.
    joblib.dump(clf, os.path.join(args.model_dir, "model.joblib"))
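Because the script reads its hyperparameters from command-line arguments and its paths from `SM_*` environment variables, the parsing can be tried locally before starting a training job. The following is a minimal sketch under that assumption: the `parse_args` helper and the `./model` and `./data` fallback paths are illustrative choices, not part of the original script.

```python
import argparse
import os


def parse_args(argv=None, environ=None):
    """Parse the training arguments, falling back to local paths outside SageMaker."""
    environ = os.environ if environ is None else environ
    parser = argparse.ArgumentParser()

    # Hyperparameter passed by the estimator as --max_leaf_nodes <value>.
    parser.add_argument('--max_leaf_nodes', type=int, default=None)

    # Inside the training container these come from the SM_* variables;
    # the fallbacks below are hypothetical local directories.
    parser.add_argument('--model-dir', type=str, default=environ.get('SM_MODEL_DIR', './model'))
    parser.add_argument('--train', type=str, default=environ.get('SM_CHANNEL_TRAIN', './data'))

    return parser.parse_args(argv)


# Simulate what the training container would pass for {'max_leaf_nodes': 30}.
args = parse_args(['--max_leaf_nodes', '30'], environ={'SM_MODEL_DIR': '/opt/ml/model'})
print(args.max_leaf_nodes, args.model_dir, args.train)
```

Passing `argv` and `environ` explicitly keeps the function easy to test; calling `parse_args()` with no arguments reads the real command line and environment.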

Amazon SageMaker notebook – Setup

Now that we’ve written our Scikit-Learn script we can run it on Amazon SageMaker using the Amazon SageMaker pre-built Scikit-Learn container. We’re going to use a hosted Jupyter notebook to orchestrate the training process. Feel free to follow along interactively by running the notebook.

First, we set up the Amazon S3 bucket for storing the data and the model, as well as the AWS Identity and Access Management (IAM) role for the data and Amazon SageMaker permissions.

import sagemaker

# S3 prefix
prefix = 'scikit-iris'

# Get a SageMaker-compatible role used by this Notebook Instance.
role = sagemaker.get_execution_role()

Now we’ll import the Python libraries we’ll need and create an Amazon SageMaker session.

from sagemaker.sklearn.estimator import SKLearn

sagemaker_session = sagemaker.Session()

Next, we’ll download our dataset and upload it to Amazon S3. In this example, we use Scikit-Learn locally in our notebook since it provides convenience functions to download the IRIS dataset.

import numpy as np
import os
from sklearn import datasets

# Load Iris dataset, then join labels and features together
iris = datasets.load_iris()
joined_iris = np.insert(iris.data, 0, iris.target, axis=1)


# Create a temporary directory and write the dataset as CSV
os.makedirs('./data', exist_ok=True)
np.savetxt('./data/iris.csv', joined_iris, delimiter=',', fmt='%1.1f, %1.3f, %1.3f, %1.3f, %1.3f')

# Upload the dataset to S3.
train_input = sagemaker_session.upload_data('data', key_prefix="{}/{}".format(prefix, 'data'))

Amazon SageMaker notebook – Training

Now that we’ve prepared the training data and our Scikit-Learn script (which we’ll name scikit_learn_iris.py), the SKLearn class in the SageMaker Python SDK allows us to run that script as a training job on the Amazon SageMaker managed training infrastructure. We’ll also pass the estimator our IAM role, the type of instance we want to use, and a dictionary of the hyperparameters that we want to pass to our script. Since Scikit-Learn runs on a single CPU-only machine, the Estimator only supports an instance count for training of 1 and GPU instances are not supported.

sklearn = SKLearn(
    entry_point='scikit_learn_iris.py',
    train_instance_type="ml.c4.xlarge",
    role=role,
    sagemaker_session=sagemaker_session,
    hyperparameters={'max_leaf_nodes': 30})

After we’ve constructed our SKLearn estimator, we can fit it by passing in the data we uploaded to Amazon S3. Amazon SageMaker makes sure our data is available in the local filesystem of the training cluster, so our Scikit-Learn script can simply read the data from disk.

sklearn.fit({'train': train_input})

Amazon SageMaker Notebook – Deployment

After training, we can use the SKLearn estimator to create an Amazon SageMaker endpoint – a hosted and managed prediction service that we can use to perform inference.

To do this, our scikit_learn_iris.py script needs a model_fn() function that loads our saved model so that it can be used to make predictions.

def model_fn(model_dir):
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf

You can also optionally specify other functions to customize the behavior of deserialization of the input request (input_fn()), serialization of the predictions (output_fn()), and how predictions are made (predict_fn()). The defaults work for our current use-case so we don’t need to define them.

For more details on the default implementations to customize the behavior, see the SageMaker Scikit-Learn Container GitHub Repository.
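To give a feel for what overriding these hooks might look like, here is a hedged sketch that accepts and returns CSV. It is not the container's default implementation (which works with NumPy arrays); the argument names are illustrative, and the only assumption about the model is that it exposes a `predict` method, as the object returned by `model_fn()` does.

```python
import csv
import io


def input_fn(request_body, request_content_type):
    """Deserialize a CSV request body into a list of feature rows."""
    if request_content_type != 'text/csv':
        raise ValueError('Unsupported content type: {}'.format(request_content_type))
    if isinstance(request_body, bytes):
        request_body = request_body.decode('utf-8')
    reader = csv.reader(io.StringIO(request_body))
    # Skip empty lines and convert every field to a float.
    return [[float(value) for value in row] for row in reader if row]


def predict_fn(input_data, model):
    """Run the loaded model on the deserialized rows."""
    return model.predict(input_data)


def output_fn(prediction, accept):
    """Serialize the predictions as one label per line."""
    if accept != 'text/csv':
        raise ValueError('Unsupported accept type: {}'.format(accept))
    return '\n'.join(str(label) for label in prediction)
```

Keeping the three hooks separate means each one can be unit tested with a stub model before the endpoint is deployed.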

Now we can use our SKLearn estimator in the notebook to deploy the model. The deploy() function allows us to set the number and type of instances that will be used for the prediction endpoint. These do not need to be the same values we used for the training job. Here we will deploy the model to a single ml.m4.xlarge instance but you can also deploy the model to more than one instance and set up Auto Scaling for the endpoint.

predictor = sklearn.deploy(initial_instance_count=1, instance_type="ml.m4.xlarge")

Amazon SageMaker notebook – Prediction and evaluation

Now we can use this predictor to classify flowers from the IRIS dataset. Ideally, the evaluation dataset should be different from the training dataset, but for this example we’ll just reuse some of the training dataset to invoke the endpoint.

First, we create a test dataset.

import itertools
import pandas as pd

shape = pd.read_csv("data/iris.csv", header=None)

a = [50*i for i in range(3)]
b = [40+i for i in range(10)]
indices = [i+j for i,j in itertools.product(a,b)]

test_data = shape.iloc[indices[:-1]]
test_X = test_data.iloc[:,1:]
test_y = test_data.iloc[:,0]
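The index arithmetic is easier to follow when printed on its own. The standalone sketch below repeats only the index computation; it relies on the fact that the Iris data is ordered by species with 50 rows each, so offsets 0, 50, and 100 mark the start of each class.

```python
import itertools

a = [50 * i for i in range(3)]   # first row of each species: [0, 50, 100]
b = [40 + i for i in range(10)]  # last ten rows within a species: [40, ..., 49]

# Every combination of species offset and within-species position.
indices = [i + j for i, j in itertools.product(a, b)]

# The example drops the final index, leaving 29 rows.
selected = indices[:-1]
print(len(selected), selected[:3], selected[-3:])
# prints: 29 [40, 41, 42] [146, 147, 148]
```

In other words, the test set consists of the last ten samples of the first two species and the last nine of the third.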

Now we can use the endpoint to make predictions by calling the predict function with our features.

predictor.predict(test_X.values)

Amazon SageMaker notebook – Cleanup

After you have finished with this example, remember to delete the prediction endpoint to release the instances associated with it.

sklearn.delete_endpoint()

Amazon SageMaker notebook – Batch Transform

Amazon SageMaker also provides Batch Transform, a managed service for performing large-scale inference with your trained model on a large dataset. It’s ideal for scenarios where you’re dealing with large batches of data, you don’t need sub-second latency, or you need to preprocess and transform the training data.

First, we create a transformer and specify the number and type of instances we want to use for the job. In this case we specify 2 instances but you can scale this with the size of your dataset to reduce the amount of time it takes.

# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn.transformer(instance_count=2, instance_type='ml.m4.xlarge')

Now we start the transform job by providing it the location of our data on Amazon S3. The notebook example includes the steps followed to upload the dataset.

# Start a transform job and wait for it to finish
transformer.transform(batch_input_s3, content_type='text/csv')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()

After the transform job has completed, we can download the output data from Amazon S3. For every input file we had, we will have a corresponding output file.

# Download the output data from S3 to local filesystem
batch_output = transformer.output_path
!mkdir -p batch_data
!aws s3 cp --recursive $batch_output/ batch_data/
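To locate the result for a particular input file, it helps to know that Batch Transform names each output object after its input object with `.out` appended. The helper below is a small sketch of that mapping for a single input file; the function name and the bucket names are hypothetical.

```python
import posixpath


def expected_output_uri(input_uri, output_path):
    """Return the S3 URI where the output for input_uri is expected (hypothetical helper)."""
    file_name = posixpath.basename(input_uri)
    return output_path.rstrip('/') + '/' + file_name + '.out'


print(expected_output_uri('s3://example-bucket/batch/iris.csv', 's3://example-bucket/output/'))
# prints: s3://example-bucket/output/iris.csv.out
```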

Conclusion

In this blog post, we showed you how to use the Amazon SageMaker built-in Scikit-Learn container to train a model on the IRIS dataset. However, that’s just the beginning. Scikit-Learn has a large selection of algorithms and transformers that you can use for your machine learning use-cases. Amazon SageMaker enables you to use Scikit-Learn scripts and works seamlessly with SageMaker training, automatic model tuning, and deployment capabilities.

Citations

Dua, D. and Karra Taniskidou, E. (2017). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

 


About the Author

 Laurence Rouesnel is the Algorithms & Platforms Group Manager in Amazon AI Labs. He leads a team of engineers and scientists working on deep learning and machine learning research and products. In his spare time he is an avid traveler, and loves the outdoors whether it’s hiking, skiing, or windsurfing.

 

 

 

Eric Kim is an engineer in the Algorithms & Platforms Group of Amazon AI Labs. He helps support the AWS service SageMaker, and has experience in machine learning research, development, and application. Outside of work, he is an avid music lover and a fan of all dogs.

 

 

 

 

Power your website with on-demand translated reviews using Amazon Translate

The success of an ecommerce platform relies heavily on the reputation that has been established through thousands of user reviews and social shares by customers. By reviewing and sharing information, existing customers establish a trust relationship with something they can’t physically touch. For this content to be accessible to a global audience, it’s critical to translate it into the local language to help customers make their buying decisions.

Imagine that a company that sells older cars, boats, and motorcycles has the following problem: they are expanding their ecommerce business to several countries and they want to allow their customers to effortlessly read all reviews of their offerings written by other shoppers.

To solve this problem, we’ll show you how the company can leverage Amazon Translate to get on-demand translated reviews in real time. We’ll also show you how easily they can integrate the service in a modern ecommerce architecture.

Amazon Translate is a high-quality neural machine translation service that uses advanced deep learning techniques to provide fast language translation of content from a source language to a target language, chosen among the supported pairs. It enables developers to easily invoke an API providing the text to be translated and obtain its translated version in real-time, hiding the complexity of building a neural machine translation model.

The ecommerce architecture

Our example website is a JavaScript single-page application that is hosted in a public Amazon S3 bucket where static website hosting has been enabled.

The example company wants to extend their global presence, so they have decided to use Amazon CloudFront to speed up the distribution of their static web content worldwide. With CloudFront, files are delivered to end users using a global network of edge locations that reduce latency and increase data transfer rates.

The website integrates with Login with Amazon and Amazon Cognito User Pools for user authentication and authorization, and makes REST API calls to an API deployed through Amazon API Gateway.

When API resources are requested, API Gateway invokes AWS Lambda functions that implement business logic operations like listing products, getting product details, adding user reviews, and translating user reviews.

More specifically, the Lambda function code interacts with:

  • Amazon Translate, to get translated reviews
  • Amazon DynamoDB, to cache translated reviews in a fast and flexible NoSQL database and avoid invoking the Translate API for reviews that had already been translated
  • Amazon Comprehend, to analyse the sentiment of the reviews
  • Amazon Kinesis Data Firehose, to capture translation and sentiment analysis data into Amazon S3 for further analysis
  • Amazon RDS for Aurora, to store product and review data
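The caching role of DynamoDB in this list follows the familiar cache-aside pattern: look up the translation first and call Amazon Translate only on a miss. The sketch below shows the pattern with a plain dictionary standing in for the DynamoDB table and an injected `translate` callable standing in for the Translate API call; the function name and the cache key layout are assumptions for illustration.

```python
def get_translated_review(review_id, review_text, source_language, target_language,
                          cache, translate):
    """Return a cached translation, translating and caching it on a miss."""
    key = (review_id, target_language)
    cached = cache.get(key)
    if cached is not None:
        return cached

    # Cache miss: translate once, then store the result for later requests.
    translated = translate(review_text, source_language, target_language)
    cache[key] = translated
    return translated


def fake_translate(text, source_language, target_language):
    """Stand-in for the real translation call, used only for this demo."""
    return '[{}->{}] {}'.format(source_language, target_language, text)


cache = {}
first = get_translated_review('r1', 'Ottima moto', 'it', 'en', cache, fake_translate)
second = get_translated_review('r1', 'Ottima moto', 'it', 'en', cache, fake_translate)
print(first == second, len(cache))
```

In the real Lambda function, `translate` would wrap the `translate_text()` call and return the `TranslatedText` field of its response, and `cache` would be backed by the DynamoDB table.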

Translating reviews and detecting language and sentiment

Whenever a user wants to translate a review, the website makes an API call to Amazon API Gateway, which executes a Lambda function that calls Amazon Translate. The following code snippet shows how easily you can make this call in Python by using the AWS SDK.

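import logging

import boto3

logger = logging.getLogger()
translate_client = boto3.client('translate')

try:
    # Translate text (review, source_language, and target_language come from the request)
    result = translate_client.translate_text(Text=review, SourceLanguageCode=source_language, TargetLanguageCode=target_language)
    logger.info("Translation result: " + str(result))
except Exception as e:
    logger.error(str(e))
    raise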

The translate_text() API call needs only three parameters: Text (in this case the review), SourceLanguageCode (the original language of the review), and TargetLanguageCode (the language into which you want to translate the review).

When a user writes a review, we do not know in advance which language it is written in. The API call to post a new review invokes the Lambda function PostReview. PostReview detects the language of the review by using Amazon Comprehend before saving it to Amazon RDS for Aurora. Amazon Comprehend is a natural language processing (NLP) service that uses machine learning to analyze text. Amazon Comprehend is also very simple to invoke. Here is the code snippet:

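import boto3

comprehend = boto3.client('comprehend')

try:
    # Amazon Comprehend returns candidate languages with scores;
    # keep the language code of the highest-scoring one.
    response = comprehend.detect_dominant_language(Text=review)
    logger.info("Language return: " + str(response))
    language = max(response['Languages'], key=lambda item: item['Score'])['LanguageCode']
except Exception as e:
    logger.error(str(e))
    raise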

When a review is posted, the Lambda function also detects the sentiment, which the example website displays as an emoticon. Detecting the sentiment of the review is also a single API call:

try:
    # Amazon Comprehend
    # LanguageCode expects a language code such as 'en'
    sentiment = comprehend.detect_sentiment(Text=review, LanguageCode=language)
    logger.info("Sentiment ->: " + str(sentiment))
except Exception as e:
    logger.error(str(e))
    raise

As you can see, the Amazon Comprehend detect_dominant_language() API call needs only the text as a parameter, while detect_sentiment() needs the text and its language code.
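Turning the detected sentiment into an emoticon can be as simple as a lookup on the `Sentiment` field of the `detect_sentiment()` response, which takes one of the values POSITIVE, NEGATIVE, NEUTRAL, or MIXED. The mapping below is a sketch; the emoticons themselves and the helper name are illustrative choices, not taken from the example website.

```python
# Hypothetical mapping from Amazon Comprehend sentiment labels to emoticons.
SENTIMENT_EMOTICONS = {
    'POSITIVE': ':-)',
    'NEGATIVE': ':-(',
    'NEUTRAL': ':-|',
    'MIXED': ':-/',
}


def emoticon_for(sentiment_response):
    """Pick an emoticon for a detect_sentiment() response dictionary."""
    return SENTIMENT_EMOTICONS.get(sentiment_response.get('Sentiment'), '?')


# Shape of a response, trimmed to the fields used here.
example_response = {
    'Sentiment': 'POSITIVE',
    'SentimentScore': {'Positive': 0.97, 'Negative': 0.01, 'Neutral': 0.01, 'Mixed': 0.01},
}
print(emoticon_for(example_response))
# prints: :-)
```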

Summary

Now that you’ve seen an example of how you can use Amazon Translate and Amazon Comprehend to empower your website, we hope you are inspired to create your own solutions.

Amazon Translate can get translations of a full sentence within a few milliseconds, enabling usage in both synchronous on-demand translations and in asynchronous tasks for executing a large number of translations with the aim of storing first and then delivering them.

Amazon Translate, Amazon Comprehend, Amazon Kinesis Data Firehose, Amazon S3, Amazon RDS, Amazon DynamoDB, Amazon Route 53, Amazon CloudFront, Amazon API Gateway, and Amazon Cognito are powerful services that allow you to implement fully managed and serverless solutions for your business needs. The possible use cases are limited only by your imagination.

 


About the Authors

Giuseppe Angelo Porcelli is a Sr. Solutions Architect for Amazon Web Services in Italy. With several years of engineering background, he helps enterprise customers design flexible and resilient architectures using AWS services. His field of expertise covers Artificial Intelligence and Machine Learning. In his free time, Giuseppe enjoys playing football.

 

 

 

 

Diego Natali is a solutions architect for Amazon Web Services in Italy. With several years of engineering background, he helps ISV and startup customers design flexible and resilient architectures using AWS services. In his spare time he enjoys watching movies and riding his dirt bike.

 

 

 

 

Easily train models using datasets labeled by Amazon SageMaker Ground Truth

Data scientists and developers can now easily train machine learning models on datasets labeled by Amazon SageMaker Ground Truth. Amazon SageMaker Training now accepts the labeled datasets produced in augmented manifest format as input through both AWS Management Console and Amazon SageMaker Python SDK APIs.

Last month during AWS re:Invent, we launched Amazon SageMaker Ground Truth to build highly accurate training datasets with up to 70 percent savings in labeling costs by using machine learning to aid public as well as private workforces of human labelers. The labeled datasets are produced in an augmented manifest file format that augments each input dataset object with additional metadata, such as labels, inline in the file. Earlier, you could use only the low-level AWS SDK APIs to train models on augmented datasets. Starting today, you can perform such training with a few clicks in the Amazon SageMaker console or with one-line API calls using the high-level Amazon SageMaker Python SDK.

Furthermore, the model will be trained using the Amazon SageMaker Pipe Mode, which significantly accelerates the speeds at which data is streamed from Amazon Simple Storage Service (S3) into Amazon SageMaker so that your training job starts sooner, finishes quicker, and needs less disk space, thus reducing your overall cost to train machine learning models on Amazon SageMaker.

Now let’s dive into an example. Our example uses the CBCL StreetScenes dataset consisting of 3548 street images. In an earlier blog post we had shown you an example of how you can use Amazon SageMaker Ground Truth to manage a workforce for drawing bounding boxes around all the cars in the images, thus creating a labeled dataset for training an Amazon SageMaker Object Detection model. Now we’ll show you how to train such a model on Amazon SageMaker.

Step 1: Explore the labeled dataset

The labeled dataset is produced in an augmented manifest file format. An augmented manifest file is a file in JSON Lines format. This means that each line in the file is a complete JSON object followed by a newline separator. Each JSON object contains the Amazon S3 URI of an image file along with its labels. The labels are the coordinates of the bounding boxes around each of the cars in the image. Here is a sample JSON object from the augmented manifest file for an image that was labeled with 4 cars.


SSDB00004.JPG

Here is the JSON object in the augmented manifest file. We have formatted the display for ease of visualization. In the augmented manifest file, this will appear as a JSON object in a single line.

{
 "source-ref":"s3://sthakur/demo/images/SSDB00004.JPG",
 "sthakur-groundtruth-demo":{
   "annotations":[
     {"class_id":0,"width":162,"top":458,"height":89,"left":378},
     {"class_id":0,"width":201,"top":434,"height":96,"left":602},
     {"class_id":0,"width":61,"top":434,"height":39,"left":343},
     {"class_id":0,"width":66,"top":426,"height":47,"left":240}
   ],
   "image_size":[{"width":1280,"depth":3,"height":960}]
 },
 "sthakur-groundtruth-demo-metadata":{
   "job-name":"labeling-job/sthakur-groundtruth-demo",
   "class-map":{"0":"car"},
   "human-annotated":"yes",
   "objects":[
     {"confidence":0.09},
     {"confidence":0.09},
     {"confidence":0.09},
     {"confidence":0.09}
   ],
   "creation-date":"2018-12-13T21:24:33.546706",
   "type":"groundtruth/object-detection"
 }
}

Here source-ref is the Amazon S3 URI of the image file. Note that sthakur-groundtruth-demo (named after the Amazon SageMaker Ground Truth labeling job that produced the manifest file in the first place) holds the list of labels. The labels consist of the coordinates of the four cars labeled by a human labeler.
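To explore a manifest programmatically, each line can be parsed with the standard `json` module. The sketch below reads one line and converts every annotation into corner coordinates; the function name is hypothetical, and the sample line is a shortened version of the object shown above.

```python
import json


def boxes_from_manifest_line(line, label_attribute):
    """Return the image URI and its bounding boxes as corner coordinates."""
    record = json.loads(line)
    metadata = record.get(label_attribute + '-metadata', {})
    class_map = metadata.get('class-map', {})

    boxes = []
    for annotation in record[label_attribute]['annotations']:
        class_id = str(annotation['class_id'])
        boxes.append({
            'label': class_map.get(class_id, class_id),
            'xmin': annotation['left'],
            'ymin': annotation['top'],
            'xmax': annotation['left'] + annotation['width'],
            'ymax': annotation['top'] + annotation['height'],
        })
    return record['source-ref'], boxes


sample_line = json.dumps({
    'source-ref': 's3://sthakur/demo/images/SSDB00004.JPG',
    'sthakur-groundtruth-demo': {
        'annotations': [
            {'class_id': 0, 'width': 162, 'top': 458, 'height': 89, 'left': 378},
        ],
        'image_size': [{'width': 1280, 'depth': 3, 'height': 960}],
    },
    'sthakur-groundtruth-demo-metadata': {'class-map': {'0': 'car'}},
})

image_uri, boxes = boxes_from_manifest_line(sample_line, 'sthakur-groundtruth-demo')
print(image_uri, boxes[0])
```

For the first car this yields a box from (378, 458) to (540, 547), labeled `car` through the class map.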

Step 2: Create the Amazon SageMaker training job

We’ll now train an Amazon SageMaker Object Detection Model which takes the augmented manifest file from Step 1 as an input.

Using the Amazon SageMaker console

Choose Training jobs from the left navigation pane on the Amazon SageMaker console and then choose Create training job. After choosing the model training configurations, such as the learning algorithm, training cluster specs, and hyperparameters, scroll down to the section where you enter the input data channel for sourcing the training dataset.

Choose the S3 data type as AugmentedManifestFile. Provide the Amazon S3 location of the manifest file from Step 1. Also provide the names of the JSON attributes that you want the Object Detection Algorithm to use from the augmented manifest file. Here we need only two attributes for training the model: source-ref and sthakur-groundtruth-demo as described in Step 1.

Since the JSON objects from the augmented manifest file are streamed in an ordered fashion using Amazon SageMaker Pipe mode, you need to carefully choose the sequence of attribute names that the algorithm should expect to find in the input data stream. Use the up and down arrow buttons next to each attribute name for choosing the sequence.

Although this step shows how to create an input data channel for the training dataset, the Object Detection algorithm also requires an input data channel for a validation dataset. One way to prepare the validation dataset is to hold out a subset of your labeled images and create another augmented manifest file for the validation set. You can then use the Amazon S3 URI of the new augmented manifest file as the input to your validation channel, and define the channel in exactly the same manner as described in this step.
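One way to hold out a validation set is to shuffle the manifest lines and split them. The sketch below assumes an 80/20 split and a fixed seed; both values, the helper name split_manifest, and the example bucket name are illustrative choices, not part of the original workflow.

```python
import random

def split_manifest(lines, validation_fraction=0.2, seed=0):
    """Shuffle manifest lines and split them into (train, validation) lists."""
    shuffled = list(lines)
    random.Random(seed).shuffle(shuffled)
    n_validation = int(len(shuffled) * validation_fraction)
    return shuffled[n_validation:], shuffled[:n_validation]

# Ten placeholder records standing in for real manifest lines.
lines = ['{"source-ref": "s3://example-bucket/img-%d.jpg"}' % i for i in range(10)]
train_lines, validation_lines = split_manifest(lines)
print(len(train_lines), len(validation_lines))
```

Each list can then be written to its own file, one JSON object per line, and uploaded to Amazon S3 as a separate augmented manifest.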

Using the Amazon SageMaker Python SDK

We’ll use Amazon SageMaker Estimator to train the Object Detection Model.

od_model = sagemaker.estimator.Estimator(training_image,
                                         role, 
                                         train_instance_count=1, 
                                         train_instance_type='ml.p3.2xlarge',
                                         train_volume_size = 50,
                                         train_max_run = 360000,
                                         input_mode = 'Pipe',
                                         output_path=s3_output_location,
                                         sagemaker_session=sess)
…………………………..

train_data = sagemaker.session.s3_input(s3_train_data, distribution='FullyReplicated', content_type='image/jpeg', s3_data_type='AugmentedManifestFile', attribute_names=['source-ref', 'sthakur-groundtruth-demo'])

validation_data = sagemaker.session.s3_input(s3_validation_data, distribution='FullyReplicated', content_type='image/jpeg', s3_data_type='AugmentedManifestFile', attribute_names=['source-ref', 'sthakur-groundtruth-demo'])


…………………………………………..

data_channels = {'train': train_data, 'validation': validation_data}
od_model.fit(inputs=data_channels, logs=True)

Note that we’ve chosen the input_mode as Pipe, s3_data_type as AugmentedManifestFile, and specified the attribute_names sequence before training the model using SageMaker estimator’s fit routine.

Additional benefits of using augmented manifest format

Amazon SageMaker has always supported traditional manifest files for training models on datasets stored in Amazon S3. A manifest file simply provides a list of Amazon S3 key name prefixes for the data objects that need to be downloaded to Amazon SageMaker for training the model.

For example, this is a manifest file:

[
    {"prefix":"s3://foo/"},
  "relative/path/to/data-1",
  "relative/path/to/data-2",
  ...
]

This manifest file matches the following Amazon S3 URIs:

s3://foo/relative/path/to/data-1
s3://foo/relative/path/to/data-2
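The matching rule can be sketched in a few lines of Python: the first entry carries the prefix, and every remaining entry is a path relative to it. The helper name expand_manifest is hypothetical, and the sketch omits the elided entries from the example.

```python
import json

def expand_manifest(manifest_text):
    """Expand a prefix-style manifest into full S3 URIs."""
    entries = json.loads(manifest_text)
    prefix = entries[0]["prefix"]
    return [prefix + relative_path for relative_path in entries[1:]]

manifest_text = '''
[
  {"prefix": "s3://foo/"},
  "relative/path/to/data-1",
  "relative/path/to/data-2"
]
'''
uris = expand_manifest(manifest_text)
print(uris)
```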

Using this traditional approach for specifying the input data channel for learning algorithms, such as visual recognition algorithms, requires you to specify two input data channels in Amazon SageMaker: one for the input data (images), and another for its labels. Using an augmented manifest, you can now put the data and its labels in one manifest file, thus removing the need for two channels. It also eliminates unnecessary complexity in algorithm code for matching data objects with labels across multiple channels.

For example, the manifest file shown earlier can be easily expressed as an augmented manifest by restructuring the S3 URIs into JSON Lines format and adding the labels inline.

{"source-ref":"s3://foo/relative/path/to/data-1","label":"0"}
{"source-ref":"s3://foo/relative/path/to/data-2","label":"1"}
……….

Set attribute_names=['source-ref','label'] while training the model.
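As a rough sketch of that restructuring, the Python snippet below writes one JSON object per line from a list of (URI, label) pairs. The helper name to_augmented_manifest is hypothetical, and the attribute name label simply follows the example above.

```python
import json

def to_augmented_manifest(uris_with_labels, label_attribute="label"):
    """Build JSON Lines text with one {"source-ref", label} object per line."""
    lines = [
        json.dumps({"source-ref": uri, label_attribute: label})
        for uri, label in uris_with_labels
    ]
    return "\n".join(lines) + "\n"

text = to_augmented_manifest([
    ("s3://foo/relative/path/to/data-1", "0"),
    ("s3://foo/relative/path/to/data-2", "1"),
])
print(text, end="")
```

The attribute names you write here are the ones you later pass, in order, as attribute_names.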

In addition, the augmented manifest file uses Amazon SageMaker Pipe mode, which means that your learning algorithm can benefit from the high throughput data streaming from Amazon S3 to your training instances in Amazon SageMaker. Here is a blog post that describes changes you can make to your learning algorithm to start consuming data using Pipe mode.

Get started with more examples and developer support

We’ve shown you examples of how to train models on Amazon SageMaker using labeled datasets in augmented manifest file format. You can also try out our sample notebook, which provides the step-by-step AWS SDK experience. For additional examples, see our developer guide, or post your questions on our developer forum.

 


About the Author

Sumit Thakur is a Senior Product Manager for AWS Machine Learning Platforms where he loves working on products that make it easy for customers to get started with machine learning on cloud. He is product manager for Amazon SageMaker and AWS Deep Learning AMI. In his spare time, he likes connecting with nature and watching sci-fi TV series.

 

 

 

Analyzing contact center calls—Part 1: Use Amazon Transcribe and Amazon Comprehend to analyze customer sentiment

Contact centers aiming to improve overall operational efficiency have an imperative to understand caller-agent dynamics. In part one of this two-part blog post series we’ll show you how you can use Amazon Transcribe and Amazon Comprehend to transform call recordings from audio to text and then run sentiment analysis on the transcripts. We will demonstrate how to use Amazon Transcribe to create text transcripts from an audio file. Afterwards, we’ll use Amazon Comprehend to analyze the call transcript, producing insights on keywords, topics, entities, and sentiment.

AWS services leveraged

Amazon Transcribe is an automatic speech recognition (ASR) service that makes it easy for developers to add speech-to-text capability to their applications. Using the Amazon Transcribe API, you can transcribe audio files stored in Amazon S3 into text transcripts.

Amazon Comprehend analyzes text and tells you what it finds, starting with the language, from Afrikaans to Yoruba, with 98 more in between. It can identify different types of entities (people, places, brands, products, and so forth), key phrases, and sentiment (positive, negative, mixed, or neutral), all from text in English or Spanish. Finally, the Amazon Comprehend topic modeling service extracts topics from large sets of documents for analysis or topic-based grouping.

AWS Lambda lets you run code without provisioning or managing servers. You pay only for the compute time you consume – there is no charge when your code is not running.

AWS Step Functions makes it easy to coordinate the components of distributed applications and microservices using visual workflows.

Amazon Connect is a self-service, cloud-based contact center service that makes it easy for any business to deliver better customer service at lower cost. Amazon Connect produces call recordings of caller-agent interactions.

Solution overview

The architecture is broadly divided into these components, as the following diagram illustrates:

  1. Audio Transcript Storage → Amazon S3 bucket
  2. Orchestration component and business logic component → AWS Step Functions and AWS Lambda
  3. Transcribing component → Amazon Transcribe
  4. Sentiment analysis component → Amazon Comprehend
  5. Notification component → SNS Topic
  6. Amazon Comprehend → Entity, sentiment, key phrases, and language output into an Amazon S3 bucket
  7. AWS Glue maintains the database catalogue and database table structure. Amazon Athena queries data in Amazon S3 using the AWS Glue database catalogue.
  8. Amazon QuickSight visualizes the sentiment and key phrases analysis of caller-agent interactions from the call recordings.

Transcribe call center audio, run sentiment analysis, and visualize analytics

After uploading audio files to an Amazon S3 bucket, we’ll trigger a Lambda function to invoke Step Functions that will point the Amazon Transcribe service to the bucket destination to create transcription jobs. Accepted audio/visual formats include: WAV, FLAC, MP3, and MP4.

Step 1: Create the Lambda function and IAM policy

  1. Open the AWS Management Console and navigate to the Lambda console. Then choose Create a Lambda function.
  2. Choose Skip to skip the blueprint selection.
  3. For Runtime, choose Node JS 8.10.
  4. For Name, enter a function name.
  5. Enter a description that notes the source bucket and destination bucket used.
  6. For Code entry type, choose Edit code inline.
  7. Create environment variable – STEP_FUNCTIONS_ARN
  8. Paste the following into the code editor:
    'use strict';
    
    const aws = require('aws-sdk');
    
    var stepfunctions = new aws.StepFunctions();
    const s3 = new aws.S3({apiVersion: '2006-03-01'});
    
    exports.handler = (event, context, callback) => {
        const bucket = event.Records[0].s3.bucket.name;
        const key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
        const params = {Bucket: bucket,Key: key};
    
        s3.getObject(params, (err, data) => {
            if (err) {
                console.log(err);
                const message = `Error getting object ${key} from bucket ${bucket}. Make sure they exist and your bucket is in the same region as this function.`;
                console.log(message);
                callback(message);
            } else {
                // Replace every "/" so the key can be used as a job name.
                var job_name = key.replace(/\//g, "-");
                var stepparams = {
                  "stateMachineArn": process.env.STEP_FUNCTIONS_ARN,
                  "input": JSON.stringify({
                    "s3URL": "https://s3.amazonaws.com/" + bucket + "/" + key,
                    "JOB_NAME": job_name
                  })
                };
                stepfunctions.startExecution(stepparams, function(err, data) {
                  if (err) console.log(err, err.stack); // an error occurred
                  else     console.log(data);           // successful response
                });
                callback(null, data.ContentType);
            }
        });
    };
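The handler above is written for Node.js. For readers who want to check the payload logic in isolation, here is a minimal Python sketch of the same idea: decode the object key from the S3 event, then build the JSON input with the s3URL and JOB_NAME fields. The function name build_execution_input and the example bucket and key are hypothetical.

```python
import json
from urllib.parse import unquote_plus

def build_execution_input(s3_event):
    """Build the Step Functions input string from an S3 event notification."""
    record = s3_event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    # S3 event keys are URL-encoded, with spaces sent as '+'.
    key = unquote_plus(record["object"]["key"])
    return json.dumps({
        "s3URL": "https://s3.amazonaws.com/" + bucket + "/" + key,
        # Replace every "/" so the key can double as a job name.
        "JOB_NAME": key.replace("/", "-"),
    })

event = {"Records": [{"s3": {"bucket": {"name": "example-bucket"},
                             "object": {"key": "calls/2019/call-01.mp3"}}}]}
payload = build_execution_input(event)
print(payload)
```

Building the input with a JSON serializer, rather than by string concatenation, avoids quoting mistakes when keys contain special characters.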
    

Step 2: Create source Amazon S3 bucket

  1. Navigate to the Amazon S3 console and edit the source bucket configuration.
  2. Expand the Events section and provide a name for the new event.
  3. For Events, choose ObjectCreated (ALL).
  4. For Send to, choose Lambda Functions.
  5. For Lambda Function, select the Lambda function name you chose in Step 1.
  6. Choose Save

Step 3: Create Transcribe and Comprehend APIs using a Lambda function

This function starts a transcription job for the audio file uploaded to Amazon S3. It receives two parameters from the state machine input: s3URL and JOB_NAME.

  1. Navigate to the Lambda console, and then choose Create a Lambda function.
  2. Choose Skip to skip the blueprint selection.
  3. For Runtime, choose Node JS 8.10.
  4. For Name, enter a function name.
  5. Enter a description that notes Create Transcribe JOB based on the input received.
  6. For Code entry type, choose Edit code inline.
  7. Paste the following into the code editor:
    var AWS = require('aws-sdk');
    var transcribeservice = new AWS.TranscribeService();
    exports.handler = (event, context, callback) => {
        var params = {
          LanguageCode: 'en-US',
          Media: { /* required */
            MediaFileUri: event.s3URL + ""
          },
          MediaFormat: 'mp3',
          TranscriptionJobName: event.JOB_NAME
    
        };
        transcribeservice.startTranscriptionJob(params, function(err, data) {
          if (err) {
            console.log(err, err.stack); // an error occurred
            callback(err);               // fail the task so Step Functions can retry
          } else {
            console.log(data);           // successful response
            event.wait_time = 60;
            event.JOB_NAME = data.TranscriptionJob.TranscriptionJobName;
            callback(null, event);
          }
        });
    
    };

Step 4: Get Transcribe Job Status

This function returns the status of the transcription job, which lets Step Functions wait for the job to complete.

  1. In the Lambda console, choose Create a Lambda function.
  2. Choose Skip to skip the blueprint selection.
  3. For Runtime, choose Node JS 8.10.
  4. For Name, enter a function name.
  5. Enter a description that notes Transcribe JOB details.
  6. For Code entry type, choose Edit code inline.
  7. Paste the following into the code editor:
    var AWS = require('aws-sdk');
    var transcribeservice = new AWS.TranscribeService();
    
    exports.handler = (event, context, callback) => {
        var params = {
          TranscriptionJobName: event.JOB_NAME /* required */
        };
        transcribeservice.getTranscriptionJob(params, function(err, data) {
          if (err) {
            console.log(err, err.stack); // an error occurred
            return callback(err);        // data is undefined here, so stop early
          }
          console.log(data);             // successful response
          event.STATUS = data.TranscriptionJob.TranscriptionJobStatus;
          event.Transcript = data.TranscriptionJob.Transcript;
          callback(null, event);
        });
    };

Step 5: Get transcribe job details

This function lets Step Functions retrieve the transcription job details once the job has completed.

  1. In the Lambda console, choose Create a Lambda function.
  2. Choose Skip to skip the blueprint selection.
  3. For Runtime, choose Node JS 8.10.
  4. For Name, enter a function name.
  5. Enter a description that notes get information about the transcribe job.
  6. For Code entry type, choose Edit code inline.
  7. Paste the following into the code editor:
    var AWS = require('aws-sdk');
    var transcribeservice = new AWS.TranscribeService();
    
    exports.handler = (event, context, callback) => {
    
        var params = {
          TranscriptionJobName: event.JOB_NAME /* required */
        };
        transcribeservice.getTranscriptionJob(params, function(err, data) {
          if (err) {
            console.log(err, err.stack); // an error occurred
            return callback(err);        // data is undefined here, so stop early
          }
          console.log(data);             // successful response
          event.STATUS = data.TranscriptionJob.TranscriptionJobStatus;
          event.TranscriptFileUri = data.TranscriptionJob.Transcript.TranscriptFileUri;
          callback(null, event);
        });
    };

Step 6: Call Amazon Comprehend to analyze transcription text

In this step, you’ll get transcribed audio text and perform contextual analysis. This function will enable Step Functions to call Amazon Comprehend to perform sentiment analysis.

  1. In the Lambda console, choose Create a Lambda function.
  2. Choose Skip to skip the blueprint selection.
  3. For Runtime, choose Node JS 8.10.
  4. For Name, enter a function name.
  5. Enter a description that notes that the function runs sentiment analysis on the transcript.
  6. For Code entry type, choose Edit code inline.
  7. Paste the following into the code editor:
    var https = require('https');
    var AWS = require('aws-sdk');
    var comprehend = new AWS.Comprehend({apiVersion: '2017-11-27'});

    exports.handler = function(event, context, callback) {
        // The previous step stores the transcript location in TranscriptFileUri.
        var request_url = event.TranscriptFileUri || event.request_url;
        https.get(request_url, (res) => {
          var chunks = [];
          res.on("data", function (chunk) {
            chunks.push(chunk);
          });
          res.on("end", function () {
            var body = Buffer.concat(chunks);
            var results = JSON.parse(body);
            console.log(body.toString());
            var transcript = results.results.transcripts[0].transcript;
            console.log(transcript);
            var params = {
              LanguageCode: "en",
              Text: transcript + ""
            };
            comprehend.detectSentiment(params, function(err, data) {
              if (err) {
                console.log(err, err.stack); // an error occurred
                return callback(err);
              }
              console.log(data);             // successful response
              callback(null, data);          // return the sentiment result once
            });
          });
        }).on('error', (e) => {
          console.error(e);
          callback(e);
        });
    };
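The transcript lookup in the function above (results, then transcripts, then transcript) can be sketched in Python as follows. The truncation helper is an addition for illustration: Amazon Comprehend limits the size of text it accepts per request, and the 5,000-byte default used here is an assumption to check against the current API reference. The function names and the sample document are hypothetical.

```python
import json

def extract_transcript(transcribe_output):
    """Return the full transcript text from an Amazon Transcribe result document."""
    results = json.loads(transcribe_output)
    return results["results"]["transcripts"][0]["transcript"]

def truncate_utf8(text, max_bytes=5000):
    """Cut text to at most max_bytes of UTF-8 without splitting a character."""
    encoded = text.encode("utf-8")[:max_bytes]
    # A character cut in half at the end is dropped rather than raising an error.
    return encoded.decode("utf-8", errors="ignore")

# Minimal stand-in for the JSON document that a transcription job produces.
sample_output = json.dumps({
    "jobName": "example-job",
    "results": {"transcripts": [{"transcript": "Thank you for calling."}]},
})
transcript = truncate_utf8(extract_transcript(sample_output))
print(transcript)
```

For long calls, splitting the transcript into several requests would preserve more of the conversation than truncating it.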

Step 7: Invoke Step Functions

In this step, you will use AWS Step Functions to orchestrate the Lambda functions created earlier and notify the end customer about the contextual analysis.

  1. In the Step Functions console, choose Create a state machine.
  2. Choose Author from scratch.
  3. For Name, enter your state machine name, for example: TranscribeJob.
  4. Paste the following code into State machine definition.
  5. In the state machine definition, update the ARN values to those of the Lambda functions created in the earlier steps.
  6. Choose Next.
  7. Choose an existing role that has permissions to invoke Lambda functions and publish to SNS. (Create a custom role if one doesn’t exist.)
  8. Choose Create state machine.

 

{
	"Comment": "A state machine that starts an Amazon Transcribe job and monitors the job until it completes.",
	"StartAt": "Transcribe Audio Job",
	"States": {
		"Transcribe Audio Job": {
			"Type": "Task",
			"Resource": "<<Start Transcribe job for Audio to Text ARN>>",
			"ResultPath": "$",
			"Next": "Wait X Seconds",
			"Retry": [{
				"ErrorEquals": ["States.ALL"],
				"IntervalSeconds": 1,
				"MaxAttempts": 3,
				"BackoffRate": 2
			}]
		},
		"Wait X Seconds": {
			"Type": "Wait",
			"SecondsPath": "$.wait_time",
			"Next": "Get Job Status"
		},
		"Get Job Status": {
			"Type": "Task",
			"Resource": "<<Get Transcribe job status ARN>>",
			"Next": "Job Complete?",
			"InputPath": "$",
			"ResultPath": "$",
			"Retry": [{
				"ErrorEquals": ["States.ALL"],
				"IntervalSeconds": 1,
				"MaxAttempts": 3,
				"BackoffRate": 2
			}]
		},
		"Job Complete?": {
			"Type": "Choice",
			"Choices": [{
				"Variable": "$.STATUS",
				"StringEquals": "IN_PROGRESS",
				"Next": "Wait X Seconds"
			}, {
				"Variable": "$.STATUS",
				"StringEquals": "COMPLETED",
				"Next": "Get Final Job Status"
			}, {
				"Variable": "$.STATUS",
				"StringEquals": "FAILED",
				"Next": "Job Failed"
			}],
			"Default": "Wait X Seconds"
		},
		"Job Failed": {
			"Type": "Fail",
			"Cause": "Amazon Transcribe job failed",
			"Error": "GetTranscriptionJob returned FAILED"
		},
		"Get Final Job Status": {
			"Type": "Task",
			"Resource": "<<Get Transcribe job details ARN>>",
			"InputPath": "$",
			"Next": "Send contextual analysis",
			"Retry": [{
				"ErrorEquals": ["States.ALL"],
				"IntervalSeconds": 1,
				"MaxAttempts": 3,
				"BackoffRate": 2
			}]
		},
		
		"Send contextual analysis": {
			"Type": "Task",
			"Resource": "<<Send Contextual Analysis ARN>>",
			"InputPath": "$",
			"End": true,
			"Retry": [{
				"ErrorEquals": ["States.ALL"],
				"IntervalSeconds": 1,
				"MaxAttempts": 3,
				"BackoffRate": 2
			}]
		}
	}
}
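The polling logic in this state machine can be traced with a small Python sketch. It only mirrors the branching of the Choice state, including the Default branch that keeps waiting on any unrecognized status. It does not call any AWS service, and the function names are hypothetical.

```python
def next_state(status):
    """Mirror the "Job Complete?" Choice state: map a job status to the next state."""
    transitions = {
        "IN_PROGRESS": "Wait X Seconds",
        "COMPLETED": "Get Final Job Status",
        "FAILED": "Job Failed",
    }
    # Any other status falls through to the Default branch and keeps waiting.
    return transitions.get(status, "Wait X Seconds")

def run_polling_loop(statuses):
    """Walk a sequence of observed statuses and return the states visited."""
    visited = []
    for status in statuses:
        state = next_state(status)
        visited.append(state)
        if state != "Wait X Seconds":
            break  # the job reached a terminal status, so polling stops
    return visited

print(run_polling_loop(["IN_PROGRESS", "IN_PROGRESS", "COMPLETED"]))
```

Because the Default branch also loops back to the wait state, a job that never reports COMPLETED or FAILED would poll indefinitely, so you may want to add a timeout to the state machine.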

Step 8: Create an AWS Glue database for visualization

Navigate to the AWS Glue console and create a database to store sentiment analysis entities.

Add the AWS Glue table to the database you just created.

 

A created table in database looks like this:

Step 9: Visualization using Amazon QuickSight

To visualize Amazon Comprehend output using Amazon QuickSight, do the following:

  1. Connect Amazon QuickSight to Amazon Athena.
    1. https://docs.aws.amazon.com/quicksight/latest/user/create-a-data-set-athena.html
    2. https://docs.aws.amazon.com/quicksight/latest/user/managing-permissions.html
    3. https://aws.amazon.com/blogs/big-data/derive-insights-from-iot-in-minutes-using-aws-iot-amazon-kinesis-firehose-amazon-athena-and-amazon-quicksight/
  2. Grant Amazon QuickSight access to Athena and the associated S3 buckets in the account. For information on how to do this, see Managing Amazon QuickSight Permissions to AWS Resources in the Amazon QuickSight User Guide.
  3. Create a new data set for visualizing sentiment analysis in Amazon QuickSight based on the Athena table that was created during deployment.

After setting up permissions, create a new analysis in Amazon QuickSight by choosing New analysis.

Add a new data set.

Choose Athena as the source and give the data source a name, such as comprehend_demo.

Click Select to choose the database and table.

Click Visualize.

Create custom visualizations.

Conclusion

Enterprises can reap significant benefits by realizing the hidden value in the massive amounts of caller-agent audio recordings from their contact centers. By deriving meaningful insights, enterprises can enhance both the efficiency and performance of call centers and improve their overall service quality to end customers. So far, we’ve used Amazon Transcribe to transform audio data into text transcripts and then used Amazon Comprehend to run text analysis. Along the way, we’ve also used AWS Lambda and AWS Step Functions to string together the solution. Finally, we used AWS Glue, Amazon Athena, and Amazon QuickSight to visualize the analysis. The AWS CloudFormation templates used to build and deploy this process are available in Part 2 of this series.

In Part 2 of this blog post series we’ll show you how to automate, deploy, and visualize analytics using Amazon Transcribe, Amazon Comprehend, AWS CloudFormation, and Amazon QuickSight.

About the Authors

Deenadayaalan Thirugnanasambandam is a Senior Cloud Architect in the Professional Services team in Australia.

 

 

 

 

Piyush Patel is a big data consultant with AWS.

 

 

 

 

Paul Zhao is a Sr. Product Manager at AWS Machine Learning. He manages the Amazon Transcribe service. Outside of work, Paul is a motorcycle enthusiast and avid woodworker.

 

 

 

 

Revanth Anireddy is a professional services consultant with AWS.

 

 

 

Loc Trinh is a Solutions Architect for AWS Database and Analytics services. In his spare time, he captures data from his eating and fitness habits and uses analytical modeling to determine why he is still out of shape.

 

Scalable multi-node training with TensorFlow

We’ve heard from customers that scaling TensorFlow training jobs to multiple nodes and GPUs successfully is hard. TensorFlow has distributed training built-in, but it can be difficult to use. Recently, we made optimizations to TensorFlow and Horovod to help AWS customers scale TensorFlow training jobs to multiple nodes and GPUs. With these improvements, any AWS customer can use an AWS Deep Learning AMI to train ResNet-50 on ImageNet in just under 15 minutes.

To achieve this, 32 Amazon EC2 instances, each with 8 GPUs, for a total of 256 GPUs, were harnessed with TensorFlow. All of the required software and tools for this solution ship with the latest Deep Learning AMIs (DLAMIs), so you can try it out yourself. You can train faster, implement your models faster, and get results faster than ever before. This blog post describes our results and shows you how to try out this easier and faster way to run distributed training with TensorFlow.

Figure A. ResNet-50 ImageNet model training with the latest optimized TensorFlow with Horovod on a Deep Learning AMI takes 15 minutes on 256 GPUs.

Training a large model takes time, and the larger and more complex the model is, the longer the training is going to take. If your business requirement is to generate updated models on a regular basis, any training that takes too long means missed opportunities. A typical response is to throw more processing power at the problem, but for deep learning, the communications overhead during training has made this approach infeasible or profoundly expensive. This communications overhead results in a loss of efficiency, significantly reducing your throughput and increasing your time to train. It can also be complex to set up the required infrastructure and reach required levels of accuracy. TensorFlow supports distributed training natively, but in our experiments, we obtained better results (in both speed and accuracy) when we incorporated Horovod.

Horovod is a popular choice for distributed training. Take, for example, the recent use of Horovod with 27,000 GPUs to analyze climate change. Orchestrating this number of GPUs would be impossible without proper tooling. With Horovod, using software optimizations and Amazon EC2 p3 instances, we were able to limit the efficiency loss to 15 percent, resulting in a time-to-train under 15 minutes.

Figure B. Time to train vs number of GPUs vs images per second, communication overhead, and efficiency. Startup time is a consistent 1.5 minutes regardless of cluster size.
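To see how efficiency loss and time to train relate, here is a small Python sketch of the arithmetic. Scaling efficiency is the measured cluster throughput divided by the ideal throughput (number of GPUs times single-GPU throughput). The throughput and image-count values below are hypothetical, chosen only so that the efficiency works out to 85 percent, and are not measurements from our benchmark. The 1.5-minute startup time comes from Figure B.

```python
def scaling_efficiency(cluster_images_per_sec, num_gpus, single_gpu_images_per_sec):
    """Fraction of ideal linear throughput achieved by the cluster."""
    ideal = num_gpus * single_gpu_images_per_sec
    return cluster_images_per_sec / ideal

def estimated_minutes(total_images, cluster_images_per_sec, startup_minutes=1.5):
    """Startup time plus the time needed to process total_images."""
    return startup_minutes + total_images / cluster_images_per_sec / 60.0

# Hypothetical throughput numbers, chosen only to illustrate the arithmetic.
efficiency = scaling_efficiency(217600.0, 256, 1000.0)
minutes = estimated_minutes(total_images=100_000_000, cluster_images_per_sec=217600.0)
print(round(efficiency, 2), round(minutes, 1))
```

Plugging in your own measured throughput for one GPU and for the full cluster gives a quick estimate of what each added node costs in efficiency.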

All of the tools you need to try this out are shipped on the latest DLAMI. This includes example scripts for training ResNet-50 with ImageNet. If you’re ready to roll up your sleeves now and try it out, continue reading. The rest of this blog post shows you how you can use EC2 p3 instances, TensorFlow, and Horovod to train ResNet-50 on ImageNet in under 15 minutes.

How to train ResNet/ImageNet in under 15 minutes with TensorFlow

I first tried out Horovod with TensorFlow on the Deep Learning AMI (DLAMI). I was asked to write a tutorial on using Horovod to train ImageNet on an eight-GPU EC2 instance. I wondered what on earth Uber was thinking when they named their distributed training framework “Horovod”, and I wondered how much time this was going to take. Training ImageNet takes forever, and Horovod sounds like some villain from the Harry Potter universe. It’s not. It was created by Alexander Sergeev at Uber. He named it after a Russian and Eastern European folk dance in which a large group of dancers perform synchronized moves in a circle. It turned out to be a fun way to learn how to dance with Horovod using up to 8 GPUs in one DLAMI.

That was a couple of months ago, and now I’m going to show you what the TensorFlow team here at Amazon AI has been up to. They’ve been fine-tuning Horovod with TensorFlow, and the implementation on the DLAMI is much faster. More importantly, you can run upwards of 256 GPUs in one training run to train ResNet-50 on ImageNet in under 15 minutes!

The first time I ran Horovod on a DLAMI was on a p3.16xlarge EC2 instance. This beast of an instance has eight Tesla V100 GPUs. Horovod uses all of the instance’s GPUs  to turn a training time that could take more than a day to a training time that could be finished in a few hours. I used the latest DLAMI so I wouldn’t have to install and configure CUDA, TensorFlow, or Horovod. I could activate the environment with one command, and then execute the training script with another one liner.

Setup was easy. Training was relatively fast, with only slightly sub-linear scaling from one to eight GPUs. It finished in eight hours and the accuracy was acceptable: 75.4% for top-1 and 92.6% for top-5. Based on this result, I wrote the TensorFlow-Horovod tutorial for the DLAMI.

Next, I asked myself how fast I could train ResNet-50 on several p3 instances. I knew that scaling efficiency would never be 100% and that, in total, people would end up paying more. However, if your team is waiting a day for a model to train, they would probably consider training in 15 minutes worth the extra cost for the gain in developer productivity. This is especially true because the efficiency loss of the faster training, as our experiments demonstrate, is minimal.

We recently benchmarked running 256 GPUs with great accuracy and an even faster completion time. Don’t you want to try that out yourself? Does your dance card even have 256 slots? Keep reading and I’ll walk you through how we can make this happen.

With the latest updates to the DLAMI and its TensorFlow-Horovod environment you could train ResNet-50 on all of ImageNet for about a 20% cost reduction compared to its release. In this blog post we’ll demonstrate how fast things can go and scale, and save your wallet for future dances with Horovod. Are you ready?

The original TensorFlow-Horovod tutorial shows you a single node implementation. You spin up one instance and use all of its GPUs. This time I’ll show you how you can spin up several nodes, link them up with a Horovod configuration, and then run the training. We’ll run some benchmarks, so you can estimate your time for completion and see what your efficiency loss is for each new node. With this info you can estimate your costs, and then apply this pattern to other models that you want to train.

Part 1: Spin up a bunch of DLAMIs

Now is a good time to plan out your moves with a quick questionnaire:

  1. Do you need to get a copy of ImageNet?
    1. If yes:
      1. Spin up one DLAMI for now. Downloading and prepping the dataset can take several hours, and you don’t want several instances sitting around, racking up your bill while you wait. You can add more DLAMIs later. If you want to be clever, you can run the download and prep steps faster with a big DLAMI CPU instance, then transfer it to your DLAMI GPU instances for training when it is ready. You could also divide up the dataset and prep the dataset across multiple machines.
    2. If no: go to 2.
  2. Do you have an ImageNet dataset already downloaded to your AWS environment – on Amazon S3, a shared volume, or on an instance?
    1. If yes:
      1. Later in this blog I’ll give you some bash functions that can help you distribute the dataset to each node.
    2. If no:
      1. Only spin up one instance for now. Get it ready there, then spin up the rest and distribute the dataset using one of the functions I just mentioned.
  3. Is your ImageNet dataset already preprocessed for training with TensorFlow?
    1. If yes:
      1. You’re going to be able to do everything in this blog post pretty quickly, even train ImageNet entirely in about an hour-and-a-half with just four instances.
    2. If no:
      1. Follow these detailed instructions in DLAMI’s docs on how to prepare the ImageNet dataset.

Now let’s spin up one or more DLAMIs. There are several ways you can do this, but I’m going to use the Amazon EC2 console. If you already know how to use AWS CloudFormation templates or the AWS CLI, you can use those tools as well. The goal here is to launch some number of identical DLAMIs that each have more than one GPU. For this next step, I’m going to launch four p3.16xlarge DLAMIs, all in the same Region and security zone (VPC). This step is important. You can’t just link up random instances that are launched in different Regions or security zones without impacting performance.

On the EC2 console you can search for an AMI by name. Search for “deep learning” and you will find Deep Learning AMI (Ubuntu). Choose the Select button.

After you select the Deep Learning AMI (Ubuntu), you can choose the instance type. Because we want the fastest training, choose the p3dn.24xlarge instance type. If this is not available yet in your Region, choose the p3.16xlarge instead. The more GPUs you have on the same system, the faster your training will be. Now choose Next: Configure Instance Details. You have the option of launching multiple identical instances. Choose up to 32 instances to achieve 256 GPU training. For the purposes of this example, however, I’ll use 4 instances, with a total of 32 GPUs.

Next, choose your instance details.  You should choose an instance with at least 200 GB of fast storage. I’m choosing a Provisioned IOPS SSD with 10,000 IOPS to get the best performance.

You can just skip through the tags screen and continue to Configure Security Group.

On the security group settings page, you can create a new group, or use an existing one. Next, review your choices, make corrections as needed, and then choose Review and Launch.

Your screen should look much like the following screenshot. Important things to note are in the storage section: size, volume type, and IOPS.  Choose Launch.

After choosing Launch, you’ll select a key pair. Use existing keys or create new ones. Make note of where you put your keys and what you named them because you will need this information later.

If you get a green box, choose View Instances to review the list of the freshly launched DLAMIs.

It takes a couple of minutes to launch the instances, so now is a good time to name them.

Select one of your DLAMIs. Rename it in the console so you don’t forget which one is which. If you have 8 nodes, you could call them Snow White and the Seven Dwarves (Doc, Dopey, Bashful, Grumpy, Sneezy, Sleepy, and Happy). I only launched four, so I called them John Lennon, Paul McCartney, George Harrison, and Ringo Starr. I chose John Lennon to be the leader. Some people like to call the leader the master, but I prefer “leader.” There’s a leader and members.

Part 2: Prep the dataset

Step 1. Download a copy of ImageNet to each new cluster of DLAMIs. 

For the fastest performance you will want each instance in your cluster to have a local copy of the dataset. The raw dataset needs to be preprocessed by a TensorFlow utility before you train with it. Otherwise, training will take longer, and you won’t see the accuracy levels that are reported in most benchmarks. If you don’t have ImageNet handy, you’ll need to download it. Even if you do have a copy, you will need this data to be inside your cluster’s Region and security zone, and it will need that previously mentioned preprocessing. So, if you have a preprocessed copy ready on Amazon S3 or elsewhere, great: copy it to your leader, and then skip ahead to Part 3.

Download ImageNet to one of your instances now, or if you already have it somewhere, copy it to an instance now, and then read ahead. This way you’ll know what is coming, and in Part 3 you can try out distributed training with a synthetic dataset while you wait.

Step 2. Prep the ImageNet files for training.

You need to run a preparation step prior to training. This preprocesses all of the images, so that they’re consistent and optimized for training speed. Without running this step you can’t hope to achieve comparable speed or accuracy. Your costs will certainly be higher. Note that after you run this step once, you don’t have to do it again for subsequent training runs. You might want to keep this volume around and connect it to future DLAMIs and tests or benchmarking runs.

Follow these detailed instructions in DLAMI’s docs on how to prepare the ImageNet dataset.

I must admit that I already had my preprocessed copy of ImageNet sitting around, so that’s why I said the setup and training was so easy. Now that you’ve done the preprocessing, you’ll want to keep yours too. You can stop any one of your instances without terminating it, then bring the instance back along with the data sometime in the future! You could also save the preprocessed dataset to S3 to archive it for later use.

Part 3: Train with synthetic data

While you wait for ImageNet to download, you can try the setup with synthetic data. This verifies that your members can talk to each other, that TensorFlow with Horovod is working in this multi-node mode, and that you can eventually switch to training with the ImageNet dataset.

Before you move on to the next step, review the overall settings, making sure each node is running, is the same instance type and is in the same Availability Zone.

In the console, choose the leader, choose the Actions button, and then choose Connect. The next page provides instructions for connecting. If you created a new key, you will need to adjust its security settings with chmod 400 key.pem. These instructions are in the Connect prompt. However, one important variation in how you connect with ssh is that you want your leader to be able to access your members. You do this by adding your key and customizing your ssh login to be slightly different than what is suggested by the Connect prompt. Run the following commands from your local terminal and the directory where you downloaded your key. Be sure to swap out “key.pem” with the filename of the key and “PUBLIC_IP_ADDRESS_OF_THE_LEADER” before running it.

ssh-add -K key.pem
ssh -A ubuntu@PUBLIC_IP_ADDRESS_OF_THE_LEADER

Once connected, activate the tensorflow_p36 environment.

In this example, I’m launching John Lennon now. After I have logged in, I’ll start the TensorFlow environment. You will likely see TensorFlow being optimized for the instance type, so this first activation may take a moment.

source activate tensorflow_p36

After activating the environment we must let Horovod know about the rest of the band. This is achieved by adding each member’s info to a hosts file. Change directories to where the training scripts reside.

cd ~/examples/horovod/tensorflow

Use vim to create a file named hosts in this directory on the leader.

vim hosts

Select one of the members in the EC2 console, and the description page opens. Find the Private IPs field and copy the IP address and paste it in a text file. Copy each member’s private IP address on a new line. Then, next to each IP address add a space and then the text slots=8. This represents how many GPUs each instance has. The p3.16xlarge instances have 8 GPUs, so if you chose a different instance type, you would provide the actual number of GPUs for each instance. For the leader you can use localhost. It should look similar to the following:

172.100.1.200 slots=8
172.200.8.99 slots=8
172.48.3.124 slots=8
localhost slots=8

Save the file and exit back to the leader’s terminal.
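If you have many members, typing the hosts file by hand gets error-prone. The following is a minimal Python sketch of the format described above, assuming every instance has the same number of GPUs. The function names `render_hosts` and `total_slots` are hypothetical helpers for illustration, not part of Horovod or the DLAMI.

```python
def render_hosts(member_private_ips, slots_per_node=8):
    """Return hosts-file text: one member per line, then the leader as localhost."""
    lines = [f"{ip} slots={slots_per_node}" for ip in member_private_ips]
    lines.append(f"localhost slots={slots_per_node}")
    return "\n".join(lines) + "\n"


def total_slots(hosts_text):
    """Sum the slots=N values across all lines of a hosts file."""
    return sum(
        int(line.split("slots=")[1])
        for line in hosts_text.splitlines()
        if "slots=" in line
    )


# Example private IPs copied from the listing above
members = ["172.100.1.200", "172.200.8.99", "172.48.3.124"]
hosts_text = render_hosts(members)
print(hosts_text, end="")
print("GPUs available:", total_slots(hosts_text))
```

The slot total is the largest GPU count you can pass to the training scripts later in this post.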

Now your leader knows how to reach each member. This is all going to happen on the private network interfaces. Next, use a short bash function to help send commands to each member. Run this command in your leader’s terminal session:

function runclust(){ while read -u 10 host; do host=${host%% slots*}; ssh -o "StrictHostKeyChecking no" "$host" "$2"; done 10<"$1"; };

First tell the other members not to do “StrictHostKeyChecking”, as this may cause training to hang.

runclust hosts "echo \"StrictHostKeyChecking no\" >> ~/.ssh/config"
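To make the logic of the runclust helper easier to follow, here is a rough Python equivalent. It is a sketch for illustration only: `parse_hosts` and `ssh_commands` are hypothetical names, and the code only builds the ssh argument lists rather than running them.

```python
def parse_hosts(hosts_text):
    """Return host names, dropping the trailing ' slots=N' like ${host%% slots*}."""
    hosts = []
    for line in hosts_text.splitlines():
        line = line.strip()
        if line:
            hosts.append(line.split(" slots")[0])
    return hosts


def ssh_commands(hosts_text, remote_command):
    """Build one ssh argv per host; the remote command stays a single argument."""
    return [
        ["ssh", "-o", "StrictHostKeyChecking no", host, remote_command]
        for host in parse_hosts(hosts_text)
    ]


example = "172.100.1.200 slots=8\nlocalhost slots=8\n"
for argv in ssh_commands(example, 'echo "StrictHostKeyChecking no" >> ~/.ssh/config'):
    print(argv)
```

Passing each list to `subprocess.run(argv, check=True)` would execute the command on every host in turn, which is what the bash function does.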

Now it is time to try out the training with synthetic data. The script deep-learning-models/models/resnet/tensorflow/dlami_scripts/train_synthetic.sh will default to 8 GPUs, but you can provide it the number of GPUs you want to run. Run the script, passing 4 as a parameter for the 4 GPUs we’re using for this run.

./train_synthetic.sh 4

After some warning messages you will see the following output that verifies Horovod is using 4 GPUs.

PY3.6.5 |Anaconda custom (64-bit)| (default, Apr 29 2018, 16:14:56) 
[GCC 7.2.0]
TF1.11.0
Horovod size: 4

Then after some other warnings you see the start of a table and some data points. You can break out of the training if you don’t want to watch for 1,000 batches. Here I stop it at 400 since I can see that the training is averaging about 3,000 images per second.

   Step Epoch  Speed  Loss   FinLoss LR
     0   0.0   105.6  6.794  7.708 6.40000
     1   0.0   311.7  0.000  4.315 6.38721
   100   0.1  3010.2  0.000 34.446 5.18400
   200   0.2  3013.6  0.000 13.077 4.09600
   300   0.2  3012.8  0.000  6.196 3.13600
   400   0.3  3012.5  0.000  3.551 2.30401

Let’s try 8 GPUs.

./train_synthetic.sh 8

I stopped at 200 this time once I saw that the speed was a little less than double: 5,874 vs 3,012.

   Step Epoch  Speed  Loss   FinLoss LR
    0    0.0   200.5  6.804   7.718 6.40000
    1    0.0   564.2  0.000   6.878 6.38721
  100    0.2  5871.7  0.000  60.158 5.18400
  200    0.3  5874.3  0.000  22.838 4.09600
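The "a little less than double" observation can be expressed as a scaling efficiency: the measured speed divided by the speed you would get from perfect linear scaling. The sketch below uses the step-400 and step-200 speeds from the two tables above. The function name is a hypothetical helper.

```python
def scaling_efficiency(base_speed, base_gpus, new_speed, new_gpus):
    """Fraction of ideal linear speedup achieved when going from base_gpus to new_gpus."""
    ideal_speed = base_speed * (new_gpus / base_gpus)
    return new_speed / ideal_speed


# Speeds in images/sec taken from the 4-GPU and 8-GPU runs above
eff = scaling_efficiency(base_speed=3012.5, base_gpus=4, new_speed=5874.3, new_gpus=8)
print(f"speedup: {5874.3 / 3012.5:.2f}x, efficiency: {eff:.1%}")
```

For these two runs the efficiency works out to roughly 97.5%. You can reuse the same calculation when you move to multiple nodes.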

Now you’re ready to test multi-node training. Try out the full 32 GPUs.

./train_synthetic.sh 32

Your output will be similar. You will see the Horovod size at 32, and you will see roughly 4 times the speed. With this experimentation completed, you will have tested your leader and its ability to communicate with the members. If you run into any issues, check the troubleshooting section in the Horovod tutorials docs.

Part 4: Train ResNet-50 on ImageNet

After you’re satisfied watching the synthetic data training step and you’ve prepared the ImageNet dataset, you’re ready to copy the prepared dataset to all of the members.

If you still only have the dataset on your leader, use this copyclust function to copy data over to other members. Run this command in your leader’s terminal session:

function copyclust(){ while read -u 10 host; do host=${host%% slots*}; rsync -azv "$2" $host:"$3"; done 10<$1; };

Now you can use copyclust to copy the dataset folder. The first param is the hosts file, the second is the dataset folder on your leader, and the third is the target directory on each member:

copyclust hosts ~/imagenet_data ~/imagenet_data

Or, if you have the files sitting in an Amazon S3 bucket, use the runclust function to download the files to each member directly.

runclust hosts "tmux new-session -d \"export AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY && export AWS_SECRET_ACCESS_KEY=YOUR_SECRET && aws s3 sync s3://your-imagenet-bucket ~/imagenet_data/ && aws s3 sync s3://your-imagenet-validation-bucket ~/imagenet_data/\""

There’s something to be said here about using tmux or screen or some tools to let you disconnect and resume sessions. Using tools that let you manage multiple nodes at once is a great timesaver. But, I’m going to gloss over this part because it goes beyond the scope of this blog. You have many options: wait around for each step and manage each instance separately or use some power tools.

After the copying is completed, you’re ready to start training. Run the script, passing 32 as a parameter for the 32 GPUs we’re using for this run. Use tmux or a similar tool if you’re concerned about disconnecting and terminating your session, thereby aborting the training run.

./train.sh 32

The following output is what you see when running the training on ImageNet with 32 GPUs. 32 GPUs will take 90-110 minutes.

   Step Epoch  Speed  Loss   FinLoss LR
     0   0.0   440.6  6.935  7.850 0.00100
     1   0.0  2215.4  6.923  7.837 0.00305
    50   0.3 19347.5  6.515  7.425 0.10353
   100   0.6 18631.7  6.275  7.173 0.20606
   150   1.0 19742.0  6.043  6.922 0.30860
   200   1.3 19790.7  5.730  6.586 0.41113
   250   1.6 20309.4  5.631  6.458 0.51366
   300   1.9 19943.9  5.233  6.027 0.61619
   350   2.2 19329.8  5.101  5.864 0.71872
   400   2.6 19605.4  4.787  5.519 0.82126
   450   2.9 20025.5  5.020  5.725 0.92379
   500   3.2 19526.8  4.702  5.383 1.02632
   550   3.5 18102.1  4.632  5.294 1.12885
   600   3.8 19450.3  4.377  5.023 1.23138
   650   4.2 19845.1  3.738  4.372 1.33392
   700   4.5 18838.6  3.862  4.488 1.43645
   750   4.8 19572.7  3.435  4.059 1.53898
   800   5.1 20697.7  3.388  4.015 1.64151
   850   5.4 19651.1  3.141  3.774 1.74405
   900   5.8 20012.3  3.231  3.878 1.84658
   950   6.1 19261.0  3.039  3.699 1.94911
  1000   6.4 18248.2  2.969  3.645 2.05164
  1050   6.7 18730.4  2.731  3.429 2.15417
  ...
 13750  87.9 19398.8  0.676  1.082 0.00217
 13800  88.2 19827.5  0.662  1.067 0.00156
 13850  88.6 19986.7  0.591  0.997 0.00104
 13900  88.9 19595.1  0.598  1.003 0.00064
 13950  89.2 19721.8  0.633  1.039 0.00033
 14000  89.5 19567.8  0.567  0.973 0.00012
 14050  89.8 20902.4  0.803  1.209 0.00002
Finished in 6004.354426383972

This run completed! It is followed by an evaluation run, which runs on the leader alone because it finishes quickly enough without being distributed to the other members. The following is the output of the evaluation run.

Horovod size: 32
Evaluating
Validation dataset size: 50000
 step  epoch  top1    top5     loss   checkpoint_time(UTC)
14075   90.0  75.716   92.91    0.97  2018-11-14 08:38:28
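As a sanity check on the "Finished in" value from the training log, you can estimate the run time from the throughput. The sketch below assumes the standard ImageNet (ILSVRC-2012) training set of 1,281,167 images, which the post does not state, and an eyeballed average of about 19,500 images per second from the Speed column.

```python
IMAGENET_TRAIN_IMAGES = 1_281_167  # assumption: ILSVRC-2012 training set size


def estimated_minutes(epochs, images_per_sec, images_per_epoch=IMAGENET_TRAIN_IMAGES):
    """Estimate wall-clock training time from throughput alone."""
    return epochs * images_per_epoch / images_per_sec / 60


reported_minutes = 6004.354426383972 / 60  # "Finished in" value from the log, in seconds
estimate = estimated_minutes(epochs=90, images_per_sec=19_500)  # 19.5k img/s is an eyeballed average
print(f"reported: {reported_minutes:.1f} min, estimated: {estimate:.1f} min")
```

Both figures land inside the 90-110 minute range quoted above. The small gap between them is plausibly startup and checkpointing overhead, though the log does not break that down.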

If you’re curious what this output looks like with 256 GPUs, you can check it out in the following output block.

  Step Epoch  Speed    Loss   FinLoss LR
  1550  79.3 142660.9  1.002  1.470 0.04059
  1600  81.8 143302.2  0.981  1.439 0.02190
  1650  84.4 144808.2  0.740  1.192 0.00987
  1700  87.0 144790.6  0.909  1.359 0.00313
  1750  89.5 143499.8  0.844  1.293 0.00026
Finished in 860.5105031204224

Finished evaluation
1759   90.0  75.086   92.47    0.99  2018-11-20 07:18:18

You can see that the speed in images/sec is over 140k. The latest benchmarks with CUDA 10 using 256 GPUs reach speeds of 171k images/sec, which improves efficiency to 90%. Look for this to ship on the DLAMI after TensorFlow releases an official binary for CUDA 10. The following chart shows the performance of ResNet-50 training using CUDA 10. Overhead is reduced compared to CUDA 9.

Conclusion

Now that you’ve tried four nodes, do you want to try more? How about 16 or 32 nodes? Or how about 2? You can scale up or down and see how that impacts performance. Compare epoch training times and estimate your overall cost for completion.

Note: if you use the “more like this” feature in the Amazon EC2 console, be prepared to adjust all of the settings, most notably the storage. “More like this” doesn’t include storage, so make sure you update that to have at least 200 GB.

You might want to also try a different dataset and see how fast you can train it using the latest instance types and optimized TensorFlow environments on the DLAMI.

Stay tuned for our next blog where we apply the latest improvements in scalable training on a cluster of DLAMIs.


Appendix

Troubleshooting

The following commands might help you get past errors that come up when you experiment with Horovod.

  • If the training crashes, mpirun may fail to clean up all the Python processes on each machine. In that case, before you start the next job kill the Python processes on all machines as follows:
    • runclust hosts "pkill -9 python"
  • If the process finishes abruptly without error, try deleting your log folder.
  • If other unexplained issues pop up, check your disk space. If you’re out of space, try removing the logs folder since that is full of checkpoints and data. You can also increase the size of the volumes for each member.
  • As a last resort you can also try rebooting.
# kill python
runclust hosts "pkill -9 python"
# delete log folder
runclust hosts "rm -rf ~/imagenet_resnet/"
# check disk space
runclust hosts "df /"
# reboot
runclust hosts "sudo reboot"

About the Author

Aaron Markham is a programmer writer for MXNet and AWS Deep Learning AMI. He has a degree in winemaking and a passion for new technology which he shares by writing and teaching. Aside from talking about deep learning tech, he teaches computer skills to the homeless in Santa Cruz and web programming to prisoners at San Quentin. When not working or teaching, you can find him on the slopes snowboarding or hiking.

Amazon SageMaker Automatic Model Tuning now supports early stopping of training jobs

In June 2018, we launched Amazon SageMaker Automatic Model Tuning, a feature that automatically finds well-performing hyperparameters to train a machine learning model with. Unlike model parameters learned during training, hyperparameters are set before the learning process begins. A typical example of the use of hyperparameters is the learning rate of stochastic gradient procedures. Using default hyperparameters doesn’t always yield the best model performance, and finding well-performing hyperparameters can be a non-trivial and time-consuming task. Using Automatic Model Tuning, Amazon SageMaker will automatically find well-performing hyperparameters and train your model to maximize your objective metric.

The number of possible hyperparameter configurations is exponential in the number of hyperparameters that are being explored. A naive exploration of this search space would require a large number of training jobs and would result in high cost. To overcome this, Amazon SageMaker uses Bayesian optimization, a strategy that efficiently models the performance of different hyperparameters based on a small number of training jobs. This algorithm will, however, at times explore hyperparameter configurations which, by the end of the training, turn out to be significantly worse than previous configurations.

Today, we are adding the early stopping feature to Automatic Model Tuning. By enabling early stopping when you launch a tuning job, Amazon SageMaker tracks objective metrics per training iteration (‘epoch’) for each candidate model. Amazon SageMaker then assesses how likely each candidate is to outperform the previous best model evaluated thus far in your tuning job. With early stopping those models that are unlikely to bring value are terminated before completing all iterations, saving time and reducing cost by up to 28% (depending on your algorithm and dataset). For example, in this blog post we show how to use early stopping with an image classification algorithm using Amazon SageMaker, reducing time and cost by 23%.

You can use early stopping with supported built-in Amazon SageMaker algorithms and with your own algorithms, provided that they emit objective metrics per epoch.
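To give a feel for how per-epoch metrics can drive an early stopping decision, here is a simplified median-style rule in Python. This is an illustration under my own assumptions, not the algorithm Amazon SageMaker runs internally. The post only says that the service assesses how likely a candidate is to outperform the best model so far. The name `should_stop` and the sample curves are hypothetical.

```python
from statistics import median


def should_stop(current_curve, completed_curves, maximize=True):
    """Illustrative rule: stop if the current job's latest metric is worse than
    the median running average of earlier jobs at the same epoch."""
    epoch = len(current_curve)
    if epoch == 0:
        return False  # no metric emitted yet
    running_avgs = [
        sum(curve[:epoch]) / epoch
        for curve in completed_curves
        if len(curve) >= epoch
    ]
    if not running_avgs:
        return False  # nothing to compare against yet
    threshold = median(running_avgs)
    latest = current_curve[-1]
    return latest < threshold if maximize else latest > threshold


# Hypothetical validation-accuracy curves, one value per epoch
completed = [[0.10, 0.20, 0.30], [0.12, 0.25, 0.35], [0.05, 0.08, 0.10]]
print(should_stop([0.02, 0.03], completed))  # weak candidate
print(should_stop([0.15, 0.28], completed))  # promising candidate
```

Whatever the exact rule, it can only work if the training job reports its objective metric after every epoch, which is why that requirement appears above.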

Tuning an image classification model that uses early stopping

To demonstrate how you can leverage early stopping, we’ll build an image classifier using the built-in image classification algorithm and tune the model against the Caltech-256 dataset. We will run two hyperparameter tuning jobs: one without automatic early stopping and one with early stopping enabled, while all the other configurations stay the same. We’ll then compare the results of the two hyperparameter tuning jobs toward the end. You can find the full sample notebook here.

Set up and launch the hyperparameter tuning job without early stopping

We’ll skip the steps for creating a notebook instance, preparing the dataset, and pushing it to Amazon S3. The sample notebook covers these processes, so we won’t go through them here. Instead we’ll start by launching a hyperparameter tuning job.

To create a tuning job, we first need to create a training estimator for the built-in image classification algorithm, and specify a value for every hyperparameter of this algorithm, except for those we plan to tune. To learn more about hyperparameters of the built-in image classification algorithm, you can explore our documentation.

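import sagemaker

# bucket, s3_train_key, s3_validation_key, role, and training_image come from
# the setup steps skipped above (see the sample notebook)
s3_train_data = 's3://{}/{}/'.format(bucket, s3_train_key)
s3_validation_data = 's3://{}/{}/'.format(bucket, s3_validation_key)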

s3_input_train = sagemaker.s3_input(s3_data=s3_train_data, content_type='application/x-recordio')
s3_input_validation = sagemaker.s3_input(s3_data=s3_validation_data, content_type='application/x-recordio')

s3_output_key = "image-classification-full-training/output"
s3_output = 's3://{}/{}/'.format(bucket, s3_output_key)

sess = sagemaker.Session()
imageclassification = sagemaker.estimator.Estimator(training_image, 
                                                    role, 
                                                    train_instance_count=1,
                                                    train_instance_type='ml.p3.2xlarge',
                                                    output_path=s3_output, 
                                                    sagemaker_session=sess)

imageclassification.set_hyperparameters(num_layers=18, 
                                        image_shape='3,224,224',
                                        num_classes=257, 
                                        epochs=10, 
                                        top_k='2',
                                        num_training_samples=15420,  
                                        precision_dtype='float32',
                                        augmentation_type='crop')

Now we can create a hyperparameter tuning job with the estimator. We’ll specify the search ranges for the hyperparameters that we want to tune and the number of total training jobs that we want to run.

We selected three hyperparameters that should have the greatest impact on model quality, and thus our objective metric, according to the image classification algorithm tuning guide. You can find the full list of hyperparameters in our documentation. These are the three hyperparameters:

  • learning_rate: Controls how fast the training algorithm will try to optimize your model.
  • mini_batch_size: Controls how many data points are used for one gradient update.
  • optimizer: A choice among ‘sgd’, ‘adam’, ‘rmsprop’ and ‘nag’.

In this case we don’t need to specify the regular expressions for the objective metric because we are using one of the Amazon SageMaker built-in algorithms.
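For your own algorithms, the objective metric is extracted from the training logs with a regular expression. The sketch below shows the idea with Python's `re` module. The log line format and the regex are hypothetical, so adapt them to whatever your training script actually prints. The dictionary mirrors the name-plus-regex shape that the SageMaker Python SDK accepts in an estimator's `metric_definitions` list.

```python
import re

# Hypothetical metric definition: a metric name plus a regex whose first
# capture group holds the metric value
metric_definition = {
    "Name": "validation:accuracy",
    "Regex": r"validation accuracy=([0-9\.]+)",
}

# Hypothetical log lines, one per epoch
log_lines = [
    "epoch 0: train accuracy=0.112 validation accuracy=0.095",
    "epoch 1: train accuracy=0.241 validation accuracy=0.187",
]

pattern = re.compile(metric_definition["Regex"])
values = []
for line in log_lines:
    match = pattern.search(line)
    if match:
        values.append(float(match.group(1)))
print(values)
```

Emitting one matching line per epoch is what makes a custom algorithm usable with early stopping later in this post.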

We first launch a hyperparameter tuning job without early stopping, which is turned off by default.

from time import gmtime, strftime 
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

tuning_job_name = "imageclassif-job-{}".format(strftime("%d-%H-%M-%S", gmtime()))

hyperparameter_ranges = {'learning_rate': ContinuousParameter(0.00001, 1.0),
                         'mini_batch_size': IntegerParameter(16, 64),
                         'optimizer': CategoricalParameter(['sgd', 'adam', 'rmsprop', 'nag'])}

objective_metric_name = 'validation:accuracy'

tuner = HyperparameterTuner(imageclassification, 
                            objective_metric_name, 
                            hyperparameter_ranges,
                            objective_type='Maximize', 
                            max_jobs=20, 
                            max_parallel_jobs=2)

tuner.fit({'train': s3_input_train, 'validation': s3_input_validation}, 
          job_name=tuning_job_name, include_cls_metadata=False)
tuner.wait()

After the tuning job is finished, we can bring in a table of metrics using HyperparameterTuningJobAnalytics from the Amazon SageMaker Python SDK.

tuner_metrics = sagemaker.HyperparameterTuningJobAnalytics(tuning_job_name)
tuner_metrics.dataframe().sort_values(['FinalObjectiveValue'], ascending=False).head(5)

The following table shows the top 5 performing training jobs that were run. You can look at all of the results by running the notebook. From this table, we can see that the best model from this hyperparameter tuning job has validation accuracy of 0.356. Unlike in the notebook, here we provide a screenshot from the Amazon SageMaker console to check the total training time and the job status. We will then compare them later to the tuning results when training job early stopping is enabled.

In the following screenshots from the Amazon SageMaker console, you can see that the total training duration is 2 hours and 48 minutes. Total training duration is defined as the aggregated duration of all training jobs and thus reflects the total cost of the hyperparameter tuning job. You may also notice that the hyperparameter tuning job takes 1 hour and 53 minutes to complete, thanks to parallelization of the training jobs. Also, all 20 training jobs are completed normally, as you can see in the Training job status counter.

Set up and launch the hyperparameter tuning job with early stopping

Next, we’ll launch another hyperparameter tuning job with the same configuration, but this time we’ll enable early stopping of training jobs. Specifically, in the tuning job configuration, we set one extra field ‘early_stopping_type’ to ‘Auto’. It’s worth noting that training job early stopping requires training jobs to emit epoch-wise objective metrics, preferably validation metrics. In this example, since the built-in image classification algorithm already emits the metric ‘validation:accuracy’ for each epoch, we can directly use ‘validation:accuracy’ as the objective metric without any change.

tuning_job_name_es = "imageclassif-job-{}-es".format(strftime("%d-%H-%M-%S", gmtime()))

tuner_es = HyperparameterTuner(imageclassification, 
                               objective_metric_name, 
                               hyperparameter_ranges,
                               objective_type='Maximize', 
                               max_jobs=20, 
                               max_parallel_jobs=2, 
                               early_stopping_type='Auto')

tuner_es.fit({'train': s3_input_train, 'validation': s3_input_validation}, 
             job_name=tuning_job_name_es, include_cls_metadata=False)
tuner_es.wait()

Alternatively, you can launch a tuning job with early stopping from the console by setting Training job early stopping type to Auto (default is Off).

After the hyperparameter tuning job is finished, we can again check the top 5 performing training jobs.

tuner_metrics_es = sagemaker.HyperparameterTuningJobAnalytics(tuning_job_name_es)
tuner_metrics_es.dataframe().sort_values(['FinalObjectiveValue'], ascending=False).head(5)

This time, with training job early stopping enabled, the best training job has a validation accuracy of 0.353, which is very close to the 0.356 obtained without early stopping. The total training time and training job status can again be checked from the console, as shown in the next screenshot.

 

This time, with training job early stopping, the total training duration is 2 hours and 10 minutes, 38 minutes (23%) shorter than without early stopping and thus 23% cheaper as well. It takes 1 hour and 38 minutes to complete the hyperparameter tuning job, which is 15 minutes faster than the previous hyperparameter tuning job. Meanwhile, 6 training jobs are stopped by early stopping, as you can see in the following list.

df = tuner_metrics_es.dataframe()
df[df.TrainingJobStatus == 'Stopped']

It is clear that all the stopped training jobs have very low validation accuracies and ran for much less time than the jobs that completed normally.
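The savings quoted above can be reproduced from the console durations with a few lines of arithmetic. This sketch treats cost as proportional to total training duration, which is how the post reasons about it. The helper name `minutes` is hypothetical.

```python
def minutes(hours, mins):
    """Convert an hours-and-minutes duration to minutes."""
    return hours * 60 + mins


baseline_total = minutes(2, 48)    # total training duration without early stopping
early_stop_total = minutes(2, 10)  # total training duration with early stopping
saved = baseline_total - early_stop_total
print(f"saved {saved} min ({saved / baseline_total:.0%} of total training time)")

# Wall-clock time of the two tuning jobs
wall_saved = minutes(1, 53) - minutes(1, 38)
print(f"tuning job finished {wall_saved} min sooner")
```

The first figure drives cost, because every training job is billed for its own duration. The second is smaller because the jobs run two at a time.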

Conclusion

To recap, we demonstrated in this blog post how to use training job early stopping to speed up hyperparameter tuning jobs in Amazon SageMaker. Keep in mind that as the training time for each training job gets longer, the benefit of training job early stopping becomes more significant. However, smaller training jobs won’t benefit as much due to infrastructure overhead. For example, our experiments show that the effect of training job early stopping typically becomes noticeable when the training jobs last longer than 4 minutes.

Training job early stopping of Automatic Model Tuning is now available in all the AWS Regions where Amazon SageMaker is available today. For more information on Amazon SageMaker Automatic Model Tuning, see the Amazon SageMaker documentation.


About the Authors

Huibin Shen is an applied scientist in Amazon AI. He works in the part of the team that launched the Automatic Model Tuning feature in Amazon SageMaker.

Fan Li is a Product Manager of Amazon SageMaker. He used to be a big fan of ballroom dance but now loves whatever his 8-year-old son likes.

Miroslav Miladinovic is a Software Development Manager at Amazon SageMaker.

Build a serverless Twitter reader using AWS Fargate

In a previous post, Ben Snively and Viral Desai showed us how to build a social media dashboard using serverless technology. The social media dashboard reads tweets with the #AWS hashtag, then uses machine learning services for translation and natural language processing (NLP) to determine topics, entities, and sentiment. Finally, it aggregates this information using Amazon Athena and builds dashboards to visualize the information captured from the tweets. In this architecture, the only server to manage is the one running the application that reads the Twitter feed. In this blog post we’ll walk you through the steps to move this application to a Docker container and execute it in Amazon ECS with AWS Fargate. This removes the need to manage any Amazon EC2 instances in the architecture.

AWS Fargate is a technology for Amazon Elastic Container Service (ECS) that allows you to run containers without having to manage servers or clusters. With AWS Fargate, you no longer have to provision, configure, and scale clusters of virtual machines to run containers. This removes the need to choose server types, decide when to scale your clusters, or optimize cluster packing. AWS Fargate removes the need for you to interact with or think about servers or clusters. Using AWS Fargate you can focus on designing and building your container applications, instead of managing the infrastructure that runs them.

AWS Fargate is a great approach if you want to eliminate operational responsibilities with Amazon EC2. AWS Fargate is fully integrated with the AWS Code services such as AWS CodeStar, AWS CodeBuild, AWS CodeDeploy, and AWS CodePipeline, making it very simple to configure an end-to-end continuous delivery pipeline to automate deployments to ECS.

Run tweet-reading app on Fargate

As you follow this blog post, you’ll set up an architecture that looks like this:

Our focus for this blog post is to move the Twitter stream producer app from running in an EC2 instance to running in containers managed by Fargate.

We’ll start by creating a Docker image that has our code for the Twitter feed reader application, plus all of its dependencies. After we have the Docker image, we’ll upload and register this image to ECR, which serves as a repository for Docker images. With the image registered in ECR, we are going to create a task definition, which describes the configuration we want to set for running our Docker container in the Fargate service. Finally, we are going to run the task and test our app.

Prerequisites

  1. Go to the previous blog post and follow the instructions found there. The high level steps are:
    1. Launching the AWS CloudFormation template. When you launch the template you need to provide the Twitter API configuration parameters.
    2. After the CloudFormation stack is created, go to the AWS Management Console, search for the stack, and choose the Resources tab. Take note of the Physical ID of the IngestionFirehoseStream resource. It will be something like: SocialMediaAnalyticsBlogPo-IngestionFirehoseStream-<ID>
    3. Setting up S3 Notification – Call Amazon Translate/Comprehend from new Tweets.
    4. Start the Twitter stream producer. This is the application that is running in an EC2 instance.
    5. Create the Athena tables. This is done by running four different SQL statements.
    6. (optional – recommended) Building Amazon QuickSight dashboards.
      After you complete all the steps you should have the following architecture deployed in your environment:
  2. Configure an environment where you can run AWS CLI and Docker commands. You can launch an EC2 instance and install Docker or you can use AWS Cloud9, which comes with Docker. We used an EC2 instance and installed Docker in it. If you decide to go with the EC2 option, you’ll need to create an IAM role with the following policy and attach it to the instance.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "ssm:PutParameter",
                    "firehose:*",
                    "iam:CreateRole",
                    "ecr:*",
                    "iam:AttachRolePolicy",
                    "ssm:GetParameter"
                ],
                "Resource": "*"
            }
        ]
    }
    

    We are going to refer to this environment as the “Dev Environment” in the following steps. We will use the Dev Environment to create our Docker image and register the image with the ECR service. This environment will need permission to use Amazon Kinesis Data Firehose, ECR, and IAM APIs.

    Note: You need to install the AWS CLI in the Docker environment. For AWS CLI installation, refer to this page.

 Step 1: Create the Docker image

To create the Docker image, first we need to create a Dockerfile. A Dockerfile is a manifest that describes the base image to use for your Docker image and what you want installed and running on it. For more information about Dockerfiles, go to the Dockerfile Reference. In our case, we want to create a Docker image that we use to instantiate containers that run Node applications. In particular, we want to run our Twitter stream producer node app.

  1. Choose a directory in the Docker environment and perform the following steps in that directory. I used the /home/ec2-user directory.
  2. Download the application code to our Dev Environment. When you executed the instructions in the prerequisites section 1.a, a CloudFormation template was used to create a stack called SocialMediaAnalyticsBlogPost. This stack configures the EC2 instance with the Twitter stream producer app. If you open the CloudFormation file and go to the EC2 configuration section (line 229 in the template), you will find that the application code was copied from an Amazon S3 bucket to the EC2 instance. We want to copy the same code to our Dev Environment. Use the following command:
    mkdir SocialAnalyticsReader
    
    cd SocialAnalyticsReader
    
    wget https://s3.amazonaws.com/serverless-analytics/SocialMediaAnalytics-blog/SocialAnalyticsReader.tar
    
    tar -xf SocialAnalyticsReader.tar

  3. Now we are going to do a small refactor of the SocialAnalyticsReader app. The Node application is currently designed to read the Twitter API credentials from a configuration file. We want to avoid this approach once the application runs in a container. A better way is to store the configuration settings in a service like AWS Systems Manager Parameter Store. Extracting the configuration settings increases the flexibility and reusability of our container image. For example, it allows us to change the configuration values for the application without rebuilding the Docker image.

    Replace the contents of the following files:

    twitter_stream_producer_app.js

    'use strict';
    
    
    var AWS = require('aws-sdk');
    var config = require('./config');
    var producer = require('./twitter_stream_producer');
    
    // var kinesis = new AWS.Kinesis({region: config.kinesis.region});
    var kinesis_firehose = new AWS.Firehose({apiVersion: '2015-08-04', region: config.region});
    // console.log(kinesis_firehose.listDeliveryStreams());
    
    var params = {
      Name: '/twitter-reader/aws-config', /* required */
      WithDecryption: false
    };
    
    var config_from_parameter_store;
    var ssm = new AWS.SSM({region: config.region});
    var request = ssm.getParameter(params);
    var promise = request.promise();
    
    promise.then(
       function(data){
          console.log('promise then:',data.Parameter.Value);
         // global.twitter_config = data.Parameter.Value;
          producer(kinesis_firehose, data.Parameter.Value).run();
       },
       function(error){
            console.log(error);
       });
    

    twitter_stream_producer.js

    'use strict';
    
    var config = require('./config');
    //var twitter_config = require('./twitter_reader_config.js');
    var Twit = require('twit');
    var util = require('util');
    var logger = require('./util/logger');
    
    function twitterStreamProducer(firehose, twitter_config_str) {
      var twitter_config = JSON.parse(twitter_config_str);
      var log = logger().getLogger('producer');
      var waitBetweenPutRecordsCallsInMilliseconds = config.waitBetweenPutRecordsCallsInMilliseconds;
      var T = new Twit(twitter_config.twitter)
    
      function _sendToFirehose() {
    
        var stream = T.stream('statuses/filter', { track: twitter_config.topics , language: twitter_config.languages });
    
    
        var records = [];
        var record = {};
        var recordParams = {};
        stream.on('tweet', function (tweet) {
                    var tweetString = JSON.stringify(tweet)
                    recordParams = {
                      DeliveryStreamName: twitter_config.kinesis_delivery,
                      Record: {
                        Data: tweetString + '\n'
                      }
                    };
                  firehose.putRecord(recordParams, function(err, data) {
                    if (err) {
                      console.log(err);
                    }
                  });
            }
        );
      }
    
    
      return {
        run: function() {
          log.info(util.format('Configured wait between consecutive PutRecords call in milliseconds: %d',
              waitBetweenPutRecordsCallsInMilliseconds));
            _sendToFirehose();
          }
      }
    }
    
    module.exports = twitterStreamProducer;
    

    If you compare the new files with the original versions, you will notice that only a few lines of code changed. In summary, our application now uses the AWS SDK for Node.js to retrieve configuration settings from AWS Systems Manager Parameter Store instead of reading them from a config file. For additional information on recommended approaches for handling configuration and secrets in containers, see this blog post.

  4. Navigate back to the /home/ec2-user directory and create a file called Dockerfile (case sensitive) with the following content:
    FROM amazonlinux:2017.09
    RUN curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.32.0/install.sh | bash \
            && . ~/.nvm/nvm.sh \
            && nvm install 8.10.0
    ENV PATH /root/.nvm/versions/node/v8.10.0/bin:$PATH
    WORKDIR /home/ec2-user
    RUN mkdir twitterApp
    COPY ./SocialAnalyticsReader/ /home/ec2-user/twitterApp
    RUN chmod ugo+x /home/ec2-user/*
    USER root
    WORKDIR /home/ec2-user/twitterApp
    ENTRYPOINT ["node","twitter_stream_producer_app.js"]

  5. After completing these steps, your directory structure will look like this:

Step 2: Build the Docker image

Build the Docker image by running this command from the directory where you created the Dockerfile in the Docker environment in the previous step (I ran it from /home/ec2-user directory):

docker build -t tweetreader .

Output: The build installs various packages and sets environment variables as described in the Dockerfile. The final steps should produce output similar to the following (step numbers and image IDs will differ depending on your Dockerfile):

Step 5/8 : COPY ./SocialAnalyticsReader/ /home/ec2-user/twitterApp/
 ---> Using cache
 ---> 04d0088db623
Step 6/8 : RUN chmod ugo+x /home/ec2-user/*
 ---> Running in 5e4d9cc10239
Removing intermediate container 5e4d9cc10239
 ---> 5d7d7328cb93
Step 7/8 : USER root
 ---> Running in 893f1653c200
Removing intermediate container 893f1653c200
 ---> dfc016c8e4a7
Step 8/8 : ENTRYPOINT ["node","twitterApp/twitter_stream_producer_app.js"]
 ---> Running in a839a2139689
Removing intermediate container a839a2139689
 ---> ba5ede432da0
Successfully built ba5ede432da0
Successfully tagged tweetreader:latest

Step 3: Push the Docker image to Amazon Elastic Container Registry (ECR)

Now we are going to upload the Docker image we just built to Amazon Elastic Container Registry (ECR), a fully-managed Docker container registry that makes it easy for developers to store, manage, and deploy Docker container images. Amazon ECR is integrated with Amazon Elastic Container Service (ECS), simplifying your development to production workflow.

Perform the following steps in the Dev Environment.

  1. Run the following aws configure command and set the default Region to be us-east-1.
    aws configure set default.region us-east-1

  2. Create an Amazon ECR repository using this command (note the repositoryUri in the output):
    aws ecr create-repository --repository-name tweetreader-repo

Output:

  1. Tag the tweetreader image with the repositoryUri value from the previous step using this command:
    docker tag tweetreader:latest aws_account_id.dkr.ecr.us-east-1.amazonaws.com/tweetreader-repo

  2. Get the Docker login credentials using the following command:
    aws ecr get-login --no-include-email

  3. Run the Docker login command returned from the previous step. If the command is successful, you will get a message “Login Succeeded.”
  4. Push the Docker image to Amazon ECR, using the same repositoryUri that you used to tag the image:
    docker push aws_account_id.dkr.ecr.us-east-1.amazonaws.com/tweetreader-repo
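
The tag and push commands above both depend on the same registry path. As a minimal sketch, assuming the aws_account_id.dkr.ecr.region.amazonaws.com/repository pattern shown in those commands, the path can be assembled once and reused. The helper name ecr_image_uri and the account ID below are hypothetical and only serve as an illustration.

```python
def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Build the registry path that `docker tag` and `docker push` expect."""
    # AWS account IDs are 12-digit numbers; reject obvious typos early.
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("account_id must be a 12-digit number")
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# Hypothetical account ID, for illustration only.
uri = ecr_image_uri("123456789012", "us-east-1", "tweetreader-repo")
print(f"docker tag tweetreader:latest {uri}")
print(f"docker push {uri}")
```

Building the path in one place avoids the kind of stray-space or mistyped-repository errors that make the tag and push steps disagree.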

Step 4: Store configuration information in AWS Systems Manager Parameter Store

Now we are going to store application configuration information in AWS Systems Manager Parameter Store, which provides hierarchical storage for configuration data management and secrets management. You can store data such as passwords, database strings, and license codes as parameter values. By storing our configuration parameters outside the container we improve the security posture by separating this data from the code and enabling us to control and audit access at granular levels. 

  1. Go to the AWS Management Console and navigate to the AWS Systems Manager console.
  2. In the navigation pane at the left, scroll to the bottom and choose Parameter Store and then choose create parameter at the top right.
    • For Name, use: /twitter-reader/aws-config
    • For Type, select String
    • For Value, enter:

    {
      "twitter": {
        "consumer_key": "VAL1",
        "consumer_secret": "VAL2",
        "access_token": "VAL3",
        "access_token_secret": "VAL4"
      },
      "topics": ["AWS", "VPC", "EC2", "RDS", "S3", "ECSSSS"],
      "languages": ["en", "es", "de", "fr", "ar", "pt"],
      "kinesis_delivery": "VAL5"
    }

    • Update the placeholders VAL1 through VAL4 with the values corresponding to your Twitter API credentials.
    • Update the placeholder VAL5 with the IngestionFirehoseStream physical ID that you noted in the Prerequisites section.
    • The value will be something like SocialMediaAnalyticsBlogPo-IngestionFirehoseStream-<value>
    • Choose Create Parameter.
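
Because the Node application parses this value with JSON.parse at startup, a malformed value only shows up as a runtime error in the container. A minimal sketch of a local check that could be run before saving the parameter follows. The function names validate_reader_config and put_reader_config are hypothetical, and the put_parameter call assumes boto3 and AWS credentials are available, so it is defined but not invoked here.

```python
import json

REQUIRED_TWITTER_KEYS = {"consumer_key", "consumer_secret", "access_token", "access_token_secret"}


def validate_reader_config(raw: str) -> dict:
    """Parse the parameter value and check the fields the reader app uses."""
    config = json.loads(raw)  # raises ValueError (JSONDecodeError) on malformed JSON
    missing = REQUIRED_TWITTER_KEYS - set(config.get("twitter", {}))
    if missing:
        raise ValueError(f"missing twitter keys: {sorted(missing)}")
    for key in ("topics", "languages"):
        if not isinstance(config.get(key), list) or not config[key]:
            raise ValueError(f"'{key}' must be a non-empty list")
    if not config.get("kinesis_delivery"):
        raise ValueError("'kinesis_delivery' must name the Firehose delivery stream")
    return config


def put_reader_config(raw: str, region: str = "us-east-1") -> None:
    """Store the validated value (assumes boto3 and credentials; not called below)."""
    import boto3  # imported lazily so the validation part runs without the SDK
    validate_reader_config(raw)
    ssm = boto3.client("ssm", region_name=region)
    ssm.put_parameter(Name="/twitter-reader/aws-config", Value=raw, Type="String", Overwrite=True)


# Placeholder values, mirroring the template above.
sample = json.dumps({
    "twitter": {"consumer_key": "VAL1", "consumer_secret": "VAL2",
                "access_token": "VAL3", "access_token_secret": "VAL4"},
    "topics": ["AWS", "EC2"],
    "languages": ["en", "es"],
    "kinesis_delivery": "VAL5",
})
config = validate_reader_config(sample)
print(config["topics"])
```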

Step 5: Create Fargate task definition and cluster

Now we are going to configure a task for Amazon ECS using AWS Fargate as the launch type. The Fargate launch type allows you to run your containerized applications without the need to provision and manage the backend infrastructure. 

  1. Create a text file called trustpolicyforecs.json with the following content in the Dev Environment:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "ecs-tasks.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }

  2. Create a role called AccessRoleForTweetReaderfromFG using the following command in the Dev Environment:
    aws iam create-role --role-name AccessRoleForTweetReaderfromFG --assume-role-policy-document file://trustpolicyforecs.json

  3. Attach the Kinesis Data Firehose and Systems Manager IAM policies to the role created in step 2 using the following commands in the Dev Environment:
    aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonKinesisFirehoseFullAccess --role-name AccessRoleForTweetReaderfromFG
    
    aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonSSMReadOnlyAccess --role-name AccessRoleForTweetReaderfromFG

    AccessRoleForTweetReaderfromFG is the IAM role that will be assumed by the task running on ECS. Our task is running the Node application and only needs IAM policies to write records to Kinesis Data Firehose and read configuration information from AWS Systems Manager Parameter Store.

  4. In the Amazon ECS console, choose Repositories and select the tweetreader-repo repository that was created in Step 3. Copy the Repository URI.
  5. Choose Task Definitions and then choose Create New Task Definition.
  6. Select launch type compatibility as FARGATE and click Next Step.
  7. In the create task definition screen, do the following:
    • In Task Definition Name, type tweetreader-task
    • In Task Role, choose AccessRoleForTweetReaderfromFG
    • In Task Memory, choose 2GB
    • In Task CPU, choose 1 vCPU
    • Choose Add Container under Container Definitions. On the Add Container page, do the following:
      • Enter Container name as tweetreader-cont
      • Enter the Repository URI copied in step 4 as the Image URL
      • Enter Memory Limits as 128 and choose Add.

    Note: Select TaskExecutionRole as “ecsTaskExecutionRole” if it already exists. If not, select Create new role and it will create “ecsTaskExecutionRole” for you.

  8. Choose the Create button on the task definition screen to create the task. It will successfully create the task, execution role and Amazon CloudWatch Logs groups.
  9. In the Amazon ECS console, choose Clusters and then choose Create Cluster. Select the “Networking only, Powered by AWS Fargate” template and choose Next step.
  10. Enter cluster name as tweetreader-cluster and choose Create.
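
For readers who prefer scripting over the console, the task definition settings above can be collected into the request that the boto3 ECS client's register_task_definition call accepts. This is a minimal sketch, not the procedure used in this post. The ARNs and image URI are hypothetical, and the container memory of 128 is treated as a hard limit (use memoryReservation instead if you chose a soft limit in the console).

```python
def tweetreader_task_definition(image_uri: str, task_role_arn: str, execution_role_arn: str) -> dict:
    """Collect the console settings from the steps above into one request dict."""
    return {
        "family": "tweetreader-task",
        "taskRoleArn": task_role_arn,            # role assumed by the Node application
        "executionRoleArn": execution_role_arn,  # role used to pull the image and write logs
        "networkMode": "awsvpc",                 # Fargate tasks require awsvpc networking
        "requiresCompatibilities": ["FARGATE"],
        "cpu": "1024",                           # 1 vCPU, expressed in CPU units
        "memory": "2048",                        # 2 GB, expressed in MiB
        "containerDefinitions": [
            {"name": "tweetreader-cont", "image": image_uri, "memory": 128, "essential": True}
        ],
    }


def register(params: dict, region: str = "us-east-1") -> dict:
    """Submit the definition (assumes boto3 and credentials; not called below)."""
    import boto3
    return boto3.client("ecs", region_name=region).register_task_definition(**params)


# Hypothetical identifiers, for illustration only.
params = tweetreader_task_definition(
    "123456789012.dkr.ecr.us-east-1.amazonaws.com/tweetreader-repo:latest",
    "arn:aws:iam::123456789012:role/AccessRoleForTweetReaderfromFG",
    "arn:aws:iam::123456789012:role/ecsTaskExecutionRole",
)
print(params["family"], params["cpu"], params["memory"])
```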

Step 6: Start the Fargate task and verify the application       

  1. In the Amazon ECS console, go to Task Definitions, select the tweetreader-task, choose Actions, and then choose Run Task.
  2. On the Run Task page, for Launch Type select Fargate, for Cluster select tweetreader-cluster, select Cluster VPC and Subnets values, and then choose Run Task.
  3. To test the application, choose the running task in the Fargate console. Go to the Logs tab and verify that it is empty, which means the Node application is running and no errors occurred. After you have verified that the Fargate task has no error logs, navigate to the Amazon S3 console and go to the bucket that was created by the CloudFormation template in the original blog post. You will see a folder called raw. Check the contents of this folder. It should contain the data sent from our Twitter feed reader app to the serverless processing flow (Amazon Kinesis Data Firehose, Amazon Comprehend, Amazon Translate, Amazon Athena).
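
The check of the raw folder can also be scripted. The sketch below assumes the objects land under a raw/ prefix, as described above, and uses the S3 list_objects_v2 call. The helper list_raw_keys, the FakeS3 stand-in, the bucket name, and the object keys are all hypothetical, so the example runs without AWS access. With real data you would pass boto3.client('s3') instead.

```python
def list_raw_keys(s3_client, bucket: str, prefix: str = "raw/") -> list:
    """Return the object keys under the prefix (first page only, up to 1,000 keys)."""
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    # 'Contents' is absent from the response when nothing matches the prefix.
    return [obj["Key"] for obj in response.get("Contents", [])]


class FakeS3:
    """Stand-in for boto3.client('s3') so the sketch runs locally."""

    def list_objects_v2(self, Bucket, Prefix):
        keys = ["raw/2019/01/15/tweets-0001", "media/logo.png"]  # made-up keys
        return {"Contents": [{"Key": k} for k in keys if k.startswith(Prefix)]}


keys = list_raw_keys(FakeS3(), "my-analytics-bucket")
print(keys if keys else "No objects under raw/ yet; check the task logs.")
```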

Conclusion

Congratulations! You have successfully ‘containerized’ an application that was previously running on an EC2 instance. Furthermore, you are running the container with Amazon ECS and AWS Fargate so you don’t need to provision or manage any EC2 instances. You can also tweak the task definition configuration in AWS Fargate by tuning the amount of memory, CPU, and concurrent executions your task might need.

For more information about working with ECS and Fargate, see the AWS Fargate documentation.


About the Authors

Raja Mani is a Solutions Architect supporting AWS partners. He is interested in serverless development, DevOps, containers, big data, and machine learning. He helps AWS partners architect enterprise-grade AWS solutions for their customers.

Luis Pineda is a Partner Solutions Architect at Amazon Web Services based in Chicago. He works with our partners and customers to solve business problems using AWS. Outside of work, Luis enjoys being outdoors, running, cycling and soccer.


Anomaly detection on Amazon DynamoDB Streams using the Amazon SageMaker Random Cut Forest algorithm

Have you considered introducing anomaly detection technology to your business? Anomaly detection is a technique used to identify rare items, events, or observations which raise suspicion by differing significantly from the majority of the data you are analyzing.  The applications of anomaly detection are wide-ranging including the detection of abnormal purchases or cyber intrusions in banking, spotting a malignant tumor in an MRI scan, identifying fraudulent insurance claims, finding unusual machine behavior in manufacturing, and even detecting strange patterns in network traffic that could signal an intrusion.

There are many commercial products to do this, but you can easily implement an anomaly detection system by using Amazon SageMaker, AWS Glue, and AWS Lambda. Amazon SageMaker is a fully managed platform to help you quickly build, train, and deploy machine learning models at any scale. AWS Glue is a fully managed ETL service that makes it easy for you to prepare your data for analytics and model training. AWS Lambda is a well-known serverless platform for real-time processing. Using these services, your model can be automatically updated with new data, and the new model can be used to alert for anomalies in real time with better accuracy.

In this blog post I’ll describe how you can use AWS Glue to prepare your data and train an anomaly detection model using Amazon SageMaker. For this exercise, I’ll store a sample of the NAB NYC Taxi data in Amazon DynamoDB to be streamed in real time using an AWS Lambda function.

The solution that I describe provides the following benefits:

  • You can make the best use of existing resources for anomaly detection. For example, if you have been using Amazon DynamoDB Streams for disaster recovery (DR) or other purposes, you can use the data in that stream for anomaly detection. In addition, stand-by storage usually has low utilization, so the data that sits there largely unused can serve as training data.
  • You can automatically retrain the model with new data on a regular basis with no user intervention.
  • You can easily use the built-in Amazon SageMaker Random Cut Forest algorithm. Amazon SageMaker offers flexible distributed training options that adjust to your specific workflows in a secure and scalable environment.

Solution architecture

The following diagram shows the overall architecture of the solution.

The steps that data follows through the architecture are as follows:

  1. The source DynamoDB table captures changes and stores them in a DynamoDB stream.
  2. An AWS Glue job regularly retrieves data from the target DynamoDB table and runs a training job using Amazon SageMaker to create or update model artifacts on Amazon S3.
  3. The same AWS Glue job deploys the updated model on the Amazon SageMaker endpoint for real-time anomaly detection based on Random Cut Forest.
  4. An AWS Lambda function polls data from the DynamoDB stream and invokes the Amazon SageMaker endpoint to get inferences.
  5. The Lambda function alerts user applications after anomalies are detected.

This blog post consists of two sections. The first section, “Building the auto-updating model,” explains how the previous steps 1, 2, and 3 can be automated using AWS Glue. All of the sample scripts in this section run in one AWS Glue job. The second section, “Detecting anomalies in real time,” shows how the AWS Lambda function processes previous steps 4 and 5 for anomaly detection.

Building the auto-updating model

This section explains how AWS Glue reads a DynamoDB table and automatically trains and deploys an Amazon SageMaker model. I assume that the DynamoDB stream is already enabled and DynamoDB items are being written to the stream. If you have not set these up yet, you can reference these documents for more information: Capturing Table Activity with DynamoDB Streams, DynamoDB Streams and AWS Lambda Triggers, and Global Tables.

In this example, a DynamoDB table (“taxi_ridership”) in the us-west-2 Region is replicated to another DynamoDB table with the same name in the us-east-1 Region using DynamoDB global tables.

Create an AWS Glue job and prepare data

To prepare data for model training, we’ll store our data in DynamoDB. The AWS Glue job retrieves data from the target DynamoDB table by using create_dynamic_frame_from_options() with a dynamodb connection_type argument. While you pull data from DynamoDB, we recommend that you choose only the necessary columns for model training and write them into Amazon S3 as CSV files.  You can do this by using the ApplyMapping.apply() function in AWS Glue. In this example, only the transaction_id and ridecount columns are mapped.

In addition, when you run the write_dynamic_frame.from_options function, you need to add the option format_options = {"writeHeader": False , "quoteChar": "-1" }, because the column names and double quotation marks (") are not needed for model training.

Finally, the AWS Glue job should be created in the same Region (for this blog post, us-east-1) where the DynamoDB table resides. For more information on creating an AWS Glue job, see Adding Jobs in AWS Glue.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
 
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

my_region  = '<region name>'
my_bucket = '<bucket name>'
my_project = '<project name>'
my_train_data = "s3://{}/{}/taxi-ridership-rawdata/".format(my_bucket ,  my_project )
my_dynamodb_table = "taxi_ridership"

## Read raw(source) data from target DynamoDB 
raw_data_dyf = glueContext.create_dynamic_frame_from_options("dynamodb", {"dynamodb.input.tableName" : my_dynamodb_table , "dynamodb.throughput.read.percent" : "0.7" } , transformation_ctx="raw_data_dyf" )
 
## Write necessary columns into S3 as CSV format for creating Random Cut Forest(RCF)  model  
selected_data_dyf = ApplyMapping.apply(frame = raw_data_dyf, mappings = [("transaction_id", "string", "transaction_id", "string"), ("ridecount", "string", "ridecount", "string")], transformation_ctx = "selected_data_dyf")
datasink = glueContext.write_dynamic_frame.from_options(frame=selected_data_dyf , connection_type="s3", connection_options={ "path": my_train_data }, format="csv", format_options = {"writeHeader": False , "quoteChar": "-1" }, transformation_ctx="datasink")

This AWS Glue job writes CSV files in the specified path on Amazon S3 ( “s3://<bucket name>/<project name>/taxi-ridership-rawdata/” ).
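
To make the effect of the writeHeader and quoteChar options concrete, the following sketch uses Python's standard csv module (not AWS Glue) to produce rows in the same shape: no header line and no quotation marks around values. The helper to_training_csv and the sample transaction_id and ridecount values are made up for illustration.

```python
import csv
import io

# Made-up (transaction_id, ridecount) pairs standing in for the DynamoDB items.
rows = [("1", "10844"), ("2", "8127"), ("3", "6210")]


def to_training_csv(rows) -> str:
    """Write rows without a header and without quoting any field."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_NONE, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


text = to_training_csv(rows)
print(text)
```

Each output line holds only the raw comma-separated values, which is the layout the text/csv content type used for training in the next section expects.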

Run training job and update model

After the data is prepared, you can run a training job on Amazon SageMaker. To submit the training job to Amazon SageMaker, import the boto3 package, which is automatically bundled with your AWS Glue ETL script. This lets you use the low-level SDK for Python in the AWS Glue ETL script. To learn more about how to create a training job, see Create a Training Job.

The create_training_job function creates model artifacts on the S3 path you specified. Those model artifacts are required for creating the model in the next step.

## Execute training job with CSV data and create model artifacts for RCF
import boto3
from time import gmtime, strftime

sagemaker = boto3.client('sagemaker', region_name= my_region)
job_name = 'randomcutforest-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
sagemaker_role = "arn:aws:iam::<account id>:role/service-role/<AmazonSageMaker-ExecutionRole-Name>"

containers = {
    'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/randomcutforest:latest',
    'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/randomcutforest:latest',
    'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/randomcutforest:latest',
    'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/randomcutforest:latest'}

image = containers[my_region]
artifacts_location = 's3://{}/{}/artifacts'.format(my_bucket , my_project )
print('myINFO : training artifacts will be uploaded to: {}'.format(artifacts_location))

create_training_params = {
    "AlgorithmSpecification": { "TrainingImage": image, "TrainingInputMode": "File" },
    "RoleArn": sagemaker_role, "OutputDataConfig": {"S3OutputPath": artifacts_location },
    "ResourceConfig": { "InstanceCount": 2, "InstanceType": "ml.c4.xlarge", "VolumeSizeInGB": 50 },
    "TrainingJobName": job_name,
    "HyperParameters": { "num_samples_per_tree": "200", "num_trees": "50", "feature_dim": "2" },
    "StoppingCondition": { "MaxRuntimeInSeconds": 60 * 60 },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "ContentType": "text/csv;label_size=0",
            "DataSource": {
                "S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": my_train_data, "S3DataDistributionType": "ShardedByS3Key" } 
            },
            "CompressionType": "None",
            "RecordWrapperType": "None"
        }
    ]
}

sagemaker.create_training_job(**create_training_params)
status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print('myINFO : Status of {} training job ==>  {}'.format(job_name , status ))
 
try:
    sagemaker.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_name)
finally:
    status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
    print("myINFO : Training job ended with status: " + status)
    if status == 'Failed':
        message = sagemaker.describe_training_job(TrainingJobName=job_name)['FailureReason']
        print('myINFO : Training failed with the following error: {}'.format(message))
        raise Exception('Training job failed')

## Create Model from model artifacts 
model_name=job_name
print("myINFO : Model name - {}".format(model_name))

info = sagemaker.describe_training_job(TrainingJobName=job_name)
model_data = info['ModelArtifacts']['S3ModelArtifacts']
primary_container = {'Image': image, 'ModelDataUrl': model_data }

create_model_response = sagemaker.create_model(
    ModelName = model_name,
    ExecutionRoleArn = sagemaker_role,
    PrimaryContainer = primary_container)
print("myINFO : Created Model ARN : {}".format( create_model_response['ModelArn']))

You can see a new model name with date format on the Amazon SageMaker console after model creation is successful.

Execute batch transform and obtain cut-off score

We can now use this trained model to compute anomaly scores for each of the training data points. Because the dataset is large, I decided to use Amazon SageMaker Batch Transform. Batch transform uses a trained model to get inferences for an entire dataset in Amazon S3, and saves the inferences in an S3 bucket that you specify when you create a batch transform job.

After getting the inferences (=anomaly scores) on each data point, we need to obtain a score_cutoff value to be used for real-time anomaly detection. To make it simple, I used a standard technique for classifying anomalies. Anomaly scores outside three standard deviations from the mean score are considered anomalous.

## Execute Batch Transform in order to calculate anomaly scores and the value of score cutoff.
## score cutoff will be used in Lambda function in real time to identify anomalous transaction 
import time
batch_job_name = 'Batch-Transform-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
batch_output = "s3://{}/{}/batch_output/".format(my_bucket , my_project )

request = {
        "TransformJobName": batch_job_name,
        "ModelName": model_name,
        "MaxConcurrentTransforms": 1, 
        "TransformOutput": { "S3OutputPath": batch_output },
        "TransformInput" : { 
            "ContentType": "text/csv;label_size=0",
             "DataSource" : { 
                 "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": my_train_data } 
             }
        }, 
       "TransformResources": { "InstanceType": "ml.m4.xlarge","InstanceCount": 1  } 
}
response = sagemaker.create_transform_job(**request)

batch_status = 'InProgress'
while batch_status == 'InProgress':
    batch_status = sagemaker.describe_transform_job( TransformJobName=batch_job_name)['TransformJobStatus']
    print("myINFO : Batch job {} in Progress ".format( batch_job_name ))
    time.sleep(10)
if batch_status == 'Failed':
    message = sagemaker.describe_transform_job(TransformJobName=batch_job_name)['FailureReason']
    print('myINFO : Transforming job failed with the following error: {}'.format(message))
    raise Exception('Transforming job failed')

## Calculate score_cutoff from the result of Batch-Transform 
from pyspark.sql.functions import mean, stddev
from decimal import Decimal
all_scores_dfy = glueContext.create_dynamic_frame_from_options("s3", {'paths': [ batch_output ]}, format="json", transformation_ctx = "all_scores_dfy" ).toDF()
score_mean = all_scores_dfy.agg(mean(all_scores_dfy["score"]).alias("mean")).collect()[0]["mean"]
score_stddev = all_scores_dfy.agg(stddev(all_scores_dfy["score"]).alias("stddev")).collect()[0]["stddev"]
score_cutoff = Decimal( str( score_mean + 3*score_stddev ) ) 
print("myINFO : RCF score cutoff : {}".format( score_cutoff))

The history of the batch transform job can be found in the Batch transform jobs menu on the Amazon SageMaker console.
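
The three-standard-deviation rule used for score_cutoff can be tried locally on a handful of numbers. This is a minimal sketch with made-up scores, using Python's statistics module in place of Spark. Like the Spark stddev function, statistics.stdev computes the sample standard deviation. The function name score_cutoff_for is hypothetical.

```python
from statistics import mean, stdev


def score_cutoff_for(scores, n_sigma: float = 3.0) -> float:
    """Return mean + n_sigma * sample standard deviation of the scores."""
    return mean(scores) + n_sigma * stdev(scores)


# Made-up anomaly scores: twelve ordinary points and one clear outlier.
scores = [0.9, 1.0, 1.1, 1.0, 0.95, 1.05, 1.0, 0.98, 1.02, 1.0, 1.01, 0.99, 4.0]

cutoff = score_cutoff_for(scores)
anomalies = [s for s in scores if s > cutoff]
print("cutoff:", round(cutoff, 3))
print("anomalies:", anomalies)
```

Note that the outlier itself inflates both the mean and the standard deviation, so with very small samples a single extreme point may not exceed its own cutoff. This is less of a concern with a large training set.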

Deploy model and update cut-off score

The final step in the AWS Glue ETL script is to deploy the updated model on the Amazon SageMaker endpoint and write the obtained score_cutoff value to a DynamoDB table for real-time anomaly detection. The Lambda function queries this score_cutoff value in DynamoDB to compare it with the anomaly scores of new transactions.

## Create Endpoint Configuration for realtime service 
endpoint_config_name = 'randomcutforest-endpointconfig-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime()) 
create_endpoint_config_response = sagemaker.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{ 'InstanceType':'ml.m4.xlarge', 'InitialInstanceCount':1, 'ModelName':model_name, 'VariantName':'AllTraffic'}]
)
print("myINFO : Endpoint Config Arn:  " + create_endpoint_config_response['EndpointConfigArn'] )


##  Create/Update Endpoint with new configuration that has updated model. 
endpoint_name = 'randomcutforest-endpoint'
endpoint_status = ""
try:
    endpoint_status = sagemaker.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
except Exception as e : 
    endpoint_status = "NotInService"
print("myINFO : randomcutforest-endpoint Status: " + endpoint_status)

if endpoint_status == 'InService':
    update_endpoint_response = sagemaker.update_endpoint( EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)
    try:
        sagemaker.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)
    finally:
        resp = sagemaker.describe_endpoint(EndpointName=endpoint_name) 
        status = resp['EndpointStatus']
        print("myINFO : Update endpoint {} ended with {} status: ".format( resp['EndpointArn'] , status ) )         
        if status != 'InService':
            message = sagemaker.describe_endpoint(EndpointName=endpoint_name)['FailureReason']
            print('myINFO : Endpoint update failed with the following error: {}'.format(message))
            raise Exception('Endpoint update did not succeed')
else:
    create_endpoint_response = sagemaker.create_endpoint( EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)
    try:
        sagemaker.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)
    finally:
        resp = sagemaker.describe_endpoint(EndpointName=endpoint_name) 
        status = resp['EndpointStatus']
        print("myINFO : Create endpoint {} ended with {} status: ".format( resp['EndpointArn'] , status ) )         
        if status != 'InService':
            message = sagemaker.describe_endpoint(EndpointName=endpoint_name)['FailureReason']
            print('myINFO : Endpoint creation failed with the following error: {}'.format(message))
            raise Exception('Endpoint creation did not succeed')

            
## Add the score_cutoff value into DynamoDB 
## score_cutoff will be queried by Lambda function for real time abnormal detection
dynamodb_table = boto3.resource('dynamodb', region_name= my_region).Table('anomaly_cut_off')
dynamodb_table.put_item(Item= {'data_kind': my_dynamodb_table ,'update_time':  strftime("%Y%m%d%H%M%S", gmtime()), 'score_cutoff': score_cutoff })
print('myINFO : New score_cutoff value has been updated in DynamoDB table.')
    
## Delete Temporary data to save cost. 
s3 = boto3.resource('s3').Bucket(my_bucket ) 
s3.objects.filter(Prefix="{}/taxi-ridership-rawdata".format(my_project)).delete()
s3.objects.filter(Prefix="{}/batch_output".format(my_project)).delete()
print('myINFO : Temporary S3 objects have been deleted.')

## End job 
job.commit()

Now the Amazon SageMaker endpoint is created, and the AWS Glue job has been completed. You can check the detailed log of the AWS Glue job by choosing the Logs link in the AWS Glue console.

The score_cutoff value is stored in the anomaly_cut_off DynamoDB table. The partition key (data_kind) holds the kind of data, taxi-ridership in this example, and the range key (update_time) holds the time of the latest update.
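
As a rough sketch of what one stored item looks like, the helper below builds the same three attributes that the job writes. The function name build_cutoff_item is hypothetical. The Decimal conversion is an assumption about the type of score_cutoff: the boto3 DynamoDB resource rejects Python float values, so a float would need to be converted before calling put_item.

```python
from decimal import Decimal
from time import gmtime, strftime


def build_cutoff_item(data_kind, score_cutoff, now=None):
    """Build the item written to the anomaly_cut_off table (hypothetical helper)."""
    # update_time is the range key; this fixed-width format sorts chronologically.
    update_time = strftime("%Y%m%d%H%M%S", now if now is not None else gmtime())
    return {
        "data_kind": data_kind,
        "update_time": update_time,
        # Convert through str() so the Decimal keeps the float's printed digits.
        "score_cutoff": Decimal(str(score_cutoff)),
    }


# A fixed timestamp (the Unix epoch) keeps this example reproducible.
item = build_cutoff_item("taxi-ridership", 1.31462299965, gmtime(0))
print(item)
```

Because update_time is a fixed-width string, a descending query on the range key returns the most recent cut-off first, which is what the Lambda function relies on later.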

Schedule the AWS Glue job

The previous scripts all run in the same AWS Glue job, and AWS Glue supports time-based schedules through triggers. If you define a time-based schedule and associate it with your job, you can retrain the model with new data on a regular basis. I do not think the model should be updated too frequently. Weekly or bi-weekly renewal should be enough.
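
If you prefer to create the schedule from code rather than from the console, the following is a minimal sketch that uses the boto3 Glue client's create_trigger call. The trigger name, the job name, and the Monday 03:00 UTC schedule are all assumptions for illustration, so replace them with your own values.

```python
def weekly_trigger_params(job_name, trigger_name="weekly-retrain-trigger"):
    """Return keyword arguments for glue.create_trigger (names are hypothetical)."""
    return {
        "Name": trigger_name,
        "Type": "SCHEDULED",
        # AWS cron format, evaluated in UTC: 03:00 every Monday.
        "Schedule": "cron(0 3 ? * MON *)",
        "Actions": [{"JobName": job_name}],
        "StartOnCreation": True,
    }


def create_weekly_trigger(job_name, region_name):
    """Create the trigger in AWS Glue. Requires AWS credentials."""
    # boto3 is imported here so the parameter helper above runs without AWS access.
    import boto3

    glue = boto3.client("glue", region_name=region_name)
    return glue.create_trigger(**weekly_trigger_params(job_name))


params = weekly_trigger_params("my-anomaly-detection-job")
print(params["Schedule"])
```

Only create_weekly_trigger talks to AWS. The parameter helper is kept separate so that the schedule expression can be reviewed or changed without making a call.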

Detecting anomalies in real time

This section discusses how to detect anomalous transactions in real time from an AWS Lambda function. You need to create an AWS Lambda function that polls the DynamoDB stream. When you create the function, you can use the “dynamodb-process-stream-python3” blueprint for a quick implementation. A function created from the blueprint can be integrated with the DynamoDB table that you specify, and the blueprint provides the basic Lambda code.

Get an anomaly score on each data point

I’ll briefly explain the code in the Lambda function. It keeps only INSERT and MODIFY events because they carry new data. The Lambda function adds them to the instances array so that it can get inferences for all of the events in a single request. The Amazon SageMaker Random Cut Forest algorithm accepts multiple records in an input request and returns multi-record inferences to support mini-batch predictions. To learn more, see Common Data Formats - Inference.

import json
import boto3
from boto3.dynamodb.conditions import Key, Attr

print("Starting Lambda Function.... ")
sagemaker = boto3.client('sagemaker-runtime', region_name ='<region name>' )
dynamodb_table = boto3.resource('dynamodb', region_name='us-east-1').Table('anomaly_cut_off')

def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))
    transaction_data = {} # key : transaction_id / value : ridecount
    
    for record in event['Records']:
        ## filter only INSERT or MODIFY event and add to "transaction_data" dictionary 
        if record['eventName'] == "INSERT" or record['eventName'] == "MODIFY":
            transaction_id = record['dynamodb']['NewImage']['transaction_id']['S']
            ridecount = record['dynamodb']['NewImage']['ridecount']['S']
            transaction_data[transaction_id] = ridecount
    print( "transaction_data: " + str(transaction_data )) 
    
    # Build the request body, for example:
    # {'instances': [{'features': ['10231', '3837']}, {'features': ['10232', '10844']}]}
    instances = [{"features": [key, value]} for key, value in transaction_data.items()]
    instances_dic = {"instances": instances}
    transaction_json = json.dumps(instances_dic)  # To make argument format for invoke_endpoint method.
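
To try the payload logic without deploying anything, the filtering and formatting steps can be pulled into a small standalone function. This is only a sketch: build_payload and sample_event are hypothetical names, and the sample event contains just the fields that the handler reads rather than a complete DynamoDB Streams record.

```python
import json


def build_payload(event):
    """Turn a DynamoDB Streams event into the JSON body for invoke_endpoint."""
    transaction_data = {}  # key: transaction_id / value: ridecount
    for record in event["Records"]:
        # REMOVE events carry no NewImage, so only new or changed items are kept.
        if record["eventName"] in ("INSERT", "MODIFY"):
            image = record["dynamodb"]["NewImage"]
            transaction_data[image["transaction_id"]["S"]] = image["ridecount"]["S"]
    instances = [{"features": [tid, count]} for tid, count in transaction_data.items()]
    return json.dumps({"instances": instances})


# Hypothetical test event with only the fields the handler reads.
sample_event = {
    "Records": [
        {
            "eventName": "INSERT",
            "dynamodb": {
                "NewImage": {
                    "transaction_id": {"S": "10231"},
                    "ridecount": {"S": "3837"},
                }
            },
        },
        {
            "eventName": "REMOVE",
            "dynamodb": {"Keys": {"transaction_id": {"S": "10232"}}},
        },
    ]
}
payload = build_payload(sample_event)
print(payload)
```

Keying the intermediate dictionary by transaction_id mirrors the handler above: if the same transaction appears twice in one batch, only its latest ridecount is sent for scoring.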

Alert anomalous transaction

The instances array can be submitted to the sagemaker.invoke_endpoint function. It returns an array of scores, one for each entry in the instances array. We can compare each score in the response with the latest value of score_cutoff retrieved from the DynamoDB table. If the anomaly score of a new transaction is larger than the value of score_cutoff, that transaction is considered to be anomalous. Then the Lambda function will alert the user application.

    response = sagemaker.invoke_endpoint( EndpointName='randomcutforest-endpoint', Body=transaction_json ,  ContentType='application/json' )
    scores_result = json.loads(response['Body'].read().decode())
    print("Result score : "+ str(scores_result))  # return an array of score 
    
    response = dynamodb_table.query(
              Limit = 1,
              ScanIndexForward = False,
              KeyConditionExpression=Key('data_kind').eq('taxi_ridership') & Key('update_time').lte('99990000000000')
           )
    score_cutoff = response['Items'][0]['score_cutoff']
    print("score cutoff : " + str(score_cutoff))

    for index in range(len(scores_result['scores'])):
        if scores_result['scores'][index]['score'] > score_cutoff:
            print("Detected abnormal transaction ID : {} , Ridecount : {}".format(instances[index]['features'][0], instances[index]['features'][1]   ))
            ## Add your codes to send a notification
            
    return 'Successfully processed {} records.'.format(len(event['Records']))
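
The comparison step can also be checked on its own. The sketch below uses a hypothetical helper, find_anomalies, that pairs each score with its instance and keeps the ones above the cut-off. The cut-off and the score for transaction 21101 are the values from the log described in this post, while the ridecounts and the score for transaction 10231 are made-up values for illustration.

```python
from decimal import Decimal


def find_anomalies(scores_result, instances, score_cutoff):
    """Return (transaction_id, ridecount, score) for scores above the cut-off."""
    # The boto3 DynamoDB resource returns numbers as Decimal, so normalize to float.
    cutoff = float(score_cutoff)
    anomalies = []
    for instance, entry in zip(instances, scores_result["scores"]):
        if entry["score"] > cutoff:
            transaction_id, ridecount = instance["features"]
            anomalies.append((transaction_id, ridecount, entry["score"]))
    return anomalies


# Ridecounts and the first score are hypothetical.
instances = [{"features": ["10231", "3837"]}, {"features": ["21101", "35000"]}]
scores_result = {"scores": [{"score": 0.9}, {"score": 3.6189932108}]}
anomalies = find_anomalies(scores_result, instances, Decimal("1.31462299965"))
print(anomalies)
```

Returning the anomalous transactions as a list, instead of printing inside the loop, makes it easier to pass them to whatever notification code you add to the handler.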

The following is an example of an output log in Amazon CloudWatch. Two transactions (10231 and 21101) were created in DynamoDB, and those transactions triggered the Lambda function as new events. The anomaly score of transaction 21101 is 3.6189932108. That is larger than the cut-off value (1.31462299965) in the DynamoDB table, so the transaction is detected as anomalous.

Conclusion

In this blog post, I introduced an example of how to build an anomaly detection system on Amazon DynamoDB Streams by using Amazon SageMaker, AWS Glue, and AWS Lambda.

In addition, you can adapt this example to your specific use case because AWS Glue runs whatever script you provide and continues to add new data sources. Other kinds of data sources and streams can be applied to this architecture because AWS Lambda functions also work with many other AWS streaming services.

Finally, I hope this post helps you reduce business risks and save costs while adopting an anomaly detection system.


About the Author

Yong Seong Lee is a Cloud Support Engineer for AWS Big Data Services. He is interested in every technology related to Big Data/Data Analysis/Machine Learning and helping customers who have difficulties in using AWS services. His motto is “Enjoy life, be curious and have maximum experience.”