RPC接口测试

Python实现RPC Client测试各个RPC接口

import grpc

import robot_pb2
import robot_pb2_grpc


def print_resp_data(rpc_resp):
    """
    把所有RPC返回值打印
    """
    if hasattr(rpc_resp, "data"):
        print(rpc_resp.data)
    if hasattr(rpc_resp, "ret"):
        print(rpc_resp.ret)
    if hasattr(rpc_resp, "data1"):
        print(rpc_resp.data1)
    if hasattr(rpc_resp, "data2"):
        print(rpc_resp.data2)


def getPosAndPose():
    """
    TODO 暂未实现
    获取当前位姿
    """
    resp = stub.getPosAndPose(robot_pb2.ReqInt(req=0))
    print_resp_data(resp.data)


def MoveA():
    """
    MoveA
    """
    resp = stub.MoveA(robot_pb2.ReqMoveA(
        rjoint=joint,
        rspeed=speed
    ))
    print_resp_data(resp)


def MoveJ():
    """
    MoveJ
    """
    resp = stub.MoveJ(robot_pb2.ReqMoveJ(
        rpose=pose,
        rspeed=speed,
        rzone=zone,
        rtool=tool,
        wobj=wobj
    ))
    print_resp_data(resp)


def TGetRobotNum():
    """
    获取机器人个数
    """
    resp = stub.TGetRobotNum(robot_pb2.ReqVoid())
    print_resp_data(resp)


def TStreamRobotJoint():
    """
    流式传输,获取机械臂关节
    """
    resp = stub.TStreamRobotJoint(robot_pb2.ReqVoid())
    for r in resp:
        print_resp_data(r)


def TStreamRobotCartesian():
    """
    """
    resp = stub.TStreamRobotCartesian(robot_pb2.ReqVoid())
    for r in resp:
        print_resp_data(r)


def TRobotHome():
    """
    机械臂回零点位置
    """
    resp = stub.TRobotHome(robot_pb2.ReqVoid())
    print_resp_data(resp)


def ConnectRPC():
    """
    测试RPC服务状态
    """
    resp = stub.ConnectRPC(robot_pb2.ReqVoid())
    print_resp_data(resp)


def MoveStart():
    """
    机械臂上电
    """
    resp = stub.MoveStart(robot_pb2.ReqVoid())
    print_resp_data(resp)


def MoveStop():
    """
    机械臂下电
    """
    resp = stub.MoveStop(robot_pb2.ReqVoid())
    print_resp_data(resp)


def MoveL():
    """
    MoveL
    """
    resp = stub.MoveL(robot_pb2.ReqMoveL(rpose=pose,
                                         rspeed=speed,
                                         rzone=zone,
                                         rtool=tool,
                                         wobj=wobj))
    print_resp_data(resp)


def MoveC():
    """
    MoveC
    """
    resp = stub.MoveC(robot_pb2.ReqMoveC(rpose=pose,
                                         rpose_mid=pose2,
                                         rspeed=speed,
                                         rzone=zone,
                                         rtool=tool,
                                         wobj=wobj))
    print_resp_data(resp)


def getRobotJoint():
    """
    获取机械臂当前关节
    """
    resp = stub.getRobotJoint(robot_pb2.ReqInt(req=1))
    print_resp_data(resp)


def Offs():
    """
    TODO
    """
    resp = stub.Offs(robot_pb2.ReqArraydouble2(reqarr1=joint,
                                               reqarr2=joint))
    print_resp_data(resp)


def robot_getDOF():
    """
    获取机器人自由度
    """
    resp = stub.robot_getDOF(robot_pb2.ReqInt(req=0))
    print_resp_data(resp)


def additionaxis_getDOF():
    """
    获取机器人附加轴数量
    """
    resp = stub.additionaxis_getDOF(robot_pb2.ReqInt(req=0))
    print_resp_data(resp)


def robot_getNUM():
    """
    获取机器人数量
    """
    resp = stub.robot_getNUM(robot_pb2.ReqVoid())
    print_resp_data(resp)


if __name__ == '__main__':
    # 创建RPC Client
    channel = grpc.insecure_channel("192.168.1.167:50051")
    stub = robot_pb2_grpc.RobotServiceStub(channel)
    # 定义一些常量
    speed = robot_pb2.HyySpeed(dof=6, per_flag=2, tcp_flag=0, per=[2] * 10, tcp=0.5, orl=0.5)
    pose = stub.ReadRobpose(robot_pb2.ReqString(reqstr="p2"))
    pose2 = stub.ReadRobpose(robot_pb2.ReqString(reqstr="p3"))
    zone = stub.ReadZone(robot_pb2.ReqString(reqstr="z0"))
    tool = stub.ReadTool(robot_pb2.ReqString(reqstr="tool0"))
    wobj = stub.ReadWobj(robot_pb2.ReqString(reqstr="wobj0"))
    joint = stub.ReadRobjoint(robot_pb2.ReqString(reqstr="jhhh"))
    # 测试各个RPC接口
    # MoveL()
    # MoveC()
    # TRobotHome()