Module bento.client
Expand source code
#
# Bentobox
# SDK
# Engine gRPC Client
#
import grpc
from typing import List
from grpc import RpcError, StatusCode
from bento.protos.sim_pb2 import SimulationDef
from bento.protos.services_pb2_grpc import EngineServiceStub
from bento.protos.services_pb2 import (
GetVersionReq,
GetVersionResp,
ApplySimulationReq,
ApplySimulationResp,
GetSimulationReq,
GetSimulationResp,
ListSimulationReq,
ListSimulationResp,
DropSimulationReq,
DropSimulationResp,
GetAttributeReq,
GetAttributeResp,
SetAttributeReq,
SetAttributeResp,
StepSimulationReq,
StepSimulationResp,
)
from bento.protos.references_pb2 import AttributeRef
from bento.protos.values_pb2 import Value
def raise_native(err: RpcError):
    """Translate a gRPC error into a built-in exception and raise it.

    The exception type is selected by the error's status code and the error's
    details are used as the exception message:

    - DEADLINE_EXCEEDED raises TimeoutError
    - UNIMPLEMENTED raises NotImplementedError
    - INVALID_ARGUMENT raises ValueError
    - NOT_FOUND raises LookupError
    - ALREADY_EXISTS raises FileExistsError
    - any other status code raises RuntimeError

    Args:
        err: gRPC error to convert.
    """
    # lookup table from gRPC status code to the native exception type
    native_by_status = {
        StatusCode.DEADLINE_EXCEEDED: TimeoutError,
        StatusCode.UNIMPLEMENTED: NotImplementedError,
        StatusCode.INVALID_ARGUMENT: ValueError,
        StatusCode.NOT_FOUND: LookupError,
        StatusCode.ALREADY_EXISTS: FileExistsError,
    }
    # status codes without a dedicated mapping fall back to RuntimeError
    native_type = native_by_status.get(err.code(), RuntimeError)
    raise native_type(err.details())
