#!/usr/bin/env python
# coding: utf8
#
# Copyright (C) 2023 CNES.
#
# This file is part of cars-mesh
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Class associated to CARS-MESH state machine
"""
# Standard imports
import logging
# Third party imports
from transitions import Machine, MachineError
# Cars-mesh imports
from . import param
from .tools.handlers import Mesh
[docs]
class CarsMeshMachine(Machine):
"""CARS-MESH state machine"""
def __init__(
self, mesh_data: Mesh, initial_state: str = "initial_pcd"
) -> None:
"""
Init a state machine for 3D mesh reconstruction
Parameters
----------
mesh_data: Mesh
Mesh instance to process
initial_state: str (default='initial_pcd')
Specify the initial state. Can either be 'initial_pcd' (if
input data is a point cloud) or 'meshed_pcd' (if input data is a
mesh)
"""
# Init arguments
self.mesh_data = mesh_data
self.initial_state = initial_state
# Available states
self.states_ = [
"initial_pcd",
"filtered_pcd",
"denoised_pcd",
"meshed_pcd",
"textured_pcd",
]
# Available transitions
self.transitions_ = [
# From the 'initial_pcd' state
{
"trigger": "filter",
"source": "initial_pcd",
"dest": "filtered_pcd",
"after": "filter_run",
},
{
"trigger": "denoise_pcd",
"source": "initial_pcd",
"dest": "denoised_pcd",
"after": "denoise_pcd_run",
},
{
"trigger": "mesh",
"source": "initial_pcd",
"dest": "meshed_pcd",
"after": "mesh_run",
},
# From the 'filtered_pcd' state
{
"trigger": "filter",
"source": "filtered_pcd",
"dest": "filtered_pcd",
"after": "filter_run",
},
{
"trigger": "denoise_pcd",
"source": "filtered_pcd",
"dest": "denoised_pcd",
"after": "denoise_pcd_run",
},
{
"trigger": "mesh",
"source": "filtered_pcd",
"dest": "meshed_pcd",
"after": "mesh_run",
},
# From the 'denoised_pcd' state
{
"trigger": "denoise_pcd",
"source": "denoised_pcd",
"dest": "denoised_pcd",
"after": "denoise_pcd_run",
},
{
"trigger": "mesh",
"source": "denoised_pcd",
"dest": "meshed_pcd",
"after": "mesh_run",
},
# From the 'meshed_pcd' state
{
"trigger": "simplify_mesh",
"source": "meshed_pcd",
"dest": "meshed_pcd",
"after": "simplify_mesh_run",
},
{
"trigger": "denoise_mesh",
"source": "meshed_pcd",
"dest": "meshed_pcd",
"after": "denoise_mesh_run",
},
{
"trigger": "texture",
"source": "meshed_pcd",
"dest": "textured_pcd",
"after": "texture_run",
},
]
# Initialize a machine model
Machine.__init__(
self,
states=self.states_,
initial=self.initial_state,
transitions=self.transitions_,
auto_transitions=False,
)
[docs]
def run(self, step: dict, cfg: dict) -> None:
"""
Run CARS-MESH step by triggering the corresponding machine transition
Parameters
----------
step: str
Name of the step to trigger
cfg: dict
Configuration dictionary
"""
try:
self.trigger(step["action"], step, cfg)
except (MachineError, KeyError, AttributeError):
logging.error(
f"A problem occurs during CARS-MESH running {step['action']} "
f"step. Be sure of your sequencing."
)
raise
[docs]
def filter_run(
self, step: dict, cfg: dict, check_mode: bool = False
) -> None:
"""
Filter the point cloud from outliers
Parameters
----------
step: dict
Parameters of the step to run
cfg: dict
Configuration dictionary
check_mode: bool (default=False)
Option to run the transition checker
"""
assert isinstance(cfg, dict)
if check_mode:
# For checking transition validity
pass
else:
# Apply the filtering method chosen by the user
self.mesh_data.pcd = param.TRANSITIONS_METHODS[step["action"]][
step["method"]
](self.mesh_data.pcd, **step["params"])
[docs]
def denoise_pcd_run(
self, step: dict, cfg: dict, check_mode: bool = False
) -> None:
"""
Denoise the point cloud
Parameters
----------
step: dict
Parameters of the step to run
cfg: dict
Configuration dictionary
check_mode: bool (default=False)
Option to run the transition checker
"""
assert isinstance(cfg, dict)
if check_mode:
# For checking transition validity
pass
else:
# Apply the denoising method chosen by the user
self.mesh_data.pcd = param.TRANSITIONS_METHODS[step["action"]][
step["method"]
](self.mesh_data.pcd, **step["params"])
[docs]
def mesh_run(self, step: dict, cfg: dict, check_mode: bool = False) -> None:
"""
Mesh the point cloud
Parameters
----------
step: dict
Parameters of the step to run
cfg: dict
Configuration dictionary
check_mode: bool (default=False)
Option to run the transition checker
"""
assert isinstance(cfg, dict)
if check_mode:
# For checking transition validity
pass
else:
# Apply the meshing method chosen by the user
self.mesh_data = param.TRANSITIONS_METHODS[step["action"]][
step["method"]
](self.mesh_data.pcd, **step["params"])
[docs]
def simplify_mesh_run(
self, step: dict, cfg: dict, check_mode: bool = False
) -> None:
"""
Simplify the mesh to reduce the number of faces
Parameters
----------
step: dict
Parameters of the step to run
cfg: dict
Configuration dictionary
check_mode: bool (default=False)
Option to run the transition checker
"""
assert isinstance(cfg, dict)
if check_mode:
# For checking transition validity
pass
else:
# Apply the meshing method chosen by the user
self.mesh_data = param.TRANSITIONS_METHODS[step["action"]][
step["method"]
](self.mesh_data, **step["params"])
[docs]
def denoise_mesh_run(
self, step: dict, cfg: dict, check_mode: bool = False
) -> None:
"""
Denoise the mesh
Parameters
----------
step: dict
Parameters of the step to run
cfg: dict
Configuration dictionary
check_mode: bool (default=False)
Option to run the transition checker
"""
assert isinstance(cfg, dict)
if check_mode:
# For checking transition validity
pass
else:
# Apply the meshing method chosen by the user
self.mesh_data = param.TRANSITIONS_METHODS[step["action"]][
step["method"]
](self.mesh_data, **step["params"])
[docs]
def texture_run(
self, step: dict, cfg: dict, check_mode: bool = False
) -> None:
"""
Texture the mesh
Parameters
----------
step: dict
Parameters of the step to run
cfg: dict
Configuration dictionary
check_mode: bool (default=False)
Option to run the transition checker
"""
assert isinstance(cfg, dict)
if check_mode:
# For checking transition validity
pass
else:
# Apply the texturing method chosen by the user
self.mesh_data = param.TRANSITIONS_METHODS[step["action"]][
step["method"]
](self.mesh_data, cfg, **step["params"])
[docs]
def check_transitions(self, cfg: dict) -> None:
"""
Browse user defined steps and pass them just to check they are valid
steps in the state machine.
No action is done.
Parameters
----------
cfg: dict
Configuration dictionary
"""
logging.debug("Check state machine transitions' validity.")
try:
# Check if the sequencing asked by the user is valid
for _, step in enumerate(cfg["state_machine"]):
self.trigger(step["action"], step, cfg, check_mode=True)
except (MachineError, KeyError, AttributeError):
logging.error(
"A problem occurs during CARS-MESH transition check. "
"Be sure of your sequencing."
)
raise
# Add transition to reset
self.add_transition(
trigger="reset", source=self.state, dest=f"{self.initial_state}"
)
# Reset at initial step
self.trigger("reset", cfg)
# Remove transition for resetting
self.remove_transition(
trigger="reset", source=self.state, dest=f"{self.initial_state}"
)