Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Format using black py38, fixup deprecation warnings, use format strings #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 95 additions & 78 deletions scripts/robot_indicator
Original file line number Diff line number Diff line change
@@ -1,53 +1,57 @@
#!/usr/bin/python3
import sys
#!/usr/bin/env python3
import binascii
import collections
import os
import shlex
import signal
import subprocess
import shlex
import binascii
import collections
import sys
from pathlib import Path
import rospkg

import gi.repository
gi.require_version('Gtk', '3.0')
import rospkg

gi.require_version("Gtk", "3.0")
from gi.repository import Gtk
gi.require_version('GLib', '2.0')

gi.require_version("GLib", "2.0")
from gi.repository import GLib
gi.require_version('GObject', '2.0')

gi.require_version("GObject", "2.0")
from gi.repository import GObject
gi.require_version('AppIndicator3', '0.1')

gi.require_version("AppIndicator3", "0.1")
from gi.repository import AppIndicator3


class RobotIndicator:
def __init__(self):
self._icon_path = rospkg.RosPack().get_path('robot_indicator')+"/icons"
self._icon_path = rospkg.RosPack().get_path("robot_indicator") + "/icons"
self._indicator = AppIndicator3.Indicator.new_with_path(
"robot-indicator",
"ros-idle",
AppIndicator3.IndicatorCategory.APPLICATION_STATUS,
self._icon_path
)
"robot-indicator",
"ros-idle",
AppIndicator3.IndicatorCategory.APPLICATION_STATUS,
self._icon_path,
)
self._indicator.set_status(AppIndicator3.IndicatorStatus.ACTIVE)
# TODO: config_file label_guide length
setattr(self._indicator.props,'label_guide',"0123456789012345678901")
setattr(self._indicator.props, "label_guide", "0123456789012345678901")

# TODO: How are ROS_MASTER_URI updates handled?
self._ros_master_uri = os.environ.get('ROS_MASTER_URI')
self._ros_master_uri = os.environ.get("ROS_MASTER_URI")
self._roscore_state = None
# Running: self._launch_state[pkg][launch_file][0]
# Enabled: self._launch_state[pkg][launch_file][1]
self._launch_state = collections.defaultdict(dict)

# TODO: config_file status_bullet
self._status_bullet = ('◌', '○', '●', '◉')
self._status_bullet = ("◌", "○", "●", "◉")

self._menu = Gtk.Menu()
self._menu.set_double_buffered(True)
self._menu = Gtk.Menu.new()

# Dynamic Menu Items
self._roscore_item = Gtk.MenuItem()
self._roscore_ctrl_item = Gtk.MenuItem()
self._roscore_item = Gtk.MenuItem.new()
self._roscore_ctrl_item = Gtk.MenuItem.new()

self._running_units_checksum = None
self._updated = False
Expand All @@ -56,38 +60,37 @@ class RobotIndicator:
# Menu Initialization
################
def init_menu(self):
"""Initialize indicator menu
"""
"""Initialize indicator menu"""
self._roscore_item.show()
self._roscore_ctrl_item.show()
submenu = Gtk.Menu()
submenu = Gtk.Menu.new()
submenu.append(self._roscore_ctrl_item)
self._roscore_item.set_submenu(submenu)
self._menu.append(self._roscore_item)

separator_item = Gtk.SeparatorMenuItem()
separator_item = Gtk.SeparatorMenuItem.new()
separator_item.show()
self._menu.append(separator_item)

launch_units_item = Gtk.MenuItem("Launch Units")
launch_units_item = Gtk.MenuItem.new_with_label("Launch Units")
launch_units_item.show()
self._menu.append(launch_units_item)

launch_unit_launch_item = Gtk.MenuItem("Launch...")
launch_unit_launch_item = Gtk.MenuItem.new_with_label("Launch...")
launch_unit_launch_item.connect("activate", self.ctrl_launch)
launch_unit_launch_item.show()
submenu = Gtk.Menu()
submenu = Gtk.Menu.new()
submenu.append(launch_unit_launch_item)
launch_units_item.set_submenu(submenu)

# Items between "Launch Units" and SeparatorMenuItem
# auto-populated by self.update_menu_launch_units()

separator_item = Gtk.SeparatorMenuItem()
separator_item = Gtk.SeparatorMenuItem.new()
separator_item.show()
self._menu.append(separator_item)

