aboutsummaryrefslogtreecommitdiffstats
path: root/build/lib.linux-x86_64-2.7/dogtail/utils.py
blob: be3c78adec23ec041c7a659c31554d4111a817a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# -*- coding: utf-8 -*-
"""
Various utilities

Authors: Ed Rousseau <rousseau@redhat.com>, Zack Cerza <zcerza@redhat.com>, David Malcolm <dmalcolm@redhat.com>
"""

__author__ = """Ed Rousseau <rousseau@redhat.com>,
Zack Cerza <zcerza@redhat.com,
David Malcolm <dmalcolm@redhat.com>
"""

import os
import sys
import subprocess
import cairo
import predicate
import errno
import shlex

import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Gdk', '3.0')

from gi.repository import Gtk
from gi.repository import GLib
from config import config
from time import sleep
from logging import debugLogger as logger
from logging import TimeStamp
from __builtin__ import file


def screenshot(file='screenshot.png', timeStamp=True):
    """
    Grab the default Gdk root window into a pixbuf and save it under
    config.scratchDir.

    The file argument may be specified as 'foo', 'foo.png', or with any other
    extension. The lower-cased extension is passed to the pixbuf saver as the
    image type ('jpg' is translated to 'jpeg'); PNG is used when the name has
    no extension. Dots inside the base name are preserved, so 'a.b.png' keeps
    'a.b' as its base name.

    By default the base name is passed through TimeStamp().fileStamp()
    (documented by the original author as producing foo_YYYYMMDD-hhmmss.png).
    The timeStamp argument may be set to False to name the file foo.png.

    Returns the path of the saved image.

    Raises TypeError if timeStamp is not a bool, ValueError if the pixbuf
    cannot be saved in the requested format, and AssertionError if
    config.scratchDir is not a directory or the file is missing after saving.
    """
    if not isinstance(timeStamp, bool):
        raise TypeError("timeStamp must be True or False")
    # config is supposed to create this for us. If it's not there, bail.
    assert os.path.isdir(config.scratchDir)

    # splitext only splits off the last extension, so interior dots in the
    # base name survive (the previous ''.join(...) silently removed them).
    baseName, fileExt = os.path.splitext(file)
    fileExt = fileExt[1:].lower()
    if not fileExt:
        fileExt = 'png'

    if timeStamp:
        baseName = TimeStamp().fileStamp(baseName)
    path = config.scratchDir + baseName + '.' + fileExt

    from gi.repository import Gdk
    from gi.repository import GLib
    # NOTE(review): import kept from the original; it is probably unnecessary
    # now that the unused Pixbuf pre-allocation has been removed - confirm.
    from gi.repository import GdkPixbuf
    rootWindow = Gdk.get_default_root_window()
    geometry = rootWindow.get_geometry()
    # geometry[2] and geometry[3] are used as the width and height to capture.
    pixbuf = Gdk.pixbuf_get_from_window(rootWindow, 0, 0,
                                        geometry[2], geometry[3])
    # GdkPixbuf.Pixbuf.save() needs 'jpeg' and not 'jpg'
    if fileExt == 'jpg':
        fileExt = 'jpeg'
    try:
        pixbuf.savev(path, fileExt, [], [])
    except GLib.GError:
        raise ValueError("Failed to save screenshot in %s format" % fileExt)
    assert os.path.exists(path)
    logger.log("Screenshot taken: " + path)
    return path


def run(string, timeout=config.runTimeout, interval=config.runInterval, desktop=None, dumb=False, appName=''):
    """
    Launch an application from a shell-like command string and return its pid.
    [For simple command execution such as 'rm *', use os.popen() or os.system()]

    The command is split with shlex and started with GTK_MODULES set to
    'gail:atk-bridge' in os.environ (the process-wide environment is modified).

    If dumb is True, simply waits for timeout seconds and returns.
    Otherwise, polls the children of desktop (tree.root by default) every
    interval seconds until a child named appName (default: the first word of
    the command) owns a child with roleName 'frame', or until roughly timeout
    seconds have been counted. On success the application node is stored in
    procedural.focus.application.node.
    """
    if not desktop:
        from tree import root as desktop
    argv = shlex.split(string)
    os.environ['GTK_MODULES'] = 'gail:atk-bridge'
    pid = subprocess.Popen(argv, env=os.environ).pid

    appName = appName or argv[0]

    if dumb:
        # Non-AT-SPI-aware application: no startup detection is possible, so
        # just wait out the timeout.
        doDelay(timeout)
        return pid

    # Startup detection. Elapsed time is only approximated by summing the
    # polling interval, which is good enough for now.
    elapsed = 0
    while elapsed < timeout:
        elapsed += interval
        try:
            for app in desktop.children[::-1]:
                if app.name != appName:
                    continue
                if any(node.roleName == 'frame' for node in app.children):
                    from procedural import focus
                    focus.application.node = app
                    doDelay(interval)
                    return pid
        except AttributeError:  # pragma: no cover
            pass
        doDelay(interval)
    return pid


