Source code for unified_planning.engines.results

# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""This module defines the PlanGenerationResult class."""


from fractions import Fraction
import unified_planning as up
from unified_planning.exceptions import UPUsageError, UPValueError
from unified_planning.model import AbstractProblem, Problem, PlanQualityMetric
from unified_planning.plans import ActionInstance, TimeTriggeredPlan, Plan
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Callable, Dict, Optional, List, Union, cast


class ValidationResultStatus(Enum):
    """
    Enum representing the 3 possible values in the `status` field of a :class:`~unified_planning.engines.ValidationResult`:
    VALID, INVALID or UNKNOWN.
    """

    VALID = (
        auto()
    )  # The plan is valid for the problem, it satisfies all the hard constraints
    INVALID = (
        auto()
    )  # The plan is invalid for the problem, it does not satisfy all the hard constraints
    UNKNOWN = (
        auto()
    )  # The planner can't tell if the plan is valid or invalid for the given problem

    def __bool__(self):
        if self == ValidationResultStatus.VALID:
            return True
        else:
            return False


class FailedValidationReason(Enum):
    """Enum representing the possible reasons the plan validation failed."""

    INAPPLICABLE_ACTION = auto()
    UNSATISFIED_GOALS = auto()
    MUTEX_CONFLICT = auto()


class PlanGenerationResultStatus(Enum):
    """
    Enum representing the 9 possible values in the status field of a :class:`~unified_planning.engines.PlanGenerationResult`:
    SOLVED_SATISFICING        -> Valid plan found.
    SOLVED_OPTIMALLY          -> Optimal plan found.
    UNSOLVABLE_PROVEN         -> The problem is impossible, no valid plan exists.
    UNSOLVABLE_INCOMPLETELY   -> The planner could not find a plan, but it's not sure that
    the problem is impossible (The planner is incomplete)
    TIMEOUT                   -> The planner ran out of time
    MEMOUT                    -> The planner ran out of memory
    INTERNAL_ERROR            -> The planner had an internal error
    UNSUPPORTED_PROBLEM       -> The problem given is not supported by the planner
    INTERMEDIATE              -> The report is not a final one but it's given through the callback function
    """

    SOLVED_SATISFICING = auto()  # Valid plan found.
    SOLVED_OPTIMALLY = auto()  # Optimal plan found.
    UNSOLVABLE_PROVEN = auto()  # The problem is impossible, no valid plan exists.
    UNSOLVABLE_INCOMPLETELY = (
        auto()
    )  # The planner could not find a plan, but it's not sure that the problem is impossible (The planner is incomplete)
    TIMEOUT = auto()  # The planner ran out of time
    MEMOUT = auto()  # The planner ran out of memory
    INTERNAL_ERROR = auto()  # The planner had an internal error
    UNSUPPORTED_PROBLEM = auto()  # The problem given is not supported by the planner
    INTERMEDIATE = (
        auto()
    )  # The report is not a final one but it's an intermediate anytime result


POSITIVE_OUTCOMES = frozenset(
    [
        PlanGenerationResultStatus.SOLVED_SATISFICING,
        PlanGenerationResultStatus.SOLVED_OPTIMALLY,
    ]
)

NEGATIVE_OUTCOMES = frozenset(
    [
        PlanGenerationResultStatus.UNSOLVABLE_PROVEN,
        PlanGenerationResultStatus.UNSOLVABLE_INCOMPLETELY,
        PlanGenerationResultStatus.UNSUPPORTED_PROBLEM,
    ]
)


class LogLevel(Enum):
    """
    Enum representing the 4 possible values in the verbosity level of a :class:`~unified_planning.engines.LogMessage`:
    DEBUG, INFO, WARNING and ERROR
    """

    DEBUG = auto()
    INFO = auto()
    WARNING = auto()
    ERROR = auto()


