Spaces:
Running
Running
from fastapi import FastAPI, HTTPException, Request, Form, Depends, status, APIRouter | |
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse | |
from fastapi.templating import Jinja2Templates | |
from fastapi.security import HTTPBasic | |
from datetime import datetime | |
import folium | |
from folium.plugins import MarkerCluster | |
from typing import Optional | |
from sqlalchemy.orm import Session | |
from models import User, StatusRecord, SystemSetting, Device | |
import io | |
import csv | |
from admin import admin_router | |
from api import api_router | |
from database import get_db, SessionLocal | |
# Create default admin user and system settings | |
def create_default_data(): | |
db = SessionLocal() | |
try: | |
# Create default admin user if not exists | |
if not db.query(User).filter(User.username == "admin").first(): | |
admin_user = User( | |
username="admin", | |
email="[email protected]", | |
password="admin", | |
is_admin=True, | |
is_active=True, | |
) | |
db.add(admin_user) | |
# Create default system settings if not exists | |
if not db.query(SystemSetting).first(): | |
default_settings = SystemSetting() | |
db.add(default_settings) | |
db.commit() | |
except Exception as e: | |
db.rollback() | |
print(f"Error creating default data: {str(e)}") | |
finally: | |
db.close() | |
create_default_data() | |
app = FastAPI() | |
templates = Jinja2Templates(directory="templates") | |
security = HTTPBasic() | |
def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]: | |
username = request.cookies.get("username") | |
if username: | |
return db.query(User).filter(User.username == username).first() | |
return None | |
def login_required(request: Request, db: Session = Depends(get_db)): | |
username = request.cookies.get("username") | |
if not username: | |
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
user = db.query(User).filter(User.username == username).first() | |
if not user: | |
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
return user | |
# Device authentication function | |
def authenticate_device( | |
device_id: str, device_password: str, db: Session = Depends(get_db) | |
): | |
device = db.query(Device).filter(Device.device_id == device_id).first() | |
if not device or device.password != device_password: | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Invalid device credentials", | |
) | |
return device | |
async def login_page(request: Request): | |
return templates.TemplateResponse("login.html", {"request": request}) | |
async def login( | |
request: Request, | |
username: str = Form(...), | |
password: str = Form(...), | |
db: Session = Depends(get_db), | |
): | |
user = ( | |
db.query(User) | |
.filter( | |
User.username == username, User.password == password, User.is_active == True | |
) | |
.first() | |
) | |
if user: | |
user.last_login = datetime.now() | |
db.commit() | |
response = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND) | |
response.set_cookie(key="username", value=username, httponly=True) | |
return response | |
return templates.TemplateResponse( | |
"login.html", | |
{"request": request, "error": "Invalid credentials or inactive account"}, | |
) | |
async def logout(): | |
response = RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
response.delete_cookie("username") | |
return response | |
async def register_page(request: Request): | |
return templates.TemplateResponse("register.html", {"request": request}) | |
async def register( | |
username: str = Form(...), | |
email: str = Form(...), | |
password: str = Form(...), | |
db: Session = Depends(get_db), | |
): | |
existing_user = db.query(User).filter(User.username == username).first() | |
if existing_user: | |
return RedirectResponse( | |
url="/register?error=1", status_code=status.HTTP_302_FOUND | |
) | |
new_user = User(username=username, email=email, password=password) | |
db.add(new_user) | |
db.commit() | |
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
def show_map( | |
request: Request, | |
start_date: str = None, | |
end_date: str = None, | |
db: Session = Depends(get_db), | |
): | |
current_user = login_required(request, db) | |
if isinstance(current_user, RedirectResponse): | |
return current_user | |
query = db.query(StatusRecord) | |
if start_date and end_date: | |
start_datetime = datetime.strptime(start_date, "%Y-%m-%d") | |
end_datetime = datetime.strptime(end_date, "%Y-%m-%d") | |
query = query.filter( | |
StatusRecord.timestamp.between(start_datetime, end_datetime) | |
) | |
status_data = [ | |
(s.latitude, s.longitude, s.connect_status, s.device_id) for s in query.all() | |
] | |
m = folium.Map(location=[35.6837, 139.6805], zoom_start=12) | |
marker_cluster = MarkerCluster().add_to(m) | |
for lat, lon, connect_status, device_id in status_data: | |
color = "green" if connect_status == 1 else "red" | |
folium.CircleMarker( | |
location=[lat, lon], | |
radius=10, | |
popup=f"{device_id}", | |
color=color, | |
fill=True, | |
fill_opacity=0.6, | |
).add_to(marker_cluster) | |
map_html = m._repr_html_() | |
return templates.TemplateResponse( | |
"map.html", | |
{ | |
"request": request, | |
"map_html": map_html, | |
"start_date": start_date, | |
"end_date": end_date, | |
"current_user": current_user, | |
}, | |
) | |
async def download_csv(request: Request, db: Session = Depends(get_db)): | |
current_user = login_required(request, db) | |
if isinstance(current_user, RedirectResponse): | |
return current_user | |
status_records = db.query(StatusRecord).all() | |
output = io.StringIO() | |
writer = csv.writer(output) | |
writer.writerow( | |
["UUID", "Device ID", "Latitude", "Longitude", "Timestamp", "Connect Status"] | |
) | |
for record in status_records: | |
writer.writerow( | |
[ | |
record.uuid, | |
record.device_id, | |
record.latitude, | |
record.longitude, | |
record.timestamp, | |
record.connect_status, | |
] | |
) | |
response = StreamingResponse(iter([output.getvalue()]), media_type="text/csv") | |
response.headers["Content-Disposition"] = "attachment; filename=status_records.csv" | |
return response | |
async def http_exception_handler(request: Request, exc: HTTPException): | |
if exc.status_code == status.HTTP_401_UNAUTHORIZED: | |
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
return templates.TemplateResponse( | |
"error.html", | |
{"request": request, "detail": exc.detail}, | |
status_code=exc.status_code, | |
) | |
# Include the routers | |
app.include_router(admin_router) | |
app.include_router(api_router) | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=7860) | |