1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
#! /usr/bin/env python
# $Header$
'''SOAP messaging parsing.
'''
from ZSI import _copyright, _child_elements, EvaluateException, TC
import multifile, mimetools, urllib
from base64 import decodestring as b64decode
import cStringIO as StringIO
def Opaque(uri, tc, ps, **keywords):
'''Resolve a URI and return its content as a string.
'''
source = urllib.urlopen(uri, **keywords)
enc = source.info().getencoding()
if enc in ['7bit', '8bit', 'binary']: return source.read()
data = StringIO.StringIO()
mimetools.decode(source, data, enc)
return data.getvalue()
def XML(uri, tc, ps, **keywords):
'''Resolve a URI and return its content as an XML DOM.
'''
source = urllib.urlopen(uri, **keywords)
enc = source.info().getencoding()
if enc in ['7bit', '8bit', 'binary']:
data = source
else:
data = StringIO.StringIO()
mimetools.decode(source, data, enc)
data.seek(0)
dom = ps.readerclass().fromStream(data)
return _child_elements(dom)[0]
class NetworkResolver:
'''A resolver that support string and XML.
'''
def __init__(self, prefix=None):
self.allowed = prefix or []
def _check_allowed(self, uri):
for a in self.allowed:
if uri.startswith(a): return
raise EvaluateException("Disallowed URI prefix")
def Opaque(self, uri, tc, ps, **keywords):
self._check_allowed(uri)
return Opaque(uri, tc, ps, **keywords)
def XML(self, uri, tc, ps, **keywords):
self._check_allowed(uri)
return XML(uri, tc, ps, **keywords)
def Resolve(self, uri, tc, ps, **keywords):
if isinstance(tc, TC.XML):
return XML(uri, tc, ps, **keywords)
return Opaque(uri, tc, ps, **keywords)
class MIMEResolver:
'''Multi-part MIME resolver -- SOAP With Attachments, mostly.
'''
def __init__(self, ct, f, next=None, uribase='thismessage:/',
seekable=0, **kw):
# Get the boundary. It's too bad I have to write this myself,
# but no way am I going to import cgi for 10 lines of code!
for param in ct.split(';'):
a = param.strip()
if a.startswith('boundary='):
if a[9] in [ '"', "'" ]:
boundary = a[10:-1]
else:
boundary = a[9:]
break
else:
raise ValueError('boundary parameter not found')
self.id_dict, self.loc_dict, self.parts = {}, {}, []
self.next = next
self.base = uribase
mf = multifile.MultiFile(f, seekable)
mf.push(boundary)
while mf.next():
head = mimetools.Message(mf)
body = StringIO.StringIO()
mimetools.decode(mf, body, head.getencoding())
body.seek(0)
part = (head, body)
self.parts.append(part)
key = head.get('content-id')
if key:
if key[0] == '<' and key[-1] == '>': key = key[1:-1]
self.id_dict[key] = part
key = head.get('content-location')
if key: self.loc_dict[key] = part
mf.pop()
def GetSOAPPart(self):
'''Get the SOAP body part.
'''
head, part = self.parts[0]
return StringIO.StringIO(part.getvalue())
def get(self, uri):
'''Get the content for the bodypart identified by the uri.
'''
if uri.startswith('cid:'):
# Content-ID, so raise exception if not found.
head, part = self.id_dict[uri[4:]]
return StringIO.StringIO(part.getvalue())
if self.loc_dict.has_key(uri):
head, part = self.loc_dict[uri]
return StringIO.StringIO(part.getvalue())
return None
def Opaque(self, uri, tc, ps, **keywords):
content = self.get(uri)
if content: return content.getvalue()
if not self.next: raise EvaluateException("Unresolvable URI " + uri)
return self.next.Opaque(uri, tc, ps, **keywords)
def XML(self, uri, tc, ps, **keywords):
content = self.get(uri)
if content:
dom = ps.readerclass().fromStream(content)
return _child_elements(dom)[0]
if not self.next: raise EvaluateException("Unresolvable URI " + uri)
return self.next.XML(uri, tc, ps, **keywords)
def Resolve(self, uri, tc, ps, **keywords):
if isinstance(tc, TC.XML):
return self.XML(uri, tc, ps, **keywords)
return self.Opaque(uri, tc, ps, **keywords)
def __getitem__(self, cid):
head, body = self.id_dict[cid]
newio = StringIO.StringIO(body.getvalue())
return newio
if __name__ == '__main__': print _copyright
|