# SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause) import hashlib import os import socket import struct import sys import unittest from fcntl import ioctl TPM2_ST_NO_SESSIONS = 0x8001 TPM2_ST_SESSIONS = 0x8002 TPM2_CC_FIRST = 0x01FF TPM2_CC_CREATE_PRIMARY = 0x0131 TPM2_CC_DICTIONARY_ATTACK_LOCK_RESET = 0x0139 TPM2_CC_CREATE = 0x0153 TPM2_CC_LOAD = 0x0157 TPM2_CC_UNSEAL = 0x015E TPM2_CC_FLUSH_CONTEXT = 0x0165 TPM2_CC_START_AUTH_SESSION = 0x0176 TPM2_CC_GET_CAPABILITY = 0x017A TPM2_CC_GET_RANDOM = 0x017B TPM2_CC_PCR_READ = 0x017E TPM2_CC_POLICY_PCR = 0x017F TPM2_CC_PCR_EXTEND = 0x0182 TPM2_CC_POLICY_PASSWORD = 0x018C TPM2_CC_POLICY_GET_DIGEST = 0x0189 TPM2_SE_POLICY = 0x01 TPM2_SE_TRIAL = 0x03 TPM2_ALG_RSA = 0x0001 TPM2_ALG_SHA1 = 0x0004 TPM2_ALG_AES = 0x0006 TPM2_ALG_KEYEDHASH = 0x0008 TPM2_ALG_SHA256 = 0x000B TPM2_ALG_NULL = 0x0010 TPM2_ALG_CBC = 0x0042 TPM2_ALG_CFB = 0x0043 TPM2_RH_OWNER = 0x40000001 TPM2_RH_NULL = 0x40000007 TPM2_RH_LOCKOUT = 0x4000000A TPM2_RS_PW = 0x40000009 TPM2_RC_SIZE = 0x01D5 TPM2_RC_AUTH_FAIL = 0x098E TPM2_RC_POLICY_FAIL = 0x099D TPM2_RC_COMMAND_CODE = 0x0143 TSS2_RC_LAYER_SHIFT = 16 TSS2_RESMGR_TPM_RC_LAYER = (11 << TSS2_RC_LAYER_SHIFT) TPM2_CAP_HANDLES = 0x00000001 TPM2_CAP_COMMANDS = 0x00000002 TPM2_CAP_TPM_PROPERTIES = 0x00000006 TPM2_PT_FIXED = 0x100 TPM2_PT_TOTAL_COMMANDS = TPM2_PT_FIXED + 41 HR_SHIFT = 24 HR_LOADED_SESSION = 0x02000000 HR_TRANSIENT = 0x80000000 SHA1_DIGEST_SIZE = 20 SHA256_DIGEST_SIZE = 32 TPM2_VER0_ERRORS = { 0x000: "TPM_RC_SUCCESS", 0x030: "TPM_RC_BAD_TAG", } TPM2_VER1_ERRORS = { 0x000: "TPM_RC_FAILURE", 0x001: "TPM_RC_FAILURE", 0x003: "TPM_RC_SEQUENCE", 0x00B: "TPM_RC_PRIVATE", 0x019: "TPM_RC_HMAC", 0x020: "TPM_RC_DISABLED", 0x021: "TPM_RC_EXCLUSIVE", 0x024: "TPM_RC_AUTH_TYPE", 0x025: "TPM_RC_AUTH_MISSING", 0x026: "TPM_RC_POLICY", 0x027: "TPM_RC_PCR", 0x028: "TPM_RC_PCR_CHANGED", 0x02D: "TPM_RC_UPGRADE", 0x02E: "TPM_RC_TOO_MANY_CONTEXTS", 0x02F: "TPM_RC_AUTH_UNAVAILABLE", 0x030: "TPM_RC_REBOOT", 0x031: "TPM_RC_UNBALANCED", 0x042: "TPM_RC_COMMAND_SIZE", 0x043: "TPM_RC_COMMAND_CODE", 0x044: "TPM_RC_AUTHSIZE", 0x045: "TPM_RC_AUTH_CONTEXT", 0x046: "TPM_RC_NV_RANGE", 0x047: "TPM_RC_NV_SIZE", 0x048: "TPM_RC_NV_LOCKED", 0x049: "TPM_RC_NV_AUTHORIZATION", 0x04A: "TPM_RC_NV_UNINITIALIZED", 0x04B: "TPM_RC_NV_SPACE", 0x04C: "TPM_RC_NV_DEFINED", 0x050: "TPM_RC_BAD_CONTEXT", 0x051: "TPM_RC_CPHASH", 0x052: "TPM_RC_PARENT", 0x053: "TPM_RC_NEEDS_TEST", 0x054: "TPM_RC_NO_RESULT", 0x055: "TPM_RC_SENSITIVE", 0x07F: "RC_MAX_FM0", } TPM2_FMT1_ERRORS = { 0x001: "TPM_RC_ASYMMETRIC", 0x002: "TPM_RC_ATTRIBUTES", 0x003: "TPM_RC_HASH", 0x004: "TPM_RC_VALUE", 0x005: "TPM_RC_HIERARCHY", 0x007: "TPM_RC_KEY_SIZE", 0x008: "TPM_RC_MGF", 0x009: "TPM_RC_MODE", 0x00A: "TPM_RC_TYPE", 0x00B: "TPM_RC_HANDLE", 0x00C: "TPM_RC_KDF", 0x00D: "TPM_RC_RANGE", 0x00E: "TPM_RC_AUTH_FAIL", 0x00F: "TPM_RC_NONCE", 0x010: "TPM_RC_PP", 0x012: "TPM_RC_SCHEME", 0x015: "TPM_RC_SIZE", 0x016: "TPM_RC_SYMMETRIC", 0x017: "TPM_RC_TAG", 0x018: "TPM_RC_SELECTOR", 0x01A: "TPM_RC_INSUFFICIENT", 0x01B: "TPM_RC_SIGNATURE", 0x01C: "TPM_RC_KEY", 0x01D: "TPM_RC_POLICY_FAIL", 0x01F: "TPM_RC_INTEGRITY", 0x020: "TPM_RC_TICKET", 0x021: "TPM_RC_RESERVED_BITS", 0x022: "TPM_RC_BAD_AUTH", 0x023: "TPM_RC_EXPIRED", 0x024: "TPM_RC_POLICY_CC", 0x025: "TPM_RC_BINDING", 0x026: "TPM_RC_CURVE", 0x027: "TPM_RC_ECC_POINT", } TPM2_WARN_ERRORS = { 0x001: "TPM_RC_CONTEXT_GAP", 0x002: "TPM_RC_OBJECT_MEMORY", 0x003: "TPM_RC_SESSION_MEMORY", 0x004: "TPM_RC_MEMORY", 0x005: "TPM_RC_SESSION_HANDLES", 0x006: "TPM_RC_OBJECT_HANDLES", 0x007: "TPM_RC_LOCALITY", 0x008: "TPM_RC_YIELDED", 0x009: "TPM_RC_CANCELED", 0x00A: "TPM_RC_TESTING", 0x010: "TPM_RC_REFERENCE_H0", 0x011: "TPM_RC_REFERENCE_H1", 0x012: "TPM_RC_REFERENCE_H2", 0x013: "TPM_RC_REFERENCE_H3", 0x014: "TPM_RC_REFERENCE_H4", 0x015: "TPM_RC_REFERENCE_H5", 0x016: "TPM_RC_REFERENCE_H6", 0x018: "TPM_RC_REFERENCE_S0", 0x019: "TPM_RC_REFERENCE_S1", 0x01A: "TPM_RC_REFERENCE_S2", 0x01B: "TPM_RC_REFERENCE_S3", 0x01C: "TPM_RC_REFERENCE_S4", 0x01D: "TPM_RC_REFERENCE_S5", 0x01E: "TPM_RC_REFERENCE_S6", 0x020: "TPM_RC_NV_RATE", 0x021: "TPM_RC_LOCKOUT", 0x022: "TPM_RC_RETRY", 0x023: "TPM_RC_NV_UNAVAILABLE", 0x7F: "TPM_RC_NOT_USED", } RC_VER1 = 0x100 RC_FMT1 = 0x080 RC_WARN = 0x900 ALG_DIGEST_SIZE_MAP = { TPM2_ALG_SHA1: SHA1_DIGEST_SIZE, TPM2_ALG_SHA256: SHA256_DIGEST_SIZE, } ALG_HASH_FUNCTION_MAP = { TPM2_ALG_SHA1: hashlib.sha1, TPM2_ALG_SHA256: hashlib.sha256 } NAME_ALG_MAP = { "sha1": TPM2_ALG_SHA1, "sha256": TPM2_ALG_SHA256, } class UnknownAlgorithmIdError(Exception): def __init__(self, alg): self.alg = alg def __str__(self): return '0x%0x' % (alg) class UnknownAlgorithmNameError(Exception): def __init__(self, name): self.name = name def __str__(self): return name class UnknownPCRBankError(Exception): def __init__(self, alg): self.alg = alg def __str__(self): return '0x%0x' % (alg) class ProtocolError(Exception): def __init__(self, cc, rc): self.cc = cc self.rc = rc if (rc & RC_FMT1) == RC_FMT1: self.name = TPM2_FMT1_ERRORS.get(rc & 0x3f, "TPM_RC_UNKNOWN") elif (rc & RC_WARN) == RC_WARN: self.name = TPM2_WARN_ERRORS.get(rc & 0x7f, "TPM_RC_UNKNOWN") elif (rc & RC_VER1) == RC_VER1: self.name = TPM2_VER1_ERRORS.get(rc & 0x7f, "TPM_RC_UNKNOWN") else: self.name = TPM2_VER0_ERRORS.get(rc & 0x7f, "TPM_RC_UNKNOWN") def __str__(self): if self.cc: return '%s: cc=0x%08x, rc=0x%08x' % (self.name, self.cc, self.rc) else: return '%s: rc=0x%08x' % (self.name, self.rc) class AuthCommand(object): """TPMS_AUTH_COMMAND""" def __init__(self, session_handle=TPM2_RS_PW, nonce='', session_attributes=0, hmac=''): self.session_handle = session_handle self.nonce = nonce self.session_attributes = session_attributes self.hmac = hmac def __str__(self): fmt = '>I H%us B H%us' % (len(self.nonce), len(self.hmac)) return struct.pack(fmt, self.session_handle, len(self.nonce), self.nonce, self.session_attributes, len(self.hmac), self.hmac) def __len__(self): fmt = '>I H%us B H%us' % (len(self.nonce), len(self.hmac)) return struct.calcsize(fmt) class SensitiveCreate(object): """TPMS_SENSITIVE_CREATE""" def __init__(self, user_auth='', data=''): self.user_auth = user_auth self.data = data def __str__(self): fmt = '>H%us H%us' % (len(self.user_auth), len(self.data)) return struct.pack(fmt, len(self.user_auth), self.user_auth, len(self.data), self.data) def __len__(self): fmt = '>H%us H%us' % (len(self.user_auth), len(self.data)) return struct.calcsize(fmt) class Public(object): """TPMT_PUBLIC""" FIXED_TPM = (1 << 1) FIXED_PARENT = (1 << 4) SENSITIVE_DATA_ORIGIN = (1 << 5) USER_WITH_AUTH = (1 << 6) RESTRICTED = (1 << 16) DECRYPT = (1 << 17) def __fmt(self): return '>HHIH%us%usH%us' % \ (len(self.auth_policy), len(self.parameters), len(self.unique)) def __init__(self, object_type, name_alg, object_attributes, auth_policy='', parameters='', unique=''): self.object_type = object_type self.name_alg = name_alg self.object_attributes = object_attributes self.auth_policy = auth_policy self.parameters = parameters self.unique = unique def __str__(self): return struct.pack(self.__fmt(), self.object_type, self.name_alg, self.object_attributes, len(self.auth_policy), self.auth_policy, self.parameters, len(self.unique), self.unique) def __len__(self): return struct.calcsize(self.__fmt()) def get_digest_size(alg): ds = ALG_DIGEST_SIZE_MAP.get(alg) if not ds: raise UnknownAlgorithmIdError(alg) return ds def get_hash_function(alg): f = ALG_HASH_FUNCTION_MAP.get(alg) if not f: raise UnknownAlgorithmIdError(alg) return f def get_algorithm(name): alg = NAME_ALG_MAP.get(name) if not alg: raise UnknownAlgorithmNameError(name) return alg def hex_dump(d): d = [format(ord(x), '02x') for x in d] d = [d[i: i + 16] for i in xrange(0, len(d), 16)] d = [' '.join(x) for x in d] d = os.linesep.join(d) return d class Client: FLAG_DEBUG = 0x01 FLAG_SPACE = 0x02 TPM_IOC_NEW_SPACE = 0xa200 def __init__(self, flags = 0): self.flags = flags if (self.flags & Client.FLAG_SPACE) == 0: self.tpm = open('/dev/tpm0', 'r+b', buffering=0) else: self.tpm = open('/dev/tpmrm0', 'r+b', buffering=0) def close(self): self.tpm.close() def send_cmd(self, cmd): self.tpm.write(cmd) rsp = self.tpm.read() if (self.flags & Client.FLAG_DEBUG) != 0: sys.stderr.write('cmd' + os.linesep) sys.stderr.write(hex_dump(cmd) + os.linesep) sys.stderr.write('rsp' + os.linesep) sys.stderr.write(hex_dump(rsp) + os.linesep) rc = struct.unpack('>I', rsp[6:10])[0] if rc != 0: cc = struct.unpack('>I', cmd[6:10])[0] raise ProtocolError(cc, rc) return rsp def read_pcr(self, i, bank_alg = TPM2_ALG_SHA1): pcrsel_len = max((i >> 3) + 1, 3) pcrsel = [0] * pcrsel_len pcrsel[i >> 3] = 1 << (i & 7) pcrsel = ''.join(map(chr, pcrsel)) fmt = '>HII IHB%us' % (pcrsel_len) cmd = struct.pack(fmt, TPM2_ST_NO_SESSIONS, struct.calcsize(fmt), TPM2_CC_PCR_READ, 1, bank_alg, pcrsel_len, pcrsel) rsp = self.send_cmd(cmd) pcr_update_cnt, pcr_select_cnt = struct.unpack('>II', rsp[10:18]) assert pcr_select_cnt == 1 rsp = rsp[18:] alg2, pcrsel_len2 = struct.unpack('>HB', rsp[:3]) assert bank_alg == alg2 and pcrsel_len == pcrsel_len2 rsp = rsp[3 + pcrsel_len:] digest_cnt = struct.unpack('>I', rsp[:4])[0] if digest_cnt == 0: return None rsp = rsp[6:] return rsp def extend_pcr(self, i, dig, bank_alg = TPM2_ALG_SHA1): ds = get_digest_size(bank_alg) assert(ds == len(dig)) auth_cmd = AuthCommand() fmt = '>HII I I%us IH%us' % (len(auth_cmd), ds) cmd = struct.pack( fmt, TPM2_ST_SESSIONS, struct.calcsize(fmt), TPM2_CC_PCR_EXTEND, i, len(auth_cmd), str(auth_cmd), 1, bank_alg, dig) self.send_cmd(cmd) def start_auth_session(self, session_type, name_alg = TPM2_ALG_SHA1): fmt = '>HII IIH16sHBHH' cmd = struct.pack(fmt, TPM2_ST_NO_SESSIONS, struct.calcsize(fmt), TPM2_CC_START_AUTH_SESSION, TPM2_RH_NULL, TPM2_RH_NULL, 16, '\0' * 16, 0, session_type, TPM2_ALG_NULL, name_alg) return struct.unpack('>I', self.send_cmd(cmd)[10:14])[0] def __calc_pcr_digest(self, pcrs, bank_alg = TPM2_ALG_SHA1, digest_alg = TPM2_ALG_SHA1): x = [] f = get_hash_function(digest_alg) for i in pcrs: pcr = self.read_pcr(i, bank_alg) if pcr == None: return None x += pcr return f(bytearray(x)).digest() def policy_pcr(self, handle, pcrs, bank_alg = TPM2_ALG_SHA1, name_alg = TPM2_ALG_SHA1): ds = get_digest_size(name_alg) dig = self.__calc_pcr_digest(pcrs, bank_alg, name_alg) if not dig: raise UnknownPCRBankError(bank_alg) pcrsel_len = max((max(pcrs) >> 3) + 1, 3) pcrsel = [0] * pcrsel_len for i in pcrs: pcrsel[i >> 3] |= 1 << (i & 7) pcrsel = ''.join(map(chr, pcrsel)) fmt = '>HII IH%usIHB3s' % ds cmd = struct.pack(fmt, TPM2_ST_NO_SESSIONS, struct.calcsize(fmt), TPM2_CC_POLICY_PCR, handle, len(dig), str(dig), 1, bank_alg, pcrsel_len, pcrsel) self.send_cmd(cmd) def policy_password(self, handle): fmt = '>HII I' cmd = struct.pack(fmt, TPM2_ST_NO_SESSIONS, struct.calcsize(fmt), TPM2_CC_POLICY_PASSWORD, handle) self.send_cmd(cmd) def get_policy_digest(self, handle): fmt = '>HII I' cmd = struct.pack(fmt, TPM2_ST_NO_SESSIONS, struct.calcsize(fmt), TPM2_CC_POLICY_GET_DIGEST, handle) return self.send_cmd(cmd)[12:] def flush_context(self, handle): fmt = '>HIII' cmd = struct.pack(fmt, TPM2_ST_NO_SESSIONS, struct.calcsize(fmt), TPM2_CC_FLUSH_CONTEXT, handle) self.send_cmd(cmd) def create_root_key(self, auth_value = ''): attributes = \ Public.FIXED_TPM | \ Public.FIXED_PARENT | \ Public.SENSITIVE_DATA_ORIGIN | \ Public.USER_WITH_AUTH | \ Public.RESTRICTED | \ Public.DECRYPT auth_cmd = AuthCommand() sensitive = SensitiveCreate(user_auth=auth_value) public_parms = struct.pack( '>HHHHHI', TPM2_ALG_AES, 128, TPM2_ALG_CFB, TPM2_ALG_NULL, 2048, 0) public = Public( object_type=TPM2_ALG_RSA, name_alg=TPM2_ALG_SHA1, object_attributes=attributes, parameters=public_parms) fmt = '>HIII I%us H%us H%us HI' % \ (len(auth_cmd), len(sensitive), len(public)) cmd = struct.pack( fmt, TPM2_ST_SESSIONS, struct.calcsize(fmt), TPM2_CC_CREATE_PRIMARY, TPM2_RH_OWNER, len(auth_cmd), str(auth_cmd), len(sensitive), str(sensitive), len(public), str(public), 0, 0) return struct.unpack('>I', self.send_cmd(cmd)[10:14])[0] def seal(self, parent_key, data, auth_value, policy_dig, name_alg = TPM2_ALG_SHA1): ds = get_digest_size(name_alg) assert(not policy_dig or ds == len(policy_dig)) attributes = 0 if not policy_dig: attributes |= Public.USER_WITH_AUTH policy_dig = '' auth_cmd = AuthCommand() sensitive = SensitiveCreate(user_auth=auth_value, data=data) public = Public( object_type=TPM2_ALG_KEYEDHASH, name_alg=name_alg, object_attributes=attributes, auth_policy=policy_dig, parameters=struct.pack('>H', TPM2_ALG_NULL)) fmt = '>HIII I%us H%us H%us HI' % \ (len(auth_cmd), len(sensitive), len(public)) cmd = struct.pack( fmt, TPM2_ST_SESSIONS, struct.calcsize(fmt), TPM2_CC_CREATE, parent_key, len(auth_cmd), str(auth_cmd), len(sensitive), str(sensitive), len(public), str(public), 0, 0) rsp = self.send_cmd(cmd) return rsp[14:] def unseal(self, parent_key, blob, auth_value, policy_handle): private_len = struct.unpack('>H', blob[0:2])[0] public_start = private_len + 2 public_len = struct.unpack('>H', blob[public_start:public_start + 2])[0] blob = blob[:private_len + public_len + 4] auth_cmd = AuthCommand() fmt = '>HII I I%us %us' % (len(auth_cmd), len(blob)) cmd = struct.pack( fmt, TPM2_ST_SESSIONS, struct.calcsize(fmt), TPM2_CC_LOAD, parent_key, len(auth_cmd), str(auth_cmd), blob) data_handle = struct.unpack('>I', self.send_cmd(cmd)[10:14])[0] if policy_handle: auth_cmd = AuthCommand(session_handle=policy_handle, hmac=auth_value) else: auth_cmd = AuthCommand(hmac=auth_value) fmt = '>HII I I%us' % (len(auth_cmd)) cmd = struct.pack( fmt, TPM2_ST_SESSIONS, struct.calcsize(fmt), TPM2_CC_UNSEAL, data_handle, len(auth_cmd), str(auth_cmd)) try: rsp = self.send_cmd(cmd) finally: self.flush_context(data_handle) data_len = struct.unpack('>I', rsp[10:14])[0] - 2 return rsp[16:16 + data_len] def reset_da_lock(self): auth_cmd = AuthCommand() fmt = '>HII I I%us' % (len(auth_cmd)) cmd = struct.pack( fmt, TPM2_ST_SESSIONS, struct.calcsize(fmt), TPM2_CC_DICTIONARY_ATTACK_LOCK_RESET, TPM2_RH_LOCKOUT, len(auth_cmd), str(auth_cmd)) self.send_cmd(cmd) def __get_cap_cnt(self, cap, pt, cnt): handles = [] fmt = '>HII III' cmd = struct.pack(fmt, TPM2_ST_NO_SESSIONS, struct.calcsize(fmt), TPM2_CC_GET_CAPABILITY, cap, pt, cnt) rsp = self.send_cmd(cmd)[10:] more_data, cap, cnt = struct.unpack('>BII', rsp[:9]) rsp = rsp[9:] for i in xrange(0, cnt): handle = struct.unpack('>I', rsp[:4])[0] handles.append(handle) rsp = rsp[4:] return handles, more_data def get_cap(self, cap, pt): handles = [] more_data = True while more_data: next_handles, more_data = self.__get_cap_cnt(cap, pt, 1) handles += next_handles pt += 1 return handles