"""API endpoints for event management."""

import json
import logging
from typing import Any, Dict, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from app.crud import create_event, get_events, get_event_by_id, delete_all_events, get_statistics
from app.database import get_db
from app.schemas import EventCreate, EventResponse, EventListResponse
from app.websocket import manager

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/events", tags=["events"])


def _parse_tool_input(raw: Optional[str]) -> Any:
    """Decode a stored ``tool_input`` value.

    Returns ``None`` for a falsy value, the decoded JSON when the value
    parses, and the raw value unchanged when it does not.
    """
    if not raw:
        return None
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # Not valid JSON: keep the stored value as-is.
        return raw


def _event_fields(event) -> Dict[str, Any]:
    """Copy the serialisable attributes of a stored event into a dict.

    ``tool_input`` is JSON-decoded where possible; ``created_at`` is copied
    unchanged so each caller can format it as it needs.
    """
    return {
        "id": event.id,
        "session_id": event.session_id,
        "event_type": event.event_type,
        "tool_name": event.tool_name,
        "tool_input": _parse_tool_input(event.tool_input),
        "tool_output": event.tool_output,
        "success": event.success,
        "description": event.description,
        "timestamp": event.timestamp,
        "created_at": event.created_at,
    }


@router.post("", response_model=EventResponse, status_code=201)
async def create_event_endpoint(event: EventCreate, db: Session = Depends(get_db)):
    """
    Create a new event.

    This endpoint is called by Claude Code hook scripts to report events.

    The event is stored first; the stored event and refreshed statistics are
    then broadcast to WebSocket clients on a best-effort basis.

    Raises:
        HTTPException: 500 if storing the event fails.
    """
    try:
        # Create event in database
        db_event = create_event(db, event)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error creating event: {str(e)}") from e

    # The event is stored at this point. A failure while notifying WebSocket
    # clients must not be reported to the caller as a failed creation, so it
    # is logged instead of being turned into a 500.
    try:
        event_dict = _event_fields(db_event)
        # The WebSocket payload carries created_at as an ISO-8601 string.
        event_dict["created_at"] = db_event.created_at.isoformat() if db_event.created_at else None

        # Get updated statistics
        stats = get_statistics(db)
        stats_dict = {
            "total_events": stats.total_events,
            "total_tools_used": stats.total_tools_used,
            "total_agents": stats.total_agents,
            "total_sessions": stats.total_sessions,
            "last_updated": stats.last_updated,
        }

        # Broadcast to WebSocket clients
        await manager.broadcast_event(event_dict, stats_dict)
    except Exception:
        logger.exception("Event was created but broadcasting it to WebSocket clients failed")

    return db_event


@router.get("", response_model=EventListResponse)
async def list_events(
    skip: int = Query(0, ge=0, description="Number of events to skip"),
    limit: int = Query(50, ge=1, le=100, description="Maximum number of events to return"),
    event_type: Optional[str] = Query(None, description="Filter by event type (comma-separated for multiple)"),
    session_id: Optional[str] = Query(None, description="Filter by session ID"),
    tool_name: Optional[str] = Query(None, description="Filter by tool name"),
    db: Session = Depends(get_db),
):
    """
    Get a paginated list of events with optional filtering.

    Supports filtering by:
    - event_type: Single type or comma-separated list (e.g., "PreToolUse,PostToolUse")
    - session_id: Filter events from a specific session
    - tool_name: Filter events for a specific tool

    The response reports a 1-based ``page`` derived from ``skip`` and
    ``limit``, and ``has_more`` is true while ``skip + limit`` is below the
    total count.

    Raises:
        HTTPException: 500 if fetching or converting the events fails.
    """
    try:
        events, total = get_events(
            db,
            skip=skip,
            limit=limit,
            event_type=event_type,
            session_id=session_id,
            tool_name=tool_name,
        )

        # Convert to response format
        event_responses = [EventResponse(**_event_fields(event)) for event in events]

        return EventListResponse(
            events=event_responses,
            total=total,
            page=skip // limit + 1,
            page_size=limit,
            has_more=(skip + limit) < total,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching events: {str(e)}") from e


@router.get("/{event_id}", response_model=EventResponse)
async def get_event(event_id: int, db: Session = Depends(get_db)):
    """Get a single event by ID.

    Raises:
        HTTPException: 404 if no event is found for ``event_id``.
    """
    event = get_event_by_id(db, event_id)
    if not event:
        raise HTTPException(status_code=404, detail="Event not found")
    return event


@router.delete("/reset", status_code=204)
async def reset_stats(db: Session = Depends(get_db)):
    """
    Reset all statistics by deleting all events.

    This is used by the "Reset Stats" button in the frontend.

    After the delete, a reset notification is broadcast to WebSocket clients
    on a best-effort basis.

    Raises:
        HTTPException: 500 if deleting the events fails.
    """
    try:
        delete_all_events(db)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error resetting stats: {str(e)}") from e

    # The delete has already happened; a failed notification is logged rather
    # than reported to the caller as a failed reset.
    try:
        # Broadcast reset notification to all clients
        await manager.broadcast_stats_reset()
    except Exception:
        logger.exception("Events were deleted but broadcasting the reset to WebSocket clients failed")

    return None