Creating a ML Pipeline on AWS Sagemaker Part Two: Data Steps

Pierce Lamb
12 min readApr 19, 2023

--

This is the second post in a three part series on creating a reusable ML pipeline that is initiated with a single config file and five user-defined functions. The pipeline is finetuning-based for the purposes of classification, runs on distributed GPUs on AWS Sagemaker and uses Huggingface Transformers, Accelerate, Datasets & Evaluate, PyTorch, wandb and more.

This post originally appeared on VISO Trust’s Blog

This post will cover the two data steps, data reconciliation and data preparation. These are common steps in a ML process where data is collected, cleaned and encoded the way a model will expect. If you have landed on this post first, check out the first post in the series detailing the pipeline setup. You can also jump to the third post in the series detailing training and testing.

Data Reconciliation

Of all the pipeline steps, the Data Reconciliation step is the step most likely to be customized to your specific use case. It represents the taking off point for collecting, cleaning, filtering etc the training data that will compose your experiment and getting it on S3. In our case, the raw training data exists in flat files already on S3 while the labels required for supervised training exist in a production database. This is, in fact, why I called it ‘Data Reconciliation.’ In our case, the production database labels are being reconciled with the flat files on s3.

As it is unlikely the reader has the exact same setup, I will try and highlight some of the re-usable parts of Data Reconciliation without getting too into our specific flavor of Data Reconciliation. Recall that a major architecture decision in the pipeline is a separate set of training data for every experiment; the goal of this step, then, is to collect the raw data, clean it and copy it to the bucket and folder on S3 where this experiment’s storage will reside (for e.g. EXP-3333-longformer/data/reconciled_artifacts).

I’ll create a distinction here between ‘artifacts’ and ‘files’ to better understand what follows. For every ‘artifact’ uploaded into our system, tens of ‘files’ are created that represent data and analysis about the given ‘artifact.’ As such, our raw data is composed of these sets of files per uniquely identified artifact.

The first step in Data Reconciliation is to collect all of the raw data. In our case, this means authenticating to a read replica of the production database, and running a query that contains artifact identifiers related to their ground truth classification labels. We then collect all of the S3 file paths on the production instance of S3 keyed by the same artifact GUID identifier.

Data Reconciliation knows which S3 file paths to collect via a settings.ini value passed by the experimenter call FILES_FROM_PROD. For e.g. imagine each artifact has a file called raw_text.json, the experimenter would pass FILES_FROM_PROD=raw_text.json and Data Reconciliation would find the S3 path to every raw_text.json file on the production S3 bucket.

Using the artifact identifiers (GUIDs), we then filter the production database results such that both datasets contain the exact same artifact identifiers and drop duplicates using the file hash. At this point the labels and S3 paths to the flat files are now reconciled; the actual files and the label just need to be copied to the correct experiment directory.

Before that copying begins, note that we now have unique insight into the training data for this experiment. Using the filtered database results, we can discover exactly the labels that will be trained on, and the instance count per label:

Where df is a pandas dataframe of the filtered database results. Now every experiment has a unique_labels_and_counts.json in its /data folder the experimenter can interrogate to see which labels and their counts are associated with this training data set.

At this point, we encounter our first user-defined function. process_func is an optional function that will run after Data Reconciliation has copied files for every artifact identifier; it gives the experimenter the opportunity to execute some arbitrary code for each artifact identifier. As an example, when we go to train we need access to the ground truth labels extracted from the production database. process_func gives us the ability to create an additional file per artifact, say, ground_truth_label.json that contains this label. Furthermore, if one’s model requires additional files to train on, for e.g. an image of a given page, that additional file can be created here, per artifact. Because it’s optional, the user could not define it; thus:

Now that we have our reconciled data and our process_func, we have to copy data from the production S3 bucket into our experiment S3 directory. This can easily occur in parallel, so we utilize multiprocessing to kick it off as a parallel process:

This function gets the df we discussed earlier, the experiment bucket, the dict of artifact identifier (GUID) to list of desired file paths (raw_training_data_paths), the parent experiment dir (s3_artifact_path), the number of parallel processes (either a config value or multiprocessing.cpu_count()) the process_func and a boolean that determines whether or not to overwrite.

First, it uses the same function that created raw_training_data_paths except pointed at the experiment bucket and with EXP-3333-longformer/data/reconciled_artifacts/ as a filter. This gives us a dict of what training data already exists for the experiment in case Data Reconciliation failed and had been restarted; we don’t copy the same data again. Next, it splits the reconciled data per process and for each split, creates a process and calls the add_to_research_experiment function. Let’s take a look at that function:

