
Optimization Strategies


Learn strategies to reduce API calls and stay within your rate limits.

1. Implement Caching

Cache responses to avoid duplicate requests:

# Caching async functions requires care or specialized libraries.
# Here is a conceptual example using a simple dictionary.

from brainus_ai import BrainusAI
import os
import asyncio

# client = BrainusAI(api_key=os.getenv("BRAINUS_API_KEY"))
cache = {}

async def cached_query(client, query: str):
    if query in cache:
        return cache[query]
    
    result = await client.query(query=query, store_id="default")
    cache[query] = result.answer
    return result.answer

# First call: hits API
# answer1 = await cached_query(client, "What is photosynthesis?")

# Second call: uses cache
# answer2 = await cached_query(client, "What is photosynthesis?")
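
If you'd rather not manage the dictionary yourself, a decorator can do the bookkeeping. Here is a sketch using the third-party async-lru package (assuming an extra dependency is acceptable and that the `client` shown above has been created):

# pip install async-lru
from async_lru import alru_cache

# Assumes the module-level `client` from above
@alru_cache(maxsize=256)
async def cached_query_lru(query: str) -> str:
    result = await client.query(query=query, store_id="default")
    return result.answer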

Redis Caching

import redis
import json
import hashlib
from brainus_ai import BrainusAI
import asyncio
import os

# client = BrainusAI(api_key=os.getenv("BRAINUS_API_KEY"))
# Using a sync Redis client for simplicity; consider redis.asyncio for fully async apps
cache = redis.Redis(host='localhost', port=6379, db=0)

async def query_with_cache(client, query: str, ttl: int = 3600):
    # Generate cache key
    cache_key = f"brainus:{hashlib.md5(query.encode()).hexdigest()}"

    # Check cache
    cached = cache.get(cache_key)
    if cached:
        return json.loads(cached)

    # Query API
    result = await client.query(query=query, store_id="default")

    # Cache result
    # Note: serialization depends on the SDK's response type
    payload = {
        "answer": result.answer,
        # "citations": [c.model_dump() for c in result.citations]  # if Pydantic models
    }
    cache.setex(cache_key, ttl, json.dumps(payload))

    return payload  # same shape as a cache hit
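
# First call populates Redis; repeats within the TTL are served from cache
# answer = await query_with_cache(client, "What is photosynthesis?")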

2. Batch Similar Queries

Group related queries:

# Bad: 3 separate API calls (if done sequentially without gathering)
# result1 = await client.query("Math for grade 6", store_id="default")
# result2 = await client.query("Math for grade 7", store_id="default")
# result3 = await client.query("Math for grade 8", store_id="default")

# Good: 1 API call with filters
# result = await client.query(
#     query="Mathematics concepts",
#     store_id="default",
#     filters={"grade": ["6", "7", "8"]}
# )
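
If the queries genuinely differ and cannot be merged into one filtered call, a middle ground is to fire them concurrently. A minimal sketch, assuming the same async client as above; this reduces latency, not your request count:

import asyncio

async def batch_queries(client, queries: list[str]):
    # Concurrent, not fewer, API calls -- each query still
    # counts against your quota
    tasks = [client.query(query=q, store_id="default") for q in queries]
    return await asyncio.gather(*tasks)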

3. Debounce User Input

Wait for user to stop typing:

// React example
import { useState, useEffect } from "react";

function SearchBox() {
  const [query, setQuery] = useState("");
  const [results, setResults] = useState(null);

  useEffect(() => {
    // Debounce: wait 500ms after user stops typing
    const timer = setTimeout(async () => {
      if (query.length > 3) {
        // Use your backend proxy endpoint here
        const response = await fetch("/api/query", {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ query }),
        });
        const data = await response.json();
        setResults(data);
      }
    }, 500);

    return () => clearTimeout(timer);
  }, [query]);

  return (
    <input
      type="text"
      value={query}
      onChange={(e) => setQuery(e.target.value)}
      placeholder="Search..."
    />
  );
}

4. Use Conditional Requests

Only fetch if data changed:

import hashlib

last_query_hash = None

async def query_if_changed(client, query: str):
    global last_query_hash

    query_hash = hashlib.md5(query.encode()).hexdigest()

    if query_hash == last_query_hash:
        print("Query unchanged, skipping API call")
        return None

    last_query_hash = query_hash
    return await client.query(query=query, store_id="default")
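
# First call hits the API; an identical repeat is skipped
# result = await query_if_changed(client, "What is photosynthesis?")
# result = await query_if_changed(client, "What is photosynthesis?")  # returns None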

5. Prefetch Common Queries

Preload frequently accessed data:

from brainus_ai import BrainusAI
import asyncio
import os

COMMON_QUERIES = [
    "What is photosynthesis?",
    "Explain the water cycle",
    "What causes earthquakes?"
]

async def prefetch_queries():
    async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
        # Create tasks for concurrent execution
        tasks = [
            client.query(query=q, store_id="default")
            for q in COMMON_QUERIES
        ]
        results = await asyncio.gather(*tasks)

        # Store in cache
        # for query, result in zip(COMMON_QUERIES, results):
        #     cache.set(query, result)

# Run at startup
# asyncio.run(prefetch_queries())
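
To keep prefetched answers fresh, you could re-run the prefetch on an interval. A minimal sketch, assuming a long-lived asyncio application:

async def refresh_loop(interval_seconds: int = 3600):
    # Re-run the prefetch every hour so cached answers stay warm
    while True:
        await prefetch_queries()
        await asyncio.sleep(interval_seconds)

# asyncio.create_task(refresh_loop())  # schedule inside your app's event loop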

6. Implement Request Pooling

Combine requests from multiple users:

import asyncio
from collections import defaultdict

# Assumes a module-level async `client` as in the earlier examples
pending_queries = defaultdict(list)

def pooled_query(query: str, callback):
    """Pool identical requests and serve them with one API call"""
    pending_queries[query].append(callback)

    # Flush once 10 requests are pooled; a time-based flush for
    # quieter queries is sketched below
    if len(pending_queries[query]) >= 10:
        asyncio.create_task(execute_pooled_query(query))

async def execute_pooled_query(query: str):
    callbacks = pending_queries.pop(query, [])
    if not callbacks:
        return

    # One API call serves every pooled caller
    result = await client.query(query=query, store_id="default")

    # Notify all waiting callbacks
    for callback in callbacks:
        callback(result)
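
The count-based flush above never fires for queries that stay below the threshold. A minimal time-based flush, assuming this runs inside an existing asyncio event loop:

async def flush_after(query: str, delay: float = 0.1):
    # Wait 100ms, then flush whatever has pooled for this query;
    # execute_pooled_query is a no-op if a count-based flush already ran
    await asyncio.sleep(delay)
    await execute_pooled_query(query)

# Schedule when the first request for a query arrives:
# asyncio.create_task(flush_after(query))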

7. Use Webhooks

Instead of polling for updates, use webhooks:

# Bad: Polling every minute
import time

while True:
    usage = client.get_usage()
    if usage.quota.percentage_used > 80:
        send_alert()
    time.sleep(60)  # Wastes API calls!

# Good: Use webhooks
# Configure at: https://developers.brainus.lk/dashboard/webhooks
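
A webhook receiver is just a small HTTP endpoint. Here is a minimal sketch using FastAPI; the event name and payload shape below are assumptions, so check the schema shown in your webhook dashboard:

from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/webhooks/brainus")
async def handle_brainus_webhook(request: Request):
    event = await request.json()
    # Hypothetical payload shape: {"type": "...", "data": {...}}
    if event.get("type") == "quota.threshold_reached":
        send_alert()  # your alerting logic, as in the polling example above
    return {"ok": True}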

8. Monitor and Alert

Track usage patterns:

from dataclasses import dataclass
from datetime import datetime

@dataclass
class UsageMetrics:
    requests_made: int = 0
    cache_hits: int = 0
    cache_misses: int = 0

    @property
    def cache_hit_rate(self) -> float:
        total = self.cache_hits + self.cache_misses
        return (self.cache_hits / total * 100) if total > 0 else 0

metrics = UsageMetrics()

# Assumes the dict `cache` and async `client` from the earlier examples
async def tracked_query(query: str):
    # Check cache
    cached = cache.get(query)
    if cached:
        metrics.cache_hits += 1
        return cached

    # API call
    metrics.cache_misses += 1
    metrics.requests_made += 1
    result = await client.query(query=query, store_id="default")
    cache[query] = result  # populate the cache for next time

    # Log metrics
    if metrics.requests_made % 100 == 0:
        print(f"Cache hit rate: {metrics.cache_hit_rate:.1f}%")
        print(f"Total requests: {metrics.requests_made}")

    return result

A good cache hit rate is 70%+. If yours is lower, review your caching strategy!

Summary

Strategy          Savings (API calls)   Difficulty
Caching           50-80%                Easy
Batching          30-50%                Medium
Debouncing        40-60%                Easy
Prefetching       20-30%                Medium
Request Pooling   30-50%                Hard