quit_item = Gtk.MenuItem("Quit")
quit_item = Gtk.MenuItem.new_with_label("Quit")
quit_item.connect("activate", self.stop)
quit_item.show()
self._menu.append(quit_item)
Expand All @@ -97,20 +100,19 @@ class RobotIndicator:
# Menu Updates
################
def update_menu_roscore(self):
"""Update roscore menus
"""
"""Update roscore menus"""
s = None
if self._roscore_state:
s = self._status_bullet[2]
self._indicator.set_icon ("ros-active")
self._indicator.set_icon_full("ros-active", "ROSCore is active!")
self._roscore_ctrl_item.set_label("Stop")
self._roscore_ctrl_item.connect("activate", self.ctrl_roscore,"stop")
self._roscore_ctrl_item.connect("activate", self.ctrl_roscore, "stop")
else:
s = self._status_bullet[0]
self._indicator.set_icon ("ros-idle")
self._indicator.set_icon_full("ros-idle", "ROSCore is not active")
self._roscore_ctrl_item.set_label("Start")
self._roscore_ctrl_item.connect("activate", self.ctrl_roscore,"start")
self._roscore_item.set_label(u"%s Master [%s] "%(s, self._ros_master_uri))
self._roscore_ctrl_item.connect("activate", self.ctrl_roscore, "start")
self._roscore_item.set_label(f"{s} Master [{self._ros_master_uri}] ")

def update_menu_launch_units(self):
p = 0
Expand All @@ -125,7 +127,7 @@ class RobotIndicator:
self._menu.remove(item)
else:
# Until reaping begins increment the menu insertion point
p+=1
p += 1

if item.get_label() == "Launch Units":
# Begin reaping after "Launch Units" menu item is located
Expand All @@ -135,81 +137,92 @@ class RobotIndicator:
for pkg in pkgs:
launch_files = self._launch_state[pkg].keys()
for launch_file in launch_files:
launch_unit_ctrl_item = Gtk.MenuItem()
launch_unit_conf_item = Gtk.MenuItem()
launch_unit_ctrl_item = Gtk.MenuItem.new()
launch_unit_conf_item = Gtk.MenuItem.new()

# If the launch unit is 'Running' / 'Stopped'
if self._launch_state[pkg][launch_file][0]:
launch_unit_ctrl_item.set_label("Stop")
launch_unit_ctrl_item.connect("activate", self.ctrl_launch_unit, "stop", pkg, launch_file)
launch_unit_ctrl_item.connect(
"activate", self.ctrl_launch_unit, "stop", pkg, launch_file
)
else:
launch_unit_ctrl_item.set_label("Start")
launch_unit_ctrl_item.connect("activate", self.ctrl_launch_unit, "start", pkg, launch_file)
launch_unit_ctrl_item.connect(
"activate", self.ctrl_launch_unit, "start", pkg, launch_file
)

# If the launch unit is 'Enabled' / 'Disabled'
if self._launch_state[pkg][launch_file][1]:
launch_unit_conf_item.set_label("Disable")
launch_unit_conf_item.connect("activate", self.ctrl_launch_unit, "disable", pkg, launch_file)
launch_unit_conf_item.connect(
"activate", self.ctrl_launch_unit, "disable", pkg, launch_file
)
else:
launch_unit_conf_item.set_label("Enable")
launch_unit_conf_item.connect("activate", self.ctrl_launch_unit, "enable", pkg, launch_file)
launch_unit_conf_item.connect(
"activate", self.ctrl_launch_unit, "enable", pkg, launch_file
)

# Convert boolean list into an int
s = int("%d%d"%tuple(self._launch_state[pkg][launch_file]),2)
s = int("{:d}{:d}".format(*self._launch_state[pkg][launch_file]), 2)
b = self._status_bullet[s]
label = "%s %s %s"%(b, pkg, launch_file)
launch_unit_item = Gtk.MenuItem(label)
label = f"{b} {pkg} {launch_file}"
launch_unit_item = Gtk.MenuItem.new_with_label(label)

submenu = Gtk.Menu()
submenu = Gtk.Menu.new()
submenu.append(launch_unit_ctrl_item)
submenu.append(launch_unit_conf_item)
launch_unit_item.set_submenu(submenu)

launch_unit_ctrl_item.show()
launch_unit_conf_item.show()
launch_unit_item.show()
self._menu.insert(launch_unit_item,p)
self._menu.insert(launch_unit_item, p)

# Increment menu insertion position
p+=1
p += 1
# Update complete
self._updated = False

