tf.data.experimental.service.DispatchServer
An in-process tf.data service dispatch server.
tf.data.experimental.service.DispatchServer(
config=None, start=True
)
A tf.data.experimental.service.DispatchServer coordinates a cluster of tf.data.experimental.service.WorkerServers. When the workers start, they register themselves with the dispatcher.
import tensorflow as tf

dispatcher = tf.data.experimental.service.DispatchServer()
dispatcher_address = dispatcher.target.split("://")[1]
worker = tf.data.experimental.service.WorkerServer(
    tf.data.experimental.service.WorkerConfig(
        dispatcher_address=dispatcher_address))
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))
print(list(dataset.as_numpy_iterator()))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
When starting a dedicated tf.data dispatch process, use join() to block
after starting up the server, until the server terminates.
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=5050))
dispatcher.join()
Call stop() to gracefully terminate the dispatcher. The server automatically
stops when all references to it have been deleted.
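Rather than relying on the server being garbage-collected, a caller may prefer to stop it explicitly. The following is a minimal sketch of one way to do that, not part of the tf.data service API: `stop_on_exit` is a hypothetical helper name, and it only assumes the object passed in has a `stop()` method, as `DispatchServer` does.

```python
import contextlib


@contextlib.contextmanager
def stop_on_exit(server):
    """Yield `server`, then call its stop() method, even if the body raises."""
    try:
        yield server
    finally:
        server.stop()


# Possible usage with a dispatcher (requires TensorFlow):
#
#   import tensorflow as tf
#   with stop_on_exit(tf.data.experimental.service.DispatchServer()) as dispatcher:
#       ...  # start workers and read datasets using dispatcher.target
```

The `finally` clause means `stop()` runs whether the body completes normally or raises, so shutdown does not depend on when the last reference disappears.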
To start a DispatchServer in fault-tolerant mode, set work_dir and fault_tolerant_mode as shown below:
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(
        port=5050,
        work_dir="gs://my-bucket/dispatcher/work_dir",
        fault_tolerant_mode=True))
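The dedicated-process and fault-tolerant setups above can be combined into a small entry-point script. This is only a sketch under stated assumptions: the flag names (`--port`, `--work_dir`, `--fault_tolerant`) and the functions `parse_args` and `main` are hypothetical, and the check that `--work_dir` accompanies `--fault_tolerant` simply mirrors the guidance above to set both. Only `DispatcherConfig(port=..., work_dir=..., fault_tolerant_mode=...)`, `DispatchServer(config)`, and `join()` come from this page.

```python
import argparse


def parse_args(argv=None):
    """Parse hypothetical command-line flags for a dispatcher process."""
    parser = argparse.ArgumentParser(description="tf.data service dispatcher")
    parser.add_argument("--port", type=int, default=5050)
    parser.add_argument("--work_dir", default=None)
    parser.add_argument("--fault_tolerant", action="store_true")
    args = parser.parse_args(argv)
    if args.fault_tolerant and args.work_dir is None:
        # Assumption: fault-tolerant mode is configured together with work_dir.
        parser.error("--fault_tolerant requires --work_dir")
    return args


def main(argv=None):
    args = parse_args(argv)
    # Imported lazily so that flag parsing works without loading TensorFlow.
    import tensorflow as tf

    config = tf.data.experimental.service.DispatcherConfig(
        port=args.port,
        work_dir=args.work_dir,
        fault_tolerant_mode=args.fault_tolerant)
    dispatcher = tf.data.experimental.service.DispatchServer(config)
    dispatcher.join()  # Blocks until the server has shut down.
```

Calling `main()` from the script's entry point would start the dispatcher and block in `join()`; keeping flag parsing separate makes the configuration logic easy to check without starting a server.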
Args
config: (Optional.) A tf.data.experimental.service.DispatcherConfig configuration. If None, the dispatcher will use default configuration values.
start: (Optional.) Boolean, indicating whether to start the server after creating it. Defaults to True.
Attributes
target: Returns a target that can be used to connect to the server.
dispatcher = tf.data.experimental.service.DispatchServer()
dataset = tf.data.Dataset.range(10)
dataset = dataset.apply(tf.data.experimental.service.distribute(
    processing_mode="parallel_epochs", service=dispatcher.target))
The returned string will be in the form protocol://address, e.g. "grpc://localhost:5050".
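Because the target has the form protocol://address, code that needs only the address (for example, to pass as `dispatcher_address` in a `WorkerConfig`) has to split it. A minimal sketch follows; `split_target` is a hypothetical helper, not a TensorFlow function, and it works on plain strings so it can be tried without a running server.

```python
def split_target(target):
    """Split a target such as "grpc://localhost:5050" into (protocol, address)."""
    protocol, sep, address = target.partition("://")
    if not sep or not protocol or not address:
        raise ValueError(f"expected protocol://address, got {target!r}")
    return protocol, address


protocol, address = split_target("grpc://localhost:5050")
print(protocol)  # grpc
print(address)   # localhost:5050
```

Compared with `target.split("://")[1]`, this version raises a `ValueError` with a readable message when the separator is missing, instead of an `IndexError`.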
Methods
join
join() -> None
Blocks until the server has shut down.
This is useful when starting a dedicated dispatch process.
dispatcher = tf.data.experimental.service.DispatchServer(
    tf.data.experimental.service.DispatcherConfig(port=5050))
dispatcher.join()
Raises
tf.errors.OpError: Or one of its subclasses if an error occurs while joining the server.
start
start()
Starts this server.
dispatcher = tf.data.experimental.service.DispatchServer(start=False)
dispatcher.start()
Raises
tf.errors.OpError: Or one of its subclasses if an error occurs while starting the server.
stop
stop() -> None
Stops the server.
Raises
tf.errors.OpError: Or one of its subclasses if an error occurs while stopping the server.
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates. Some content is licensed under the numpy license.
Last updated 2024-04-26 UTC.