Rate Limits
Handling Rate Limits
Best practices for handling rate limits in your application
Learn how to properly handle rate limits and implement retry strategies.
Exponential Backoff
The recommended strategy for handling rate limits:
from brainus_ai import BrainusAI, RateLimitError
import asyncio
import os
# client = BrainusAI(api_key=os.getenv("BRAINUS_API_KEY"))
async def query_with_backoff(client, query, max_retries=3):
    """Send a query, retrying with exponential backoff when rate limited.

    Args:
        client: Async BrainusAI client exposing ``query``.
        query: The question to send.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The query result from the client.

    Raises:
        RateLimitError: If still rate limited on the final attempt.
    """
    for attempt in range(max_retries):
        try:
            return await client.query(query=query, store_id="default")
        except RateLimitError as e:
            # Last attempt: surface the error instead of sleeping again.
            if attempt == max_retries - 1:
                raise
            # Prefer the server-suggested delay; fall back to 2^attempt seconds.
            wait_time = e.retry_after or (2 ** attempt)
            print(f"Rate limited. Waiting {wait_time}s...")
            await asyncio.sleep(wait_time)
    # NOTE: the trailing `return None` from the original was unreachable —
    # the loop always returns a result or re-raises on the last attempt.
# async with BrainusAI(...) as client:
# result = await query_with_backoff(client, "What is photosynthesis?")
import { BrainusAI, RateLimitError } from '@brainus/ai';
// const client = new BrainusAI({ apiKey: process.env.BRAINUS_API_KEY });
/**
 * Send a query, retrying with exponential backoff when rate limited.
 * Relies on an outer-scope `client` instance (see the commented example above).
 *
 * @param {string} query - The question to send.
 * @param {number} [maxRetries=3] - Maximum number of attempts.
 * @returns {Promise<object>} The query result.
 * @throws {RateLimitError} If still rate limited on the final attempt.
 */
async function queryWithBackoff(query, maxRetries = 3) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await client.query({ query, storeId: 'default' });
    } catch (error) {
      // Check error type/code appropriate to SDK
      if (!(error instanceof RateLimitError)) throw error;
      // Last attempt: surface the rate-limit error to the caller.
      if (attempt === maxRetries - 1) throw error;
      // Use `??` (not `||`) so a legitimate retryAfter of 0 is respected
      // rather than silently replaced by the backoff fallback.
      const waitTime = error.retryAfter ?? (2 ** attempt);
      console.log(`Rate limited. Waiting ${waitTime}s...`);
      await new Promise(resolve => setTimeout(resolve, waitTime * 1000));
    }
  }
}
// const result = await queryWithBackoff('What is photosynthesis?');
Using Retry-After Header
Always respect the Retry-After header:
import requests
import time
import os
def make_request():
    """POST one query to the Brainus API, honoring Retry-After on 429.

    Loops (rather than recursing, as the original sample did) until a
    non-429 response arrives, so a long rate-limit streak cannot exhaust
    the call stack.

    Returns:
        The decoded JSON response body.
    """
    while True:
        response = requests.post(
            "https://api.brainus.lk/api/v1/query",
            headers={"X-API-Key": os.getenv("BRAINUS_API_KEY")},
            json={"query": "test", "store_id": "default"}
        )
        if response.status_code != 429:
            return response.json()
        # Server tells us how long to back off; default to 60s if absent.
        retry_after = int(response.headers.get("Retry-After", 60))
        print(f"Rate limited. Retrying after {retry_after}s")
        time.sleep(retry_after)
Queue-Based Approach
For high-volume applications:
import asyncio
from brainus_ai import BrainusAI
import os
async def worker(request_queue, client):
    """Consume (query, future) pairs from *request_queue* forever.

    Resolves each future with the query result — or the raised exception —
    and throttles to roughly one upstream request per second. Runs until
    cancelled.
    """
    while True:
        query, future = await request_queue.get()
        try:
            future.set_result(
                await client.query(query=query, store_id="default")
            )
        except Exception as exc:
            # Hand the failure back to whoever awaits the future.
            future.set_exception(exc)
        # Rate limiting: 1 request per second
        await asyncio.sleep(1)
        request_queue.task_done()
# Usage with asyncio
async def main():
    """Wire up the queue-based worker, submit one query, print the outcome."""
    api_key = os.getenv("BRAINUS_API_KEY")
    request_queue = asyncio.Queue()
    async with BrainusAI(api_key=api_key) as client:
        # Background consumer that drains the queue at one request/second.
        worker_task = asyncio.create_task(worker(request_queue, client))

        # Pair the request with a future the worker will resolve.
        pending = asyncio.get_running_loop().create_future()
        await request_queue.put(("What is photosynthesis?", pending))

        # Wait for the worker to deliver a result (or an error).
        try:
            result = await pending
            print(f"Answer: {result.answer}")
        except Exception as e:
            print(f"Error: {e}")

        # The worker loops forever; stop it explicitly.
        worker_task.cancel()
# asyncio.run(main())
Monitoring Limits
Track your usage to avoid hitting limits:
from brainus_ai import BrainusAI
import os
# client = BrainusAI(api_key=os.getenv("BRAINUS_API_KEY"))
async def query_with_monitoring(client, query):
    """Run a single query, leaving hooks for rate-limit usage monitoring."""
    response = await client.query(query=query, store_id="default")
    # Check rate limit headers (hypothetical metadata structure)
    # remaining = response.metadata.rate_limit_remaining
    # limit = response.metadata.rate_limit_limit
    # usage_percent = ((limit - remaining) / limit) * 100
    # if usage_percent > 80:
    #     print(f"Warning: {usage_percent:.1f}% of rate limit used")
    return response
Don't hammer the API with retries! Always implement exponential backoff and
respect the Retry-After header.
Next Steps
- Optimization - Reduce API calls
- Understanding Limits - How limits work