diff options
Diffstat (limited to 'google_appengine/google/appengine/api/labs/taskqueue/taskqueue.py')
-rwxr-xr-x | google_appengine/google/appengine/api/labs/taskqueue/taskqueue.py | 633 |
1 files changed, 633 insertions, 0 deletions
diff --git a/google_appengine/google/appengine/api/labs/taskqueue/taskqueue.py b/google_appengine/google/appengine/api/labs/taskqueue/taskqueue.py new file mode 100755 index 0000000..733df36 --- /dev/null +++ b/google_appengine/google/appengine/api/labs/taskqueue/taskqueue.py @@ -0,0 +1,633 @@ +#!/usr/bin/env python +# +# Copyright 2007 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Task Queue API. + +Enables an application to queue background work for itself. Work is done through +webhooks that process tasks pushed from a queue. Tasks will execute in +best-effort order of ETA. Webhooks that fail will cause tasks to be retried at a +later time. Multiple queues may exist with independent throttling controls. + +Webhook URLs may be specified directly for Tasks, or the default URL scheme +may be used, which will translate Task names into URLs relative to a Queue's +base path. A default queue is also provided for simple usage. +""" + + + +import datetime +import re +import time +import urllib +import urlparse + +import taskqueue_service_pb + +from google.appengine.api import apiproxy_stub_map +from google.appengine.api import urlfetch +from google.appengine.runtime import apiproxy_errors + + +class Error(Exception): + """Base-class for exceptions in this module.""" + + +class UnknownQueueError(Error): + """The queue specified is unknown.""" + + +class TransientError(Error): + """There was a transient error while accessing the queue. + + Please Try again later. + """ + + +class InternalError(Error): + """There was an internal error while accessing this queue. + + If this problem continues, please contact the App Engine team through + our support forum with a description of your problem. + """ + + +class InvalidTaskError(Error): + """The task's parameters, headers, or method is invalid.""" + + +class InvalidTaskNameError(InvalidTaskError): + """The task's name is invalid.""" + + +class TaskTooLargeError(InvalidTaskError): + """The task is too large with its headers and payload.""" + + +class TaskAlreadyExistsError(InvalidTaskError): + """Task already exists. It has not yet run.""" + + +class TombstonedTaskError(InvalidTaskError): + """Task has been tombstoned.""" + + +class InvalidUrlError(InvalidTaskError): + """The task's relative URL is invalid.""" + + +class BadTaskStateError(Error): + """The task is in the wrong state for the requested operation.""" + + +class InvalidQueueError(Error): + """The Queue's configuration is invalid.""" + + +class InvalidQueueNameError(InvalidQueueError): + """The Queue's name is invalid.""" + + +class _RelativeUrlError(Error): + """The relative URL supplied is invalid.""" + + +class PermissionDeniedError(Error): + """The requested operation is not allowed for this app.""" + + +MAX_QUEUE_NAME_LENGTH = 100 + +MAX_TASK_NAME_LENGTH = 500 + +MAX_TASK_SIZE_BYTES = 10 * (2 ** 10) + +MAX_URL_LENGTH = 2083 + +_DEFAULT_QUEUE = 'default' + +_DEFAULT_QUEUE_PATH = '/_ah/queue' + +_METHOD_MAP = { + 'GET': taskqueue_service_pb.TaskQueueAddRequest.GET, + 'POST': taskqueue_service_pb.TaskQueueAddRequest.POST, + 'HEAD': taskqueue_service_pb.TaskQueueAddRequest.HEAD, + 'PUT': taskqueue_service_pb.TaskQueueAddRequest.PUT, + 'DELETE': taskqueue_service_pb.TaskQueueAddRequest.DELETE, +} + +_NON_POST_METHODS = frozenset(['GET', 'HEAD', 'PUT', 'DELETE']) + +_BODY_METHODS = frozenset(['POST', 'PUT']) + +_TASK_NAME_PATTERN = r'^[a-zA-Z0-9-]{1,%s}$' % MAX_TASK_NAME_LENGTH + +_TASK_NAME_RE = re.compile(_TASK_NAME_PATTERN) + +_QUEUE_NAME_PATTERN = r'^[a-zA-Z0-9-]{1,%s}$' % MAX_QUEUE_NAME_LENGTH + +_QUEUE_NAME_RE = re.compile(_QUEUE_NAME_PATTERN) + + +class _UTCTimeZone(datetime.tzinfo): + """UTC timezone.""" + + ZERO = datetime.timedelta(0) + + def utcoffset(self, dt): + return self.ZERO + + def dst(self, dt): + return self.ZERO + + def tzname(self, dt): + return 'UTC' + + +_UTC = _UTCTimeZone() + + +def _parse_relative_url(relative_url): + """Parses a relative URL and splits it into its path and query string. + + Args: + relative_url: The relative URL, starting with a '/'. + + Returns: + Tuple (path, query) where: + path: The path in the relative URL. + query: The query string in the URL without the '?' character. + + Raises: + _RelativeUrlError if the relative_url is invalid for whatever reason + """ + if not relative_url: + raise _RelativeUrlError('Relative URL is empty') + (scheme, netloc, path, query, fragment) = urlparse.urlsplit(relative_url) + if scheme or netloc: + raise _RelativeUrlError('Relative URL may not have a scheme or location') + if fragment: + raise _RelativeUrlError('Relative URL may not specify a fragment') + if not path or path[0] != '/': + raise _RelativeUrlError('Relative URL path must start with "/"') + return path, query + + +def _flatten_params(params): + """Converts a dictionary of parameters to a list of parameters. + + Any unicode strings in keys or values will be encoded as UTF-8. + + Args: + params: Dictionary mapping parameter keys to values. Values will be + converted to a string and added to the list as tuple (key, value). If + a values is iterable and not a string, each contained value will be + added as a separate (key, value) tuple. + + Returns: + List of (key, value) tuples. + """ + def get_string(value): + if isinstance(value, unicode): + return unicode(value).encode('utf-8') + else: + return str(value) + + param_list = [] + for key, value in params.iteritems(): + key = get_string(key) + if isinstance(value, basestring): + param_list.append((key, get_string(value))) + else: + try: + iterator = iter(value) + except TypeError: + param_list.append((key, str(value))) + else: + param_list.extend((key, get_string(v)) for v in iterator) + + return param_list + + +class Task(object): + """Represents a single Task on a queue.""" + + __CONSTRUCTOR_KWARGS = frozenset([ + 'countdown', 'eta', 'headers', 'method', 'name', 'params', 'url']) + + def __init__(self, payload=None, **kwargs): + """Initializer. + + All parameters are optional. + + Args: + payload: The payload data for this Task that will be delivered to the + webhook as the HTTP request body. This is only allowed for POST and PUT + methods. + countdown: Time in seconds into the future that this Task should execute. + Defaults to zero. + eta: Absolute time when the Task should execute. May not be specified + if 'countdown' is also supplied. + headers: Dictionary of headers to pass to the webhook. Values in the + dictionary may be iterable to indicate repeated header fields. + method: Method to use when accessing the webhook. Defaults to 'POST'. + name: Name to give the Task; if not specified, a name will be + auto-generated when added to a queue and assigned to this object. Must + match the _TASK_NAME_PATTERN regular expression. + params: Dictionary of parameters to use for this Task. For POST requests + these params will be encoded as 'application/x-www-form-urlencoded' and + set to the payload. For all other methods, the parameters will be + converted to a query string. May not be specified if the URL already + contains a query string. + url: Relative URL where the webhook that should handle this task is + located for this application. May have a query string unless this is + a POST method. + + Raises: + InvalidTaskError if any of the parameters are invalid; + InvalidTaskNameError if the task name is invalid; InvalidUrlError if + the task URL is invalid or too long; TaskTooLargeError if the task with + its payload is too large. + """ + args_diff = set(kwargs.iterkeys()) - self.__CONSTRUCTOR_KWARGS + if args_diff: + raise TypeError('Invalid arguments: %s' % ', '.join(args_diff)) + + self.__name = kwargs.get('name') + if self.__name and not _TASK_NAME_RE.match(self.__name): + raise InvalidTaskNameError( + 'Task name does not match expression "%s"; found %s' % + (_TASK_NAME_PATTERN, self.__name)) + + self.__default_url, self.__relative_url, query = Task.__determine_url( + kwargs.get('url', '')) + self.__headers = urlfetch._CaselessDict() + self.__headers.update(kwargs.get('headers', {})) + self.__method = kwargs.get('method', 'POST').upper() + self.__payload = None + params = kwargs.get('params', {}) + + if query and params: + raise InvalidTaskError('Query string and parameters both present; ' + 'only one of these may be supplied') + + if self.__method == 'POST': + if payload and params: + raise InvalidTaskError('Message body and parameters both present for ' + 'POST method; only one of these may be supplied') + elif query: + raise InvalidTaskError('POST method may not have a query string; ' + 'use the "params" keyword argument instead') + elif params: + self.__payload = Task.__encode_params(params) + self.__headers.setdefault( + 'content-type', 'application/x-www-form-urlencoded') + elif payload is not None: + self.__payload = Task.__convert_payload(payload, self.__headers) + elif self.__method in _NON_POST_METHODS: + if payload and self.__method not in _BODY_METHODS: + raise InvalidTaskError('Payload may only be specified for methods %s' % + ', '.join(_BODY_METHODS)) + if payload: + self.__payload = Task.__convert_payload(payload, self.__headers) + if params: + query = Task.__encode_params(params) + if query: + self.__relative_url = '%s?%s' % (self.__relative_url, query) + else: + raise InvalidTaskError('Invalid method: %s' % self.__method) + + self.__headers_list = _flatten_params(self.__headers) + self.__eta = Task.__determine_eta( + kwargs.get('eta'), kwargs.get('countdown')) + self.__enqueued = False + + if self.size > MAX_TASK_SIZE_BYTES: + raise TaskTooLargeError('Task size must be less than %d; found %d' % + (MAX_TASK_SIZE_BYTES, self.size)) + + @staticmethod + def __determine_url(relative_url): + """Determines the URL of a task given a relative URL and a name. + + Args: + relative_url: The relative URL for the Task. + + Returns: + Tuple (default_url, relative_url, query) where: + default_url: True if this Task is using the default URL scheme; + False otherwise. + relative_url: String containing the relative URL for this Task. + query: The query string for this task. + + Raises: + InvalidUrlError if the relative_url is invalid. + """ + if not relative_url: + default_url, query = True, '' + else: + default_url = False + try: + relative_url, query = _parse_relative_url(relative_url) + except _RelativeUrlError, e: + raise InvalidUrlError(e) + + if len(relative_url) > MAX_URL_LENGTH: + raise InvalidUrlError( + 'Task URL must be less than %d characters; found %d' % + (MAX_URL_LENGTH, len(relative_url))) + + return (default_url, relative_url, query) + + @staticmethod + def __determine_eta(eta=None, countdown=None, now=datetime.datetime.now): + """Determines the ETA for a task. + + If 'eta' and 'countdown' are both None, the current time will be used. + Otherwise, only one of them may be specified. + + Args: + eta: A datetime.datetime specifying the absolute ETA or None + countdown: Count in seconds into the future from the present time that + the ETA should be assigned to. + + Returns: + A datetime in the UTC timezone containing the ETA. + + Raises: + InvalidTaskError if the parameters are invalid. + """ + if eta is not None and countdown is not None: + raise InvalidTaskError('May not use a countdown and ETA together') + elif eta is not None: + if not isinstance(eta, datetime.datetime): + raise InvalidTaskError('ETA must be a datetime.datetime instance') + elif countdown is not None: + try: + countdown = float(countdown) + except ValueError: + raise InvalidTaskError('Countdown must be a number') + else: + eta = now() + datetime.timedelta(seconds=countdown) + else: + eta = now() + + if eta.tzinfo is None: + eta = eta.replace(tzinfo=_UTC) + return eta.astimezone(_UTC) + + @staticmethod + def __encode_params(params): + """URL-encodes a list of parameters. + + Args: + params: Dictionary of parameters, possibly with iterable values. + + Returns: + URL-encoded version of the params, ready to be added to a query string or + POST body. + """ + return urllib.urlencode(_flatten_params(params)) + + @staticmethod + def __convert_payload(payload, headers): + """Converts a Task payload into UTF-8 and sets headers if necessary. + + Args: + payload: The payload data to convert. + headers: Dictionary of headers. + + Returns: + The payload as a non-unicode string. + + Raises: + InvalidTaskError if the payload is not a string or unicode instance. + """ + if isinstance(payload, unicode): + headers.setdefault('content-type', 'text/plain; charset=utf-8') + payload = payload.encode('utf-8') + elif not isinstance(payload, str): + raise InvalidTaskError( + 'Task payloads must be strings; invalid payload: %r' % payload) + return payload + + @property + def on_queue_url(self): + """Returns True if this Task will run on the queue's URL.""" + return self.__default_url + + @property + def eta(self): + """Returns an datetime corresponding to when this Task will execute.""" + return self.__eta + + @property + def headers(self): + """Returns a copy of the headers for this Task.""" + return self.__headers.copy() + + @property + def method(self): + """Returns the method to use for this Task.""" + return self.__method + + @property + def name(self): + """Returns the name of this Task. + + Will be None if using auto-assigned Task names and this Task has not yet + been added to a Queue. + """ + return self.__name + + @property + def payload(self): + """Returns the payload for this task, which may be None.""" + return self.__payload + + @property + def size(self): + """Returns the size of this task in bytes.""" + HEADER_SEPERATOR = len(': \r\n') + header_size = sum((len(key) + len(value) + HEADER_SEPERATOR) + for key, value in self.__headers_list) + return (len(self.__method) + len(self.__payload or '') + + len(self.__relative_url) + header_size) + + @property + def url(self): + """Returns the relative URL for this Task.""" + return self.__relative_url + + @property + def was_enqueued(self): + """Returns True if this Task has been enqueued. + + Note: This will not check if this task already exists in the queue. + """ + return self.__enqueued + + def add(self, queue_name=_DEFAULT_QUEUE, transactional=True): + """Adds this Task to a queue. See Queue.add.""" + return Queue(queue_name).add(self, transactional=transactional) + + +class Queue(object): + """Represents a Queue.""" + + def __init__(self, name=_DEFAULT_QUEUE): + """Initializer. + + Args: + name: Name of this queue. If not supplied, defaults to the default queue. + + Raises: + InvalidQueueNameError if the queue name is invalid. + """ + if not _QUEUE_NAME_RE.match(name): + raise InvalidQueueNameError( + 'Queue name does not match pattern "%s"; found %s' % + (_QUEUE_NAME_PATTERN, name)) + self.__name = name + self.__url = '%s/%s' % (_DEFAULT_QUEUE_PATH, self.__name) + + def add(self, task, transactional=True): + """Adds a Task to this Queue. + + Args: + task: The Task to add. + transactional: If false adds the task to a queue irrespectively to the + enclosing transaction success or failure. (optional) + + Returns: + The Task that was supplied to this method. + + Raises: + BadTaskStateError if the Task has already been added to a queue. + Error-subclass on application errors. + """ + if task.was_enqueued: + raise BadTaskStateError('Task has already been enqueued') + + request = taskqueue_service_pb.TaskQueueAddRequest() + response = taskqueue_service_pb.TaskQueueAddResponse() + + adjusted_url = task.url + if task.on_queue_url: + adjusted_url = self.__url + task.url + + + request.set_queue_name(self.__name) + request.set_eta_usec(int(time.mktime(task.eta.utctimetuple())) * 10**6) + request.set_method(_METHOD_MAP.get(task.method)) + request.set_url(adjusted_url) + + if task.name: + request.set_task_name(task.name) + else: + request.set_task_name('') + + if task.payload: + request.set_body(task.payload) + for key, value in _flatten_params(task.headers): + header = request.add_header() + header.set_key(key) + header.set_value(value) + + if transactional: + from google.appengine.api import datastore + datastore._MaybeSetupTransaction(request, []) + + call_tuple = ('taskqueue', 'Add', request, response) + apiproxy_stub_map.apiproxy.GetPreCallHooks().Call(*call_tuple) + try: + apiproxy_stub_map.MakeSyncCall(*call_tuple) + except apiproxy_errors.ApplicationError, e: + self.__TranslateError(e) + else: + apiproxy_stub_map.apiproxy.GetPostCallHooks().Call(*call_tuple) + + if response.has_chosen_task_name(): + task._Task__name = response.chosen_task_name() + task._Task__enqueued = True + return task + + @property + def name(self): + """Returns the name of this queue.""" + return self.__name + + @staticmethod + def __TranslateError(error): + """Translates a TaskQueueServiceError into an exception. + + Args: + error: Value from TaskQueueServiceError enum. + + Raises: + The corresponding Exception sub-class for that error code. + """ + if (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE): + raise UnknownQueueError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR): + raise TransientError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.INTERNAL_ERROR): + raise InternalError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.TASK_TOO_LARGE): + raise TaskTooLargeError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.INVALID_TASK_NAME): + raise InvalidTaskNameError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME): + raise InvalidQueueNameError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.INVALID_URL): + raise InvalidUrlError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_RATE): + raise InvalidQueueError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.PERMISSION_DENIED): + raise PermissionDeniedError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS): + raise TaskAlreadyExistsError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_TASK): + raise TombstonedTaskError(error.error_detail) + elif (error.application_error == + taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA): + raise InvalidTaskError(error.error_detail) + else: + raise Error('Application error %s: %s' % + (error.application_error, error.error_detail)) + + +def add(*args, **kwargs): + """Convenience method will create a Task and add it to the default queue. + + Args: + *args, **kwargs: Passed to the Task constructor. + + Returns: + The Task that was added to the queue. + """ + return Task(*args, **kwargs).add() |