Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AP_DDS: Goal topic publisher #28372

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from geopy import point
from ardupilot_msgs.srv import ArmMotors
from ardupilot_msgs.srv import ModeSwitch
from geographic_msgs.msg import GeoPointStamped


PLANE_MODE_TAKEOFF = 13
Expand Down Expand Up @@ -78,6 +79,15 @@ def __init__(self):

self._subscription_geopose = self.create_subscription(GeoPoseStamped, self._geopose_topic, self.geopose_cb, qos)
self._cur_geopose = GeoPoseStamped()

self.declare_parameter("goal_topic", "/ap/goal_lla")
self._goal_topic = self.get_parameter("goal_topic").get_parameter_value().string_value
qos = rclpy.qos.QoSProfile(
reliability=rclpy.qos.ReliabilityPolicy.RELIABLE, durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth=1
)

self._subscription_goal = self.create_subscription(GeoPointStamped, self._goal_topic, self.goal_cb, qos)
self._cur_goal = GeoPointStamped()

def geopose_cb(self, msg: GeoPoseStamped):
"""Process a GeoPose message."""
Expand All @@ -87,6 +97,15 @@ def geopose_cb(self, msg: GeoPoseStamped):

# Store current state
self._cur_geopose = msg

def goal_cb(self, msg: GeoPointStamped):
"""Process a Goal message."""
stamp = msg.header.stamp
self.get_logger().info("From AP : Goal [sec:{}, nsec: {}, lat:{} lon:{}]"
.format(stamp.sec, stamp.nanosec,msg.position.latitude, msg.position.longitude))

# Store current state
self._cur_goal = msg