def doDelay(delay=None):
    """
    Sleep for delay seconds, falling back to config.defaultDelay when delay
    is None. The duration is logged first when config.debugSleep is set.
    """
    pause = config.defaultDelay if delay is None else delay
    if config.debugSleep:
        logger.log("sleeping for %f" % pause)
    sleep(pause)


class Highlight (Gtk.Window):  # pragma: no cover

    """
    Undecorated window of size w x h, moved to (x, y), whose draw handler
    clears the surface to fully transparent and strokes a red outline along
    the window border.
    """

    def __init__(self, x, y, w, h):  # pragma: no cover
        super(Highlight, self).__init__()
        self.set_decorated(False)
        self.set_has_resize_grip(False)
        self.set_default_size(w, h)
        self.screen = self.get_screen()
        self.visual = self.screen.get_rgba_visual()
        # Only switch to the RGBA visual when one exists and the screen is
        # composited.
        rgba_usable = self.visual is not None and self.screen.is_composited()
        if rgba_usable:
            self.set_visual(self.visual)
        self.set_app_paintable(True)
        self.connect("draw", self.area_draw)
        self.show_all()
        self.move(x, y)

    def area_draw(self, widget, cr):  # pragma: no cover
        # Replace whatever is on the surface with fully transparent pixels.
        cr.set_source_rgba(.0, .0, .0, 0.0)
        cr.set_operator(cairo.OPERATOR_SOURCE)
        cr.paint()
        # Then draw the red frame on top, covering the whole window area.
        cr.set_operator(cairo.OPERATOR_OVER)
        cr.set_source_rgb(0.9, 0.1, 0.1)
        cr.set_line_width(6)
        width, height = self.get_size()
        cr.rectangle(0, 0, width, height)
        cr.stroke()


class Blinker(object):  # pragma: no cover

    """
    Shows a Highlight window at the given geometry and runs the shared GLib
    main loop until a timeout of INTERVAL_MS fires and destroys the window.
    When the screen reports that it is not composited, the window is
    destroyed right away and the main loop is not run.
    """

    INTERVAL_MS = 1000
    main_loop = GLib.MainLoop()

    def __init__(self, x, y, w, h):  # pragma: no cover
        self.highlight_window = Highlight(x, y, w, h)
        if self.highlight_window.screen.is_composited() is False:
            self.highlight_window.destroy()
            return
        self.timeout_handler_id = GLib.timeout_add(
            Blinker.INTERVAL_MS, self.destroyHighlight)
        self.main_loop.run()

    def destroyHighlight(self):  # pragma: no cover
        """Timeout callback: tear down the highlight and stop the main loop."""
        self.highlight_window.destroy()
        self.main_loop.quit()
        # Returning False tells GLib not to reschedule this timeout.
        return False


