Source code for cars_mesh.reconstruct_pipeline

#!/usr/bin/env python
# coding: utf8
#
# Copyright (C) 2023 CNES.
#
# This file is part of cars-mesh

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Main Reconstruct pipeline module of cars-mesh tool.
"""

import logging
import os

from . import setup_logging
from .config import check_config, save_config_file
from .state_machine import CarsMeshMachine
from .tools.handlers import Mesh, read_input_path


[docs] def run(cars_mesh_machine: CarsMeshMachine, cfg: dict) -> Mesh: """ Run the state machine Parameters ---------- cars_mesh_machine: CarsMeshMachine CARS-MESH state machine model cfg: dict Configuration dictionary Returns ------- mesh: Mesh Mesh object """ if not cfg["state_machine"]: # There is no step given to the state machine logging.warning("State machine is empty. Returning the initial data.") else: logging.debug(f"Initial state: {cars_mesh_machine.initial_state}") # Check transitions' validity cars_mesh_machine.check_transitions(cfg) # Browse user defined steps and execute them for k, step in enumerate(cfg["state_machine"]): # Logger logging.info( f"Step #{k + 1}: {step['action']} with {step['method']} method" ) # Run action cars_mesh_machine.run(step, cfg) # (Optional) Save intermediate results to disk if asked if "save_output" in step: if step["save_output"]: # Create directory to save intermediate results intermediate_folder = os.path.join( cfg["output_dir"], "intermediate_results" ) os.makedirs(intermediate_folder, exist_ok=True) if k == 0 and os.listdir(intermediate_folder): logging.warning( f"Directory '{intermediate_folder}' is not empty. " f"Some files might be overwritten." ) # Save intermediates results intermediate_filepath = os.path.join( intermediate_folder, f"{(str(k+1)).zfill(2)}" f"_{step['action']}" f"_{step['method']}", ) if cars_mesh_machine.mesh_data.df is not None: # Mesh serialisation extension = "ply" cars_mesh_machine.mesh_data.serialize( filepath=intermediate_filepath + "." + extension, extension=extension, ) else: # Point cloud serialisation extension = "laz" cars_mesh_machine.mesh_data.pcd.serialize( filepath=intermediate_filepath + "." + extension, extension=extension, ) # Logger debug logging.debug( f"Step #{k + 1}: Results saved in " f"'{intermediate_filepath + '.' + extension}'." ) return cars_mesh_machine.mesh_data
[docs] def main(cfg_path: str) -> None: """ Main function to apply cars-mesh pipeline Parameters ---------- cfg_path: str Path to the JSON configuration file """ # To avoid having a logger INFO for each state machine step logging.getLogger("transitions").setLevel(logging.WARNING) # Check the validity of the config and update cfg with absolute paths cfg = check_config(cfg_path) # Create output directory and update config output_dir = os.path.abspath(cfg["output_dir"]) cfg["output_dir"] = output_dir # Create output_dir os.makedirs(cfg["output_dir"], exist_ok=True) # Save the configuration in the output dir # with inputs and output absolute paths save_config_file( os.path.join(cfg["output_dir"], os.path.basename(cfg_path)), cfg ) # Write logs to disk setup_logging.add_log_file(cfg["output_dir"], "cars_mesh") logging.info("Configuration file checked.") # Read input data mesh = read_input_path(cfg["input_path"]) # Init state machine model cars_mesh_machine = CarsMeshMachine( mesh_data=mesh, initial_state=cfg["initial_state"] ) # Run the pipeline according to the user configuration out_mesh = run(cars_mesh_machine, cfg) # Serialize data # Check if user specified an output name # otherwise assign a default one if "output_name" not in cfg: cfg["output_name"] = "output_cars-mesh" # default output name # If data is only a point cloud, prefer output extension 'laz' # otherwise use 'ply' extension = "ply" if out_mesh.df is not None else "laz" out_filename = cfg["output_name"] + "." + extension if out_mesh.df is not None: # Mesh Serialisation out_mesh.serialize( filepath=os.path.join(cfg["output_dir"], out_filename), extension=extension, ) logging.info(f"Mesh serialized as a '{extension}' file") else: # Point Cloud Serialisation out_mesh.pcd.serialize( filepath=os.path.join(cfg["output_dir"], out_filename), extension=extension, ) logging.info(f"Point cloud serialized as a '{extension}' file")