diff --git a/scripts/robot_indicator b/scripts/robot_indicator index 6e726b7..40f65e3 100755 --- a/scripts/robot_indicator +++ b/scripts/robot_indicator @@ -1,53 +1,57 @@ -#!/usr/bin/python3 -import sys +#!/usr/bin/env python3 +import binascii +import collections import os +import shlex import signal import subprocess -import shlex -import binascii -import collections +import sys from pathlib import Path -import rospkg import gi.repository -gi.require_version('Gtk', '3.0') +import rospkg + +gi.require_version("Gtk", "3.0") from gi.repository import Gtk -gi.require_version('GLib', '2.0') + +gi.require_version("GLib", "2.0") from gi.repository import GLib -gi.require_version('GObject', '2.0') + +gi.require_version("GObject", "2.0") from gi.repository import GObject -gi.require_version('AppIndicator3', '0.1') + +gi.require_version("AppIndicator3", "0.1") from gi.repository import AppIndicator3 + class RobotIndicator: def __init__(self): - self._icon_path = rospkg.RosPack().get_path('robot_indicator')+"/icons" + self._icon_path = rospkg.RosPack().get_path("robot_indicator") + "/icons" self._indicator = AppIndicator3.Indicator.new_with_path( - "robot-indicator", - "ros-idle", - AppIndicator3.IndicatorCategory.APPLICATION_STATUS, - self._icon_path - ) + "robot-indicator", + "ros-idle", + AppIndicator3.IndicatorCategory.APPLICATION_STATUS, + self._icon_path, + ) self._indicator.set_status(AppIndicator3.IndicatorStatus.ACTIVE) # TODO: config_file label_guide length - setattr(self._indicator.props,'label_guide',"0123456789012345678901") + setattr(self._indicator.props, "label_guide", "0123456789012345678901") # TODO: How are ROS_MASTER_URI updates handled? - self._ros_master_uri = os.environ.get('ROS_MASTER_URI') + self._ros_master_uri = os.environ.get("ROS_MASTER_URI") self._roscore_state = None # Running: self._launch_state[pkg][launch_file][0] # Enabled: self._launch_state[pkg][launch_file][1] self._launch_state = collections.defaultdict(dict) # TODO: config_file status_bullet - self._status_bullet = ('◌', '○', '●', '◉') + self._status_bullet = ("◌", "○", "●", "◉") - self._menu = Gtk.Menu() - self._menu.set_double_buffered(True) + self._menu = Gtk.Menu.new() # Dynamic Menu Items - self._roscore_item = Gtk.MenuItem() - self._roscore_ctrl_item = Gtk.MenuItem() + self._roscore_item = Gtk.MenuItem.new() + self._roscore_ctrl_item = Gtk.MenuItem.new() self._running_units_checksum = None self._updated = False @@ -56,38 +60,37 @@ class RobotIndicator: # Menu Initialization ################ def init_menu(self): - """Initialize indicator menu - """ + """Initialize indicator menu""" self._roscore_item.show() self._roscore_ctrl_item.show() - submenu = Gtk.Menu() + submenu = Gtk.Menu.new() submenu.append(self._roscore_ctrl_item) self._roscore_item.set_submenu(submenu) self._menu.append(self._roscore_item) - separator_item = Gtk.SeparatorMenuItem() + separator_item = Gtk.SeparatorMenuItem.new() separator_item.show() self._menu.append(separator_item) - launch_units_item = Gtk.MenuItem("Launch Units") + launch_units_item = Gtk.MenuItem.new_with_label("Launch Units") launch_units_item.show() self._menu.append(launch_units_item) - launch_unit_launch_item = Gtk.MenuItem("Launch...") + launch_unit_launch_item = Gtk.MenuItem.new_with_label("Launch...") launch_unit_launch_item.connect("activate", self.ctrl_launch) launch_unit_launch_item.show() - submenu = Gtk.Menu() + submenu = Gtk.Menu.new() submenu.append(launch_unit_launch_item) launch_units_item.set_submenu(submenu) # Items between "Launch Units" and SeparatorMenuItem # auto-populated by self.update_menu_launch_units() - separator_item = Gtk.SeparatorMenuItem() + separator_item = Gtk.SeparatorMenuItem.new() separator_item.show() self._menu.append(separator_item) - quit_item = Gtk.MenuItem("Quit") + quit_item = Gtk.MenuItem.new_with_label("Quit") quit_item.connect("activate", self.stop) quit_item.show() self._menu.append(quit_item) @@ -97,20 +100,19 @@ class RobotIndicator: # Menu Updates ################ def update_menu_roscore(self): - """Update roscore menus - """ + """Update roscore menus""" s = None if self._roscore_state: s = self._status_bullet[2] - self._indicator.set_icon ("ros-active") + self._indicator.set_icon_full("ros-active", "ROSCore is active!") self._roscore_ctrl_item.set_label("Stop") - self._roscore_ctrl_item.connect("activate", self.ctrl_roscore,"stop") + self._roscore_ctrl_item.connect("activate", self.ctrl_roscore, "stop") else: s = self._status_bullet[0] - self._indicator.set_icon ("ros-idle") + self._indicator.set_icon_full("ros-idle", "ROSCore is not active") self._roscore_ctrl_item.set_label("Start") - self._roscore_ctrl_item.connect("activate", self.ctrl_roscore,"start") - self._roscore_item.set_label(u"%s Master [%s] "%(s, self._ros_master_uri)) + self._roscore_ctrl_item.connect("activate", self.ctrl_roscore, "start") + self._roscore_item.set_label(f"{s} Master [{self._ros_master_uri}] ") def update_menu_launch_units(self): p = 0 @@ -125,7 +127,7 @@ class RobotIndicator: self._menu.remove(item) else: # Until reaping begins increment the menu insertion point - p+=1 + p += 1 if item.get_label() == "Launch Units": # Begin reaping after "Launch Units" menu item is located @@ -135,32 +137,40 @@ class RobotIndicator: for pkg in pkgs: launch_files = self._launch_state[pkg].keys() for launch_file in launch_files: - launch_unit_ctrl_item = Gtk.MenuItem() - launch_unit_conf_item = Gtk.MenuItem() + launch_unit_ctrl_item = Gtk.MenuItem.new() + launch_unit_conf_item = Gtk.MenuItem.new() # If the launch unit is 'Running' / 'Stopped' if self._launch_state[pkg][launch_file][0]: launch_unit_ctrl_item.set_label("Stop") - launch_unit_ctrl_item.connect("activate", self.ctrl_launch_unit, "stop", pkg, launch_file) + launch_unit_ctrl_item.connect( + "activate", self.ctrl_launch_unit, "stop", pkg, launch_file + ) else: launch_unit_ctrl_item.set_label("Start") - launch_unit_ctrl_item.connect("activate", self.ctrl_launch_unit, "start", pkg, launch_file) + launch_unit_ctrl_item.connect( + "activate", self.ctrl_launch_unit, "start", pkg, launch_file + ) # If the launch unit is 'Enabled' / 'Disabled' if self._launch_state[pkg][launch_file][1]: launch_unit_conf_item.set_label("Disable") - launch_unit_conf_item.connect("activate", self.ctrl_launch_unit, "disable", pkg, launch_file) + launch_unit_conf_item.connect( + "activate", self.ctrl_launch_unit, "disable", pkg, launch_file + ) else: launch_unit_conf_item.set_label("Enable") - launch_unit_conf_item.connect("activate", self.ctrl_launch_unit, "enable", pkg, launch_file) + launch_unit_conf_item.connect( + "activate", self.ctrl_launch_unit, "enable", pkg, launch_file + ) # Convert boolean list into an int - s = int("%d%d"%tuple(self._launch_state[pkg][launch_file]),2) + s = int("{:d}{:d}".format(*self._launch_state[pkg][launch_file]), 2) b = self._status_bullet[s] - label = "%s %s %s"%(b, pkg, launch_file) - launch_unit_item = Gtk.MenuItem(label) + label = f"{b} {pkg} {launch_file}" + launch_unit_item = Gtk.MenuItem.new_with_label(label) - submenu = Gtk.Menu() + submenu = Gtk.Menu.new() submenu.append(launch_unit_ctrl_item) submenu.append(launch_unit_conf_item) launch_unit_item.set_submenu(submenu) @@ -168,10 +178,10 @@ class RobotIndicator: launch_unit_ctrl_item.show() launch_unit_conf_item.show() launch_unit_item.show() - self._menu.insert(launch_unit_item,p) + self._menu.insert(launch_unit_item, p) # Increment menu insertion position - p+=1 + p += 1 # Update complete self._updated = False @@ -179,37 +189,40 @@ class RobotIndicator: # Menu Controls ################ def ctrl_roscore(self, menu_item, operation): - cmd = "systemctl --user %s roscore.service" % (operation) + cmd = f"systemctl --user {operation} roscore.service" r = subprocess.run(shlex.split(cmd)) # TODO: Use notify to return error messages if r.returncode != 0: - print(cmd+'\n'+r.stdout.decode()+'\n'+r.stderr.decode()) + print(cmd, r.stdout.decode(), r.stderr.decode(), sep="\n") self._updated = True - GObject.timeout_add(1000, self.update_units) + GLib.timeout_add(1000, self.update_units) def ctrl_launch(self, menu_item): cmd = "rosrun robot_indicator robot_indicator_launch" - r = subprocess.run(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + r = subprocess.run( + shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) # TODO: Use notify to return error messages if r.returncode != 0: - print(cmd+'\n'+r.stdout.decode()+'\n'+r.stderr.decode()) + print(cmd, r.stdout.decode(), r.stderr.decode(), sep="\n") def ctrl_launch_unit(self, menu_item, operation, pkg, launch_file): - cmd = "systemctl --user %s roslaunch@%s:%s.service" % (operation, pkg, launch_file) - r = subprocess.run(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmd = f"systemctl --user {operation} roslaunch@{pkg}:{launch_file}.service" + r = subprocess.run( + shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) # TODO: Use notify to return error messages if r.returncode != 0: - print(cmd+'\n'+r.stdout.decode()+'\n'+r.stderr.decode()) + print(cmd, r.stdout.decode(), r.stderr.decode(), sep="\n") self._updated = True - GObject.timeout_add(1000, self.update_units) - # GObject.idle_add(self.update_units,priority=GObject.PRIORITY_LOW) + GLib.timeout_add(1000, self.update_units) + # GLib.idle_add(self.update_units,priority=GLib.PRIORITY_LOW) ################ # Update State Vectors ################ def update_units(self): - """Update internal list of running Systemd units - """ + """Update internal list of running Systemd units""" cmd = "systemctl list-units --user --full --no-legend --no-pager --type=service --state=running" r = subprocess.run(shlex.split(cmd), bufsize=0, stdout=subprocess.PIPE) @@ -222,26 +235,30 @@ class RobotIndicator: # Clear the previous launch states self._launch_state = collections.defaultdict(dict) - lines = r.stdout.decode().strip().split('\n') - services = map(lambda s: s.split('.service')[0], lines) + lines = r.stdout.decode().strip().split("\n") + services = map(lambda s: s.split(".service")[0], lines) # Check if roscore unit is running self._roscore_state = "roscore" in services - launch_services = filter(lambda l: l.startswith("roslaunch@") - or l.startswith("roscorelaunch@"), services) - instances = map(lambda i: i.split('@')[1],launch_services) + launch_services = filter( + lambda l: l.startswith("roslaunch@") or l.startswith("roscorelaunch@"), + services, + ) + instances = map(lambda i: i.split("@")[1], launch_services) for i in instances: - (pkg,launch_file) = i.split(':') + (pkg, launch_file) = i.split(":") self._launch_state[pkg][launch_file] = [True, False] # Update enabled units - systemd_user_dir = Path.home()/Path(".config/systemd/user/") - launch_paths = systemd_user_dir.glob("default.target.wants/ros*launch@*.service") + systemd_user_dir = Path.home() / Path(".config/systemd/user/") + launch_paths = systemd_user_dir.glob( + "default.target.wants/ros*launch@*.service" + ) launch_services = map(lambda s: s.stem, launch_paths) - instances = map(lambda i: i.split('@')[1],launch_services) + instances = map(lambda i: i.split("@")[1], launch_services) for i in instances: - (pkg,launch_file) = i.split(':') + (pkg, launch_file) = i.split(":") if launch_file in self._launch_state[pkg].keys(): self._launch_state[pkg][launch_file][1] = True else: @@ -249,13 +266,13 @@ class RobotIndicator: # Update menus self._updated = True - GObject.idle_add(self.update_menu_roscore,priority=GObject.PRIORITY_LOW) - GObject.idle_add(self.update_menu_launch_units,priority=GObject.PRIORITY_LOW) + GLib.idle_add(self.update_menu_roscore, priority=GLib.PRIORITY_LOW) + GLib.idle_add(self.update_menu_launch_units, priority=GLib.PRIORITY_LOW) def run(self): # Update every 15 seconds self.update_units() - GObject.timeout_add(15000, self.run) + GLib.timeout_add(15000, self.run) def start(self): GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, self.stop) @@ -263,7 +280,7 @@ class RobotIndicator: self.run() Gtk.main() - def stop(self,*args): + def stop(self, *args): Gtk.main_quit() sys.exit(os.EX_OK) diff --git a/scripts/robot_indicator_launch b/scripts/robot_indicator_launch index 05144be..541c984 100755 --- a/scripts/robot_indicator_launch +++ b/scripts/robot_indicator_launch @@ -1,24 +1,29 @@ -#!/usr/bin/python3 -import sys -import os -import signal -import subprocess -import shlex +#!/usr/bin/env python3 import binascii import collections +import os import pathlib -import rospkg +import shlex +import signal +import subprocess +import sys import gi.repository -gi.require_version('Gtk', '3.0') +import rospkg + +gi.require_version("Gtk", "3.0") from gi.repository import Gtk -gi.require_version('Gdk', '3.0') + +gi.require_version("Gdk", "3.0") from gi.repository import Gdk -gi.require_version('GLib', '2.0') + +gi.require_version("GLib", "2.0") from gi.repository import GLib -gi.require_version('GObject', '2.0') + +gi.require_version("GObject", "2.0") from gi.repository import GObject + class LaunchWindow(Gtk.Window): def __init__(self): self._rospack = rospkg.RosPack() @@ -44,22 +49,24 @@ class LaunchWindow(Gtk.Window): self._box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) self.add(self._box) - self._pkg_label = Gtk.Label("ROS Package") + self._pkg_label = Gtk.Label.new_with_label("ROS Package") self._pkg_label.set_xalign(0) self._box.pack_start(self._pkg_label, True, True, 0) self._entrycompletion = Gtk.EntryCompletion() self._entrycompletion.set_model(self._pkg_liststore) self._entrycompletion.set_text_column(0) - self._entrycompletion.connect("match-selected",self.on_selected_pkg_update_launch) + self._entrycompletion.connect( + "match-selected", self.on_selected_pkg_update_launch + ) self._entry = Gtk.Entry() self._entry.set_completion(self._entrycompletion) self._entry.set_max_width_chars(42) - self._entry.connect("focus-out-event",self.on_focus_update_launch) + self._entry.connect("focus-out-event", self.on_focus_update_launch) self._box.pack_start(self._entry, True, True, 0) - self._pkg_label = Gtk.Label("Launch File") + self._pkg_label = Gtk.Label.new_with_label("Launch File") self._pkg_label.set_xalign(0) self._box.pack_start(self._pkg_label, True, True, 0) @@ -88,7 +95,7 @@ class LaunchWindow(Gtk.Window): self._pkg = pkg self._pkg_path = pathlib.Path(self._rospack.get_path(self._pkg)) - self._launch_files = sorted(self._pkg_path.glob('**/*.launch')) + self._launch_files = sorted(self._pkg_path.glob("**/*.launch")) self._launch_combo.remove_all() for launch_file in self._launch_files: self._launch_combo.append_text(launch_file.name) @@ -100,7 +107,7 @@ class LaunchWindow(Gtk.Window): self.update_launch_files(pkg) def on_selected_pkg_update_launch(self, widget, model, i): - pkg = model.get_value(i,0) + pkg = model.get_value(i, 0) self.update_launch_files(pkg) def on_change_set_launch(self, combo): @@ -108,13 +115,18 @@ class LaunchWindow(Gtk.Window): self._launch_button.set_sensitive(True) def on_launch(self, button): - cmd = "systemctl --user start roslaunch@%s:%s.service"%(self._pkg, self._launch_file) - r = subprocess.run(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmd = ( + f"systemctl --user start roslaunch@{self._pkg}:{self._launch_file}.service" + ) + r = subprocess.run( + shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) # TODO: Use notify to return error messages if r.returncode != 0: - print(cmd+'\n'+r.stdout.decode()+'\n'+r.stderr.decode()) + print(cmd, r.stdout.decode(), r.stderr.decode(), sep="\n") Gtk.main_quit() + class Launcher: def __init__(self): self._win = LaunchWindow() @@ -125,10 +137,11 @@ class Launcher: GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, self.stop) Gtk.main() - def stop(self,*args): + def stop(self, *args): Gtk.main_quit() sys.exit(os.EX_OK) + if __name__ == "__main__": app = Launcher() app.start()