
Building a Custom Dataloader

Through DagsHub’s streaming client, you can build custom data loaders and train your model on subsets of your data.

This has a few implications for DagsHub users:

  • Training works as if the data is already present on the local machine
  • We do not need to write a script or manually copy the necessary data to our training machine
  • Our training code can decide on-the-fly which data should be included in the current training run

Let’s take a look at an example of how to do this.

Configuration

Authentication

Manually

If we're streaming data from a private repo, we need to authenticate to DagsHub. When training in Colab or running an interactive script, the DagsHub client automatically redirects us to an OAuth login whenever authentication is required.

Automatically

However, if our training is not interactive, we need to authenticate in our code. We recommend getting a DagsHub Access Token and following the steps below:

  • Set an environment variable

    export DAGSHUB_TOKEN="<your-token>"
    
  • Then, in our training script, read that environment variable:

    import os
    DAGSHUB_TOKEN = os.environ.get('DAGSHUB_TOKEN', None)
    
  • Next, we authenticate to DagsHub:

    import dagshub
    dagshub.auth.add_app_token(DAGSHUB_TOKEN)
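
Putting the three steps together, a minimal sketch might look like the following. The helper names read_dagshub_token and authenticate are hypothetical and are not part of the DagsHub client; only dagshub.auth.add_app_token comes from the steps above. The sketch fails early with a clear message when the variable is unset, rather than handing None to the client.

```python
import os


def read_dagshub_token(env=os.environ):
    """Return the token from the environment, or fail with a clear message.

    Hypothetical helper for illustration; not part of the DagsHub client.
    """
    token = env.get("DAGSHUB_TOKEN")
    if not token:
        raise RuntimeError("DAGSHUB_TOKEN is not set; export it before training.")
    return token


def authenticate(env=os.environ):
    """Register the token with the DagsHub client (hypothetical wrapper)."""
    import dagshub  # imported lazily so the helper above works without it

    dagshub.auth.add_app_token(read_dagshub_token(env))
```

Calling authenticate() once at the top of a non-interactive training script should then be enough, assuming the environment variable was exported as shown in the first step.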
    

Install hooks

  • Using the DagsHub client, we'll install hooks that point to the repo we want to stream data from:

    from dagshub.streaming import install_hooks
    install_hooks(project_root='.', repo_url='https://dagshub.com/DagsHub-Datasets/LAION-Aesthetics-V2-6.5plus', branch='main')
    
What are DagsHub hooks?

These hooks are installed for most file IO and file system operations, like open() or os.listdir(). They first check whether the requested file or directory exists locally. If it does not, they look for it in the repository specified by repo_url. In doing this, we’re essentially treating the DagsHub repo as if it were already on the local machine.
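
To make the "local first, then remote" idea concrete, here is a toy sketch of the lookup order. It is an illustration only and not how the DagsHub client is implemented: open_with_fallback and fetch are hypothetical names, and the "remote repo" is just an in-memory dictionary with made-up content.

```python
import os
import tempfile


def open_with_fallback(path, fetch_remote, mode="r"):
    """Open `path` locally; if it is missing, fetch it and cache it first.

    `fetch_remote` is a hypothetical callable that returns the file's bytes.
    """
    if not os.path.exists(path):
        data = fetch_remote(path)
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, "wb") as f:
            f.write(data)
    return open(path, mode)


# Toy stand-in for a remote repo (made-up content)
fake_repo = {"data/labels.tsv": b"img1.jpg\ta squirrel\t6.7\n"}

workdir = tempfile.mkdtemp()
local_path = os.path.join(workdir, "data", "labels.tsv")
calls = []  # records every time the "remote" is contacted


def fetch(path):
    calls.append(path)
    return fake_repo["data/labels.tsv"]


with open_with_fallback(local_path, fetch) as f:
    first_read = f.read()   # file is missing locally, so it is fetched
with open_with_fallback(local_path, fetch) as f:
    second_read = f.read()  # file now exists locally, no fetch needed
```

In this sketch the second read never touches the "remote", which mirrors the behavior described above: local files win, and the repo is consulted only when something is missing.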

How to access or stream a subset of a dataset?

As an example, let’s consider the LAION-Aesthetics-V2-6.5plus repo.

This repo includes about 540,000 images in the data/ folder. That folder also contains a labels.tsv file, which lists each image along with its caption, an aesthetics score, and the original URL it came from.

If we wanted to look at images that had the word squirrel in the caption, we could do something like this:

squirrel_files = []

with open('data/labels.tsv') as f:
    for row in f.readlines():
        image_name, caption = row.split('\t')[:2]
        if 'squirrel' in caption.lower():
            squirrel_files.append(image_name)

len(squirrel_files)

When we run this code, we see there are 115 images that mention squirrels.
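
The same filter can be wrapped in a reusable function. The sketch below is one possible variant, assuming the column order described above (image name, caption, score, URL). The function name find_images_by_keyword is hypothetical and the sample rows are made up for illustration. It uses the csv module and skips rows with fewer than two columns, which the split-based loop would fail on.

```python
import csv
import io


def find_images_by_keyword(tsv_file, keyword):
    """Return image names whose caption contains `keyword` (case-insensitive)."""
    reader = csv.reader(tsv_file, delimiter="\t", quoting=csv.QUOTE_NONE)
    matches = []
    for row in reader:
        if len(row) < 2:
            continue  # skip blank or malformed rows
        image_name, caption = row[0], row[1]
        if keyword.lower() in caption.lower():
            matches.append(image_name)
    return matches


# Made-up sample rows standing in for data/labels.tsv
sample = io.StringIO(
    "0001.jpg\tA red Squirrel on a branch\t6.8\thttp://example.com/a\n"
    "0002.jpg\tSunset over a lake\t7.1\thttp://example.com/b\n"
    "\n"
    "0003.jpg\tsquirrel eating a nut\t6.6\thttp://example.com/c\n"
)
matches = find_images_by_keyword(sample, "squirrel")
```

With the hooks installed, the same function could be given the object returned by open('data/labels.tsv') instead of the in-memory sample.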

We could even iterate over those images with the following code:

import os

from PIL import Image

for squirrel_file in squirrel_files:
    full_path = os.path.join('data', squirrel_file)
    image = Image.open(full_path).convert('RGB')

    # Do something with the image

How to create a DataLoader in PyTorch that streams data?

Luckily, thanks to the hooks that were installed, it’s very easy to create a PyTorch DataLoader using this streamed data. The first thing we need, however, is a PyTorch Dataset.

Take a look at the LAIONAestheticsDataset example, which comes from the AestheticPredictor repo.

First the __init__ method:

class LAIONAestheticsDataset(Dataset):
    def __init__(self, annotations_file, img_dir, feature_extractor: EfficientNetFeatureExtractor, limit=None):

        # Set up class properties
        self.feature_extractor = feature_extractor
        self.img_path = img_dir
        self.img_files = []
        self.scores = []
        self.embeddings = {}

        # Open the annotations file (in this case data/labels.tsv)
        with open(annotations_file) as f:
            # Loop through each row
            for i, row in enumerate(f.readlines()):

                # Check if we're limiting the number of images
                if limit is not None and i >= limit:
                    break

                # Get the image file name and its aesthetic score
                img_name, _, aesthetic_score = row.split('\t')[:3]

                # Add the image file name to our list of files
                self.img_files.append(img_name)

                # Add the aesthetic score as a PyTorch tensor to our list of scores
                self.scores.append(torch.tensor([float(aesthetic_score)]))

A Dataset needs two other methods, __len__ and __getitem__:

class LAIONAestheticsDataset(Dataset):
    ...

    def __len__(self):
        # The size of the dataset is the number of files in it
        return len(self.img_files)

    def __getitem__(self, idx):
        # Make sure we're not dealing with tensors as index values
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # Get the stored aesthetic score for the index
        score = self.scores[idx]

        # See if the embedding for the image is cached
        embedding = self.embeddings.get(idx, None)

        if embedding is None:
            # If it's not cached, calculate it
            img_path = os.path.join(self.img_path, self.img_files[idx])
            embedding = self.feature_extractor.extract(img_path)

            # Then cache it
            self.embeddings[idx] = embedding

        # Return the embedding and the aesthetic score
        return embedding, score
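
The caching pattern in __getitem__ can be seen in isolation in the following plain-Python sketch. CachedItems and fake_extract are hypothetical stand-ins for the dataset and the feature extractor, and the "embedding" is just the length of the path, so the sketch runs without PyTorch.

```python
class CachedItems:
    """Map-style container that computes each item lazily and memoizes it."""

    def __init__(self, paths, extract):
        self.paths = list(paths)
        self.extract = extract  # hypothetical callable: path -> feature
        self.cache = {}

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # Compute the feature only on first access, then reuse it
        if idx not in self.cache:
            self.cache[idx] = self.extract(self.paths[idx])
        return self.cache[idx]


calls = []  # records which paths were actually processed


def fake_extract(path):
    calls.append(path)
    return len(path)  # stand-in for a real embedding


items = CachedItems(["data/a.jpg", "data/bb.jpg"], fake_extract)
first = items[0]   # computed
again = items[0]   # served from the cache
other = items[1]   # computed
```

One design point to keep in mind: an in-memory cache like this lives inside the dataset object. If a DataLoader is created with num_workers greater than 0, each worker process gets its own copy of the dataset, so the workers would not share cached embeddings.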

To turn the Dataset into a DataLoader, we can pass it to PyTorch’s built-in DataLoader class:

train_dataloader = DataLoader(train_dataset, 
                              batch_size=32, 
                              shuffle=True, 
                              num_workers=0)

Now we can iterate through this DataLoader as usual to train our model!

To complete the picture, we also need to look at the EfficientNetFeatureExtractor, which is defined in the same file.

class EfficientNetFeatureExtractor:
    ...

    def extract(self, image_path):
        with torch.no_grad():
            image = Image.open(image_path).convert('RGB')
            X = self.preprocess(image)
            X = X.unsqueeze(0)
            embedding = self.model(X)[0, :, 0, 0]
        return embedding

The important line to pay attention to is:

image = Image.open(image_path).convert('RGB')

Normally, this would open a local image file found at image_path. However, thanks to the hooks we installed earlier, the Image.open() function can also stream data from the DagsHub repo.

The first time we run training with this DataLoader, it streams the images and extracts their features. The images are also cached on the local machine.

How to create a Data Generator in TensorFlow that streams data?

The hooks we installed can also help us create a DataGenerator in TensorFlow. To do so, we’re going to subclass keras.utils.Sequence.

Take a look at the LAIONAestheticsDataGenerator example.

Interestingly, the data generator does not call any file IO functions directly. There is one file IO call, but it’s hidden inside this method:

def __data_generation(self, idxs):
        embeddings = np.empty((self.batch_size, self.feature_extractor.feature_dims))
        scores = np.empty((self.batch_size))

        # Generate data
        for i, idx in enumerate(idxs):
            # Store sample
            embedding = self.embeddings.get(idx, None)
            if embedding is None:
                img_path = os.path.join(self.img_path, self.img_files[idx])

                #=======================
                # SECRET HIDDEN FILE IO
                #=======================
                embedding = self.feature_extractor.extract(img_path)

                self.embeddings[idx] = embedding

            embeddings[i,] = embedding
            scores[i] = self.scores[idx]

The extract() method belongs to the EfficientNetFeatureExtractor, which is defined at the top of the same file:

class EfficientNetFeatureExtractor:
    ...

    def extract(self, image_path):
        image = Image.open(image_path).convert('RGB')
        image = image.resize((480, 480))  # Resize the image to match EfficientNet's input size
        image = tf.keras.preprocessing.image.img_to_array(image)
        image = self.preprocess(image)
        image = tf.expand_dims(image, axis=0)
        embedding = self.model(image)[0]
        return embedding

The installed hooks allow Image.open() to either stream data from a DagsHub repo or open local image files, if they exist.

The labels.tsv file is also streamed in the train_valid_split function. This function takes care of determining how the data generators are initialized.
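
For context, a keras.utils.Sequence subclass reports the number of batches from __len__ and returns one batch from __getitem__. The index arithmetic behind that can be sketched in plain Python. The function names below are hypothetical and are not taken from the LAIONAestheticsDataGenerator code.

```python
import math


def num_batches(n_samples, batch_size):
    """Number of batches per epoch, counting a final partial batch."""
    return math.ceil(n_samples / batch_size)


def batch_indices(indices, batch_idx, batch_size):
    """Sample indices that belong to batch number `batch_idx`."""
    start = batch_idx * batch_size
    return indices[start:start + batch_size]


indices = list(range(10))
batches = [
    batch_indices(indices, b, 4)
    for b in range(num_batches(len(indices), 4))
]
```

Note that the __data_generation excerpt above preallocates its arrays with batch_size rows. With that approach, a shorter final batch would need either a floored batch count (dropping the remainder) or arrays sized by len(idxs).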

See the project on DagsHub