class Client:
    """Client provides methods to interface with the Engine's API.

    Client sets up a gRPC channel to talk to the Engine's gRPC API, providing a
    facade to the RPC calls exposed by the Engine's API and automatically
    mapping between native and Protobuf objects used by gRPC.

    gRPC errors raised by the RPC calls are converted into native exceptions
    by raise_native().
    """

    def __init__(self, host: str, port: int):
        """Construct a new Client that connects to the Engine endpoint.

        Args:
            host: Hostname/IP of the host that runs the Engine to connect to.
            port: Port number that the Engine listens on.
        """
        # create the grpc channel to the engine's endpoint;
        # connect() can be used to wait until the channel is ready
        self.channel = grpc.insecure_channel(f"{host}:{port}")
        # setup grpc service stubs
        self.sim_grpc = EngineServiceStub(self.channel)

    def connect(self, timeout_sec: int = 30) -> bool:
        """Attempt to connect to the configured gRPC Engine endpoint.

        Args:
            timeout_sec: Max no. of seconds to try connecting to the Engine
                before timing out.
        Returns:
            True if connection attempt is successful.
        Raises:
            TimeoutError: If connection attempt times out.
        """
        try:
            grpc.channel_ready_future(self.channel).result(timeout_sec)
        except grpc.FutureTimeoutError as err:
            # chain the gRPC timeout as the explicit cause of the native one
            raise TimeoutError(
                "Timed out trying to connect to Bentobox Engine"
            ) from err
        return True

    def get_version(self) -> str:
        """Get version info from the Engine instance.

        Returns:
            VCS Commit Hash of commit which the engine was built on.
        """
        try:
            response = self.sim_grpc.GetVersion(GetVersionReq())
        except RpcError as e:
            raise_native(e)
        return response.commit_hash

    def apply_sim(self, simulation: SimulationDef) -> SimulationDef:
        """Apply the given simulation to the Engine.

        Creates the simulation if no simulation with the same name exists,
        otherwise updates the existing simulation with the same name.

        Args:
            simulation: Specification of the simulation to apply to the Engine.
        Returns:
            Specification of the applied simulation with the ids set.
        """
        try:
            response = self.sim_grpc.ApplySimulation(
                ApplySimulationReq(simulation=simulation)
            )
        except RpcError as e:
            raise_native(e)
        return response.simulation

    def get_sim(self, name: str) -> SimulationDef:
        """Get the simulation with the given name.

        Args:
            name: Name of the simulation to retrieve.
        Returns:
            Specification of the simulation with the given name.
        Raises:
            LookupError: If no simulation with the given name exists on the
                Engine.
        """
        try:
            response = self.sim_grpc.GetSimulation(GetSimulationReq(name=name))
        except RpcError as e:
            raise_native(e)
        return response.simulation

    def list_sims(self) -> List[str]:
        """List the names of the simulations registered in the Engine.

        Returns:
            List of the names of the simulations registered in the Engine.
        """
        try:
            response = self.sim_grpc.ListSimulation(ListSimulationReq())
        except RpcError as e:
            raise_native(e)
        return list(response.sim_names)

    def remove_sim(self, name: str) -> None:
        """Remove the simulation with the given name.

        Args:
            name: Name of the simulation to remove.
        Raises:
            LookupError: If no simulation with the given name exists on the
                Engine.
        """
        try:
            # the response is not used, so it is not bound to a name
            self.sim_grpc.DropSimulation(DropSimulationReq(name=name))
        except RpcError as e:
            raise_native(e)

    def step_sim(self, name: str) -> None:
        """Run simulation with the given name for a single step.

        Runs the specified simulation's systems in the order they are
        registered. Blocks until all systems of that simulation have finished
        running.

        Args:
            name: Name of the simulation to step.
        Raises:
            LookupError: If no simulation with the given name exists on the
                Engine.
        """
        try:
            # the response is not used, so it is not bound to a name
            self.sim_grpc.StepSimulation(StepSimulationReq(name=name))
        except RpcError as e:
            raise_native(e)

    def get_attr(self, sim_name: str, attr_ref: AttributeRef) -> Value:
        """Get the value of the attribute referenced by the given attr_ref.

        Args:
            sim_name: The name of the Simulation to retrieve the attribute from.
            attr_ref: AttributeRef that specifies the target attribute to get.
        Returns:
            Value of the attribute referenced by attr_ref.
        Raises:
            LookupError: If no simulation with the given name exists on the
                Engine or if no such attribute exists for the given attr_ref.
        """
        try:
            response = self.sim_grpc.GetAttribute(
                GetAttributeReq(
                    sim_name=sim_name,
                    attribute=attr_ref,
                )
            )
        except RpcError as e:
            raise_native(e)
        return response.value

    def set_attr(self, sim_name: str, attr_ref: AttributeRef, value: Value) -> None:
        """Set the attribute referenced by the given attr_ref to the given value.

        Args:
            sim_name: The name of the Simulation to set the attribute in.
            attr_ref: AttributeRef that specifies the target attribute to set.
            value: The value to set the target attribute to.
        Raises:
            LookupError: If no simulation with the given name exists on the
                Engine or if no such attribute exists for the given attr_ref.
            ValueError: If an invalid value is given.
        """
        try:
            # the response is not used, so it is not bound to a name
            self.sim_grpc.SetAttribute(
                SetAttributeReq(
                    sim_name=sim_name,
                    attribute=attr_ref,
                    value=value,
                )
            )
        except RpcError as e:
            raise_native(e)
Functions
def raise_native(err: grpc.RpcError)
-
Convert the given gRPC error into a native exception and raise it.
Expand source code
def raise_native(err: RpcError):
    """Translate a gRPC error into a built-in exception and raise it.

    The exception type is selected by the error's status code and the error's
    details are used as the exception message:

    - DEADLINE_EXCEEDED raises TimeoutError
    - UNIMPLEMENTED raises NotImplementedError
    - INVALID_ARGUMENT raises ValueError
    - NOT_FOUND raises LookupError
    - ALREADY_EXISTS raises FileExistsError
    - any other status code raises RuntimeError

    Args:
        err: gRPC error to convert.
    """
    # lookup table from gRPC status code to the native exception type
    native_by_status = {
        StatusCode.DEADLINE_EXCEEDED: TimeoutError,
        StatusCode.UNIMPLEMENTED: NotImplementedError,
        StatusCode.INVALID_ARGUMENT: ValueError,
        StatusCode.NOT_FOUND: LookupError,
        StatusCode.ALREADY_EXISTS: FileExistsError,
    }
    # status codes without a dedicated mapping fall back to RuntimeError
    native_type = native_by_status.get(err.code(), RuntimeError)
    raise native_type(err.details())
