1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
# -*- coding: utf-8 -*-
# Laurent's version of grequests.py :
# Merged https://github.com/kennethreitz/grequests/pull/22/files
# in recent grequest branch, to have working exception_handler
"""
grequests
~~~~~~~~~
This module contains an asynchronous replica of ``requests.api``, powered
by gevent. All API methods return a ``Request`` instance (as opposed to
``Response``). A list of requests can be sent with ``map()``.
"""
from functools import partial
try:
import gevent
from gevent import monkey as curious_george
from gevent.pool import Pool
except ImportError:
raise RuntimeError('Gevent is required for grequests.')
# Monkey-patch.
curious_george.patch_all(thread=False, select=False)
from requests import Session
def _greenlet_report_error(self, exc_info):
import sys
import traceback
exception = exc_info[1]
if isinstance(exception, gevent.greenlet.GreenletExit):
self._report_result(exception)
return
exc_handler = False
for lnk in self._links:
if isinstance(lnk, gevent.greenlet.FailureSpawnedLink):
exc_handler = True
break
if not exc_handler:
try:
traceback.print_exception(*exc_info)
except:
pass
self._exception = exception
if self._links and self._notifier is None:
self._notifier = gevent.greenlet.core.active_event(self._notify_links)
## Only print errors
if not exc_handler:
info = str(self) + ' failed with '
try:
info += self._exception.__class__.__name__
except Exception:
info += str(self._exception) or repr(self._exception)
sys.stderr.write(info + '\n\n')
## Patch the greenlet error reporting
gevent.greenlet.Greenlet._report_error = _greenlet_report_error
__all__ = (
'map', 'imap',
'get', 'options', 'head', 'post', 'put', 'patch', 'delete', 'request'
)
class AsyncRequest(object):
""" Asynchronous request.
Accept same parameters as ``Session.request`` and some additional:
:param session: Session which will do request
:param callback: Callback called on response.
Same as passing ``hooks={'response': callback}``
"""
def __init__(self, method, url, **kwargs):
#: Request method
self.method = method
#: URL to request
self.url = url
#: Associated ``Session``
self.session = kwargs.pop('session', None)
if self.session is None:
self.session = Session()
callback = kwargs.pop('callback', None)
if callback:
kwargs['hooks'] = {'response': callback}
#: The rest arguments for ``Session.request``
self.kwargs = kwargs
#: Resulting ``Response``
self.response = None
def send(self, **kwargs):
"""
Prepares request based on parameter passed to constructor and optional ``kwargs```.
Then sends request and saves response to :attr:`response`
:returns: ``Response``
"""
merged_kwargs = {}
merged_kwargs.update(self.kwargs)
merged_kwargs.update(kwargs)
self.response = self.session.request(self.method,
self.url, **merged_kwargs)
return self.response
def send(r, pool=None, stream=False, exception_handler=None):
"""Sends the request object using the specified pool. If a pool isn't
specified this method blocks. Pools are useful because you can specify size
and can hence limit concurrency."""
if pool != None:
p = pool.spawn
else:
p = gevent.spawn
if exception_handler:
glet = p(r.send, stream=stream)
def eh_wrapper(g):
return exception_handler(r,g.exception)
glet.link_exception(eh_wrapper)
else:
glet = p(r.send, stream=stream)
return glet
# Shortcuts for creating AsyncRequest with appropriate HTTP method
get = partial(AsyncRequest, 'GET')
options = partial(AsyncRequest, 'OPTIONS')
head = partial(AsyncRequest, 'HEAD')
post = partial(AsyncRequest, 'POST')
put = partial(AsyncRequest, 'PUT')
patch = partial(AsyncRequest, 'PATCH')
delete = partial(AsyncRequest, 'DELETE')
# synonym
def request(method, url, **kwargs):
return AsyncRequest(method, url, **kwargs)
def map(requests, stream=False, size=None, exception_handler=None):
"""Concurrently converts a list of Requests to Responses.
:param requests: a collection of Request objects.
:param stream: If True, the content will not be downloaded immediately.
:param size: Specifies the number of requests to make at a time. If None, no throttling occurs.
"""
requests = list(requests)
pool = Pool(size) if size else None
jobs = [send(r, pool, stream=stream, exception_handler=exception_handler) for r in requests]
gevent.joinall(jobs)
return [r.response for r in requests]
def imap(requests, stream=False, size=2, exception_handler=None):
"""Concurrently converts a generator object of Requests to
a generator of Responses.
:param requests: a generator of Request objects.
:param stream: If True, the content will not be downloaded immediately.
:param size: Specifies the number of requests to make at a time. default is 2
"""
pool = Pool(size)
def send(r):
return r.send(stream=stream, exception_handler=exception_handler)
for r in pool.imap_unordered(send, requests):
yield r
pool.join()
|