Skip to content

Slack

zenml.integrations.slack special

Slack integration for alerter components.

SlackIntegration (Integration)

Definition of a Slack integration for ZenML.

Implemented using Slack SDK.

Source code in zenml/integrations/slack/__init__.py
class SlackIntegration(Integration):
    """Definition of a Slack integration for ZenML.

    Implemented using [Slack SDK](https://pypi.org/project/slack-sdk/).
    """

    NAME = SLACK
    REQUIREMENTS = ["slack-sdk>=3.16.1", "aiohttp>=3.8.1"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Slack integration.

        Returns:
            List of new flavors defined by the Slack integration.
        """
        from zenml.integrations.slack.flavors import SlackAlerterFlavor

        return [SlackAlerterFlavor]

flavors() classmethod

Declare the stack component flavors for the Slack integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of new flavors defined by the Slack integration.

Source code in zenml/integrations/slack/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Slack integration.

    Returns:
        List of new flavors defined by the Slack integration.
    """
    from zenml.integrations.slack.flavors import SlackAlerterFlavor

    return [SlackAlerterFlavor]

alerters special

Alerter components defined by the Slack integration.

slack_alerter

Implementation for slack flavor of alerter component.

SlackAlerter (BaseAlerter)

Send messages to Slack channels.

Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerter(BaseAlerter):
    """Send messages to Slack channels."""

    @property
    def config(self) -> SlackAlerterConfig:
        """Returns the `SlackAlerterConfig` config.

        Returns:
            The configuration.
        """
        return cast(SlackAlerterConfig, self._config)

    def _get_channel_id(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> str:
        """Get the Slack channel ID to be used by post/ask.

        Args:
            params: Optional parameters.

        Returns:
            ID of the Slack channel to be used.

        Raises:
            RuntimeError: if config is not of type `BaseAlerterStepConfig`.
            ValueError: if a slack channel was neither defined in the config
                nor in the slack alerter component.
        """
        if not isinstance(params, BaseAlerterStepParameters):
            raise RuntimeError(
                "The config object must be of type `BaseAlerterStepParameters`."
            )
        if (
            isinstance(params, SlackAlerterParameters)
            and hasattr(params, "slack_channel_id")
            and params.slack_channel_id is not None
        ):
            return params.slack_channel_id
        if self.config.default_slack_channel_id is not None:
            return self.config.default_slack_channel_id
        raise ValueError(
            "Neither the `SlackAlerterConfig.slack_channel_id` in the runtime "
            "configuration, nor the `default_slack_channel_id` in the alerter "
            "stack component is specified. Please specify at least one."
        )

    def _get_approve_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Define which messages will lead to approval during ask().

        Args:
            params: Optional parameters.

        Returns:
            Set of messages that lead to approval in alerter.ask().
        """
        if (
            isinstance(params, SlackAlerterParameters)
            and hasattr(params, "approve_msg_options")
            and params.approve_msg_options is not None
        ):
            return params.approve_msg_options
        return DEFAULT_APPROVE_MSG_OPTIONS

    def _get_disapprove_msg_options(
        self, params: Optional[BaseAlerterStepParameters]
    ) -> List[str]:
        """Define which messages will lead to disapproval during ask().

        Args:
            params: Optional parameters.

        Returns:
            Set of messages that lead to disapproval in alerter.ask().
        """
        if (
            isinstance(params, SlackAlerterParameters)
            and hasattr(params, "disapprove_msg_options")
            and params.disapprove_msg_options is not None
        ):
            return params.disapprove_msg_options
        return DEFAULT_DISAPPROVE_MSG_OPTIONS

    def post(
        self, message: str, params: Optional[BaseAlerterStepParameters]
    ) -> bool:
        """Post a message to a Slack channel.

        Args:
            message: Message to be posted.
            params: Optional parameters.

        Returns:
            True if operation succeeded, else False
        """
        slack_channel_id = self._get_channel_id(params=params)
        client = WebClient(token=self.config.slack_token)
        try:
            response = client.chat_postMessage(
                channel=slack_channel_id,
                text=message,
            )
            return True
        except SlackApiError as error:
            response = error.response["error"]
            logger.error(f"SlackAlerter.post() failed: {response}")
            return False

    def ask(
        self, message: str, params: Optional[BaseAlerterStepParameters]
    ) -> bool:
        """Post a message to a Slack channel and wait for approval.

        Args:
            message: Initial message to be posted.
            params: Optional parameters.

        Returns:
            True if a user approved the operation, else False
        """
        rtm = RTMClient(token=self.config.slack_token)
        slack_channel_id = self._get_channel_id(params=params)

        approved = False  # will be modified by handle()

        @RTMClient.run_on(event="hello")  # type: ignore
        def post_initial_message(**payload: Any) -> None:
            """Post an initial message in a channel and start listening.

            Args:
                payload: payload of the received Slack event.
            """
            web_client = payload["web_client"]
            web_client.chat_postMessage(channel=slack_channel_id, text=message)

        @RTMClient.run_on(event="message")  # type: ignore
        def handle(**payload: Any) -> None:
            """Listen / handle messages posted in the channel.

            Args:
                payload: payload of the received Slack event.
            """
            event = payload["data"]
            if event["channel"] == slack_channel_id:

                # approve request (return True)
                if event["text"] in self._get_approve_msg_options(params):
                    print(f"User {event['user']} approved on slack.")
                    nonlocal approved
                    approved = True
                    rtm.stop()  # type: ignore

                # disapprove request (return False)
                elif event["text"] in self._get_disapprove_msg_options(params):
                    print(f"User {event['user']} disapproved on slack.")
                    rtm.stop()  # type:ignore

        # start another thread until `rtm.stop()` is called in handle()
        rtm.start()

        return approved
config: SlackAlerterConfig property readonly

Returns the SlackAlerterConfig config.

Returns:

Type Description
SlackAlerterConfig

The configuration.

ask(self, message, params)

Post a message to a Slack channel and wait for approval.

Parameters:

Name Type Description Default
message str

Initial message to be posted.

required
params Optional[zenml.steps.step_interfaces.base_alerter_step.BaseAlerterStepParameters]

Optional parameters.

required

Returns:

Type Description
bool

True if a user approved the operation, else False

Source code in zenml/integrations/slack/alerters/slack_alerter.py
def ask(
    self, message: str, params: Optional[BaseAlerterStepParameters]
) -> bool:
    """Post a message to a Slack channel and wait for approval.

    Args:
        message: Initial message to be posted.
        params: Optional parameters.

    Returns:
        True if a user approved the operation, else False
    """
    rtm = RTMClient(token=self.config.slack_token)
    slack_channel_id = self._get_channel_id(params=params)

    approved = False  # will be modified by handle()

    @RTMClient.run_on(event="hello")  # type: ignore
    def post_initial_message(**payload: Any) -> None:
        """Post an initial message in a channel and start listening.

        Args:
            payload: payload of the received Slack event.
        """
        web_client = payload["web_client"]
        web_client.chat_postMessage(channel=slack_channel_id, text=message)

    @RTMClient.run_on(event="message")  # type: ignore
    def handle(**payload: Any) -> None:
        """Listen / handle messages posted in the channel.

        Args:
            payload: payload of the received Slack event.
        """
        event = payload["data"]
        if event["channel"] == slack_channel_id:

            # approve request (return True)
            if event["text"] in self._get_approve_msg_options(params):
                print(f"User {event['user']} approved on slack.")
                nonlocal approved
                approved = True
                rtm.stop()  # type: ignore

            # disapprove request (return False)
            elif event["text"] in self._get_disapprove_msg_options(params):
                print(f"User {event['user']} disapproved on slack.")
                rtm.stop()  # type:ignore

    # start another thread until `rtm.stop()` is called in handle()
    rtm.start()

    return approved
post(self, message, params)

Post a message to a Slack channel.

Parameters:

Name Type Description Default
message str

Message to be posted.

required
params Optional[zenml.steps.step_interfaces.base_alerter_step.BaseAlerterStepParameters]

Optional parameters.

required

Returns:

Type Description
bool

True if operation succeeded, else False

Source code in zenml/integrations/slack/alerters/slack_alerter.py
def post(
    self, message: str, params: Optional[BaseAlerterStepParameters]
) -> bool:
    """Post a message to a Slack channel.

    Args:
        message: Message to be posted.
        params: Optional parameters.

    Returns:
        True if operation succeeded, else False
    """
    slack_channel_id = self._get_channel_id(params=params)
    client = WebClient(token=self.config.slack_token)
    try:
        response = client.chat_postMessage(
            channel=slack_channel_id,
            text=message,
        )
        return True
    except SlackApiError as error:
        response = error.response["error"]
        logger.error(f"SlackAlerter.post() failed: {response}")
        return False
SlackAlerterParameters (BaseAlerterStepParameters) pydantic-model

Slack alerter parameters.

Source code in zenml/integrations/slack/alerters/slack_alerter.py
class SlackAlerterParameters(BaseAlerterStepParameters):
    """Slack alerter parameters."""

    # The ID of the Slack channel to use for communication.
    slack_channel_id: Optional[str] = None

    # Set of messages that lead to approval in alerter.ask()
    approve_msg_options: Optional[List[str]] = None

    # Set of messages that lead to disapproval in alerter.ask()
    disapprove_msg_options: Optional[List[str]] = None

flavors special

Slack integration flavors.

slack_alerter_flavor

Slack alerter flavor.

SlackAlerterConfig (BaseAlerterConfig) pydantic-model

Slack alerter config.

Attributes:

Name Type Description
slack_token str

The Slack token tied to the Slack account to be used.

default_slack_channel_id Optional[str]

The ID of the Slack channel to use for communication if no channel ID is provided in the step config.

Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterConfig(BaseAlerterConfig):
    """Slack alerter config.

    Attributes:
        slack_token: The Slack token tied to the Slack account to be used.
        default_slack_channel_id: The ID of the Slack channel to use for
            communication if no channel ID is provided in the step config.

    """

    slack_token: str = SecretField()
    default_slack_channel_id: Optional[str] = None  # TODO: Potential setting
SlackAlerterFlavor (BaseAlerterFlavor)

Slack alerter flavor.

Source code in zenml/integrations/slack/flavors/slack_alerter_flavor.py
class SlackAlerterFlavor(BaseAlerterFlavor):
    """Slack alerter flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return SLACK_ALERTER_FLAVOR

    @property
    def config_class(self) -> Type[SlackAlerterConfig]:
        """Returns `SlackAlerterConfig` config class.

        Returns:
                The config class.
        """
        return SlackAlerterConfig

    @property
    def implementation_class(self) -> Type["SlackAlerter"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.slack.alerters import SlackAlerter

        return SlackAlerter
config_class: Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterConfig] property readonly

Returns SlackAlerterConfig config class.

Returns:

Type Description
Type[zenml.integrations.slack.flavors.slack_alerter_flavor.SlackAlerterConfig]

The config class.

implementation_class: Type[SlackAlerter] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SlackAlerter]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

steps special

Built-in steps for the Slack integration.

slack_alerter_ask_step

Step that allows you to send messages to Slack and wait for a response.

slack_alerter_ask_step (BaseStep)

Posts a message to the Slack alerter component and waits for approval.

This can be useful, e.g. to easily get a human in the loop before deploying models.

Parameters:

Name Type Description Default
params

Parameters for the Slack alerter.

required
context

StepContext of the ZenML repository.

required
message

Initial message to be posted.

required

Returns:

Type Description

True if a user approved the operation, else False.

Exceptions:

Type Description
RuntimeError

If currently active alerter is not a SlackAlerter.

PARAMETERS_CLASS (BaseAlerterStepParameters) pydantic-model

Slack alerter parameters.

Source code in zenml/integrations/slack/steps/slack_alerter_ask_step.py
class SlackAlerterParameters(BaseAlerterStepParameters):
    """Slack alerter parameters."""

    # The ID of the Slack channel to use for communication.
    slack_channel_id: Optional[str] = None

    # Set of messages that lead to approval in alerter.ask()
    approve_msg_options: Optional[List[str]] = None

    # Set of messages that lead to disapproval in alerter.ask()
    disapprove_msg_options: Optional[List[str]] = None
entrypoint(params, context, message) staticmethod

Posts a message to the Slack alerter component and waits for approval.

This can be useful, e.g. to easily get a human in the loop before deploying models.

Parameters:

Name Type Description Default
params SlackAlerterParameters

Parameters for the Slack alerter.

required
context StepContext

StepContext of the ZenML repository.

required
message str

Initial message to be posted.

required

Returns:

Type Description
bool

True if a user approved the operation, else False.

Exceptions:

Type Description
RuntimeError

If currently active alerter is not a SlackAlerter.

Source code in zenml/integrations/slack/steps/slack_alerter_ask_step.py
@step
def slack_alerter_ask_step(
    params: SlackAlerterParameters, context: StepContext, message: str
) -> bool:
    """Posts a message to the Slack alerter component and waits for approval.

    This can be useful, e.g. to easily get a human in the loop before
    deploying models.

    Args:
        params: Parameters for the Slack alerter.
        context: StepContext of the ZenML repository.
        message: Initial message to be posted.

    Returns:
        True if a user approved the operation, else False.

    Raises:
        RuntimeError: If currently active alerter is not a `SlackAlerter`.
    """
    alerter = get_active_alerter(context)
    if not isinstance(alerter, SlackAlerter):
        # TODO: potential duplicate code for other components
        # -> generalize to `check_component_flavor()` utility function?
        raise RuntimeError(
            "Step `slack_alerter_ask_step` requires an alerter component of "
            "flavor `slack`, but the currently active alerter is of type "
            f"{type(alerter)}, which is not a subclass of `SlackAlerter`."
        )
    return alerter.ask(message, params)

slack_alerter_post_step

Step that allows you to post messages to Slack.

slack_alerter_post_step (BaseStep)

Post a message to the Slack alerter component of the active stack.

Parameters:

Name Type Description Default
params

Parameters for the Slack alerter.

required
context

StepContext of the ZenML repository.

required
message

Message to be posted.

required

Returns:

Type Description

True if operation succeeded, else False.

Exceptions:

Type Description
RuntimeError

If currently active alerter is not a SlackAlerter.

PARAMETERS_CLASS (BaseAlerterStepParameters) pydantic-model

Slack alerter parameters.

Source code in zenml/integrations/slack/steps/slack_alerter_post_step.py
class SlackAlerterParameters(BaseAlerterStepParameters):
    """Slack alerter parameters."""

    # The ID of the Slack channel to use for communication.
    slack_channel_id: Optional[str] = None

    # Set of messages that lead to approval in alerter.ask()
    approve_msg_options: Optional[List[str]] = None

    # Set of messages that lead to disapproval in alerter.ask()
    disapprove_msg_options: Optional[List[str]] = None
entrypoint(params, context, message) staticmethod

Post a message to the Slack alerter component of the active stack.

Parameters:

Name Type Description Default
params SlackAlerterParameters

Parameters for the Slack alerter.

required
context StepContext

StepContext of the ZenML repository.

required
message str

Message to be posted.

required

Returns:

Type Description
bool

True if operation succeeded, else False.

Exceptions:

Type Description
RuntimeError

If currently active alerter is not a SlackAlerter.

Source code in zenml/integrations/slack/steps/slack_alerter_post_step.py
@step
def slack_alerter_post_step(
    params: SlackAlerterParameters, context: StepContext, message: str
) -> bool:
    """Post a message to the Slack alerter component of the active stack.

    Args:
        params: Parameters for the Slack alerter.
        context: StepContext of the ZenML repository.
        message: Message to be posted.

    Returns:
        True if operation succeeded, else False.

    Raises:
        RuntimeError: If currently active alerter is not a `SlackAlerter`.
    """
    alerter = get_active_alerter(context)
    if not isinstance(alerter, SlackAlerter):
        raise RuntimeError(
            "Step `slack_alerter_post_step` requires an alerter component of "
            "flavor `slack`, but the currently active alerter is of type "
            f"{type(alerter)}, which is not a subclass of `SlackAlerter`."
        )
    return alerter.post(message, params)