LibreTranslate/FastShuffle: Python module to quickly shuffle large parallel corpora

Continuing my efforts to handle larger datasets on consumer hardware, I’ve developed another key module that shuffles parallel corpora without reading the entire dataset into memory.

It can shuffle 150 million sentence pairs using less than 10GB of RAM in less than 20 minutes on my old Intel Core i7.


It also allows sampling (useful for creating a validation dataset):

file_shuffle_sample(in_src, in_tgt, 5000) # Shuffle and sample 5000 sentences
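
If only the sample is needed and not a full shuffle, the same idea can be sketched in plain Python with reservoir sampling. This is not how FastShuffle does it; `sample_parallel` and its parameters are hypothetical names used only for illustration.

```python
import random


def sample_parallel(src_path, tgt_path, k, seed=None):
    """Return k aligned (source, target) line pairs chosen uniformly at random.

    Hypothetical helper, not part of FastShuffle. Uses reservoir sampling
    (Algorithm R), so at most k pairs are held in memory at any time.
    Lines are returned as bytes, including their trailing newline.
    """
    rng = random.Random(seed)
    reservoir = []
    with open(src_path, "rb") as src, open(tgt_path, "rb") as tgt:
        # zip() reads both files in lockstep, which keeps the pairs aligned
        for i, pair in enumerate(zip(src, tgt)):
            if i < k:
                reservoir.append(pair)
            else:
                # Keep the new pair with probability k / (i + 1)
                j = rng.randint(0, i)
                if j < k:
                    reservoir[j] = pair
    # The reservoir is a uniform sample but not in random order, so shuffle it
    rng.shuffle(reservoir)
    return reservoir
```

Note that `zip()` stops at the shorter file, so a line-count check on the two files is still worth doing separately.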

Great job! You may also find my small Python script useful; it shuffles the data using mmap.
I added a progress bar to it and a check that source.txt and target.txt have the same number of lines.

On an NVMe SSD it took about 6 seconds to process 4 million sentence pairs.

Added:

For 28 million sentence pairs:

28193568/28193568 [00:53<00:00, 523336.86it/s]

Plus 10-15 seconds at startup to compare the number of lines in the two files.

import os
import random
import mmap
from tqdm import tqdm

# Paths to the files that need to be shuffled
source_path = 'source.txt'
target_path = 'target.txt'


# Function to get start positions of each line in the file
def get_line_positions(file_path):
   with open(file_path, 'rb') as f:
       # 'mm' avoids shadowing the built-in map(); access= alone also works on Windows
       mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
       pos = 0
       positions = [0]
       while True:
           # find() returns -1 when no newline is left, which makes pos equal to 0
           pos = mm.find(b'\n', pos) + 1
           if pos > 0:
               positions.append(pos)
           else:
               break
       mm.close()
   return positions

# Function to shuffle with minimal memory usage
def shuffle_large_linked_files(source_path, target_path):
   # Check that both files have the same number of lines. Binary mode
   # counts only b'\n' as a line break, matching get_line_positions().
   with open(source_path, 'rb') as src, open(target_path, 'rb') as tgt:
       lines = sum(1 for _ in src)
       if lines != sum(1 for _ in tgt):
           raise ValueError("The number of lines in the source and target files is not the same.")

   # Create shuffled indices
   indices = list(range(lines))
   random.shuffle(indices)

   # Create temporary files
   temp_source_path = source_path + '.shuffled'
   temp_target_path = target_path + '.shuffled'

   # Get line start positions
   source_positions = get_line_positions(source_path)
   target_positions = get_line_positions(target_path)

   # Copy the lines to the temporary files in shuffled order.
   # 'wb' truncates any stale .shuffled file left over from an interrupted run.
   with open(source_path, 'rb') as source_file, open(target_path, 'rb') as target_file, \
        open(temp_source_path, 'wb') as temp_source_file, open(temp_target_path, 'wb') as temp_target_file:

       source_map = mmap.mmap(source_file.fileno(), 0, access=mmap.ACCESS_READ)
       target_map = mmap.mmap(target_file.fileno(), 0, access=mmap.ACCESS_READ)

       for i in tqdm(range(lines), desc="Shuffling lines"):
           source_pos = source_positions[indices[i]]
           target_pos = target_positions[indices[i]]
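           source_line_end = source_map.find(b'\n', source_pos)
           target_line_end = target_map.find(b'\n', target_pos)
           # find() returns -1 for a final line without a trailing newline;
           # take the rest of the file and add the newline so the line is kept.
           if source_line_end == -1:
               source_line = source_map[source_pos:] + b'\n'
           else:
               source_line = source_map[source_pos:source_line_end+1]
           if target_line_end == -1:
               target_line = target_map[target_pos:] + b'\n'
           else:
               target_line = target_map[target_pos:target_line_end+1]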

           temp_source_file.write(source_line)
           temp_target_file.write(target_line)

       source_map.close()
       target_map.close()

   # Replace original files with shuffled files
   if os.path.exists(temp_source_path) and os.path.exists(temp_target_path):
       os.replace(temp_source_path, source_path)
       os.replace(temp_target_path, target_path)
   else:
       print("Shuffling did not complete successfully.")


# Shuffle the files
shuffle_large_linked_files(source_path, target_path)

By the way, I think I understand why the speed differs so much between computers and why FastShuffle took so long on a large corpus.
On a corpus of 60 million sentence pairs, my script above also slowed down considerably (about tenfold).

I assume this is due to how mmap works and to the amount of free RAM.
That is, when there is not enough RAM and the swap partition is used, the speed drops sharply. (At the time I only had 14GB free; the rest was occupied by Locomotive.)

I encountered the same effect with llama.cpp when it started using mmap: the RAM appears to be free and unused, but that is not entirely true. It’s a hack, so to speak.

This explanation seems to be the closest:
The kernel does a good job of distributing data between memory and storage, so it’s natural to try to use this property in our task. When we call mmap on our file, the kernel will reserve a range of virtual addresses that will not be allocated immediately. When we try to access them, the kernel will load it from the input file into memory. When we run out of physical memory, the kernel will remove the pages to swap. This way we will balance data between the file on disk, memory and swap.
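
A minimal sketch of what this means in practice, assuming a plain read-only mapping like the one in the script above: creating the map is cheap, and data is read from disk only when a slice is actually accessed. The helper name `read_span` is hypothetical.

```python
import mmap


def read_span(path, start, end):
    """Return bytes [start:end) of a file through a read-only memory map.

    Hypothetical helper for illustration. Creating the map only reserves
    virtual address space; the kernel reads pages from disk when the
    slice below touches them. The file must not be empty.
    """
    with open(path, "rb") as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # Slicing copies just this byte range into a bytes object
            return mm[start:end]
```

For a read-only file mapping like this one, pages evicted under memory pressure are simply dropped and read again from the file on the next access. Random access over a file much larger than free RAM therefore turns into random disk reads, which fits the slowdown described above.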


I think I figured out how to get around the slowdown when there is not enough memory!
I started from the following ideas:

  1. Most of the time is spent on I/O operations.
  2. During training, OpenNMT-py fills a bucket with sentences and shuffles it (that is, thousands of sentences are shuffled within the bucket).
  3. The shuffling granularity can be relaxed from single sentences to chunks of 100-10,000 sentences, which should increase the speed tenfold.
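
The idea in point 3 can be illustrated on a small in-memory list before applying it to files. This is only a sketch; `shuffle_in_chunks` is a hypothetical name, and the real script works on byte offsets rather than Python lists.

```python
import random


def shuffle_in_chunks(pairs, chunk_size, seed=None):
    """Shuffle a list of aligned pairs at chunk granularity.

    Hypothetical helper for illustration. The order of the chunks is
    randomized, while the order of the pairs inside each chunk is kept.
    """
    rng = random.Random(seed)
    # Cut the list into consecutive chunks; the last one may be shorter
    chunks = [pairs[i:i + chunk_size] for i in range(0, len(pairs), chunk_size)]
    rng.shuffle(chunks)
    # Flatten the shuffled chunks back into a single list
    return [pair for chunk in chunks for pair in chunk]
```

With a chunk size of 1000, the number of random reads drops by a factor of 1000 compared with shuffling single lines, at the cost of neighbouring sentences staying together until the training-time shuffle.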

As a result, with 139 million sentence pairs and a chunk size of 1000 sentences, I got the following result:

Shuffling chunks: 100%|██████████| 139938/139938 [08:29<00:00, 274.59it/s]
This is already much better than the dozens of hours it took earlier when there was not enough memory.

Here is the modified script itself:

import os
import random
import mmap
from tqdm import tqdm

chunk_size = 1000

# Paths to the files that need to be shuffled
source_path = 'source.txt'
target_path = 'target.txt'


# Function to get start positions of each chunk in the file
def get_chunk_positions(file_path, chunk_size):
    with open(file_path, 'rb') as f:
        # The context manager closes the map once the positions are collected
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:
            positions = [0]
            line_count = 0
            while mmapped_file.readline():
                line_count += 1
                if line_count % chunk_size == 0:
                    positions.append(mmapped_file.tell())
            # Add the end position of the last chunk if it is smaller than chunk_size
            if line_count % chunk_size != 0:
                positions.append(mmapped_file.tell())
    return positions


# Function to shuffle large linked files with minimal memory usage
def shuffle_large_linked_files(source_path, target_path, chunk_size=1000):
    # Check that both files have the same number of lines. Binary mode
    # counts only b'\n' as a line break, matching mmap's readline().
    with open(source_path, 'rb') as src, open(target_path, 'rb') as tgt:
        if sum(1 for _ in src) != sum(1 for _ in tgt):
            raise ValueError("The number of lines in the source and target files is not the same.")

    # Create temporary files
    temp_source_path = source_path + '.shuffled'
    temp_target_path = target_path + '.shuffled'

    # Get chunk start positions
    source_positions = get_chunk_positions(source_path, chunk_size)
    target_positions = get_chunk_positions(target_path, chunk_size)

    # Create shuffled indices
    chunks = list(range(len(source_positions) - 1))
    random.shuffle(chunks)

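    # Write the chunks to the temporary files in shuffled order.
    # 'wb' truncates any stale .shuffled file left over from an interrupted run.
    with open(source_path, 'rb') as source_file, open(target_path, 'rb') as target_file, \
            open(temp_source_path, 'wb') as temp_source_file, open(temp_target_path, 'wb') as temp_target_file: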

        source_map = mmap.mmap(source_file.fileno(), 0, access=mmap.ACCESS_READ)
        target_map = mmap.mmap(target_file.fileno(), 0, access=mmap.ACCESS_READ)

        for chunk in tqdm(chunks, desc="Shuffling chunks"):
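            source_chunk = source_map[source_positions[chunk]:source_positions[chunk+1]]
            target_chunk = target_map[target_positions[chunk]:target_positions[chunk+1]]

            # The chunk holding the file's last line may lack a trailing newline;
            # add it so that line does not merge with the start of the next chunk.
            if not source_chunk.endswith(b'\n'):
                source_chunk += b'\n'
            if not target_chunk.endswith(b'\n'):
                target_chunk += b'\n'

            temp_source_file.write(source_chunk)
            temp_target_file.write(target_chunk)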

        source_map.close()
        target_map.close()

    # Replace original files with shuffled files
    if os.path.exists(temp_source_path) and os.path.exists(temp_target_path):
        os.replace(temp_source_path, source_path)
        os.replace(temp_target_path, target_path)
    else:
        print("Shuffling did not complete successfully.")


# Shuffle the files
shuffle_large_linked_files(source_path, target_path, chunk_size=chunk_size)

Updated:
I changed the script to handle the case where the number of lines is not divisible by the chunk size.
I ran a few more tests, and the script now seems to work with any data size and amount of memory while keeping the two files in sync. Still, an extra check wouldn’t hurt.
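
One possible extra check, sketched under the assumption that a streaming pass over both files is affordable: an order-independent checksum over the sentence pairs, computed before and after the shuffle. `pair_checksum` is a hypothetical helper, not part of the script above.

```python
import hashlib


def pair_checksum(src_path, tgt_path):
    """Return (pair_count, checksum) for two aligned files.

    Hypothetical helper for illustration. Each (source, target) pair is
    hashed and the hashes are summed modulo 2**64, so reordering the
    pairs leaves the result unchanged, while a broken alignment changes
    it (barring hash collisions).
    """
    total = 0
    count = 0
    with open(src_path, "rb") as src, open(tgt_path, "rb") as tgt:
        for s, t in zip(src, tgt):
            # Strip the newline so a final line without one hashes the same
            # after the shuffle; b"\n" is a safe separator inside the pair.
            data = s.rstrip(b"\n") + b"\n" + t.rstrip(b"\n")
            digest = hashlib.blake2b(data, digest_size=8).digest()
            total = (total + int.from_bytes(digest, "big")) % 2**64
            count += 1
    return count, total
```

Because the script replaces the original files in place, the checksum has to be taken once before running it and once after; the two results should be equal. `zip()` stops at the shorter file, so this complements the line-count check rather than replacing it.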


A clever approach! Thanks for sharing. You are correct in that OpenNMT-py will rearrange the batch sentences, so some shuffling is already done at the batch level.
