Spaces:
Build error
Build error
from typing import Literal, Optional | |
from io import IOBase | |
import os | |
from pathlib import Path | |
import shutil | |
import subprocess | |
from rich.progress import Progress | |
from rich.console import Console | |
# Shared Rich console: the pipeline steps in this module log through it and
# render their progress bars on it.
console = Console()
class FailedProcess(Exception):
    """Raised when a COLMAP subprocess exits with a non-zero return code."""
def colmap_feature_extraction(
    database_path: Path,
    image_path: Path,
    camera: Literal["OPENCV"],
    colmap_command: str = "colmap",
    use_gpu: bool = True,
    stream_file: Optional[IOBase] = None
):
    """Run ``colmap feature_extractor`` and report its progress.

    Args:
        database_path: COLMAP database file to write; its parent directory is
            created if missing.
        image_path: Directory holding the input images. Only ``*.jpg`` files
            are counted for the initial progress total.
        camera: Value passed to ``--ImageReader.camera_model``.
        colmap_command: Name or path of the COLMAP executable.
        use_gpu: Whether to pass ``--SiftExtraction.use_gpu 1`` (else ``0``).
        stream_file: Optional file object that receives the combined
            stdout/stderr of the process. When given, the output is not piped
            back to Python, so no per-image progress can be parsed.

    Raises:
        FailedProcess: If COLMAP exits with a non-zero return code.
    """
    progress_prefix = "Processed file ["
    image_count = len(list(image_path.glob("*.jpg")))

    database_path.parent.mkdir(parents=True, exist_ok=True)
    cmd = [
        colmap_command,
        "feature_extractor",
        "--database_path", database_path.as_posix(),
        "--image_path", image_path.as_posix(),
        "--ImageReader.single_camera", "1",
        "--ImageReader.camera_model", camera,
        "--SiftExtraction.use_gpu", "1" if use_gpu else "0",
        # Disabled tuning options, kept for reference:
        # "--SiftExtraction.domain_size_pooling", "1",
        # "--SiftExtraction.estimate_affine_shape", "1"
    ]

    with Progress(console=console) as progress:
        task = progress.add_task("Feature Extraction", total=image_count)
        console.log(f"π» Executing command: {' '.join(cmd)}")

        _stdout = stream_file if stream_file is not None else subprocess.PIPE
        with subprocess.Popen(cmd, stdout=_stdout, stderr=subprocess.STDOUT, text=True) as process:
            # process.stdout is None when the output goes to stream_file.
            if process.stdout:
                for line in process.stdout:
                    if not line.startswith(progress_prefix):
                        continue
                    # Expected shape: "Processed file [<current>/<total>]".
                    counts = line[len(progress_prefix):].partition("]")[0]
                    try:
                        current, reported_total = (int(part) for part in counts.split("/"))
                    except ValueError:
                        # Unexpected format: skip the line rather than abort
                        # while COLMAP is still running.
                        continue
                    image_count = reported_total
                    progress.update(task, completed=current, total=image_count, refresh=True)

        # Leaving the Popen context waits for the process, so the return code
        # is final here.
        return_code = process.returncode
        if return_code == 0:
            # Only show a full bar when the step actually succeeded.
            progress.update(task, completed=image_count, refresh=True)

    if return_code != 0:
        raise FailedProcess("Feature extraction failed.")

    console.log(f'Feature stored in {database_path.as_posix()}.')
    console.log('β Feature extraction completed.')
def colmap_feature_matching(
    database_path: Path,
    image_path: Path,
    colmap_command: str = "colmap",
    use_gpu: bool = True,
    stream_file: Optional[IOBase] = None
):
    """Run ``colmap exhaustive_matcher`` on an existing feature database.

    Args:
        database_path: COLMAP database passed as ``--database_path``.
        image_path: Image directory; used only to count ``*.jpg`` files for
            the progress bar total.
        colmap_command: Name or path of the COLMAP executable.
        use_gpu: Whether to pass ``--SiftMatching.use_gpu 1`` (else ``0``).
        stream_file: Optional file object that receives the combined
            stdout/stderr of the process instead of a pipe.

    Raises:
        FailedProcess: If COLMAP exits with a non-zero return code.
    """
    image_count = len(list(image_path.glob("*.jpg")))

    cmd = [
        colmap_command,
        "exhaustive_matcher",
        "--database_path", database_path.as_posix(),
        "--SiftMatching.use_gpu", "1" if use_gpu else "0"
    ]

    with Progress(console=console) as progress:
        task = progress.add_task("Feature Matching", total=image_count)
        console.log(f"π» Executing command: {' '.join(cmd)}")

        _stdout = stream_file if stream_file is not None else subprocess.PIPE
        with subprocess.Popen(cmd, stdout=_stdout, stderr=subprocess.STDOUT, text=True) as process:
            # process.stdout is None when the output goes to stream_file.
            if process.stdout:
                # No progress is parsed for this step; the pipe is still
                # drained so the child cannot block on a full pipe buffer.
                for _ in process.stdout:
                    pass

        # Leaving the Popen context waits for the process, so the return code
        # is final here.
        return_code = process.returncode
        if return_code == 0:
            # Only show a full bar when the step actually succeeded.
            progress.update(task, completed=image_count, refresh=True)

    if return_code != 0:
        raise FailedProcess("Feature matching failed.")

    console.log('β Feature matching completed.')