class Lock(object):

    """
    A mutex implementation that uses atomicity of the mkdir operation in UNIX-like
    systems. This can be used by scripts to provide for mutual exclusion, either in single
    scripts using threads etc. or i.e. to handle situations of possible collisions among
    multiple running scripts. You can choose to make randomized single-script wise locks
    or a more general locks if you do not choose to randomize the lockdir name
    """

    def __init__(self, location='/tmp', lockname='dogtail_lockdir_', randomize=True):
        """
        You can change the default lockdir location or name. Setting randomize to
        False will result in no random string being appended to the lockdir name.
        """
        self.lockdir = os.path.join(os.path.normpath(location), lockname)
        if randomize:
            self.lockdir = "%s%s" % (self.lockdir, self.__getPostfix())

    def lock(self):
        """
        Creates a lockdir based on the settings on Lock() instance creation
        and returns its path.
        Raises OSError if the lock is already present, or if the directory
        cannot be created for any other reason (the underlying error is then
        propagated unchanged). Should be atomic on POSIX compliant systems.
        """
        locked_msg = 'Dogtail lock: Already locked with the same lock'
        # Call mkdir directly instead of testing for existence first: the
        # test-then-create sequence leaves a window for another process.
        try:
            os.mkdir(self.lockdir)
        except OSError as e:
            if e.errno == errno.EEXIST:
                raise OSError(locked_msg)
            # Anything else (missing parent, permissions, ...) used to be
            # swallowed, making lock() return None as if nothing had happened.
            raise
        return self.lockdir

    def unlock(self):
        """
        Removes a lock. Will raise OSError exception if the lock was not present.
        Should be atomic on POSIX compliant systems.
        """
        # have to import here for situations when executed from __del__
        import os
        import errno
        if not os.path.exists(self.lockdir):
            raise OSError('Dogtail unlock: not locked')
        try:
            os.rmdir(self.lockdir)
        except OSError as e:
            # The directory vanished between the existence check and rmdir.
            if e.errno == errno.ENOENT:
                raise OSError('Dogtail unlock: lockdir removed elsewhere!')
            raise

    def __del__(self):
        """
        Makes sure lock is removed when the process ends. Although not when killed indeed.
        """
        # Best effort only: an instance that is not locked (or whose __init__
        # never set lockdir) must not raise from the finalizer.
        try:
            self.unlock()
        except (OSError, AttributeError):
            pass

    def __getPostfix(self):
        """Returns a random 5 character ASCII alphanumeric suffix."""
        import random
        import string
        # ascii_letters is locale independent and available on both
        # Python 2 and Python 3, unlike string.letters.
        alphabet = string.ascii_letters + string.digits
        return ''.join(random.choice(alphabet) for _ in range(5))


# Settings schema id under which the 'toolkit-accessibility' boolean is read
# by isA11yEnabled() and written by enableA11y().
a11yDConfKey = 'org.gnome.desktop.interface'


def isA11yEnabled():
    """
    Tells whether accessibility is switched on, either through the
    'toolkit-accessibility' setting or because the GTK_MODULES environment
    variable contains 'gail:atk-bridge'.
    """
    from gi.repository.Gio import Settings
    settings = Settings(a11yDConfKey)
    viaDConf = settings.get_boolean('toolkit-accessibility')
    viaEnv = 'gail:atk-bridge' in os.environ.get('GTK_MODULES', '')
    return (viaDConf or viaEnv)


def bailBecauseA11yIsDisabled():
    """
    Logs a message explaining how to enable accessibility and exits the
    process with status 1.

    Returns without exiting when the script name ends with "pydoc" or when
    the command line of the current process (read from /proc) mentions
    'epydoc'.
    """
    if sys.argv[0].endswith("pydoc"):
        return  # pragma: no cover
    try:
        # Read as bytes and close the handle deterministically; the previous
        # file(...) call leaked the descriptor.
        with open("/proc/%s/cmdline" % os.getpid(), 'rb') as cmdline:
            if b'epydoc' in cmdline.read():
                return  # pragma: no cover
    except (IOError, OSError):  # pragma: no cover
        # /proc may be missing or unreadable; treat that as "not epydoc".
        pass  # pragma: no cover
    logger.log("Dogtail requires that Assistive Technology support be enabled."
               "\nYou can enable accessibility with sniff or by running:\n"
               "'gsettings set org.gnome.desktop.interface toolkit-accessibility true'\nAborting...")
    sys.exit(1)


def enableA11y(enable=True):
    """
    Writes enable to the 'toolkit-accessibility' boolean of the
    a11yDConfKey settings schema.
    """
    from gi.repository.Gio import Settings
    Settings(schema=a11yDConfKey).set_boolean('toolkit-accessibility', enable)


def checkForA11y():
    """
    Does nothing when accessibility is enabled; otherwise hands over to
    bailBecauseA11yIsDisabled().
    """
    if isA11yEnabled():
        return
    bailBecauseA11yIsDisabled()  # pragma: no cover


