summaryrefslogtreecommitdiffstats
path: root/google_appengine/google/appengine/tools/requeue.py
blob: 986bc5cd280bb06155cddee8eb1f78881cf517ff (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A thread-safe queue in which removed objects put back to the front."""


import logging
import Queue
import threading
import time

logger = logging.getLogger('google.appengine.tools.requeue')


class ReQueue(object):
  """A special thread-safe queue.

  A ReQueue allows unfinished work items to be returned with a call to
  reput().  When an item is reput, task_done() should *not* be called
  in addition, getting an item that has been reput does not increase
  the number of outstanding tasks.

  This class shares an interface with Queue.Queue and provides the
  additional reput method.
  """

  def __init__(self,
               queue_capacity,
               requeue_capacity=None,
               queue_factory=Queue.Queue,
               get_time=time.time):
    """Initialize a ReQueue instance.

    Args:
      queue_capacity: The number of items that can be put in the ReQueue.
      requeue_capacity: The numer of items that can be reput in the ReQueue.
      queue_factory: Used for dependency injection.
      get_time: Used for dependency injection.
    """
    if requeue_capacity is None:
      requeue_capacity = queue_capacity

    self.get_time = get_time
    self.queue = queue_factory(queue_capacity)
    self.requeue = queue_factory(requeue_capacity)
    self.lock = threading.Lock()
    self.put_cond = threading.Condition(self.lock)
    self.get_cond = threading.Condition(self.lock)

  def _DoWithTimeout(self,
                     action,
                     exc,
                     wait_cond,
                     done_cond,
                     lock,
                     timeout=None,
                     block=True):
    """Performs the given action with a timeout.

    The action must be non-blocking, and raise an instance of exc on a
    recoverable failure.  If the action fails with an instance of exc,
    we wait on wait_cond before trying again.  Failure after the
    timeout is reached is propagated as an exception.  Success is
    signalled by notifying on done_cond and returning the result of
    the action.  If action raises any exception besides an instance of
    exc, it is immediately propagated.

    Args:
      action: A callable that performs a non-blocking action.
      exc: An exception type that is thrown by the action to indicate
        a recoverable error.
      wait_cond: A condition variable which should be waited on when
        action throws exc.
      done_cond: A condition variable to signal if the action returns.
      lock: The lock used by wait_cond and done_cond.
      timeout: A non-negative float indicating the maximum time to wait.
      block: Whether to block if the action cannot complete immediately.

    Returns:
      The result of the action, if it is successful.

    Raises:
      ValueError: If the timeout argument is negative.
    """
    if timeout is not None and timeout < 0.0:
      raise ValueError('\'timeout\' must not be a negative  number')
    if not block:
      timeout = 0.0
    result = None
    success = False
    start_time = self.get_time()
    lock.acquire()
    try:
      while not success:
        try:
          result = action()
          success = True
        except Exception, e:
          if not isinstance(e, exc):
            raise e
          if timeout is not None:
            elapsed_time = self.get_time() - start_time
            timeout -= elapsed_time
            if timeout <= 0.0:
              raise e
          wait_cond.wait(timeout)
    finally:
      if success:
        done_cond.notify()
      lock.release()
    return result

  def put(self, item, block=True, timeout=None):
    """Put an item into the requeue.

    Args:
      item: An item to add to the requeue.
      block: Whether to block if the requeue is full.
      timeout: Maximum on how long to wait until the queue is non-full.

    Raises:
      Queue.Full if the queue is full and the timeout expires.
    """
    def PutAction():
      self.queue.put(item, block=False)
    self._DoWithTimeout(PutAction,
                        Queue.Full,
                        self.get_cond,
                        self.put_cond,
                        self.lock,
                        timeout=timeout,
                        block=block)

  def reput(self, item, block=True, timeout=None):
    """Re-put an item back into the requeue.

    Re-putting an item does not increase the number of outstanding
    tasks, so the reput item should be uniquely associated with an
    item that was previously removed from the requeue and for which
    TaskDone has not been called.

    Args:
      item: An item to add to the requeue.
      block: Whether to block if the requeue is full.
      timeout: Maximum on how long to wait until the queue is non-full.

    Raises:
      Queue.Full is the queue is full and the timeout expires.
    """
    def ReputAction():
      self.requeue.put(item, block=False)
    self._DoWithTimeout(ReputAction,
                        Queue.Full,
                        self.get_cond,
                        self.put_cond,
                        self.lock,
                        timeout=timeout,
                        block=block)

  def get(self, block=True, timeout=None):
    """Get an item from the requeue.

    Args:
      block: Whether to block if the requeue is empty.
      timeout: Maximum on how long to wait until the requeue is non-empty.

    Returns:
      An item from the requeue.

    Raises:
      Queue.Empty if the queue is empty and the timeout expires.
    """
    def GetAction():
      try:
        result = self.requeue.get(block=False)
        self.requeue.task_done()
      except Queue.Empty:
        result = self.queue.get(block=False)
      return result
    return self._DoWithTimeout(GetAction,
                               Queue.Empty,
                               self.put_cond,
                               self.get_cond,
                               self.lock,
                               timeout=timeout,
                               block=block)

  def join(self):
    """Blocks until all of the items in the requeue have been processed."""
    self.queue.join()

  def task_done(self):
    """Indicate that a previously enqueued item has been fully processed."""
    self.queue.task_done()

  def empty(self):
    """Returns true if the requeue is empty."""
    return self.queue.empty() and self.requeue.empty()

  def get_nowait(self):
    """Try to get an item from the queue without blocking."""
    return self.get(block=False)

  def qsize(self):
    return self.queue.qsize() + self.requeue.qsize()