Files
claude-code-monitor/backend/app/api/events.py
felix.zoesch f6ef7ff5d3 Initial commit: Claude Code Monitor v1.0.0
Complete real-time monitoring dashboard for Claude Code

Features:
- FastAPI backend with REST API and WebSocket
- React + TypeScript frontend with dark theme
- SQLite database for event storage
- 6 hook scripts for Claude Code events
- Docker deployment setup
- Real-time event tracking and statistics
- Event filtering (ALL, TOOLS, AGENTS, PROMPTS, SESSIONS)
- Connection status indicator
- Reset stats functionality

Tech Stack:
- Backend: Python 3.11, FastAPI, SQLAlchemy, WebSockets
- Frontend: React 18, TypeScript, Vite
- Database: SQLite
- Deployment: Docker, Docker Compose

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-15 09:49:06 +01:00

140 lines
4.8 KiB
Python

"""API endpoints for event management."""
import json
import logging
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from app.database import get_db
from app.schemas import EventCreate, EventResponse, EventListResponse
from app.crud import create_event, get_events, get_event_by_id, delete_all_events, get_statistics
from app.websocket import manager
# Router shared by every endpoint in this module; all routes are mounted under /api/events.
router = APIRouter(prefix="/api/events", tags=["events"])
@router.post("", response_model=EventResponse, status_code=201)
async def create_event_endpoint(event: EventCreate, db: Session = Depends(get_db)):
    """
    Create a new event.

    This endpoint is called by Claude Code hook scripts to report events.
    The event is written to the database first; afterwards the new event and
    refreshed statistics are pushed to connected WebSocket clients.

    Args:
        event: Request payload describing the event to store.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The stored event object, serialized by FastAPI via ``EventResponse``.

    Raises:
        HTTPException: 500 if storing the event fails.
    """
    # Only the database write decides whether the request succeeded.
    try:
        db_event = create_event(db, event)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error creating event: {str(e)}") from e

    # Notifying dashboard clients is best-effort: the event is already stored,
    # so a serialization, statistics or broadcast problem must not be reported
    # to the caller as a failed creation.
    # NOTE(review): assumes create_event has committed by the time it returns; confirm in app.crud.
    try:
        # Convert event to a plain dict for the WebSocket message
        event_dict = {
            "id": db_event.id,
            "session_id": db_event.session_id,
            "event_type": db_event.event_type,
            "tool_name": db_event.tool_name,
            # tool_input is decoded with json.loads when present
            "tool_input": json.loads(db_event.tool_input) if db_event.tool_input else None,
            "tool_output": db_event.tool_output,
            "success": db_event.success,
            "description": db_event.description,
            "timestamp": db_event.timestamp,
            "created_at": db_event.created_at.isoformat() if db_event.created_at else None,
        }

        # Get updated statistics
        stats = get_statistics(db)
        stats_dict = {
            "total_events": stats.total_events,
            "total_tools_used": stats.total_tools_used,
            "total_agents": stats.total_agents,
            "total_sessions": stats.total_sessions,
            "last_updated": stats.last_updated,
        }

        # Broadcast to WebSocket clients
        await manager.broadcast_event(event_dict, stats_dict)
    except Exception:
        logging.getLogger(__name__).exception(
            "Event stored but WebSocket notification failed"
        )

    return db_event
@router.get("", response_model=EventListResponse)
async def list_events(
    skip: int = Query(0, ge=0, description="Number of events to skip"),
    limit: int = Query(50, ge=1, le=100, description="Maximum number of events to return"),
    event_type: Optional[str] = Query(None, description="Filter by event type (comma-separated for multiple)"),
    session_id: Optional[str] = Query(None, description="Filter by session ID"),
    tool_name: Optional[str] = Query(None, description="Filter by tool name"),
    db: Session = Depends(get_db),
):
    """
    Return one page of events, optionally narrowed by filters.

    Available filters (all optional, passed straight through to ``get_events``):
    - event_type: Single type or comma-separated list (e.g., "PreToolUse,PostToolUse")
    - session_id: Only events from the given session
    - tool_name: Only events for the given tool

    Paging is controlled by ``skip`` (>= 0) and ``limit`` (1..100). The
    response reports the 1-based page number derived from ``skip // limit``
    and whether more events exist past this page.

    Raises:
        HTTPException: 500 if fetching or converting the events fails.
    """
    try:
        rows, total = get_events(
            db,
            skip=skip,
            limit=limit,
            event_type=event_type,
            session_id=session_id,
            tool_name=tool_name,
        )

        # Build the response models; tool_input is JSON-decoded when present.
        items = [
            EventResponse(
                id=row.id,
                session_id=row.session_id,
                event_type=row.event_type,
                tool_name=row.tool_name,
                tool_input=json.loads(row.tool_input) if row.tool_input else None,
                tool_output=row.tool_output,
                success=row.success,
                description=row.description,
                timestamp=row.timestamp,
                created_at=row.created_at,
            )
            for row in rows
        ]

        current_page = skip // limit + 1
        more_available = (skip + limit) < total

        return EventListResponse(
            events=items,
            total=total,
            page=current_page,
            page_size=limit,
            has_more=more_available,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching events: {str(e)}")
@router.get("/{event_id}", response_model=EventResponse)
async def get_event(event_id: int, db: Session = Depends(get_db)):
    """
    Fetch one event by its ID.

    Returns the object from ``get_event_by_id`` unchanged and lets FastAPI
    serialize it via ``EventResponse``.

    Raises:
        HTTPException: 404 when the lookup yields nothing.
    """
    # NOTE(review): unlike list_events, tool_input is not JSON-decoded here;
    # confirm that EventResponse handles the raw stored value.
    found = get_event_by_id(db, event_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Event not found")
@router.delete("/reset", status_code=204)
async def reset_stats(db: Session = Depends(get_db)):
    """
    Reset all statistics by deleting all events.

    This is used by the "Reset Stats" button in the frontend. After the
    events are deleted, connected WebSocket clients are told about the reset.

    Args:
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        None; the route responds with status 204.

    Raises:
        HTTPException: 500 if deleting the events fails.
    """
    # Only the deletion decides whether the request succeeded.
    try:
        delete_all_events(db)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error resetting stats: {str(e)}") from e

    # Broadcasting is best-effort: the events are already gone, so a
    # notification failure must not be reported as a failed reset.
    try:
        await manager.broadcast_stats_reset()
    except Exception:
        logging.getLogger(__name__).exception(
            "Events deleted but stats-reset broadcast failed"
        )

    return None