###########################################################################
# Joshua R. Boverhof, LBNL
# See Copyright for copyright notice!
# $Id: WSsecurity.py 1134 2006-02-24 00:23:06Z boverhof $
###########################################################################
"""WS-Security handlers (UsernameToken and XML Signature) for a ZSI/Twisted
handler chain.

Stability: Unstable, Untested, Not Finished.
"""
import sys, time, warnings
import sha, base64

# twisted & related imports
from zope.interface import classProvides, implements, Interface
from twisted.python import log, failure
from twisted.web.error import NoResource
from twisted.web.server import NOT_DONE_YET
from twisted.internet import reactor
import twisted.web.http
import twisted.web.resource

# ZSI imports
from ZSI import _get_element_nsuri_name, EvaluateException, ParseException
from ZSI.parse import ParsedSoap
from ZSI.writer import SoapWriter
from ZSI.TC import _get_global_element_declaration as GED
from ZSI import fault
from ZSI.wstools.Namespaces import OASIS, DSIG
from ZSI.wstools.c14n import Canonicalize
from WSresource import DefaultHandlerChain, HandlerChainInterface,\
    WSAddressCallbackHandler, DataHandler, WSAddressHandler

# NOTE(review): SignatureHandler.processRequest uses a name `XPath` that is
# not imported anywhere in this file.  Which XPath package was intended is
# not visible here -- confirm and add the import.


#
# Global Element Declarations
#
UsernameTokenDec = GED(OASIS.WSSE, "UsernameToken")
SecurityDec = GED(OASIS.WSSE, "Security")
SignatureDec = GED(DSIG.BASE, "Signature")
PasswordDec = GED(OASIS.WSSE, "Password")
NonceDec = GED(OASIS.WSSE, "Nonce")
CreatedDec = GED(OASIS.UTILITY, "Created")

# Refuse to import when any declaration is missing: every handler below
# compares typecodes against these objects by identity.
if None in [UsernameTokenDec, SecurityDec, SignatureDec, PasswordDec,
            NonceDec, CreatedDec]:
    raise ImportError('required global element(s) unavailable: %s ' % ({
        (OASIS.WSSE, "UsernameToken"): UsernameTokenDec,
        (OASIS.WSSE, "Security"): SecurityDec,
        (DSIG.BASE, "Signature"): SignatureDec,
        (OASIS.WSSE, "Password"): PasswordDec,
        (OASIS.WSSE, "Nonce"): NonceDec,
        (OASIS.UTILITY, "Created"): CreatedDec,
    }))


def _hmac_sha1(xml):
    """Placeholder for HMAC-SHA1 signing; not implemented, returns None.

    Lives at module level so that it can be referenced from the class body
    of SignatureHandler (a nested class body cannot see names defined in the
    enclosing class).
    """
    return


