From 9ee74f14ccbbaeae4e8703d675beed7d934e53d2 Mon Sep 17 00:00:00 2001 From: Ankur Malik Date: Thu, 4 Dec 2025 11:08:57 -0500 Subject: [PATCH] Update flow wrapper expected workflows and template --- expected_workflows/flow_hybrid_expected.py | 32 ++++++++++++++----- expected_workflows/flow_parallel_expected.py | 32 ++++++++++++++----- .../flow_sequential_expected.py | 32 ++++++++++++++----- templates/workflow_template.py.j2 | 22 ++++++++++--- 4 files changed, 90 insertions(+), 28 deletions(-) diff --git a/expected_workflows/flow_hybrid_expected.py b/expected_workflows/flow_hybrid_expected.py index 36e2e2e..96dd688 100644 --- a/expected_workflows/flow_hybrid_expected.py +++ b/expected_workflows/flow_hybrid_expected.py @@ -8,6 +8,18 @@ import re import jmespath from temporalio.exceptions import ApplicationError +# Allow jmespath/random inside Temporal's workflow sandbox so JSONPath +# evaluation for block inputs does not get blocked as non-deterministic. +try: # pragma: no cover - sandbox config + from temporalio.worker import workflow_sandbox + + # jmespath internally uses random.sample; both need to be allowed. + workflow_sandbox.allow_import("jmespath") + workflow_sandbox.allow_import("random") +except Exception: + # On older temporalio versions or outside worker context, this is a no-op. + pass + # Configure logging logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") @@ -38,15 +50,17 @@ class test_repo_test_branch_1234567890: # Prepare inputs input_params: Dict[str, Any] = {} try: - jsonpath_expr = jmespath.compile("a") - value = jsonpath_expr.search(root_inputs) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("a") + value = jsonpath_expr.search(root_inputs) input_params["a"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}") input_params["a"] = None try: - jsonpath_expr = jmespath.compile("b") - value = jsonpath_expr.search(root_inputs) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("b") + value = jsonpath_expr.search(root_inputs) input_params["b"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}") @@ -110,8 +124,9 @@ class test_repo_test_branch_1234567890: input_params: Dict[str, Any] = {} try: source_data = results.get("2", {}) - jsonpath_expr = jmespath.compile("sum") - value = jsonpath_expr.search(source_data) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(source_data) input_params["sum"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}") @@ -175,8 +190,9 @@ class test_repo_test_branch_1234567890: input_params: Dict[str, Any] = {} try: source_data = results.get("2", {}) - jsonpath_expr = jmespath.compile("sum") - value = jsonpath_expr.search(source_data) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(source_data) input_params["product"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'sum' for parameter 'product' from node '2': {e}") diff --git a/expected_workflows/flow_parallel_expected.py b/expected_workflows/flow_parallel_expected.py index 2f7e008..f6cfce0 100644 --- a/expected_workflows/flow_parallel_expected.py +++ b/expected_workflows/flow_parallel_expected.py @@ -8,6 +8,18 @@ import re import jmespath from temporalio.exceptions import ApplicationError +# Allow jmespath/random inside Temporal's workflow sandbox so JSONPath +# evaluation for block inputs does not get blocked as non-deterministic. +try: # pragma: no cover - sandbox config + from temporalio.worker import workflow_sandbox + + # jmespath internally uses random.sample; both need to be allowed. + workflow_sandbox.allow_import("jmespath") + workflow_sandbox.allow_import("random") +except Exception: + # On older temporalio versions or outside worker context, this is a no-op. + pass + # Configure logging logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") @@ -38,15 +50,17 @@ class test_repo_test_branch_1234567890: # Prepare inputs input_params: Dict[str, Any] = {} try: - jsonpath_expr = jmespath.compile("a") - value = jsonpath_expr.search(root_inputs) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("a") + value = jsonpath_expr.search(root_inputs) input_params["a"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}") input_params["a"] = None try: - jsonpath_expr = jmespath.compile("b") - value = jsonpath_expr.search(root_inputs) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("b") + value = jsonpath_expr.search(root_inputs) input_params["b"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}") @@ -109,8 +123,9 @@ class test_repo_test_branch_1234567890: # Prepare inputs input_params: Dict[str, Any] = {} try: - jsonpath_expr = jmespath.compile("sum") - value = jsonpath_expr.search(root_inputs) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(root_inputs) input_params["sum"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum': {e}") @@ -173,8 +188,9 @@ class test_repo_test_branch_1234567890: # Prepare inputs input_params: Dict[str, Any] = {} try: - jsonpath_expr = jmespath.compile("product") - value = jsonpath_expr.search(root_inputs) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("product") + value = jsonpath_expr.search(root_inputs) input_params["product"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'product' for parameter 'product': {e}") diff --git a/expected_workflows/flow_sequential_expected.py b/expected_workflows/flow_sequential_expected.py index 3559ece..3ff3780 100644 --- a/expected_workflows/flow_sequential_expected.py +++ b/expected_workflows/flow_sequential_expected.py @@ -8,6 +8,18 @@ import re import jmespath from temporalio.exceptions import ApplicationError +# Allow jmespath/random inside Temporal's workflow sandbox so JSONPath +# evaluation for block inputs does not get blocked as non-deterministic. +try: # pragma: no cover - sandbox config + from temporalio.worker import workflow_sandbox + + # jmespath internally uses random.sample; both need to be allowed. + workflow_sandbox.allow_import("jmespath") + workflow_sandbox.allow_import("random") +except Exception: + # On older temporalio versions or outside worker context, this is a no-op. + pass + # Configure logging logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") @@ -38,15 +50,17 @@ class test_repo_test_branch_1234567890: # Prepare inputs input_params: Dict[str, Any] = {} try: - jsonpath_expr = jmespath.compile("a") - value = jsonpath_expr.search(root_inputs) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("a") + value = jsonpath_expr.search(root_inputs) input_params["a"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}") input_params["a"] = None try: - jsonpath_expr = jmespath.compile("b") - value = jsonpath_expr.search(root_inputs) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("b") + value = jsonpath_expr.search(root_inputs) input_params["b"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}") @@ -110,8 +124,9 @@ class test_repo_test_branch_1234567890: input_params: Dict[str, Any] = {} try: source_data = results.get("2", {}) - jsonpath_expr = jmespath.compile("sum") - value = jsonpath_expr.search(source_data) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("sum") + value = jsonpath_expr.search(source_data) input_params["sum"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum' from node '2': {e}") @@ -175,8 +190,9 @@ class test_repo_test_branch_1234567890: input_params: Dict[str, Any] = {} try: source_data = results.get("m3aiq7ixuo6du35h8tr", {}) - jsonpath_expr = jmespath.compile("product") - value = jsonpath_expr.search(source_data) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("product") + value = jsonpath_expr.search(source_data) input_params["product"] = value except Exception as e: logger.error(f"Error parsing jsonpath 'product' for parameter 'product' from node 'm3aiq7ixuo6du35h8tr': {e}") diff --git a/templates/workflow_template.py.j2 b/templates/workflow_template.py.j2 index 4f8dd1b..c774483 100644 --- a/templates/workflow_template.py.j2 +++ b/templates/workflow_template.py.j2 @@ -8,6 +8,18 @@ import re import jmespath from temporalio.exceptions import ApplicationError +# Allow jmespath/random inside Temporal's workflow sandbox so JSONPath +# evaluation for block inputs does not get blocked as non-deterministic. +try: # pragma: no cover - sandbox config + from temporalio.worker import workflow_sandbox + + # jmespath internally uses random.sample; both need to be allowed. + workflow_sandbox.allow_import("jmespath") + workflow_sandbox.allow_import("random") +except Exception: + # On older temporalio versions or outside worker context, this is a no-op. + pass + # Configure logging logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") @@ -52,8 +64,9 @@ class {{ workflow_class_name }}: {%- set source = details.get("source", "") %} {%- if source and source.startswith("$root.") %} try: - jsonpath_expr = jmespath.compile("{{ source[6:] }}") - value = jsonpath_expr.search(root_inputs) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("{{ source[6:] }}") + value = jsonpath_expr.search(root_inputs) input_params["{{ param }}"] = value except Exception as e: logger.error(f"Error parsing jsonpath '{{ source[6:] }}' for parameter '{{ param }}': {e}") @@ -64,8 +77,9 @@ class {{ workflow_class_name }}: {%- set source_path = source_parts[1] if source_parts|length > 1 else '' %} try: source_data = results.get("{{ source_node_id }}", {}) - jsonpath_expr = jmespath.compile("{{ source_path }}") - value = jsonpath_expr.search(source_data) + with temporalio.workflow.unsafe.sandbox_unrestricted(): + jsonpath_expr = jmespath.compile("{{ source_path }}") + value = jsonpath_expr.search(source_data) input_params["{{ param }}"] = value except Exception as e: logger.error(f"Error parsing jsonpath '{{ source_path }}' for parameter '{{ param }}' from node '{{ source_node_id }}': {e}")