summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/__init__.py
blob: 55f48410d32568a62fd9d31455591d674d530c06 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright (C) 2018-2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

import asyncio
from contextlib import closing
import re
import sqlite3
import itertools
import json

# Scheme prefix that marks an address string as a Unix domain socket path
UNIX_PREFIX = "unix://"

# Address kinds; parse_address() returns one of these as the first element of
# its result tuple
ADDR_TYPE_UNIX, ADDR_TYPE_TCP = 0, 1

# Maximum chunk size used when splitting messages. The Python async server
# defaults to a 64K receive buffer, so the limit is hardcoded here. It would be
# better if the client and server reported their maximum chunk sizes to each
# other, but that would slow down connection setup by a round trip delay, so
# it is avoided unless it becomes necessary.
DEFAULT_MAX_CHUNK = 32 * 1024

# Columns of the tasks table as (column name, SQL type declaration) pairs, in
# the order they are declared when the table is created
TABLE_DEFINITION = (
    # Mandatory hash fields (declared NOT NULL)
    ("method", "TEXT NOT NULL"),
    ("outhash", "TEXT NOT NULL"),
    ("taskhash", "TEXT NOT NULL"),
    ("unihash", "TEXT NOT NULL"),
    ("created", "DATETIME"),

    # Optional fields
    ("owner", "TEXT"),
    ("PN", "TEXT"),
    ("PV", "TEXT"),
    ("PR", "TEXT"),
    ("task", "TEXT"),
    ("outhash_siginfo", "TEXT"),
)

# Just the column names, in the same order as TABLE_DEFINITION
TABLE_COLUMNS = tuple(column[0] for column in TABLE_DEFINITION)

def setup_database(database, sync=True):
    """Connect to the hash database and make sure its schema exists.

    Creates the ``tasks_v2`` table (if missing) from TABLE_DEFINITION, switches
    the journal to WAL mode, sets the ``synchronous`` pragma, removes the old
    index names and creates the current lookup indexes.

    database: passed straight to sqlite3.connect()
    sync: when true ``PRAGMA synchronous`` is set to NORMAL, otherwise OFF

    Returns the open connection, with ``row_factory`` set to sqlite3.Row.
    """
    db = sqlite3.connect(database)
    db.row_factory = sqlite3.Row

    # One "<name> <type>," fragment per column, joined by single spaces
    column_sql = " ".join("%s %s," % column for column in TABLE_DEFINITION)
    sync_mode = 'NORMAL' if sync else 'OFF'

    with closing(db.cursor()) as cursor:
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS tasks_v2 (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                %s
                UNIQUE(method, outhash, taskhash)
                )
            ''' % column_sql)

        follow_up_statements = (
            'PRAGMA journal_mode = WAL',
            'PRAGMA synchronous = %s' % sync_mode,

            # Drop old indexes
            'DROP INDEX IF EXISTS taskhash_lookup',
            'DROP INDEX IF EXISTS outhash_lookup',

            # Create new indexes
            'CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)',
            'CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)',
        )
        for statement in follow_up_statements:
            cursor.execute(statement)

    return db


def parse_address(addr):
    """Split an address string into an address type and connection arguments.

    Recognised forms:
      * ``unix://<path>``  -> (ADDR_TYPE_UNIX, (path,))
      * ``[<host>]:<port>`` -> (ADDR_TYPE_TCP, (host, port))
      * ``<host>:<port>``   -> (ADDR_TYPE_TCP, (host, port))

    The port is converted to an int. A ValueError is raised when a
    non-bracketed address does not split into exactly two ':'-separated
    parts, or when the port is not an integer.
    """
    if addr.startswith(UNIX_PREFIX):
        return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))

    # The bracketed form lets the host itself contain ':' characters
    bracketed = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
    if bracketed is None:
        host, port = addr.split(':')
    else:
        host, port = bracketed.group('host', 'port')

    return (ADDR_TYPE_TCP, (host, int(port)))


def chunkify(msg, max_chunk):
    """Yield msg as one or more newline-terminated strings.

    If msg is shorter than ``max_chunk - 1`` characters it is yielded as a
    single line. Otherwise a JSON ``{"chunk-stream": null}`` header line is
    yielded first, followed by msg split into pieces of at most
    ``max_chunk - 1`` characters (each with a trailing newline), and finally
    an empty line ("\\n") marking the end of the stream.

    msg: the string to send
    max_chunk: maximum length of each data line, including its newline

    Raises ValueError if max_chunk is less than 2, because no payload
    character would fit next to the newline (previously the message was
    silently dropped in that case). As this is a generator, the error is
    raised when iteration starts.
    """
    if max_chunk < 2:
        raise ValueError("max_chunk must be at least 2, got %r" % (max_chunk,))

    # Every data line spends one character on the trailing newline
    payload_size = max_chunk - 1

    if len(msg) < payload_size:
        yield msg + "\n"
        return

    yield json.dumps({'chunk-stream': None}) + "\n"

    for start in range(0, len(msg), payload_size):
        yield msg[start:start + payload_size] + "\n"

    # Empty line terminates the chunk stream
    yield "\n"


def create_server(addr, dbname, *, sync=True, upstream=None):
    """Create a hash equivalence server backed by dbname and listening on addr.

    addr: address string understood by parse_address()
    dbname: database handed to setup_database()
    sync: forwarded to setup_database()
    upstream: forwarded to server.Server()

    Returns the server.Server instance after its Unix or TCP listener has
    been started.
    """
    from . import server

    database = setup_database(dbname, sync=sync)
    instance = server.Server(database, upstream=upstream)

    addr_type, addr_args = parse_address(addr)
    start = (instance.start_unix_server if addr_type == ADDR_TYPE_UNIX
             else instance.start_tcp_server)
    start(*addr_args)

    return instance


def create_client(addr):
    """Create a client.Client and connect it to the server at addr.

    addr: address string understood by parse_address(); a Unix address is
    connected with connect_unix(), anything else with connect_tcp().

    Returns the connected client.
    """
    from . import client

    connection = client.Client()

    addr_type, addr_args = parse_address(addr)
    connect = (connection.connect_unix if addr_type == ADDR_TYPE_UNIX
               else connection.connect_tcp)
    connect(*addr_args)

    return connection

async def create_async_client(addr):
    """Create a client.AsyncClient and connect it to the server at addr.

    addr: address string understood by parse_address(); a Unix address is
    connected with connect_unix(), anything else with connect_tcp(). The
    chosen connect call is awaited before returning.

    Returns the connected async client.
    """
    from . import client

    connection = client.AsyncClient()

    addr_type, addr_args = parse_address(addr)
    connect = (connection.connect_unix if addr_type == ADDR_TYPE_UNIX
               else connection.connect_tcp)
    await connect(*addr_args)

    return connection