PERFORM SYNCHRONOUS DATA PROCESSING USING LLM AT SCALE
Abstract: While batch processing of data with an LLM has huge advantages, a subset of LLM applications requires synchronous processing. This article provides a solution for synchronously processing data at scale using the OpenAI API.
Introduction
Previously, we described a solution for processing massive amounts of data using the OpenAI Batch API. Batch processing is cheaper and easier to run when you have huge amounts of data. At the same time, if your application cannot wait until the batch job is finished, the only option is to call OpenAI inference endpoints directly. This is easy to accomplish when the desired request rate is low, but when your application needs to send thousands or tens of thousands of requests simultaneously, the task becomes more complicated: you have to take care of API rate limits, network errors, parallelization of requests, retry logic, etc. Experience shows that moving an application from low-volume processing to massive processing often leads to a complete rewrite of the code.
This article describes generic code that can be used to build a program that processes massive amounts of data with an LLM as quickly as possible.
Main loop for data processing
We use one Python program to implement the main loop for data processing by the LLM. It reads data from the input file items.jsonl, prepares a batch in the current_batch array, sends the batch to the LLM by calling the LLMProvider.call_llm function, processes the results (function LLMProvider.process_llm_response), writes them to the output file, and repeats everything from the beginning until there are no items left to process. The important detail is that some items may be processed incorrectly, and in that case we send them to the LLM again in the next batch. The code that performs all of these steps is listed below:
import json
import asyncio

from counter import Counter
import openai.LLMProvider as LLMProvider
from utils.file_util import dump_as_jsonl
from RateLimiter import RateLimiter

MAX_REQUESTS_PER_MIN = 10000

input_filename = 'items.jsonl'
output_file_name = 'items_processed.jsonl'

counter = Counter(LLMProvider.LLM_MODEL)
RATE_LIMITER = RateLimiter()

with open(output_file_name, 'w') as output_file:
    with open(input_filename, 'r') as input_file:
        current_batch = []
        is_eof = False
        while True:
            # Fill the batch with up to MAX_REQUESTS_PER_MIN items
            while (len(current_batch) < MAX_REQUESTS_PER_MIN) and (not is_eof):
                line = input_file.readline()
                if not line:
                    is_eof = True
                    break
                current_batch.append(json.loads(line))
            if len(current_batch) == 0:
                break
            # Mark the start of the one-minute window and send the whole batch concurrently
            RATE_LIMITER.update_time()
            asyncio.run(LLMProvider.call_llm(current_batch, counter))
            # Split the batch into successfully processed items and items to retry
            errors_batch = []
            processed_batch = []
            for item in current_batch:
                if LLMProvider.process_llm_response(item, counter):
                    processed_batch.append(item)
                    counter.update(items=1)
                else:
                    errors_batch.append(item)
            current_batch = errors_batch
            for item in processed_batch:
                dump_as_jsonl(output_file, item)
            print(counter)
            # Wait until the minute has passed before sending the next batch
            RATE_LIMITER.sleep_if_needed()
print(counter)
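For illustration, assuming the input items carry an id and a text field (hypothetical field names; the actual fields depend on how the prompt is built for your task), a line of items.jsonl and the corresponding line of items_processed.jsonl could look like this:

{"id": 1, "text": "The delivery was fast and the packaging was great."}
{"id": 1, "text": "The delivery was fast and the packaging was great.", "llm": {"response": "positive", "requests": 1}}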
Note: Function dump_as_jsonl is the same as in the batch processing article.
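If you do not have the batch processing article at hand, a minimal sketch of dump_as_jsonl (assuming it simply serializes each item as one JSON line) could be:

import json

def dump_as_jsonl(output_file, item):
    # Hypothetical re-implementation: write the item as a single JSON Lines record
    output_file.write(json.dumps(item) + '\n')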
Code for sending requests to LLM
The openai Python library supports the standard Python asyncio library, which we use to run call_llm_impl calls concurrently:
import asyncio
import getpass

from openai import AsyncOpenAI

client = AsyncOpenAI(
    api_key=getpass.getpass("Enter OpenAI key: ")
)

async def call_llm(items, counter):
    await asyncio.gather(*[call_llm_impl(item, counter) for item in items])
Using data from the item, the call_llm_impl function creates a prompt, sends it to the LLM using the openai Python library, updates the counters, and adds the result to the item:
LLM_MODEL = {
    'full_name': "gpt-4o",
    'price_per_1m': {
        'input': 5.00,
        'output': 15.00
    }
}

async def call_llm_impl(item, counter):
    prompt = get_prompt(item)
    response = await retry_with_exponential_backoff(
        func=client.chat.completions.create,
        model=LLM_MODEL['full_name'],
        messages=prompt
    )
    prompt_tokens = response.usage.prompt_tokens
    completion_tokens = response.usage.completion_tokens
    model_response = response.choices[0].message.content
    counter.update(llm_prompt_tokens=prompt_tokens, llm_completion_tokens=completion_tokens)
    requests = (item['llm']['requests'] + 1) if ("llm" in item and 'requests' in item['llm']) else 1
    item['llm'] = {
        "response": model_response,
        "requests": requests
    }
Note: Functions get_prompt and get_item_info are the same as in the batch processing article.
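For readers without the batch processing article, here is a minimal sketch of what get_prompt might look like; the item field and the prompt wording are hypothetical placeholders for your actual task:

def get_prompt(item):
    # Hypothetical prompt: the real version is defined in the batch processing article
    return [
        {"role": "system", "content": "You are a sentiment classifier. Reply with exactly one word: positive, negative or neutral."},
        {"role": "user", "content": item["text"]}
    ]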
Some requests can error out, and for them we added exponential backoff, implemented in the retry_with_exponential_backoff function:
import asyncio
import random

async def retry_with_exponential_backoff(
    initial_delay: float = 1,
    exponential_base: float = 2,
    jitter: bool = True,
    max_retries: int = 2,
    func=None,
    **kwargs
):
    """Retry a function with exponential backoff."""
    num_retries = 0
    delay = initial_delay
    while True:
        try:
            return await func(**kwargs)
        except Exception as e:
            num_retries += 1
            if num_retries > max_retries:
                raise Exception(
                    f"Maximum number of retries ({max_retries}) exceeded."
                ) from e
            # Grow the delay exponentially with random jitter, capped at roughly one minute
            random_number = random.random()
            randomness = 1 + jitter * random_number
            delay *= exponential_base * randomness
            if delay > 61:
                delay = 60 + random_number * 10
            print(f"Got {type(e).__name__}. Retrying call ({num_retries}) to AI API in {int(delay)} seconds...")
            # Use asyncio.sleep instead of time.sleep so other concurrent requests are not blocked
            await asyncio.sleep(delay)
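The function above retries on any exception. If you prefer to retry only transient failures and surface everything else immediately, one option (a sketch, assuming the exception classes of the openai v1 library) is to re-raise non-retryable errors inside the except block:

import openai

# Errors worth retrying; anything else should fail fast
RETRYABLE_ERRORS = (openai.RateLimitError, openai.APIConnectionError, openai.APITimeoutError)

# Inside the except block of retry_with_exponential_backoff, before computing the delay:
#     if not isinstance(e, RETRYABLE_ERRORS):
#         raise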
Processing the results
Function LLMProvider.process_llm_response returns True if processing is finished for the item; otherwise the item is added to errors_batch in the main loop for re-processing. Processing is considered done if the function is_response_valid returns True or if the number of calls to the LLM for the item reaches MAX_REQUESTS_COUNT. Resending the same prompt to the LLM helps when the LLM fails to respond in the desired format, which the code can easily detect.
MAX_REQUESTS_COUNT = 4

def process_llm_response(item, counter):
    model_response = item["llm"]["response"]
    if is_response_valid(model_response):
        item["llm"]["response"] = model_response
        return True
    counter.update(bad_responses=1)
    if item["llm"]["requests"] == MAX_REQUESTS_COUNT:
        counter.update(invalid_classifications=1)
        return True
    return False
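Function is_response_valid is task-specific and not shown here; for a classification task with a fixed set of labels, a minimal sketch (with hypothetical labels) could look like this:

ALLOWED_LABELS = {"positive", "negative", "neutral"}  # hypothetical label set

def is_response_valid(model_response):
    # The model must answer with exactly one of the allowed labels
    return model_response is not None and model_response.strip().lower() in ALLOWED_LABELS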
Rate limiting
The OpenAI API has rate limits, and exceeding them leads to RateLimitError responses. To stay within the requests-per-minute limit, we can use the following class; together with MAX_REQUESTS_PER_MIN in the main loop, it ensures that at most one batch of requests is sent per minute:
import time

class RateLimiter:
    creation_time = 0

    def update_time(self):
        self.creation_time = time.time()

    def sleep_if_needed(self):
        time_to_sleep = 60 - (time.time() - self.creation_time)
        if time_to_sleep > 0:
            print(f"sleeping for {int(time_to_sleep)}s...")
            time.sleep(time_to_sleep)
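To make the pacing explicit: update_time is called right before a batch is sent, and sleep_if_needed pads out the rest of the minute after the batch has been processed, so at most MAX_REQUESTS_PER_MIN requests go out per minute. A tiny standalone sketch of the same pattern:

import time
from RateLimiter import RateLimiter

limiter = RateLimiter()
limiter.update_time()      # start of the one-minute window
time.sleep(40)             # stand-in for a batch that takes 40 seconds to process
limiter.sleep_if_needed()  # sleeps the remaining ~20 seconds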
Counters
To monitor the progress of LLM inference, the number of errors, and the money spent, we use the Counter class, similar to the one in the batch processing article:
class Counter:
    items = 0
    llm_prompt_tokens = 0
    llm_completion_tokens = 0
    bad_responses = 0
    invalid_classifications = 0

    def __init__(self, llm_model):
        self.llm_model = llm_model

    def update(self, items=0, llm_prompt_tokens=0, llm_completion_tokens=0, bad_responses=0, invalid_classifications=0):
        self.items += items
        self.llm_prompt_tokens += llm_prompt_tokens
        self.llm_completion_tokens += llm_completion_tokens
        self.bad_responses += bad_responses
        self.invalid_classifications += invalid_classifications

    def getPrice(self):
        return ((self.llm_prompt_tokens / 1000000.0) * self.llm_model['price_per_1m']['input'] +
                (self.llm_completion_tokens / 1000000.0) * self.llm_model['price_per_1m']['output'])

    def __str__(self):
        return f'price: {self.getPrice():.2f} USD; items: {self.items}, invalid: {self.invalid_classifications}; bad responses: {self.bad_responses}; tokens (llm in: {self.llm_prompt_tokens}, llm out: {self.llm_completion_tokens})'
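A short usage example of the Counter class (with the LLM_MODEL dictionary defined earlier in the article):

counter = Counter(LLM_MODEL)
counter.update(items=2, llm_prompt_tokens=1200, llm_completion_tokens=300)
print(counter)
# price: 0.01 USD; items: 2, invalid: 0; bad responses: 0; tokens (llm in: 1200, llm out: 300)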
Conclusion
This article provides foundation code for highly concurrent synchronous processing of data with an LLM. It is worth mentioning that the batch processing approach is simpler and currently 50% cheaper than direct calls to inference endpoints, but if your task requires responses as soon as possible, or the multistep batch procedure is inconvenient for any reason, you can easily reuse the code in this article as a foundation for your own tasks, since it is generic. During the phase of exploring and experimenting with an LLM-based solution, which frequently requires processing small amounts of input data quickly and easily, the described approach can be very convenient, since it processes data in one step.
I hope you enjoyed this article and that you will find it useful.
Happy coding!
Disclaimer: Code and article content are provided ‘as-is’ without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of code or article content.