Classes
class Client (host: str, port: int)
-
Client provides methods to interface with the Engine's API.
Client sets up a gRPC channel to talk to the Engine's gRPC API providing a facade to the RPC calls exposed by the Engine's API, automatically mapping between native and Protobuf objects used by gRPC.
Construct a new Client that connects to the Engine endpoint.
Args
host
- Hostname/IP of the host that runs the Engine to connect.
port
- Port number that the Engine listens on.
Expand source code
class Client:
    """Client provides methods to interface with the Engine's API.

    Client sets up a gRPC channel to talk to the Engine's gRPC API, providing a
    facade to the RPC calls exposed by the Engine's API and automatically
    mapping between native and Protobuf objects used by gRPC.

    gRPC errors raised by the RPC calls are converted into native exceptions
    by raise_native().
    """

    def __init__(self, host: str, port: int):
        """Construct a new Client that connects to the Engine endpoint.

        Args:
            host: Hostname/IP of the host that runs the Engine to connect to.
            port: Port number that the Engine listens on.
        """
        # create the grpc channel to the engine's endpoint;
        # connect() can be used to wait until the channel is ready
        self.channel = grpc.insecure_channel(f"{host}:{port}")
        # setup grpc service stubs
        self.sim_grpc = EngineServiceStub(self.channel)

    def connect(self, timeout_sec: int = 30) -> bool:
        """Attempt to connect to the configured gRPC Engine endpoint.

        Args:
            timeout_sec: Max no. of seconds to try connecting to the Engine
                before timing out.
        Returns:
            True if connection attempt is successful.
        Raises:
            TimeoutError: If connection attempt times out.
        """
        try:
            grpc.channel_ready_future(self.channel).result(timeout_sec)
        except grpc.FutureTimeoutError as err:
            # chain the gRPC timeout as the explicit cause of the native one
            raise TimeoutError(
                "Timed out trying to connect to Bentobox Engine"
            ) from err
        return True

    def get_version(self) -> str:
        """Get version info from the Engine instance.

        Returns:
            VCS Commit Hash of commit which the engine was built on.
        """
        try:
            response = self.sim_grpc.GetVersion(GetVersionReq())
        except RpcError as e:
            raise_native(e)
        return response.commit_hash

    def apply_sim(self, simulation: SimulationDef) -> SimulationDef:
        """Apply the given simulation to the Engine.

        Creates the simulation if no simulation with the same name exists,
        otherwise updates the existing simulation with the same name.

        Args:
            simulation: Specification of the simulation to apply to the Engine.
        Returns:
            Specification of the applied simulation with the ids set.
        """
        try:
            response = self.sim_grpc.ApplySimulation(
                ApplySimulationReq(simulation=simulation)
            )
        except RpcError as e:
            raise_native(e)
        return response.simulation

    def get_sim(self, name: str) -> SimulationDef:
        """Get the simulation with the given name.

        Args:
            name: Name of the simulation to retrieve.
        Returns:
            Specification of the simulation with the given name.
        Raises:
            LookupError: If no simulation with the given name exists on the
                Engine.
        """
        try:
            response = self.sim_grpc.GetSimulation(GetSimulationReq(name=name))
        except RpcError as e:
            raise_native(e)
        return response.simulation

    def list_sims(self) -> List[str]:
        """List the names of the simulations registered in the Engine.

        Returns:
            List of the names of the simulations registered in the Engine.
        """
        try:
            response = self.sim_grpc.ListSimulation(ListSimulationReq())
        except RpcError as e:
            raise_native(e)
        return list(response.sim_names)

    def remove_sim(self, name: str) -> None:
        """Remove the simulation with the given name.

        Args:
            name: Name of the simulation to remove.
        Raises:
            LookupError: If no simulation with the given name exists on the
                Engine.
        """
        try:
            # the response is not used, so it is not bound to a name
            self.sim_grpc.DropSimulation(DropSimulationReq(name=name))
        except RpcError as e:
            raise_native(e)

    def step_sim(self, name: str) -> None:
        """Run simulation with the given name for a single step.

        Runs the specified simulation's systems in the order they are
        registered. Blocks until all systems of that simulation have finished
        running.

        Args:
            name: Name of the simulation to step.
        Raises:
            LookupError: If no simulation with the given name exists on the
                Engine.
        """
        try:
            # the response is not used, so it is not bound to a name
            self.sim_grpc.StepSimulation(StepSimulationReq(name=name))
        except RpcError as e:
            raise_native(e)

    def get_attr(self, sim_name: str, attr_ref: AttributeRef) -> Value:
        """Get the value of the attribute referenced by the given attr_ref.

        Args:
            sim_name: The name of the Simulation to retrieve the attribute from.
            attr_ref: AttributeRef that specifies the target attribute to get.
        Returns:
            Value of the attribute referenced by attr_ref.
        Raises:
            LookupError: If no simulation with the given name exists on the
                Engine or if no such attribute exists for the given attr_ref.
        """
        try:
            response = self.sim_grpc.GetAttribute(
                GetAttributeReq(
                    sim_name=sim_name,
                    attribute=attr_ref,
                )
            )
        except RpcError as e:
            raise_native(e)
        return response.value

    def set_attr(self, sim_name: str, attr_ref: AttributeRef, value: Value) -> None:
        """Set the attribute referenced by the given attr_ref to the given value.

        Args:
            sim_name: The name of the Simulation to set the attribute in.
            attr_ref: AttributeRef that specifies the target attribute to set.
            value: The value to set the target attribute to.
        Raises:
            LookupError: If no simulation with the given name exists on the
                Engine or if no such attribute exists for the given attr_ref.
            ValueError: If an invalid value is given.
        """
        try:
            # the response is not used, so it is not bound to a name
            self.sim_grpc.SetAttribute(
                SetAttributeReq(
                    sim_name=sim_name,
                    attribute=attr_ref,
                    value=value,
                )
            )
        except RpcError as e:
            raise_native(e)
