Source code for unified_planning.engines.mixins.anytime_planner

# Copyright 2021-2023 AIPlan4EU project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unified_planning as up
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import IO, Optional, Iterator
from warnings import warn


class _GetSolutionsWithParamsNotImplementedError(Exception):
    """
    Exception raised when the planner does not override the `_get_solutions_with_params` method.
    """


[docs] class AnytimeGuarantee(Enum): INCREASING_QUALITY = auto() OPTIMAL_PLANS = auto()
class AnytimePlannerMixin(ABC): """Base class that must be extended by an :class:`~unified_planning.engines.Engine` that is also a `AnytimePlanner`.""" def __init__(self): self.optimality_metric_required = False @staticmethod def is_anytime_planner() -> bool: return True @staticmethod def ensures(anytime_guarantee: AnytimeGuarantee) -> bool: """ :param anytime_guarantee: The `anytime_guarantee` that must be satisfied. :return: `True` if the `AnytimePlannerMixin` implementation ensures the given `anytime_guarantee`, `False` otherwise. """ return False def get_solutions( self, problem: "up.model.AbstractProblem", timeout: Optional[float] = None, output_stream: Optional[IO[str]] = None, warm_start_plan: Optional["up.plans.Plan"] = None, **kwargs, ) -> Iterator["up.engines.results.PlanGenerationResult"]: """ This method takes a `AbstractProblem` and returns an iterator of `PlanGenerationResult`, which contains information about the solution to the problem given by the planner. :param problem: is the `AbstractProblem` to solve. :param timeout: is the time in seconds that the planner has at max to solve the problem, defaults to `None`. :param output_stream: is a stream of strings where the planner writes his output (and also errors) while it is solving the problem; defaults to `None`. :param warm_start_plan: is a plan that the planner can use to warm start the search, defaults to `None`. :return: an iterator of `PlanGenerationResult` created by the planner. The only required parameter is `problem` but the planner should warn the user if `timeout` or `output_stream` are not `None` and the planner ignores them.""" assert isinstance(self, up.engines.engine.Engine) problem_kind = problem.kind if not self.skip_checks and not self.supports(problem_kind): msg = f"We cannot establish whether {self.name} can solve this problem!" if self.error_on_failed_checks: raise up.exceptions.UPUsageError(msg) else: warn(msg) if not problem_kind.has_quality_metrics() and self.optimality_metric_required: msg = f"The problem has no quality metrics but the engine is required to satisfies some optimality guarantee!" raise up.exceptions.UPUsageError(msg) try: kwargs.setdefault("warm_start_plan", warm_start_plan) yield from self._get_solutions_with_params( problem=problem, timeout=timeout, output_stream=output_stream, **kwargs, ) except _GetSolutionsWithParamsNotImplementedError: yield from self._get_solutions( problem=problem, timeout=timeout, output_stream=output_stream, ) def _get_solutions_with_params( self, problem: "up.model.AbstractProblem", timeout: Optional[float] = None, output_stream: Optional[IO[str]] = None, warm_start_plan: Optional["up.plans.Plan"] = None, **kwargs, ) -> Iterator["up.engines.results.PlanGenerationResult"]: """Method called by the AnytimePlannerMixin.get_solutions method.""" raise _GetSolutionsWithParamsNotImplementedError @abstractmethod def _get_solutions( self, problem: "up.model.AbstractProblem", timeout: Optional[float] = None, output_stream: Optional[IO[str]] = None, ) -> Iterator["up.engines.results.PlanGenerationResult"]: """ Method called by the AnytimePlannerMixin.get_solutions method. This method is deprecated in favor of `_get_solutions_with_params`. This method is kept for backward compatibility with older versions of UPF. If you are implementing a new planner, you should override this method and transfer the call to `_get_solutions_with_params`: ```python def _get_solutions(self, problem, timeout=None, output_stream=None): return self._get_solutions_with_params(problem, timeout, output_stream) ``` """ raise NotImplementedError