def arm(self):
req = ArmMotors.Request()
Expand Down Expand Up @@ -127,6 +146,10 @@ def switch_mode_with_timeout(self, desired_mode: int, timeout: rclpy.duration.Du
def get_cur_geopose(self):
"""Return latest geopose."""
return self._cur_geopose

def get_cur_goal(self):
"""Return latest goal."""
return self._cur_goal

def send_goal_position(self, goal_global_pos):
"""Send goal position. Must be in guided for this to work."""
Expand All @@ -148,6 +171,15 @@ def achieved_goal(goal_global_pos, cur_geopose):
print(f"Goal is {euclidian_distance} meters away")
return euclidian_distance < 150

def going_to_goal(goal_global_pos, cur_goal):
p1 = (goal_global_pos.latitude, goal_global_pos.longitude, goal_global_pos.altitude)
cur_pos_lla = cur_goal.position
p2 = (cur_pos_lla.latitude, cur_pos_lla.longitude, cur_pos_lla.altitude)

flat_distance = distance.distance(p1[:2], p2[:2]).m
euclidian_distance = math.sqrt(flat_distance**2 + (p2[2] - p1[2]) ** 2)
print(f"Commanded and received goal are {euclidian_distance} meters away")
return euclidian_distance < 1

def main(args=None):
"""Node entry point."""
Expand Down Expand Up @@ -191,11 +223,15 @@ def main(args=None):

start = node.get_clock().now()
has_achieved_goal = False
is_going_to_goal = False
while not has_achieved_goal and node.get_clock().now() - start < rclpy.duration.Duration(seconds=120):
rclpy.spin_once(node)
is_going_to_goal = going_to_goal(goal_pos, node.get_cur_goal())
has_achieved_goal = achieved_goal(goal_pos, node.get_cur_geopose())
time.sleep(1.0)

if not is_going_to_goal:
raise RuntimeError("Unable to go to goal location")
if not has_achieved_goal:
raise RuntimeError("Unable to achieve goal location")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright 2023 ArduPilot.org.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""
Bring up ArduPilot SITL and check the Goal GeoPointStamped message is being published.

colcon test --packages-select ardupilot_dds_tests \
--event-handlers=console_cohesion+ --pytest-args -k test_goal_lla_msg_received

"""

import launch_pytest
import math
import pytest
import rclpy
import rclpy.node
import threading

from launch import LaunchDescription

from launch_pytest.tools import process as process_tools

from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSDurabilityPolicy

from geographic_msgs.msg import GeoPointStamped

TOPIC = "ap/goal_lla"


class GeoPointListener(rclpy.node.Node):
"""Subscribe to GeoPointStamped messages."""

def __init__(self):
"""Initialise the node."""
super().__init__("geopoint_listener")
self.msg_event_object = threading.Event()

# Declare and acquire `topic` parameter
self.declare_parameter("topic", TOPIC)
self.topic = self.get_parameter("topic").get_parameter_value().string_value

def start_subscriber(self):
"""Start the subscriber."""
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
depth=1,
)

self.subscription = self.create_subscription(GeoPointStamped, self.topic, self.subscriber_callback, qos_profile)

# Add a spin thread.
self.ros_spin_thread = threading.Thread(target=lambda node: rclpy.spin(node), args=(self,))
self.ros_spin_thread.start()

def subscriber_callback(self, msg):
"""Process a GeoPointStamped message."""
print("-----> Got Message")
self.msg_event_object.set()


@launch_pytest.fixture
def launch_sitl_copter_dds_serial(sitl_copter_dds_serial):
"""Fixture to create the launch description."""
sitl_ld, sitl_actions = sitl_copter_dds_serial

ld = LaunchDescription(
[
sitl_ld,
launch_pytest.actions.ReadyToTest(),
]
)
actions = sitl_actions
yield ld, actions


@launch_pytest.fixture
def launch_sitl_copter_dds_udp(sitl_copter_dds_udp):
"""Fixture to create the launch description."""
sitl_ld, sitl_actions = sitl_copter_dds_udp

ld = LaunchDescription(
[
sitl_ld,
launch_pytest.actions.ReadyToTest(),
]
)
actions = sitl_actions
yield ld, actions


@pytest.mark.launch(fixture=launch_sitl_copter_dds_serial)
def test_dds_serial_goal_msg_recv(launch_context, launch_sitl_copter_dds_serial):
"""Test position messages are published by AP_DDS."""
_, actions = launch_sitl_copter_dds_serial
virtual_ports = actions["virtual_ports"].action
micro_ros_agent = actions["micro_ros_agent"].action
mavproxy = actions["mavproxy"].action
sitl = actions["sitl"].action

# Wait for process to start.
process_tools.wait_for_start_sync(launch_context, virtual_ports, timeout=2)
process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)

rclpy.init()
try:
node = GeoPointListener()
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
assert msgs_received_flag, f"Did not receive '{TOPIC}' msgs."
finally:
rclpy.shutdown()
yield


@pytest.mark.launch(fixture=launch_sitl_copter_dds_udp)
def test_dds_udp_goal_msg_recv(launch_context, launch_sitl_copter_dds_udp):
"""Test position messages are published by AP_DDS."""
_, actions = launch_sitl_copter_dds_udp
micro_ros_agent = actions["micro_ros_agent"].action
mavproxy = actions["mavproxy"].action
sitl = actions["sitl"].action

# Wait for process to start.
process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)

rclpy.init()
try:
node = GeoPointListener()
Ryanf55 marked this conversation as resolved.
Show resolved Hide resolved
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
assert msgs_received_flag, f"Did not receive '{TOPIC}' msgs."
finally:
rclpy.shutdown()
yield
56 changes: 56 additions & 0 deletions libraries/AP_DDS/AP_DDS_Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ static constexpr uint16_t DELAY_AIRSPEED_TOPIC_MS = AP_DDS_DELAY_AIRSPEED_TOPIC_
#if AP_DDS_GEOPOSE_PUB_ENABLED
static constexpr uint16_t DELAY_GEO_POSE_TOPIC_MS = AP_DDS_DELAY_GEO_POSE_TOPIC_MS;
#endif // AP_DDS_GEOPOSE_PUB_ENABLED
#if AP_DDS_GOAL_PUB_ENABLED
static constexpr uint16_t DELAY_GOAL_TOPIC_MS = 200;
#endif // AP_DDS_GOAL_PUB_ENABLED
#if AP_DDS_CLOCK_PUB_ENABLED
static constexpr uint16_t DELAY_CLOCK_TOPIC_MS =AP_DDS_DELAY_CLOCK_TOPIC_MS;
#endif // AP_DDS_CLOCK_PUB_ENABLED
Expand Down Expand Up @@ -536,6 +539,35 @@ void AP_DDS_Client::update_topic(geographic_msgs_msg_GeoPoseStamped& msg)
}
#endif // AP_DDS_GEOPOSE_PUB_ENABLED

#if AP_DDS_GOAL_PUB_ENABLED
bool AP_DDS_Client::update_topic_goal(geographic_msgs_msg_GeoPointStamped& msg)
{
const auto &vehicle = AP::vehicle();
update_topic(msg.header.stamp);
Location target_loc;
vehicle->get_target_location(target_loc);
msg.position.latitude = target_loc.lat * 1e-7;
msg.position.longitude = target_loc.lng * 1e-7;
msg.position.altitude = target_loc.alt * 1e-2;

// Check whether the goal has changed or if the topic has never been published.
const double tolerance_lat_lon = 1e-8; // One order of magnitude smaller than the target's resolution.
const double distance_alt = 1e-3;
if (abs(msg.position.latitude - prev_goal_msg.position.latitude) > tolerance_lat_lon ||
abs(msg.position.longitude - prev_goal_msg.position.longitude) > tolerance_lat_lon ||
abs(msg.position.altitude - prev_goal_msg.position.altitude) > distance_alt ||
prev_goal_msg.header.stamp.sec == 0 ) {
update_topic(prev_goal_msg.header.stamp);
prev_goal_msg.position.latitude = msg.position.latitude;
prev_goal_msg.position.longitude = msg.position.longitude;
prev_goal_msg.position.altitude = msg.position.altitude;
return true;
} else {
return false;
}
}
#endif // AP_DDS_GOAL_PUB_ENABLED

#if AP_DDS_IMU_PUB_ENABLED
void AP_DDS_Client::update_topic(sensor_msgs_msg_Imu& msg)
{
Expand Down Expand Up @@ -1256,6 +1288,22 @@ void AP_DDS_Client::write_gps_global_origin_topic()
}
#endif // AP_DDS_GPS_GLOBAL_ORIGIN_PUB_ENABLED

#if AP_DDS_GOAL_PUB_ENABLED
void AP_DDS_Client::write_goal_topic()
{
WITH_SEMAPHORE(csem);
if (connected) {
ucdrBuffer ub {};
const uint32_t topic_size = geographic_msgs_msg_GeoPointStamped_size_of_topic(&goal_topic, 0);
uxr_prepare_output_stream(&session, reliable_out, topics[to_underlying(TopicIndex::GOAL_PUB)].dw_id, &ub, topic_size);
const bool success = geographic_msgs_msg_GeoPointStamped_serialize_topic(&ub, &goal_topic);
if (!success) {
// AP_HAL::panic("FATAL: DDS_Client failed to serialize\n");
}
}
}
#endif // AP_DDS_GOAL_PUB_ENABLED

void AP_DDS_Client::update()
{
WITH_SEMAPHORE(csem);
Expand Down Expand Up @@ -1335,6 +1383,14 @@ void AP_DDS_Client::update()
write_gps_global_origin_topic();
}
#endif // AP_DDS_GPS_GLOBAL_ORIGIN_PUB_ENABLED
#if AP_DDS_GOAL_PUB_ENABLED
if (cur_time_ms - last_goal_time_ms > DELAY_GOAL_TOPIC_MS) {
if (update_topic_goal(goal_topic)) {
write_goal_topic();
}
last_goal_time_ms = cur_time_ms;
}
#endif // AP_DDS_GOAL_PUB_ENABLED

status_ok = uxr_run_session_time(&session, 1);
}
Expand Down
10 changes: 10 additions & 0 deletions libraries/AP_DDS/AP_DDS_Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ class AP_DDS_Client
static void update_topic(geographic_msgs_msg_GeoPointStamped& msg);
# endif // AP_DDS_GPS_GLOBAL_ORIGIN_PUB_ENABLED

#if AP_DDS_GOAL_PUB_ENABLED
geographic_msgs_msg_GeoPointStamped goal_topic;
// The last ms timestamp AP_DDS wrote a goal message
uint64_t last_goal_time_ms;
//! @brief Serialize the current goal and publish to the IO stream(s)
void write_goal_topic();
bool update_topic_goal(geographic_msgs_msg_GeoPointStamped& msg);
geographic_msgs_msg_GeoPointStamped prev_goal_msg;
# endif // AP_DDS_GOAL_PUB_ENABLED

#if AP_DDS_GEOPOSE_PUB_ENABLED
geographic_msgs_msg_GeoPoseStamped geo_pose_topic;
// The last ms timestamp AP_DDS wrote a GeoPose message
Expand Down
21 changes: 21 additions & 0 deletions libraries/AP_DDS/AP_DDS_Topic_Table.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ enum class TopicIndex: uint8_t {
#if AP_DDS_GEOPOSE_PUB_ENABLED
GEOPOSE_PUB,
#endif // AP_DDS_GEOPOSE_PUB_ENABLED
#ifdef AP_DDS_GOAL_PUB_ENABLED
GOAL_PUB,
#endif // AP_DDS_GOAL_PUB_ENABLED
#if AP_DDS_CLOCK_PUB_ENABLED
CLOCK_PUB,
#endif // AP_DDS_CLOCK_PUB_ENABLED
Expand Down Expand Up @@ -232,6 +235,24 @@ constexpr struct AP_DDS_Client::Topic_table AP_DDS_Client::topics[] = {
},
},
#endif // AP_DDS_GEOPOSE_PUB_ENABLED
#if AP_DDS_GOAL_PUB_ENABLED
{
.topic_id = to_underlying(TopicIndex::GOAL_PUB),
.pub_id = to_underlying(TopicIndex::GOAL_PUB),
.sub_id = to_underlying(TopicIndex::GOAL_PUB),
.dw_id = uxrObjectId{.id=to_underlying(TopicIndex::GOAL_PUB), .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=to_underlying(TopicIndex::GOAL_PUB), .type=UXR_DATAREADER_ID},
.topic_rw = Topic_rw::DataWriter,
.topic_name = "rt/ap/goal_lla",
.type_name = "geographic_msgs::msg::dds_::GeoPointStamped_",
.qos = {
.durability = UXR_DURABILITY_TRANSIENT_LOCAL,
.reliability = UXR_RELIABILITY_RELIABLE,
.history = UXR_HISTORY_KEEP_LAST,
.depth = 1,
},
},
#endif // AP_DDS_GOAL_PUB_ENABLED
#if AP_DDS_CLOCK_PUB_ENABLED
{
.topic_id = to_underlying(TopicIndex::CLOCK_PUB),
Expand Down
4 changes: 4 additions & 0 deletions libraries/AP_DDS/AP_DDS_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@
#define AP_DDS_DELAY_CLOCK_TOPIC_MS 10
#endif

#ifndef AP_DDS_GOAL_PUB_ENABLED
#define AP_DDS_GOAL_PUB_ENABLED 1
#endif

#ifndef AP_DDS_JOY_SUB_ENABLED
#define AP_DDS_JOY_SUB_ENABLED 1
#endif
Expand Down
Loading