The parameters to this function should be fairly straightforward given our discussion of copy_s3_data_in_parallel. The function iterates the data frame chunk directly checking for three different copying scenarios. I am aware that iterating a data frame directly is generally frowned upon in favor of a vectorized approach. In our case, these chunks are fairly small so it is not something we worry about. For each artifact, this function checks to see if, first, overwriting (reload) was set to true, if the current artifact already exists in the experiment and whether or not the proposed artifact has additional files to add to it and finally if it does not exist. In each case it calls an additional function that will copy the correct set of files. Next, let’s take a look at copy_to_s3:

This function is straight forward, and nicely shows what gets passed to process_func if the user has defined it. It gets the row from the df representing the current artifact, the existing files for the artifact _after_ copying, the experiment path and the overwriting boolean. This gives the experimenter a lot of flexibility on what he/she can do per artifact.

The final step of Data Reconciliation is a validation step where we use the config value FILES_ON_RESEARCH to validate that each artifact has the files it needs for training. The reason we can’t just use the earlier FILES_FROM_PROD value is because new files may have been created in process_func. So FILE_ON_RESEARCH may look like raw_text.json, page_01.png for example. This validation step is meant to provide some assurance that when we move onto Data Preparation, each artifact will have every file it needs and we don’t need to write code to handle missing files. So after all of our parallel processing completes, validate_data_was_created runs which we will view in partial stub form:

This function takes the full df, the list of desired files defined by FILES_FROM_PROD, the list of desired files that should be in the experiment FILES_ON_RESEARCH, the experiment directory (EXP-3333-longformer/data/reconciled_artifacts/) and the user defined process_func. It collects all the existing file paths for the given experiment and iterates them, popping file names off FILES_ON_RESEARCH to check if they exist for each artifact. If files are missing, it then discovers if they are FILES_FROM_PROD files and retrieves them from the prod S3 bucket or if they are process_func files which it re-runs to generate them. Once this step is complete, we can have high confidence that all of our raw training data files exist for each artifact. As such, we can move on to Data Preparation.

Data Preparation

The data preparation step is meant to take the raw training files for the experiment and encode them so they are prepared to be input into a model’s forward() function. For this task, we will utilize the HuggingFace Datasets library and specifically its powerful map() function. This is also the first task that will utilize Sagemaker, specifically Sagemaker Processor jobs.

Let’s start by taking a look at how the Processor job is constructed and called. First, we utilize the Sagemaker Python SDK’s ScriptProcessor class. This allows us to run an arbitrary script on a Processor instance. Creating the ScriptProcessor object will look like:

As you can see, this construction is basically defined by config values. Arguably the most important is config.docker_image_path. This carefully constructed docker image which we spoke about in the first post in this series is re-used among all Sagemaker jobs (Processor/Training/Tuning). We spoke in the first post about how an experimenter extends a base image that contains all common dependencies like cuda enabled pytorch, transformers, datasets, accelerate, numpy, etc and adds any of their model-specific dependencies. That base image also contains lines that allow it to run on these different Sagemaker instances, we’ll discuss one now and more during our discussion of training:

Sagemaker Training/Tuning jobs always look in the /opt/ml/code directory for custom dependencies while Processor jobs look in /opt/ml/processing. These lines copy all of our ML pipeline code into these directories to ensure that all custom dependencies are available in either type of job. Now if we jump back over to where we constructed the ScriptProcessor object, this is how we kick off the job:

One feature of Processor jobs that is easy to miss is that before the script is executed, Sagemaker copies everything from the S3 URI provided in the source param onto local disk in the destination path. Building your script around this fact will give you huge performance benefits which we will discuss more later on. Another important point that may not be immediately obvious is that the command param combined with the code param is basically like defining an ENTRYPOINT for the Processor job. While its not exactly accurate, you can imagine these params creating this command in the container:

ENTRYPOINT [‘python3’, ‘/opt/ml/code/src/preprocessing/data_preparation.py’]

So the code above is constructing the S3 URI to the reconciled artifacts we created in the Data Reconciliation step and passing it in the source` param and the Processor job copies all of this data to local disk before it kicks off. SAGEMAKER_LOCAL_DATA_DIR defines where that data will be copied and is specified in data_preparation.py` so the path can be used there as well. Processor jobs can output data which is why I’ve defined outputs, but for now the data_preparation.py script is not utilizing this feature. Now that we’ve discussed how it is kicked off, we can take a look at encoding data in data_preparation.py.

The first step at the beginning of encoding is to define the S3 directory where data will be saved and get the label file we produced during Data Reconciliation. We read a config value to get the encoded data dir, namely, ENCODED_DATA_DIR. The value will typically be full_dataset, but it gives the experimenter the ability to produce smaller test datasets if desired (e.g. partial_dataset). So the full path will look like:

encoded_data_dir = f"{config.s3_parent_dir}/data/prepared_data/{config.encoded_data_dir}"

