I think I figured out how to work around the speed bottleneck when there is not enough memory!
I started from the following ideas:
- Most of the time is spent on I/O operations.
- When training a model, OpenNMT-py builds a bucket of sentences and shuffles them (that is, thousands of sentences get shuffled within the bucket).
- The shuffling granularity can be coarsened from single sentences to chunks of 100-10,000 sentences, which should speed things up tenfold (a toy sketch of the idea follows this list).
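To make the granularity idea concrete, here is a toy in-memory sketch (illustration only, with made-up data; the actual file-based script is further down): instead of drawing individual sentences in random order, whole consecutive chunks are moved around, and the same chunk order is applied to both sides of the corpus.

import random

# Toy parallel corpus: line i of the source corresponds to line i of the target
source = [f"src sentence {i}" for i in range(10)]
target = [f"tgt sentence {i}" for i in range(10)]

chunk_size = 3  # in a real run this would be 100-10,000

# Split the line indices into consecutive chunks
chunks = [list(range(i, min(i + chunk_size, len(source))))
          for i in range(0, len(source), chunk_size)]

# Shuffle whole chunks rather than individual lines: far fewer random
# accesses, and applying the same order to both sides keeps them aligned
random.shuffle(chunks)

shuffled_source = [source[i] for chunk in chunks for i in chunk]
shuffled_target = [target[i] for chunk in chunks for i in chunk]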
As a result, with roughly 139 million sentence pairs and a chunk size of 1,000 sentences, I got the following result:
Shuffling chunks: 100%|██████████| 139938/139938 [08:29<00:00, 274.59it/s]
This is already much better than the dozens of hours it used to take when there was not enough memory.
Here is the modified script itself:
import os
import random
import mmap
from tqdm import tqdm

chunk_size = 1000

# Paths to the files that need to be shuffled
source_path = 'source.txt'
target_path = 'target.txt'

# Function to get the start positions of each chunk in the file
def get_chunk_positions(file_path, chunk_size):
    with open(file_path, 'rb') as f:
        mmapped_file = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        positions = [0]
        line_count = 0
        while mmapped_file.readline():
            line_count += 1
            if line_count % chunk_size == 0:
                positions.append(mmapped_file.tell())
        if line_count % chunk_size != 0:  # Add position for the last chunk if it's smaller than chunk_size
            positions.append(mmapped_file.tell())
        mmapped_file.close()
        return positions

# Function to shuffle large linked files with minimal memory usage
def shuffle_large_linked_files(source_path, target_path, chunk_size=1000):
    # Check that the number of lines in both files is the same
    with open(source_path, 'r') as src, open(target_path, 'r') as tgt:
        if sum(1 for _ in src) != sum(1 for _ in tgt):
            raise ValueError("The number of lines in the source and target files is not the same.")

    # Temporary output files
    temp_source_path = source_path + '.shuffled'
    temp_target_path = target_path + '.shuffled'

    # Get chunk start positions
    source_positions = get_chunk_positions(source_path, chunk_size)
    target_positions = get_chunk_positions(target_path, chunk_size)

    # Create shuffled chunk indices; the same order is used for both files,
    # which keeps source and target lines aligned
    chunks = list(range(len(source_positions) - 1))
    random.shuffle(chunks)

    # Copy the chunks into the temporary files in the shuffled order
    # ('wb' rather than 'ab', so a leftover file from a previous run is overwritten)
    with open(source_path, 'rb') as source_file, open(target_path, 'rb') as target_file, \
         open(temp_source_path, 'wb') as temp_source_file, open(temp_target_path, 'wb') as temp_target_file:
        source_map = mmap.mmap(source_file.fileno(), 0, access=mmap.ACCESS_READ)
        target_map = mmap.mmap(target_file.fileno(), 0, access=mmap.ACCESS_READ)
        for chunk in tqdm(chunks, desc="Shuffling chunks"):
            source_chunk = source_map[source_positions[chunk]:source_positions[chunk + 1]]
            target_chunk = target_map[target_positions[chunk]:target_positions[chunk + 1]]
            temp_source_file.write(source_chunk)
            temp_target_file.write(target_chunk)
        source_map.close()
        target_map.close()

    # Replace the original files with the shuffled ones
    if os.path.exists(temp_source_path) and os.path.exists(temp_target_path):
        os.replace(temp_source_path, source_path)
        os.replace(temp_target_path, target_path)
    else:
        print("Shuffling did not complete successfully.")

# Shuffle the files
shuffle_large_linked_files(source_path, target_path, chunk_size=chunk_size)
Updated:
I changed the script to handle the case where the number of lines is not divisible by the chunk size.
I ran a few more tests, and the script now seems to work without problems with any data size and amount of memory while keeping the two files synchronized, but just in case, an extra check wouldn't hurt; one possible sketch of such a check follows.
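For example, one cheap check (a sketch only, not part of the script above; it reads both files one extra time, so it roughly doubles the I/O) is an order-independent fingerprint of the (source, target) line pairs: a correct shuffle only permutes the pairs, so the fingerprint must be identical before and after, while a broken pairing would change it with overwhelming probability.

import hashlib
from itertools import zip_longest

def pair_fingerprint(source_path, target_path):
    # Sum of per-pair hashes modulo 2**128: invariant under any permutation
    # of the pairs, but sensitive to a desynchronized source/target pairing
    total = 0
    with open(source_path, 'rb') as src, open(target_path, 'rb') as tgt:
        for s, t in zip_longest(src, tgt, fillvalue=b''):
            digest = hashlib.md5(s + b'\t' + t).digest()
            total = (total + int.from_bytes(digest, 'big')) % (1 << 128)
    return total

# Usage: compare the fingerprint before and after shuffling
# before = pair_fingerprint(source_path, target_path)
# shuffle_large_linked_files(source_path, target_path, chunk_size=chunk_size)
# after = pair_fingerprint(source_path, target_path)
# assert before == after, "source/target pairing was broken by the shuffle"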