Sergii Vershynskyi
Creator of this blog.
Jul 26, 2024 6 min read 1123 words

PERFORM SYNCHRONOUS DATA PROCESSING USING LLM AT SCALE

Abstract: While batch processing of data with an LLM has huge advantages, a subset of LLM applications requires synchronous processing. This article provides a solution for synchronously processing data at scale using the OpenAI API.

Introduction

Previously, we described a solution for processing massive amounts of data using the OpenAI Batch API. Batch processing is cheaper and easier to run when you have huge amounts of data. At the same time, if your application cannot wait until the batch process is finished, the only option is to call the OpenAI inference endpoints directly. This is easy to accomplish when the desired request rate is low, but when your application needs to send thousands or tens of thousands of requests simultaneously, it becomes a more complicated task, since you have to take care of API rate limits, network errors, technologies for parallelizing requests, retry logic, etc. Experience shows that moving an application from low-volume processing to massive processing often leads to a complete rewrite of the code.

This article describes generic code that can be used to write a program that processes massive amounts of data with an LLM as quickly as possible.

Main loop for data processing

We use one Python program to implement the main loop for data processing by the LLM. It reads data from the input file items.jsonl, prepares a batch in the current_batch array, sends the batch to the LLM by calling the LLMProvider.call_llm function, processes the results (LLMProvider.process_llm_response), writes them to the output file, and repeats everything from the beginning until there are no items left to process. The important detail here is that some items can be processed incorrectly, and in this case we send them to the LLM again in the next batch. The code that performs all of these steps is listed below:

import json
import asyncio
from counter import Counter
import openai.LLMProvider as LLMProvider
from utils.file_util import dump_as_jsonl
from RateLimiter import RateLimiter

MAX_REQUESTS_PER_MIN = 10000

input_filename = 'items.jsonl'
output_file_name = 'items_processed.jsonl'

counter = Counter(LLMProvider.LLM_MODEL)

RATE_LIMITER = RateLimiter()

with open(output_file_name, 'w') as output_file:
  with open(input_filename, 'r') as input_file:
    current_batch = []
    is_eof = False

    while True:
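      # Fill current_batch with up to MAX_REQUESTS_PER_MIN items from the input file.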
      while ((len(current_batch) < MAX_REQUESTS_PER_MIN) and (not is_eof)):
        line = input_file.readline()

        if not line:
          is_eof = True
          break

        current_batch.append(json.loads(line))

      if len(current_batch) == 0:
        break

      RATE_LIMITER.update_time()
      asyncio.run(LLMProvider.call_llm(current_batch, counter))

      errors_batch = []
      processed_batch = []

      for item in current_batch:
        if LLMProvider.process_llm_response(item, counter):
          processed_batch.append(item)

          counter.update(items=1)
        else:    
          errors_batch.append(item)

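      # Items that failed processing are re-sent to the LLM in the next batch.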
      current_batch = errors_batch

      for item in processed_batch:
        dump_as_jsonl(output_file, item)

      print(counter)
      RATE_LIMITER.sleep_if_needed()

print(counter)

Note: Function dump_as_jsonl is the same as in the batch processing article.
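
For readers who do not have the batch processing article at hand, a minimal sketch of such a helper could look like this (the implementation in that article may differ in details):

import json

def dump_as_jsonl(output_file, item):
  # Write the item as a single JSON line and flush immediately,
  # so already processed items are not lost if the program is interrupted.
  output_file.write(json.dumps(item) + '\n')
  output_file.flush()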

Code for sending requests to LLM

The openai Python library supports the standard Python asyncio library, which we use to run the call_llm_impl function concurrently:

import asyncio
import getpass
from openai import AsyncOpenAI

client = AsyncOpenAI(
  api_key=getpass.getpass("Enter OpenAI key: ")
)

async def call_llm(items, counter): 
  await asyncio.gather(*[call_llm_impl(item, counter) for item in items])
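
asyncio.gather schedules all call_llm_impl coroutines at once and returns only when every one of them has completed, so the whole batch is in flight concurrently while each coroutine awaits its own network response.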

Using data from the item, the call_llm_impl function creates a prompt, sends it to the LLM via the openai Python library, updates the counters, and adds the results to the item:

LLM_MODEL = {
  'full_name': "gpt-4o",
  'price_per_1m': {
    'input': 5.00,
    'output': 15.00
  }
}

async def call_llm_impl(item, counter):
  prompt = get_prompt(item)

  response = await retry_with_exponential_backoff(
    func=client.chat.completions.create,
    model=LLM_MODEL['full_name'],
    messages=prompt
  )

  prompt_tokens = response.usage.prompt_tokens
  completion_tokens = response.usage.completion_tokens
  model_response = response.choices[0].message.content

  counter.update(llm_prompt_tokens = prompt_tokens, llm_completion_tokens = completion_tokens)

  requests = (item['llm']['requests'] + 1) if ("llm" in item and 'requests' in item['llm']) else 1

  item['llm'] = {
    "response": model_response,
    "requests": requests
  }

Note: Functions get_prompt and get_item_info are the same as in the batch processing article.
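
For illustration only, a minimal get_prompt could build a Chat Completions messages array from the item; the item's text field and the prompt wording below are assumptions, not the prompt from the batch processing article:

def get_prompt(item):
  # Build a Chat Completions "messages" array from the item.
  # The "text" field is a hypothetical input field; replace it with your own.
  return [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": item["text"]}
  ]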

Some requests can error out, so for them we add retry logic with exponential backoff, implemented in the retry_with_exponential_backoff function:

import asyncio
import random

async def retry_with_exponential_backoff(
    initial_delay: float = 1,
    exponential_base: float = 2,
    jitter: bool = True,
    max_retries: int = 2,
    func=None,
    **kwargs
):
    """Retry a function with exponential backoff."""

    num_retries = 0
    delay = initial_delay

    while True:
        try:
            return await func(**kwargs)
        except Exception as e:
            num_retries += 1

            if num_retries > max_retries:
                raise Exception(
                    f"Maximum number of retries ({max_retries}) exceeded."
                )

            random_number = random.random()
            randomness = (1 + jitter*random_number)
            delay *= exponential_base*randomness
            if (delay > 61):
                delay = 60 + random_number*10

            print(f"Got {type(e).__name__}. Retrying call ({num_retries}) to AI API in {int(delay)} seconds...")
            # asyncio.sleep keeps the event loop free, so other in-flight requests are not blocked during the delay.
            await asyncio.sleep(delay)
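
The jitter factor spreads retries of concurrent requests in time, so thousands of failed calls do not hit the API again at exactly the same moment, and the cap keeps any single delay to roughly 60-70 seconds.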

Processing the results

The LLMProvider.process_llm_response function returns True if processing is finished for the item; otherwise, the item is added to errors_batch in the main loop for re-processing. Processing is considered done if is_response_valid returns True or if the number of calls to the LLM for the item reaches MAX_REQUESTS_COUNT. Resending the same prompt helps when the LLM fails to respond in the desired format, which the code can easily detect.

MAX_REQUESTS_COUNT = 4

def process_llm_response(item, counter):
  model_response = item["llm"]["response"]

  if is_response_valid(model_response):
    item["llm"]["response"] = model_response
    return True
  
  counter.update(bad_responses=1)

  if (item["llm"]["requests"] == MAX_REQUESTS_COUNT):
    counter.update(invalid_classifications=1)
    return True

  return False
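
The is_response_valid function is application specific. As an illustration only, a classification task that expects one of a fixed set of labels might validate the response like this (the label set below is a made-up assumption):

VALID_LABELS = {"positive", "negative", "neutral"}  # hypothetical label set

def is_response_valid(model_response):
  # Accept the response only if the model returned exactly one of the expected labels.
  return model_response is not None and model_response.strip().lower() in VALID_LABELS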

Rate limiting

The OpenAI API has rate limits, and exceeding them leads to RateLimitError responses. To stay within the limits, we can use the following class:

import time

class RateLimiter:
  creation_time = 0

  def update_time(self):
    self.creation_time = time.time()

  def sleep_if_needed(self):
    time_to_sleep = 60 - (time.time() - self.creation_time) # + 1

    if time_to_sleep > 0:
      print(f"sleeping for {int(time_to_sleep)}s...")
      time.sleep(time_to_sleep)
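
Since update_time is called right before a batch is sent and sleep_if_needed right after its results are written, each iteration of the main loop takes at least one minute, which keeps the request rate at or below MAX_REQUESTS_PER_MIN requests per minute.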

Counters

To monitor the progress of LLM inference, the number of errors, and the money spent, we use the Counter class, similar to the one in the batch processing article:

class Counter:
  items = 0

  llm_prompt_tokens = 0
  llm_completion_tokens = 0

  bad_responses = 0
  invalid_classifications = 0

  def __init__(self, llm_model):
    self.llm_model = llm_model

  def update(self, items=0, llm_prompt_tokens=0, llm_completion_tokens=0, bad_responses=0, invalid_classifications=0):
    self.items += items
    self.llm_prompt_tokens += llm_prompt_tokens
    self.llm_completion_tokens += llm_completion_tokens
    self.bad_responses += bad_responses
    self.invalid_classifications += invalid_classifications

  def getPrice(self):
    return ((self.llm_prompt_tokens / 1000000.0) * self.llm_model['price_per_1m']['input']
            + (self.llm_completion_tokens / 1000000.0) * self.llm_model['price_per_1m']['output'])
    
  def __str__(self):
    return f'price: {self.getPrice():.2f} USD; items: {self.items}, invalid: {self.invalid_classifications}; bad responses: {self.bad_responses}; tokens (llm in: {self.llm_prompt_tokens}, llm out: {self.llm_completion_tokens})'
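
For example, with the gpt-4o prices defined earlier, a run that consumed 1,000,000 prompt tokens and 200,000 completion tokens is reported as (1,000,000 / 1,000,000) * 5.00 + (200,000 / 1,000,000) * 15.00 = 8.00 USD.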

Conclusion

This article provides foundation code for highly concurrent synchronous processing of data with an LLM. It is worth mentioning that the batch processing approach is simpler and currently 50% cheaper than calling the inference endpoints directly, but if your task requires responses as soon as possible, or the multistep batch procedure is inconvenient for any reason, you can easily reuse the code in this article as a foundation for your own tasks, since it is generic. In the phase of exploring and experimenting with an LLM-based solution, which frequently requires processing small amounts of input data quickly and easily, the described approach can be very convenient, since it processes data in one step.

I hope you enjoyed this article and that you will find it useful.

Happy coding!

Disclaimer: Code and article content are provided ‘as-is’ without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of code or article content.