Or EXP-3333-longformer/data/prepared_data/full_dataset

Next, we get the unique_labels_and_counts.json file we uploaded during Data Reconciliation as our ground truth for supervised learning. We give the experimenter the ability to modify the ground truth here through some basic knobs: IGNORED_LABELS and NUM_LABELS_THRESHOLD; I could imagine a number of other options here. These knobs are self explanatory:

After modifying the labels the way the experimenter wants, execution moves onto the get_artifact_paths function. This function gets the paths on local disk that raw training data was copied to and returns them in a format that the Huggingface Datasets library will expect:

get_artifact_paths is called using the same path we passed to Processor.run() to define where data should be copied along with the results of the MODEL_INPUT_FILES config param. Following our example, this value would simply be [raw_text.json]. A Huggingface.arrow_dataset.datatsets.Dataset is eventually going to expect data formatted where each row constitutes an instance of training data, and each column represents the path to the needed input file. In our case it would look like:

This would be easy to represent in pandas, but since we’d prefer to not depend on pandas and will utilize Dataset.from_dict(), get_artifact_paths represents this structure using the file names as keys and lists to contain the paths.

Execution then enters the directory defined in SAGEMAKER_LOCAL_DATA_DIR and extracts the list of subdirs which, in our case, are guids for each artifact. It iterates these subdirs collecting the filenames for all files that are children of each subdir. It then uses the passed MODEL_INPUT_FILES to validate that each needed file is there and adds it to the artifact_paths dict. We now have a dict that is ready for Datasets processing.

Control now moves to a get_encoded_data() function that will kick off Huggingface.arrow_dataset.datasets.Dataset.map() which is a very powerful abstraction for encoding datasets. get_encoded_data is intended to setup the map() function for parallel processing of raw training data encoding and is the main part of the Data Preparation step:

This function sets up the mapper, executes it, splits the returned encoded data and saves the split, encoded data to S3. The function takes the get_artifact_paths data we just generated (as data), a list of the labels only from unique_labels_and_counts.json, a few directory paths and the number of parallel processes to spin up. It starts by generating two label dicts in handle_labels, label2id.json and id2label.json which will be used downstream to convert between the integer values predicted by the model and actual string labels.

Next, one of our user defined functions get_dataset_features is called. As you may have noticed from the hints in Datasets classpaths, Datasets uses PyArrow as the backend for writing and reading data. PyArrow needs to enforce a schema it writes to and reads from; get_dataset_features` allows the experimenter to write that schema. This function returns a Datasets Features object which packages up this schema for the backend. Following our Longformer example, this function might look like:

The keys here represent the parameters the Longformer forward() function will expect when performing the forward pass. Now that we have these features, we can call Dataset.from_dict() on our get_artifact_paths data and we are fully ready for the mapper. The mapper has a variety of options, but the core concept is applying a function to every instance of training data that encodes and returns it. Let’s take a closer look at the call in Data Preparation:

Here we pass the function we want to execute per instance, preprocess_data; fn_kwargs allows us to specify additional parameters we want to pass to that function; batched means that preprocess_data will receive batches of data instead of single instances; this allows us to perform additional filtering. features are the features we retrieved from get_dataset_features, we remove the column names so they aren’t encoded and finally the number of processes to process in parallel between.

With this in place, we can take a look at def preprocess_data which is executed by each process in parallel:

The function first validates that each column of data has the exact same length and returns that length so it can be iterated over. It then iterates the batch, constructing a single instance and passing it to another user-defined function, encode_data. encode_data gives the experimenter the ability to define exactly how a single training instance is encoded with the option of returning None if additional filtering is desired. For instance, say we were using a Huggingface Transformers Tokenizer to encode; a single_instance here represents the file paths to the data we need, so we would get that data, say, in a variable called text_content and call something like this:

Where TOKENIZER is defined as a constant outside the function so it’s not re-constructed each time this function is called. If we continue following preprocess_data we can see that it simply skips single_instance’s where encode_data returns None. Finally, the encoded input is returned to the mapper in the correct Features format.

I’m going to skip looking at get_train_valid_test_split(), but suffice it to say that it uses Datasets internal function dataset.train_test_split() to split data using percentages and writes a metadata file that shows the counts of the split and associated labels to the experimenter.

And with that, Data Preparation is complete. Recall from the beginning that this will run as a ScriptProcessor job on a Sagemaker Processor instance. These instances tend to have lots of vCPU’s and can really take advantage of the parallel processing we’re doing in the mapper. The encoded data will end up on S3 ready to be downloaded by a Training or Tuning job which is discussed in the third post in this series. You can jump to the first and third post via these links: Part One: Setup, Part Three: Training and Inference.

--

--

Pierce Lamb

Data & Machine Learning Engineer at a Security startup