def colmap_bundle_adjustment(
    database_path: Path,
    image_path: Path,
    sparse_path: Path,
    colmap_command: str = "colmap",
    stream_file: Optional[IOBase] = None
):
    """Run ``colmap mapper`` and report how many images have been registered.

    Args:
        database_path: COLMAP database passed as ``--database_path``.
        image_path: Image directory passed as ``--image_path``; its ``*.jpg``
            count is the progress bar total.
        sparse_path: Output directory passed as ``--output_path``; created if
            missing.
        colmap_command: Name or path of the COLMAP executable.
        stream_file: Optional file object that receives the combined
            stdout/stderr of the process. When given, the output is not piped
            back to Python, so no registration progress can be parsed.

    Raises:
        FailedProcess: If COLMAP exits with a non-zero return code.
    """
    progress_prefix = "Registering image #"
    image_count = len(list(image_path.glob("*.jpg")))

    cmd = [
        colmap_command,
        "mapper",
        "--database_path", database_path.as_posix(),
        "--image_path", image_path.as_posix(),
        "--output_path", sparse_path.as_posix(),
        "--Mapper.ba_global_function_tolerance=0.000001"
        # Disabled tuning options, kept for reference:
        # "--Mapper.ba_local_max_num_iterations", "40",
        # "--Mapper.ba_global_max_num_iterations", "100",
        # "--Mapper.ba_local_max_refinements", "3",
        # "--Mapper.ba_global_max_refinements", "5"
    ]

    with Progress(console=console) as progress:
        task = progress.add_task("Bundle Adjustment", total=image_count)
        console.log(f"π» Executing command: {' '.join(cmd)}")

        sparse_path.mkdir(parents=True, exist_ok=True)

        _stdout = stream_file if stream_file is not None else subprocess.PIPE
        with subprocess.Popen(cmd, stdout=_stdout, stderr=subprocess.STDOUT, text=True) as process:
            # process.stdout is None when the output goes to stream_file.
            if process.stdout:
                for line in process.stdout:
                    if not line.startswith(progress_prefix):
                        continue
                    # The counter is the text between the last "(" and the
                    # ")" that follows it, e.g. "Registering image #12 (5)".
                    _, opening, tail = line.rpartition("(")
                    if not opening:
                        continue
                    try:
                        registered = int(tail.partition(")")[0])
                    except ValueError:
                        # Unexpected format: skip the line rather than abort
                        # while COLMAP is still running.
                        continue
                    progress.update(task, completed=registered, refresh=True)

        # Leaving the Popen context waits for the process, so the return code
        # is final here.
        return_code = process.returncode
        if return_code == 0:
            # Only show a full bar when the step actually succeeded.
            progress.update(task, completed=image_count, refresh=True)

    if return_code != 0:
        raise FailedProcess("Bundle adjustment failed.")

    console.log('β Bundle adjustment completed.')
def colmap_image_undistortion(
    image_path: Path,
    sparse0_path: Path,
    source_path: Path,
    colmap_command: str = "colmap",
    stream_file: Optional[IOBase] = None
):
    """Run ``colmap image_undistorter`` and report its progress.

    Args:
        image_path: Image directory passed as ``--image_path``; its ``*.jpg``
            count is the initial progress bar total.
        sparse0_path: Sparse model directory passed as ``--input_path``.
        source_path: Directory passed as ``--output_path``.
        colmap_command: Name or path of the COLMAP executable.
        stream_file: Optional file object that receives the combined
            stdout/stderr of the process. When given, the output is not piped
            back to Python, so no per-image progress can be parsed.

    Raises:
        FailedProcess: If COLMAP exits with a non-zero return code.
    """
    progress_prefix = "Undistorting image ["
    image_count = len(list(image_path.glob("*.jpg")))

    cmd = [
        colmap_command,
        "image_undistorter",
        "--image_path", image_path.as_posix(),
        "--input_path", sparse0_path.as_posix(),
        "--output_path", source_path.as_posix(),
        "--output_type", "COLMAP"
    ]

    with Progress(console=console) as progress:
        task = progress.add_task("Image Undistortion", total=image_count)
        console.log(f"π» Executing command: {' '.join(cmd)}")

        _stdout = stream_file if stream_file is not None else subprocess.PIPE
        with subprocess.Popen(cmd, stdout=_stdout, stderr=subprocess.STDOUT, text=True) as process:
            # process.stdout is None when the output goes to stream_file.
            if process.stdout:
                for line in process.stdout:
                    if not line.startswith(progress_prefix):
                        continue
                    # Expected shape: "Undistorting image [<current>/<total>]".
                    counts = line[len(progress_prefix):].partition("]")[0]
                    try:
                        current, reported_total = (int(part) for part in counts.split("/"))
                    except ValueError:
                        # Unexpected format: skip the line rather than abort
                        # while COLMAP is still running.
                        continue
                    image_count = reported_total
                    progress.update(task, completed=current, total=image_count, refresh=True)

        # Leaving the Popen context waits for the process, so the return code
        # is final here.
        return_code = process.returncode
        if return_code == 0:
            # Only show a full bar when the step actually succeeded.
            progress.update(task, completed=image_count, refresh=True)

    if return_code != 0:
        raise FailedProcess("Image undistortion failed.")

    console.log('β Image undistortion completed.')
