Source code for securehst_webhook_notifier.main

import re
from collections.abc import Callable
from datetime import datetime
from functools import wraps
from typing import Any

import requests

try:
    from prefect import flow
    from prefect.states import StateType

    PREFECT_AVAILABLE = True
except ImportError:
    PREFECT_AVAILABLE = False
    flow = None
    StateType = None


[docs] def notify_webhook( webhook_url: str, func_identifier: str, platform: str = "mattermost", user_id: str | None = None, custom_message: str | None = None, ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """ Decorator that sends start, end, and error notifications via a webhook. Args: webhook_url (str): The webhook URL to send notifications to. func_identifier (str): A string identifier representing the function being decorated. platform (str, optional): Messaging platform type: "mattermost", "slack", or "discord". Defaults to "mattermost". user_id (Optional[str], optional): User ID or username to mention on errors. Platform-specific formatting is applied. Defaults to None. custom_message (Optional[str], optional): Optional custom message to include. Defaults to None. Returns: Callable: A wrapped function with webhook notifications. """ platform = platform.lower() if platform not in {"mattermost", "slack", "discord"}: raise ValueError(f"Unsupported platform '{platform}'. Supported platforms are: mattermost, slack, discord.") def decorator(func: Callable[..., Any]) -> Callable[..., Any]: @wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: start_time = datetime.now() start_message = ( f"⏳ Automation has started.\n" f"Start Time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n" f"Function Caller: {func_identifier}" ) send_webhook_message(webhook_url, start_message, platform) try: result = func(*args, **kwargs) end_time = datetime.now() duration = end_time - start_time custom_message_str = f"\nReturn Message: {result}" if result else "" end_message = ( f"✅ Automation has completed successfully.\n" f"Start Time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n" f"End Time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n" f"Duration: {duration}\n" f"Function Caller: {func_identifier}" f"{custom_message_str}" ) send_webhook_message(webhook_url, end_message, platform) return result except Exception as err: end_time = datetime.now() duration = end_time - start_time 
error_message = str(err) # Attempt to clean long SQL errors if detected if "SQL: " in error_message: error_message = re.sub(r"\[SQL: .*?\]", "", error_message).strip() # User mention formatting per platform user_mention = "" if user_id: if platform == "slack": user_mention = f"<@{user_id}> " elif platform in {"mattermost", "discord"}: user_mention = f"@{user_id} " error_message_text = ( f"{user_mention}\n" f"🆘 Automation has crashed.\n" f"Start Time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n" f"End Time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n" f"Duration: {duration}\n" f"Function Caller: {func_identifier}\n" f"Error: {error_message}" ) send_webhook_message(webhook_url, error_message_text, platform) raise err return wrapper return decorator
def send_webhook_message(webhook_url: str, message: str, platform: str) -> None:
    """
    Post a text message to a chat webhook.

    The JSON body carries the message under ``"content"`` for Discord and
    under ``"text"`` for Mattermost and Slack. The platform name is matched
    case-insensitively.

    Args:
        webhook_url (str): The destination webhook URL.
        message (str): The message content to send.
        platform (str): Platform type to determine payload structure
            ("mattermost", "slack", "discord").

    Raises:
        ValueError: If the platform is unsupported.
        requests.RequestException: If the HTTP request fails or returns an
            error status; the failure is printed before being re-raised.
    """
    # Name of the JSON field each supported platform expects the text in.
    payload_fields = {"discord": "content", "mattermost": "text", "slack": "text"}

    normalized = platform.lower()
    field = payload_fields.get(normalized)
    if field is None:
        raise ValueError(f"Unsupported platform '{normalized}' for webhook messaging.")

    body = {field: message}
    try:
        requests.post(webhook_url, json=body, timeout=10).raise_for_status()
    except requests.RequestException as exc:
        print(f"Failed to send webhook notification to {normalized}: {exc}")
        raise
def send_prefect_notification(webhook_url: str, message: str, platform: str = "mattermost") -> None:
    """
    Post a message to a webhook without ever raising a request error.

    Discord receives the text under ``"content"``; every other platform value
    receives it under ``"text"``. Request failures are printed and then
    discarded so that a delivery problem cannot affect the flow state.

    Args:
        webhook_url (str): The destination webhook URL.
        message (str): The message content to send.
        platform (str): Platform type for payload structure
            ("mattermost", "slack", "discord"). Matched case-insensitively.
    """
    field = "content" if platform.lower() == "discord" else "text"
    try:
        reply = requests.post(webhook_url, json={field: message}, timeout=10)
        reply.raise_for_status()
    except requests.RequestException as exc:
        # Deliberately swallowed: notifications are best-effort here.
        print(f"Failed to send Prefect webhook notification: {exc}")
def send_start_notification(
    webhook_url: str,
    display_name: str,
    start_time: datetime,
    platform: str = "mattermost",
    start_message: str | None = None,
) -> None:
    """
    Announce that a flow has started.

    Args:
        webhook_url (str): The destination webhook URL.
        display_name (str): Name shown as the "Function Caller" in the default message.
        start_time (datetime): Start timestamp rendered in the default message.
        platform (str): Platform type forwarded to ``send_prefect_notification``.
        start_message (Optional[str]): When non-empty, sent verbatim instead of
            the default timestamped message.
    """
    # A falsy start_message (None or "") falls back to the default text.
    text = start_message or (
        f"⏳ Automation has started.\n"
        f"Start Time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
        f"Function Caller: {display_name}"
    )
    send_prefect_notification(webhook_url, text, platform)
def create_state_hooks(
    webhook_url: str,
    display_name: str,
    user_id: str | None,
    silent_success: bool,
    start_time_holder: dict,
    platform: str = "mattermost",
    success_message: str | None = None,
    failure_message: str | None = None,
) -> dict:
    """
    Create state change hooks for Prefect flow lifecycle notifications.

    Args:
        webhook_url (str): The webhook URL to send notifications to.
        display_name (str): Name shown as the "Function Caller" in default messages.
        user_id (Optional[str]): User to mention. Always mentioned on failure;
            mentioned on success only when ``silent_success`` is False.
        silent_success (bool): If True, success notifications do not mention the user.
        start_time_holder (dict): Shared mapping read at hook time; its
            ``"start_time"`` entry (a datetime) is used to compute the duration.
            When the entry is missing, the hook's own timestamp is used.
        platform (str): Messaging platform, matched case-insensitively.
            Defaults to "mattermost".
        success_message (Optional[str]): Replaces the default success text when non-empty.
        failure_message (Optional[str]): Replaces the default failure text when non-empty.

    Returns:
        dict: Hook lists keyed by ``"on_completion"``, ``"on_failure"``,
        ``"on_crashed"`` and ``"on_cancellation"``, or an empty dict when
        Prefect could not be imported.
    """
    if not PREFECT_AVAILABLE:
        return {}

    # Normalise once so the mention format agrees with the payload format
    # chosen by send_prefect_notification, which lower-cases the platform.
    platform = platform.lower()

    # Mention prefix shared by both hooks; empty when no user is configured.
    user_mention = ""
    if user_id:
        user_mention = f"<@{user_id}>\n" if platform == "slack" else f"@{user_id}\n"

    def on_completion_hook(flow, flow_run, state):
        end_time = datetime.now()
        start_time = start_time_holder.get("start_time", end_time)
        duration = end_time - start_time

        # Try to get result from state. Best-effort: any problem retrieving
        # the result simply leaves it out of the message.
        result = None
        try:
            result = state.result(raise_on_failure=False) if hasattr(state, "result") else None
        except Exception:
            result = None

        return_msg = f"\nReturn Message: {result}" if result else ""

        if success_message:
            message = success_message
        else:
            message = (
                f"✅ Automation has completed successfully.\n"
                f"Start Time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"End Time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"Duration: {duration}\n"
                f"Function Caller: {display_name}"
                f"{return_msg}"
            )

        if not silent_success:
            message = f"{user_mention}{message}"

        send_prefect_notification(webhook_url, message, platform)

    def on_failure_hook(flow, flow_run, state):
        end_time = datetime.now()
        start_time = start_time_holder.get("start_time", end_time)
        duration = end_time - start_time

        # Determine failure type based on state
        failure_type = "failed"
        if state.type == StateType.CRASHED:
            failure_type = "crashed"
        elif state.type == StateType.CANCELLED:
            failure_type = "was cancelled"
        elif state.type == StateType.CANCELLING:
            failure_type = "is being cancelled"

        # Extract error message from state
        error_message = ""
        if state.message:
            error_message = str(state.message)
            # Clean SQL errors. DOTALL lets the pattern span the line breaks
            # of a multi-line statement.
            if "SQL: " in error_message:
                error_message = re.sub(r"\[SQL: .*?\]", "", error_message, flags=re.DOTALL).strip()

        if failure_message:
            message = f"{user_mention}{failure_message}"
        else:
            message = (
                f"{user_mention}"
                f"🆘 Automation has {failure_type}.\n"
                f"Start Time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"End Time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"Duration: {duration}\n"
                f"Function Caller: {display_name}"
            )
            if error_message:
                message += f"\nError: {error_message}"

        send_prefect_notification(webhook_url, message, platform)

    # One failure hook serves failed, crashed and cancelled runs; it inspects
    # state.type to choose the wording.
    return {
        "on_completion": [on_completion_hook],
        "on_failure": [on_failure_hook],
        "on_crashed": [on_failure_hook],
        "on_cancellation": [on_failure_hook],
    }
def prefect_notify_webhook(
    webhook_url: str,
    display_name: str,
    user_id: str | None = None,
    silent_success: bool = True,
    platform: str = "mattermost",
    start_message: str | None = None,
    success_message: str | None = None,
    failure_message: str | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Decorator to add comprehensive Prefect flow notifications.

    Place this decorator above ``@flow`` (so it receives the already-built
    flow object), as in the example below. In that case the flow's function is
    wrapped to send a start notification, and completion/failure/crash/
    cancellation hooks are attached via ``with_options``. Hooks already
    registered on the flow are kept and run before the notification hooks.

    When the decorated object is a plain function (not yet a flow), only the
    start notification is sent; the remaining settings are stored on the
    wrapper as ``_webhook_config``.

    Args:
        webhook_url (str): The webhook URL to send notifications to.
        display_name (str): Human readable name for the flow (e.g., "D. Miller & Associates - dmiller-etl").
        user_id (Optional[str], optional): User to mention on failures (e.g., "securehst"). Defaults to None.
        silent_success (bool, optional): If True, success notifications won't mention users. Defaults to True.
        platform (str, optional): Messaging platform type: "mattermost", "slack", or "discord".
            Matched case-insensitively. Defaults to "mattermost".
        start_message (Optional[str], optional): Custom start message. Defaults to detailed message with start time.
        success_message (Optional[str], optional): Custom success message. Defaults to detailed message with timing info.
        failure_message (Optional[str], optional): Custom failure message. Defaults to detailed message with timing and error info.

    Returns:
        Callable: A wrapped function with Prefect webhook notifications.

    Raises:
        ImportError: If Prefect is not available.

    Example:
        @prefect_notify_webhook(
            webhook_url="https://mattermost.example.com/hooks/abc123",
            display_name="ETL Pipeline",
            user_id="admin"
        )
        @flow
        def my_etl_flow():
            pass

    Note:
        NOTE(review): the start-notification wrappers are synchronous; confirm
        that async flows are out of scope before using this with them.
    """
    if not PREFECT_AVAILABLE:
        raise ImportError("Prefect is required to use prefect_notify_webhook. Install with: pip install prefect>=3.0.0")

    # Normalise once so mention formatting and payload selection agree even
    # when the caller passes e.g. "Slack".
    platform = platform.lower()

    # Shared dict to pass start time from wrapper to hooks
    start_time_holder: dict = {}

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # Check if the function is already a Prefect flow
        if hasattr(func, "with_options"):
            # Function is already a flow, add hooks to it
            hooks = create_state_hooks(
                webhook_url,
                display_name,
                user_id,
                silent_success,
                start_time_holder,
                platform,
                success_message,
                failure_message,
            )

            # Send start notification when flow starts
            original_fn = getattr(func, "fn", func)

            @wraps(original_fn)
            def wrapper_with_start_notification(*args: Any, **kwargs: Any) -> Any:
                start_time_holder["start_time"] = datetime.now()
                send_start_notification(
                    webhook_url, display_name, start_time_holder["start_time"], platform, start_message
                )
                return original_fn(*args, **kwargs)

            # Replace the flow's underlying function directly. This mutates the
            # flow object that was passed in.
            func.fn = wrapper_with_start_notification

            # with_options replaces a hook list when one is supplied, so merge
            # with whatever the flow already had to avoid dropping user hooks.
            merged_hooks = {
                hook_name: [*(getattr(func, f"{hook_name}_hooks", None) or []), *new_hooks]
                for hook_name, new_hooks in hooks.items()
            }

            # Apply hooks (without fn= parameter which is not supported)
            return func.with_options(**merged_hooks)

        # Function is not a flow yet, create a regular wrapper
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start_time_holder["start_time"] = datetime.now()
            send_start_notification(
                webhook_url, display_name, start_time_holder["start_time"], platform, start_message
            )
            return func(*args, **kwargs)

        # Store webhook config for when this becomes a flow
        wrapper._webhook_config = {
            "webhook_url": webhook_url,
            "display_name": display_name,
            "user_id": user_id,
            "silent_success": silent_success,
            "platform": platform,
            "success_message": success_message,
            "failure_message": failure_message,
            "start_time_holder": start_time_holder,
        }
        return wrapper

    return decorator