Skip to content

Analytics

zenml.analytics special

The 'analytics' module of ZenML.

This module is based on the 'analytics-python' package created by Segment. Its base functionality has been adapted to work with the ZenML analytics server.

group(user_id, group_id, traits)

Send a group call with the default client.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
group_id UUID

The group ID.

required
traits Optional[Dict[Any, Any]]

Traits to assign to the group.

required

Returns:

Type Description
Tuple[bool, str]

Tuple (success flag, the original message).

Source code in zenml/analytics/__init__.py
def group(
    user_id: UUID,
    group_id: UUID,
    traits: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
    """Forward a group call to the module-wide default client.

    `set_default_client()` is invoked first so that a default client
    exists before the call is delegated.

    Args:
        user_id: The user ID.
        group_id: The group ID.
        traits: Traits to assign to the group.

    Returns:
        Tuple (success flag, the original message), as returned by the
        default client's `group` method.
    """
    set_default_client()
    client = default_client
    # Narrow the optional global before it is used.
    assert client is not None
    return client.group(user_id=user_id, group_id=group_id, traits=traits)

identify(user_id, traits)

Send an identify call with the default client.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
traits Optional[Dict[Any, Any]]

The traits for the identification process.

required

Returns:

Type Description
Tuple[bool, str]

Tuple (success flag, the original message).

Source code in zenml/analytics/__init__.py
def identify(
    user_id: UUID,
    traits: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
    """Forward an identify call to the module-wide default client.

    `set_default_client()` is invoked first so that a default client
    exists before the call is delegated.

    Args:
        user_id: The user ID.
        traits: The traits for the identification process.

    Returns:
        Tuple (success flag, the original message), as returned by the
        default client's `identify` method.
    """
    set_default_client()
    client = default_client
    # Narrow the optional global before it is used.
    assert client is not None
    return client.identify(user_id=user_id, traits=traits)

set_default_client()

Sets up a default client with the default configuration.

Source code in zenml/analytics/__init__.py
def set_default_client() -> None:
    """Create the module-level default client unless one already exists.

    The client is configured from the module-level names `debug`,
    `max_queue_size`, `send`, `on_error`, `max_retries`, `sync_mode` and
    `timeout`. Calling this function again once a client is in place is a
    no-op.
    """
    global default_client
    if default_client is not None:
        # A client has already been created; keep using it.
        return

    client_settings = dict(
        debug=debug,
        max_queue_size=max_queue_size,
        send=send,
        on_error=on_error,
        max_retries=max_retries,
        sync_mode=sync_mode,
        timeout=timeout,
    )
    default_client = Client(**client_settings)

track(user_id, event, properties)

Send a track call with the default client.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
event AnalyticsEvent

The type of the event.

required
properties Optional[Dict[Any, Any]]

Dict of additional properties for the event.

required

Returns:

Type Description
Tuple[bool, str]

Tuple (success flag, the original message).

Source code in zenml/analytics/__init__.py
def track(
    user_id: UUID,
    event: "AnalyticsEvent",
    properties: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
    """Forward a track call to the module-wide default client.

    `set_default_client()` is invoked first so that a default client
    exists before the call is delegated.

    Args:
        user_id: The user ID.
        event: The type of the event.
        properties: Dict of additional properties for the event.

    Returns:
        Tuple (success flag, the original message), as returned by the
        default client's `track` method.
    """
    set_default_client()
    client = default_client
    # Narrow the optional global before it is used.
    assert client is not None
    return client.track(user_id=user_id, event=event, properties=properties)

client

The analytics module of ZenML.

This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.

AnalyticsEncoder (JSONEncoder)

Helper encoder class for JSON serialization.

Source code in zenml/analytics/client.py
class AnalyticsEncoder(json.JSONEncoder):
    """JSON encoder that additionally handles UUIDs and analytics events."""

    def default(self, obj: Any) -> Any:
        """Encode objects the stock JSON encoder does not handle.

        Args:
            obj: The object to encode.

        Returns:
            The string form of a UUID, the stringified `value` of an
            `AnalyticsEvent`, or the result of the base class
            implementation for any other object.
        """
        # NOTE(review): imported at call time, presumably to avoid an
        # import cycle with zenml.utils.analytics_utils -- confirm.
        from zenml.utils.analytics_utils import AnalyticsEvent

        if isinstance(obj, UUID):
            return str(obj)

        if isinstance(obj, AnalyticsEvent):
            return str(obj.value)

        # Everything else is left to the base encoder.
        return super().default(obj)
default(self, obj)

The default method to handle UUID and 'AnalyticsEvent' objects.

Parameters:

Name Type Description Default
obj Any

The object to encode.

required

Returns:

Type Description
Any

The encoded object.

Source code in zenml/analytics/client.py
def default(self, obj: Any) -> Any:
    """Encode objects the stock JSON encoder does not handle.

    Args:
        obj: The object to encode.

    Returns:
        The string form of a UUID, the stringified `value` of an
        `AnalyticsEvent`, or the result of `json.JSONEncoder.default`
        for any other object.
    """
    # NOTE(review): imported at call time, presumably to avoid an import
    # cycle with zenml.utils.analytics_utils -- confirm.
    from zenml.utils.analytics_utils import AnalyticsEvent

    if isinstance(obj, UUID):
        return str(obj)

    if isinstance(obj, AnalyticsEvent):
        return str(obj.value)

    # Everything else is left to the base encoder.
    fallback = json.JSONEncoder.default(self, obj)
    return fallback

Client

The client class for ZenML analytics.

Source code in zenml/analytics/client.py
class Client(object):
    """The client class for ZenML analytics.

    Messages are serialized to JSON and either posted directly (sync mode)
    or placed on an internal queue that `Consumer` threads drain.
    """

    class DefaultConfig(object):
        """The configuration class for the client.

        Attributes:
            on_error: Function to call if an error occurs.
            debug: Flag to set to switch to the debug mode.
            send: Flag to determine whether to send the message.
            sync_mode: Flag, if set to True, uses the main thread to send
                the messages, and if set to False, creates other threads
                for the analytics.
            max_queue_size: The maximum number of entries a single queue
                can hold.
            timeout: Timeout in seconds.
            max_retries: The number of max tries before failing.
            thread: The number of additional threads to create for the
                analytics if the 'sync_mode' is set to False.
            upload_interval: The upload_interval in seconds if the
                'sync_mode' is set to False.
            upload_size: The maximum size for messages a consumer can send
                if the 'sync_mode' is set to False.
        """

        on_error: Optional[Callable[..., Any]] = None
        debug: bool = False
        send: bool = True
        sync_mode: bool = True
        max_queue_size: int = 10000
        timeout: int = 15
        max_retries: int = 1
        thread: int = 1
        upload_interval: float = 0.5
        upload_size: int = 100

    def __init__(
        self,
        debug: bool = DefaultConfig.debug,
        max_queue_size: int = DefaultConfig.max_queue_size,
        send: bool = DefaultConfig.send,
        on_error: Optional[Callable[..., Any]] = DefaultConfig.on_error,
        max_retries: int = DefaultConfig.max_retries,
        sync_mode: bool = DefaultConfig.sync_mode,
        timeout: int = DefaultConfig.timeout,
        thread: int = DefaultConfig.thread,
        upload_size: int = DefaultConfig.upload_size,
        upload_interval: float = DefaultConfig.upload_interval,
    ) -> None:
        """Initialization of the client.

        Args:
            debug: Flag to set to switch to the debug mode.
            max_queue_size: The maximum number of entries a single queue
                can hold.
            send: Flag to determine whether to send the message.
            on_error: Function to call if an error occurs.
            max_retries: The number of max tries before failing.
            sync_mode: Flag, if set to True, uses the main thread to send
                the messages, and if set to False, creates other threads
                for the analytics.
            timeout: Timeout in seconds.
            thread: The number of additional threads to create for the
                analytics if the 'sync_mode' is set to False.
            upload_size: The maximum size for messages a consumer can send
                if the 'sync_mode' is set to False.
            upload_interval: The upload_interval in seconds if the
                'sync_mode' is set to False.
        """
        self.queue = queue.Queue(max_queue_size)  # type: ignore[var-annotated]
        self.on_error = on_error
        self.debug = debug
        self.send = send
        self.sync_mode = sync_mode
        self.timeout = timeout

        if debug:
            logger.setLevel(logging.DEBUG)

        if sync_mode:
            # No consumer threads exist in sync mode.
            self.consumers = None
        else:
            # On program exit, allow the consumer thread to exit cleanly.
            # This prevents exceptions and a messy shutdown when the
            # interpreter is destroyed before the daemon thread finishes
            # execution. However, it is *not* the same as flushing the queue!
            # To guarantee all messages have been delivered, you'll still need
            # to call flush().
            if send:
                atexit.register(self.join)

            # Create the list once, before the loop: re-creating it on every
            # iteration would keep only the last consumer, and leave the
            # attribute unset when `thread` is 0.
            self.consumers = []
            for _ in range(thread):
                consumer = Consumer(
                    self.queue,
                    base_source_context=source_context.get(),
                    on_error=on_error,
                    upload_size=upload_size,
                    upload_interval=upload_interval,
                    retries=max_retries,
                    timeout=timeout,
                )
                self.consumers.append(consumer)

                # if we've disabled sending, just don't start the consumer
                if send:
                    consumer.start()

    def identify(
        self, user_id: UUID, traits: Optional[Dict[Any, Any]]
    ) -> Tuple[bool, str]:
        """Method to identify a user with given traits.

        Args:
            user_id: The user ID.
            traits: The traits for the identification process. A falsy
                value is sent as an empty dict.

        Returns:
            Tuple (success flag, the original message).
        """
        msg = {
            "user_id": user_id,
            "traits": traits or {},
            "type": "identify",
            "debug": IS_DEBUG_ENV,
        }
        return self._enqueue(json.dumps(msg, cls=AnalyticsEncoder))

    def track(
        self,
        user_id: UUID,
        event: "AnalyticsEvent",
        properties: Optional[Dict[Any, Any]],
    ) -> Tuple[bool, str]:
        """Method to track events.

        Args:
            user_id: The user ID.
            event: The type of the event.
            properties: Dict of additional properties for the event. A
                falsy value is sent as an empty dict.

        Returns:
            Tuple (success flag, the original message).
        """
        msg = {
            "user_id": user_id,
            "event": event,
            "properties": properties or {},
            "type": "track",
            "debug": IS_DEBUG_ENV,
        }
        return self._enqueue(json.dumps(msg, cls=AnalyticsEncoder))

    def group(
        self, user_id: UUID, group_id: UUID, traits: Optional[Dict[Any, Any]]
    ) -> Tuple[bool, str]:
        """Method to group users.

        Args:
            user_id: The user ID.
            group_id: The group ID.
            traits: Traits to assign to the group. A falsy value is sent
                as an empty dict.

        Returns:
            Tuple (success flag, the original message).
        """
        msg = {
            "user_id": user_id,
            "group_id": group_id,
            "traits": traits or {},
            "type": "group",
            "debug": IS_DEBUG_ENV,
        }
        return self._enqueue(json.dumps(msg, cls=AnalyticsEncoder))

    def _enqueue(self, msg: str) -> Tuple[bool, str]:
        """Method to queue messages to be sent.

        Args:
            msg: The message to queue.

        Returns:
            Tuple (success flag, the original message). The flag is False
            only when the internal queue is full.
        """
        # if send is False, return msg as if it was successfully queued
        if not self.send:
            return True, msg

        if self.sync_mode:
            # Sync mode bypasses the queue and posts from the calling thread.
            post(timeout=self.timeout, batch=[msg])
            return True, msg

        try:
            self.queue.put(msg, block=False)
            return True, msg
        except queue.Full:
            logger.debug("ZenML analytics-python queue is full")
            return False, msg

    def flush(self) -> None:
        """Method to force a flush from the internal queue to the server."""
        q = self.queue
        size = q.qsize()
        q.join()
        # Note that this message may not be precise, because of threading.
        logger.debug("successfully flushed about %s items.", size)

    def join(self) -> None:
        """Method to end the consumer thread once the queue is empty.

        In sync mode there are no consumers (`self.consumers` is None), so
        this is a no-op.
        """
        # `or []` covers sync mode, where `self.consumers` is None.
        for consumer in self.consumers or []:
            consumer.pause()
            try:
                consumer.join()
            except RuntimeError:
                # consumer thread has not started
                pass

    def shutdown(self) -> None:
        """Method to flush all messages and cleanly shutdown the client."""
        self.flush()
        self.join()
DefaultConfig

The configuration class for the client.

Attributes:

Name Type Description
on_error Optional[Callable[..., Any]]

Function to call if an error occurs.

debug bool

Flag to set to switch to the debug mode.

send bool

Flag to determine whether to send the message.

sync_mode bool

Flag, if set to True, uses the main thread to send the messages, and if set to False, creates other threads for the analytics.

max_queue_size int

The maximum number of entries a single queue can hold.

timeout int

Timeout in seconds.

max_retries int

The number of max tries before failing.

thread int

The number of additional threads to create for the analytics if the 'sync_mode' is set to False.

upload_interval float

The upload_interval in seconds if the 'sync_mode' is set to False.

upload_size int

The maximum size for messages a consumer can send if the 'sync_mode' is set to False.

Source code in zenml/analytics/client.py
class DefaultConfig:
    """Default values used by the analytics client.

    Attributes:
        on_error: Function to call if an error occurs.
        debug: Flag to set to switch to the debug mode.
        send: Flag to determine whether to send the message.
        sync_mode: Flag, if set to True, uses the main thread to send
            the messages, and if set to False, creates other threads
            for the analytics.
        max_queue_size: The maximum number of entries a single queue
            can hold.
        timeout: Timeout in seconds.
        max_retries: The number of max tries before failing.
        thread: The number of additional threads to create for the
            analytics if the 'sync_mode' is set to False.
        upload_interval: The upload_interval in seconds if the
            'sync_mode' is set to False.
        upload_size: The maximum size for messages a consumer can send
            if the 'sync_mode' is set to False.
    """

    # Error handling and diagnostics
    on_error: Optional[Callable[..., Any]] = None
    debug: bool = False

    # Delivery behaviour
    send: bool = True
    sync_mode: bool = True

    # Limits
    max_queue_size: int = 10_000
    timeout: int = 15
    max_retries: int = 1

    # Settings that only matter when 'sync_mode' is False
    thread: int = 1
    upload_interval: float = 0.5
    upload_size: int = 100
__init__(self, debug=False, max_queue_size=10000, send=True, on_error=None, max_retries=1, sync_mode=True, timeout=15, thread=1, upload_size=100, upload_interval=0.5) special

Initialization of the client.

Parameters:

Name Type Description Default
debug bool

Flag to set to switch to the debug mode.

False
max_queue_size int

The maximum number of entries a single queue can hold.

10000
send bool

Flag to determine whether to send the message.

True
on_error Optional[Callable[..., Any]]

Function to call if an error occurs.

None
max_retries int

The number of max tries before failing.

1
sync_mode bool

Flag, if set to True, uses the main thread to send the messages, and if set to False, creates other threads for the analytics.

True
timeout int

Timeout in seconds.

15
thread int

The number of additional threads to create for the analytics if the 'sync_mode' is set to False.

1
upload_size int

The maximum size for messages a consumer can send if the 'sync_mode' is set to False.

100
upload_interval float

The upload_interval in seconds if the 'sync_mode' is set to False.

0.5
Source code in zenml/analytics/client.py
def __init__(
    self,
    debug: bool = DefaultConfig.debug,
    max_queue_size: int = DefaultConfig.max_queue_size,
    send: bool = DefaultConfig.send,
    on_error: Optional[Callable[..., Any]] = DefaultConfig.on_error,
    max_retries: int = DefaultConfig.max_retries,
    sync_mode: bool = DefaultConfig.sync_mode,
    timeout: int = DefaultConfig.timeout,
    thread: int = DefaultConfig.thread,
    upload_size: int = DefaultConfig.upload_size,
    upload_interval: float = DefaultConfig.upload_interval,
) -> None:
    """Initialization of the client.

    Args:
        debug: Flag to set to switch to the debug mode.
        max_queue_size: The maximum number of entries a single queue
            can hold.
        send: Flag to determine whether to send the message.
        on_error: Function to call if an error occurs.
        max_retries: The number of max tries before failing.
        sync_mode: Flag, if set to True, uses the main thread to send
            the messages, and if set to False, creates other threads
            for the analytics.
        timeout: Timeout in seconds.
        thread: The number of additional threads to create for the
            analytics if the 'sync_mode' is set to False.
        upload_size: The maximum size for messages a consumer can send
            if the 'sync_mode' is set to False.
        upload_interval: The upload_interval in seconds if the
            'sync_mode' is set to False.
    """
    self.queue = queue.Queue(max_queue_size)  # type: ignore[var-annotated]
    self.on_error = on_error
    self.debug = debug
    self.send = send
    self.sync_mode = sync_mode
    self.timeout = timeout

    if debug:
        logger.setLevel(logging.DEBUG)

    if sync_mode:
        # No consumer threads exist in sync mode.
        self.consumers = None
    else:
        # On program exit, allow the consumer thread to exit cleanly.
        # This prevents exceptions and a messy shutdown when the
        # interpreter is destroyed before the daemon thread finishes
        # execution. However, it is *not* the same as flushing the queue!
        # To guarantee all messages have been delivered, you'll still need
        # to call flush().
        if send:
            atexit.register(self.join)

        # Create the list once, before the loop: re-creating it on every
        # iteration would keep only the last consumer, and leave the
        # attribute unset when `thread` is 0.
        self.consumers = []
        for _ in range(thread):
            consumer = Consumer(
                self.queue,
                base_source_context=source_context.get(),
                on_error=on_error,
                upload_size=upload_size,
                upload_interval=upload_interval,
                retries=max_retries,
                timeout=timeout,
            )
            self.consumers.append(consumer)

            # if we've disabled sending, just don't start the consumer
            if send:
                consumer.start()
flush(self)

Method to force a flush from the internal queue to the server.

Source code in zenml/analytics/client.py
def flush(self) -> None:
    """Block until every queued message has been acknowledged.

    The queue size is sampled before waiting and is only used for the
    debug log line afterwards.
    """
    pending = self.queue.qsize()
    self.queue.join()
    # The count is approximate: other threads may have touched the queue
    # between the size check and the join.
    logger.debug("successfully flushed about %s items.", pending)
group(self, user_id, group_id, traits)

Method to group users.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
group_id UUID

The group ID.

required
traits Optional[Dict[Any, Any]]

Traits to assign to the group.

required

Returns:

Type Description
Tuple[bool, str]

Tuple (success flag, the original message).

Source code in zenml/analytics/client.py
def group(
    self,
    user_id: UUID,
    group_id: UUID,
    traits: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
    """Build a "group" message and pass it on to `_enqueue`.

    Args:
        user_id: The user ID.
        group_id: The group ID.
        traits: Traits to assign to the group. A falsy value is sent as
            an empty dict.

    Returns:
        Tuple (success flag, the original message).
    """
    group_traits = traits if traits else {}
    payload = {
        "user_id": user_id,
        "group_id": group_id,
        "traits": group_traits,
        "type": "group",
        "debug": IS_DEBUG_ENV,
    }
    # The message travels as a JSON string from here on.
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)
identify(self, user_id, traits)

Method to identify a user with given traits.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
traits Optional[Dict[Any, Any]]

The traits for the identification process.

required

Returns:

Type Description
Tuple[bool, str]

Tuple (success flag, the original message).

Source code in zenml/analytics/client.py
def identify(
    self,
    user_id: UUID,
    traits: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
    """Build an "identify" message and pass it on to `_enqueue`.

    Args:
        user_id: The user ID.
        traits: The traits for the identification process. A falsy value
            is sent as an empty dict.

    Returns:
        Tuple (success flag, the original message).
    """
    user_traits = traits if traits else {}
    payload = {
        "user_id": user_id,
        "traits": user_traits,
        "type": "identify",
        "debug": IS_DEBUG_ENV,
    }
    # The message travels as a JSON string from here on.
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)
join(self)

Method to end the consumer thread once the queue is empty.

Source code in zenml/analytics/client.py
def join(self) -> None:
    """Method to end the consumer thread once the queue is empty.

    Each consumer is paused and then joined. When `self.consumers` is None
    (the client sets it to None in sync mode) there is nothing to join and
    the call is a no-op.
    """
    # `or []` guards against `self.consumers` being None.
    for consumer in self.consumers or []:
        consumer.pause()
        try:
            consumer.join()
        except RuntimeError:
            # consumer thread has not started
            pass
shutdown(self)

Method to flush all messages and cleanly shutdown the client.

Source code in zenml/analytics/client.py
def shutdown(self) -> None:
    """Flush pending messages, then stop the consumers.

    Calls `flush()` followed by `join()`, in that order.
    """
    # Flush first so queued messages are handled before consumers stop.
    self.flush()
    self.join()
track(self, user_id, event, properties)

Method to track events.

Parameters:

Name Type Description Default
user_id UUID

The user ID.

required
event AnalyticsEvent

The type of the event.

required
properties Optional[Dict[Any, Any]]

Dict of additional properties for the event.

required

Returns:

Type Description
Tuple[bool, str]

Tuple (success flag, the original message).

Source code in zenml/analytics/client.py
def track(
    self,
    user_id: UUID,
    event: "AnalyticsEvent",
    properties: Optional[Dict[Any, Any]],
) -> Tuple[bool, str]:
    """Build a "track" message and pass it on to `_enqueue`.

    Args:
        user_id: The user ID.
        event: The type of the event.
        properties: Dict of additional properties for the event. A falsy
            value is sent as an empty dict.

    Returns:
        Tuple (success flag, the original message).
    """
    event_properties = properties if properties else {}
    payload = {
        "user_id": user_id,
        "event": event,
        "properties": event_properties,
        "type": "track",
        "debug": IS_DEBUG_ENV,
    }
    # The message travels as a JSON string from here on.
    serialized = json.dumps(payload, cls=AnalyticsEncoder)
    return self._enqueue(serialized)

consumer

The analytics module of ZenML.

This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.

Consumer (Thread)

Consumes the messages from the client's queue.

Source code in zenml/analytics/consumer.py
class Consumer(Thread):
    """Consumes the messages from the client's queue.

    The thread repeatedly collects a batch of messages from the queue and
    uploads it until `pause()` is called.
    """

    def __init__(
        self,
        queue: Queue,  # type: ignore[type-arg]
        base_source_context: SourceContextTypes,
        upload_size: int = 100,
        on_error: Optional[Callable[..., Any]] = None,
        upload_interval: float = 0.5,
        retries: int = 10,
        timeout: int = 15,
    ) -> None:
        """Initialize and create a consumer thread.

        Args:
            queue: The queue the messages are read from.
            base_source_context: the context type which will be set for the
                thread as this consumer runs.
            upload_size: The maximum number of messages collected into a
                single batch.
            on_error: Function to call if an error occurs. It is called
                with the exception and the batch that failed.
            upload_interval: The maximum time in seconds spent collecting
                a single batch.
            retries: The number of max tries before failing.
            timeout: Timeout in seconds.
        """
        Thread.__init__(self)

        # Initialization of the logging, that silences the backoff logger
        init_logging()

        # Store the base context to set for the thread
        self.base_source_context = base_source_context

        # Make consumer a daemon thread so that it doesn't block program exit
        self.daemon = True
        self.upload_size = upload_size
        self.upload_interval = upload_interval
        self.on_error = on_error
        self.queue = queue
        # It's important to set running in the constructor: if we are asked to
        # pause immediately after construction, we might set running to True in
        # run() *after* we set it to False in pause... and keep running
        # forever.
        self.running = True
        self.retries = retries
        self.timeout = timeout

    def run(self) -> None:
        """Runs the consumer."""
        # Set the base context for each thread
        from zenml.analytics import source_context

        source_context.set(self.base_source_context)

        # Run the thread
        logger.debug("Consumer is running...")
        while self.running:
            self.upload()

        logger.debug("Consumer exited.")

    def pause(self) -> None:
        """Pause the consumer."""
        self.running = False

    def upload(self) -> bool:
        """Upload the next batch of items, return whether successful.

        Returns:
            If the upload succeeded. False is also returned when there was
            nothing to upload.
        """
        success = False
        batch = self.next()
        if not batch:
            return False

        try:
            self.request(batch)
            success = True
        except Exception as e:
            logger.debug("error uploading: %s", e)
            success = False
            if self.on_error:
                self.on_error(e, batch)
        finally:
            # mark items as acknowledged from queue
            for _ in batch:
                self.queue.task_done()
            # NOTE: returning from `finally` discards any exception still in
            # flight (e.g. one raised by `on_error`), so the loop in run()
            # keeps going.
            return success

    def next(self) -> List[str]:
        """Return the next batch of items to upload.

        Items are collected until `upload_size` items were gathered,
        `upload_interval` seconds have passed, the queue is empty, or the
        accumulated size reaches `BATCH_SIZE_LIMIT`. Items larger than
        `MAX_MSG_SIZE` are dropped and acknowledged right away.

        Returns:
            The next batch of items to upload.
        """
        queue = self.queue
        items: List[str] = []

        start_time = monotonic.monotonic()
        total_size = 0

        while len(items) < self.upload_size:
            elapsed = monotonic.monotonic() - start_time
            if elapsed >= self.upload_interval:
                break
            try:
                item = queue.get(
                    block=True, timeout=self.upload_interval - elapsed
                )
                item_size = len(item.encode())

                if item_size > MAX_MSG_SIZE:
                    logger.debug(
                        "Item exceeds 32kb limit, dropping. (%s)", str(item)
                    )
                    # A dropped item never reaches upload(), so it has to be
                    # acknowledged here. Otherwise Queue.join() would wait
                    # for it forever.
                    queue.task_done()
                    continue
                items.append(item)
                total_size += item_size
                if total_size >= BATCH_SIZE_LIMIT:
                    logger.debug("hit batch size limit (size: %d)", total_size)
                    break
            except Empty:
                break

        return items

    def request(self, batch: List[str]) -> None:
        """Attempt to upload the batch and retry before raising an error.

        Args:
            batch: The batch to upload.
        """

        def fatal_exception(exc: Any) -> bool:
            """Tell the retry logic whether to give up on `exc`."""
            if isinstance(exc, AnalyticsAPIError):
                # retry on server errors and client errors
                # with 429 status code (rate limited),
                # don't retry on other client errors
                return (400 <= exc.status < 500) and exc.status != 429
            else:
                # retry on all other errors (e.g. network)
                return False

        @backoff.on_exception(  # type: ignore[misc]
            backoff.expo,
            Exception,
            max_tries=self.retries + 1,
            giveup=fatal_exception,
        )
        def send_request() -> None:
            """Function to send a batch of messages."""
            post(timeout=self.timeout, batch=batch)

        send_request()
__init__(self, queue, base_source_context, upload_size=100, on_error=None, upload_interval=0.5, retries=10, timeout=15) special

Initialize and create a consumer thread.

Parameters:

Name Type Description Default
queue Queue

The list of messages in the queue.

required
base_source_context SourceContextTypes

The context type that will be set for the thread while this consumer runs.

required
upload_size int

The maximum size for messages a consumer can send if the 'sync_mode' is set to False.

100
on_error Optional[Callable[..., Any]]

Function to call if an error occurs.

None
upload_interval float

The upload_interval in seconds

0.5
retries int

The number of max tries before failing.

10
timeout int

Timeout in seconds.

15
Source code in zenml/analytics/consumer.py
def __init__(
    self,
    queue: Queue,  # type: ignore[type-arg]
    base_source_context: SourceContextTypes,
    upload_size: int = 100,
    on_error: Optional[Callable[..., Any]] = None,
    upload_interval: float = 0.5,
    retries: int = 10,
    timeout: int = 15,
) -> None:
    """Set up the consumer thread without starting it.

    Args:
        queue: The queue the messages are read from.
        base_source_context: The context type which will be set for the
            thread as this consumer runs.
        upload_size: The maximum size for messages a consumer can send
            if the 'sync_mode' is set to False.
        on_error: Function to call if an error occurs.
        upload_interval: The upload_interval in seconds.
        retries: The number of max tries before failing.
        timeout: Timeout in seconds.
    """
    Thread.__init__(self)

    # Set up logging; this silences the backoff logger.
    init_logging()

    # Daemon threads do not keep the program alive at exit.
    self.daemon = True

    # `running` has to be True before the thread is ever started: were it
    # set in run() instead, a pause() issued right after construction
    # could be overwritten and the consumer would keep running forever.
    self.running = True

    # Where the messages come from and which context the thread runs with.
    self.queue = queue
    self.base_source_context = base_source_context

    # Batching and upload settings.
    self.upload_size = upload_size
    self.upload_interval = upload_interval
    self.retries = retries
    self.timeout = timeout
    self.on_error = on_error
next(self)

Return the next batch of items to upload.

Returns:

Type Description
List[str]

The next batch of items to upload.

Source code in zenml/analytics/consumer.py
def next(self) -> List[str]:
    """Return the next batch of items to upload.

    Items are taken from the queue until one of the following happens:
    `upload_size` items were collected, `upload_interval` seconds have
    passed, the queue stayed empty for the remaining time, or the encoded
    size of the batch reached `BATCH_SIZE_LIMIT`. Items whose encoded size
    exceeds `MAX_MSG_SIZE` are dropped and acknowledged right away.

    Returns:
        The next batch of items to upload.
    """
    queue = self.queue
    items: List[str] = []

    start_time = monotonic.monotonic()
    total_size = 0

    while len(items) < self.upload_size:
        elapsed = monotonic.monotonic() - start_time
        if elapsed >= self.upload_interval:
            break

        # Only wait for the time that is left of the upload interval
        try:
            item = queue.get(
                block=True, timeout=self.upload_interval - elapsed
            )
        except Empty:
            break

        item_size = len(item.encode())
        if item_size > MAX_MSG_SIZE:
            logger.debug(
                "Item exceeds 32kb limit, dropping. (%s)", str(item)
            )
            # A dropped item never becomes part of a batch, so upload()
            # will not acknowledge it. Acknowledge it here, otherwise a
            # `queue.join()` would wait for it forever.
            queue.task_done()
            continue

        items.append(item)
        total_size += item_size
        if total_size >= BATCH_SIZE_LIMIT:
            logger.debug("hit batch size limit (size: %d)", total_size)
            break

    return items
pause(self)

Pause the consumer.

Source code in zenml/analytics/consumer.py
def pause(self) -> None:
    """Pause the consumer by clearing its `running` flag."""
    self.running = False
request(self, batch)

Attempt to upload the batch and retry before raising an error.

Parameters:

Name Type Description Default
batch List[str]

The batch to upload.

required
Source code in zenml/analytics/consumer.py
def request(self, batch: List[str]) -> None:
    """Upload the batch, retrying failed attempts before raising an error.

    Args:
        batch: The batch to upload.
    """

    def is_fatal(error: Any) -> bool:
        # Everything that is not an API error (e.g. network problems)
        # is retried.
        if not isinstance(error, AnalyticsAPIError):
            return False

        # Client errors are not retried, except for 429 (rate limited).
        # Server errors are retried.
        status = error.status
        return 400 <= status < 500 and status != 429

    def send_request() -> None:
        """Function to send a batch of messages."""
        post(timeout=self.timeout, batch=batch)

    # One initial attempt plus `self.retries` retries
    with_retries = backoff.on_exception(  # type: ignore[misc]
        backoff.expo,
        Exception,
        max_tries=self.retries + 1,
        giveup=is_fatal,
    )
    with_retries(send_request)()
run(self)

Runs the consumer.

Source code in zenml/analytics/consumer.py
def run(self) -> None:
    """Run the consumer until it is paused."""
    from zenml.analytics import source_context

    # Each consumer thread starts out with its base source context
    source_context.set(self.base_source_context)

    logger.debug("Consumer is running...")

    # Keep uploading batches for as long as the `running` flag is set
    while True:
        if not self.running:
            break
        self.upload()

    logger.debug("Consumer exited.")
upload(self)

Upload the next batch of items and return whether the upload succeeded.

Returns:

Type Description
bool

Whether the upload succeeded.

Source code in zenml/analytics/consumer.py
def upload(self) -> bool:
    """Upload the next batch of items and return whether it succeeded.

    Every item of the batch is acknowledged on the queue afterwards,
    regardless of whether the upload succeeded or failed.

    Returns:
        True if the batch was uploaded. False if there was nothing to
        upload or if the upload failed.
    """
    batch = self.next()
    if not batch:
        return False

    success = False
    try:
        self.request(batch)
        success = True
    except Exception as e:
        logger.debug("error uploading: %s", e)
        if self.on_error:
            try:
                self.on_error(e, batch)
            except Exception as callback_error:
                # Error reporting is best-effort: a failing callback must
                # not stop the consumer.
                logger.debug(
                    "error in on_error callback: %s", callback_error
                )
    finally:
        # mark items as acknowledged from queue
        for _ in batch:
            self.queue.task_done()

    # The return statement deliberately lives outside of the `finally`
    # block: returning from `finally` would silently discard exceptions
    # such as KeyboardInterrupt or SystemExit.
    return success

context

The analytics module of ZenML.

This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.

AnalyticsContext

Client class for ZenML Analytics v2.

Source code in zenml/analytics/context.py
class AnalyticsContext:
    """Client class for ZenML Analytics v2."""

    def __init__(self) -> None:
        """Initialization.

        Use this as a context manager to ensure that analytics are initialized
        properly, only tracked when configured to do so and that any errors
        are handled gracefully.
        """
        # Analytics stay disabled until `__enter__` has read the setting
        self.analytics_opt_in: bool = False

        # Identifiers that are filled in by `__enter__`
        self.user_id: Optional[UUID] = None
        self.client_id: Optional[UUID] = None
        self.server_id: Optional[UUID] = None

        # Store information that is filled in by `__enter__`
        self.database_type: Optional["ServerDatabaseType"] = None
        self.deployment_type: Optional["ServerDeploymentType"] = None

    @property
    def in_server(self) -> bool:
        """Flag to check whether the code is running in a ZenML server.

        Returns:
            True if running in a server, False otherwise.
        """
        return handle_bool_env_var(ENV_ZENML_SERVER)

    def __enter__(self) -> "AnalyticsContext":
        """Enter analytics context manager.

        Reads the opt-in setting from the global configuration and, if
        analytics are enabled, collects the user, client and server
        information. If collecting this information fails, analytics are
        disabled for this context.

        Returns:
            The analytics context.
        """
        # Fetch the analytics opt-in setting
        from zenml.config.global_config import GlobalConfiguration

        gc = GlobalConfiguration()
        self.analytics_opt_in = gc.analytics_opt_in

        if not self.analytics_opt_in:
            return self

        try:
            # Fetch the `user_id`
            if self.in_server:
                from zenml.zen_server.auth import get_auth_context

                # If the code is running on the server, use the auth context.
                auth_context = get_auth_context()
                if auth_context is not None:
                    self.user_id = auth_context.user.id
            else:
                # If the code is running on the client, use the default user.
                default_user = gc.zen_store.get_user()
                self.user_id = default_user.id

            # Fetch the `client_id`
            if self.in_server:
                # If the code is running on the server, there is no client id.
                self.client_id = None
            else:
                # If the code is running on the client, attach the client id.
                self.client_id = gc.user_id

            # Fetch the store information including the `server_id`
            store_info = gc.zen_store.get_store_info()

            self.server_id = store_info.id
            self.deployment_type = store_info.deployment_type
            self.database_type = store_info.database_type
        except Exception as e:
            # Analytics must never break the surrounding code: disable
            # them for this context instead of raising.
            self.analytics_opt_in = False
            logger.debug(f"Analytics initialization failed: {e}")

        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        """Exit context manager.

        Args:
            exc_type: Exception type.
            exc_val: Exception value.
            exc_tb: Exception traceback.

        Returns:
            True, which suppresses any exception raised inside the context.
        """
        if exc_val is not None:
            logger.debug(f"Sending telemetry 2.0 data failed: {exc_val}")

        return True

    def identify(self, traits: Optional[Dict[str, Any]] = None) -> bool:
        """Identify the user through segment.

        Args:
            traits: Traits of the user.

        Returns:
            True if tracking information was sent, False otherwise.
        """
        success = False
        if self.analytics_opt_in and self.user_id is not None:
            success, _ = analytics.identify(
                user_id=self.user_id,
                traits=traits,
            )

        return success

    def group(
        self,
        group_id: UUID,
        traits: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Group the user.

        The given traits are not modified; the group ID is added to a copy.

        Args:
            group_id: Group ID.
            traits: Traits of the group.

        Returns:
            True if tracking information was sent, False otherwise.
        """
        if not self.analytics_opt_in or self.user_id is None:
            return False

        # Work on a copy so that the caller's dictionary is left untouched
        group_traits: Dict[str, Any] = {
            **(traits or {}),
            "group_id": group_id,
        }

        success, _ = analytics.group(
            user_id=self.user_id,
            group_id=group_id,
            traits=group_traits,
        )
        return success

    def track(
        self,
        event: "AnalyticsEvent",
        properties: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """Track an event.

        Opt-in and opt-out events are tracked even if analytics are
        disabled. No event is tracked without a user ID. The given
        properties are not modified; the basic information is added to a
        copy.

        Args:
            event: Event to track.
            properties: Event properties.

        Returns:
            True if tracking information was sent, False otherwise.
        """
        from zenml.utils.analytics_utils import AnalyticsEvent

        # Without a user ID the event can not be attributed to anybody
        if self.user_id is None:
            return False

        # Compare the event itself with the enum members: the raw
        # `event.value` does not reliably match a set of enum members.
        is_opt_event = event in {
            AnalyticsEvent.OPT_OUT_ANALYTICS,
            AnalyticsEvent.OPT_IN_ANALYTICS,
        }
        if not self.analytics_opt_in and not is_opt_event:
            return False

        # Work on a copy so that the caller's dictionary is left untouched
        payload: Dict[str, Any] = dict(properties) if properties else {}

        # add basics
        payload.update(Environment.get_system_info())
        payload.update(
            {
                "environment": get_environment(),
                "python_version": Environment.python_version(),
                "version": __version__,
                "client_id": str(self.client_id),
                "user_id": str(self.user_id),
                "server_id": str(self.server_id),
                "deployment_type": str(self.deployment_type),
                "database_type": str(self.database_type),
            }
        )

        # Replace UUID values with their string representation
        payload = {
            key: str(value) if isinstance(value, UUID) else value
            for key, value in payload.items()
        }

        success, _ = analytics.track(
            user_id=self.user_id,
            event=event,
            properties=payload,
        )

        logger.debug(
            f"Sending analytics: User: {self.user_id}, Event: {event}, "
            f"Metadata: {payload}"
        )

        return success
in_server: bool property readonly

Flag to check whether the code is running in a ZenML server.

Returns:

Type Description
bool

True if running in a server, False otherwise.

__enter__(self) special

Enter analytics context manager.

Returns:

Type Description
AnalyticsContext

The analytics context.

Source code in zenml/analytics/context.py
def __enter__(self) -> "AnalyticsContext":
    """Enter the analytics context manager.

    Reads the opt-in setting from the global configuration and, if
    analytics are enabled, collects the user, client and server
    information. If collecting this information fails, analytics are
    disabled for this context.

    Returns:
        The analytics context.
    """
    from zenml.config.global_config import GlobalConfiguration

    global_config = GlobalConfiguration()
    self.analytics_opt_in = global_config.analytics_opt_in

    if self.analytics_opt_in:
        try:
            if self.in_server:
                from zenml.zen_server.auth import get_auth_context

                # On the server the user comes from the auth context and
                # there is no client id.
                auth_context = get_auth_context()
                if auth_context is not None:
                    self.user_id = auth_context.user.id
                self.client_id = None
            else:
                # On the client the default user is used and the client
                # id is attached.
                self.user_id = global_config.zen_store.get_user().id
                self.client_id = global_config.user_id

            # The store information includes the `server_id`
            info = global_config.zen_store.get_store_info()
            self.server_id = info.id
            self.deployment_type = info.deployment_type
            self.database_type = info.database_type
        except Exception as e:
            self.analytics_opt_in = False
            logger.debug(f"Analytics initialization failed: {e}")

    return self
__exit__(self, exc_type, exc_val, exc_tb) special

Exit context manager.

Parameters:

Name Type Description Default
exc_type Optional[Type[BaseException]]

Exception type.

required
exc_val Optional[BaseException]

Exception value.

required
exc_tb Optional[traceback]

Exception traceback.

required

Returns:

Type Description
bool

True.

Source code in zenml/analytics/context.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> bool:
    """Exit context manager.

    Args:
        exc_type: Exception type.
        exc_val: Exception value.
        exc_tb: Exception traceback.

    Returns:
        True.
    """
    if exc_val is not None:
        logger.debug(f"Sending telemetry 2.0 data failed: {exc_val}")

    return True
__init__(self) special

Initialization.

Use this as a context manager to ensure that analytics are initialized properly, only tracked when configured to do so and that any errors are handled gracefully.

Source code in zenml/analytics/context.py
def __init__(self) -> None:
    """Initialization.

    Use this as a context manager to ensure that analytics are initialized
    properly, only tracked when configured to do so and that any errors
    are handled gracefully.
    """
    self.analytics_opt_in: bool = False

    self.user_id: Optional[UUID] = None
    self.client_id: Optional[UUID] = None
    self.server_id: Optional[UUID] = None

    self.database_type: Optional["ServerDatabaseType"] = None
    self.deployment_type: Optional["ServerDeploymentType"] = None
group(self, group_id, traits=None)

Group the user.

Parameters:

Name Type Description Default
group_id UUID

Group ID.

required
traits Optional[Dict[str, Any]]

Traits of the group.

None

Returns:

Type Description
bool

True if tracking information was sent, False otherwise.

Source code in zenml/analytics/context.py
def group(
    self,
    group_id: UUID,
    traits: Optional[Dict[str, Any]] = None,
) -> bool:
    """Group the user.

    The given traits are not modified; the group ID is added to a copy.

    Args:
        group_id: Group ID.
        traits: Traits of the group.

    Returns:
        True if tracking information was sent, False otherwise.
    """
    # Nothing is sent when analytics are disabled or the user is unknown
    if not self.analytics_opt_in or self.user_id is None:
        return False

    # Work on a copy so that the caller's dictionary is left untouched
    group_traits: Dict[str, Any] = {**(traits or {}), "group_id": group_id}

    success, _ = analytics.group(
        user_id=self.user_id,
        group_id=group_id,
        traits=group_traits,
    )
    return success
identify(self, traits=None)

Identify the user through Segment.

Parameters:

Name Type Description Default
traits Optional[Dict[str, Any]]

Traits of the user.

None

Returns:

Type Description
bool

True if tracking information was sent, False otherwise.

Source code in zenml/analytics/context.py
def identify(self, traits: Optional[Dict[str, Any]] = None) -> bool:
    """Send an identify call for the user of this context.

    Args:
        traits: Traits of the user.

    Returns:
        True if tracking information was sent, False otherwise.
    """
    # Nothing is sent when analytics are disabled or the user is unknown
    if not self.analytics_opt_in or self.user_id is None:
        return False

    sent, _ = analytics.identify(user_id=self.user_id, traits=traits)
    return sent
track(self, event, properties=None)

Track an event.

Parameters:

Name Type Description Default
event AnalyticsEvent

Event to track.

required
properties Optional[Dict[str, Any]]

Event properties.

None

Returns:

Type Description
bool

True if tracking information was sent, False otherwise.

Source code in zenml/analytics/context.py
def track(
    self,
    event: "AnalyticsEvent",
    properties: Optional[Dict[str, Any]] = None,
) -> bool:
    """Track an event.

    Opt-in and opt-out events are tracked even if analytics are disabled.
    No event is tracked without a user ID. The given properties are not
    modified; the basic information is added to a copy.

    Args:
        event: Event to track.
        properties: Event properties.

    Returns:
        True if tracking information was sent, False otherwise.
    """
    from zenml.utils.analytics_utils import AnalyticsEvent

    # Without a user ID the event can not be attributed to anybody
    if self.user_id is None:
        return False

    # Compare the event itself with the enum members: the raw
    # `event.value` does not reliably match a set of enum members.
    is_opt_event = event in {
        AnalyticsEvent.OPT_OUT_ANALYTICS,
        AnalyticsEvent.OPT_IN_ANALYTICS,
    }
    if not self.analytics_opt_in and not is_opt_event:
        return False

    # Work on a copy so that the caller's dictionary is left untouched
    payload: Dict[str, Any] = dict(properties) if properties else {}

    # add basics
    payload.update(Environment.get_system_info())
    payload.update(
        {
            "environment": get_environment(),
            "python_version": Environment.python_version(),
            "version": __version__,
            "client_id": str(self.client_id),
            "user_id": str(self.user_id),
            "server_id": str(self.server_id),
            "deployment_type": str(self.deployment_type),
            "database_type": str(self.database_type),
        }
    )

    # Replace UUID values with their string representation
    payload = {
        key: str(value) if isinstance(value, UUID) else value
        for key, value in payload.items()
    }

    success, _ = analytics.track(
        user_id=self.user_id,
        event=event,
        properties=payload,
    )

    logger.debug(
        f"Sending analytics: User: {self.user_id}, Event: {event}, "
        f"Metadata: {payload}"
    )

    return success

request

The 'analytics' module of ZenML.

This module is based on the 'analytics-python' package created by Segment. The base functionalities are adapted to work with the ZenML analytics server.

AnalyticsAPIError (Exception)

Custom exception class for API-related errors.

Source code in zenml/analytics/request.py
class AnalyticsAPIError(Exception):
    """Custom exception class for API-related errors."""

    def __init__(self, status: int, message: str) -> None:
        """Initialization.

        Args:
            status: The status code of the response.
            message: The text of the response.
        """
        # Initialize the base class so that `args` is populated no matter
        # whether the error was created with positional or keyword
        # arguments. Copying and pickling rely on `args`.
        super().__init__(status, message)
        self.message = message
        self.status = status

    def __str__(self) -> str:
        """Method to represent the instance as a string.

        Returns:
            A representation of the message and the status code.
        """
        msg = "[ZenML Analytics] {1}: {0}"
        return msg.format(self.message, self.status)
__init__(self, status, message) special

Initialization.

Parameters:

Name Type Description Default
status int

The status code of the response.

required
message str

The text of the response.

required
Source code in zenml/analytics/request.py
def __init__(self, status: int, message: str) -> None:
    """Initialization.

    Args:
        status: The status code of the response.
        message: The text of the response.
    """
    self.message = message
    self.status = status
__str__(self) special

Method to represent the instance as a string.

Returns:

Type Description
str

A representation of the message and the status code.

Source code in zenml/analytics/request.py
def __str__(self) -> str:
    """Method to represent the instance as a string.

    Returns:
        A representation of the message and the status code.
    """
    msg = "[ZenML Analytics] {1}: {0}"
    return msg.format(self.message, self.status)

post(batch, timeout=15)

Post a batch of messages to the ZenML analytics server.

Parameters:

Name Type Description Default
batch List[str]

The messages to send.

required
timeout int

Timeout in seconds.

15

Returns:

Type Description
Response

The response.

Exceptions:

Type Description
AnalyticsAPIError

If the post request has failed.

Source code in zenml/analytics/request.py
def post(batch: List[str], timeout: int = 15) -> requests.Response:
    """Post a batch of messages to the ZenML analytics server.

    The messages are joined into a single list literal and sent as the
    UTF-8 encoded request body.

    Args:
        batch: The messages to send.
        timeout: Timeout in seconds.

    Returns:
        The response.

    Raises:
        AnalyticsAPIError: If the post request has failed.
    """
    from zenml.analytics import source_context

    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        source_context.name: source_context.get().value,
    }

    # Encode the body explicitly. A `str` body leaves the choice of the
    # encoding to the HTTP layer, which may not be able to represent all
    # characters (http.client encodes strings as ISO-8859-1).
    payload = f"[{','.join(batch)}]".encode("utf-8")

    response = requests.post(
        url=ANALYTICS_SERVER_URL + "/batch",
        headers=headers,
        data=payload,
        timeout=timeout,
    )

    # Every status code other than 200 is treated as a failure
    if response.status_code != 200:
        raise AnalyticsAPIError(response.status_code, response.text)

    logger.debug("data uploaded successfully")
    return response