# system/block_wrapper.py
import importlib
import json
import asyncio
import logging
import os
import re
import sys
from temporalio import activity
2025-02-25 13:16:47 +00:00
from temporalio.exceptions import ApplicationError
2025-02-05 17:51:00 +00:00
from jsonschema import validate, ValidationError
from temporalio.client import Client
from temporalio.worker import Worker
# Set up process-wide logging at import time: INFO and above, with each
# record carrying a timestamp, its level and the emitting logger's name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    level=logging.INFO,
)

# Logger named after this module.
logger = logging.getLogger(__name__)
# Automatically determine if in a test environment: true when the unittest
# module has already been imported into this interpreter.
IS_TEST_ENVIRONMENT = "unittest" in sys.modules

# Retrieve environment variables (the commit id is read from VERSION).
REPO_NAME = os.getenv('REPO_NAME')
BRANCH_NAME = os.getenv('BRANCH_NAME')
COMMIT_ID = os.getenv('VERSION')
NAMESPACE = os.getenv('NAMESPACE')
FLOWX_ENGINE_ADDRESS = os.getenv('FLOWX_ENGINE_ADDRESS')

# Fail fast at import time when any setting is unset or empty. REPO_NAME is
# included because the block name is built from it further down the module;
# leaving it unchecked would surface as an unrelated TypeError instead.
if not all((REPO_NAME, BRANCH_NAME, COMMIT_ID, NAMESPACE, FLOWX_ENGINE_ADDRESS)):
    raise ValueError("Environment variables REPO_NAME, BRANCH_NAME, VERSION, NAMESPACE and FLOWX_ENGINE_ADDRESS must be set.")

# Shorten the commit ID to the first 10 characters
COMMIT_ID_SHORT = COMMIT_ID[:10]
# Sanitize flow name and commit ID to create a valid task queue name
def sanitize_name(name):
    """Return *name* reduced to word characters joined by single underscores.

    Steps, applied in order:
      1. every non-word character becomes ``_``, and a ``_`` is inserted in
         front of a leading digit;
      2. runs of underscores collapse to one;
      3. underscores are stripped from BOTH ends.

    Because step 3 also strips the leading side, the underscore inserted in
    front of a leading digit by step 1 is removed again: ``"1abc"`` comes
    back as ``"1abc"``.
    """
    return re.sub(r'_+', '_', re.sub(r'\W|^(?=\d)', '_', name)).strip('_')
# The block is identified by its repository and branch, joined by "_".
BLOCK_NAME = "_".join((REPO_NAME, BRANCH_NAME))

# Sanitized forms of the block name and of the shortened commit id.
block_name_safe = sanitize_name(BLOCK_NAME)
commit_id_safe = sanitize_name(COMMIT_ID_SHORT)

# Construct the task queue name: "<sanitized block name>_<sanitized commit id>".
TASK_QUEUE = "_".join((block_name_safe, commit_id_safe))
# Load schemas for input validation and output validation
def load_schema(schema_path):
    """Read a JSON schema file and return its parsed content.

    Args:
        schema_path: Path of the JSON file to read.

    Returns:
        The object produced by ``json.load`` for that file.

    Raises:
        ApplicationError: Opening or parsing failed and IS_TEST_ENVIRONMENT
            is false.
        ValueError: Opening or parsing failed and IS_TEST_ENVIRONMENT is true.
        In both cases the original exception is attached as ``__cause__``.
    """
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's default locale encoding.
        with open(schema_path, 'r', encoding='utf-8') as schema_file:
            return json.load(schema_file)
    except Exception as e:
        logger.error("Failed to load schema from %s: %s", schema_path, e)
        # A plain ValueError is raised in a test environment, Temporal's
        # ApplicationError otherwise.
        error_cls = ValueError if IS_TEST_ENVIRONMENT else ApplicationError
        raise error_cls(f"Schema loading failed: {e}") from e
# Load block.py dynamically and get the main function
def load_block_main():
    """Import the ``block`` module and return its ``__main__`` attribute.

    Returns:
        The object bound to ``__main__`` in the imported ``block`` module.

    Raises:
        ApplicationError: The import failed or ``__main__`` is missing, and
            IS_TEST_ENVIRONMENT is false.
        ValueError: Same failures while IS_TEST_ENVIRONMENT is true.
        In both cases the original exception is attached as ``__cause__``.
    """
    # A plain ValueError is raised in a test environment, Temporal's
    # ApplicationError otherwise.
    error_cls = ValueError if IS_TEST_ENVIRONMENT else ApplicationError
    try:
        block_module = importlib.import_module("block")
        if not hasattr(block_module, "__main__"):
            raise AttributeError("The module block.py does not have a __main__ function")
        logger.info("Successfully loaded __main__ function from block.py")
        return block_module.__main__
    except ImportError as e:
        logger.error("Failed to import block.py: %s", e)
        raise error_cls(f"block.py import failed: {e}") from e
    except AttributeError as e:
        # Also reached when importing block.py itself raises AttributeError,
        # because the import sits inside the same try.
        logger.error("block.py does not contain a __main__ function: %s", e)
        raise error_cls(f"__main__ function not found in block.py: {e}") from e
