{ "cells": [ { "cell_type": "markdown", "id": "action-selector-00", "metadata": {}, "source": [ "# ActionSelector and Contingent Execution Environment\n", "\n", "This notebook demonstrates a custom `ActionSelector` and its interaction with a `SimulatedExecutionEnvironment`.\n", "\n", "[![Open In GitHub](https://img.shields.io/badge/see-Github-579aca?logo=github)](https:///github.com/aiplan4eu/unified-planning/blob/master/docs/notebooks/15-action-selector-and-execution-environment.ipynb)\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/aiplan4eu/unified-planning/blob/master/docs/notebooks/15-action-selector-and-execution-environment.ipynb)\n" ] }, { "cell_type": "markdown", "id": "action-selector-01", "metadata": {}, "source": [ "## Setup\n", "\n", "If needed, install unified-planning and contingent dependencies." ] }, { "cell_type": "code", "execution_count": null, "id": "action-selector-02", "metadata": { "tags": [ "remove_from_CI" ] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: unified-planning in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (1.2.0.311.dev1)\n", "Requirement already satisfied: pyparsing in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from unified-planning) (3.3.2)\n", "Requirement already satisfied: networkx in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from unified-planning) (3.6.1)\n", "Requirement already satisfied: ConfigSpace in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from unified-planning) (1.2.2)\n", "Requirement already satisfied: pddl in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from unified-planning) (0.4.5)\n", "Requirement already satisfied: pysmt in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from unified-planning) (0.9.6)\n", "Requirement already satisfied: numpy in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from ConfigSpace->unified-planning) (1.26.4)\n", "Requirement already satisfied: scipy in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from ConfigSpace->unified-planning) (1.17.0)\n", "Requirement already satisfied: typing_extensions in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from ConfigSpace->unified-planning) (4.15.0)\n", "Requirement already satisfied: more_itertools in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from ConfigSpace->unified-planning) (10.8.0)\n", "Requirement already satisfied: lark<1.2.0,>=1.1.5 in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from pddl->unified-planning) (1.1.9)\n", "Requirement already satisfied: click<9.0.0,>=8.1.3 in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (from pddl->unified-planning) (8.3.1)\n", "Note: you may need to restart the kernel to use updated packages.\n", "Requirement already satisfied: pysmt in /opt/homebrew/Caskroom/miniforge/base/envs/up/lib/python3.11/site-packages (0.9.6)\n", "Note: you may need to restart the kernel to use updated packages.\n" ] } ], "source": [ "%pip install unified-planning\n", "%pip install pysmt\n", "!yes | pysmt-install --z3" ] }, { "cell_type": "markdown", "id": "action-selector-03", "metadata": {}, "source": [ "## Imports" ] }, { "cell_type": "code", "execution_count": 2, "id": "action-selector-04", "metadata": {}, "outputs": [], "source": [ "from typing import Dict, Optional\n", "\n", "from unified_planning.environment import Environment\n", "from unified_planning.engines.engine import Engine\n", "from unified_planning.engines.mixins.action_selector import ActionSelectorMixin\n", "from unified_planning.model import Fluent, InstantaneousAction, Problem, ProblemKind\n", "from unified_planning.model.contingent import (\n", " ContingentProblem,\n", " SensingAction,\n", " SimulatedExecutionEnvironment,\n", ")" ] }, { "cell_type": "markdown", "id": "action-selector-05", "metadata": {}, "source": [ "## Define a custom ActionSelector\n", "\n", "An `ActionSelector` is responsible for selecting the next action to execute based on the current state of the environment. In this example, we will create a simple `ActionSelector` that always selects a specific action depending on the first fluent in the environment observation." ] }, { "cell_type": "code", "execution_count": 3, "id": "action-selector-06", "metadata": {}, "outputs": [], "source": [ "class DemoActionSelector(Engine, ActionSelectorMixin):\n", " def __init__(\n", " self,\n", " problem,\n", " error_on_failed_checks: bool = True,\n", " default_action: Optional[str] = None,\n", " true_action: Optional[str] = None,\n", " false_action: Optional[str] = None,\n", " **kwargs,\n", " ):\n", " Engine.__init__(self)\n", " self.error_on_failed_checks = error_on_failed_checks\n", " self._default_action = default_action\n", " self._true_action = true_action\n", " self._false_action = false_action\n", " self._next_action_name: Optional[str] = None\n", " ActionSelectorMixin.__init__(self, problem)\n", " if self._default_action is None:\n", " self._default_action = next(iter(problem.actions)).name\n", " self._next_action_name = self._default_action\n", "\n", " @property\n", " def name(self):\n", " return \"demo-action-selector\"\n", "\n", " @staticmethod\n", " def supported_kind() -> ProblemKind:\n", " return ProblemKind()\n", "\n", " @staticmethod\n", " def supports(problem_kind: ProblemKind) -> bool:\n", " return True\n", "\n", " def _get_action(self):\n", " assert self._next_action_name is not None\n", " return self._problem.action(self._next_action_name)()\n", "\n", " def _update(self, observation: Dict):\n", " if (\n", " not observation\n", " or self._true_action is None\n", " or self._false_action is None\n", " ):\n", " return\n", " observed_value = next(iter(observation.values()))\n", " if observed_value.is_bool_constant():\n", " self._next_action_name = (\n", " self._true_action\n", " if observed_value.bool_constant_value()\n", " else self._false_action\n", " )\n" ] }, { "cell_type": "markdown", "id": "action-selector-07", "metadata": {}, "source": [ "## Minimal ActionSelector Flow\n", "\n", "In this section, we demonstrate a minimal flow of using an `ActionSelector`. We create a simple problem with a single fluent which can be toggled by an action. The `ActionSelector` will always choose to flip the fluent value." ] }, { "cell_type": "code", "execution_count": 4, "id": "action-selector-08", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "set_true set_false\n" ] } ], "source": [ "env = Environment()\n", "problem = Problem(\"minimal_action_selector_problem\", env)\n", "flag = Fluent(\"flag\", environment=env)\n", "problem.add_fluent(flag, default_initial_value=False)\n", "\n", "set_true = InstantaneousAction(\"set_true\", _env=env)\n", "set_true.add_effect(flag, True)\n", "set_false = InstantaneousAction(\"set_false\", _env=env)\n", "set_false.add_effect(flag, False)\n", "problem.add_actions([set_true, set_false])\n", "\n", "env.factory.add_engine(\"demo-action-selector\", __name__, \"DemoActionSelector\")\n", "with env.factory.ActionSelector(\n", " problem,\n", " name=\"demo-action-selector\",\n", " params={\n", " \"default_action\": \"set_true\",\n", " \"true_action\": \"set_false\",\n", " \"false_action\": \"set_true\",\n", " },\n", ") as selector:\n", " first_action = selector.get_action()\n", " selector.update({flag(): env.expression_manager.TRUE()})\n", " second_action = selector.get_action()\n", "\n", "assert first_action.action.name == \"set_true\"\n", "assert second_action.action.name == \"set_false\"\n", "print(first_action, second_action)\n" ] }, { "cell_type": "markdown", "id": "action-selector-09", "metadata": {}, "source": [ "## End-to-end contingent closed loop\n", "\n", "In this section, we demonstrate an end-to-end flow of using a `SimulatedExecutionEnvironment` with the same `ActionSelector`. We create a parcel delivery problem where the agent must deliver a parcel, but it is initially unknown whether the parcel is a box or a bag. The `ActionSelector` will always choose first sense whether it is a box, and then choose the appropriate delivery action based on the observation." ] }, { "cell_type": "code", "execution_count": 6, "id": "action-selector-10", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Selected action: sense_box\n", "Observation: {is_box: true}\n", "Selected action: pick_box\n", "Observation: {}\n", "Goal reached: True\n" ] } ], "source": [ "env2 = Environment()\n", "problem2 = ContingentProblem(\"parcel_delivery\", environment=env2)\n", "\n", "is_box = Fluent(\"is_box\", environment=env2)\n", "delivered = Fluent(\"delivered\", environment=env2)\n", "problem2.add_fluent(is_box, default_initial_value=False)\n", "problem2.add_fluent(delivered, default_initial_value=False)\n", "problem2.add_unknown_initial_constraint(is_box)\n", "\n", "sense_box = SensingAction(\"sense_box\", _env=env2)\n", "sense_box.add_observed_fluent(is_box())\n", "\n", "pick_box = InstantaneousAction(\"pick_box\", _env=env2)\n", "pick_box.add_precondition(is_box())\n", "pick_box.add_effect(delivered, True)\n", "\n", "pick_bag = InstantaneousAction(\"pick_bag\", _env=env2)\n", "pick_bag.add_precondition(env2.expression_manager.Not(is_box()))\n", "pick_bag.add_effect(delivered, True)\n", "\n", "problem2.add_actions([sense_box, pick_box, pick_bag])\n", "problem2.add_goal(delivered())\n", "\n", "env2.factory.add_engine(\"demo-action-selector\", __name__, \"DemoActionSelector\")\n", "execution_env = SimulatedExecutionEnvironment(problem2, max_constraints=1)\n", "\n", "with env2.factory.ActionSelector(\n", " problem2,\n", " name=\"demo-action-selector\",\n", " params={\n", " \"default_action\": \"sense_box\",\n", " \"true_action\": \"pick_box\",\n", " \"false_action\": \"pick_bag\",\n", " },\n", ") as selector:\n", " for _ in range(2):\n", " action_instance = selector.get_action()\n", " print(\"Selected action:\", action_instance)\n", " observation = execution_env.apply(action_instance)\n", " print(\"Observation:\", observation)\n", " selector.update(observation)\n", " if execution_env.is_goal_reached():\n", " break\n", "\n", "assert execution_env.is_goal_reached()\n", "print(\"Goal reached:\", execution_env.is_goal_reached())\n" ] } ], "metadata": { "kernelspec": { "display_name": "up", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.14" } }, "nbformat": 4, "nbformat_minor": 5 }