Adapting Data Sources¶
The data source integrates the orchestrator, and must instantiate it and provide a ROS node for it. The ROS node must not be shared with functionality of the data source, which usually results in 2 ROS nodes being instantiated in the data source. Both nodes may be executed by the same executor, which must also be provided to the orchestrator to enable it to process inputs while waiting for events.
The following changes need to be made:
Call
Orchestrator.initialize_ros_communication()
once the data source’s own publishers and subscribers are set up.Call
Orchestrator.wait_until_publish_allowed()
before publishing any message, and wait until the returned Future is ready before publishing. This does not apply to messages on the/clock
topic.Call
Orchestrator.wait_until_time_publish_allowed()
with the specific message before publishing it on/clock
, and wait until the returned Future is ready before publishing.Call
Orchestrator.wait_until_dataprovider_state_update_allowed()
and wait until the returned Future is ready before changing internal state, such as by executing a simulation timestep which may depend on external inputs.Call
Orchestrator.wait_until_pending_actions_complete()
before exiting.
Orchestrator Class Reference¶
- class orchestrator.lib.Orchestrator(ros_node: Node, executor: Executor, node_config: List[NodeModel], logger: RcutilsLogger | None = None, timing_analysis: bool = False, state_sequence_recording: bool = False, intercepted_topic_callback: Callable[[str, type, Any], None] | None = None)¶
The orchestrator.
- Parameters:
ros_node – A ROS node instance for the orchestrator. Should be separate to the hosting node to avoid name conflicts. Used to create subscriptions and publishers for topic interception.
executor – The executor which should be used to spin the above node while waiting for callbacks.
node_config – Node models loaded from launch+node configuration files.
logger – Logger for orchestrator logs. If not provided, default logger of ros_node is used.
timing_analysis – Log time from wait_until… API call to future resolution.
state_sequence_recording – See https://uulm-mrm.github.io/ros2_def/dev_docs/debugging.html#state-sequences
intercepted_topic_callback – A callback that is executed when a message on an intercepted topic was received, e.g. for logging purposes. Parameters are the topic, topic type and the message. Message may be serialized.
- dataprovider_publish(topic: str, message: Any) None ¶
Gives a message to the orchestrator before it will actually be published. If it is a Status message, it must not be published after it was given to dataprovider_publish! Other types of messages must be published normally after this function is called.
- dump_state_sequence() None ¶
Dump recorded state sequence json files for all nodes
- initialize_ros_communication() None ¶
Subscribe to all required ros topics as per launch-config.
If a topic is not found, or a subscriber/publisher does not exist, this will wait for it and spin the node while waiting.
- plot_graph() None ¶
Display interactive visualization of current callback graph. Requires netgraph, which is optional otherwise.
- reconfigure(new_node_config: List[NodeModel]) None ¶
- Reconfiguration workflow:1. Stop providing data (messages/time)2. Wait until reconfiguration is allowed:
wait_until_reconfiguration_allowed()
3. Apply reconfiguration (stop nodes, start nodes, change communication topology)4. Load new node configs: orchestrator.reconfigure()5. Continue providing data
- wait_until_dataprovider_state_update_allowed() Future ¶
Data provider should spin until this future completes before modifying its state outside of any callbacks, such as calculating the next timestep in the simulator.
- wait_until_pending_actions_complete() Future ¶
Blocks until no actions are expected anymore (until the callback graph is empty).
- wait_until_publish_allowed(topic: str) Future ¶
Get a future that will be complete once the specified topic can be published. The caller should spin the executor of the RosNode while waiting, otherwise the future may never be done (and processing might not continue).
- wait_until_reconfiguration_allowed() Future ¶
Reconfiguration must only occur after the returned future has been completed and before aditional inputs are provided, to guarantee that it won’t interfere with regular processing.
- wait_until_time_publish_allowed(t: Time, allow_jump: bool = False) Future ¶
Get a future that will be complete once the specified timestamp can be published. The caller should spin the executor of the RosNode while waiting, otherwise the future may never be done (and processing might not continue).