################
# Menu Controls
################
def ctrl_roscore(self, menu_item, operation):
cmd = "systemctl --user %s roscore.service" % (operation)
cmd = f"systemctl --user {operation} roscore.service"
r = subprocess.run(shlex.split(cmd))
# TODO: Use notify to return error messages
if r.returncode != 0:
print(cmd+'\n'+r.stdout.decode()+'\n'+r.stderr.decode())
print(cmd, r.stdout.decode(), r.stderr.decode(), sep="\n")
self._updated = True
GObject.timeout_add(1000, self.update_units)
GLib.timeout_add(1000, self.update_units)

def ctrl_launch(self, menu_item):
cmd = "rosrun robot_indicator robot_indicator_launch"
r = subprocess.run(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
r = subprocess.run(
shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# TODO: Use notify to return error messages
if r.returncode != 0:
print(cmd+'\n'+r.stdout.decode()+'\n'+r.stderr.decode())
print(cmd, r.stdout.decode(), r.stderr.decode(), sep="\n")

def ctrl_launch_unit(self, menu_item, operation, pkg, launch_file):
cmd = "systemctl --user %s roslaunch@%s:%s.service" % (operation, pkg, launch_file)
r = subprocess.run(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd = f"systemctl --user {operation} roslaunch@{pkg}:{launch_file}.service"
r = subprocess.run(
shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# TODO: Use notify to return error messages
if r.returncode != 0:
print(cmd+'\n'+r.stdout.decode()+'\n'+r.stderr.decode())
print(cmd, r.stdout.decode(), r.stderr.decode(), sep="\n")
self._updated = True
GObject.timeout_add(1000, self.update_units)
# GObject.idle_add(self.update_units,priority=GObject.PRIORITY_LOW)
GLib.timeout_add(1000, self.update_units)
# GLib.idle_add(self.update_units,priority=GLib.PRIORITY_LOW)

################
# Update State Vectors
################
def update_units(self):
"""Update internal list of running Systemd units
"""
"""Update internal list of running Systemd units"""
cmd = "systemctl list-units --user --full --no-legend --no-pager --type=service --state=running"
r = subprocess.run(shlex.split(cmd), bufsize=0, stdout=subprocess.PIPE)

Expand All @@ -222,48 +235,52 @@ class RobotIndicator:
# Clear the previous launch states
self._launch_state = collections.defaultdict(dict)

lines = r.stdout.decode().strip().split('\n')
services = map(lambda s: s.split('.service')[0], lines)
lines = r.stdout.decode().strip().split("\n")
services = map(lambda s: s.split(".service")[0], lines)

# Check if roscore unit is running
self._roscore_state = "roscore" in services

launch_services = filter(lambda l: l.startswith("roslaunch@")
or l.startswith("roscorelaunch@"), services)
instances = map(lambda i: i.split('@')[1],launch_services)
launch_services = filter(
lambda l: l.startswith("roslaunch@") or l.startswith("roscorelaunch@"),
services,
)
instances = map(lambda i: i.split("@")[1], launch_services)
for i in instances:
(pkg,launch_file) = i.split(':')
(pkg, launch_file) = i.split(":")
self._launch_state[pkg][launch_file] = [True, False]

# Update enabled units
systemd_user_dir = Path.home()/Path(".config/systemd/user/")
launch_paths = systemd_user_dir.glob("default.target.wants/ros*launch@*.service")
systemd_user_dir = Path.home() / Path(".config/systemd/user/")
launch_paths = systemd_user_dir.glob(
"default.target.wants/ros*launch@*.service"
)
launch_services = map(lambda s: s.stem, launch_paths)
instances = map(lambda i: i.split('@')[1],launch_services)
instances = map(lambda i: i.split("@")[1], launch_services)
for i in instances:
(pkg,launch_file) = i.split(':')
(pkg, launch_file) = i.split(":")
if launch_file in self._launch_state[pkg].keys():
self._launch_state[pkg][launch_file][1] = True
else:
self._launch_state[pkg][launch_file] = [False, True]

# Update menus
self._updated = True
GObject.idle_add(self.update_menu_roscore,priority=GObject.PRIORITY_LOW)
GObject.idle_add(self.update_menu_launch_units,priority=GObject.PRIORITY_LOW)
GLib.idle_add(self.update_menu_roscore, priority=GLib.PRIORITY_LOW)
GLib.idle_add(self.update_menu_launch_units, priority=GLib.PRIORITY_LOW)

def run(self):
# Update every 15 seconds
self.update_units()
GObject.timeout_add(15000, self.run)
GLib.timeout_add(15000, self.run)

def start(self):
GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, self.stop)
self.init_menu()
self.run()
Gtk.main()

def stop(self,*args):
def stop(self, *args):
Gtk.main_quit()
sys.exit(os.EX_OK)

Expand Down
Loading