Tensorboard
zenml.integrations.tensorboard
special
Initialization for TensorBoard integration.
TensorBoardIntegration (Integration)
Definition of TensorBoard integration for ZenML.
Source code in zenml/integrations/tensorboard/__init__.py
class TensorBoardIntegration(Integration):
"""Definition of TensorBoard integration for ZenML."""
NAME = TENSORBOARD
REQUIREMENTS = ["tensorboard==2.8.0"]
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.tensorboard import services # noqa
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/tensorboard/__init__.py
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.tensorboard import services # noqa
services
special
Initialization for TensorBoard services.
tensorboard_service
Implementation of the TensorBoard service.
TensorboardService (LocalDaemonService)
pydantic-model
TensorBoard service.
This can be used to start a local TensorBoard server for one or more models.
Attributes:
Name | Type | Description |
---|---|---|
SERVICE_TYPE |
ClassVar[zenml.services.service_type.ServiceType] |
a service type descriptor with information describing the TensorBoard service class |
config |
TensorboardServiceConfig |
service configuration |
endpoint |
LocalDaemonServiceEndpoint |
optional service endpoint |
Source code in zenml/integrations/tensorboard/services/tensorboard_service.py
class TensorboardService(LocalDaemonService):
"""TensorBoard service.
This can be used to start a local TensorBoard server for one or more models.
Attributes:
SERVICE_TYPE: a service type descriptor with information describing
the TensorBoard service class
config: service configuration
endpoint: optional service endpoint
"""
SERVICE_TYPE = ServiceType(
name="tensorboard",
type="visualization",
flavor="tensorboard",
description="TensorBoard visualization service",
)
config: TensorboardServiceConfig
endpoint: LocalDaemonServiceEndpoint
def __init__(
self,
config: Union[TensorboardServiceConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
"""Initialization for TensorBoard service.
Args:
config: service configuration
**attrs: additional attributes
"""
# ensure that the endpoint is created before the service is initialized
# TODO [ENG-697]: implement a service factory or builder for TensorBoard
# deployment services
if (
isinstance(config, TensorboardServiceConfig)
and "endpoint" not in attrs
):
endpoint = LocalDaemonServiceEndpoint(
config=LocalDaemonServiceEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path="",
use_head_request=True,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
def run(self) -> None:
"""Initialize and run the TensorBoard server."""
logger.info(
"Starting TensorBoard service as blocking "
"process... press CTRL+C once to stop it."
)
self.endpoint.prepare_for_start()
try:
tensorboard = program.TensorBoard(
plugins=default.get_plugins(),
subcommands=[uploader_subcommand.UploaderSubcommand()],
)
tensorboard.configure(
logdir=self.config.logdir,
port=self.endpoint.status.port,
host="localhost",
max_reload_threads=self.config.max_reload_threads,
reload_interval=self.config.reload_interval,
)
tensorboard.main()
except KeyboardInterrupt:
logger.info(
"TensorBoard service stopped. Resuming normal execution."
)
__init__(self, config, **attrs)
special
Initialization for TensorBoard service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
Union[zenml.integrations.tensorboard.services.tensorboard_service.TensorboardServiceConfig, Dict[str, Any]] |
service configuration |
required |
**attrs |
Any |
additional attributes |
{} |
Source code in zenml/integrations/tensorboard/services/tensorboard_service.py
def __init__(
self,
config: Union[TensorboardServiceConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
"""Initialization for TensorBoard service.
Args:
config: service configuration
**attrs: additional attributes
"""
# ensure that the endpoint is created before the service is initialized
# TODO [ENG-697]: implement a service factory or builder for TensorBoard
# deployment services
if (
isinstance(config, TensorboardServiceConfig)
and "endpoint" not in attrs
):
endpoint = LocalDaemonServiceEndpoint(
config=LocalDaemonServiceEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path="",
use_head_request=True,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
run(self)
Initialize and run the TensorBoard server.
Source code in zenml/integrations/tensorboard/services/tensorboard_service.py
def run(self) -> None:
"""Initialize and run the TensorBoard server."""
logger.info(
"Starting TensorBoard service as blocking "
"process... press CTRL+C once to stop it."
)
self.endpoint.prepare_for_start()
try:
tensorboard = program.TensorBoard(
plugins=default.get_plugins(),
subcommands=[uploader_subcommand.UploaderSubcommand()],
)
tensorboard.configure(
logdir=self.config.logdir,
port=self.endpoint.status.port,
host="localhost",
max_reload_threads=self.config.max_reload_threads,
reload_interval=self.config.reload_interval,
)
tensorboard.main()
except KeyboardInterrupt:
logger.info(
"TensorBoard service stopped. Resuming normal execution."
)
TensorboardServiceConfig (LocalDaemonServiceConfig)
pydantic-model
TensorBoard service configuration.
Attributes:
Name | Type | Description |
---|---|---|
logdir |
str |
location of TensorBoard log files. |
max_reload_threads |
int |
the max number of threads that TensorBoard can use to reload runs. Each thread reloads one run at a time. |
reload_interval |
int |
how often the backend should load more data, in seconds. Set to 0 to load just once at startup. |
Source code in zenml/integrations/tensorboard/services/tensorboard_service.py
class TensorboardServiceConfig(LocalDaemonServiceConfig):
"""TensorBoard service configuration.
Attributes:
logdir: location of TensorBoard log files.
max_reload_threads: the max number of threads that TensorBoard can use
to reload runs. Each thread reloads one run at a time.
reload_interval: how often the backend should load more data, in
seconds. Set to 0 to load just once at startup.
"""
logdir: str
max_reload_threads: int = 1
reload_interval: int = 5
visualizers
special
Initialization for TensorBoard visualizer.
tensorboard_visualizer
Implementation of a TensorBoard visualizer step.
TensorboardVisualizer (BaseVisualizer)
The implementation of a TensorBoard Visualizer.
Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
class TensorboardVisualizer(BaseVisualizer):
"""The implementation of a TensorBoard Visualizer."""
@classmethod
def find_running_tensorboard_server(
cls, logdir: str
) -> Optional[TensorBoardInfo]:
"""Find a local TensorBoard server instance.
Finds when it is running for the supplied logdir location and return its
TCP port.
Args:
logdir: The logdir location where the TensorBoard server is running.
Returns:
The TensorBoardInfo describing the running TensorBoard server or
None if no server is running for the supplied logdir location.
"""
for server in get_all():
if (
server.logdir == logdir
and server.pid
and psutil.pid_exists(server.pid)
):
return server
return None
def visualize(
self,
object: StepView,
height: int = 800,
*args: Any,
**kwargs: Any,
) -> None:
"""Start a TensorBoard server.
Allows for the visualization of all models logged as artifacts by the
indicated step. The server will monitor and display all the models
logged by past and future step runs.
Args:
object: StepView fetched from run.get_step().
height: Height of the generated visualization.
*args: Additional arguments.
**kwargs: Additional keyword arguments.
"""
for _, artifact_view in object.outputs.items():
# filter out anything but model artifacts
if artifact_view.type == ModelArtifact.TYPE_NAME:
logdir = os.path.dirname(artifact_view.uri)
# first check if a TensorBoard server is already running for
# the same logdir location and use that one
running_server = self.find_running_tensorboard_server(logdir)
if running_server:
self.visualize_tensorboard(running_server.port, height)
return
if sys.platform == "win32":
# Daemon service functionality is currently not supported on Windows
print(
"You can run:\n"
f"[italic green] tensorboard --logdir {logdir}"
"[/italic green]\n"
"...to visualize the TensorBoard logs for your trained model."
)
else:
# start a new TensorBoard server
service = TensorboardService(
TensorboardServiceConfig(
logdir=logdir,
)
)
service.start(timeout=20)
if service.endpoint.status.port:
self.visualize_tensorboard(
service.endpoint.status.port, height
)
return
def visualize_tensorboard(
self,
port: int,
height: int,
) -> None:
"""Generate a visualization of a TensorBoard.
Args:
port: the TCP port where the TensorBoard server is listening for
requests.
height: Height of the generated visualization.
"""
if Environment.in_notebook():
notebook.display(port, height=height)
return
print(
"You can visit:\n"
f"[italic green] http://localhost:{port}/[/italic green]\n"
"...to visualize the TensorBoard logs for your trained model."
)
def stop(
self,
object: StepView,
) -> None:
"""Stop the TensorBoard server previously started for a pipeline step.
Args:
object: StepView fetched from run.get_step().
"""
for _, artifact_view in object.outputs.items():
# filter out anything but model artifacts
if artifact_view.type == ModelArtifact.TYPE_NAME:
logdir = os.path.dirname(artifact_view.uri)
# first check if a TensorBoard server is already running for
# the same logdir location and use that one
running_server = self.find_running_tensorboard_server(logdir)
if not running_server:
return
logger.debug(
"Stopping tensorboard server with PID '%d' ...",
running_server.pid,
)
try:
p = psutil.Process(running_server.pid)
except psutil.Error:
logger.error(
"Could not find process for PID '%d' ...",
running_server.pid,
)
continue
p.kill()
return
find_running_tensorboard_server(logdir)
classmethod
Find a local TensorBoard server instance.
Finds when it is running for the supplied logdir location and return its TCP port.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
logdir |
str |
The logdir location where the TensorBoard server is running. |
required |
Returns:
Type | Description |
---|---|
Optional[collections.TensorBoardInfo] |
The TensorBoardInfo describing the running TensorBoard server or None if no server is running for the supplied logdir location. |
Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
@classmethod
def find_running_tensorboard_server(
cls, logdir: str
) -> Optional[TensorBoardInfo]:
"""Find a local TensorBoard server instance.
Finds when it is running for the supplied logdir location and return its
TCP port.
Args:
logdir: The logdir location where the TensorBoard server is running.
Returns:
The TensorBoardInfo describing the running TensorBoard server or
None if no server is running for the supplied logdir location.
"""
for server in get_all():
if (
server.logdir == logdir
and server.pid
and psutil.pid_exists(server.pid)
):
return server
return None
stop(self, object)
Stop the TensorBoard server previously started for a pipeline step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object |
StepView |
StepView fetched from run.get_step(). |
required |
Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def stop(
self,
object: StepView,
) -> None:
"""Stop the TensorBoard server previously started for a pipeline step.
Args:
object: StepView fetched from run.get_step().
"""
for _, artifact_view in object.outputs.items():
# filter out anything but model artifacts
if artifact_view.type == ModelArtifact.TYPE_NAME:
logdir = os.path.dirname(artifact_view.uri)
# first check if a TensorBoard server is already running for
# the same logdir location and use that one
running_server = self.find_running_tensorboard_server(logdir)
if not running_server:
return
logger.debug(
"Stopping tensorboard server with PID '%d' ...",
running_server.pid,
)
try:
p = psutil.Process(running_server.pid)
except psutil.Error:
logger.error(
"Could not find process for PID '%d' ...",
running_server.pid,
)
continue
p.kill()
return
visualize(self, object, height=800, *args, **kwargs)
Start a TensorBoard server.
Allows for the visualization of all models logged as artifacts by the indicated step. The server will monitor and display all the models logged by past and future step runs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object |
StepView |
StepView fetched from run.get_step(). |
required |
height |
int |
Height of the generated visualization. |
800 |
*args |
Any |
Additional arguments. |
() |
**kwargs |
Any |
Additional keyword arguments. |
{} |
Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def visualize(
self,
object: StepView,
height: int = 800,
*args: Any,
**kwargs: Any,
) -> None:
"""Start a TensorBoard server.
Allows for the visualization of all models logged as artifacts by the
indicated step. The server will monitor and display all the models
logged by past and future step runs.
Args:
object: StepView fetched from run.get_step().
height: Height of the generated visualization.
*args: Additional arguments.
**kwargs: Additional keyword arguments.
"""
for _, artifact_view in object.outputs.items():
# filter out anything but model artifacts
if artifact_view.type == ModelArtifact.TYPE_NAME:
logdir = os.path.dirname(artifact_view.uri)
# first check if a TensorBoard server is already running for
# the same logdir location and use that one
running_server = self.find_running_tensorboard_server(logdir)
if running_server:
self.visualize_tensorboard(running_server.port, height)
return
if sys.platform == "win32":
# Daemon service functionality is currently not supported on Windows
print(
"You can run:\n"
f"[italic green] tensorboard --logdir {logdir}"
"[/italic green]\n"
"...to visualize the TensorBoard logs for your trained model."
)
else:
# start a new TensorBoard server
service = TensorboardService(
TensorboardServiceConfig(
logdir=logdir,
)
)
service.start(timeout=20)
if service.endpoint.status.port:
self.visualize_tensorboard(
service.endpoint.status.port, height
)
return
visualize_tensorboard(self, port, height)
Generate a visualization of a TensorBoard.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
port |
int |
the TCP port where the TensorBoard server is listening for requests. |
required |
height |
int |
Height of the generated visualization. |
required |
Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def visualize_tensorboard(
self,
port: int,
height: int,
) -> None:
"""Generate a visualization of a TensorBoard.
Args:
port: the TCP port where the TensorBoard server is listening for
requests.
height: Height of the generated visualization.
"""
if Environment.in_notebook():
notebook.display(port, height=height)
return
print(
"You can visit:\n"
f"[italic green] http://localhost:{port}/[/italic green]\n"
"...to visualize the TensorBoard logs for your trained model."
)
get_step(pipeline_name, step_name)
Get the StepView for the specified pipeline and step name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name |
str |
The name of the pipeline. |
required |
step_name |
str |
The name of the step. |
required |
Returns:
Type | Description |
---|---|
StepView |
The StepView for the specified pipeline and step name. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the step is not found. |
Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def get_step(pipeline_name: str, step_name: str) -> StepView:
"""Get the StepView for the specified pipeline and step name.
Args:
pipeline_name: The name of the pipeline.
step_name: The name of the step.
Returns:
The StepView for the specified pipeline and step name.
Raises:
RuntimeError: If the step is not found.
"""
pipeline = get_pipeline(pipeline_name)
if pipeline is None:
raise RuntimeError(f"No pipeline with name `{pipeline_name}` was found")
last_run = pipeline.runs[-1]
step = last_run.get_step(step=step_name)
if step is None:
raise RuntimeError(
f"No pipeline step with name `{step_name}` was found in "
f"pipeline `{pipeline_name}`"
)
return cast(StepView, step)
stop_tensorboard_server(pipeline_name, step_name)
Stop the TensorBoard server previously started for a pipeline step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name |
str |
the name of the pipeline |
required |
step_name |
str |
pipeline step name |
required |
Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def stop_tensorboard_server(pipeline_name: str, step_name: str) -> None:
"""Stop the TensorBoard server previously started for a pipeline step.
Args:
pipeline_name: the name of the pipeline
step_name: pipeline step name
"""
step = get_step(pipeline_name, step_name)
TensorboardVisualizer().stop(step)
visualize_tensorboard(pipeline_name, step_name)
Start a TensorBoard server.
Allows for the visualization of all models logged as output by the named pipeline step. The server will monitor and display all the models logged by past and future step runs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name |
str |
the name of the pipeline |
required |
step_name |
str |
pipeline step name |
required |
Source code in zenml/integrations/tensorboard/visualizers/tensorboard_visualizer.py
def visualize_tensorboard(pipeline_name: str, step_name: str) -> None:
"""Start a TensorBoard server.
Allows for the visualization of all models logged as output by the named
pipeline step. The server will monitor and display all the models logged by
past and future step runs.
Args:
pipeline_name: the name of the pipeline
step_name: pipeline step name
"""
step = get_step(pipeline_name, step_name)
TensorboardVisualizer().visualize(step)