#!/usr/bin/python
# Expects ckb-daemon to be running, but the ckb user interface (including the
# icon in the tray) to not be running.

import sys
import optparse
import select
import re

from rgbkbd.manager import KeyboardManager


# Matches a keyboard notification line of the form "key +<name>" or
# "key -<name>".  The named groups are passed as keyword arguments to
# KeyboardManager.key_event().
# NOTE(review): the group names were missing from the pattern as found (it
# read '(?P[-+])(?P.*)', which does not compile).  'state' and 'key' are a
# best guess -- confirm they match the parameter names of
# KeyboardManager.key_event().
_KEY_EVENT_RE = re.compile(r'key (?P<state>[-+])(?P<key>.*)', re.M)


class Controller(object):
    """Route ckb-daemon notifications to one KeyboardManager per device.

    ``keyboards`` maps a device path (e.g. '/dev/input/ckb1') to the
    KeyboardManager created for it.
    """

    def __init__(self, keyboard=None):
        """Create a controller that manages no keyboards yet.

        ``keyboard`` is accepted for callers' convenience but is not used.
        """
        self.keyboards = {}

    def open_keyboard(self, device):
        """Create a KeyboardManager for ``device`` and register it.

        Any manager previously registered under the same path is replaced.
        """
        self.keyboards[device] = KeyboardManager(device)

    def run(self):
        """Primary event loop.

        Opens the daemon notification node, probes /dev/input/ckb1 through
        ckb9 for keyboards, then waits on all notifiers until interrupted
        with ^C or until the daemon notification stream ends.  All keyboard
        managers are shut down and the daemon node is closed on the way out.

        Returns None.  If the daemon node cannot be opened a message is
        written to stderr and the method returns immediately.
        """
        try:
            main_notifier = open('/dev/input/ckb0/notify0', 'r')
        except IOError:
            sys.stderr.write("The ckb-daemon must be running before starting "
                             "this application.\n")
            sys.stderr.flush()
            return

        try:
            self._probe_keyboards()
            notifiers = [main_notifier]
            notifiers.extend(km.notifier for km in self.keyboards.values())
            self._event_loop(main_notifier, notifiers)
        except KeyboardInterrupt:
            # Exit cleanly on ^C
            pass
        finally:
            try:
                self.shutdown()
            finally:
                main_notifier.close()

    def _probe_keyboards(self):
        """Try to open a manager for each of /dev/input/ckb1 .. ckb9."""
        for n in range(1, 10):
            try:
                self.open_keyboard(device='/dev/input/ckb%s' % n)
            except Exception:
                # Best effort: most of these slots are expected to be empty,
                # so a failure to open one is not reported.
                pass

    def _event_loop(self, main_notifier, notifiers):
        """Wait for notifications and dispatch them until the daemon goes away.

        ``notifiers`` is the mutable list of file objects handed to select();
        it is updated in place as keyboards are added and removed.
        """
        while True:
            # tick_rate() is the select timeout; None blocks until an event.
            readable, _, errored = select.select(
                notifiers, [], notifiers, self.tick_rate())
            if errored:
                print("Error events %s" % (errored,))

            for source in readable:
                if source is main_notifier:
                    if not self._handle_daemon_event(main_notifier, notifiers):
                        return
                else:
                    self._handle_keyboard_event(source)

            # Send tick events to all keyboard managers
            for keyboard_manager in self.keyboards.values():
                keyboard_manager.current_mode().tick()

    def _handle_daemon_event(self, main_notifier, notifiers):
        """Process one line from the daemon notifier.

        Handles 'device ... added ...' and 'device ... removed ...' lines,
        taking the last word of the line as the device path; any other line
        is ignored.  Returns False when the stream has reached end-of-file
        (the caller should stop), True otherwise.
        """
        event = main_notifier.readline()
        if not event:
            # End-of-file: select() would report the node readable forever,
            # so stop instead of spinning.
            sys.stderr.write("The ckb-daemon notification stream was "
                             "closed.\n")
            sys.stderr.flush()
            return False

        words = event.split()
        if len(words) < 3 or words[0] != 'device':
            return True

        device = words[-1]
        if words[2] == 'added':
            # If this path is already managed, drop its old notifier so the
            # select list does not keep a handle no manager owns.
            previous = self.keyboards.get(device)
            if previous is not None and previous.notifier in notifiers:
                notifiers.remove(previous.notifier)
            self.open_keyboard(device)
            notifiers.append(self.keyboards[device].notifier)
        elif words[2] == 'removed':
            # Tolerate removal of a device that was never opened.
            removed = self.keyboards.pop(device, None)
            if removed is not None and removed.notifier in notifiers:
                notifiers.remove(removed.notifier)
        return True

    def _handle_keyboard_event(self, source):
        """Read one line from a keyboard notifier and forward key events."""
        matched = [km for km in self.keyboards.values()
                   if km.notifier == source]
        if not matched:
            # We could get an event on a keyboard at the same time we get a
            # notification of its removal.
            print("Read event on %r does not match a keyboard" % (source,))
            return

        line = source.readline()
        key_match = _KEY_EVENT_RE.match(line)
        if key_match:
            matched[0].key_event(**key_match.groupdict())

    def tick_rate(self):
        """Calculate a tick rate to use to meet the requirements across all
        attached keyboards.

        Returns the smallest truthy ``tick_rate`` among the keyboards'
        current modes, or None when no mode provides one.
        """
        rates = [km.current_mode().tick_rate
                 for km in self.keyboards.values()]
        rates = [rate for rate in rates if rate]
        return min(rates) if rates else None

    def shutdown(self):
        """Shut down every registered keyboard manager."""
        for keyboard_manager in self.keyboards.values():
            keyboard_manager.shutdown()


def main(argv):
    """Run the keyboard manager.

    ``argv`` is the full argument vector, program name first.  Returns the
    process exit status (always 0).
    """
    parser = optparse.OptionParser("Usage: %s [options]" % argv[0])
    parser.add_option('-k', '--keyboard', type="int", default=1,
                      help="Keyboard number to manipulate")
    # The option is parsed (so --help and validation work) but its value is
    # not currently used: the controller manages every keyboard it finds.
    parser.parse_args(argv[1:])

    manager = Controller()
    manager.run()
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))