from pathlib import Path import shutil import tempfile import gradio as gr import uuid from typing_extensions import TypedDict, Tuple from fastapi import FastAPI from fastapi.staticfiles import StaticFiles import uvicorn app = FastAPI() # create a static directory to store the static files gs_dir = Path(str(tempfile.gettempdir())) / "gaussian_splatting_gradio" gs_dir.mkdir(parents=True, exist_ok=True) # mount FastAPI StaticFiles server app.mount("/static", StaticFiles(directory=gs_dir), name="static") StateDict = TypedDict("StateDict", { "uuid": str, }) def getHTML(): html_body = """
""" html = f""" 3D Gaussian Splatting Viewer {html_body} """ return f"""""" def createStateSession() -> StateDict: # Create new session session_uuid = str(uuid.uuid4()) print("createStateSession") print(session_uuid) return StateDict( uuid=session_uuid, ) def removeStateSession(session_state_value: StateDict): # Clean up previous session return StateDict( uuid=None, ) def makeButtonVisible() -> Tuple[gr.Button, gr.Button]: process_button = gr.Button(visible=True) reset_button = gr.Button(visible=False) #TODO: I will bring this back when I figure out how to stop the process return process_button, reset_button def resetSession(state: StateDict) -> Tuple[StateDict, gr.Button, gr.Button]: print("resetSession") new_state = removeStateSession(state) process_button = gr.Button(visible=False) reset_button = gr.Button(visible=False) return new_state, process_button, reset_button def process( # *args, **kwargs session_state_value: StateDict, filepath: str, ffmpeg_fps: int, ffmpeg_qscale: int, colmap_camera: str, ): if session_state_value["uuid"] is None: return print("process") # print(args) # print(kwargs) # return print(session_state_value) print(f"Processing {filepath}") try: session_tmpdirname = gs_dir / str(session_state_value['uuid']) session_tmpdirname.mkdir(parents=True, exist_ok=True) print('Created temporary directory', session_tmpdirname) gs_dir_path = Path(session_tmpdirname) logfile_path = Path(session_tmpdirname) / "log.txt" logfile_path.touch() with logfile_path.open("w") as log_file: # Create log file logfile_path.touch() from services.ffmpeg import ffmpeg_run ffmpeg_run( video_path = Path(filepath), output_path = gs_dir_path, fps = int(ffmpeg_fps), qscale = int(ffmpeg_qscale), stream_file=log_file ) from services.colmap import colmap colmap( source_path=gs_dir_path, camera=str(colmap_camera), stream_file=log_file ) print("Done with colmap") # Create a zip of the gs_dir_path folder print(gs_dir, gs_dir_path) print(gs_dir_path.name) archive = shutil.make_archive("result", 'zip', gs_dir, gs_dir_path) print('Created zip file', archive) # Move the zip file to the gs_dir_path folder shutil.move(archive, gs_dir_path) from services.gaussian_splatting_cuda import gaussian_splatting_cuda gaussian_splatting_cuda( data_path = gs_dir_path, output_path = gs_dir_path / "output", gs_command = str(Path(__file__).parent.absolute() / "build" / 'gaussian_splatting_cuda'), iterations = 100, convergence_rate = 0.01, resolution = 512, enable_cr_monitoring = False, force = False, empty_gpu_cache = False, stream_file = log_file ) except Exception: pass # print('Error - Removing temporary directory', session_tmpdirname) # shutil.rmtree(session_tmpdirname) def updateLog(session_state_value: StateDict) -> str: if session_state_value["uuid"] is None: return "" log_file = gs_dir / str(session_state_value['uuid']) / "log.txt" if not log_file.exists(): return "" with log_file.open("r") as log_file: logs = log_file.read() return logs with gr.Blocks() as demo: session_state = gr.State({ "uuid": None, }) with gr.Row(): with gr.Column(): video_input = gr.PlayableVideo( format="mp4", source="upload", label="Upload a video", include_audio=False ) with gr.Row(variant="panel"): ffmpeg_fps = gr.Number( label="FFMPEG FPE", value=1, minimum=1, maximum=5, step=0.10, ) ffmpeg_qscale = gr.Number( label="FFMPEG QSCALE", value=1, minimum=1, maximum=5, step=1, ) colmap_camera = gr.Dropdown( label="COLMAP Camera", value="OPENCV", choices=["OPENCV", "SIMPLE_PINHOLE", "PINHOLE", "SIMPLE_RADIAL", "RADIAL"], ) text_log = gr.Textbox( label="Logs", info="Logs", interactive=False, show_copy_button=True ) # text_log = gr.Code( # label="Logs", # language=None, # interactive=False, # ) process_button = gr.Button("Process", visible=False) reset_button = gr.ClearButton( components=[video_input, text_log, ffmpeg_fps, ffmpeg_qscale, colmap_camera], label="Reset", visible=False, ) process_event = process_button.click( fn=process, inputs=[session_state, video_input, ffmpeg_fps, ffmpeg_qscale, colmap_camera], outputs=[], ) upload_event = video_input.upload( fn=makeButtonVisible, inputs=[], outputs=[process_button, reset_button] ).then( fn=createStateSession, inputs=[], outputs=[session_state], ).then( fn=updateLog, inputs=[session_state], outputs=[text_log], every=2, ) reset_button.click( fn=resetSession, inputs=[session_state], outputs=[session_state, process_button, reset_button], cancels=[process_event] ) video_input.clear( fn=resetSession, inputs=[session_state], outputs=[session_state, process_button, reset_button], cancels=[process_event] ) demo.close # gr.LoginButton, gr.LogoutButton # gr.HuggingFaceDatasetSaver # gr.OAuthProfile # with gr.Tab("jsdn"): # input_mic = gr.HTML(getHTML()) demo.queue() # demo.launch() # mount Gradio app to FastAPI app app = gr.mount_gradio_app(app, demo, path="/") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)