# Validate input data against request schema
def validate_input(input_data):
    """Check *input_data* against the schema in ``/app/request_schema.json``.

    The schema file is loaded on every call.

    Args:
        input_data: The instance to validate.

    Raises:
        ApplicationError: Validation failed and IS_TEST_ENVIRONMENT is false.
        ValueError: Validation failed and IS_TEST_ENVIRONMENT is true.
        In both cases the jsonschema ``ValidationError`` is attached as
        ``__cause__``. Errors from loading the schema propagate unchanged.
    """
    request_schema = load_schema("/app/request_schema.json")
    try:
        validate(instance=input_data, schema=request_schema)
        logger.info("Input data validated successfully")
    except ValidationError as e:
        logger.error("Input validation failed: %s", e)
        # A plain ValueError is raised in a test environment, Temporal's
        # ApplicationError otherwise.
        error_cls = ValueError if IS_TEST_ENVIRONMENT else ApplicationError
        raise error_cls(f"Input validation error: {e}") from e
# Validate output data against response schema
def validate_output(output_data):
    """Check *output_data* against the schema in ``/app/response_schema.json``.

    The schema file is loaded on every call.

    Args:
        output_data: The instance to validate.

    Raises:
        ApplicationError: Validation failed and IS_TEST_ENVIRONMENT is false.
        ValueError: Validation failed and IS_TEST_ENVIRONMENT is true.
        In both cases the jsonschema ``ValidationError`` is attached as
        ``__cause__``. Errors from loading the schema propagate unchanged.
    """
    response_schema = load_schema("/app/response_schema.json")
    try:
        validate(instance=output_data, schema=response_schema)
        logger.info("Output data validated successfully")
    except ValidationError as e:
        logger.error("Output validation failed: %s", e)
        # A plain ValueError is raised in a test environment, Temporal's
        # ApplicationError otherwise.
        error_cls = ValueError if IS_TEST_ENVIRONMENT else ApplicationError
        raise error_cls(f"Output validation error: {e}") from e
# Registering activity
@activity.defn
async def block_main_activity(input_data):
    """
    Activity that runs the entry point exposed by block.py.

    The input is checked against the request schema, the entry point is
    called with the input expanded into keyword arguments, and its result is
    checked against the response schema before being returned.

    Any exception raised while calling the entry point or validating its
    result is logged and re-raised as RuntimeError in a test environment, or
    as ApplicationError otherwise, with the original exception as its cause.
    Failures from input validation or from loading block.py happen before
    that guard and propagate as raised by those helpers.
    """
    validate_input(input_data)
    block_entry_point = load_block_main()

    try:
        # input_data is unpacked with **, so it has to be a mapping.
        # NOTE(review): this call is synchronous inside an async activity;
        # confirm block code is expected to be short-running.
        output = block_entry_point(**input_data)
        logger.info("block.py executed successfully with result: %s", output)
        # Output validation is deliberately inside the guarded region, so a
        # schema mismatch is reported through the same wrapper below.
        validate_output(output)
    except Exception as e:
        logger.error("Error executing block.py: %s", e)
        if IS_TEST_ENVIRONMENT:
            raise RuntimeError("Error during block.py execution") from e
        raise ApplicationError(f"Error during block execution: {e}") from e

    return output
# Worker function
async def main():
    """
    Connect to the engine and serve the block activity until the worker stops.

    Connects a client to FLOWX_ENGINE_ADDRESS in NAMESPACE, creates a worker
    on TASK_QUEUE with block_main_activity as its only activity, and runs it.
    Any exception from connecting, constructing or running the worker is
    logged at CRITICAL level and re-raised unchanged.
    """
    try:
        temporal_client = await Client.connect(
            FLOWX_ENGINE_ADDRESS,
            namespace=NAMESPACE,
        )
        activity_worker = Worker(
            temporal_client,
            activities=[block_main_activity],
            task_queue=TASK_QUEUE,
        )
        logger.info("Worker starting, listening to task queue: %s", TASK_QUEUE)
        await activity_worker.run()
    except Exception as e:
        # The run() call is inside this try as well, so failures while the
        # worker is already running are logged with this same message.
        logger.critical("Worker failed to start: %s", e)
        raise


# Script entry point: start the worker only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())