signal-tracker / api.py
vumichien's picture
up
7e6b994
raw
history blame
7.61 kB
from fastapi import APIRouter, Depends, HTTPException, Header, status
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from models import StatusRecord, Device, StatusRecordBatch, SystemSetting
from database import get_db
from datetime import datetime, timedelta
import uuid as uuid_module
import random
from sqlalchemy.exc import IntegrityError
from typing import Dict
api_router = APIRouter(prefix="/api", tags=["api"])
def authenticate_device(device_id: str, device_password: str, db: Session = Depends(get_db)):
device = db.query(Device).filter(Device.device_id == device_id).first()
if not device or device.password != device_password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid device credentials",
)
return device
@api_router.post("/generate-data")
def generate_data(
device_id: str = Header(...),
device_password: str = Header(...),
db: Session = Depends(get_db),
):
authenticate_device(device_id, device_password, db)
base_latitude = 35.6837
base_longitude = 139.6805
start_date = datetime(2024, 8, 1)
end_date = datetime(2024, 8, 7)
delta = end_date - start_date
for _ in range(100):
random_days = random.randint(0, delta.days)
random_seconds = random.randint(0, 86400)
random_time = start_date + timedelta(days=random_days, seconds=random_seconds)
random_latitude = base_latitude + random.uniform(-0.01, 0.01)
random_longitude = base_longitude + random.uniform(-0.01, 0.01)
random_connect_status = random.choice([0, 1])
status_record = StatusRecord(
device_id=device_id,
latitude=random_latitude,
longitude=random_longitude,
timestamp=random_time,
connect_status=random_connect_status,
)
db.add(status_record)
db.commit()
return {"message": "Demo data generated successfully"}
@api_router.delete("/delete-data", summary="Delete all status records")
def delete_all_data(
device_id: str = Header(...),
device_password: str = Header(...),
db: Session = Depends(get_db),
):
"""
Delete all status records from the database.
Requires device authentication.
"""
authenticate_device(device_id, device_password, db)
try:
db.query(StatusRecord).delete()
db.commit()
return {"message": "All data deleted successfully"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
@api_router.delete(
"/delete-data/{device_id}", summary="Delete status records for a specific device"
)
def delete_device_data(
device_id: str,
auth_device_id: str = Header(...),
device_password: str = Header(...),
db: Session = Depends(get_db),
):
"""
Delete status records for a specific device ID.
Requires device authentication.
"""
authenticate_device(auth_device_id, device_password, db)
try:
deleted_count = (
db.query(StatusRecord).filter(StatusRecord.device_id == device_id).delete()
)
db.commit()
if deleted_count == 0:
return {"message": f"No data found for device ID: {device_id}"}
return {"message": f"Data for device ID {device_id} deleted successfully"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
@api_router.post("/upload_batch")
async def upload_data_batch(
records: StatusRecordBatch,
device_id: str = Header(...),
device_password: str = Header(...),
db: Session = Depends(get_db),
):
"""
Upload multiple status records in a single request.
Requires device authentication and unique UUIDs for each record.
Uses the device_id from the header for all records.
"""
authenticate_device(device_id, device_password, db)
successful_uploads = 0
failed_uploads = 0
error_messages = []
failed_records = []
for record in records.records:
try:
# Validate UUID
uuid_obj = uuid_module.UUID(record.uuid)
# Validate timestamp
timestamp_dt = datetime.strptime(record.timestamp, "%Y-%m-%d %H:%M:%S")
status_record = StatusRecord(
uuid=str(uuid_obj),
device_id=device_id,
latitude=record.latitude,
longitude=record.longitude,
timestamp=timestamp_dt,
connect_status=record.connect_status,
)
db.add(status_record)
successful_uploads += 1
except ValueError as ve:
failed_uploads += 1
error_messages.append(f"Invalid data format: {str(ve)}")
failed_records.append(str(uuid_obj))
except IntegrityError:
db.rollback()
failed_uploads += 1
error_messages.append(f"Duplicate UUID: {record.uuid}")
failed_records.append(str(uuid_obj))
except Exception as e:
db.rollback()
failed_uploads += 1
error_messages.append(f"Error processing record: {str(e)}")
failed_records.append(str(uuid_obj))
try:
db.commit()
except Exception as e:
db.rollback()
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"message": f"Error committing to database: {str(e)}"},
)
return JSONResponse(
status_code=status.HTTP_200_OK,
content={
"status": "ok",
"message": "Batch upload completed",
"successful_uploads": successful_uploads,
"failed_uploads": failed_uploads,
"errors": error_messages,
"failed_records": failed_records,
},
)
@api_router.get("/health_check", summary="Check if the API is functioning correctly")
def health_check(
device_id: str = Header(...),
device_password: str = Header(...),
db: Session = Depends(get_db),
):
"""
Perform a health check on the API.
Requires device authentication.
Returns a 200 status code if successful.
Returns a 401 Unauthorized error if authentication fails.
"""
try:
authenticate_device(device_id, device_password, db)
return JSONResponse(content={"status": "ok"}, status_code=status.HTTP_200_OK)
except HTTPException as e:
if e.status_code == status.HTTP_401_UNAUTHORIZED:
return JSONResponse(
content={"status": "error", "detail": "Unauthorized"},
status_code=status.HTTP_401_UNAUTHORIZED,
)
raise e
@api_router.get(
"/config", summary="Get system configuration", response_model=Dict[str, int]
)
def get_config(
device_id: str = Header(...),
device_password: str = Header(...),
db: Session = Depends(get_db),
):
"""
Retrieve the system configuration from SystemSetting.
Requires device authentication.
"""
authenticate_device(device_id, device_password, db)
system_setting = db.query(SystemSetting).first()
if not system_setting:
raise HTTPException(status_code=404, detail="System settings not found")
return {
"check_connect_period": system_setting.check_connect_period,
"data_sync_period": system_setting.data_sync_period,
"get_config_period": system_setting.get_config_period,
"point_distance": system_setting.point_distance,
}