def checkForA11yInteractively():  # pragma: no cover
    """
    Returns immediately when accessibility is already enabled. Otherwise
    shows a modal dialog asking whether to enable it: accepting calls
    enableA11y(), choosing the quit button calls bailBecauseA11yIsDisabled().
    """
    if isA11yEnabled():
        return
    from gi.repository import Gtk
    flags = Gtk.DialogFlags.MODAL | Gtk.DialogFlags.DESTROY_WITH_PARENT
    buttons = (Gtk.STOCK_QUIT, Gtk.ResponseType.CLOSE,
               "_Enable", Gtk.ResponseType.ACCEPT)
    dialog = Gtk.Dialog('Enable Assistive Technology Support?',
                        None, flags, buttons)
    dialog.set_default_response(Gtk.ResponseType.ACCEPT)

    question = """Dogtail requires that Assistive Technology Support be enabled for it to function. Would you like to enable Assistive Technology support now?

Note that you will have to log out for the change to fully take effect.
    """.strip()
    questionLabel = Gtk.Label(label=question)
    questionLabel.set_line_wrap(True)
    dialog.vbox.pack_start(questionLabel, True, True, 0)
    dialog.show_all()

    answer = dialog.run()
    if answer == Gtk.ResponseType.ACCEPT:
        logger.log("Enabling accessibility...")
        enableA11y()
    elif answer == Gtk.ResponseType.CLOSE:
        bailBecauseA11yIsDisabled()
    dialog.destroy()


class GnomeShell(object):  # pragma: no cover

    """
    Utility class to help working with certain attributes of gnome-shell.
    Currently that means handling the Application menu available for apps
    on the top gnome-shell panel. Searching for the menu and its items is
    somewhat tricky due to fuzzy a11y tree of gnome-shell, mainly since the
    actual menu is not present as child to the menu-spawning button. Also,
    the menus get constructed/destroyed on the fly with application focus
    changes. Thus current application name as displayed plus a reference
    known menu item (with 'Quit' as default) are required by these methods.
    """

    def __init__(self, classic_mode=False):
        """
        Looks up the 'gnome-shell' application node under tree.root.
        The classic_mode argument is accepted but not used by this class.
        """
        from tree import root
        self.shell = root.application('gnome-shell')

    def getApplicationMenuList(self, search_by_item='Quit'):
        """
        Returns list of all menu item nodes. Searches for the menu by a reference item.
        Provide a different item name, if the 'Quit' is not present - but beware picking one
        present elsewhere, like 'Lock' or 'Power Off' present under the user menu.
        Raises tree.SearchError when no matching label sits three levels below a 'panel' node.
        """
        matches = self.shell.findChildren(
            predicate.GenericPredicate(name=search_by_item, roleName='label'))
        for match in matches:
            # The menu container is expected three levels above the label.
            ancestor = match.parent.parent.parent
            if ancestor.roleName == 'panel':
                return ancestor.findChildren(predicate.GenericPredicate(roleName='label'))
        from tree import SearchError
        raise SearchError("Could not find the Application menu based on '%s' item. Please provide an existing reference item"
                          % search_by_item)

    def getApplicationMenuButton(self, app_name):
        """
        Returns the application menu 'button' node as present on the gnome-shell top panel.
        Raises tree.SearchError when the lookup fails for any ordinary reason.
        """
        try:
            return self.shell[0][0][3].child(app_name, roleName='label')
        except Exception:
            # Exception (rather than a bare except) lets SystemExit and
            # KeyboardInterrupt pass through instead of being reported as a
            # search failure.
            from tree import SearchError
            raise SearchError(
                "Application menu button of %s could not be found within gnome-shell!" % app_name)

    def getApplicationMenuItem(self, item, search_by_item='Quit'):
        """
        Returns a particular menu item node. Uses a different 'Quit' or custom item name for reference, but also
        attempts to use the given item if the general reference fails.
        Raises Exception when the menu is found but holds no node named item.
        """
        try:
            menu_items = self.getApplicationMenuList(search_by_item)
        except Exception:
            # Fall back to the requested item as the reference; interpreter
            # exit and interrupt requests are deliberately not retried.
            menu_items = self.getApplicationMenuList(item)
        for node in menu_items:
            if node.name == item:
                return node
        raise Exception(
            'Could not find the item, did application focus change?')

    def clickApplicationMenuItem(self, app_name, item, search_by_item='Quit'):
        """
        Executes the given menu item through opening the menu first followed
        by a click at the particular item. The menu search reference 'Quit'
        may be customized. Also attempts to use the given item for reference
        if search fails with the default/custom one.
        """
        self.getApplicationMenuButton(app_name).click()
        self.getApplicationMenuItem(item, search_by_item).click()