RPC接口测试
Python实现RPC Client测试各个RPC接口
import grpc
import robot_pb2
import robot_pb2_grpc
def print_resp_data(rpc_resp):
"""
把所有RPC返回值打印
"""
if hasattr(rpc_resp, "data"):
print(rpc_resp.data)
if hasattr(rpc_resp, "ret"):
print(rpc_resp.ret)
if hasattr(rpc_resp, "data1"):
print(rpc_resp.data1)
if hasattr(rpc_resp, "data2"):
print(rpc_resp.data2)
def getPosAndPose():
"""
TODO 暂未实现
获取当前位姿
"""
resp = stub.getPosAndPose(robot_pb2.ReqInt(req=0))
print_resp_data(resp.data)
def MoveA():
"""
MoveA
"""
resp = stub.MoveA(robot_pb2.ReqMoveA(
rjoint=joint,
rspeed=speed
))
print_resp_data(resp)
def MoveJ():
"""
MoveJ
"""
resp = stub.MoveJ(robot_pb2.ReqMoveJ(
rpose=pose,
rspeed=speed,
rzone=zone,
rtool=tool,
wobj=wobj
))
print_resp_data(resp)
def TGetRobotNum():
"""
获取机器人个数
"""
resp = stub.TGetRobotNum(robot_pb2.ReqVoid())
print_resp_data(resp)
def TStreamRobotJoint():
"""
流式传输,获取机械臂关节
"""
resp = stub.TStreamRobotJoint(robot_pb2.ReqVoid())
for r in resp:
print_resp_data(r)
def TStreamRobotCartesian():
"""
"""
resp = stub.TStreamRobotCartesian(robot_pb2.ReqVoid())
for r in resp:
print_resp_data(r)
def TRobotHome():
"""
机械臂回零点位置
"""
resp = stub.TRobotHome(robot_pb2.ReqVoid())
print_resp_data(resp)
def ConnectRPC():
"""
测试RPC服务状态
"""
resp = stub.ConnectRPC(robot_pb2.ReqVoid())
print_resp_data(resp)
def MoveStart():
"""
机械臂上电
"""
resp = stub.MoveStart(robot_pb2.ReqVoid())
print_resp_data(resp)
def MoveStop():
"""
机械臂下电
"""
resp = stub.MoveStop(robot_pb2.ReqVoid())
print_resp_data(resp)
def MoveL():
"""
MoveL
"""
resp = stub.MoveL(robot_pb2.ReqMoveL(rpose=pose,
rspeed=speed,
rzone=zone,
rtool=tool,
wobj=wobj))
print_resp_data(resp)
def MoveC():
"""
MoveC
"""
resp = stub.MoveC(robot_pb2.ReqMoveC(rpose=pose,
rpose_mid=pose2,
rspeed=speed,
rzone=zone,
rtool=tool,
wobj=wobj))
print_resp_data(resp)
def getRobotJoint():
"""
获取机械臂当前关节
"""
resp = stub.getRobotJoint(robot_pb2.ReqInt(req=1))
print_resp_data(resp)
def Offs():
"""
TODO
"""
resp = stub.Offs(robot_pb2.ReqArraydouble2(reqarr1=joint,
reqarr2=joint))
print_resp_data(resp)
def robot_getDOF():
"""
获取机器人自由度
"""
resp = stub.robot_getDOF(robot_pb2.ReqInt(req=0))
print_resp_data(resp)
def additionaxis_getDOF():
"""
获取机器人附加轴数量
"""
resp = stub.additionaxis_getDOF(robot_pb2.ReqInt(req=0))
print_resp_data(resp)
def robot_getNUM():
"""
获取机器人数量
"""
resp = stub.robot_getNUM(robot_pb2.ReqVoid())
print_resp_data(resp)
if __name__ == '__main__':
# 创建RPC Client
channel = grpc.insecure_channel("192.168.1.167:50051")
stub = robot_pb2_grpc.RobotServiceStub(channel)
# 定义一些常量
speed = robot_pb2.HyySpeed(dof=6, per_flag=2, tcp_flag=0, per=[2] * 10, tcp=0.5, orl=0.5)
pose = stub.ReadRobpose(robot_pb2.ReqString(reqstr="p2"))
pose2 = stub.ReadRobpose(robot_pb2.ReqString(reqstr="p3"))
zone = stub.ReadZone(robot_pb2.ReqString(reqstr="z0"))
tool = stub.ReadTool(robot_pb2.ReqString(reqstr="tool0"))
wobj = stub.ReadWobj(robot_pb2.ReqString(reqstr="wobj0"))
joint = stub.ReadRobjoint(robot_pb2.ReqString(reqstr="jhhh"))
# 测试各个RPC接口
# MoveL()
# MoveC()
# TRobotHome()