#!/usr/bin/python # # Copyright 2009 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """Handy utility functions.""" __author__ = 'api.sgrinberg@gmail.com (Stan Grinberg)' import codecs import csv import datetime import htmlentitydefs import math import os import re import sys import traceback from urlparse import urlparse from urlparse import urlunparse from aw_api import LIB_HOME from aw_api import SanityCheck from aw_api.AuthToken import AuthToken from aw_api.Buffer import Buffer from aw_api.Errors import Error def ReadFile(f_path): """Load data from a given file. Args: f_path: str absolute path to the file to read. Returns: str data loaded from the file, None otherwise. """ data = '' if f_path: try: fh = open(f_path, 'r') try: data = fh.read() finally: fh.close() except IOError: return '' return data def GetDataFromCsvFile(file_name): """Get data from CSV file, given name of the file from "aw_api/data". Args: file_name: str file name. Returns: list data from CSV file. """ f_path = os.path.join(LIB_HOME, 'data', file_name) rows = [] for row in csv.reader(ReadFile(f_path).split('\n')): if row: rows.append(row) return rows[1:] def GetCategories(): """Get a list of available categories. Returns: list available categories. """ return GetDataFromCsvFile('categories.csv') def GetCountries(): """Get a list of available countries. Returns: list available countries. """ return GetDataFromCsvFile('countries.csv') def GetCurrencies(): """Get a list of available currencies. Returns: list available currencies. """ return GetDataFromCsvFile('currencies.csv') def GetErrorCodes(): """Get a list of available error codes. Returns: list available error codes. """ return GetDataFromCsvFile('error_codes.csv') def GetLanguages(): """Get a list of available languages. Returns: list available languages. """ return GetDataFromCsvFile('languages.csv') def GetOpsRates(): """Get a list of available API operations rates. Returns: list available API operations rates. """ return GetDataFromCsvFile('ops_rates.csv') def GetMethodCost(version, service, method, ops_num, is_sandbox=False, is_validate_only=False): """Get cost of the method in units. Args: version: str version to which method belongs. service: str service to which method belongs. method: str method for which to return cost. ops_num: int number of operations for which to calculate cost. [optional] is_sandbox: bool whether to calculate cost for sandbox instance. is_validate_only: bool whether to calculate cost for validateOnly request. Returns: int cost of the method in units. """ cost = 0 for line in GetOpsRates(): # TODO(api.sgrinberg): Implement support for calculating method cost for # operatins with different operators. if (line and line[0] == version and line[1] == service and line[2].lower() == method.lower()): cost = float(line[3]) if is_validate_only: cost = 0.05 if bool(line[4]): cost *= ops_num break return int(math.ceil(cost)) def GetTimezones(): """Get a list of available timezones. Returns: list available timezones. """ return GetDataFromCsvFile('timezones.csv') def GetUsCities(): """Get a list of available US cities. Returns: list available US cities. """ return GetDataFromCsvFile('us_cities.csv') def GetUsMetros(): """Get a list of available US metros. Returns: list available US metros. """ return GetDataFromCsvFile('us_metros.csv') def GetWorldCities(): """Get a list of available world cities. Returns: list available world cities. """ return GetDataFromCsvFile('world_cities.csv') def GetWorldRegions(): """Get a list of available world regions. Returns: list available world regions. """ return GetDataFromCsvFile('world_regions.csv') def PurgeLogs(logs_home): """Clear all logs generated by this client library. This will remove all stored SOAP XML and request data. WARNING: This will remove all data from the logs generated by the client library. Args: logs_home: str absolute path to logs directory. """ logs = (os.path.join(logs_home, 'aw_api_lib.log'), os.path.join(logs_home, 'soap_xml.log'), os.path.join(logs_home, 'request_info.log')) for log in logs: PurgeLog(log) def PurgeLog(log): """Clear content of a given log file. If the file cannot be opened, Error is raised. Args: log: str absolute path to a log file. """ try: fh = open(log, 'w') try: fh.write('') finally: fh.close() except IOError, e: raise Error(e) # TODO(api.sgrinberg): Rewrite this method using HTML parser library. def GetErrorFromHtml(data): """Return error message from HTML page. Args: data: str HTML data. Returns: str error message. """ pattern = re.compile('\n') data = pattern.sub('', data) # Fetch error message. pattern = re.compile('(.*)|(.*)') msg = pattern.findall(data) if msg: for item in msg[0]: if item: msg = item # Cut unnecessary wording. pattern = re.compile('Error: |ERROR: ') msg = pattern.sub('', msg, count=1) # Fetch detail for the error message pattern = re.compile(('

