PERFORM BATCH DATA PROCESSING USING LLMS AT SCALE
Abstract: LLM usage is expanding into new applications. This article provides a solution for processing data at scale using the OpenAI Batch API.
Introduction
The LLM “revolution” continues, and LLMs are now being used in applications that were historically handled by humans only. While LLMs can successfully solve a large class of tasks, with more coming in the near future, it is crucial to perform the foundational work first, for example: evaluating the accuracy of model outputs, proper data preparation, prompt engineering, model fine-tuning (if needed), etc. After this initial research, the next logical step might be to send massive amounts of data to the LLM. This article focuses only on the latter phase.
Advantages of using the Batch API compared to synchronous API calls:
- 50% cheaper;
- faster to implement, since there is no need to write code that sends requests at scale and handles network errors.
The batch processing procedure is split into a series of steps, each performed by a simple Python script; the steps are described below.
Batch data preparation
Let’s create the following directory structure, which we will use to store batch-related data:
batch/
    in/
    out/
The next step is to transform the data into requests in the OpenAI Batch API format, split them into batches of 50,000 requests each (the current limit for a single batch), and save the batches to the batch/in directory. We use the status.json file as global storage for the batch processing state; it is shared by all Python scripts in this article. For these purposes, we can use the following code:
import os
from utils.prompt import get_batch_request
from utils.file_util import read_jsonl_in_batches, dump_as_jsonl, write_status_file

batch_dir = 'batch'
counter = 1
max_file_size = 100 * 1024 * 1024  # 100 MB
status = []
for batch in read_jsonl_in_batches(f'{batch_dir}/items.jsonl', chunk_size=50000):
    file_name = f'batch_{counter:03d}.jsonl'
    file_path = f'{batch_dir}/in/{file_name}'
    with open(file_path, 'w') as output_file:
        for item in batch:
            request = get_batch_request(item)
            dump_as_jsonl(output_file, request)
    if os.path.getsize(file_path) > max_file_size:
        raise Exception(f"File <{file_name}> exceeds the maximum file size of 100 MB.")
    print(f"Saved <{file_name}>.")
    counter += 1
    status.append({
        'input_filename': {
            'local': file_name
        }
    })
write_status_file(status)
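For reference, each line of batch/items.jsonl is expected to be a self-contained JSON object with at least a uuid field (a sequential integer, used later as custom_id) plus whatever attributes get_item_info needs to build the prompt. A purely hypothetical example line:
{"uuid": 1, "name": "some item name", "description": "free-text attributes used to build the prompt"}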
utils/prompt.py defines the LLM model to use and contains the function get_batch_request, which transforms an item into a batch request:
LLM_MODEL = {
    'full_name': 'gpt-4o',
    'price_per_1m': {
        'input': 5.00,
        'output': 15.00
    }
}

def get_batch_request(item):
    return {
        "custom_id": str(item["uuid"]),
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": LLM_MODEL['full_name'],
            "messages": get_prompt(item),
            "max_tokens": 50
        }
    }

def get_prompt(item):
    item_info = get_item_info(item)
    return [
        {"role": "system", "content": system_message},
        {"role": "user", "content": f": {item_info}"}
    ]

def get_item_info(item):
    # transform item to a string item_info
    return item_info.replace("\n", " ").replace('"', "'")
Each batch request has a unique custom_id field, which is used later to restore the order of the batch responses.
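For illustration, a single serialized request line in batch_001.jsonl looks roughly like this (message content abbreviated):
{"custom_id": "1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-4o", "messages": [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}], "max_tokens": 50}}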
For reading and writing JSON and large JSONL files we use utils/file_util.py:
import json

def dump_as_jsonl(output_file, obj):
    obj_json = json.dumps(obj, ensure_ascii=False)
    output_file.write(obj_json + '\n')

def read_json(file_path):
    with open(file_path, 'r') as file:
        data = json.load(file)
    return data

def write_json(data, file_path):
    with open(file_path, 'w') as file:
        json.dump(data, file, indent=4)

STATUS_FILE_PATH = 'batch/status.json'

def read_status_file():
    return read_json(STATUS_FILE_PATH)

def write_status_file(status):
    write_json(status, STATUS_FILE_PATH)
The function read_jsonl_in_batches below reads large JSONL files in small chunks, which keeps the RAM footprint of the programs small:
def read_jsonl_in_batches(file_path, chunk_size=100):
    with open(file_path, 'r') as file:
        while True:
            lines = []
            for _ in range(chunk_size):
                line = file.readline()
                if not line:
                    break
                lines.append(json.loads(line))
            if not lines:
                break
            yield lines
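A minimal usage sketch (the chunk size and the print statement are just for illustration):
for chunk in read_jsonl_in_batches('batch/items.jsonl', chunk_size=1000):
    print(f'Read a chunk of {len(chunk)} items')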
Uploading batch data
To start batch processing, we need to upload the files with batch requests created in the previous step to the OpenAI platform:
import os
import requests
import getpass
from utils.file_util import read_status_file, write_status_file

api_key = getpass.getpass("Enter OpenAI key: ")
headers = {
    'Authorization': f'Bearer {api_key}'
}

def upload_file_to_openai(file_path):
    with open(file_path, 'rb') as file:
        response = requests.post(
            'https://api.openai.com/v1/files',
            headers=headers,
            files={
                'file': (os.path.basename(file_path), file, 'application/json')
            },
            data={
                'purpose': 'batch'
            }
        )
    if response.status_code in [200, 202]:
        return response.json()['id']
    else:
        raise Exception(f'Failed to upload {file_path}: {response.status_code} {response.text}')

status = read_status_file()
for item in status:
    local_file_name = item['input_filename']['local']
    if 'remote' in item['input_filename'] and len(item['input_filename']['remote']) > 0:
        # there was an error in a previous run, skip files that were already uploaded
        print(f'Skipping {local_file_name} as it was already uploaded.')
        continue
    file_path = os.path.join('./batch/in', local_file_name)
    remote_file_id = upload_file_to_openai(file_path)
    print(f'Successfully uploaded {file_path}, id: {remote_file_id}')
    item['input_filename']['remote'] = remote_file_id
    write_status_file(status)
Note: here we save the status file after every successful upload so that we do not lose the remote_file_id values if a subsequent upload fails.
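At this stage, each entry in status.json should look roughly like this (the file id below is a made-up placeholder):
[
    {
        "input_filename": {
            "local": "batch_001.jsonl",
            "remote": "file-abc123"
        }
    }
]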
Creating batches
Now that the batch files are uploaded, we can create the batches; creating a batch automatically submits it to the processing queue:
import requests
import getpass
from utils.file_util import read_status_file, write_status_file

api_key = getpass.getpass("Enter OpenAI key: ")
headers = {
    'Authorization': f'Bearer {api_key}',
    'Content-Type': 'application/json'
}

def create_batch_in_openai(input_file_id):
    response = requests.post(
        'https://api.openai.com/v1/batches',
        headers=headers,
        json={
            "input_file_id": input_file_id,
            "endpoint": "/v1/chat/completions",
            "completion_window": "24h"
        }
    )
    if response.status_code in [200, 202]:
        return response.json()['id']
    else:
        raise Exception(f'Failed to create batch for {input_file_id}: {response.status_code} {response.text}')

status = read_status_file()
for item in status:
    remote_file_id = item['input_filename']['remote']
    batch_id = create_batch_in_openai(remote_file_id)
    item['batch_id'] = batch_id
    print(f'Successfully started batch <{batch_id}> for file {remote_file_id}')
write_status_file(status)
Wait for batches to complete
OpenAI can start and finish processing a batch at any time within its 24-hour completion window. The small Python script below checks the status of all batches and updates the status file with the IDs of the output files.
import requests
import getpass
from utils.file_util import read_status_file, write_status_file

api_key = getpass.getpass("Enter OpenAI key: ")
headers = {
    'Authorization': f'Bearer {api_key}',
    'Content-Type': 'application/json'
}

def check_batch_status_in_openai(batch_id):
    response = requests.get(
        f'https://api.openai.com/v1/batches/{batch_id}',
        headers=headers
    )
    if response.status_code in [200, 202]:
        return response.json()
    else:
        raise Exception(f'Failed to retrieve batch {batch_id}: {response.status_code} {response.text}')

status = read_status_file()
status_counts = {}
for item in status:
    batch_id = item['batch_id']
    response = check_batch_status_in_openai(batch_id)
    batch_status = response['status']
    item['status'] = batch_status
    if batch_status in status_counts:
        status_counts[batch_status] += 1
    else:
        status_counts[batch_status] = 1
    batch_errors = response['errors']
    item['errors'] = batch_errors
    if batch_errors:
        print(batch_errors)
    if batch_status == 'completed':
        item['output_filenames'] = {
            'ok': {
                'remote': response['output_file_id']
            },
            'failed': {
                'remote': response['error_file_id']
            }
        }
write_status_file(status)
for batch_status, count in status_counts.items():
    print(f"{batch_status}: {count}")
Get batch results
Now that all batches are completed, we can download the remote files with batch responses to the batch/out/ directory:
import requests
import getpass
from utils.file_util import read_status_file, write_status_file

api_key = getpass.getpass("Enter OpenAI key: ")
headers = {
    'Authorization': f'Bearer {api_key}'
}

def download_file(file_id):
    response = requests.get(
        f'https://api.openai.com/v1/files/{file_id}/content',
        headers=headers
    )
    if response.status_code in [200, 202]:
        return response.text
    else:
        raise Exception(f'Failed to download file {file_id}: {response.status_code} {response.text}')

def download_and_save_file(file_id, output_file_name):
    file_content = download_file(file_id)
    with open(f'./batch/out/{output_file_name}', 'w') as output_file:
        output_file.write(file_content)
    print(f'Successfully downloaded {output_file_name}')

status = read_status_file()
for item in status:
    if 'local' in item['output_filenames']['ok']:
        # there was an error in a previous run, skip files that were already downloaded
        print(f'Skipping {item["output_filenames"]["ok"]["local"]} as it was already downloaded.')
        continue
    ok_file_id = item['output_filenames']['ok']['remote']
    ok_local_filename = item['input_filename']['local']
    item['output_filenames']['ok']['local'] = ok_local_filename
    download_and_save_file(ok_file_id, ok_local_filename)
    error_file_id = item['output_filenames']['failed']['remote']
    if error_file_id:
        error_local_filename = ok_local_filename.replace('.jsonl', '_error.jsonl')
        download_and_save_file(error_file_id, error_local_filename)
        item['output_filenames']['failed']['local'] = error_local_filename
    write_status_file(status)
The names of the output files with responses will match the names of the input files with requests. Files with errors (with the _error.jsonl suffix) should be reviewed before proceeding to the next step.
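For reference, each line of an output file is a JSON object of roughly the following shape (ids and content abbreviated); the fields custom_id, error, response.status_code and response.body are the ones used by the scripts below:
{"id": "batch_req_...", "custom_id": "1", "response": {"status_code": 200, "request_id": "...", "body": {"id": "chatcmpl-...", "choices": [{"index": 0, "message": {"role": "assistant", "content": "..."}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 123, "completion_tokens": 7, "total_tokens": 130}}}, "error": null}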
Cleaning up
After we have downloaded all results from the OpenAI platform, we should delete the remote files:
import requests
import getpass
from utils.file_util import read_status_file, write_status_file

api_key = getpass.getpass("Enter OpenAI key: ")
headers = {
    'Authorization': f'Bearer {api_key}'
}

def delete_file(file):
    file_id = file['remote']
    if file.get('is_remote_deleted'):
        print(f'File {file_id} is already deleted')
        return
    response = requests.delete(
        f'https://api.openai.com/v1/files/{file_id}',
        headers=headers
    )
    if response.status_code in [200, 202] and response.json()['deleted']:
        print(f'Successfully deleted {file_id}')
        file['is_remote_deleted'] = True
    else:
        raise Exception(f'Failed to delete file {file_id}: {response.status_code} {response.text}')

status = read_status_file()
for item in status:
    delete_file(item['input_filename'])
    delete_file(item['output_filenames']['ok'])
    if item['output_filenames']['failed']['remote']:
        delete_file(item['output_filenames']['failed'])
    write_status_file(status)
Sort responses
Since the OpenAI platform does not guarantee that responses arrive in the same order as the requests, we have to restore the original order. As we used a simple incrementing counter as custom_id, restoring the order is as simple as sorting by it:
import json
from utils.file_util import read_status_file

status = read_status_file()
custom_id = 1
for item in status:
    output_filename = item['output_filenames']['ok']['local']
    output_file_path = f'batch/out/{output_filename}'
    with open(output_file_path, 'r') as file:
        data = [json.loads(line) for line in file]
    # Sort by 'custom_id'
    sorted_data = sorted(data, key=lambda x: int(x['custom_id']))
    # Check that the 'custom_id' sequence has no gaps
    for response in sorted_data:
        response_custom_id = int(response['custom_id'])
        if response_custom_id != custom_id:
            raise Exception(f"Expected custom_id {custom_id}, got {response_custom_id}")
        custom_id += 1
    with open(output_file_path, 'w') as file:
        for response in sorted_data:
            file.write(json.dumps(response) + '\n')
Note: while this check may seem redundant, it verifies that all requests have been processed and that there are no omissions.
Add LLM responses to the original data
Finally, we read the LLM responses from the files in the batch/out directory, add them to the items from batch/items.jsonl, and write the result to batch/items_classified.jsonl:
import json
from utils.file_util import read_jsonl_in_batches, dump_as_jsonl, read_status_file
from counter import Counter
from utils.prompt import LLM_MODEL

batch_dir = 'batch'
output_file_name = f'{batch_dir}/items_classified.jsonl'
status = read_status_file()
counter = Counter(LLM_MODEL)
batch_index = 0
with open(output_file_name, 'w') as output_file:
    for items in read_jsonl_in_batches(f'{batch_dir}/items.jsonl', chunk_size=50000):
        output_filename = status[batch_index]['output_filenames']['ok']['local']
        output_file_path = f'{batch_dir}/out/{output_filename}'
        with open(output_file_path, 'r') as file:
            responses = [json.loads(line) for line in file]
        for i, item in enumerate(items):
            process_response(responses[i], item, counter)
            dump_as_jsonl(output_file, item)
        batch_index += 1
print(counter)
The function process_response validates the response, updates the statistics, and adds the LLM model response to the item:
def process_response(response, item, counter):
    response = validate_response(response, item['uuid'])
    body = response['body']
    counter.update(items=1)
    prompt_tokens = body['usage']['prompt_tokens']
    completion_tokens = body['usage']['completion_tokens']
    counter.update(llm_prompt_tokens=prompt_tokens, llm_completion_tokens=completion_tokens)
    model_response = body['choices'][0]['message']['content']
    item['llm'] = model_response
The function validate_response makes sure that the IDs of the original data and the response match, and that there were no processing errors:
def validate_response(response, uuid):
    response_custom_id = int(response['custom_id'])
    if response_custom_id != uuid:
        raise Exception(f"Expected custom_id {uuid}, got {response_custom_id}")
    if response['error'] is not None:
        raise Exception(f"Expected no error, got {response['error']} for custom_id {uuid}")
    response = response['response']
    if response['status_code'] != 200:
        raise Exception(f"Expected status_code 200, got {response['status_code']} for custom_id {uuid}")
    return response
The Counter class aggregates counts (number of items, LLM tokens) and calculates the price:
class Counter:
    def __init__(self, llm_model):
        self.llm_model = llm_model
        self.items = 0
        self.llm_prompt_tokens = 0
        self.llm_completion_tokens = 0

    def update(self, items=0, llm_prompt_tokens=0, llm_completion_tokens=0):
        self.items += items
        self.llm_prompt_tokens += llm_prompt_tokens
        self.llm_completion_tokens += llm_completion_tokens

    def getPrice(self):
        # Batch processing is billed at 50% of the regular price
        return ((self.llm_prompt_tokens / 1_000_000.0) * self.llm_model['price_per_1m']['input']
                + (self.llm_completion_tokens / 1_000_000.0) * self.llm_model['price_per_1m']['output']) * 0.5

    def __str__(self):
        return (f'price: {self.getPrice():.2f} USD; items: {self.items}; '
                f'tokens (llm in: {self.llm_prompt_tokens}, llm out: {self.llm_completion_tokens})')
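As a quick sanity check of the pricing formula, suppose a run consumed 1,000,000 prompt tokens and 200,000 completion tokens with the gpt-4o prices defined above: the batch cost is (1.0 * 5.00 + 0.2 * 15.00) * 0.5 = 4.00 USD. Assuming the class is saved as counter.py (as in the import used earlier), the same check in code:
from counter import Counter
from utils.prompt import LLM_MODEL

counter = Counter(LLM_MODEL)
counter.update(items=50000, llm_prompt_tokens=1_000_000, llm_completion_tokens=200_000)
print(counter)  # price: 4.00 USD; items: 50000; tokens (llm in: 1000000, llm out: 200000)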
Conclusion
This article describes a generic procedure for performing batch processing of data using an LLM. The approach is split into individual steps, each performed by a simple Python script. The code has been battle-tested in massive data processing of ~4M LLM requests. Since the described procedure is generic, you can easily use the scripts in this article to build your own batch-processing pipeline. Massive data processing using this approach is not only 50% cheaper, but usually also faster. At the same time, this procedure may not be convenient for the initial research phase, which usually requires processing small amounts of input data quickly and easily, ideally in one step. In that case, you can opt for synchronous API calls instead, which requires a completely different application structure.
I hope you enjoyed this article and that you will find it useful.
Happy coding!
Disclaimer: Code and article content are provided ‘as-is’ without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of code or article content.