Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New RRT based path planner #247

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions spot_rl_experiments/configs/cg_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ FILL_UP_GRID_LOCATION: [[0, 0, 0, 0]]
DILATION_SIZE: 1
OCCUPANCY_SCALE: 10
DISTANCE_THRESHOLD_TO_ADD_POINT: 1.0 # The distance threshold to add waypoints in the navigation waypoints
PLANNER_TYPE: "A_STAR" #OR RRT_I
86 changes: 62 additions & 24 deletions spot_rl_experiments/spot_rl/utils/path_planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
map_y_from_cg_to_grid,
pkl,
)
from spot_rl.utils.rrtsconnect import rrt_connect_informed

PATH_TO_CONFIG_FILE = osp.join(
osp.dirname(osp.dirname(osp.dirname(osp.abspath(__file__)))),
Expand All @@ -43,6 +44,8 @@

DISTANCE_THRESHOLD_TO_ADD_POINT = cg_config["DISTANCE_THRESHOLD_TO_ADD_POINT"]

PLANNER_TYPE = cg_config["PLANNER_TYPE"]


def pick_points(pcd):
print("")
Expand Down Expand Up @@ -311,11 +314,18 @@ def path_planning_using_a_star(
)
goal_in_grid = np.array(goal_in_grid)
print(f"start pos {start_in_grid}, goal in grid {goal_in_grid}")
path = astar(
dilated_occupancy_grid,
(start_in_grid[0], start_in_grid[1]),
(goal_in_grid[0], goal_in_grid[1]),
)
if PLANNER_TYPE == "A_STAR":
path = astar(
dilated_occupancy_grid,
(start_in_grid[0], start_in_grid[1]),
(goal_in_grid[0], goal_in_grid[1]),
)
elif PLANNER_TYPE == "RRT_I":
path, start_tree, goal_tree = rrt_connect_informed(
dilated_occupancy_grid,
(start_in_grid[0], start_in_grid[1]),
(goal_in_grid[0], goal_in_grid[1]),
)

occupancy_grid_visualization = cv2.circle(
occupancy_grid_visualization,
Expand Down Expand Up @@ -351,27 +361,55 @@ def path_planning_using_a_star(
(238, 211, 14),
1,
)
if len(path) > 2:
filter_path = convert_path_to_real_waypoints(
path, occupancy_min_x, occupancy_min_y
)
for iter, waypoint in enumerate(filter_path):
x = floor(
map_x_from_cg_to_grid(waypoint[0], occupancy_min_x, occupancy_max_x)
* occupancy_scale
if PLANNER_TYPE == "A_STAR":
if len(path) > 2:
filter_path = convert_path_to_real_waypoints(
path, occupancy_min_x, occupancy_min_y
)
y = floor(
map_y_from_cg_to_grid(waypoint[1], occupancy_min_y, occupancy_max_y)
* occupancy_scale
)
occupancy_grid_visualization = cv2.circle(
occupancy_grid_visualization,
(y, x),
0,
(0, 255, 0),
-1,
for iter, waypoint in enumerate(filter_path):
x = floor(
map_x_from_cg_to_grid(waypoint[0], occupancy_min_x, occupancy_max_x)
* occupancy_scale
)
y = floor(
map_y_from_cg_to_grid(waypoint[1], occupancy_min_y, occupancy_max_y)
* occupancy_scale
)
occupancy_grid_visualization = cv2.circle(
occupancy_grid_visualization,
(y, x),
0,
(0, 255, 0),
-1,
)
bestpath = path
else:
if len(path) > 0:
filter_path = convert_path_to_real_waypoints(
path, occupancy_min_x, occupancy_min_y
)
bestpath = path
# Prepare points for polylines
points = []
for waypoint in filter_path:
x = floor(
map_x_from_cg_to_grid(waypoint[0], occupancy_min_x, occupancy_max_x)
* occupancy_scale
)
y = floor(
map_y_from_cg_to_grid(waypoint[1], occupancy_min_y, occupancy_max_y)
* occupancy_scale
)
bestpath = path
points.append((y, x)) # Store in (y, x) format for OpenCV

for waypoint in points:
occupancy_grid_visualization = cv2.circle(
occupancy_grid_visualization,
waypoint,
1,
(0, 255, 0),
-1,
)
if other_view_poses:
for view_pose in other_view_poses:
x = floor(
Expand Down
265 changes: 265 additions & 0 deletions spot_rl_experiments/spot_rl/utils/rrtsconnect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
import math
import random

import numpy as np


# Creating a node class initializing position and assigning parent as None. Like Dijkstra, we set the initial cost to be positive infinity
class Node:
def __init__(self, position):
self.x = position[0]
self.y = position[1]
self.parent = None
self.cost = float("inf")


# def heuristic(a, b,grid):
# return np.linalg.norm(np.array([a.x, a.y]) - np.array([b.x, b.y]))
def heuristic(a, b, grid):
"""
In this heurestic , we will get nodes A being parent and B being the child and compute the L2 distance along with cost
The heuristic considers a penalty for obstacles by calling the obstacle_cost function,
which assesses how close the child node b is to any obstacles in the grid.This forms the informed heurestic along with distance
"""
# Calculate the Euclidean distance
distance = np.linalg.norm(np.array([a.x, a.y]) - np.array([b.x, b.y]))
# calculate penalty
penalty = obstacle_cost(grid, (b.x, b.y))
return distance + penalty


def is_valid_position(grid, position, radius=2.0):
"""
Checking if the current position is valid - check using square proximity. For each point in the square area, checking
whether the point is within the bounds of the grid and if the point is free.
"""
x, y = position
for dx in np.arange(-radius, radius + 0.5, 0.5):
for dy in np.arange(-radius, radius + 0.5, 0.5):
if np.sqrt(dx**2 + dy**2) <= radius:
nx, ny = int(x + dx), int(y + dy)
if (
not (0 <= nx < grid.shape[0] and 0 <= ny < grid.shape[1])
or grid[nx, ny] != 0
):
return False
return True


def get_new_position(current, random_point, step_size):
direction = np.array([random_point.x, random_point.y]) - np.array(
[current.x, current.y]
)
if np.linalg.norm(direction) > step_size:
direction = direction / np.linalg.norm(direction) * step_size
return Node((current.x + direction[0], current.y + direction[1]))


def obstacle_cost(grid, position):
"""
Constructing a 7x7 grid , which size can be customizable (used 7x7 toget better processing speed not trading off the accuracy)
"""
x, y = position
penalty = 0
for i in range(-2, 5):
for j in range(-2, 5):
# getting position on grid wrt child point
cx, cy = int(x + i), int(y + j)
if 0 <= cx < grid.shape[0] and 0 <= cy < grid.shape[1]:
if grid[cx, cy] != 0:
distance = np.sqrt(i**2 + j**2)
penalty += 1 / (
distance + 1.5
) # distance to an obstacle decreases, the penalty increases
return penalty


def rrt_connect_informed(
grid,
start,
goal,
max_iterations=1000,
step_size=5.0,
expand_range=5,
goal_sample_rate=0.2,
):
"""
This algorithm will build two trees, one starting from the start and the other from the goal,
and attempts to connect them bi-directionally. The node expansion is informed by a heuristic that accounts
for the cost associated with obstacles.

max_iterations: Setting maximum number of times search needs to be done bi-directionally
step_size : Max distance to extend a node in the tree during each iteration
goal_sample_rate : Probablilty of random sampled node is the Goal node
"""
# Define start amd goal
start_tree = [Node(start)]
goal_node = Node(goal)
goal_tree = [goal_node]

# Set intial cost as 0
start_tree[0].cost = 0

for i in range(max_iterations):
# get a random point and check with sample rate
if random.random() < goal_sample_rate:
r_point = goal_node
# get random point in the node near goal as Node((x,y))
else:
r_point = Node(
(
random.randint(0, grid.shape[0] - 1),
random.randint(0, grid.shape[1] - 1),
)
)
# Calling tree expansion from start node: if return true then we build path from start to goal
if expand_tree(
grid, start_tree, r_point, goal_node, expand_range, step_size, goal_tree
):
return build_full_path(start_tree[-1], goal_tree[-1]), start_tree, goal_tree

# Calling tree expansion from Goal node: if return true then we build path from start to goal
if expand_tree(
grid,
goal_tree,
r_point,
start_tree[-1],
expand_range,
step_size,
start_tree,
):
return build_full_path(start_tree[-1], goal_tree[-1]), start_tree, goal_tree

print(f"No path found after {max_iterations} iterations.")
return [], [], []


def expand_tree(
grid, tree, random_point, other_goal, expand_range, step_size, other_tree
):
"""
Function calls to expand the tree by adding new node based on nearest node and heurestic.
The it will generate a new position based on the nearest node and the specified step size, then
it will create a new node , update its cost based on its path from parent.It will check whether
new node is within range of the other goal and attempting to connect the two trees
"""
near_node = None
nearest_distance = float("inf")

# Find the nearest node to the random_point
for node in tree:
distance = heuristic(node, random_point, grid)
if distance < nearest_distance:
nearest_distance = distance
near_node = node

# getting new position using the node obtained after heurestic check
new_position = get_new_position(near_node, random_point, step_size)

if is_valid_position(grid, (new_position.x, new_position.y)):
new_node = Node((new_position.x, new_position.y))

cost_t_come = heuristic(near_node, new_node, grid)
new_node.parent = near_node
new_node.cost = near_node.cost + cost_t_come # Update cost
tree.append(new_node)

if is_goal_within_local_range(
new_node, other_goal, expand_range, grid.shape[0], grid.shape[1]
):
if connect_trees(new_node, other_tree, step_size, grid):
return True

return False


def is_goal_within_local_range(current, goal, expand_range, height, width):
local_top = current.x - expand_range
local_bottom = current.x + expand_range
local_left = current.y - expand_range
local_right = current.y + expand_range

is_local_area_within_bounds = (
0 <= local_top < height
and 0 <= local_bottom < height
and 0 <= local_left < width
and 0 <= local_right < width
)
is_goal_in_local_area = (
local_top <= goal.x <= local_bottom and local_left <= goal.y <= local_right
)
return is_goal_in_local_area and is_local_area_within_bounds


def connect_trees(node, tree, step_size, grid):
"""
Trying to onnect two trees by extending the current node in random directions
and checking for valid connections within the given step size.
"""
for _ in range(20): # Restricting to 20 iterations
rand_dir = (random.uniform(-1, 1), random.uniform(-1, 1))
check_pose = get_new_position(node, Node((rand_dir[0], rand_dir[1])), step_size)

if is_valid_position(grid, (check_pose.x, check_pose.y)):
n_node = None
nearest_distance = float("inf")

# Find the nearest node in the tree to the new position
for n in tree:
distance = heuristic(n, check_pose, grid)
if distance < nearest_distance:
nearest_distance = distance
n_node = n

if nearest_distance < step_size:
check_pose.parent = n_node
return True

return False


# Helper function to calculate the Euclidean distance
def distance(point1, point2):
return np.linalg.norm(np.array(point1) - np.array(point2))


def add_point(current_point, path, distance_threshold=15.0):
# Function to add points to the path while checking the distance
if not path:
path.append(current_point)
else:
last_point = path[-1]
if distance(current_point, last_point) > distance_threshold: # Check distance
path.append(current_point)


def build_full_path(start_node, goal_node, distance_threshold=15.0):
path = []
visited = set()
node = start_node
# Logic to prevent looping which occurs in RRT often - so we track visited nodes
while node is not None:
current_point = (node.x, node.y)
if current_point not in visited:
add_point(current_point, path)
visited.add(current_point)
node = node.parent

path.reverse()

# Now check the goal_node path
node = goal_node
while node is not None:
current_point = (node.x, node.y)
# Logic to prevent looping which occurs in RRT often - so we track visited nodes
if current_point not in visited:
add_point(current_point, path)
visited.add(current_point)
node = node.parent

refined_path = []
for point in path:
if not refined_path or distance(point, refined_path[-1]) > distance_threshold:
refined_path.append(point)

return refined_path