# system/expected_workflows/flow_parallel_expected.py
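"""Expected generated Temporal workflow for the parallel flow template.

The workflow runs three blocks ('addition', 'multiply', and 'power')
concurrently in a single execution step. Each block resolves its inputs from
the root workflow inputs with JMESPath expressions, executes as an activity
on its own task queue, and reports its status, inputs, result, and any error
in the aggregated workflow output.
"""
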
import asyncio
import datetime
import logging
from typing import Any, Awaitable, Callable, Dict

import temporalio.common
import temporalio.workflow
from temporalio.exceptions import ApplicationError

# jmespath internally uses random.sample, which Temporal's workflow sandbox
# would otherwise flag as non-deterministic. Importing it through
# imports_passed_through() reuses the module loaded outside the sandbox
# instead of restricting it; outside a worker sandbox this is a no-op.
with temporalio.workflow.unsafe.imports_passed_through():
    import jmespath

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

@temporalio.workflow.defn
class test_repo_test_branch_1234567890:
    @temporalio.workflow.run
    async def run(self, root_inputs: Dict[str, Any]) -> Dict[str, Any]:
        workflow_info = temporalio.workflow.info()
        workflow_output: Dict[str, Any] = {
            "workflow_id": workflow_info.workflow_id,
            "run_id": workflow_info.run_id,
            "name": "test_repo_test_branch_1234567890",
            "status": "in_progress",
            "blocks": [],
            "root_input": root_inputs
        }
        try:
            # Initialize results
            results: Dict[str, Any] = {}
            # Define task functions
            task_functions: Dict[str, Callable[[], Awaitable[Any]]] = {}
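            # Each task function below resolves its block inputs from
            # root_inputs with JMESPath, executes the block's activity on a
            # dedicated per-block task queue, and appends status, inputs,
            # result, and error to workflow_output["blocks"]. Successful
            # results are also stored in `results` for downstream execution
            # steps; failures are recorded rather than raised, so one failing
            # block does not cancel its siblings.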
            async def task_2():
                node_id = "2"
                block_name = "addition"
                # Prepare inputs
                input_params: Dict[str, Any] = {}
                try:
                    with temporalio.workflow.unsafe.sandbox_unrestricted():
                        jsonpath_expr = jmespath.compile("a")
                        value = jsonpath_expr.search(root_inputs)
                        input_params["a"] = value
                except Exception as e:
                    logger.error(f"Error parsing jsonpath 'a' for parameter 'a': {e}")
                    input_params["a"] = None
                try:
                    with temporalio.workflow.unsafe.sandbox_unrestricted():
                        jsonpath_expr = jmespath.compile("b")
                        value = jsonpath_expr.search(root_inputs)
                        input_params["b"] = value
                except Exception as e:
                    logger.error(f"Error parsing jsonpath 'b' for parameter 'b': {e}")
                    input_params["b"] = None
                logger.info("Starting 'addition' activity on task queue 'blocks_transformer_addition_97ec9e0d50' with inputs: %s", input_params)
                try:
                    # Convert timeouts and intervals from milliseconds to seconds
                    schedule_to_close_timeout_value_ms = 0
                    start_to_close_timeout_value_ms = 0
                    schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
                    start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
                    if schedule_to_close_timeout is None and start_to_close_timeout is None:
                        # Temporal rejects an activity with neither timeout set;
                        # fall back to an assumed start-to-close default.
                        start_to_close_timeout = datetime.timedelta(minutes=5)
                    initial_interval_value_ms = 1000
                    maximum_interval_value_ms = 100000
                    initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
                    maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
                    maximum_attempts_value = 0
                    # RetryPolicy treats maximum_attempts=0 as unlimited, so the
                    # value passes through unchanged (None is not a valid value).
                    maximum_attempts = maximum_attempts_value
                    result = await temporalio.workflow.execute_activity(
                        "block_main_activity",
                        input_params,
                        schedule_to_close_timeout=schedule_to_close_timeout,
                        start_to_close_timeout=start_to_close_timeout,
                        task_queue="blocks_transformer_addition_97ec9e0d50",
                        retry_policy=temporalio.common.RetryPolicy(
                            maximum_attempts=maximum_attempts,
                            initial_interval=initial_interval,
                            backoff_coefficient=2,
                            maximum_interval=maximum_interval
                        )
                    )
                    logger.info("Completed 'addition' activity with result: %s", result)
                    block_status = "completed"
                    block_error = None
                    results[node_id] = result
                except Exception as e:
                    logger.error(f"Activity 'addition' failed with error: {e}")
                    result = None
                    block_status = "failed"
                    block_error = {
                        "code": type(e).__name__,
                        "description": str(e),
                        "details": {"cause": str(getattr(e, "cause", "No additional details"))}
                    }
                    workflow_output["status"] = "failed"
                # Collect block output
                workflow_output["blocks"].append({
                    "activity_id": node_id,
                    "name": block_name,
                    "status": block_status,
                    "input": input_params,
                    "result": result,
                    "error": block_error
                })
            task_functions["2"] = task_2
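            # The 'multiply' and 'power' tasks below repeat the 'addition'
            # pattern, each targeting its own block-specific task queue.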
            async def task_m3aiq7ixuo6du35h8tr():
                node_id = "m3aiq7ixuo6du35h8tr"
                block_name = "multiply"
                # Prepare inputs
                input_params: Dict[str, Any] = {}
                try:
                    with temporalio.workflow.unsafe.sandbox_unrestricted():
                        jsonpath_expr = jmespath.compile("sum")
                        value = jsonpath_expr.search(root_inputs)
                        input_params["sum"] = value
                except Exception as e:
                    logger.error(f"Error parsing jsonpath 'sum' for parameter 'sum': {e}")
                    input_params["sum"] = None
                logger.info("Starting 'multiply' activity on task queue 'blocks_transformer_multiply_db086f09c9' with inputs: %s", input_params)
                try:
                    # Convert timeouts and intervals from milliseconds to seconds
                    schedule_to_close_timeout_value_ms = 0
                    start_to_close_timeout_value_ms = 0
                    schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
                    start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
                    if schedule_to_close_timeout is None and start_to_close_timeout is None:
                        # Temporal rejects an activity with neither timeout set;
                        # fall back to an assumed start-to-close default.
                        start_to_close_timeout = datetime.timedelta(minutes=5)
                    initial_interval_value_ms = 1000
                    maximum_interval_value_ms = 100000
                    initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
                    maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
                    maximum_attempts_value = 0
                    # RetryPolicy treats maximum_attempts=0 as unlimited, so the
                    # value passes through unchanged (None is not a valid value).
                    maximum_attempts = maximum_attempts_value
                    result = await temporalio.workflow.execute_activity(
                        "block_main_activity",
                        input_params,
                        schedule_to_close_timeout=schedule_to_close_timeout,
                        start_to_close_timeout=start_to_close_timeout,
                        task_queue="blocks_transformer_multiply_db086f09c9",
                        retry_policy=temporalio.common.RetryPolicy(
                            maximum_attempts=maximum_attempts,
                            initial_interval=initial_interval,
                            backoff_coefficient=2,
                            maximum_interval=maximum_interval
                        )
                    )
                    logger.info("Completed 'multiply' activity with result: %s", result)
                    block_status = "completed"
                    block_error = None
                    results[node_id] = result
                except Exception as e:
                    logger.error(f"Activity 'multiply' failed with error: {e}")
                    result = None
                    block_status = "failed"
                    block_error = {
                        "code": type(e).__name__,
                        "description": str(e),
                        "details": {"cause": str(getattr(e, "cause", "No additional details"))}
                    }
                    workflow_output["status"] = "failed"
                # Collect block output
                workflow_output["blocks"].append({
                    "activity_id": node_id,
                    "name": block_name,
                    "status": block_status,
                    "input": input_params,
                    "result": result,
                    "error": block_error
                })
            task_functions["m3aiq7ixuo6du35h8tr"] = task_m3aiq7ixuo6du35h8tr
            async def task_m3aiqkrv4k1y6654ymr():
                node_id = "m3aiqkrv4k1y6654ymr"
                block_name = "power"
                # Prepare inputs
                input_params: Dict[str, Any] = {}
                try:
                    with temporalio.workflow.unsafe.sandbox_unrestricted():
                        jsonpath_expr = jmespath.compile("product")
                        value = jsonpath_expr.search(root_inputs)
                        input_params["product"] = value
                except Exception as e:
                    logger.error(f"Error parsing jsonpath 'product' for parameter 'product': {e}")
                    input_params["product"] = None
                logger.info("Starting 'power' activity on task queue 'blocks_transformer_power_057645be0c' with inputs: %s", input_params)
                try:
                    # Convert timeouts and intervals from milliseconds to seconds
                    schedule_to_close_timeout_value_ms = 0
                    start_to_close_timeout_value_ms = 0
                    schedule_to_close_timeout = None if schedule_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=schedule_to_close_timeout_value_ms / 1000.0)
                    start_to_close_timeout = None if start_to_close_timeout_value_ms == 0 else datetime.timedelta(seconds=start_to_close_timeout_value_ms / 1000.0)
                    if schedule_to_close_timeout is None and start_to_close_timeout is None:
                        # Temporal rejects an activity with neither timeout set;
                        # fall back to an assumed start-to-close default.
                        start_to_close_timeout = datetime.timedelta(minutes=5)
                    initial_interval_value_ms = 1000
                    maximum_interval_value_ms = 100000
                    initial_interval = datetime.timedelta(seconds=initial_interval_value_ms / 1000.0)
                    maximum_interval = datetime.timedelta(seconds=maximum_interval_value_ms / 1000.0)
                    maximum_attempts_value = 0
                    # RetryPolicy treats maximum_attempts=0 as unlimited, so the
                    # value passes through unchanged (None is not a valid value).
                    maximum_attempts = maximum_attempts_value
                    result = await temporalio.workflow.execute_activity(
                        "block_main_activity",
                        input_params,
                        schedule_to_close_timeout=schedule_to_close_timeout,
                        start_to_close_timeout=start_to_close_timeout,
                        task_queue="blocks_transformer_power_057645be0c",
                        retry_policy=temporalio.common.RetryPolicy(
                            maximum_attempts=maximum_attempts,
                            initial_interval=initial_interval,
                            backoff_coefficient=2,
                            maximum_interval=maximum_interval
                        )
                    )
                    logger.info("Completed 'power' activity with result: %s", result)
                    block_status = "completed"
                    block_error = None
                    results[node_id] = result
                except Exception as e:
                    logger.error(f"Activity 'power' failed with error: {e}")
                    result = None
                    block_status = "failed"
                    block_error = {
                        "code": type(e).__name__,
                        "description": str(e),
                        "details": {"cause": str(getattr(e, "cause", "No additional details"))}
                    }
                    workflow_output["status"] = "failed"
                # Collect block output
                workflow_output["blocks"].append({
                    "activity_id": node_id,
                    "name": block_name,
                    "status": block_status,
                    "input": input_params,
                    "result": result,
                    "error": block_error
                })
            task_functions["m3aiqkrv4k1y6654ymr"] = task_m3aiqkrv4k1y6654ymr
            # Execute tasks according to execution steps
            # Execution step 1: all three blocks read only from root_inputs,
            # so they have no data dependencies and run concurrently.
            tasks = [task_functions[node_id]() for node_id in ['2', 'm3aiq7ixuo6du35h8tr', 'm3aiqkrv4k1y6654ymr']]
            results_step = await asyncio.gather(*tasks, return_exceptions=True)
            # Task functions record activity failures themselves, so exceptions
            # surfacing here indicate unexpected errors in the task wrappers.
            for result in results_step:
                if isinstance(result, Exception):
                    logger.error(f"Task failed with exception: {result}")
                    workflow_output["status"] = "failed"
            # Update workflow status to completed if not failed
            if workflow_output["status"] != "failed":
                workflow_output["status"] = "completed"
            else:
                raise ApplicationError("Activity error occurred", type="ActivityError", non_retryable=True)
            return workflow_output
        except Exception as e:
            logger.error(f"Workflow failed with error: {e}")
            workflow_output["status"] = "failed"
            raise ApplicationError(
                "Workflow failed", workflow_output, str(e),
                type="WorkflowError", non_retryable=True
            ) from e
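
# Usage sketch (not part of the generated workflow): how a worker might host
# this workflow definition. The server address and workflow task queue name
# below are hypothetical; the real values come from the flow wrapper's
# deployment configuration. Activities are served separately by workers on
# the block-specific task queues referenced above.
#
#     import asyncio
#     from temporalio.client import Client
#     from temporalio.worker import Worker
#
#     async def main() -> None:
#         client = await Client.connect("localhost:7233")
#         worker = Worker(
#             client,
#             task_queue="flows_test_repo_test_branch_1234567890",
#             workflows=[test_repo_test_branch_1234567890],
#         )
#         await worker.run()
#
#     if __name__ == "__main__":
#         asyncio.run(main())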