Methods
def apply_sim(self, simulation: SimulationDef) ‑> SimulationDef
-
Apply the given simulation to the Engine. Creates the simulation if no simulation with the same name exists, otherwise updates the existing simulation with the same name.
Args
simulation
- Specification of the simulation to apply to the Engine.
Returns
Specification of the applied simulation with the ids set.
Expand source code
def apply_sim(self, simulation: SimulationDef) -> SimulationDef:
    """Apply the given simulation to the Engine.

    Creates the simulation if no simulation with the same name exists,
    otherwise updates the existing simulation with the same name.

    Args:
        simulation: Specification of the simulation to apply to the Engine.
    Returns:
        Specification of the applied simulation with the ids set.
    """
    request = ApplySimulationReq(simulation=simulation)
    try:
        reply = self.sim_grpc.ApplySimulation(request)
    except RpcError as rpc_err:
        # convert the gRPC error into a native exception
        raise_native(rpc_err)
    return reply.simulation
def connect(self, timeout_sec: int = 30) ‑> bool
-
Attempt to connect to the configured gRPC Engine endpoint.
Args
timeout_sec
- Max no. of seconds to try connecting to the Engine before timing out.
Returns
True if connection attempt is successful.
Raises
TimeoutError
- If connection attempt times out.
Expand source code
def connect(self, timeout_sec: int = 30) -> bool:
    """Attempt to connect to the configured gRPC Engine endpoint.

    Args:
        timeout_sec: Max no. of seconds to try connecting to the Engine
            before timing out.
    Returns:
        True if connection attempt is successful.
    Raises:
        TimeoutError: If connection attempt times out.
    """
    try:
        grpc.channel_ready_future(self.channel).result(timeout_sec)
    except grpc.FutureTimeoutError as err:
        # chain the gRPC timeout as the explicit cause of the native one
        raise TimeoutError(
            "Timed out trying to connect to Bentobox Engine"
        ) from err
    return True
def get_attr(self, sim_name: str, attr_ref: AttributeRef) ‑> Value
-
Get the value of the attribute referenced by the given attr_ref
Args
sim_name
- The name of the Simulation to retrieve the attribute from.
attr_ref
- AttributeRef that specifies the target attribute to get.
Raises
LookupError
- If no simulation with the given name exists on the Engine or if no such attribute exists for the given attr_ref.
Expand source code
def get_attr(self, sim_name: str, attr_ref: AttributeRef) -> Value:
    """Get the value of the attribute referenced by the given attr_ref.

    Args:
        sim_name: The name of the Simulation to retrieve the attribute from.
        attr_ref: AttributeRef that specifies the target attribute to get.
    Returns:
        Value of the attribute referenced by attr_ref.
    Raises:
        LookupError: If no simulation with the given name exists on the
            Engine or if no such attribute exists for the given attr_ref.
    """
    request = GetAttributeReq(sim_name=sim_name, attribute=attr_ref)
    try:
        reply = self.sim_grpc.GetAttribute(request)
    except RpcError as rpc_err:
        # convert the gRPC error into a native exception
        raise_native(rpc_err)
    return reply.value
