Source code for autopilot.emulators.bamf

# Copyright 2011 Canonical
# Author: Thomi Richards
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.

"""Various classes for interacting with BAMF."""

from __future__ import absolute_import

import dbus
import dbus.glib
from gi.repository import Gio
from gi.repository import GLib
import os
from Xlib import display, X, protocol

from autopilot.emulators.dbus_handler import get_session_bus
from autopilot.utilities import Silence

__all__ = [
    "Bamf",
    "BamfApplication",
    "BamfWindow",
    ]

_BAMF_BUS_NAME = 'org.ayatana.bamf'
_X_DISPLAY = None


def get_display():
    """Create an Xlib display object (silently) and return it."""
    global _X_DISPLAY
    if _X_DISPLAY is None:
        with Silence():
            _X_DISPLAY = display.Display()
    return _X_DISPLAY


def _filter_user_visible(win):
    """Filter out non-user-visible objects.

    In some cases the DBus method we need to call hasn't been registered yet,
    in which case we do the safe thing and return False.

    """
    try:
        return win.user_visible
    except dbus.DBusException:
        return False


[docs]class Bamf(object): """High-level class for interacting with Bamf from within a test. Use this class to inspect the state of running applications and open windows. """ def __init__(self): matcher_path = '/org/ayatana/bamf/matcher' self.matcher_interface_name = 'org.ayatana.bamf.matcher' self.matcher_proxy = get_session_bus().get_object(_BAMF_BUS_NAME, matcher_path) self.matcher_interface = dbus.Interface(self.matcher_proxy, self.matcher_interface_name)
[docs] def get_running_applications(self, user_visible_only=True): """Get a list of the currently running applications. If user_visible_only is True (the default), only applications visible to the user in the switcher will be returned. """ apps = [BamfApplication(p) for p in self.matcher_interface.RunningApplications()] if user_visible_only: return filter(_filter_user_visible, apps) return apps
[docs] def get_running_applications_by_desktop_file(self, desktop_file): """Return a list of applications that have the desktop file *desktop_file*. This method may return an empty list, if no applications are found with the specified desktop file. """ apps = [] for a in self.get_running_applications(): try: if a.desktop_file == desktop_file: apps.append(a) except dbus.DBusException: pass return apps
[docs] def get_application_by_xid(self, xid): """Return the application that has a child with the requested xid or None.""" app_path = self.matcher_interface.ApplicationForXid(xid) if len(app_path): return BamfApplication(app_path) return None
[docs] def get_open_windows(self, user_visible_only=True): """Get a list of currently open windows. If *user_visible_only* is True (the default), only applications visible to the user in the switcher will be returned. The result is sorted to be in stacking order. """ windows = [BamfWindow(w) for w in self.matcher_interface.WindowStackForMonitor(-1)] if user_visible_only: windows = filter(_filter_user_visible, windows) # Now sort on stacking order. # We explicitly convert to a list from an iterator since tests frequently # try and use len() on return values from these methods. return list(reversed(windows))
[docs] def get_window_by_xid(self, xid): """Get the BamfWindow that matches the provided *xid*.""" windows = [BamfWindow(w) for w in self.matcher_interface.WindowPaths() if BamfWindow(w).x_id == xid] return windows[0] if windows else None
[docs] def wait_until_application_is_running(self, desktop_file, timeout): """Wait until a given application is running. :param string desktop_file: The name of the application desktop file. :param integer timeout: The maximum time to wait, in seconds. *If set to something less than 0, this method will wait forever.* :return: true once the application is found, or false if the application was not found until the timeout was reached. """ desktop_file = os.path.split(desktop_file)[1] # python workaround since you can't assign to variables in the enclosing scope: # see on_timeout_reached below... found_app = [True] # maybe the app is running already? if len(self.get_running_applications_by_desktop_file(desktop_file)) == 0: wait_forever = timeout < 0 gobject_loop = GLib.MainLoop() # No, so define a callback to watch the ViewOpened signal: def on_view_added(bamf_path, name): if bamf_path.split('/')[-1].startswith('application'): app = BamfApplication(bamf_path) if desktop_file == os.path.split(app.desktop_file)[1]: gobject_loop.quit() # ...and one for when the user-defined timeout has been reached: def on_timeout_reached(): gobject_loop.quit() found_app[0] = False return False # need a timeout? if so, connect it: if not wait_forever: GLib.timeout_add(timeout * 1000, on_timeout_reached) # connect signal handler: get_session_bus().add_signal_receiver(on_view_added, 'ViewOpened') # pump the gobject main loop until either the correct signal is emitted, or the # timeout happens. gobject_loop.run() return found_app[0]
[docs] def launch_application(self, desktop_file, files=[], wait=True): """Launch an application by specifying a desktop file. :param files: List of files to pass to the application. *Not all apps support this.* :type files: List of strings .. note:: If `wait` is True, this method will wait up to 10 seconds for the application to appear in the BAMF model. :raises: **TypeError** on invalid *files* parameter. :return: The Gobject process object. """ if type(files) is not list: raise TypeError("files must be a list.") proc = Gio.DesktopAppInfo.new(desktop_file) # FIXME: second item is a GEerror proc.launch_uris(files, None) if wait: self.wait_until_application_is_running(desktop_file, 10) return proc
[docs]class BamfApplication(object): """Represents an application, with information as returned by Bamf. .. important:: Don't instantiate this class yourself. instead, use the methods as provided by the Bamf class. :raises: **dbus.DBusException** in the case of a DBus error. """ def __init__(self, bamf_app_path): self.bamf_app_path = bamf_app_path try: self._app_proxy = get_session_bus().get_object(_BAMF_BUS_NAME, bamf_app_path) self._view_iface = dbus.Interface(self._app_proxy, 'org.ayatana.bamf.view') self._app_iface = dbus.Interface(self._app_proxy, 'org.ayatana.bamf.application') except dbus.DBusException, e: e.message += 'bamf_app_path=%r' % (bamf_app_path) raise @property
[docs] def desktop_file(self): """Get the application desktop file. This just returns the filename, not the full path. If the application no longer exists, this returns an empty string. """ try: return os.path.split(self._app_iface.DesktopFile())[1] except dbus.DBusException: return ""
@property
[docs] def name(self): """Get the application name. .. note:: This may change according to the current locale. If you want a unique string to match applications against, use the desktop_file instead. """ return self._view_iface.Name()
@property
[docs] def icon(self): """Get the application icon. :return: The name of the icon. """ return self._view_iface.Icon()
@property
[docs] def is_active(self): """Is the application active (i.e.- has keyboard focus)?""" return self._view_iface.IsActive()
@property
[docs] def is_urgent(self): """Is the application currently signalling urgency?""" return self._view_iface.IsUrgent()
@property
[docs] def user_visible(self): """Is this application visible to the user? .. note:: Some applications (such as the panel) are hidden to the user but will still be returned by bamf. """ return self._view_iface.UserVisible()
[docs] def get_windows(self): """Get a list of the application windows.""" return [BamfWindow(w) for w in self._view_iface.Children()]
def __repr__(self): return "<BamfApplication '%s'>" % (self.name) def __eq__(self, other): return self.desktop_file == other.desktop_file
[docs]class BamfWindow(object): """Represents an application window, as returned by Bamf. .. important:: Don't instantiate this class yourself. Instead, use the appropriate methods in BamfApplication. """ def __init__(self, window_path): self._bamf_win_path = window_path self._app_proxy = get_session_bus().get_object(_BAMF_BUS_NAME, window_path) self._window_iface = dbus.Interface(self._app_proxy, 'org.ayatana.bamf.window') self._view_iface = dbus.Interface(self._app_proxy, 'org.ayatana.bamf.view') self._xid = int(self._window_iface.GetXid()) self._x_root_win = get_display().screen().root self._x_win = get_display().create_resource_object('window', self._xid) @property
[docs] def x_id(self): """Get the X11 Window Id.""" return self._xid
@property
[docs] def x_win(self): """Get the X11 window object of the underlying window.""" return self._x_win
@property
[docs] def name(self): """Get the window name. .. note:: This may change according to the current locale. If you want a unique string to match windows against, use the x_id instead. """ return self._view_iface.Name()
@property
[docs] def title(self): """Get the window title. This may be different from the application name. .. note:: This may change depending on the current locale. """ return self._getProperty('_NET_WM_NAME')
@property
[docs] def geometry(self): """Get the geometry for this window. :return: Tuple containing (x, y, width, height). """ # Note: MUST import these here, rather than at the top of the file. Why? # Because sphinx imports these modules to build the API documentation, # which in turn tries to import Gdk, which in turn fails because there's # no DISPlAY environment set in the package builder. from gi.repository import GdkX11 # FIXME: We need to use the gdk window here to get the real coordinates geometry = self._x_win.get_geometry() origin = GdkX11.X11Window.foreign_new_for_display(get_display(), self._xid).get_origin() return (origin[0], origin[1], geometry.width, geometry.height)
@property
[docs] def is_maximized(self): """Is the window maximized? Maximized in this case means both maximized vertically and horizontally. If a window is only maximized in one direction it is not considered maximized. """ win_state = self._get_window_states() return '_NET_WM_STATE_MAXIMIZED_VERT' in win_state and \ '_NET_WM_STATE_MAXIMIZED_HORZ' in win_state
@property
[docs] def application(self): """Get the application that owns this window. This method may return None if the window does not have an associated application. The 'desktop' window is one such example. """ # BAMF returns a list of parents since some windows don't have an # associated application. For these windows we return none. parents = self._view_iface.Parents() if parents: return BamfApplication(parents[0]) else: return None
@property
[docs] def user_visible(self): """Is this window visible to the user in the switcher?""" return self._view_iface.UserVisible()
@property
[docs] def is_hidden(self): """Is this window hidden? Windows are hidden when the 'Show Desktop' mode is activated. """ win_state = self._get_window_states() return '_NET_WM_STATE_HIDDEN' in win_state
@property
[docs] def is_focused(self): """Is this window focused?""" win_state = self._get_window_states() return '_NET_WM_STATE_FOCUSED' in win_state
@property
[docs] def is_valid(self): """Is this window object valid? Invalid windows are caused by windows closing during the construction of this object instance. """ return not self._x_win is None
@property
[docs] def monitor(self): """Returns the monitor to which the windows belongs to""" return self._window_iface.Monitor()
@property
[docs] def closed(self): """Returns True if the window has been closed""" # This will return False when the window is closed and then removed from BUS try: return (self._window_iface.GetXid() != self.x_id) except: return True
[docs] def close(self): """Close the window.""" self._setProperty('_NET_CLOSE_WINDOW', [0, 0])
[docs] def set_focus(self): self._x_win.set_input_focus(X.RevertToParent, X.CurrentTime) self._x_win.configure(stack_mode=X.Above)
def __repr__(self): return "<BamfWindow '%s' Xid: %d>" % (self.title if self._x_win else '', self.x_id) def _getProperty(self, _type): """Get an X11 property. _type is a string naming the property type. win is the X11 window object. """ atom = self._x_win.get_full_property(get_display().get_atom(_type), X.AnyPropertyType) if atom: return atom.value def _setProperty(self, _type, data, mask=None): if type(data) is str: dataSize = 8 else: # data length must be 5 - pad with 0's if it's short, truncate otherwise. data = (data + [0] * (5 - len(data)))[:5] dataSize = 32 ev = protocol.event.ClientMessage(window=self._x_win, client_type=get_display().get_atom(_type), data=(dataSize, data)) if not mask: mask = (X.SubstructureRedirectMask | X.SubstructureNotifyMask) self._x_root_win.send_event(ev, event_mask=mask) get_display().sync() def _get_window_states(self): """Return a list of strings representing the current window state.""" get_display().sync() return map(get_display().get_atom_name, self._getProperty('_NET_WM_STATE'))