# signal-tracker / app.py
# Repository page header residue (not code): vumichien's picture / up / 7e6b994
from fastapi import FastAPI, HTTPException, Request, Form, Depends, status, APIRouter
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from fastapi.security import HTTPBasic
from datetime import datetime
import folium
from folium.plugins import MarkerCluster
from typing import Optional
from sqlalchemy.orm import Session
from models import User, StatusRecord, SystemSetting, Device
import io
import csv
from admin import admin_router
from api import api_router
from database import get_db, SessionLocal
# Seed the database with the default admin account and system settings
def create_default_data():
    """Insert the default admin user and a default SystemSetting row if absent.

    Opens its own session through SessionLocal, commits once after both
    checks, and always closes the session. Any exception triggers a rollback
    and is reported with print(); it is not re-raised.
    """
    session = SessionLocal()
    try:
        # Only add the admin account when no user is named "admin" yet.
        # NOTE(review): the password is stored exactly as given here.
        admin_row = session.query(User).filter(User.username == "admin").first()
        if not admin_row:
            session.add(
                User(
                    username="admin",
                    email="[email protected]",
                    password="admin",
                    is_admin=True,
                    is_active=True,
                )
            )

        # Only add a settings row when the table has none at all.
        settings_row = session.query(SystemSetting).first()
        if not settings_row:
            session.add(SystemSetting())

        session.commit()
    except Exception as exc:
        session.rollback()
        print(f"Error creating default data: {exc!s}")
    finally:
        session.close()
# Seed the default rows at import time, before the application object is built.
create_default_data()
# FastAPI application instance; the routes below are registered on it.
app = FastAPI()
# Jinja2 template loader rooted at the "templates" directory.
templates = Jinja2Templates(directory="templates")
# HTTP Basic security scheme object; not referenced by the code visible here.
security = HTTPBasic()
def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]:
    """Return the User named by the ``username`` cookie, or None.

    None is returned when the cookie is missing or empty, or when no user
    with that username exists.
    """
    cookie_username = request.cookies.get("username")
    if not cookie_username:
        return None
    return db.query(User).filter(User.username == cookie_username).first()
def login_required(request: Request, db: Session = Depends(get_db)):
    """Resolve the logged-in user from the ``username`` cookie.

    Returns:
        The matching User on success; otherwise a 302 RedirectResponse to
        ``/login``. Callers distinguish the two with
        ``isinstance(result, RedirectResponse)``.

    The redirect is returned when the cookie is missing or empty, when no
    user has that username, or when the account is not active. The last
    check mirrors POST /login, which only accepts ``is_active`` users, so a
    deactivated account cannot keep using a cookie issued earlier.
    """
    username = request.cookies.get("username")
    user = None
    if username:
        user = db.query(User).filter(User.username == username).first()

    if not user or not user.is_active:
        return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
    return user
# Device authentication function
def authenticate_device(
    device_id: str, device_password: str, db: Session = Depends(get_db)
):
    """Look up a device by id and verify its password.

    Args:
        device_id: Identifier matched against ``Device.device_id``.
        device_password: Password supplied by the caller.
        db: Database session.

    Returns:
        The matching Device.

    Raises:
        HTTPException: 401 with detail "Invalid device credentials" when the
            device does not exist or the password does not match.
    """
    # Local import keeps this block self-contained; hmac is standard library.
    import hmac

    device = db.query(Device).filter(Device.device_id == device_id).first()
    stored_password = device.password if device else None

    # compare_digest takes time independent of where the inputs differ, unlike
    # ``!=``. Both sides are encoded to bytes because compare_digest rejects
    # non-ASCII str. Requiring str on both sides also means a missing (None)
    # stored password can never compare equal to a missing input.
    credentials_ok = (
        isinstance(stored_password, str)
        and isinstance(device_password, str)
        and hmac.compare_digest(
            stored_password.encode("utf-8"), device_password.encode("utf-8")
        )
    )
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid device credentials",
        )
    return device
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    """Render the login form from the ``login.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
@app.post("/login")
async def login(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Handle the login form submission.

    On a match (username, password, and active account) the user's
    ``last_login`` is set to the current time, the change is committed, and
    a 302 redirect to ``/`` is returned with an HTTP-only ``username`` cookie.
    Otherwise ``login.html`` is re-rendered with an error message.
    """
    # NOTE(review): the password is compared directly against the stored
    # column value, and the cookie carries only the username.
    match_criteria = (
        User.username == username,
        User.password == password,
        User.is_active == True,  # SQL expression, not a Python truth test
    )
    user = db.query(User).filter(*match_criteria).first()

    if not user:
        failure_context = {
            "request": request,
            "error": "Invalid credentials or inactive account",
        }
        return templates.TemplateResponse("login.html", failure_context)

    user.last_login = datetime.now()
    db.commit()

    redirect = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
    redirect.set_cookie(key="username", value=username, httponly=True)
    return redirect
@app.get("/logout")
async def logout():
    """Delete the ``username`` cookie and redirect to ``/login`` with a 302."""
    redirect = RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
    redirect.delete_cookie("username")
    return redirect
