Code Examples
Python Examples
Production-ready Python examples for Django, Flask, FastAPI
Complete Python examples for integrating BrainUs API with popular frameworks.
Django REST Framework
Django 3.1+ supports async views.
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from brainus_ai import BrainusAI, BrainusError
from django.conf import settings
from django.core.cache import cache
import hashlib
class QueryView(APIView):
async def post(self, request):
query = request.data.get('query')
store_id = request.data.get('store_id', 'default')
# Cache key based on query
cache_key = f"brainus_{hashlib.md5(query.encode()).hexdigest()}"
# Check cache first
cached_result = await cache.aget(cache_key)
if cached_result:
return Response({
'answer': cached_result,
'cached': True
})
try:
# Make API request
async with BrainusAI(api_key=settings.BRAINUS_API_KEY) as client:
result = await client.query(query=query, store_id=store_id)
# Cache for 1 hour
await cache.aset(cache_key, result.answer, 3600)
return Response({
'answer': result.answer,
'citations': result.citations,
'has_citations': result.has_citations,
'cached': False
})
except BrainusError as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)Flask Application
Flask 2.0+ supports async routes.
# app.py
from flask import Flask, request, jsonify
from brainus_ai import BrainusAI, RateLimitError
import os
import time
import asyncio
app = Flask(__name__)
async def query_with_retry(query, store_id="default", max_retries=3):
"""Helper to retry on rate limit errors"""
async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
for attempt in range(max_retries):
try:
return await client.query(query=query, store_id=store_id)
except RateLimitError as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(e.retry_after)
@app.route('/api/query', methods=['POST'])
async def query():
data = request.json
if not data or 'query' not in data:
return jsonify({'error': 'Query is required'}), 400
try:
result = await query_with_retry(
query=data['query'],
store_id=data.get('store_id', 'default')
)
return jsonify({
'answer': result.answer,
# Serialize citations using model_dump() if Pydantic, or dict comprehension
'citations': [c.model_dump() if hasattr(c, 'model_dump') else c.__dict__ for c in result.citations] if result.citations else [],
'has_citations': result.has_citations
})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True)FastAPI Application
FastAPI is built for async.
# main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, List
from brainus_ai import BrainusAI, BrainusError
import os
app = FastAPI(title="BrainUs Proxy API")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
class QueryRequest(BaseModel):
query: str
store_id: str = "default"
filters: Optional[Dict] = None
class QueryResponse(BaseModel):
answer: str
citations: List[dict]
metadata: dict
@app.post("/api/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest, background_tasks: BackgroundTasks):
try:
async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
result = await client.query(
query=request.query,
store_id=request.store_id,
filters=request.filters,
)
return QueryResponse(
answer=result.answer,
citations=[c.model_dump() if hasattr(c, 'model_dump') else c.__dict__ for c in result.citations] if result.citations else [],
has_citations=result.has_citations
)
except BrainusError as e:
raise HTTPException(status_code=400, detail=str(e))
def log_query(query: str, query_id: str):
"""Log query for analytics"""
print(f"Query logged: {query_id} - {query[:50]}...")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)Async/Await Pattern
import asyncio
from brainus_ai import BrainusAI
import os
async def query_multiple(queries: list[str]):
"""Query multiple questions in parallel"""
async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
tasks = [
client.query(query=q, store_id="default")
for q in queries
]
results = await asyncio.gather(*tasks)
return results
async def main():
queries = [
"What is photosynthesis?",
"Explain the water cycle",
"What causes earthquakes?"
]
results = await query_multiple(queries)
for query, result in zip(queries, results):
print(f"\nQ: {query}")
print(f"A: {result.answer[:100]}...")
if __name__ == "__main__":
asyncio.run(main())Batch Processing
from brainus_ai import BrainusAI
import pandas as pd
from typing import List
import time
import asyncio
import os
async def process_batch(queries: List[str], batch_size: int = 10):
"""Process queries in batches with rate limiting"""
results = []
async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
for i in range(0, len(queries), batch_size):
batch = queries[i:i + batch_size]
# Create tasks for the batch
tasks = [
client.query(query=q, store_id="default")
for q in batch
]
# Exec batch
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
for query, result in zip(batch, batch_results):
if isinstance(result, Exception):
results.append({
'query': query,
'error': str(result)
})
else:
results.append({
'query': query,
'answer': result.answer,
'citations_count': len(result.citations) if result.citations else 0,
})
# Rate limiting: wait between batches
if i + batch_size < len(queries):
await asyncio.sleep(1)
return pd.DataFrame(results)
# Usage
# df = pd.read_csv('questions.csv')
# results_df = asyncio.run(process_batch(df['query'].tolist()))
# results_df.to_csv('results.csv', index=False)Error Handling
from brainus_ai import (
BrainusAI,
BrainusError,
AuthenticationError,
RateLimitError,
QuotaExceededError,
APIError
)
import asyncio
import os
async def robust_query(query: str, max_retries: int = 3):
"""Query with comprehensive error handling"""
async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
for attempt in range(max_retries):
try:
result = await client.query(query=query, store_id="default")
return result
except AuthenticationError as e:
print(f"Authentication failed: {e}")
print("Check your API key!")
return None
except RateLimitError as e:
print(f"Rate limited. Waiting {e.retry_after}s...")
await asyncio.sleep(e.retry_after)
continue
except QuotaExceededError as e:
print(f"Quota exceeded: {e}")
print("Consider upgrading your plan!")
return None
except APIError as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
print(f"API error. Retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
continue
else:
print(f"API error after {max_retries} attempts: {e}")
return None
except BrainusError as e:
print(f"Unexpected error: {e}")
return None
return None
# Usage
# result = asyncio.run(robust_query("What is photosynthesis?"))
# if result:
# print(f"Answer: {result.answer}")All examples are available in our GitHub repository.
Next Steps
- JavaScript Examples - Node.js, React, Next.js
- Go Examples - Standard library and frameworks
- Authentication - Secure API key management