summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/compress/_pipecompress.py
blob: 4a403d62cf8a406ed8922ffa50d2c3ded2d15678 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#
# Helper library to implement streaming compression and decompression using an
# external process
#
# This library should be used directly by end users; a wrapper library for the
# specific compression tool should be created

import builtins
import io
import os
import subprocess


def open_wrap(
    cls, filename, mode="rb", *, encoding=None, errors=None, newline=None, **kwargs
):
    """
    Open a compressed file in binary or text mode.

    Users should not call this directly. A specific compression library can use
    this helper to provide it's own "open" command

    The filename argument can be an actual filename (a str or bytes object), or
    an existing file object to read from or write to.

    The mode argument can be "r", "rb", "w", "wb", "x", "xb", "a" or "ab" for
    binary mode, or "rt", "wt", "xt" or "at" for text mode. The default mode is
    "rb".

    For binary mode, this function is equivalent to the cls constructor:
    cls(filename, mode). In this case, the encoding, errors and newline
    arguments must not be provided.

    For text mode, a cls object is created, and wrapped in an
    io.TextIOWrapper instance with the specified encoding, error handling
    behavior, and line ending(s).
    """
    if "t" in mode:
        if "b" in mode:
            raise ValueError("Invalid mode: %r" % (mode,))
    else:
        if encoding is not None:
            raise ValueError("Argument 'encoding' not supported in binary mode")
        if errors is not None:
            raise ValueError("Argument 'errors' not supported in binary mode")
        if newline is not None:
            raise ValueError("Argument 'newline' not supported in binary mode")

    file_mode = mode.replace("t", "")
    if isinstance(filename, (str, bytes, os.PathLike, int)):
        binary_file = cls(filename, file_mode, **kwargs)
    elif hasattr(filename, "read") or hasattr(filename, "write"):
        binary_file = cls(None, file_mode, fileobj=filename, **kwargs)
    else:
        raise TypeError("filename must be a str or bytes object, or a file")

    if "t" in mode:
        return io.TextIOWrapper(
            binary_file, encoding, errors, newline, write_through=True
        )
    else:
        return binary_file


class CompressionError(OSError):
    pass


class PipeFile(io.RawIOBase):
    """
    Class that implements generically piping to/from a compression program

    Derived classes should add the function get_compress() and get_decompress()
    that return the required commands. Input will be piped into stdin and the
    (de)compressed output should be written to stdout, e.g.:

        class FooFile(PipeCompressionFile):
            def get_decompress(self):
                return ["fooc", "--decompress", "--stdout"]

            def get_compress(self):
                return ["fooc", "--compress", "--stdout"]

    """

    READ = 0
    WRITE = 1

    def __init__(self, filename=None, mode="rb", *, stderr=None, fileobj=None):
        if "t" in mode or "U" in mode:
            raise ValueError("Invalid mode: {!r}".format(mode))

        if not "b" in mode:
            mode += "b"

        if mode.startswith("r"):
            self.mode = self.READ
        elif mode.startswith("w"):
            self.mode = self.WRITE
        else:
            raise ValueError("Invalid mode %r" % mode)

        if fileobj is not None:
            self.fileobj = fileobj
        else:
            self.fileobj = builtins.open(filename, mode or "rb")

        if self.mode == self.READ:
            self.p = subprocess.Popen(
                self.get_decompress(),
                stdin=self.fileobj,
                stdout=subprocess.PIPE,
                stderr=stderr,
                close_fds=True,
            )
            self.pipe = self.p.stdout
        else:
            self.p = subprocess.Popen(
                self.get_compress(),
                stdin=subprocess.PIPE,
                stdout=self.fileobj,
                stderr=stderr,
                close_fds=True,
            )
            self.pipe = self.p.stdin

        self.__closed = False

    def _check_process(self):
        if self.p is None:
            return

        returncode = self.p.wait()
        if returncode:
            raise CompressionError("Process died with %d" % returncode)
        self.p = None

    def close(self):
        if self.closed:
            return

        self.pipe.close()
        if self.p is not None:
            self._check_process()
        self.fileobj.close()

        self.__closed = True

    @property
    def closed(self):
        return self.__closed

    def fileno(self):
        return self.pipe.fileno()

    def flush(self):
        self.pipe.flush()

    def isatty(self):
        return self.pipe.isatty()

    def readable(self):
        return self.mode == self.READ

    def writable(self):
        return self.mode == self.WRITE

    def readinto(self, b):
        if self.mode != self.READ:
            import errno

            raise OSError(
                errno.EBADF, "read() on write-only %s object" % self.__class__.__name__
            )
        size = self.pipe.readinto(b)
        if size == 0:
            self._check_process()
        return size

    def write(self, data):
        if self.mode != self.WRITE:
            import errno

            raise OSError(
                errno.EBADF, "write() on read-only %s object" % self.__class__.__name__
            )
        data = self.pipe.write(data)

        if not data:
            self._check_process()

        return data