gRPC API
: 본 페이지는 MORAI Simulator gRPC API Example 사용 방법을 소개한다.
gRPC API는 UI를 사용하지 않고 gRPC를 이용하여 시뮬레이터를 제어할 수 있는 기능이다.
사용자가 구현한 모듈에서 시뮬레이터에서 제공하는 gRPC API를 사용해서 시뮬레이터 맵 로드, NPC 차량 제어 등을 수행할 수 있고, 이를 통해 시뮬레이터에 사용자가 원하는 테스트 시나리오 환경을 구성할 수 있다.
환경설정
Example code 다운로드
github 또는 s3 등에 올린 후 다운로드 링크 제공
22.R1.2 버전과 호환되는 예제 코드
API 명세서
RPC python docs
Python 환경설정
python 버전
example code는 python 3.7.11에서 작성 및 테스트 진행
설치해야 하는 python 모듈
grpcio : 1.39.0
grpcio-tools : 1.39.0
실행 방법 및 결과
실행 방법
Morai Launcher 실행 후 로그인
Morai Launcher에서 특정 시뮬레이터 버전 설치 및 실행
시뮬레이터의 맵 선택 및 차량 선택 화면에 위치한 상태에서 example.py 스크립트 실행

실행 결과
맵이 로드되고 Ego 차량이 Cruise 모드로 주행하는 것을 확인
NPC 1, 2 차량이 생성되어 원을 그리며 주행하는 것을 확인
장애물이 2개 생성된 것을 확인
상세 설명
파일 구조
example 코드
example.py
gRPC 통신에 사용되는 proto 파일로부터 생성된 python 모듈
morai_openscenario_base_pb2.py
morai_openscenario_base_pb2_grpc.py
morai_openscenario_msgs_pb2.py
morai_openscenario_msgs_pb2_grpc.py
geometry_msgs_pb2.py
geometry_msgs_pb2_grpc.py
예제 코드 동작 방식 설명
main 함수에서 GRPCClient 클래스를 생성하여 해당 클래스를 통해 gRPC API를 호출하여 시뮬레이터와 gRPC 통신을 수행
현재 시뮬레이터와 gRPC 통신을 하기 위해서는 아래와 같은 IP, Port를 사용
localhost:7789
프로그램이 바로 종료되지 않도록 무한 루프를 돌며 대기하다가, ‘Ctrl + C’를 누르면 프로그램 종료
예제 코드 흐름
start command 호출하여 gRPC 통신을 시작
map load command 호출하여 시뮬레이터에 사용자가 원하는 맵을 로드
ego control command를 호출하여 ego를 원하는 위치로 이동
object pause command를 호출하여 ego 차량의 일시정지를 해제하고 주행 시작
create npc vehicle command를 호출하여 npc 차량 생성
npc vehicle control command를 호출하여 npc 제어 명령 수행
create obstacle command를 호출하여 장애물 생성
start__status_worker를 thread로 생성하여 ego 및 npc 차량의 상태 정보를 주기적으로 수신
import os, sys
from trace import Trace
current_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.normpath(os.path.join(current_path, './')))
import morai_openscenario_base_pb2
import morai_openscenario_base_pb2_grpc
import morai_openscenario_msgs_pb2
import morai_openscenario_msgs_pb2_grpc
import geometry_msgs_pb2
import geometry_msgs_pb2_grpc
import grpc
import asyncio
import math
import threading
import time
import json
class GRPCClient():
    """Example client for the MORAI Simulator gRPC scenario API.

    Every ``stub`` parameter is expected to be a
    ``morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub`` bound
    to an open channel (the annotations are strings so they are not evaluated
    when the class is defined).

    All RPC wrappers are best-effort: a failing call is reported on stderr and
    the method returns ``None`` instead of raising. ``KeyboardInterrupt`` and
    ``SystemExit`` are deliberately NOT swallowed.
    """

    # Stream service names whose payload is parsed as JSON state.
    _STATE_TOPICS = ("/morai_msgs/MultiEgoState", "/morai_msgs/EgoState")

    def __init__(self, parent=None):
        """Create the client; ``parent`` is accepted but not used."""
        # Number of NPC control messages sent so far (see send_multi_ego_ctrl_cmd).
        self.cnt = 0
        # Event loop created by start__status_worker; None until it runs.
        self.event_loop = None

    @staticmethod
    def _report_failure(rpc_name, error):
        """Print a one-line description of a failed RPC to stderr."""
        print(f"{rpc_name} failed: {error!r}", file=sys.stderr)

    @staticmethod
    def _decode_state(msg):
        """Parse a state payload given as bytes or str and return the JSON value.

        ``str()`` on a bytes object yields its repr (``"b'...'"``), which is
        not valid JSON, so bytes are decoded as UTF-8 first.

        Raises:
            ValueError: if the payload is not valid UTF-8 / JSON.
        """
        if isinstance(msg, (bytes, bytearray)):
            text = msg.decode('utf-8')
        else:
            text = str(msg)
        return json.loads(text)

    async def get_vehicle_state(self):
        """Open the ``Connect`` stream on localhost:7789 and parse state messages.

        Messages whose ``service_name`` is one of ``_STATE_TOPICS`` are parsed
        as JSON; a malformed message is reported and skipped so that one bad
        payload does not end the stream.
        """
        async with grpc.aio.insecure_channel('localhost:7789') as channel:
            stub = morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub(channel)
            req = morai_openscenario_base_pb2.ServiceRequest()
            req.service_name = ""
            req.msg = bytes("", 'utf-8')
            call = stub.Connect(req)
            async for resp in call:
                if resp.service_name not in self._STATE_TOPICS:
                    continue
                try:
                    # The parsed state is not consumed by this example;
                    # extend here to use it.
                    info = self._decode_state(resp.msg)
                except ValueError as error:
                    self._report_failure(resp.service_name, error)

    def start__status_worker(self):
        """Thread target: run ``get_vehicle_state`` on a fresh event loop.

        The loop is stored in ``self.event_loop`` so that another thread can
        request it to stop.
        """
        self.event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.event_loop)
        try:
            self.event_loop.run_until_complete(self.get_vehicle_state())
        except RuntimeError:
            # run_until_complete raises RuntimeError when the loop is stopped
            # before the coroutine finishes; treated as the shutdown path.
            return
        except Exception as error:
            self._report_failure("Connect", error)

    def send_ego_ctrl_cmd(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Place the ego at a fixed pose, enable constant cruise and pause it."""
        k_city = {"x": 200.93956082984351, "y": 1766.8720364740955, "z": 0.0, "roll": 0.0, "pitch": 0.0, "yaw": -90.0}
        request = morai_openscenario_msgs_pb2.EgoCtrlCmd()
        request.service_name = "/morai_msgs/EgoCtrlCmd"
        request.position.x = k_city["x"]
        request.position.y = k_city["y"]
        request.position.z = k_city["z"]
        # rotation.x / .y / .z carry roll / pitch / yaw respectively.
        request.rotation.x = k_city["roll"]
        request.rotation.y = k_city["pitch"]
        request.rotation.z = k_city["yaw"]
        request.cruise_settings.cruise_on = True
        request.cruise_settings.cruise_type = morai_openscenario_msgs_pb2.EgoCruiseCtrl.CONSTANT
        request.cruise_settings.link_speed_ratio = 100
        request.cruise_settings.constant_velocity = 50
        request.velocity = 50
        # The ego is sent paused; object_pause() is what resumes it.
        request.pause = True
        try:
            response = stub.SendEgoCtrlCmd(request)
        except Exception as error:
            self._report_failure("SendEgoCtrlCmd", error)
            return
        print("SendEgoCtrlCmd - " + str(response))

    def set_multi_ego_ctrl_cmd(self, ctrl_info, unique_id, longCmdType, accel, brake, steering, velocity, acceleration):
        """Copy the given control values onto ``ctrl_info`` and return it."""
        ctrl_info.unique_id = unique_id
        ctrl_info.longCmdType = longCmdType
        ctrl_info.accel = accel
        ctrl_info.brake = brake
        ctrl_info.steering = steering
        ctrl_info.velocity = velocity
        ctrl_info.acceleration = acceleration
        return ctrl_info

    def send_multi_ego_ctrl_cmd(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Send control commands for NPC_1 and NPC_2 once per second.

        Loops until ``self.cnt`` reaches 15; the first failed RPC ends the
        loop early.
        """
        while self.cnt < 15:
            self.cnt += 1
            request = morai_openscenario_msgs_pb2.MultiEgoCtrlCmdList()
            request.service_name = "/morai_msgs/MultiEgoCtrlCmd"
            ctrl_info = self.set_multi_ego_ctrl_cmd(
                morai_openscenario_msgs_pb2.MultiEgoCtrlCmd(), "NPC_1", 1, 0.9, 0, 1, 66, 77)
            request.multi_ego_ctrl_cmd.append(ctrl_info)
            ctrl_info_2 = self.set_multi_ego_ctrl_cmd(
                morai_openscenario_msgs_pb2.MultiEgoCtrlCmd(), "NPC_2", 2, 0.6, 0, 1, 30, 77)
            request.multi_ego_ctrl_cmd.append(ctrl_info_2)
            try:
                stub.SendMultiEgoCtrlCmd(request)
            except Exception as error:
                self._report_failure("SendMultiEgoCtrlCmd", error)
                return
            time.sleep(1)

    def create_multi_ego_vehicle(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Request creation of the two NPC vehicles NPC_1 and NPC_2."""
        request = morai_openscenario_msgs_pb2.CreateMultiEgoVehicleRequestList()
        request.service_name = "/morai_msgs/CreateMultiEgoVehicle"

        create_info = morai_openscenario_msgs_pb2.CreateMultiEgoVehicleRequest()
        create_info.unique_id = "NPC_1"
        create_info.position.x = 201.1550653096617
        create_info.position.y = 1726.8726365584039
        create_info.position.z = 0.0
        create_info.rotation.x = 0.0
        create_info.rotation.y = 0.0
        create_info.rotation.z = -90.0
        create_info.velocity = 50
        create_info.vehicleName = "2016_Hyundai_Genesis_DH"
        create_info.pause = False
        request.req_list.append(create_info)

        create_info_2 = morai_openscenario_msgs_pb2.CreateMultiEgoVehicleRequest()
        create_info_2.unique_id = "NPC_2"
        create_info_2.position.x = 201.70380861467328
        create_info_2.position.y = 1620.374109200712
        create_info_2.position.z = 0.0
        create_info_2.rotation.x = 0.0
        create_info_2.rotation.y = 0.0
        create_info_2.rotation.z = -90.0
        create_info_2.velocity = 50
        create_info_2.vehicleName = "2016_Hyundai_Ioniq"
        create_info_2.pause = False
        request.req_list.append(create_info_2)

        try:
            response = stub.SendCreateMultiEgoVehicle(request)
        except Exception as error:
            self._report_failure("SendCreateMultiEgoVehicle", error)
            return
        print("SendCreateMultiEgoVehicle" + str(response))

    def start_cmd(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Send the start command, then wait 2 seconds whether or not it succeeded."""
        request = morai_openscenario_msgs_pb2.StartRequest()
        request.service_name = "/morai_msgs/StartCmd"
        request.cmd_start = True
        try:
            stub.Start(request)
        except Exception as error:
            self._report_failure("Start", error)
        finally:
            time.sleep(2)

    def stop_cmd(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Send the stop command, then wait 2 seconds whether or not it succeeded."""
        request = morai_openscenario_msgs_pb2.StopRequest()
        request.service_name = "/morai_msgs/StopCmd"
        request.cmd_stop = True
        try:
            stub.Stop(request)
        except Exception as error:
            self._report_failure("Stop", error)
        finally:
            time.sleep(2)

    def load_map(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Request loading of map R_KR_PG_KATRI with the 2016_Hyundai_Ioniq ego."""
        request = morai_openscenario_msgs_pb2.Map()
        # To load an extra asset bundle map instead:
        # extra_asset_bundle_name = "las_test"
        # request.map_name = f"V_Extra_Scene,{extra_asset_bundle_name}"
        request.map_name = "R_KR_PG_KATRI"
        request.ego_name = "2016_Hyundai_Ioniq"
        try:
            response = stub.LoadMap(request)
        except Exception as error:
            self._report_failure("LoadMap", error)
            return
        print(response)

    def send_delete_multi_ego_vehicle(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Send a DeleteMultiEgoVehicle request with ``req_delete`` set."""
        request = morai_openscenario_msgs_pb2.DeleteMultiEgoVehicleRequest()
        request.service_name = "/morai_msgs/DeleteMultiEgoVehicle"
        request.req_delete = True
        try:
            response = stub.SendDeleteMultiEgoVehicle(request)
        except Exception as error:
            self._report_failure("SendDeleteMultiEgoVehicle", error)
            return
        print(response)

    def object_pause(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub", object_list):
        """Set the pause flag of each listed object.

        Args:
            stub: service stub used to send the request.
            object_list: iterable of dicts with the keys ``'unique_id'``,
                ``'is_ego'`` (truthy -> EGO, otherwise MULTIEGO) and
                ``'is_pause'``.
        """
        pause_types = morai_openscenario_msgs_pb2.ObjectPause
        request = morai_openscenario_msgs_pb2.ObjectPauseList()
        for entry in object_list:
            msg = morai_openscenario_msgs_pb2.ObjectPause()
            msg.unique_id = entry['unique_id']
            msg.obj_type = pause_types.EGO if entry['is_ego'] else pause_types.MULTIEGO
            msg.set_pause = entry['is_pause']
            request.req_list.append(msg)
        try:
            stub.ObjectPause(request)
        except Exception as error:
            self._report_failure("ObjectPause", error)

    def create_obstacle(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Request creation of two obstacles: a CargoBox and a WoodBox."""
        request = morai_openscenario_msgs_pb2.ObstacleSpawnList()

        create_info = morai_openscenario_msgs_pb2.ObstacleSpawn()
        create_info.unique_id = 'obstacle_1'
        create_info.position.x = 208.68639761359265
        create_info.position.y = 1661.4038564971388
        create_info.position.z = 0
        create_info.rotation.x = 0.0
        create_info.rotation.y = 0.0
        create_info.rotation.z = 0.0
        create_info.scale.x = 1.0
        create_info.scale.y = 1.0
        create_info.scale.z = 1.0
        create_info.obstacle_name = 'CargoBox'
        request.req_list.append(create_info)

        create_info_2 = morai_openscenario_msgs_pb2.ObstacleSpawn()
        create_info_2.unique_id = 'obstacle_2'
        create_info_2.position.x = 212.4813728324434
        create_info_2.position.y = 1605.9207673354679
        create_info_2.position.z = 0
        create_info_2.rotation.x = 0.0
        create_info_2.rotation.y = 0.0
        create_info_2.rotation.z = 0.0
        create_info_2.scale.x = 1.0
        create_info_2.scale.y = 1.0
        create_info_2.scale.z = 1.0
        create_info_2.obstacle_name = 'WoodBox'
        request.req_list.append(create_info_2)

        # Send the message; the response is not used.
        try:
            stub.CreateObstacle(request)
        except Exception as error:
            self._report_failure("CreateObstacle", error)

    def delete_objects(self, stub: "morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub"):
        """Remove spawned objects of every category (all flags set to True)."""
        req = morai_openscenario_msgs_pb2.CategoryObstacles()
        req.vehicle = True
        req.pedestrian = True
        req.obstacle = True
        req.spawn_point = True
        req.map_object = True
        try:
            stub.DeleteSpawnObstacles(req)
        except Exception as error:
            self._report_failure("DeleteSpawnObstacles", error)
if __name__ == "__main__":
    # Example flow: start -> load map -> place ego -> resume ego -> create and
    # control NPCs -> create obstacles -> receive status until Ctrl + C.
    print('start example.')
    print('press ctrl + c for exit.')
    client = GRPCClient()
    channel = grpc.insecure_channel('localhost:7789')
    stub = morai_openscenario_base_pb2_grpc.MoraiOpenScenarioBaseServiceStub(channel)

    # start command
    client.start_cmd(stub)
    # load map command
    client.load_map(stub)
    # ego control
    client.send_ego_ctrl_cmd(stub)

    # resume ego vehicle (is_pause False)
    ego_object_list = [
        {
            'unique_id': 'ego',
            'is_ego': True,
            'is_pause': False,
        },
    ]
    client.object_pause(stub, ego_object_list)

    # create npc vehicle
    client.create_multi_ego_vehicle(stub)

    # control npc vehicle on a daemon thread so it never blocks process exit
    t_npc_ctrl = threading.Thread(target=client.send_multi_ego_ctrl_cmd, daemon=True, args=(stub,))
    t_npc_ctrl.start()

    # create obstacle
    client.create_obstacle(stub)

    # start to receive vehicle status
    t_car_status = threading.Thread(target=client.start__status_worker)
    t_car_status.start()

    try:
        while True:
            # Sleep instead of spinning so the idle main thread does not
            # occupy a CPU core while waiting for Ctrl + C.
            time.sleep(0.1)
    # press ctrl + c for exit
    except KeyboardInterrupt:
        # delete all scenario objects
        client.delete_objects(stub)
        # stop receiving vehicle status; the loop runs on the status thread,
        # so the stop request must be scheduled thread-safely.
        status_loop = client.event_loop
        if status_loop is not None and not status_loop.is_closed():
            status_loop.call_soon_threadsafe(status_loop.stop)
        # stop command
        client.stop_cmd(stub)
        # gRPC channel close
        channel.close()
        sys.exit()