Adapting Data Sources

The data source integrates the orchestrator, and must instantiate it and provide a ROS node for it. The ROS node must not be shared with functionality of the data source, which usually results in 2 ROS nodes being instantiated in the data source. Both nodes may be executed by the same executor, which must also be provided to the orchestrator to enable it to process inputs while waiting for events.

The following changes need to be made:

Orchestrator Class Reference

class orchestrator.lib.Orchestrator(ros_node: Node, executor: Executor, node_config: List[NodeModel], logger: RcutilsLogger | None = None, timing_analysis: bool = False, state_sequence_recording: bool = False, intercepted_topic_callback: Callable[[str, type, Any], None] | None = None)

The orchestrator.

Parameters:
  • ros_node – A ROS node instance for the orchestrator. Should be separate to the hosting node to avoid name conflicts. Used to create subscriptions and publishers for topic interception.

  • executor – The executor which should be used to spin the above node while waiting for callbacks.

  • node_config – Node models loaded from launch+node configuration files.

  • logger – Logger for orchestrator logs. If not provided, default logger of ros_node is used.

  • timing_analysis – Log time from wait_until… API call to future resolution.

  • state_sequence_recording – See https://uulm-mrm.github.io/ros2_def/dev_docs/debugging.html#state-sequences

  • intercepted_topic_callback – A callback that is executed when a message on an intercepted topic was received, e.g. for logging purposes. Parameters are the topic, topic type and the message. Message may be serialized.

dataprovider_publish(topic: str, message: Any) None

Gives a message to the orchestrator before it will actually be published. If it is a Status message, it must not be published after it was given to dataprovider_publish! Other types of messages must be published normally after this function is called.

dump_state_sequence() None

Dump recorded state sequence json files for all nodes

initialize_ros_communication() None

Subscribe to all required ros topics as per launch-config.

If a topic is not found, or a subscriber/publisher does not exist, this will wait for it and spin the node while waiting.

plot_graph() None

Display interactive visualization of current callback graph. Requires netgraph, which is optional otherwise.

reconfigure(new_node_config: List[NodeModel]) None
Reconfiguration workflow:
1. Stop providing data (messages/time)
2. Wait until reconfiguration is allowed: wait_until_reconfiguration_allowed()
3. Apply reconfiguration (stop nodes, start nodes, change communication topology)
4. Load new node configs: orchestrator.reconfigure()
5. Continue providing data
wait_until_dataprovider_state_update_allowed() Future

Data provider should spin until this future completes before modifying its state outside of any callbacks, such as calculating the next timestep in the simulator.

wait_until_pending_actions_complete() Future

Blocks until no actions are expected anymore (until the callback graph is empty).

wait_until_publish_allowed(topic: str) Future

Get a future that will be complete once the specified topic can be published. The caller should spin the executor of the RosNode while waiting, otherwise the future may never be done (and processing might not continue).

wait_until_reconfiguration_allowed() Future

Reconfiguration must only occur after the returned future has been completed and before aditional inputs are provided, to guarantee that it won’t interfere with regular processing.

wait_until_time_publish_allowed(t: Time, allow_jump: bool = False) Future

Get a future that will be complete once the specified timestamp can be published. The caller should spin the executor of the RosNode while waiting, otherwise the future may never be done (and processing might not continue).