.*

(.*)

|' '

.*

(.*)')) msg_detail = pattern.findall(data) if isinstance(msg_detail, list) and msg_detail: for item in msg_detail[0]: if item: msg_detail = item msg_detail = msg_detail.strip('.') # Cut any HTML tags that appear in the message. pattern = re.compile('<.?H2>|<.?p>|<.?A.*>|<.?P.*>|<.?HR.*>') msg_detail = pattern.sub(' ', msg_detail).strip(' ') if msg_detail == msg: msg_detail = '' else: msg_detail = '' if msg: if not msg_detail: return '%s.' % msg return '%s. %s.' % (msg, msg_detail) else: # Check for non standard HTML content, with just the . pattern = re.compile('(.*)') msg = pattern.findall(data) if msg: return msg[0] return '' def IsHtml(data): """Return True if data is HTML, False otherwise. Args: data: str data to check. Returns: bool True if data is HTML, False otherwise. """ # Remove banners and XML header. Convert to lower case for easy search. data = ''.join(data.split('\n')).lower() pattern = re.compile('.*?.*?.*?') if pattern.findall(data): return True else: return False def DecodeNonASCIIText(text, encoding='utf-8'): """Decode a non-ASCII text into a unicode, using given encoding. A full list of supported encodings is available at http://docs.python.org/lib/standard-encodings.html. If unable to find given encoding, Error is raised. Args: text: str a text to encode. [optional] encoding: str an encoding to use. Returns: tuple decoded text with the text length, (text, len(text)). """ if isinstance(text, unicode): return (text, len(text)) dec_text = '' try: decoder = codecs.getdecoder(encoding) dec_text = decoder(text) except LookupError, e: msg = 'Unable to find \'%s\' encoding. %s.' % (encoding, e) raise Error(msg) return dec_text def MakeTextXMLReady(text): """Convert given text into an XML ready text. XML ready text with all non-ASCII characters properly decoded. Args: text: str a text to convert. Returns: str converted text. """ dec_text = DecodeNonASCIIText(text)[0] items = [] for char in dec_text: try: char = char.encode('ascii') except UnicodeEncodeError: # We have a non-ASCII character of type unicode. Convert it into an # XML-ready format. char = '%s;' % hex(ord(char)).replace('0x', '&#x') items.append(char) return ''.join(items) def __ParseUrl(url): """Parse URL into components. Args: url: str a URL to parse. """ return urlparse(url) def GetSchemeFromUrl(url): """Return scheme portion of the URL. Args: url: str a URL to parse. """ return __ParseUrl(url)[0] def GetNetLocFromUrl(url): """Return netloc portion of the URL. Args: url: str a URL to parse. Returns: str: netloc portion of the URL. """ return __ParseUrl(url)[1] def GetPathFromUrl(url): """Return path portion of the URL. Args: url: str a URL to parse. Returns: str: path portion of the URL. """ return __ParseUrl(url)[2] def GetServerFromUrl(url): """Return reconstructed scheme and netloc portion of the URL. Args: url: str a URL to parse. """ return urlunparse((GetSchemeFromUrl(url), GetNetLocFromUrl(url), '', '', '', '')) def GetVersionFromUrl(url): """Return API version portion of the URL. Args: url: str a URL to parse. """ return __ParseUrl(url)[2].split('/')[3] def GetAuthToken(email, password): """Return an authentication token for Google Account. If an error occurs, AuthTokenError is raised. Args: email: str Google Account's login email. password: str Google Account's password. Returns: str authentication token for Google Account. """ return AuthToken(email, password).GetAuthToken() def GetCurrentFuncName(): """Return current function/method name. Returns: str current function/method name. """ return sys._getframe(1).f_code.co_name def UnLoadDictKeys(dct, keys_lst): """Return newly built dictionary with out the keys in keys_lst. Args: dct: dict a dictionary to unload keys from. keys_lst: list a list of keys to unload from a dictionary. Returns: dict a new dictionary with out given keys. """ if not keys_lst: return dct SanityCheck.ValidateTypes(((dct, dict), (keys_lst, list))) new_dct = {} for key in dct: if key in keys_lst: continue new_dct[key] = dct[key] return new_dct def CleanUpDict(dct): """Return newly built dictionary with out the keys that point to no values. Args: dct: dict a dictionary to clean up. Returns: dict a new dictionary with out empty keys. """ SanityCheck.ValidateTypes(((dct, dict),)) new_dct = {} for key in dct: if dct[key]: new_dct[key] = dct[key] return new_dct def BoolTypeConvert(bool_type): """Convert bool to local bool and vice versa. Args: bool_type: bool or str a type to convert (i.e. True<=>'y', False<=>'n', 'true'=>True, 'false'=>False). Returns: bool or str converted type. """ if isinstance(bool_type, bool): if bool_type: return 'y' else: return 'n' elif isinstance(bool_type, str): if bool_type == 'y' or bool_type.lower() == 'true': return True elif bool_type == 'n' or bool_type.lower() == 'false': return False def LastStackTrace(): """Fetch last stack traceback. Returns: str last stack traceback. """ # Temporarily redirect traceback from STDOUT into a buffer. trace_buf = Buffer() old_stdout = sys.stdout sys.stdout = trace_buf try: traceback.print_exc(file=sys.stdout) except AttributeError: # No exception for traceback exist. print '' # Restore STDOUT. sys.stdout = old_stdout return trace_buf.GetBufferAsStr().strip() def HtmlUnescape(text): """Removes HTML or XML character references and entities from a text string. See http://effbot.org/zone/re-sub.htm#unescape-html. Args: text: str the HTML (or XML) source text. Returns: str/unicode plain text, as a Unicode string, if necessary. """ def fixup(m): text = m.group(0) if text[:2] == "&#": # character reference try: if text[:3] == "&#x": return unichr(int(text[3:-1], 16)) else: return unichr(int(text[2:-1])) except ValueError: pass else: # named entity try: text = unichr(htmlentitydefs.name2codepoint[text[1:-1]]) except KeyError: pass return text # leave as is return re.sub("&#?\w+;", fixup, text) def CsvEscape(text): """Escapes data entry for consistency with CSV format. The CSV format rules: - Fields with embedded commas must be enclosed within double-quote characters. - Fields with embedded double-quote characters must be enclosed within double-quote characters, and each of the embedded double-quote characters must be represented by a pair of double-quote characters. - Fields with embedded line breaks must be enclosed within double-quote characters. - Fields with leading or trailing spaces must be enclosed within double-quote characters. Args: text: str data entry. Returns: str a CSV encoded data entry. """ if not text: return '' if text.find('"') > -1: text = text.replace('"', '""') if (text == '' or text.find(',') > -1 or text.find('"') > -1 or text.find('\n') > -1 or text.find('\r') > -1 or text[0] == ' ' or text[-1] == ' '): text = '"%s"' % text return text def GetUniqueName(): """Returns a unique value consisting of parts from datetime.datetime.now(). Returns: str unique name. """ dt = datetime.datetime.now() return '%s%s%s%s%s%s%s' % (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)