def colmap(
    source_path: Path,
    camera: Literal["OPENCV"] = "OPENCV",
    colmap_command: str = "colmap",
    use_gpu: bool = True,
    skip_matching: bool = False,
    stream_file: Optional[IOBase] = None
):
    """Run the COLMAP pipeline on ``source_path / "input"``.

    The database is written to ``source_path/distorted/database.db`` and the
    sparse model to ``source_path/distorted/sparse``. Images are then
    undistorted into ``source_path`` and the files found in
    ``source_path/sparse`` are copied into ``source_path/sparse/0``.

    Args:
        source_path: Project directory; must contain an ``input`` folder with
            at least one ``*.jpg`` file.
        camera: Camera model forwarded to feature extraction.
        colmap_command: Name or path of the COLMAP executable.
        use_gpu: Forwarded to feature extraction and matching.
        skip_matching: When true, the reconstruction steps are skipped and
            only undistortion and the final copy run.
        stream_file: Optional file object forwarded to every step to receive
            the COLMAP output.

    Raises:
        FileNotFoundError: If the ``input`` folder is missing or holds no
            ``*.jpg`` file (still caught by ``except Exception``).
        FailedProcess: Propagated from any step whose process fails.
    """
    image_path = source_path / "input"
    if not image_path.exists():
        raise FileNotFoundError(f"Image path {image_path} does not exist. Exiting.")

    image_count = len(list(image_path.glob("*.jpg")))
    if image_count == 0:
        raise FileNotFoundError(f"No images found in {image_path}. Exiting.")

    database_path = source_path / "distorted" / "database.db"
    sparse_path = source_path / "distorted" / "sparse"

    # NOTE(review): indentation was lost in the reviewed source; extraction,
    # matching and mapping are all assumed to be guarded by skip_matching --
    # confirm against the original file.
    if not skip_matching:
        colmap_feature_extraction(database_path, image_path, camera, colmap_command, use_gpu, stream_file)
        colmap_feature_matching(database_path, image_path, colmap_command, use_gpu, stream_file)
        colmap_bundle_adjustment(database_path, image_path, sparse_path, colmap_command, stream_file)

    colmap_image_undistortion(image_path, sparse_path / "0", source_path, colmap_command, stream_file)

    origin_path = source_path / "sparse"
    destination_path = origin_path / "0"
    destination_path.mkdir(parents=True, exist_ok=True)

    console.log(f"π Moving files from {origin_path} to {destination_path}")
    # The files are copied, not moved: the originals stay in origin_path.
    for entry in origin_path.iterdir():
        if entry.name == '0':
            continue
        if not entry.is_file():
            # shutil.copy cannot copy directories; skip them instead of crashing.
            continue
        shutil.copy(entry, destination_path / entry.name)
if __name__ == "__main__":
    import tempfile

    # Manual smoke test: run the whole pipeline on a local test folder and
    # dump the captured COLMAP output if a step fails.
    with tempfile.NamedTemporaryFile(mode='w+t') as temp_file:
        print(f"Using temp file: {temp_file.name}")
        try:
            colmap(
                source_path=Path("/home/europe/Desktop/gaussian-splatting-kit/test/"),
                camera="OPENCV",
                colmap_command="colmap",
                use_gpu=True,
                skip_matching=False,
                # Capture the output in the temp file announced above, so the
                # failure dump below has something to show and no extra file
                # handle is left open.
                stream_file=temp_file,
            )
        except FailedProcess:
            console.log("π¨ Error executing colmap.")
            temp_file.seek(0)
            print(temp_file.read())