1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
|
#!/usr/bin/python3
import os
import sys
import logging
import sqlite3
import io
import tarfile
import json
import configparser
import urllib.parse
import time
from urllib.request import urlopen
config = None
db_con = None
def init_db(path):
'''Create DB if it does not exist, and connect to it'''
db = sqlite3.connect(path)
c = db.cursor()
try:
c.execute('CREATE TABLE IF NOT EXISTS test ('
' id INTEGER PRIMARY KEY, '
' release CHAR[20], '
' arch CHAR[20], '
' package char[120])')
c.execute('CREATE TABLE IF NOT EXISTS result ('
' test_id INTEGER, '
' run_id CHAR[30], '
' version VARCHAR[200], '
' triggers TEXT, '
' duration INTEGER, '
' exitcode INTEGER, '
' PRIMARY KEY(test_id, run_id), '
' FOREIGN KEY(test_id) REFERENCES test(id))')
db.commit()
logging.debug('database %s created', path)
except sqlite3.OperationalError as e:
if 'already exists' not in str(e):
raise
logging.debug('database %s already exists', path)
return db
def get_test_id(release, arch, src):
if not get_test_id._cache:
# prime the cache with all test IDs; much more efficient than doing
# thousands of individual queries
c = db_con.cursor()
c.execute('SELECT * FROM test')
while True:
row = c.fetchone()
if row is None:
break
get_test_id._cache[row[1] + '/' + row[2] + '/' + row[3]] = row[0]
cache_idx = release + '/' + arch + '/' + src
try:
return get_test_id._cache[cache_idx]
except KeyError:
# create new ID
c = db_con.cursor()
c.execute('INSERT INTO test VALUES (NULL, ?, ?, ?)', (release, arch, src))
test_id = c.lastrowid
db_con.commit()
get_test_id._cache[cache_idx] = test_id
return test_id
get_test_id._cache = {}
def last_known_run_id(test_id):
try:
return last_known_run_id._cache[test_id]
except KeyError:
pass
c = db_con.cursor()
c.execute('SELECT MAX(run_id) FROM result WHERE test_id=?', (test_id,))
res = c.fetchone()[0]
if res is None:
return ''
last_known_run_id._cache[test_id] = res
return res
last_known_run_id._cache = {}
def fetch_one_result(url):
'''Download one result URL from swift and add it to the DB'''
(release, arch, _, src, run_id, _) = url.split('/')[-6:]
test_id = get_test_id(release, arch, src)
last = last_known_run_id(test_id)
if run_id <= last:
logging.debug('Test result for %s/%s/%s %s older than last known run ID %s, ignoring', release, arch, src, run_id, last)
return
try:
f = urlopen(url, timeout=30)
if f.getcode() == 200:
tar_bytes = io.BytesIO(f.read())
f.close()
else:
raise NotImplementedError('fetch_one_result(%s): cannot handle HTTP code %i' %
(url, f.getcode()))
except IOError as e:
logging.error('Failure to fetch %s: %s', url, str(e))
# we tolerate "not found" (something went wrong on uploading the
# result), but other things indicate infrastructure problems
if hasattr(e, 'code') and e.code == 404:
return
sys.exit(1)
try:
with tarfile.open(None, 'r', tar_bytes) as tar:
exitcode = int(tar.extractfile('exitcode').read().strip())
srcver = tar.extractfile('testpkg-version').read().decode().strip()
(ressrc, ver) = srcver.split()
testinfo = json.loads(tar.extractfile('testinfo.json').read().decode())
duration = int(tar.extractfile('duration').read().strip())
except (KeyError, ValueError, tarfile.TarError) as e:
logging.debug('%s is damaged, ignoring: %s', url, str(e))
return
if src != ressrc:
logging.error('%s is a result for package %s, but expected package %s',
url, ressrc, src)
return
# parse recorded triggers in test result
for e in testinfo.get('custom_environment', []):
if e.startswith('ADT_TEST_TRIGGERS='):
test_triggers = e.split('=', 1)[1]
break
else:
logging.error('%s result has no ADT_TEST_TRIGGERS, ignoring', url)
return
logging.debug('Fetched test result for %s/%s/%s/%s %s (triggers: %s): exit code %i',
release, arch, src, ver, run_id, test_triggers, exitcode)
c = db_con.cursor()
c.execute('INSERT INTO result VALUES (?, ?, ?, ?, ?, ?)',
(test_id, run_id, ver, test_triggers, duration, exitcode))
db_con.commit()
def fetch_container(container_url):
'''Download new results from a swift container'''
marker = ''
# '@' is at the end of each run id, to mark the end of a test run directory path
# example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517_123FE@/result.tar
query = {'delimiter': '@'}
# we get a lot of results, so we need to do multiple batches starting with
# marker (last returned result)
while True:
query['marker'] = marker
url = os.path.join(container_url) + '?' + urllib.parse.urlencode(query)
logging.debug('Querying batch %s', url)
for retry in reversed(range(5)):
try:
f = urlopen(url, timeout=30)
if f.getcode() == 200:
result_paths = f.read().decode().strip().splitlines()
elif f.getcode() == 204: # No content
result_paths = []
else:
# we should not ever end up here as we expect a HTTPError in
# other cases; e. g. 3XX is something that tells us to adjust
# our URLS, so fail hard on those
raise NotImplementedError('fetch_container(%s): cannot handle HTTP code %i' %
(url, f.getcode()))
f.close()
break
except IOError as e:
# 401 "Unauthorized" is swift's way of saying "container does not exist"
if hasattr(e, 'code') and e.code == 401:
logging.debug('fetch_container: %s does not exist yet or is inaccessible', url)
return
if retry:
logging.debug('Failed to download %s, sleeping for 5s and retrying.', url)
time.sleep(5)
continue
# Other status codes are usually a transient
# network/infrastructure failure. Ignoring this can lead to
# re-requesting tests which we already have results for, so
# fail hard on this and let the next run retry.
logging.error('Failure to fetch swift results from %s: %s', url, str(e))
sys.exit(1)
if not result_paths:
break
for p in result_paths:
fetch_one_result(os.path.join(container_url, p, 'result.tar'))
marker = p
if __name__ == '__main__':
if len(sys.argv) != 2:
sys.stderr.write('Usage: %s <release>\n' % sys.argv[0])
sys.exit(1)
if 'DEBUG' in os.environ:
logging.basicConfig(level=logging.DEBUG)
config = configparser.ConfigParser()
config.read(os.path.expanduser('~ubuntu/autopkgtest-cloud.conf'))
try:
db_con = init_db(config['web']['database'])
fetch_container(os.path.join(config['web']['SwiftURL'], 'autopkgtest-' + sys.argv[1]))
finally:
if db_con:
db_con.close()
|