@app.get("/register", response_class=HTMLResponse)
async def register_page(request: Request):
    """Render the registration form from the ``register.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("register.html", context)
@app.post("/register")
async def register(
    username: str = Form(...),
    email: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Create a new user from the registration form.

    If the username is already taken, nothing is written and the client is
    redirected (302) to ``/register?error=1``. Otherwise the user is added,
    the session is committed, and the client is redirected (302) to
    ``/login``.
    """
    # NOTE(review): only the username is checked for uniqueness here, and the
    # password is stored as submitted.
    username_taken = db.query(User).filter(User.username == username).first()

    if not username_taken:
        db.add(User(username=username, email=email, password=password))
        db.commit()
        destination = "/login"
    else:
        destination = "/register?error=1"

    return RedirectResponse(url=destination, status_code=status.HTTP_302_FOUND)
@app.get("/", response_class=HTMLResponse)
def show_map(
    request: Request,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """Render the status map, optionally restricted to a date range.

    Args:
        request: Incoming request; used for the login check and templating.
        start_date: First day to include, formatted ``YYYY-MM-DD``.
        end_date: Last day to include, formatted ``YYYY-MM-DD``.
        db: Database session.

    Returns:
        The rendered ``map.html`` template, or the redirect produced by
        ``login_required`` when the visitor is not logged in.

    Raises:
        HTTPException: 400 when either date cannot be parsed.

    The date filter is applied only when both dates are supplied.
    """
    # Local imports keep this block self-contained; both are standard library.
    import html
    from datetime import timedelta

    current_user = login_required(request, db)
    if isinstance(current_user, RedirectResponse):
        return current_user

    query = db.query(StatusRecord)
    if start_date and end_date:
        try:
            range_start = datetime.strptime(start_date, "%Y-%m-%d")
            # Parsing yields midnight at the start of the end day, so move the
            # upper bound to the next midnight and compare with "<". Otherwise
            # every record later on the end day would be excluded.
            range_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
        except (ValueError, OverflowError) as exc:
            # Report malformed input as a client error instead of a 500.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Dates must use the YYYY-MM-DD format",
            ) from exc
        query = query.filter(
            StatusRecord.timestamp >= range_start,
            StatusRecord.timestamp < range_end,
        )

    # Map is centred on fixed coordinates at zoom level 12.
    m = folium.Map(location=[35.6837, 139.6805], zoom_start=12)
    marker_cluster = MarkerCluster().add_to(m)

    for record in query.all():
        # A marker cannot be placed without both coordinates; skip the record
        # rather than failing the whole page.
        if record.latitude is None or record.longitude is None:
            continue
        folium.CircleMarker(
            location=[record.latitude, record.longitude],
            radius=10,
            # Popup text is inserted into the page as HTML, so escape it.
            popup=html.escape(str(record.device_id)),
            # Green when connect_status is 1, red for any other value.
            color="green" if record.connect_status == 1 else "red",
            fill=True,
            fill_opacity=0.6,
        ).add_to(marker_cluster)

    map_html = m._repr_html_()
    return templates.TemplateResponse(
        "map.html",
        {
            "request": request,
            "map_html": map_html,
            "start_date": start_date,
            "end_date": end_date,
            "current_user": current_user,
        },
    )
@app.get("/download-csv")
async def download_csv(request: Request, db: Session = Depends(get_db)):
    """Return every StatusRecord as a CSV attachment.

    Visitors who are not logged in receive the redirect produced by
    ``login_required``. The CSV is built fully in memory and sent as a
    single chunk named ``status_records.csv``.
    """
    current_user = login_required(request, db)
    if isinstance(current_user, RedirectResponse):
        return current_user

    records = db.query(StatusRecord).all()

    header = ["UUID", "Device ID", "Latitude", "Longitude", "Timestamp", "Connect Status"]
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(header)
    # One row per record, in the same column order as the header.
    csv_writer.writerows(
        [
            rec.uuid,
            rec.device_id,
            rec.latitude,
            rec.longitude,
            rec.timestamp,
            rec.connect_status,
        ]
        for rec in records
    )

    payload = buffer.getvalue()
    response = StreamingResponse(iter([payload]), media_type="text/csv")
    response.headers["Content-Disposition"] = "attachment; filename=status_records.csv"
    return response
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Convert HTTPException into a redirect or an error page.

    A 401 becomes a 302 redirect to ``/login``. Any other status renders
    ``error.html`` with the exception detail and keeps the original status
    code.
    """
    needs_login = exc.status_code == status.HTTP_401_UNAUTHORIZED
    if not needs_login:
        error_context = {"request": request, "detail": exc.detail}
        return templates.TemplateResponse(
            "error.html",
            error_context,
            status_code=exc.status_code,
        )
    return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
# Register the routes from the admin and api routers on the application.
app.include_router(admin_router)
app.include_router(api_router)
if __name__ == "__main__":
    # uvicorn is imported only when this file is launched as a script.
    from uvicorn import run as run_server

    # Serve the app on all interfaces, port 7860.
    run_server(app, port=7860, host="0.0.0.0")