#!/usr/bin/env python26
import os, tempfile, shutil
from ws_sso_content_reader import getContent
from BeautifulSoup import BeautifulSoup
from common import debug, jsonReply
from os.path import join, exists, dirname
from secrets import cern_secrets as config
if config.get("debug", False):
    import cgitb
    cgitb.enable()
from os import fdopen
try:
    from json import dumps
    from json import loads
except ImportError:
    from cjson import encode as dumps
    from cjson import decode as loads
try:
    from hashlib import sha1
except ImportError:
    from sha import new as sha1
OLD_TC_URL = "https://cmstags.cern.ch/tc"
# Fetch the given TC URL and compute the sha1sum of the result, then check
# whether a transformed object derived from the same content already exists
# in the cache:
# - if yes, simply return it;
# - if not, apply the transformation, save the result, and return it.
def getTCCached(url, transformation, transformation_id=""):
    html = getContent(url, config["usercert"], config["userkey"])
    contentHash = sha1(html).hexdigest()
    urlHash = sha1(url + transformation_id).hexdigest()
    cachedTransform = join(config["tccache"], urlHash[0:2], urlHash[2:], contentHash)
    if exists(cachedTransform):
        return open(cachedTransform).read()
    result = transformation(html)
    # Write to a temporary file first and move it into place afterwards, so
    # concurrent requests never see a partially written cache entry.
    (tmpfile, tmppath) = tempfile.mkstemp(prefix='tmp', dir=config["tccache"])
    f = fdopen(tmpfile, "w")
    f.write(result)
    f.close()
    try:
        os.makedirs(dirname(cachedTransform))
    except OSError:
        # The directory may already exist; that is fine.
        pass
    try:
        shutil.move(tmppath, cachedTransform)
    except OSError:
        # Another request may have installed its own copy first; that is fine.
        pass
    return result
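
# A minimal usage sketch (the "identity" transform and its id below are
# hypothetical, for illustration only): the first call fetches the page over
# SSO and caches the transformed result under
#   config["tccache"]/<urlHash[0:2]>/<urlHash[2:]>/<contentHash>
# and later calls whose upstream content is unchanged are served from that file.
#
#   def identity(html):
#       return html
#   names = getTCCached(OLD_TC_URL + "/getReleasesNames", identity, "Identity/V1")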
if __name__ == "__main__":
    # Require CERN SSO authentication.
    if not os.environ.get("ADFS_LOGIN"):
        print 'Status: 403 Forbidden\r\n\r\n\r\n'
        exit(0)
    requestMethod = os.environ.get("REQUEST_METHOD")
    pathInfo = os.environ.get("PATH_INFO", "").strip("/")
    if requestMethod == "GET":
        # GET /: the names of all releases matching [0-9]_X.
        if not pathInfo:
            def getReleaseNamesTransform(html):
                soup = [{"name": x.text} for x in BeautifulSoup(html).findAll("span")]
                return dumps(soup)
            jsonReply(getTCCached(OLD_TC_URL + "/getReleasesNames?release_regex=[0-9]_X$&only_my_releases=false", getReleaseNamesTransform, "ReleaseNames/V1"))
if not "/" in pathInfo:
def getPendingTagsets(html):
trs = BeautifulSoup(html).find("form").findAll("tr", {"class": "hovered selectable_area"})
soupIDs = [tr.find("td", {"class": "tagset_id"}).text for tr in trs]
soupPackages = [[td.text for td in tr.findAll("td", {"class": "noborder leftalign package_name"})] for tr in trs]
soupTagsNew = [[td.text for td in tr.findAll("td", {"class": "noborder leftalign scrollable_list_tag_td package_tag newer_tag"})] for tr in trs]
soupTagsOld = [[td.text for td in tr.findAll("td", {"class": "noborder leftalign scrollable_list_tag_td package_tag"})] for tr in trs]
tagsets = zip(soupPackages, soupTagsNew, soupTagsOld)
tagsets = [zip(*x) for x in tagsets]
return dumps(dict([x for x in zip(soupIDs, tagsets)]))
release_name = pathInfo
pendingApproval = getTCCached(OLD_TC_URL + "/getReleaseTagsetsPendingApproval?release_name=%s" % pathInfo, getPendingTagsets, "PendingApproval/V1")
pendingSignature = getTCCached(OLD_TC_URL + "/getReleaseTagsetsPendingSignaturesTables?release_name=%s&show_only=false" % pathInfo, getPendingTagsets, "PendingSignature/V1")
results = {}
results.update(loads(pendingSignature))
results.update(loads(pendingApproval))
jsonReply(results)
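            # For illustration, the reply produced above maps each tagset id
            # to its (package, newer_tag, older_tag) rows, e.g. (hypothetical
            # values, not real data):
            #   {"12345": [["FWCore/Framework", "V01-02-03", "V01-02-02"]]}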
        parts = pathInfo.split("/")
        if len(parts) == 2:
            # GET /<release_name>/history: the tagset history of a release.
            if parts[1] == "history":
                def getQueueHistory(html):
                    trs = BeautifulSoup(html).find("form").findAll("tr", {"class": "hovered selectable_area"})
                    soupIDs = [tr.find("td", {"class": "tagset_id"}).text for tr in trs]
                    soupDescriptions = [[div.text for div in tr.findAll("div", {"class": "description description_in_table"})] for tr in trs]
                    soupPublishers = [[td.text for td in tr.findAll("td", {"class": "user_name"})] for tr in trs]
                    soupPackages = [[td.text for td in tr.findAll("td", {"class": "noborder leftalign package_name"})] for tr in trs]
                    soupTags = [[td.text for td in tr.findAll("td", {"class": "noborder leftalign package_tag scrollable_list_tag_td"})] for tr in trs]
                    soupReleases = [[span.text for span in tr.findAll("span", {"class": "release_name"})] for tr in trs]
                    # Regroup the per-column lists into (package, tag) rows,
                    # one list of rows per tagset.
                    tagsets = zip(soupPackages, soupTags)
                    tagsets = [zip(*x) for x in tagsets]
                    history = zip(soupIDs, soupDescriptions, soupPublishers, tagsets, soupReleases)
                    # Reverse the page order of the entries.
                    history.reverse()
                    history = [dict(zip(["id", "description", "publisher", "tagset", "releases"], x)) for x in history]
                    return dumps(history)
                def getFirstTagset(html):
                    soup = BeautifulSoup(html)
                    soupPackages = [td.text for td in soup.findAll("td", {"class": "package_name"})]
                    soupTags = [td.text for td in soup.findAll("td", {"class": "package_tag scrollable_list_tag_td"})]
                    tagsets = zip(soupPackages, soupTags)
                    return dumps(tagsets)
                history = loads(getTCCached(OLD_TC_URL + "/getReleaseHistory?release_name=%s" % parts[0], getQueueHistory, "QueueHistory/V3"))
                # Replace the first entry's tagset with the detailed
                # per-tagset information fetched separately.
                firstTagsetInfo = getTCCached(OLD_TC_URL + "/getTagsetInformation?tagset_id=%s" % history[0]["id"], getFirstTagset, "FirstTagset/V1")
                history[0]["tagset"] = loads(firstTagsetInfo)
                jsonReply(history)
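                # For illustration, each entry in the reply above is a dict of
                # the form (hypothetical values, not real data):
                #   {"id": "12345", "description": "...", "publisher": "...",
                #    "tagset": [["FWCore/Framework", "V01-02-03"]],
                #    "releases": ["CMSSW_5_3_X"]}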
            # GET /<release_name>/<tagset_id>: the raw CVS diff for a tagset.
            elif parts[1].isdigit():
                html = getContent(OLD_TC_URL + "/CVSDiffTagsets?tagset_ids=[%s]" % parts[1], config["usercert"], config["userkey"])
                print "Status: 200 OK\n"
                print html
                exit(0)
print "Status: 404 Not Found\r\n\r\n\r\n"