blanchon's picture
πŸ”₯ First commit
08d80be
from typing import Literal, Optional
from io import IOBase
import os
from pathlib import Path
import shutil
import subprocess
from rich.progress import Progress
from rich.console import Console
console = Console()
class FailedProcess(Exception):
pass
def colmap_feature_extraction(
database_path: Path,
image_path: Path,
camera: Literal["OPENCV"],
colmap_command: str = "colmap",
use_gpu: bool = True,
stream_file: Optional[IOBase] = None
):
total = len(list(image_path.glob("*.jpg")))
with Progress(console=console) as progress:
task = progress.add_task("Feature Extraction", total=total)
database_path.parent.mkdir(parents=True, exist_ok=True)
cmd = [
colmap_command,
"feature_extractor",
"--database_path", database_path.as_posix(),
"--image_path", image_path.as_posix(),
"--ImageReader.single_camera", "1",
"--ImageReader.camera_model", camera,
"--SiftExtraction.use_gpu", "1" if use_gpu else "0",
# "--SiftExtraction.domain_size_pooling", "1",
# "--SiftExtraction.estimate_affine_shape", "1"
]
console.log(f"πŸ’» Executing command: {' '.join(cmd)}")
_stdout = stream_file if stream_file else subprocess.PIPE
with subprocess.Popen(cmd, stdout=_stdout, stderr=subprocess.STDOUT, text=True) as process:
if process.stdout:
for line in process.stdout:
if line.startswith("Processed file "):
line_process = line\
.replace("Processed file [", "")\
.replace("]", "")\
.replace("\n", "")
current, total = line_process.split("/")
progress.update(task, completed=int(current), total=int(total), refresh=True)
progress.update(task, completed=int(total), refresh=True)
return_code = process.returncode
if return_code == 0:
console.log(f'Feature stored in {database_path.as_posix()}.')
console.log('βœ… Feature extraction completed.')
else:
raise FailedProcess("Feature extraction failed.")
def colmap_feature_matching(
database_path: Path,
image_path: Path,
colmap_command: str = "colmap",
use_gpu: bool = True,
stream_file: Optional[IOBase] = None
):
total = len(list(image_path.glob("*.jpg")))
with Progress(console=console) as progress:
task = progress.add_task("Feature Matching", total=total)
database_path
cmd = [
colmap_command,
"exhaustive_matcher",
"--database_path", database_path.as_posix(),
"--SiftMatching.use_gpu", "1" if use_gpu else "0"
]
console.log(f"πŸ’» Executing command: {' '.join(cmd)}")
_stdout = stream_file if stream_file else subprocess.PIPE
with subprocess.Popen(cmd, stdout=_stdout, stderr=subprocess.STDOUT, text=True) as process:
if process.stdout:
for line in process.stdout:
pass
progress.update(task, completed=int(total), refresh=True)
return_code = process.returncode
if return_code == 0:
console.log('βœ… Feature matching completed.')
else:
raise FailedProcess("Feature matching failed.")
def colmap_bundle_adjustment(
database_path: Path,
image_path: Path,
sparse_path: Path,
colmap_command: str = "colmap",
stream_file: Optional[IOBase] = None
):
total = len(list(image_path.glob("*.jpg")))
with Progress(console=console) as progress:
task = progress.add_task("Bundle Adjustment", total=total)
cmd = [
colmap_command,
"mapper",
"--database_path", database_path.as_posix(),
"--image_path", image_path.as_posix(),
"--output_path", sparse_path.as_posix(),
"--Mapper.ba_global_function_tolerance=0.000001"
# "--Mapper.ba_local_max_num_iterations", "40",
# "--Mapper.ba_global_max_num_iterations", "100",
# "--Mapper.ba_local_max_refinements", "3",
# "--Mapper.ba_global_max_refinements", "5"
]
console.log(f"πŸ’» Executing command: {' '.join(cmd)}")
sparse_path.mkdir(parents=True, exist_ok=True)
_stdout = stream_file if stream_file else subprocess.PIPE
with subprocess.Popen(cmd, stdout=_stdout, stderr=subprocess.STDOUT, text=True) as process:
if process.stdout:
for line in process.stdout:
print(line)
if line.startswith("Registering image #"):
line_process = line\
.replace("Registering image #", "")\
.replace("\n", "")
*_, current = line_process.split("(")
current, *_ = current.split(")")
progress.update(task, completed=int(current), refresh=True)
progress.update(task, completed=int(total), refresh=True)
return_code = process.returncode
if return_code == 0:
console.log('βœ… Bundle adjustment completed.')
else:
raise FailedProcess("Bundle adjustment failed.")
def colmap_image_undistortion(
image_path: Path,
sparse0_path: Path,
source_path: Path,
colmap_command: str = "colmap",
stream_file: Optional[IOBase] = None
):
total = len(list(image_path.glob("*.jpg")))
with Progress(console=console) as progress:
task = progress.add_task("Image Undistortion", total=total)
cmd = [
colmap_command,
"image_undistorter",
"--image_path", image_path.as_posix(),
"--input_path", sparse0_path.as_posix(),
"--output_path", source_path.as_posix(),
"--output_type", "COLMAP"
]
console.log(f"πŸ’» Executing command: {' '.join(cmd)}")
_stdout = stream_file if stream_file else subprocess.PIPE
with subprocess.Popen(cmd, stdout=_stdout, stderr=subprocess.STDOUT, text=True) as process:
if process.stdout:
for line in process.stdout:
if line.startswith("Undistorting image ["):
line_process = line\
.replace("Undistorting image [", "")\
.replace("]", "")\
.replace("\n", "")
current, total = line_process.split("/")
progress.update(task, completed=int(current), total=int(total), refresh=True)
progress.update(task, completed=int(total), refresh=True)
return_code = process.returncode
if return_code == 0:
console.log('βœ… Image undistortion completed.')
else:
raise FailedProcess("Image undistortion failed.")
def colmap(
source_path: Path,
camera: Literal["OPENCV"] = "OPENCV",
colmap_command: str = "colmap",
use_gpu: bool = True,
skip_matching: bool = False,
stream_file: Optional[IOBase] = None
):
image_path = source_path / "input"
if not image_path.exists():
raise Exception(f"Image path {image_path} does not exist. Exiting.")
total = len(list(image_path.glob("*.jpg")))
if total == 0:
raise Exception(f"No images found in {image_path}. Exiting.")
database_path = source_path / "distorted" / "database.db"
sparse_path = source_path / "distorted" / "sparse"
if not skip_matching:
colmap_feature_extraction(database_path, image_path, camera, colmap_command, use_gpu, stream_file)
colmap_feature_matching(database_path, image_path, colmap_command, use_gpu, stream_file)
colmap_bundle_adjustment(database_path, image_path, sparse_path, colmap_command, stream_file)
colmap_image_undistortion(image_path, sparse_path / "0", source_path, colmap_command, stream_file)
origin_path = source_path / "sparse"
destination_path = source_path / "sparse" / "0"
destination_path.mkdir(exist_ok=True)
console.log(f"🌟 Moving files from {origin_path} to {destination_path}")
for file in os.listdir(origin_path):
if file == '0':
continue
source_file = os.path.join(origin_path, file)
destination_file = os.path.join(destination_path, file)
shutil.copy(source_file, destination_file)
if __name__ == "__main__":
import tempfile
with tempfile.NamedTemporaryFile(mode='w+t') as temp_file:
print(f"Using temp file: {temp_file.name}")
try:
colmap(
source_path = Path("/home/europe/Desktop/gaussian-splatting-kit/test/"),
camera = "OPENCV",
colmap_command = "colmap",
use_gpu = True,
skip_matching = False,
stream_file = open("/home/europe/Desktop/gaussian-splatting-kit/test.log", "w+t")
)
except FailedProcess:
console.log("🚨 Error executing colmap.")
temp_file.seek(0)
print(temp_file.read())