Code Examples

Python Examples

Production-ready Python examples for Django, Flask, FastAPI

Complete Python examples for integrating BrainUs API with popular frameworks.
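
All of the examples below share the same basic pattern: open the async client as a context manager and await client.query(). A minimal standalone sketch, assuming your API key is exported as BRAINUS_API_KEY:

# quickstart.py
import asyncio
import os
from brainus_ai import BrainusAI

async def main():
    # Open the client, run a single query, and print the answer
    async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
        result = await client.query(query="What is photosynthesis?", store_id="default")
        print(result.answer)

if __name__ == "__main__":
    asyncio.run(main())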

Django REST Framework

Django 3.1+ supports async views, and the async cache calls used below (cache.aget / cache.aset) require Django 4.0+. Note that Django REST Framework does not support async handlers on APIView out of the box, so you may need an add-on such as adrf to run the async post method shown here.

# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from brainus_ai import BrainusAI, BrainusError
from django.conf import settings
from django.core.cache import cache
import hashlib

class QueryView(APIView):
    async def post(self, request):
        query = request.data.get('query')
        store_id = request.data.get('store_id', 'default')

        if not query:
            return Response(
                {'error': 'Query is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Cache key derived from the query text
        cache_key = f"brainus_{hashlib.md5(query.encode()).hexdigest()}"

        # Check cache first
        cached_result = await cache.aget(cache_key)
        if cached_result:
            return Response({
                'answer': cached_result,
                'cached': True
            })

        try:
            # Make API request
            async with BrainusAI(api_key=settings.BRAINUS_API_KEY) as client:
                result = await client.query(query=query, store_id=store_id)

            # Cache for 1 hour
            await cache.aset(cache_key, result.answer, 3600)

            return Response({
                'answer': result.answer,
                'citations': result.citations,
                'has_citations': result.has_citations,
                'cached': False
            })

        except BrainusError as e:
            return Response(
                {'error': str(e)},
                status=status.HTTP_400_BAD_REQUEST
            )
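
To expose the view, wire it into your URL configuration. A minimal routing sketch, assuming the view above lives in the app's views.py; the URL path and route name are illustrative:

# urls.py
from django.urls import path
from .views import QueryView

urlpatterns = [
    # POST /api/query/ is handled by the async view above
    path('api/query/', QueryView.as_view(), name='brainus-query'),
]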

Flask Application

Flask 2.0+ supports async routes when installed with the async extra (pip install "flask[async]").

# app.py
from flask import Flask, request, jsonify
from brainus_ai import BrainusAI, RateLimitError
import os
import asyncio

app = Flask(__name__)

async def query_with_retry(query, store_id="default", max_retries=3):
    """Helper to retry on rate limit errors"""
    async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
        for attempt in range(max_retries):
            try:
                return await client.query(query=query, store_id=store_id)
            except RateLimitError as e:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(e.retry_after)

@app.route('/api/query', methods=['POST'])
async def query():
    data = request.json

    if not data or 'query' not in data:
        return jsonify({'error': 'Query is required'}), 400

    try:
        result = await query_with_retry(
            query=data['query'],
            store_id=data.get('store_id', 'default')
        )

        return jsonify({
            'answer': result.answer,
            # Serialize citations: use model_dump() for Pydantic models, __dict__ otherwise
            'citations': [
                c.model_dump() if hasattr(c, 'model_dump') else c.__dict__
                for c in result.citations
            ] if result.citations else [],
            'has_citations': result.has_citations
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)

FastAPI Application

FastAPI is built for async.

# main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, List
from brainus_ai import BrainusAI, BrainusError
import os

app = FastAPI(title="BrainUs Proxy API")

# CORS (open to all origins for the example; restrict allow_origins in production)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

class QueryRequest(BaseModel):
    query: str
    store_id: str = "default"
    filters: Optional[Dict] = None

class QueryResponse(BaseModel):
    answer: str
    citations: List[dict]
    has_citations: bool

@app.post("/api/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest, background_tasks: BackgroundTasks):
    try:
        async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
            result = await client.query(
                query=request.query,
                store_id=request.store_id,
                filters=request.filters,
            )

        # Log the query in the background after the response is sent
        # (the query identifier passed to log_query is illustrative)
        background_tasks.add_task(log_query, request.query, getattr(result, 'id', 'n/a'))

        return QueryResponse(
            answer=result.answer,
            citations=[
                c.model_dump() if hasattr(c, 'model_dump') else c.__dict__
                for c in result.citations
            ] if result.citations else [],
            has_citations=result.has_citations
        )

    except BrainusError as e:
        raise HTTPException(status_code=400, detail=str(e))

def log_query(query: str, query_id: str):
    """Log query for analytics"""
    print(f"Query logged: {query_id} - {query[:50]}...")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
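
Once the proxy is running, any HTTP client can call it. A minimal sketch using httpx, assuming the app is served locally on port 8000:

# client.py
import asyncio
import httpx

async def ask(question: str) -> dict:
    # POST the question to the proxy endpoint defined above
    async with httpx.AsyncClient(base_url="http://localhost:8000") as http:
        response = await http.post("/api/query", json={"query": question})
        response.raise_for_status()
        return response.json()

if __name__ == "__main__":
    data = asyncio.run(ask("What is photosynthesis?"))
    print(data["answer"])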

Async/Await Pattern

The client is fully asynchronous, so independent queries can run concurrently with asyncio.gather:

import asyncio
from brainus_ai import BrainusAI
import os

async def query_multiple(queries: list[str]):
    """Query multiple questions in parallel"""
    async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
        tasks = [
            client.query(query=q, store_id="default")
            for q in queries
        ]
        results = await asyncio.gather(*tasks)
    return results

async def main():
    queries = [
        "What is photosynthesis?",
        "Explain the water cycle",
        "What causes earthquakes?"
    ]

    results = await query_multiple(queries)

    for query, result in zip(queries, results):
        print(f"\nQ: {query}")
        print(f"A: {result.answer[:100]}...")

if __name__ == "__main__":
    asyncio.run(main())
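
asyncio.gather fires every request at once, which can trip rate limits for long query lists. A sketch of capping in-flight requests with asyncio.Semaphore; the concurrency limit and helper name are illustrative:

import asyncio
import os
from brainus_ai import BrainusAI

async def query_many_limited(queries: list[str], max_concurrency: int = 5):
    """Run many queries concurrently, with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
        async def limited_query(q: str):
            # Acquire a slot before issuing the request
            async with semaphore:
                return await client.query(query=q, store_id="default")

        return await asyncio.gather(*(limited_query(q) for q in queries))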

Batch Processing

For larger workloads, process queries in fixed-size batches and pause briefly between batches to stay within rate limits:

from brainus_ai import BrainusAI
import pandas as pd
from typing import List
import asyncio
import os

async def process_batch(queries: List[str], batch_size: int = 10):
    """Process queries in batches with rate limiting"""
    results = []
    
    async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
        for i in range(0, len(queries), batch_size):
            batch = queries[i:i + batch_size]
            
            # Create tasks for the batch
            tasks = [
                client.query(query=q, store_id="default")
                for q in batch
            ]
            
            # Execute the batch concurrently, collecting exceptions instead of raising
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            for query, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    results.append({
                        'query': query,
                        'error': str(result)
                    })
                else:
                    results.append({
                        'query': query,
                        'answer': result.answer,
                        'citations_count': len(result.citations) if result.citations else 0,
                    })

            # Rate limiting: wait between batches
            if i + batch_size < len(queries):
                await asyncio.sleep(1)

    return pd.DataFrame(results)

# Usage
# df = pd.read_csv('questions.csv')
# results_df = asyncio.run(process_batch(df['query'].tolist()))
# results_df.to_csv('results.csv', index=False)

Error Handling

Catch the SDK's specific exception types to retry transient failures and fail fast on configuration problems:

from brainus_ai import (
    BrainusAI,
    BrainusError,
    AuthenticationError,
    RateLimitError,
    QuotaExceededError,
    APIError
)
import asyncio
import os

async def robust_query(query: str, max_retries: int = 3):
    """Query with comprehensive error handling"""
    
    async with BrainusAI(api_key=os.getenv("BRAINUS_API_KEY")) as client:
        for attempt in range(max_retries):
            try:
                result = await client.query(query=query, store_id="default")
                return result

            except AuthenticationError as e:
                print(f"Authentication failed: {e}")
                print("Check your API key!")
                return None

            except RateLimitError as e:
                print(f"Rate limited. Waiting {e.retry_after}s...")
                await asyncio.sleep(e.retry_after)
                continue

            except QuotaExceededError as e:
                print(f"Quota exceeded: {e}")
                print("Consider upgrading your plan!")
                return None

            except APIError as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    print(f"API error. Retrying in {wait_time}s...")
                    await asyncio.sleep(wait_time)
                    continue
                else:
                    print(f"API error after {max_retries} attempts: {e}")
                    return None

            except BrainusError as e:
                print(f"Unexpected error: {e}")
                return None

    return None

# Usage
# result = asyncio.run(robust_query("What is photosynthesis?"))
# if result:
#     print(f"Answer: {result.answer}")

All examples are available in our GitHub repository.
