Active Learning Pipeline

Data Engine enables companies of any size, or even individual developers, to implement an active learning pipeline without the need for large amounts of resources.

This documentation is based on the active learning pipeline created in the Tooth Fairy repo notebook.

This pipeline will use the following tools, available with a DagsHub account:

  • Data Engine - enables and orchestrates the entire active learning process
  • MLflow - used to log and track experiments and models
  • Label Studio - annotation tool

Each DagsHub repo comes with the above tools.

This documentation assumes you've already connected a Datasource.

0. Setup

The most general setup we need to perform is authentication to DagsHub and setting up the MLflow tracking URI. This happens in step 0 of the notebook.

import os

import dagshub
import mlflow

DAGSHUB_TOKEN = dagshub.auth.get_token()
DAGSHUB_USER = "yonomitt"
DAGSHUB_REPO = "ToothFairy"
DATASOURCE_PATH = "s3://tooth-dataset/data"
DAGSHUB_FULL_REPO = DAGSHUB_USER + "/" + DAGSHUB_REPO

MLFLOW_TRACKING_URI = f"https://dagshub.com/{DAGSHUB_USER}/{DAGSHUB_REPO}.mlflow"

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
os.environ["MLFLOW_TRACKING_USERNAME"] = DAGSHUB_USER
os.environ["MLFLOW_TRACKING_PASSWORD"] = DAGSHUB_TOKEN

By setting the tracking URI and the environment variables above, the MLflow client will be configured to communicate with the repo's MLflow server.

1. Getting or creating the Datasource

Since Data Engine works primarily with Datasource objects, we need to get or create one.

from dagshub.data_engine import datasources

def get_or_create_datasource(name):
    try:
        # get the Datasource named `name`
        ds = datasources.get_datasource(repo=DAGSHUB_FULL_REPO, name=name)
    except Exception:
        # the Datasource named `name` doesn't exist, so create it
        ds = datasources.create(repo=DAGSHUB_FULL_REPO, name=name, path=DATASOURCE_PATH)
    return ds

ds = get_or_create_datasource('<datasource_name>')

This handy function tries to get a Data Engine Datasource with the given name. If it does not exist, the function creates one. It is safe to call multiple times.

2. Enriching the Datasource

The first time through our data, we want to create our train/validation/test splits. We can do this directly in our Datasource's metadata, which will keep our data points assigned to the same splits each time we run.

We use the following function, which operates on a pandas DataFrame to create the splits and assign them to a new column:

import pandas as pd
from sklearn.model_selection import train_test_split

def create_splits(df, train=0.6, valid=0.3, test=0.1):
    total = train + valid + test
    train /= total
    valid /= total
    test /= total

    # create the training DataFrame and the rest
    train_df, rest_df = train_test_split(df, test_size=valid + test)

    total = valid + test
    valid /= total
    test /= total

    # create the validation and test DataFrames
    valid_df, test_df = train_test_split(rest_df, test_size=test)

    train_df['split'] = 'train'
    valid_df['split'] = 'valid'
    test_df['split'] = 'test'

    # combine the three splits DataFrames into one and return
    return pd.concat([train_df, valid_df, test_df], ignore_index=True)

Since Data Engine Datasources can be converted to pandas DataFrames, we can run the following to create our splits:

# get all data points from our Datasource and convert to a pandas DataFrame
md = ds.all().dataframe

# create the train/valid/test splits for our data
md = create_splits(md, train=0.6, valid=0.3, test=0.1)
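
The renormalization inside `create_splits` is easy to misread, so here is a minimal sketch of just the arithmetic, without pandas or scikit-learn. The helper name `split_fractions` is hypothetical and only mirrors the two `test_size` values computed in the function above.

```python
def split_fractions(train=0.6, valid=0.3, test=0.1):
    """Return the `test_size` values used by the first and second split."""
    # normalize so the three fractions sum to 1
    total = train + valid + test
    train, valid, test = train / total, valid / total, test / total

    # first split: everything that is not training data
    first_test_size = valid + test

    # second split: the test share of the *remaining* data, not of the whole dataset
    second_test_size = test / (valid + test)

    return first_test_size, second_test_size

first, second = split_fractions(train=0.6, valid=0.3, test=0.1)
print(round(first, 3), round(second, 3))
```

With the default 60/30/10 split, 40% of the data is held out in the first call, and a quarter of that held-out data (10% of the whole) becomes the test set in the second call.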

The next step of the enrichment process is to add any current annotations we have. For this we use the pandas.DataFrame.apply function to apply our conversion function to each row of the DataFrame that was created from our Datasource:

def create_metadata(row):
    # convert annotation to the Data Engine format and assign to the `annotation` column
    row['annotation'] = ...

    # return the modified row so that `apply` builds a DataFrame from the results
    return row

# apply the `create_metadata` function to each row of our metadata DataFrame
enriched_md = md.apply(create_metadata, axis=1)

Converting Annotations to Data Engine format

The Tooth Fairy repo has examples of functions that convert from COCO to the Data Engine format and from YOLO to the Data Engine format. You can use these as a basis for writing custom converters for your data.

Finally, we need to upload our metadata to DagsHub:

# set the batch size for uploads
dagshub.common.config.dataengine_metadata_upload_batch_size = 50

# upload the enriched metadata to DagsHub
ds.upload_metadata_from_dataframe(enriched_md, path_column="path")

3. Filtering the Datasource

After creating our splits and adding any annotations we have, we can now filter our Datasource based on this metadata.

# filter Datasource to only include annotated data
labeled = ds[ds['annotation'].is_not_null()]

# filter Datasource into train and valid splits
train = labeled[labeled['split'] == 'train']
valid = labeled[labeled['split'] == 'valid']

4. Training a model

Using the training and validation splits, we can create DataLoaders for them and use them directly in training. While the input data can automatically be converted to tensors, we need to write a conversion function to handle the annotations.

import torch

def annotation_to_tensor(annotation_file: str) -> torch.Tensor:
    # logic to read annotation file and convert to `torch.Tensor`
    ...

With this in place, we can then create our DataLoaders from our train and valid query results.

# load our model
model = ...

# convert the file hash into a full file path that stores the annotations
# this makes it easier to read the file in the `annotation_to_tensor` function
train = train.all().get_blob_fields('annotation')
valid = valid.all().get_blob_fields('annotation')

train_loader = train.as_ml_dataloader(flavor='torch', 
                                      metadata_columns=['annotation'],
                                      # `tensorizers` need one more value than `metadata_columns`
                                      # because the 'path' column for our input is automatically included
                                      tensorizers=['image', annotation_to_tensor])
valid_loader = valid.as_ml_dataloader(flavor='torch',
                                      metadata_columns=['annotation'], 
                                      tensorizers=['image', annotation_to_tensor])

best_valid_loss = 1_000_000.

# start an mlflow run context
with mlflow.start_run():
    for epoch in range(EPOCHS):
        # ensure model is in training mode
        model.train(True)

        # run a single epoch using a function to do so
        train_loss = train_one_epoch(model, train_loader)

        # put our model into evaluation mode
        model.eval()

        valid_loss = calculate_validation_loss(model, valid_loader)

        # log the training metrics to the MLflow server
        mlflow.log_metrics({'train_loss': train_loss, 'valid_loss': valid_loss}, step=epoch)

        # save the model if it has the lowest validation loss
        if valid_loss < best_valid_loss:
            save_model(model)
            best_valid_loss = valid_loss

    # log the best model weights to the MLflow server
    mlflow.log_artifact('path/to/best/model.pt')

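This example training loop shows the most important aspects of using Data Engine when training a model. Functions such as train_one_epoch, calculate_validation_loss, and save_model are placeholders that you will most likely implement for your own project, and EPOCHS is the number of epochs you choose to train for. The path passed to mlflow.log_artifact should be the location that save_model writes to.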

5. Filtering the Datasource for unlabeled data

After a model has been trained, we need to use that model to run predictions on all of the unlabeled data. These predictions will be used in the next step to determine which data needs to be labeled for the next active learning cycle.

unlabeled = ds[ds['annotation'].is_null()]

6. Running inference on unlabeled data

When running inference, we can use the as_ml_dataloader function again to turn our unlabeled data into tensors we can feed directly into our model.

dataloader = unlabeled.all().as_ml_dataloader(flavor='torch')

After this we can run each datapoint through the model to get a prediction and an overall confidence score.

# get a DataFrame for the metadata of the unlabeled data points
md = unlabeled.all().dataframe

results = {}

# iterate over the dataloader and the 'path' column of the metadata
for data, path in zip(dataloader, md['path']):

    # run the model
    out = model(data)

    # convert the output to Data Engine format and a confidence score
    prediction, score = convert_to_de(out)

    # store the predictions and scores to the `results` dictionary
    results[path] = (prediction, score)

Converting to Data Engine format

The Tooth Fairy repo has examples of functions that convert from COCO to the Data Engine format and from YOLO to the Data Engine format. You can use these as a basis for writing custom converters for your data.

During active learning, you need to calculate a single score for each data point. This score should be an indication of how difficult the data point was for the model to predict.

How you calculate a score for each data point will be project dependent. There are simple calculations such as:

  • Lowest confidence - the score is the lowest confidence of all detected objects
  • Average confidence - average of all confidences of detected objects
  • Minimizing confidence delta - difference between confidences for the top two labels of an object

There are also more complex approaches, such as:

  • Maximizing entropy - entropy measures how uncertain the model is across all categories, so high-entropy samples are the hardest ones
  • Predicting loss - predicting the loss of the sample, were the ground truth known
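
As a rough illustration of the simpler strategies listed above, here is a minimal stdlib-only sketch. The function names and input shapes are assumptions made for this example: `confidences` is a non-empty list with one confidence per detected object, and `class_probs` is the list of per-class probabilities for a single object. How to score an image with no detections is left open, since that is project dependent.

```python
import math

def lowest_confidence(confidences):
    """Score is the confidence of the least confident detection."""
    return min(confidences)

def average_confidence(confidences):
    """Score is the mean confidence over all detections."""
    return sum(confidences) / len(confidences)

def confidence_delta(class_probs):
    """Score is the gap between the two most likely labels of one object."""
    top, second = sorted(class_probs, reverse=True)[:2]
    return top - second

def entropy(class_probs):
    """Shannon entropy (in nats) of the predicted class distribution."""
    return -sum(p * math.log(p) for p in class_probs if p > 0)

detections = [0.9, 0.4, 0.7]
print(lowest_confidence(detections))
print(confidence_delta([0.5, 0.3, 0.2]))
print(entropy([0.5, 0.5]))
```

Note the direction of each score. For the first three functions a low value marks a difficult data point, which matches step 8, where the lowest scores are selected for labeling. Entropy runs the other way (high means uncertain), so it would need to be negated or otherwise inverted before being stored as the score used there.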

7. Enriching the Datasource with predictions

Once we have our results, we can update our metadata to include the predictions and the scores we just calculated.

def pred_to_metadata(row, results):
    path = row['path']
    if path not in results:
        return row
    prediction, score = results[path]
    row['prediction'] = prediction
    row['pred_score'] = score
    return row

enriched_unlabeled = md.apply(lambda x: pred_to_metadata(x, results), axis=1)

Then we upload the newly enriched metadata to DagsHub:

dagshub.common.config.dataengine_metadata_upload_batch_size = 50
ds.upload_metadata_from_dataframe(enriched_unlabeled, path_column="path")

8. Sorting Datasource by score

Using the score we just added to the metadata, we want to choose MAX_IMAGES number of samples for our annotators, where MAX_IMAGES depends on:

  • number of images to annotate per active learning cycle desired
  • speed of annotators
  • availability of annotators

We may also want to threshold our selected images by a MAX_SCORE. The model may not learn anything new from images it was already very confident about.

Let's start by querying our Datasource for all unlabeled, predicted data points:

unlabeled = ds[ds['annotation'].is_null()]
predicted = unlabeled[unlabeled['pred_score'].is_not_null()]

Next we determine the threshold score that will give us at most MAX_IMAGES number of images.

# get all scores as a list
scores = predicted.all().dataframe['pred_score'].tolist()

# sort the scores
scores = sorted(scores)

# remove all scores above our maximum allowed score
scores = [score for score in scores if score <= MAX_SCORE]

# determine which score is our threshold
threshold_score = scores[:MAX_IMAGES][-1]

Using this threshold_score, we can filter our Datasource further into data points we want to label:

to_label = predicted[predicted['pred_score'] <= threshold_score]
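
The threshold selection above raises an IndexError when no score passes the MAX_SCORE filter. A minimal sketch of a guarded version, assuming a hypothetical helper named `pick_threshold` that works on a plain list of scores:

```python
def pick_threshold(scores, max_images, max_score):
    """Return the threshold covering the `max_images` lowest scores, or None."""
    # keep only scores the model was not already confident about, lowest first
    eligible = sorted(score for score in scores if score <= max_score)

    # nothing to label in this cycle
    if not eligible or max_images <= 0:
        return None

    # the largest score among the `max_images` lowest ones
    return eligible[:max_images][-1]

print(pick_threshold([0.9, 0.2, 0.5, 0.7], max_images=2, max_score=0.8))
print(pick_threshold([0.95, 0.99], max_images=2, max_score=0.8))
```

A return value of None can be used to skip the labeling steps for this cycle. Also note that filtering with `<=` on the threshold can select more than MAX_IMAGES data points when several of them share exactly the threshold score.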

9. Creating a Label Studio project

Calling annotate on our filtered Datasource will help us create a Label Studio project with the datapoints in the Datasource.

This function returns a link to create the Label Studio project. You will need to:

  • Provide a name for the Label Studio project (which you need to note for later)
  • Decide whether or not to copy the project config from another Label Studio project. If you do not, or cannot, do this, you will have to manually configure the project later by providing the annotation type and the correct label names.

url = to_label.annotate()

For more information, check out our documentation on annotating data.

10. Sending predictions to Label Studio

We can send our model's predictions for our data points to Label Studio. This can help speed up your annotators, since they will have a starting point they can make corrections to.

To do so, we need to use the Label Studio SDK to get an instance of a Label Studio client:

from label_studio_sdk import Client

def label_studio_client():
    # build the appropriate URL for the Label Studio API endpoint
    url = f'https://{DAGSHUB_USER}:{dagshub.auth.get_token()}@dagshub.com/{DAGSHUB_USER}/{DAGSHUB_REPO}/annotations/de'

    # create a Label Studio client
    ls = Client(url=url, api_key=dagshub.auth.get_token())

    return ls

ls = label_studio_client()

Next, we need to get access to the project we just created on Label Studio. To do this, we can loop through all projects and stop when we find the one with the right name:

for proj in ls.list_projects():
    if proj.params['title'] == LS_PROJ_NAME_FROM_STEP_9:
        break

Now that our project is available in the proj variable, we need to get all available Tasks. A Task in Label Studio is a data point to be labeled along with some metadata about it.

We can use this helper function to get all tasks for the project:

import requests

def get_tasks(ls, proj):
    # build the API endpoint to get the tasks
    url = ls.get_url('/api/tasks')

    # set the project ID as a query parameter
    query = {'project': proj.id}

    # execute the request
    res = requests.request(method='GET', url=url, params=query)

    if res.status_code == 200:
        return res.json()['tasks']

    return []

tasks = get_tasks(ls, proj)

The Tasks include an ID, which is required when sending predictions back to the Label Studio server. Since our Data Engine metadata includes the URL for each data point, we can create a map from URL to Task ID:

url_to_taskid = {task['data']['image']: task['id'] for task in tasks}

Since our prediction metadata column is JSON formatted data stored as a binary BLOB, we need to make sure we have the data loaded into memory:

predictions = to_label.all()
predictions.get_blob_fields('prediction', load_into_memory=True)

Our Data Engine prediction and annotation format is compatible with the Label Studio format. However, to send the Task predictions the Label Studio API needs two more pieces of information:

  • a prediction score
  • the Task ID

Through good planning, we have all of that information on hand. To make things simpler, let's put it into a new list called ls_preds:

import json

# list of predictions to be sent to Label Studio
ls_preds = []

# loop through all our data points that are to be labeled
for pred in predictions:
    # convert the binary BLOB to JSON
    pred_json = json.loads(pred['prediction'].decode())

    # get the URL and prediction score from the metadata
    url = pred['dagshub_download_url']
    score = pred['pred_score']

    # look up the Task ID for this data point
    task_id = url_to_taskid[url]

    # append a dictionary of the info Label Studio needs
    ls_preds.append({
        'result': pred_json['prediction'][0]['result'],
        'score': score,
        'task': task_id
    })

All that's left to do is to upload our Task predictions. We use the backoff Python module to retry any failed upload attempts, with an exponential backoff schedule.

import backoff

@backoff.on_exception(backoff.expo, ConnectionError, max_tries=8)
def create_ls_prediction(proj, pred):
    # send the prediction to the Label Studio API through the SDK
    proj.create_prediction(pred['task'], pred['result'], pred['score'])

# loop through all task predictions
for ls_pred in ls_preds:
    try:
        # call function to create predictions with exponential backoff
        create_ls_prediction(proj, ls_pred)
    except Exception:
        print(f"ERROR: prediction for task ({ls_pred['task']}) was not successfully created")
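
For intuition about what an exponential backoff schedule does, here is a stdlib-only sketch. It is not how the backoff package is implemented, and it leaves out the random jitter that retry libraries commonly add. The name `retry_with_backoff` and its parameters are hypothetical, and `sleep` is injectable so the schedule can be inspected without actually waiting.

```python
import time

def retry_with_backoff(func, max_tries=8, base_delay=1.0, sleep=time.sleep):
    """Call `func` until it succeeds, doubling the wait after each ConnectionError."""
    for attempt in range(max_tries):
        try:
            return func()
        except ConnectionError:
            # give up after the final attempt and let the caller handle it
            if attempt == max_tries - 1:
                raise
            # wait base_delay * 1, 2, 4, ... before the next attempt
            sleep(base_delay * 2 ** attempt)

# simulate an upload that fails three times before succeeding
calls = {'count': 0}
delays = []

def flaky_upload():
    calls['count'] += 1
    if calls['count'] < 4:
        raise ConnectionError('temporary failure')
    return 'ok'

result = retry_with_backoff(flaky_upload, sleep=delays.append)
print(result, delays)
```

Because the waits grow geometrically, a short outage costs only a few seconds, while a longer one is retried less and less aggressively until `max_tries` is exhausted.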

11. Correcting and saving Label Studio annotations

At this point, you can open your Label Studio project in your favorite browser. The URL for this project can be created programmatically:

proj_id = proj.id
url = f'https://dagshub.com/{DAGSHUB_USER}/{DAGSHUB_REPO}/annotations/de/dagshub/projects/{proj_id}/data'

Once you go to that URL, you will be presented with the Label Studio UI. The data points selected for annotation will be available in the Label Studio project and the predictions should be included.

You or your annotator can either modify the predictions to make them more accurate or remove them and create annotations from scratch. When you click the Save button, any predictions and changes made will automatically be saved to the 'annotation' column of your Datasource's metadata. This makes them available for training in the next active learning cycle.

For more information on using Label Studio with Data Engine, check out our documentation on annotating data.

12. GOTO step 3

At this point, you're ready to train a new model, which means you will start a new active learning cycle.