Rate Limits
Optimization Strategies
Reduce API calls and optimize your usage
Learn strategies to reduce API calls and stay within your rate limits.
1. Implement Caching
Cache responses to avoid duplicate requests:
# Caching async functions requires care or specialized libraries.
# Here is a conceptual example using a simple dictionary.
from brainus_ai import BrainusAI
import os
import asyncio
# client = BrainusAI(api_key=os.getenv("BRAINUS_API_KEY"))
cache = {}
async def cached_query(client, query: str):
    if query in cache:
        return cache[query]
    result = await client.query(query=query, store_id="default")
    cache[query] = result.answer
    return result.answer

# First call: hits API
# answer1 = await cached_query(client, "What is photosynthesis?")
# Second call: uses cache
# answer2 = await cached_query(client, "What is photosynthesis?")
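The plain dictionary above never expires entries, so a long-running process will keep serving stale answers and grow without bound. Here is a minimal sketch of the same idea with a time-to-live, using only the standard library; the 300-second TTL is an arbitrary illustration, not an SDK default:
import time

TTL_SECONDS = 300  # illustrative value, tune for your data
ttl_cache = {}  # maps query -> (expiry timestamp, answer)

async def cached_query_with_ttl(client, query: str):
    entry = ttl_cache.get(query)
    if entry and entry[0] > time.monotonic():
        # Entry is still fresh: no API call
        return entry[1]
    result = await client.query(query=query, store_id="default")
    ttl_cache[query] = (time.monotonic() + TTL_SECONDS, result.answer)
    return result.answer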
Redis Caching
import redis
import json
import hashlib
from brainus_ai import BrainusAI
import asyncio
import os
# client = BrainusAI(api_key=os.getenv("BRAINUS_API_KEY"))
# Using a sync redis client for simplicity, but consider aioredis for full async apps
cache = redis.Redis(host='localhost', port=6379, db=0)
async def query_with_cache(client, query: str, ttl: int = 3600):
    # Generate cache key
    cache_key = f"brainus:{hashlib.md5(query.encode()).hexdigest()}"
    # Check cache
    cached = cache.get(cache_key)
    if cached:
        return json.loads(cached)
    # Query API
    result = await client.query(query=query, store_id="default")
    # Cache result
    # Note: Serialization logic depends on SDK response implementation
    cache.setex(
        cache_key,
        ttl,
        json.dumps({
            "answer": result.answer,
            # "citations": [c.model_dump() for c in result.citations]  # If using Pydantic
        })
    )
    return result
2. Batch Similar Queries
Group related queries:
# Bad: 3 separate API calls (if done sequentially without gathering)
# result1 = await client.query("Math for grade 6", store_id="default")
# result2 = await client.query("Math for grade 7", store_id="default")
# result3 = await client.query("Math for grade 8", store_id="default")
# Good: 1 API call with filters
# result = await client.query(
# query="Mathematics concepts",
# store_id="default",
# filters={"grade": ["6", "7", "8"]}
# )
3. Debounce User Input
Wait for user to stop typing:
// React example
import { useState, useEffect } from "react";
function SearchBox() {
  const [query, setQuery] = useState("");
  const [results, setResults] = useState(null);

  useEffect(() => {
    // Debounce: wait 500ms after user stops typing
    const timer = setTimeout(async () => {
      if (query.length > 3) {
        // Use your backend proxy endpoint here
        const response = await fetch("/api/query", {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ query }),
        });
        const data = await response.json();
        setResults(data);
      }
    }, 500);
    return () => clearTimeout(timer);
  }, [query]);

  return (
    <input
      type="text"
      value={query}
      onChange={(e) => setQuery(e.target.value)}
      placeholder="Search..."
    />
  );
}
4. Use Conditional Requests
Only fetch if data changed:
import hashlib
last_query_hash = None
async def query_if_changed(client, query: str):
    global last_query_hash
    query_hash = hashlib.md5(query.encode()).hexdigest()
    if query_hash == last_query_hash:
        print("Query unchanged, skipping API call")
        return None
    last_query_hash = query_hash
    return await client.query(query=query, store_id="default")
5. Prefetch Common Queries
Preload frequently accessed data:
from brainus_ai import BrainusAI
import asyncio
import os
COMMON_QUERIES = [
    "What is photosynthesis?",
    "Explain the water cycle",
    "What causes earthquakes?"
]

async def prefetch_queries():
    async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
        # Create tasks for concurrent execution
        tasks = [
            client.query(query=q, store_id="default")
            for q in COMMON_QUERIES
        ]
        results = await asyncio.gather(*tasks)
        # Store in cache
        # for query, result in zip(COMMON_QUERIES, results):
        #     cache.set(query, result)

# Run at startup
# asyncio.run(prefetch_queries())
6. Implement Request Pooling
Combine requests from multiple users:
from collections import defaultdict

pending_queries = defaultdict(list)

async def pooled_query(client, query: str, callback):
    """Pool identical queries and serve them with a single API call."""
    pending_queries[query].append(callback)
    # Flush once 10 requests are pooled (a time-based flush is sketched below)
    if len(pending_queries[query]) >= 10:
        await execute_pooled_query(client, query)

async def execute_pooled_query(client, query: str):
    callbacks = pending_queries.pop(query, [])
    if not callbacks:
        return
    result = await client.query(query=query, store_id="default")
    # Notify all waiting callbacks
    for callback in callbacks:
        callback(result)
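The count-based trigger above only fires once ten identical requests are waiting. Here is a minimal sketch of the complementary time-based flush (roughly 100 ms), building on pending_queries and execute_pooled_query above and assuming everything runs inside a single asyncio event loop; the window length is illustrative:
import asyncio

flush_scheduled = set()

async def pooled_query_with_timer(client, query: str, callback):
    pending_queries[query].append(callback)
    if query not in flush_scheduled:
        flush_scheduled.add(query)
        # Flush this query's pool about 100 ms from now
        asyncio.get_running_loop().call_later(
            0.1,
            lambda: asyncio.create_task(flush(client, query)),
        )

async def flush(client, query: str):
    flush_scheduled.discard(query)
    await execute_pooled_query(client, query)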
7. Use Webhooks
Instead of polling for updates, use webhooks:
# Bad: Polling every minute
import time
while True:
    usage = client.get_usage()
    if usage.quota.percentage_used > 80:
        send_alert()
    time.sleep(60)  # Wastes API calls!

# Good: Use webhooks
# Configure at: https://developers.brainus.lk/dashboard/webhooks
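What the receiving end might look like is sketched below with FastAPI; the endpoint path, event name, and payload fields are assumptions for illustration rather than the documented webhook schema, and the actual events and URL are configured in the dashboard linked above:
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/webhooks/brainus")  # hypothetical path registered in the dashboard
async def brainus_webhook(request: Request):
    payload = await request.json()
    # Field names here are assumed for illustration; check the webhook
    # configuration page for the actual event schema.
    if payload.get("event") == "quota.threshold_reached":
        send_alert(f"Quota at {payload.get('percentage_used')}%")  # reuses the send_alert placeholder above
    return {"ok": True}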
8. Monitor and Alert
Track usage patterns:
from dataclasses import dataclass
from datetime import datetime
@dataclass
class UsageMetrics:
    requests_made: int = 0
    cache_hits: int = 0
    cache_misses: int = 0

    @property
    def cache_hit_rate(self) -> float:
        total = self.cache_hits + self.cache_misses
        return (self.cache_hits / total * 100) if total > 0 else 0

metrics = UsageMetrics()

async def tracked_query(client, query: str):
    # Check cache
    cached = cache.get(query)
    if cached:
        metrics.cache_hits += 1
        return cached
    # API call
    metrics.cache_misses += 1
    metrics.requests_made += 1
    result = await client.query(query=query, store_id="default")
    # Log metrics
    if metrics.requests_made % 100 == 0:
        print(f"Cache hit rate: {metrics.cache_hit_rate:.1f}%")
        print(f"Total requests: {metrics.requests_made}")
    return result
A good cache hit rate is 70%+. If yours is lower, review your caching strategy!
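One way to act on that guideline is a small background check built on the UsageMetrics instance above; the 70% threshold comes from this section, while the interval and minimum sample size are illustrative choices:
import asyncio
import logging

async def watch_cache_hit_rate(interval_seconds: int = 300):
    # Periodically warn when the hit rate drops below the 70% guideline
    while True:
        await asyncio.sleep(interval_seconds)
        total = metrics.cache_hits + metrics.cache_misses
        if total >= 100 and metrics.cache_hit_rate < 70:
            logging.warning(
                "Cache hit rate at %.1f%% - review your caching strategy",
                metrics.cache_hit_rate,
            )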
Summary
| Strategy | Savings | Difficulty |
|---|---|---|
| Caching | 50-80% | Easy |
| Batching | 30-50% | Medium |
| Debouncing | 40-60% | Easy |
| Prefetching | 20-30% | Medium |
| Request Pooling | 30-50% | Hard |
Next Steps
- Understanding Limits - How limits work
- Handling Rate Limits - Implementation strategies
- Usage API - Track your usage