def get_sim(self, name: str) ‑> SimulationDef
-
Get the simulation with the given name
Args
name
- Name of the simulation to retrieve.
Returns
Specification of the simulation with the given name.
Raises
LookupError
- If no simulation with the given name exists on the Engine.
Expand source code
def get_sim(self, name: str) -> SimulationDef:
    """Get the simulation with the given name.

    Args:
        name: Name of the simulation to retrieve.
    Returns:
        Specification of the simulation with the given name.
    Raises:
        LookupError: If no simulation with the given name exists on the
            Engine.
    """
    request = GetSimulationReq(name=name)
    try:
        reply = self.sim_grpc.GetSimulation(request)
    except RpcError as rpc_err:
        # convert the gRPC error into a native exception
        raise_native(rpc_err)
    return reply.simulation
def get_version(self) ‑> str
-
Get version info from the Engine instance
Returns
VCS Commit Hash of commit which the engine was built on.
Expand source code
def get_version(self) -> str:
    """Get version info from the Engine instance.

    Returns:
        VCS Commit Hash of commit which the engine was built on.
    """
    request = GetVersionReq()
    try:
        reply = self.sim_grpc.GetVersion(request)
    except RpcError as rpc_err:
        # convert the gRPC error into a native exception
        raise_native(rpc_err)
    return reply.commit_hash
def list_sims(self) ‑> List[str]
-
List the names of the simulations registered in the Engine.
Returns
List of the names of the simulations registered in the Engine.
Expand source code
def list_sims(self) -> List[str]:
    """List the names of the simulations registered in the Engine.

    Returns:
        List of the names of the simulations registered in the Engine.
    """
    request = ListSimulationReq()
    try:
        reply = self.sim_grpc.ListSimulation(request)
    except RpcError as rpc_err:
        # convert the gRPC error into a native exception
        raise_native(rpc_err)
    # copy the names out of the response into a plain list
    sim_names = list(reply.sim_names)
    return sim_names
def remove_sim(self, name: str)
-
Remove the simulation with the given name.
Args
name
- Name of the simulation to remove.
Raises
LookupError
- If no simulation with the given name exists on the Engine.
Expand source code
def remove_sim(self, name: str) -> None:
    """Remove the simulation with the given name.

    Args:
        name: Name of the simulation to remove.
    Raises:
        LookupError: If no simulation with the given name exists on the
            Engine.
    """
    try:
        # the response is not used, so it is not bound to a name
        self.sim_grpc.DropSimulation(DropSimulationReq(name=name))
    except RpcError as e:
        raise_native(e)
def set_attr(self, sim_name: str, attr_ref: AttributeRef, value: Value)
-
Set the attribute referenced by the given attr_ref to the given value
Args
sim_name
- The name of the Simulation to set the attribute in.
attr_ref
- AttributeRef that specifies the target attribute to set.
value
- The value to set the target attribute to.
Raises
LookupError
- If no simulation with the given name exists on the Engine or if no such attribute exists for the given attr_ref.
ValueError
- If an invalid value is given.
Expand source code
def set_attr(self, sim_name: str, attr_ref: AttributeRef, value: Value) -> None:
    """Set the attribute referenced by the given attr_ref to the given value.

    Args:
        sim_name: The name of the Simulation to set the attribute in.
        attr_ref: AttributeRef that specifies the target attribute to set.
        value: The value to set the target attribute to.
    Raises:
        LookupError: If no simulation with the given name exists on the
            Engine or if no such attribute exists for the given attr_ref.
        ValueError: If an invalid value is given.
    """
    try:
        # the response is not used, so it is not bound to a name
        self.sim_grpc.SetAttribute(
            SetAttributeReq(
                sim_name=sim_name,
                attribute=attr_ref,
                value=value,
            )
        )
    except RpcError as e:
        raise_native(e)
def step_sim(self, name: str)
-
Run simulation with the given name for a single step
Runs the specified simulation's systems in the order they are registered. Blocks until all systems of that simulation have finished running.
Args
name
- Name of the simulation to step.
Raises
LookupError
- If no simulation with the given name exists on the Engine.
Expand source code
def step_sim(self, name: str) -> None:
    """Run simulation with the given name for a single step.

    Runs the specified simulation's systems in the order they are registered.
    Blocks until all systems of that simulation have finished running.

    Args:
        name: Name of the simulation to step.
    Raises:
        LookupError: If no simulation with the given name exists on the
            Engine.
    """
    try:
        # the response is not used, so it is not bound to a name
        self.sim_grpc.StepSimulation(StepSimulationReq(name=name))
    except RpcError as e:
        raise_native(e)