#
# Stability: Unstable, Untested, Not Finished.
#
class WSSecurityHandler:
    """Web Services Security: SOAP Message Security 1.0

    Class Variables:
        debug -- If True provide more detailed SOAP:Fault information to
            clients (the original exception is re-raised instead of a
            generic RuntimeError).
    """
    classProvides(HandlerChainInterface)
    debug = True

    @classmethod
    def processRequest(cls, ps, **kw):
        """Process every token found in the wsse:Security header(s).

        Parameters:
            ps -- ParsedSoap instance

        Returns the ParsedSoap instance handed back by the last token
        handler (or ps unchanged when there is no Security header).

        Raises:
            TypeError -- ps is not a ParsedSoap instance.
            RuntimeError -- a token is rejected or is of an unsupported
                kind.  With debug set, the token handler's own exception
                propagates instead.
        """
        # isinstance rather than an exact type() comparison, so subclasses
        # are accepted as well.
        if not isinstance(ps, ParsedSoap):
            raise TypeError('Expecting ParsedSoap instance')

        security = ps.ParseHeaderElements([SecurityDec])

        # Assume all security headers are supposed to be processed here.
        for pyobj in security or []:
            for item in pyobj.Any or []:

                if item.typecode is UsernameTokenDec:
                    try:
                        ps = cls.UsernameTokenProfileHandler.processRequest(
                            ps, item)
                    except Exception:
                        if cls.debug:
                            raise
                        raise RuntimeError(
                            'Unauthorized Username/passphrase combination')
                    continue

                if item.typecode is SignatureDec:
                    try:
                        ps = cls.SignatureHandler.processRequest(ps, item)
                    except Exception:
                        if cls.debug:
                            raise
                        raise RuntimeError('Invalid Security Header')
                    continue

                raise RuntimeError(
                    'WS-Security, Unsupported token %s' % str(item))

        return ps

    @classmethod
    def processResponse(cls, output, **kw):
        """Return output unchanged; responses are not secured."""
        return output

    class UsernameTokenProfileHandler:
        """Web Services Security UsernameToken Profile 1.0

        Class Variables:
            targetNamespace -- namespace the password type URIs are built
                from.
            sweepInterval -- seconds between two sweeps of the nonce list.
            nonces -- nonces seen so far; None until the first digest
                request starts the sweeper.
            PasswordText, PasswordDigest -- accepted password type URIs; set
                one to None to refuse that password type.
            passwordCallback -- classmethod(username) returning the expected
                password.  Override it; the default returns None.
        """
        classProvides(HandlerChainInterface)

        # Class Variables
        targetNamespace = OASIS.WSSE
        sweepInterval = 60*5
        nonces = None

        # Set to None to disable
        PasswordText = targetNamespace + "#PasswordText"
        PasswordDigest = targetNamespace + "#PasswordDigest"

        # Override passwordCallback.  It has to be a classmethod: it is
        # invoked as cls.passwordCallback(username), which a plain function
        # taking (cls, username) cannot satisfy.
        passwordCallback = classmethod(lambda cls, username: None)

        @classmethod
        def sweep(cls, index):
            """remove nonces every sweepInterval.

            Parameters:
                index -- remove all nonces up to this index.
            """
            if cls.nonces is None:
                cls.nonces = []

            seconds = cls.sweepInterval
            cls.nonces = cls.nonces[index:]
            # Everything currently stored is dropped at the next sweep.
            reactor.callLater(seconds, cls.sweep, len(cls.nonces))

        @classmethod
        def processRequest(cls, ps, token, **kw):
            """Authenticate a UsernameToken.

            Parameters:
                ps -- ParsedSoap instance
                token -- UsernameToken pyclass instance

            Returns ps when the password (clear text or digest) matches the
            value supplied by passwordCallback.

            Raises:
                TypeError -- token is not a UsernameToken, or it contains an
                    unexpected child element.
                RuntimeError -- missing/incorrect password, replayed nonce,
                    expired token or unknown password type.
            """
            if token.typecode is not UsernameTokenDec:
                raise TypeError('expecting GED (%s,%s) representation.' % (
                    UsernameTokenDec.nspname, UsernameTokenDec.pname))

            username = token.Username

            # expecting only one password
            # may have a nonce and a created
            password = nonce = timestamp = None
            for item in token.Any or []:
                if item.typecode is PasswordDec:
                    password = item
                    continue

                if item.typecode is NonceDec:
                    nonce = item
                    continue

                if item.typecode is CreatedDec:
                    timestamp = item
                    continue

                raise TypeError(
                    'UsernameTokenProfileHander unexpected %s' % str(item))

            if password is None:
                raise RuntimeError('Unauthorized, no password')

            # TODO: not yet supporting complexType simpleContent in
            # pyclass_type
            attrs = getattr(password, password.typecode.attrs_aname, {})
            pwtype = attrs.get('Type', cls.PasswordText)

            # Clear Text Passwords
            if cls.PasswordText is not None and pwtype == cls.PasswordText:
                if password == cls.passwordCallback(username):
                    return ps

                raise RuntimeError('Unauthorized, clear text password failed')

            # First digest request: create the nonce list and start the
            # periodic sweeper.
            if cls.nonces is None:
                cls.sweep(0)

            if nonce is not None:
                if nonce in cls.nonces:
                    raise RuntimeError('Invalid Nonce')

                # created was 10 seconds ago or sooner
                # NOTE(review): this compares the parsed Created value with
                # a time.struct_time -- confirm the two are comparable.
                if timestamp is not None and \
                        timestamp < time.gmtime(time.time()-10):
                    raise RuntimeError('UsernameToken created is expired')

                cls.nonces.append(nonce)

            # PasswordDigest, recommended that implemenations
            # require a Nonce and Created
            if cls.PasswordDigest is not None and pwtype == cls.PasswordDigest:
                digest = sha.sha()
                for part in (nonce, timestamp, cls.passwordCallback(username)):
                    if part is None:
                        continue
                    digest.update(part)

                if password == base64.encodestring(digest.digest()).strip():
                    return ps

                raise RuntimeError('Unauthorized, digest failed')

            raise RuntimeError(
                'Unauthorized, contents of UsernameToken unknown')

        @classmethod
        def processResponse(cls, output, **kw):
            """Return output unchanged."""
            return output

    # Kept for callers that reach the placeholder through the class.
    hmac_sha1 = staticmethod(_hmac_sha1)

    class SignatureHandler:
        """XML Signature processing for WS-Security (unfinished).

        Class Variables:
            digestMethods -- digest algorithm URI -> hash constructor.
            signingMethods -- signature algorithm URI -> signing callable.
            canonicalizationMethods -- canonicalization algorithm URI ->
                callable taking the node to canonicalize.
        """
        digestMethods = {
            DSIG.BASE+"#sha1": sha.sha,
        }
        signingMethods = {
            DSIG.BASE+"#hmac-sha1": _hmac_sha1,
        }
        canonicalizationMethods = {
            DSIG.C14N_EXCL: lambda node: Canonicalize(
                node, unsuppressedPrefixes=[]),
            DSIG.C14N: lambda node: Canonicalize(node),
        }

        @classmethod
        def processRequest(cls, ps, signature, **kw):
            """Check the Reference digest of a ds:Signature.

            Parameters:
                ps -- ParsedSoap instance
                signature -- Signature pyclass instance

            Verification of SignatureValue is not implemented, so this
            method never accepts a message: once the digest checks have
            passed it raises NotImplementedError rather than returning ps
            for a signature that was not verified.

            Raises:
                TypeError -- signature is not a ds:Signature.
                RuntimeError -- bad reference, unsupported algorithm or
                    digest mismatch.
                NotImplementedError -- always, once the digest checks pass.
            """
            if signature.typecode is not SignatureDec:
                raise TypeError('expecting GED (%s,%s) representation.' % (
                    SignatureDec.nspname, SignatureDec.pname))

            si = signature.SignedInfo
            calgo = si.CanonicalizationMethod.get_attribute_Algorithm()

            # Check Digest
            uri = si.Reference.get_attribute_URI()
            # The URI comes from the request and is interpolated into XPath
            # expressions below; refuse anything that could break out of
            # the quoted literal.
            if '"' in uri:
                raise RuntimeError(
                    'A SignedInfo Reference must refer to one node %s.'
                    % uri)
            # A same-document reference is written "#id"; the wsu:Id
            # attribute holds the bare id.
            ref_id = uri
            if ref_id.startswith('#'):
                ref_id = ref_id[1:]

            # NOTE(review): `XPath` is not imported in this file, and the
            # keyword `processContents` looks like it is meant to carry the
            # prefix -> namespace map -- confirm against the XPath library
            # that is actually used.
            context = XPath.Context.Context(
                ps.dom, processContents={'wsu': OASIS.UTILITY})
            exp = XPath.Compile('//*[@wsu:Id="%s"]' % ref_id)
            nodes = exp.evaluate(context)
            if len(nodes) != 1:
                raise RuntimeError(
                    'A SignedInfo Reference must refer to one node %s.'
                    % uri)

            try:
                canonicalize = cls.canonicalizationMethods[calgo]
            except KeyError:
                raise RuntimeError('Unsupported canonicalization algorithm')
            xml = canonicalize(nodes[0])

            # NOTE(review): the digest algorithm belongs to the Reference's
            # DigestMethod; the accessor name mirrors the ones used for
            # CanonicalizationMethod/SignatureMethod -- confirm against the
            # generated pyclass.
            dalgo = si.Reference.DigestMethod.get_attribute_Algorithm()
            try:
                digest = cls.digestMethods[dalgo]
            except KeyError:
                raise RuntimeError('unknown digestMethods Algorithm')

            digestValue = base64.encodestring(digest(xml).digest()).strip()
            if si.Reference.DigestValue != digestValue:
                raise RuntimeError('digest does not match')

            # TODO: Reference.Transforms, KeyInfo (KeyName, KeyValue,
            # RetrievalMethod, X509Data, PGPData, SPKIData, MgmtData, Any),
            # Object, SignatureMethod.HMACOutputLength and
            # SignatureMethod.Any are not processed yet.

            # TODO: Check Signature
            salgo = si.SignatureMethod.get_attribute_Algorithm()

            # Locate SignedInfo: the parent of the element whose URI
            # attribute is the reference URI.
            exp = XPath.Compile(
                '//child::*[attribute::URI = "%s"]/..' % uri)
            nodes = exp.evaluate(context)
            if len(nodes) != 1:
                raise RuntimeError(
                    'A SignedInfo Reference must refer to one node %s.'
                    % uri)

            xml = canonicalize(nodes[0])

            # TODO: Check SignatureValue
            # Fail closed until the canonical SignedInfo (xml) is actually
            # verified with the signing method selected by salgo.
            raise NotImplementedError(
                'WS-Security SignatureValue verification is not implemented')

        @classmethod
        def processResponse(cls, output, **kw):
            """Return output unchanged."""
            return output

    class X509TokenProfileHandler:
        """Web Services Security X.509 token handling (stub).

        processRequest accepts every request unchanged.
        """
        targetNamespace = DSIG.BASE

        # Token Types
        singleCertificate = targetNamespace + "#X509v3"
        certificatePath = targetNamespace + "#X509PKIPathv1"
        setCerticatesCRLs = targetNamespace + "#PKCS7"

        @classmethod
        def processRequest(cls, ps, signature, **kw):
            """Return ps unchanged; nothing is validated."""
            return ps


class WSSecurityHandlerChainFactory:
    """Build the handler chain that includes WS-Security processing.

    Class Variables:
        protocol -- handler chain class instantiated by newInstance.
    """
    protocol = DefaultHandlerChain

    @classmethod
    def newInstance(cls):
        """Return a new chain: WS-Addressing callback, data, WS-Security
        and WS-Addressing handlers, in that order."""
        return cls.protocol(WSAddressCallbackHandler, DataHandler,
                            WSSecurityHandler, WSAddressHandler())