tf.distribute.experimental.rpc.Client
Stay organized with collections
Save and categorize content based on your preferences.
Client class for invoking RPCs to the server.
Methods
call
View source
call(
method_name: str,
args: Optional[Sequence[core_tf_types.Tensor]] = None,
output_specs=None,
timeout_in_ms=0
)
Method for making RPC calls to remote server.
This invokes RPC to the server, executing the registered method_name
remotely.
Args:
method_name: Remote registered method to invoke
args: List of arguments for the registered method.
output_specs: Output specs for the output from method.
For example, if tf.function is: @tf.function(input_signature=[
tf.TensorSpec([], tf.int32), tf.TensorSpec([], tf.int32) ])
def multiply_fn(a, b): return tf.math.multiply(a, b)
output_spec is: tf.TensorSpec((), tf.int32) If you have access to TF
Function, the output specs can be generated
from tf.function by calling: output_specs =
tf.nest.map_structure(tf.type_spec_from_value,
tf_function.get_concrete_function().structured_outputs If output_specs
are not provided, flattened list of tensors will be returned in
response.
timeout_in_ms: Timeout for this call. If 0, default client timeout will be
used.
Returns |
An instance of StatusOrResult class with the following available
methods.
is_ok() :
Returns True of RPC was successful.
get_error() :
Returns TF error_code and error message for the RPC.
get_value() :
Returns the returned value from remote TF function execution
when RPC is successful.
Calling any of the above methods will block till RPC is completed and
result is available.
|
create
View source
@staticmethod
create(
rpc_layer, address, name='', timeout_in_ms=0
)
Create TF RPC client to connect to the given address.
Args |
rpc_layer
|
Communication layer between client and server. Only "grpc" rpc
layer is supported at the moment.
|
address
|
Address of the server to connect the RPC client to.
|
name
|
Name of the RPC Client. You can create multiple clients connecting
to same server and distinguish them using different names.
|
timeout_in_ms
|
The default timeout to use for outgoing RPCs from client. 0
indicates no timeout. Exceeding timeout during RPC will raise
DeadlineExceeded error.
|
Returns |
An instance of tf.distribute.experimental.rpc.Client with the following
dynamically added methods for eagerly created clients:
Registered methods e.g. multiply(**args):
If Client is created when executing eagerly, client will request the
list of registered methods from server during client creation.
The convenience methods for RPCs will be dynamically added to the
created Client instance.
For example, when a server has method "multiply" registered, the
client object created in eager mode will have 'multiply' method
available. Users can use client.multiply(..) to make RPC, instead of
client.call("multiply", ...)
Both "call" and "multiply" methods are non-blocking i.e. they return
a StatusOrResult object which should be used to wait for getting
value or error.
Along with the above, blocking versions of the registered
methods are also dynamically added to client instance.
e.g. multiply_blocking(**args). These methods block till the RPC is
finished and return response for successful RPC. Otherwise raise
exception.
These methods are not available when Client is created inside a
tf.function.
|
Raises |
A ValueError if rpc_layer other than "grpc" is used. Only GRPC
is supported at the moment.
A DeadlineExceeded exception in eager mode if timeout exceeds while
creating and listing client methods.
|
Example usage |
>>> # Have server already started.
>>> import portpicker
>>> @tf.function(input_signature=[
... tf.TensorSpec([], tf.int32),
... tf.TensorSpec([], tf.int32)])
... def remote_fn(a, b):
... return tf.add(a, b)
port = portpicker.pick_unused_port()
address = "localhost:{}".format(port)
server = tf.distribute.experimental.rpc.Server.create("grpc", address)
server.register("addition", remote_fn)
server.start()
# Start client
client = tf.distribute.experimental.rpc.Client.create("grpc",
address=address, name="test_client")
a = tf.constant(2, dtype=tf.int32)
b = tf.constant(3, dtype=tf.int32)
result = client.call(
args=[a, b],
method_name="addition",
output_specs=tf.TensorSpec((), tf.int32))
if result.is_ok():
result.get_value()
result = client.addition(a, b)
if result.is_ok():
result.get_value()
value = client.addition_blocking(a, b)
|
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates. Some content is licensed under the numpy license.
Last updated 2024-04-26 UTC.
[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Missing the information I need","missingTheInformationINeed","thumb-down"],["Too complicated / too many steps","tooComplicatedTooManySteps","thumb-down"],["Out of date","outOfDate","thumb-down"],["Samples / code issue","samplesCodeIssue","thumb-down"],["Other","otherDown","thumb-down"]],["Last updated 2024-04-26 UTC."],[],[],null,["# tf.distribute.experimental.rpc.Client\n\n\u003cbr /\u003e\n\n|---------------------------------------------------------------------------------------------------------------------------------------------------|\n| [View source on GitHub](https://github.com/tensorflow/tensorflow/blob/v2.16.1/tensorflow/python/distribute/experimental/rpc/rpc_ops.py#L117-L257) |\n\nClient class for invoking RPCs to the server.\n\nMethods\n-------\n\n### `call`\n\n[View source](https://github.com/tensorflow/tensorflow/blob/v2.16.1/tensorflow/python/distribute/experimental/rpc/rpc_ops.py#L217-L257) \n\n call(\n method_name: str,\n args: Optional[Sequence[core_tf_types.Tensor]] = None,\n output_specs=None,\n timeout_in_ms=0\n )\n\nMethod for making RPC calls to remote server.\n\nThis invokes RPC to the server, executing the registered method_name\nremotely.\nArgs:\nmethod_name: Remote registered method to invoke\nargs: List of arguments for the registered method.\noutput_specs: Output specs for the output from method.\nFor example, if tf.function is: @tf.function(input_signature=\\[\ntf.TensorSpec(\\[\\], tf.int32), tf.TensorSpec(\\[\\], tf.int32) \\])\ndef multiply_fn(a, b): return tf.math.multiply(a, b)\noutput_spec is: tf.TensorSpec((), tf.int32) If you have access to TF\nFunction, the output specs can be generated\nfrom tf.function by calling: output_specs =\ntf.nest.map_structure(tf.type_spec_from_value,\ntf_function.get_concrete_function().structured_outputs If output_specs\nare not provided, flattened list of tensors will be returned in\nresponse.\ntimeout_in_ms: Timeout for this call. If 0, default client timeout will be\nused.\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n| Returns ||\n|---|---|\n| An instance of `StatusOrResult` class with the following available methods. \u003cbr /\u003e - `is_ok()`: Returns True of RPC was successful. - `get_error()`: Returns TF error_code and error message for the RPC. - `get_value()`: Returns the returned value from remote TF function execution when RPC is successful. Calling any of the above methods will block till RPC is completed and result is available. ||\n\n\u003cbr /\u003e\n\n### `create`\n\n[View source](https://github.com/tensorflow/tensorflow/blob/v2.16.1/tensorflow/python/distribute/experimental/rpc/rpc_ops.py#L121-L215) \n\n @staticmethod\n create(\n rpc_layer, address, name='', timeout_in_ms=0\n )\n\nCreate TF RPC client to connect to the given address.\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n| Args ||\n|-----------------|---------------------------------------------------------------------------------------------------------------------------------------------------|\n| `rpc_layer` | Communication layer between client and server. Only \"grpc\" rpc layer is supported at the moment. |\n| `address` | Address of the server to connect the RPC client to. |\n| `name` | Name of the RPC Client. You can create multiple clients connecting to same server and distinguish them using different names. |\n| `timeout_in_ms` | The default timeout to use for outgoing RPCs from client. 0 indicates no timeout. Exceeding timeout during RPC will raise DeadlineExceeded error. |\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n| Returns ||\n|---|---|\n| An instance of [`tf.distribute.experimental.rpc.Client`](../../../../tf/distribute/experimental/rpc/Client) with the following dynamically added methods for eagerly created clients: \u003cbr /\u003e - `Registered methods` e.g. multiply(\\*\\*args): If Client is created when executing eagerly, client will request the list of registered methods from server during client creation. The convenience methods for RPCs will be dynamically added to the created Client instance. For example, when a server has method \"multiply\" registered, the client object created in eager mode will have 'multiply' method available. Users can use client.multiply(..) to make RPC, instead of client.call(\"multiply\", ...) Both \"call\" and \"multiply\" methods are non-blocking i.e. they return a StatusOrResult object which should be used to wait for getting value or error. Along with the above, blocking versions of the registered methods are also dynamically added to client instance. e.g. multiply_blocking(\\*\\*args). These methods block till the RPC is finished and return response for successful RPC. Otherwise raise exception. These methods are not available when Client is created inside a tf.function. ||\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n| Raises ||\n|---|---|\n| A ValueError if rpc_layer other than \"grpc\" is used. Only GRPC is supported at the moment. A DeadlineExceeded exception in eager mode if timeout exceeds while creating and listing client methods. ||\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e\n\n| Example usage ||\n|---|---|\n| \u003cbr /\u003e \u003e\u003e\u003e # Have server already started. \u003e\u003e\u003e import portpicker \u003e\u003e\u003e @tf.function(input_signature=[ ... tf.TensorSpec([], tf.int32), ... tf.TensorSpec([], tf.int32)]) ... def remote_fn(a, b): ... return tf.add(a, b) port = portpicker.pick_unused_port() address = \"localhost:{}\".format(port) server = tf.distribute.experimental.rpc.Server.create(\"grpc\", address) server.register(\"addition\", remote_fn) server.start() # Start client client = tf.distribute.experimental.rpc.Client.create(\"grpc\", address=address, name=\"test_client\") a = tf.constant(2, dtype=tf.int32) b = tf.constant(3, dtype=tf.int32) result = client.call( args=[a, b], method_name=\"addition\", output_specs=tf.TensorSpec((), tf.int32)) if result.is_ok(): result.get_value() result = client.addition(a, b) if result.is_ok(): result.get_value() value = client.addition_blocking(a, b) \u003cbr /\u003e ||\n\n\u003cbr /\u003e"]]