from pathlib import Path
import shutil
import tempfile
import gradio as gr
import uuid
from typing_extensions import TypedDict, Tuple
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
import uvicorn
app = FastAPI()
# create a static directory to store the static files
gs_dir = Path(str(tempfile.gettempdir())) / "gaussian_splatting_gradio"
gs_dir.mkdir(parents=True, exist_ok=True)
# mount FastAPI StaticFiles server
app.mount("/static", StaticFiles(directory=gs_dir), name="static")
StateDict = TypedDict("StateDict", {
"uuid": str,
})
def getHTML():
html_body = """
"""
html = f"""
3D Gaussian Splatting Viewer
{html_body}
"""
return f""""""
def createStateSession() -> StateDict:
# Create new session
session_uuid = str(uuid.uuid4())
print("createStateSession")
print(session_uuid)
return StateDict(
uuid=session_uuid,
)
def removeStateSession(session_state_value: StateDict):
# Clean up previous session
return StateDict(
uuid=None,
)
def makeButtonVisible() -> Tuple[gr.Button, gr.Button]:
process_button = gr.Button(visible=True)
reset_button = gr.Button(visible=False) #TODO: I will bring this back when I figure out how to stop the process
return process_button, reset_button
def resetSession(state: StateDict) -> Tuple[StateDict, gr.Button, gr.Button]:
print("resetSession")
new_state = removeStateSession(state)
process_button = gr.Button(visible=False)
reset_button = gr.Button(visible=False)
return new_state, process_button, reset_button
def process(
# *args, **kwargs
session_state_value: StateDict,
filepath: str,
ffmpeg_fps: int,
ffmpeg_qscale: int,
colmap_camera: str,
):
if session_state_value["uuid"] is None:
return
print("process")
# print(args)
# print(kwargs)
# return
print(session_state_value)
print(f"Processing {filepath}")
try:
session_tmpdirname = gs_dir / str(session_state_value['uuid'])
session_tmpdirname.mkdir(parents=True, exist_ok=True)
print('Created temporary directory', session_tmpdirname)
gs_dir_path = Path(session_tmpdirname)
logfile_path = Path(session_tmpdirname) / "log.txt"
logfile_path.touch()
with logfile_path.open("w") as log_file:
# Create log file
logfile_path.touch()
from services.ffmpeg import ffmpeg_run
ffmpeg_run(
video_path = Path(filepath),
output_path = gs_dir_path,
fps = int(ffmpeg_fps),
qscale = int(ffmpeg_qscale),
stream_file=log_file
)
from services.colmap import colmap
colmap(
source_path=gs_dir_path,
camera=str(colmap_camera),
stream_file=log_file
)
print("Done with colmap")
# Create a zip of the gs_dir_path folder
print(gs_dir, gs_dir_path)
print(gs_dir_path.name)
archive = shutil.make_archive("result", 'zip', gs_dir, gs_dir_path)
print('Created zip file', archive)
# Move the zip file to the gs_dir_path folder
shutil.move(archive, gs_dir_path)
from services.gaussian_splatting_cuda import gaussian_splatting_cuda
gaussian_splatting_cuda(
data_path = gs_dir_path,
output_path = gs_dir_path / "output",
gs_command = str(Path(__file__).parent.absolute() / "build" / 'gaussian_splatting_cuda'),
iterations = 100,
convergence_rate = 0.01,
resolution = 512,
enable_cr_monitoring = False,
force = False,
empty_gpu_cache = False,
stream_file = log_file
)
except Exception:
pass
# print('Error - Removing temporary directory', session_tmpdirname)
# shutil.rmtree(session_tmpdirname)
def updateLog(session_state_value: StateDict) -> str:
if session_state_value["uuid"] is None:
return ""
log_file = gs_dir / str(session_state_value['uuid']) / "log.txt"
if not log_file.exists():
return ""
with log_file.open("r") as log_file:
logs = log_file.read()
return logs
with gr.Blocks() as demo:
session_state = gr.State({
"uuid": None,
})
with gr.Row():
with gr.Column():
video_input = gr.PlayableVideo(
format="mp4",
source="upload",
label="Upload a video",
include_audio=False
)
with gr.Row(variant="panel"):
ffmpeg_fps = gr.Number(
label="FFMPEG FPE",
value=1,
minimum=1,
maximum=5,
step=0.10,
)
ffmpeg_qscale = gr.Number(
label="FFMPEG QSCALE",
value=1,
minimum=1,
maximum=5,
step=1,
)
colmap_camera = gr.Dropdown(
label="COLMAP Camera",
value="OPENCV",
choices=["OPENCV", "SIMPLE_PINHOLE", "PINHOLE", "SIMPLE_RADIAL", "RADIAL"],
)
text_log = gr.Textbox(
label="Logs",
info="Logs",
interactive=False,
show_copy_button=True
)
# text_log = gr.Code(
# label="Logs",
# language=None,
# interactive=False,
# )
process_button = gr.Button("Process", visible=False)
reset_button = gr.ClearButton(
components=[video_input, text_log, ffmpeg_fps, ffmpeg_qscale, colmap_camera],
label="Reset",
visible=False,
)
process_event = process_button.click(
fn=process,
inputs=[session_state, video_input, ffmpeg_fps, ffmpeg_qscale, colmap_camera],
outputs=[],
)
upload_event = video_input.upload(
fn=makeButtonVisible,
inputs=[],
outputs=[process_button, reset_button]
).then(
fn=createStateSession,
inputs=[],
outputs=[session_state],
).then(
fn=updateLog,
inputs=[session_state],
outputs=[text_log],
every=2,
)
reset_button.click(
fn=resetSession,
inputs=[session_state],
outputs=[session_state, process_button, reset_button],
cancels=[process_event]
)
video_input.clear(
fn=resetSession,
inputs=[session_state],
outputs=[session_state, process_button, reset_button],
cancels=[process_event]
)
demo.close
# gr.LoginButton, gr.LogoutButton
# gr.HuggingFaceDatasetSaver
# gr.OAuthProfile
# with gr.Tab("jsdn"):
# input_mic = gr.HTML(getHTML())
demo.queue()
# demo.launch()
# mount Gradio app to FastAPI app
app = gr.mount_gradio_app(app, demo, path="/")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)