@dataclass
class LogMessage:
    """
    This class is composed by a message and the Enum LogLevel indicating
    this message level, like Debug, Info, Warning or Error.
    """

    level: LogLevel
    message: str

    def __str__(self):
        return f"[{self.level.name}] {self.message}"


@dataclass
class Result:
    """This class represents the base class for results given by the engines to the user."""

    def is_definitive_result(self, *args) -> bool:
        """This predicate should state if the Result is definitive or if it can be improved."""
        raise NotImplementedError


[docs] @dataclass class PlanGenerationResult(Result): """Class that represents the result of a plan generation call.""" status: PlanGenerationResultStatus plan: Optional["up.plans.Plan"] engine_name: str metrics: Optional[Dict[str, str]] = field(default=None) log_messages: Optional[List[LogMessage]] = field(default=None) def __post__init(self): # Checks that plan and status are consistent if self.status in POSITIVE_OUTCOMES and self.plan is None: raise UPUsageError( f"The Result status is {str(self.status)} but no plan is set." ) elif self.status in NEGATIVE_OUTCOMES and self.plan is not None: raise UPUsageError( f"The Result status is {str(self.status)} but the plan is {str(self.plan)}.\nWith this status the plan must be None." ) return self def __str__(self) -> str: ret = [ f"status: {self.status.name}", f"engine: {self.engine_name}", ] if self.plan is not None: ret.append(f"plan: {self.plan}") else: ret.append("plan: None") return "\n".join(ret)
[docs] def is_definitive_result(self, *args) -> bool: optimality_required = False if len(args) > 0: optimality_required = ( len(args[0].quality_metrics) > 0 ) # Require optimality if the problem has at least one quality metric. return ( self.status == PlanGenerationResultStatus.SOLVED_OPTIMALLY or self.status == PlanGenerationResultStatus.UNSOLVABLE_PROVEN or ( optimality_required and self.status == PlanGenerationResultStatus.SOLVED_SATISFICING ) )
def correct_plan_generation_result( result: PlanGenerationResult, problem: Problem, engine_epsilon: Optional[Union[int, float, str, Fraction]], ) -> PlanGenerationResult: """ This function takes a PlanGenerationResult of a temporal problem and corrects it considering the epsilon requested by the problem. This method works only with TimeTriggeredPlans when the result contains a Plan. :param result: The PlanGenerationResult that must be checked. :param problem: The Problem the given PlanGenerationResult refers to. :param engine_epsilon: The epsilon used by the Engine; if None it means that the Engine does not guarantee a minimum separation value. :return: The new PlanGenerationResult that enforces policy of handling different epsilons between the engine and the problem. """ assert result.plan is None or isinstance( result.plan, TimeTriggeredPlan ), "This method works only for TimeTriggeredPlans" if not isinstance(engine_epsilon, Fraction) and engine_epsilon is not None: try: engine_epsilon = Fraction(engine_epsilon) except ValueError as e: raise UPValueError( f"Given engine_epsilon is not convertible to Fraction: {str(e)}." ) if engine_epsilon == problem.epsilon: return result elif engine_epsilon is None or ( problem.epsilon is not None and engine_epsilon < problem.epsilon ): # if engine_epsilon is not specified or it's smaller than the problem's # requested epsilon, if the plan is not found the result is fine. # If the plan is found, it must be checked for the plan's epsilon. # if the plan epsilon is smaller than the one requested by the problem, # the result is not valid. assert problem.epsilon is not None if result.status in POSITIVE_OUTCOMES: # check that the solution fits the problem assert isinstance(result.plan, TimeTriggeredPlan) plan_epsilon = result.plan.extract_epsilon(problem) if plan_epsilon is not None and plan_epsilon < problem.epsilon: return PlanGenerationResult( PlanGenerationResultStatus.UNSOLVABLE_INCOMPLETELY, None, result.engine_name, result.metrics, result.log_messages, ) elif problem.epsilon is None or ( engine_epsilon is not None and problem.epsilon < engine_epsilon ): # If the problem's epsilon is not specified or it's smaller than the # epsilon specified by the Engine, the given solution might not be # final, therefore unsatisfiability or optimality can't be proven assert engine_epsilon is not None if result.status == PlanGenerationResultStatus.UNSOLVABLE_PROVEN: return PlanGenerationResult( PlanGenerationResultStatus.UNSOLVABLE_INCOMPLETELY, None, result.engine_name, result.metrics, result.log_messages, ) elif result.status == PlanGenerationResultStatus.SOLVED_OPTIMALLY: return PlanGenerationResult( PlanGenerationResultStatus.SOLVED_SATISFICING, None, result.engine_name, result.metrics, result.log_messages, ) return result
[docs] @dataclass class ValidationResult(Result): """Class that represents the result of a validate call.""" status: ValidationResultStatus engine_name: str log_messages: Optional[List[LogMessage]] = field(default=None) metric_evaluations: Optional[Dict[PlanQualityMetric, Union[int, Fraction]]] = field( default=None ) reason: Optional[FailedValidationReason] = field(default=None) inapplicable_action: Optional[up.plans.ActionInstance] = field(default=None) metrics: Optional[Dict[str, str]] = field(default=None) # The trace is either the sequences of states until the first validation error or a map from time to state for each event up to the first validation error trace: Optional[ Union[List[up.model.State], Dict[Fraction, up.model.State]] ] = field(default=None) def __post_init__(self): assert ( self.inapplicable_action is None or self.reason == FailedValidationReason.INAPPLICABLE_ACTION ), "The inapplicable_action can be set only if the reason of the failed plan is an inapplicable action." def __str__(self) -> str: ret = [ f"status: {self.status.name}", f"engine: {self.engine_name}", ] if self.metric_evaluations is not None: ret.append(f"metrics: ") for metric, value in self.metric_evaluations.items(): ret.append(f" {metric}: {value}") if self.reason is not None: ret.append(f"reason: {self.reason.name}") if self.inapplicable_action is not None: ret.append(f"inapplicable action: {self.inapplicable_action}") return "\n".join(ret)
[docs] def is_definitive_result(self, *args) -> bool: return True
def __bool__(self): return bool(self.status)
[docs] @dataclass class CompilerResult(Result): """Class that represents the result of a compile call.""" problem: Optional[AbstractProblem] map_back_action_instance: Optional[ Callable[[ActionInstance], Optional[ActionInstance]] ] engine_name: str log_messages: Optional[List[LogMessage]] = field(default=None) metrics: Optional[Dict[str, str]] = field(default=None) plan_back_conversion: Optional[Callable[[Plan], Plan]] = field(default=None) plan_forward_conversion: Optional[Callable[[Plan], Plan]] = field(default=None) def _post_init(self): # Check that compiled problem and map_back_action_instance or plan_back_conversion are consistent with each other if self.problem is None: if self.map_back_action_instance is not None: raise UPUsageError( "The compiled Problem is None but the map_back_action_instance Callable is not None." ) if self.plan_back_conversion is not None: raise UPUsageError( "The compiled Problem is None but the plan_back_conversion Callable is not None." ) elif ( self.map_back_action_instance is None and self.plan_back_conversion is None ): raise UPUsageError( f"The compiled Problem is not None but both map_back_action_instance and plan_back_conversion are None." ) if self.map_back_action_instance is not None: if self.plan_back_conversion is not None: raise UPUsageError( "Both map_back_action_instance and plan_back_conversion can't be specified" ) self.plan_back_conversion = lambda x: x.replace_action_instances( cast( Callable[[ActionInstance], Optional[ActionInstance]], self.map_back_action_instance, ) ) def __str__(self) -> str: ret = [ f"problem: {self.problem}", f"engine: {self.engine_name}", ] return "\n".join(ret)
[docs] def is_definitive_result(self, *args) -> bool: return self.problem is not None