metacar.sceneapi 源代码

import logging
from pathlib import Path
from pydantic import TypeAdapter, Field
from typing import Annotated
from .sockets import ModelSocket, StreamingSocket, ConnectionClosedError
from .geometry import Vector3
from .models import (
    CameraFrame,
    SimCarMsgOutput,
    VehicleControl,
    VehicleControlDTO,
    RoadInfo,
    SceneStaticData,
    VLAExtensionOutput,
    Code1,
    Code2,
    Code3,
    Code4,
    Code5,
)

logger = logging.getLogger(__name__)


[文档] class SceneAPI: """SceneAPI 是与仿真环境通信的主要接口。 该类封装了与仿真环境建立连接、获取场景信息、读取车辆状态以及发送控制命令等功能。 使用流程通常是:创建实例 -> 连接 -> 获取静态数据 -> 进入主循环获取动态数据并发送控制命令。 """ def __init__(self): """初始化 SceneAPI 实例,但不会立即连接。 需要调用 connect() 方法与仿真环境建立连接。 """ self._move_to_start = 0 self._move_to_end = 0 self._model_socket = ModelSocket("127.0.0.1", 5061) self._streaming_socket = StreamingSocket("127.0.0.1", 5063) def _load_static_data(self, code1: Code1): """读取文件内容,组装场景静态信息。 从指定的地图配置中读取路径文件和地图文件,解析后组装成场景静态信息。 :param code1: 场景发送的 code1 消息 """ map_info = code1.map_info dir_path = Path(map_info.path) route_path = dir_path / map_info.route with route_path.open("rb") as route_file: route = TypeAdapter(list[Vector3]).validate_json(route_file.read()) map_path = dir_path / map_info.map with map_path.open("rb") as map_file: road_lines = TypeAdapter(list[RoadInfo]).validate_json(map_file.read()) self._scene_static_data = SceneStaticData( route=route, roads=road_lines, sub_scenes=map_info.sub_scenes, vla_extension=code1.vla_extension, )
[文档] def connect(self): """与场景建立连接,会产生阻塞,直到与场景连接成功。 此方法会阻塞执行,直到成功与仿真环境建立连接并完成握手。 连接成功后会加载场景静态数据,可通过 get_scene_static_data() 获取。 """ self._model_socket.accept() # 连接 json socket self._streaming_socket.accept() # 连接视频流 code1: Code1 = self._model_socket.recv(Code1) self._load_static_data(code1)
[文档] def get_scene_static_data(self): """获取场景静态信息,仅在 connect() 函数调用后可用 此方法返回加载的场景静态数据,包括路线、道路信息和子场景信息。 必须在调用 connect() 方法后才能使用。 :return: 场景静态数据 """ return self._scene_static_data
[文档] def main_loop(self): """生成器,每次迭代返回 :class:`~metacar.models.SimCarMsg` 和图像帧,场景结束时退出。 此方法是一个生成器,每次迭代会返回当前的仿真车辆消息和摄像头图像帧。 当场景结束或连接中断时,生成器会自动退出。 :return: 元组 (sim_car_msg, frames),其中: - sim_car_msg: :class:`~metacar.models.SimCarMsg` 对象,包含车辆状态、传感器数据等信息 - frames: 当前相机视图的列表,每个元素为 :class:`~metacar.models.CameraFrame` 对象 """ # 先发送 code2,告知场景已经就绪 self._model_socket.send(Code2(code=2), Code2) # 进入主循环,持续从场景接收消息 try: while True: message: Code3 | Code5 = self._model_socket.recv( Annotated[Code3 | Code5, Field(discriminator="code")] ) if isinstance(message, Code5): logger.info("场景结束") return sim_car_msg = message.sim_car_msg frames = [ CameraFrame(id=camera_info.id, frame=self._streaming_socket.recv()) for camera_info in sim_car_msg.sensor.ego_rgb_cams ] yield sim_car_msg, frames except ConnectionClosedError: logger.warning("连接中断,退出场景") return finally: self._model_socket.close() self._streaming_socket.close()
[文档] def set_vehicle_control( self, vc: VehicleControl, vla_extension: VLAExtensionOutput | None = None ): """发送车辆控制命令到仿真环境 将给定的车辆控制命令发送到场景,用于控制车辆的油门、刹车、转向等行为。 :param vc: 车辆控制命令,包含油门、刹车、转向等参数 :param vla_extension: VLA 相关的输出,非 VLA 场景为 None """ vc_dto = VehicleControlDTO( **vc.model_dump(), move_to_start=self._move_to_start, move_to_end=self._move_to_end, ) sim_car_msg = SimCarMsgOutput( vehicle_control=vc_dto, vla_extension=vla_extension ) self._model_socket.send(Code4(code=4, sim_car_msg=sim_car_msg), Code4)
[文档] def retry_level(self): """重试关卡 增加重试关卡计数器,在下一次发送控制命令时会通知场景重试当前关卡。 """ self._move_to_start += 1 logger.info("重试关卡")
[文档] def skip_level(self): """跳过关卡 增加跳过关卡计数器,在下一次发送控制命令时会通知场景跳过当前关卡。 """ self._move_to_end += 1 logger.info("跳过关卡")