Sergii Vershynskyi
Creator of this blog.
Jul 25, 2024 9 min read 1853 words

PERFORM BATCH DATA PROCESSING USING LLM AT SCALE

Abstract: LLM usage is expanding into new applications. This article provides a solution for processing data at scale using the OpenAI Batch API.

Introduction

The LLM “revolution” continues, and LLMs are now being used in applications that were historically handled only by humans. While LLMs can successfully solve a large class of tasks, with more coming in the near future, it is crucial to do the foundation work first, for example: evaluating the accuracy of the model outputs, proper data preparation, prompt engineering, model fine-tuning (if needed), etc. After this initial research, the next logical step might be to send massive amounts of data to the LLM. This article focuses only on that last phase.

Advantages of using the Batch API compared to synchronous API calls:

  • 50% cheaper;
  • faster to implement, since there is no need to write code to send requests at scale and to process network errors.

The batch processing procedure is split into a series of steps, each of which is performed by a simple Python application. The steps are described below.

Batch data preparation

Let’s create the following directory structure, which we will use to store batch-related data:

batch/
  in/
  out/

The next step is to transform the data into requests in the OpenAI Batch API format, split them into batches of 50K requests each (the current limit for one batch), and save the batches to the batch/in directory. We use the status.json file as global storage for the batch processing state; it is shared by all Python programs in this article. The following code does this:

import os
from utils.prompt import get_batch_request
from utils.file_util import read_jsonl_in_batches, dump_as_jsonl, write_status_file

batch_dir = 'batch'
counter = 1
max_file_size = 100 * 1024 * 1024  # 100 MB
status = []

for batch in read_jsonl_in_batches(f'{batch_dir}/items.jsonl', chunk_size=50000):
    file_name = f'batch_{counter:03d}.jsonl'
    file_path = f'{batch_dir}/in/{file_name}'

    with open(file_path, 'w') as output_file:
        for item in batch:
            request = get_batch_request(item)
            dump_as_jsonl(output_file, request)

    if os.path.getsize(file_path) > max_file_size:
        raise Exception(f"File <{file_name}> exceeds the maximum file size of 100 MB.")

    print(f"Saved <{file_name}>.")
    counter += 1

    status.append({
        'input_filename': {
            'local': file_name
        }
    })

write_status_file(status)

utils/prompt.py defines the LLM model to use and contains the function get_batch_request, which transforms an item into a batch request:

LLM_MODEL = {
    'full_name': 'gpt-4o',
    'price_per_1m': {
        'input': 5.00,
        'output': 15.00
    }
}

def get_batch_request(item):
    return {
        "custom_id": str(item["uuid"]),
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": LLM_MODEL['full_name'],
            "messages": get_prompt(item),
            "max_tokens": 50
        }
    }

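def get_prompt(item):
    item_info = get_item_info(item)

    # system_message is a module-level string holding the system prompt (its text is not shown in this article)
    return [
        {"role": "system", "content": system_message},
        {"role": "user", "content": f": {item_info}"}
    ]

def get_item_info(item):
    # transform item to a string item_info (application-specific, omitted here)

    # keep the text on a single line and replace double quotes with single quotes
    return item_info.replace("\n", " ").replace('"', "'")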

Each batch request has a unique field custom_id, which is used later to restore the order of batch responses.
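
To make the request format concrete, here is a minimal sketch of a single serialized request line. The system prompt text and the sample item are hypothetical placeholders and are not taken from the original project:

```python
import json

# Hypothetical system prompt and item, used only for illustration.
system_message = "Classify the item into a single category."
item = {"uuid": 1, "name": "green apple"}

request = {
    "custom_id": str(item["uuid"]),
    "method": "POST",
    "url": "/v1/chat/completions",
    "body": {
        "model": "gpt-4o",
        "messages": [
            {"role": "system", "content": system_message},
            {"role": "user", "content": item["name"]},
        ],
        "max_tokens": 50,
    },
}

# Each request occupies exactly one line of the batch input file.
request_line = json.dumps(request, ensure_ascii=False)
print(request_line)
```

Because json.dumps escapes any newline characters inside strings, the serialized request always stays on one line, which is what the jsonl format requires.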

For reading and writing json files, including very large jsonl files, we use utils/file_util.py:

import json

def dump_as_jsonl(output_file, object):
    object_json = json.dumps(object, ensure_ascii=False)
    output_file.write(object_json + '\n')

def read_json(file_path):
    with open(file_path, 'r') as file:
        data = json.load(file)

    return data

def write_json(data, file_path):
    with open(file_path, 'w') as file:
        json.dump(data, file, indent=4)

STATUS_FILE_PATH = 'batch/status.json'

def read_status_file():
    return read_json(STATUS_FILE_PATH)

def write_status_file(status):
    write_json(status, STATUS_FILE_PATH)

The following function, read_jsonl_in_batches, reads very large jsonl files in small chunks, which keeps the RAM footprint of the programs small:

def read_jsonl_in_batches(file_path, chunk_size=100):
    with open(file_path, 'r') as file:
        while True:
            lines = []

            for _ in range(chunk_size):
                line = file.readline()

                if not line:
                    break
                
                lines.append(json.loads(line))

            if not lines:
                break

            yield lines
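
As a quick illustration of the chunking behaviour, the sketch below reads a small temporary file in chunks of two records. It uses an equivalent variant of the reader built on itertools.islice (the name read_jsonl_in_chunks and the sample data are made up for this example), so the snippet runs on its own:

```python
import json
import os
import tempfile
from itertools import islice

def read_jsonl_in_chunks(file_path, chunk_size=100):
    # Same idea as read_jsonl_in_batches: never hold more than chunk_size records in memory.
    with open(file_path, 'r') as file:
        while True:
            chunk = [json.loads(line) for line in islice(file, chunk_size)]
            if not chunk:
                break
            yield chunk

# Write 5 sample records to a temporary jsonl file.
with tempfile.NamedTemporaryFile('w', suffix='.jsonl', delete=False) as tmp:
    for i in range(1, 6):
        tmp.write(json.dumps({'uuid': i}) + '\n')
    tmp_path = tmp.name

chunk_sizes = [len(chunk) for chunk in read_jsonl_in_chunks(tmp_path, chunk_size=2)]
os.remove(tmp_path)
print(chunk_sizes)
```

The last chunk is simply shorter when the number of records is not a multiple of chunk_size.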

Uploading of batch data

To start batch processing we should upload files with batch requests, created during the previous step, to the OpenAI platform:

import os
import requests
import getpass
from utils.file_util import read_status_file, write_status_file

api_key=getpass.getpass("Enter OpenAI key: ")

headers = {
    'Authorization': f'Bearer {api_key}'
}

def upload_file_to_openai(file_path):
    with open(file_path, 'rb') as file:
        response = requests.post(
            'https://api.openai.com/v1/files',
            headers=headers,
            files={
                'file': (os.path.basename(file_path), file, 'application/json')
            },
            data={
                'purpose': 'batch'
            }
        )

    if response.status_code in [200, 202]:
        return response.json()['id']
    else:
        raise Exception(f'Failed to upload {file_path}: {response.status_code} {response.text}')

status = read_status_file()
for item in status:
    local_file_name = item['input_filename']['local']

    if 'remote' in item['input_filename'] and len(item['input_filename']['remote']) > 0:
        # a previous run failed partway through; skip files that were already uploaded
        print(f'Skipping {local_file_name} as it was already uploaded.')
        continue

    file_path = os.path.join('./batch/in', local_file_name)
    remote_file_id = upload_file_to_openai(file_path)
    print(f'Successfully uploaded {file_path}, id: {remote_file_id}')
    item['input_filename']['remote'] = remote_file_id
    write_status_file(status)

Note: Here we save the status file after every successful upload in order to not lose remote_file_id(s) if the next upload fails.
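
Since the status file is rewritten many times, one possible hardening is to write it atomically, so that a crash in the middle of a write cannot leave a truncated status.json. The sketch below is an optional variation, not part of the original scripts; the function name write_json_atomically and the sample file id are hypothetical:

```python
import json
import os
import tempfile

def write_json_atomically(data, file_path):
    # Write to a temporary file in the same directory, then rename it over the target.
    directory = os.path.dirname(os.path.abspath(file_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w') as tmp_file:
            json.dump(data, tmp_file, indent=4)
        os.replace(tmp_path, file_path)
    except BaseException:
        # Clean up the temporary file if anything went wrong before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

# Usage with a throwaway directory and a made-up remote file id.
with tempfile.TemporaryDirectory() as work_dir:
    status_path = os.path.join(work_dir, 'status.json')
    status = [{'input_filename': {'local': 'batch_001.jsonl', 'remote': 'file-abc123'}}]
    write_json_atomically(status, status_path)
    with open(status_path) as status_file:
        restored = json.load(status_file)

print(restored == status)
```

The temporary file is created in the same directory as the target because a rename is only cheap and safe within one file system.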

Creating batches

Now that we have remote batch files, we can create batches, which automatically submit them to the processing queue:

import requests
import getpass
from utils.file_util import read_status_file, write_status_file

api_key=getpass.getpass("Enter OpenAI key: ")

headers = {
    'Authorization': f'Bearer {api_key}',
    'Content-Type': 'application/json'
}

def create_batch_in_openai(input_file_id):
    response = requests.post(
        'https://api.openai.com/v1/batches',
        headers=headers,
        json={
            "input_file_id": input_file_id,
            "endpoint": "/v1/chat/completions",
            "completion_window": "24h"
        }
    )

    if response.status_code in [200, 202]:
        return response.json()['id']
    else:
        raise Exception(f'Failed to create batch {input_file_id}: {response.status_code} {response.text}')
    
status = read_status_file()
for item in status:
    remote_file_id = item['input_filename']['remote']
    batch_id = create_batch_in_openai(remote_file_id)
    item['batch_id'] = batch_id

    print(f'Successfully started batch <{batch_id}> for file {remote_file_id}')
    write_status_file(status)
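
If this script fails partway through and is run again, it would create a second batch for files that already have one. A minimal sketch of a guard against that, assuming the status entries have the shape used in this article (the helper name items_without_batch and the sample ids are hypothetical):

```python
def items_without_batch(status):
    # Return entries that have an uploaded file but no batch created yet.
    return [
        item for item in status
        if item['input_filename'].get('remote') and not item.get('batch_id')
    ]

# Made-up status entries: one already submitted, one ready, one not uploaded yet.
status = [
    {'input_filename': {'local': 'batch_001.jsonl', 'remote': 'file-1'}, 'batch_id': 'batch_1'},
    {'input_filename': {'local': 'batch_002.jsonl', 'remote': 'file-2'}},
    {'input_filename': {'local': 'batch_003.jsonl'}},
]

pending = items_without_batch(status)
pending_names = [item['input_filename']['local'] for item in pending]
print(pending_names)
```

Iterating over items_without_batch(status) instead of status would make the batch creation step safe to rerun.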

Wait for batches to complete

OpenAI can start and finish processing batch files at any time within the 24-hour completion window. The small Python script below checks the status of all batches and updates the status file with the ids of the output files.

import requests
import getpass
from utils.file_util import read_status_file, write_status_file

api_key=getpass.getpass("Enter OpenAI key: ")

headers = {
    'Authorization': f'Bearer {api_key}',
    'Content-Type': 'application/json'
}

def check_batch_status_in_openai(batch_id):
    response = requests.get(
        f'https://api.openai.com/v1/batches/{batch_id}',
        headers=headers
    )

    if response.status_code in [200, 202]:
        return response.json()
    else:
        raise Exception(f'Failed to retrieve batch {batch_id}: {response.status_code} {response.text}')
    
status = read_status_file()
status_counts = {}

for item in status:
    batch_id = item['batch_id']
    response = check_batch_status_in_openai(batch_id)

    batch_status = response['status']
    item['status'] = batch_status

    if batch_status in status_counts:
        status_counts[batch_status] += 1
    else:
        status_counts[batch_status] = 1

    batch_errors = response['errors']
    item['errors'] = batch_errors

    if batch_errors:
        print(batch_errors)

    if batch_status == 'completed':
        item['output_filenames'] = {
            'ok': {
                'remote': response['output_file_id']
            },
            'failed': {
                'remote': response['error_file_id']
            }
        }

write_status_file(status)

# use a separate name here, so that the status list read above is not shadowed
for batch_status, count in status_counts.items():
    print(f"{batch_status}: {count}")
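
The script above is run manually until all batches are done. A minimal sketch of how the waiting could be automated is shown below. It assumes that completed, failed, expired and cancelled are the statuses after which a batch no longer changes; fetch_statuses is a hypothetical callable that would wrap the status check, and it is simulated here so the example runs without network access:

```python
import time

# Assumption: these are the statuses after which a batch no longer changes.
TERMINAL_STATUSES = {'completed', 'failed', 'expired', 'cancelled'}

def all_batches_finished(statuses):
    return all(status in TERMINAL_STATUSES for status in statuses)

def wait_until_finished(fetch_statuses, poll_interval_seconds=600, sleep=time.sleep):
    # fetch_statuses is a hypothetical callable returning the current status of every batch.
    while True:
        statuses = fetch_statuses()
        if all_batches_finished(statuses):
            return statuses
        sleep(poll_interval_seconds)

# Simulated run: three polls, no real API calls and no real sleeping.
simulated_polls = iter([
    ['validating', 'in_progress'],
    ['completed', 'in_progress'],
    ['completed', 'completed'],
])
final_statuses = wait_until_finished(lambda: next(simulated_polls), sleep=lambda seconds: None)
print(final_statuses)
```

Passing the sleep function as a parameter keeps the loop easy to test; in a real run the default time.sleep would be used.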

Get batch results

Now that all batches are completed, we can download the remote files with batch responses to the batch/out directory:

import requests
import getpass
from utils.file_util import read_status_file, write_status_file

api_key=getpass.getpass("Enter OpenAI key: ")

headers = {
    'Authorization': f'Bearer {api_key}'
}

def download_file(file_id):
    response = requests.get(
        f'https://api.openai.com/v1/files/{file_id}/content',
        headers=headers
    )

    if response.status_code in [200, 202]:
        return response.text
    else:
        raise Exception(f'Failed to download file {file_id}: {response.status_code} {response.text}')
    
def download_and_save_file(file_id, output_file_name):
    file_content = download_file(file_id)
    
    with open(f'./batch/out/{output_file_name}', 'w') as output_file:
        output_file.write(file_content)

    print(f'Successfully downloaded {output_file_name}')

status = read_status_file()
for item in status:
    if 'local' in item['output_filenames']['ok']:
        # a previous run failed partway through; skip files that were already downloaded
        print(f'Skipping {item["output_filenames"]["ok"]["local"]} as it was already downloaded.')
        continue

    ok_file_id = item['output_filenames']['ok']['remote']
    ok_local_filename = item['input_filename']['local']
    item['output_filenames']['ok']['local'] = ok_local_filename
    download_and_save_file(ok_file_id, ok_local_filename)

    error_file_id = item['output_filenames']['failed']['remote']
    if error_file_id:
        error_local_filename = ok_local_filename.replace('.jsonl', '_error.jsonl')
        download_and_save_file(error_file_id, error_local_filename)
        item['output_filenames']['failed']['local'] = error_local_filename

    write_status_file(status)

The names of output files with responses will match the names of the input files with requests. Files with errors (with suffix _error.jsonl) should be reviewed before proceeding to the next step.
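
One way to start such a review is to collect the custom_id values of the failed requests, so they can be resubmitted later. The sketch below relies only on the custom_id field being present in every line; the sample lines, including the error code and message, are made up for illustration:

```python
import json

def failed_custom_ids(error_lines):
    # Collect the custom_id of every non-empty line of an error file.
    ids = []
    for line in error_lines:
        line = line.strip()
        if line:
            ids.append(json.loads(line)['custom_id'])
    return ids

# Made-up error file content (abridged records).
sample_error_lines = [
    '{"custom_id": "17", "response": null, "error": {"code": "example_code", "message": "made-up message"}}\n',
    '\n',
    '{"custom_id": "42", "response": null, "error": {"code": "example_code", "message": "made-up message"}}\n',
]

retry_ids = failed_custom_ids(sample_error_lines)
print(retry_ids)
```

In practice the lines would come from iterating over an opened _error.jsonl file instead of a hard-coded list.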

Cleaning up

After we have downloaded all results from the OpenAI platform, we should delete all remote files:

import requests
import getpass
from utils.file_util import read_status_file, write_status_file

api_key=getpass.getpass("Enter OpenAI key: ")

headers = {
    'Authorization': f'Bearer {api_key}'
}

def delete_file(file):
    file_id = file['remote']

    if file.get('is_remote_deleted'):
        print(f'File {file_id} is already deleted')
        return

    response = requests.delete(
        f'https://api.openai.com/v1/files/{file_id}',
        headers=headers
    )

    if response.status_code in [200, 202] and response.json().get('deleted'):
        print(f'Successfully deleted {file_id}')
        file['is_remote_deleted'] = True
    else:
        raise Exception(f'Failed to delete file {file_id}: {response.status_code} {response.text}')

status = read_status_file()
for item in status:
    delete_file(item['input_filename'])
    delete_file(item['output_filenames']['ok'])

    if (item['output_filenames']['failed']['remote']):
        delete_file(item['output_filenames']['failed'])

    write_status_file(status)

Sort responses

Since the OpenAI platform does not guarantee that responses arrive in the same order as the requests, we should restore the original order. Because custom_id is taken from the item's uuid field, which in this data set is a sequential integer counter starting at 1, restoring the order is as simple as sorting by it:

import json
from utils.file_util import read_status_file

status = read_status_file()
custom_id = 1

for item in status:
    output_filename = item['output_filenames']['ok']['local']
    output_file_path = f'batch/out/{output_filename}'

    with open(output_file_path, 'r') as file:
        data = [json.loads(line) for line in file]

    # Sort by 'custom_id'
    sorted_data = sorted(data, key=lambda x: int(x['custom_id']))

    # Check if the 'custom_id' is without gaps
    for response in sorted_data:
        response_custom_id = int(response['custom_id'])
        if response_custom_id != custom_id:
            raise Exception(f"Expected custom_id {custom_id}, got {response_custom_id}")
        
        custom_id += 1

    with open(output_file_path, 'w') as file:
        # use a separate name, so that the outer loop variable item is not shadowed
        for response in sorted_data:
            file.write(json.dumps(response) + '\n')

Note: While this can be redundant, we verify that all requests have been processed and there are no omissions.
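
The sorting and gap check can also be tried on in-memory data. The following sketch mirrors the logic of the script above in a small function (the name sort_and_verify and the sample responses are made up for this example):

```python
def sort_and_verify(responses, first_custom_id):
    # Sort responses by numeric custom_id and make sure the ids are consecutive.
    sorted_responses = sorted(responses, key=lambda r: int(r['custom_id']))
    expected = first_custom_id

    for response in sorted_responses:
        actual = int(response['custom_id'])
        if actual != expected:
            raise ValueError(f"Expected custom_id {expected}, got {actual}")
        expected += 1

    # Return the next expected id, so the check can continue with the next file.
    return sorted_responses, expected

shuffled = [{'custom_id': '3'}, {'custom_id': '1'}, {'custom_id': '2'}]
ordered, next_id = sort_and_verify(shuffled, first_custom_id=1)
print([r['custom_id'] for r in ordered], next_id)
```

Returning the next expected id plays the same role as the custom_id variable in the script: it carries the counter from one output file to the next.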

Add LLM responses to the original data

Finally, we read LLM responses from files in batch/out directory, add them to items in batch/items.jsonl and write the result to batch/items_classified.jsonl:

import json
from utils.file_util import read_jsonl_in_batches, dump_as_jsonl, read_status_file
from counter import Counter
from utils.prompt import LLM_MODEL

batch_dir = 'batch'
output_file_name = f'{batch_dir}/items_classified.jsonl'

status = read_status_file()
counter = Counter(LLM_MODEL)

batch_index = 0

with open(output_file_name, 'w') as output_file:
    for items in read_jsonl_in_batches(f'{batch_dir}/items.jsonl', chunk_size=50000):
        output_filename = status[batch_index]['output_filenames']['ok']['local']
        output_file_path = f'{batch_dir}/out/{output_filename}'
        with open(output_file_path, 'r') as file:
            responses = [json.loads(line) for line in file]

        for i, item in enumerate(items):
            process_response(responses[i], item, counter)
            dump_as_jsonl(output_file, item)

        batch_index += 1

print(counter)

Function process_response validates the response, updates statistics and adds LLM model response to the item:

def process_response(response, item, counter):
    response = validate_response(response, item['uuid'])

    body = response['body']
    counter.update(items=1)

    prompt_tokens = body['usage']['prompt_tokens']
    completion_tokens = body['usage']['completion_tokens']
    counter.update(llm_prompt_tokens = prompt_tokens, llm_completion_tokens = completion_tokens)

    model_response = body['choices'][0]['message']['content']

    item['llm'] = model_response

Function validate_response makes sure that ids of the original data and response match, and that there were no processing errors:

def validate_response(response, uuid):
    response_custom_id = int(response['custom_id'])

    if response_custom_id != uuid:
        raise Exception(f"Expected custom_id {uuid}, got {response_custom_id}")
    
    if response['error'] is not None:
        raise Exception(f"Expected no error, got {response['error']} for custom_id {uuid}")
    
    response = response['response']
    
    if response['status_code'] != 200:
        raise Exception(f"Expected status_code 200, got {response['status_code']} for custom_id {uuid}")
    
    return response
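
For reference, the sketch below shows the fields of one output line that the two functions above rely on, and how the model answer and token usage are extracted from it. The record is abridged and all values are made up:

```python
import json

# Abridged, made-up example of one line from a batch output file.
sample_line = json.dumps({
    "id": "batch_req_example",
    "custom_id": "1",
    "response": {
        "status_code": 200,
        "body": {
            "choices": [{"message": {"role": "assistant", "content": "fruit"}}],
            "usage": {"prompt_tokens": 25, "completion_tokens": 1},
        },
    },
    "error": None,
})

record = json.loads(sample_line)

# The same checks as in validate_response, applied to the sample record.
is_valid = record["error"] is None and record["response"]["status_code"] == 200

body = record["response"]["body"]
answer = body["choices"][0]["message"]["content"]
total_tokens = body["usage"]["prompt_tokens"] + body["usage"]["completion_tokens"]
print(is_valid, answer, total_tokens)
```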

Counter class is used to aggregate counters (items count, LLM tokens), as well as to calculate the price:

class Counter:
  items = 0

  llm_prompt_tokens = 0
  llm_completion_tokens = 0

  def __init__(self, llm_model):
    self.llm_model = llm_model

  def update(self, items=0, llm_prompt_tokens=0, llm_completion_tokens=0):
    self.items += items
    self.llm_prompt_tokens += llm_prompt_tokens
    self.llm_completion_tokens += llm_completion_tokens

  def getPrice(self):
    return ((self.llm_prompt_tokens / 1000000.0) * self.llm_model['price_per_1m']['input'] + (self.llm_completion_tokens / 1000000.0) * self.llm_model['price_per_1m']['output'])*0.5 #batch processing has 50% off the price
    
  def __str__(self):
    return f'price: {self.getPrice():.2f} USD; items: {self.items}; tokens (llm in: {self.llm_prompt_tokens}, llm out: {self.llm_completion_tokens})'
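
As a worked example of the price formula, the sketch below computes the cost of a hypothetical run with 1,000,000 prompt tokens and 200,000 completion tokens, using the prices from LLM_MODEL above. The token counts are made up; the function batch_price is a standalone restatement of getPrice:

```python
PRICE_PER_1M = {'input': 5.00, 'output': 15.00}  # USD per 1M tokens, as in LLM_MODEL above
BATCH_DISCOUNT = 0.5  # batch processing costs 50% of the regular price

def batch_price(prompt_tokens, completion_tokens):
    input_cost = prompt_tokens / 1_000_000 * PRICE_PER_1M['input']
    output_cost = completion_tokens / 1_000_000 * PRICE_PER_1M['output']
    return (input_cost + output_cost) * BATCH_DISCOUNT

# (1.0 * 5.00 + 0.2 * 15.00) * 0.5 = (5.00 + 3.00) * 0.5 = 4.00
price = batch_price(prompt_tokens=1_000_000, completion_tokens=200_000)
print(f'{price:.2f} USD')
```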

Conclusion

This article describes a generic procedure for batch processing of data using an LLM. The approach is split into individual steps, each of which is performed by a simple Python application. The code has been battle-tested in the massive data processing of ~4M LLM requests. Since the described procedure is generic, you can easily use the scripts in this article to build your own batch-processing procedure. Massive data processing using this approach is not only 50% cheaper but usually also faster. At the same time, this procedure might not be convenient for the initial phase of research, which usually requires processing small amounts of input data quickly and easily, ideally in one step. In that case, you can opt for synchronous API calls instead, which require a completely different application structure.

I hope you enjoyed this article and that you will find it useful.

Happy coding!

Disclaimer: Code and article content are provided ‘as-is’ without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of code or article content.