1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
|
#!/usr/bin/python
#
# Copyright 2009 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Methods to access BulkMutateJobService service."""
__author__ = 'api.sgrinberg@gmail.com (Stan Grinberg)'
import time
from aw_api import SanityCheck as glob_sanity_check
from aw_api import SOAPPY
from aw_api import ZSI
from aw_api import Utils
from aw_api.Errors import ApiVersionNotSupportedError
from aw_api.Errors import ValidationError
from aw_api.WebService import WebService
class BulkMutateJobService(object):
"""Wrapper for BulkMutateJobService.
The BulkMutateJob Service provides operations for submitting jobs to be
executed asynchronously and get information about submitted jobs and their
parts.
"""
def __init__(self, headers, config, op_config, lock, logger):
"""Inits BulkMutateJobService.
Args:
headers: dict dictionary object with populated authentication
credentials.
config: dict dictionary object with populated configuration values.
op_config: dict dictionary object with additional configuration values for
this operation.
lock: thread.lock the thread lock.
logger: Logger the instance of Logger
"""
# NOTE(api.sgrinberg): Custom handling for BulkMutateJobService, whose
# group in URL is 'job/' which is different from its namespace 'cm/'.
url = [op_config['server'], 'api/adwords', 'job',
op_config['version'], self.__class__.__name__]
if config['access']: url.insert(len(url) - 1, config['access'])
self.__service = WebService(headers, config, op_config, '/'.join(url), lock,
logger)
self.__config = config
self.__op_config = op_config
if self.__config['soap_lib'] == SOAPPY:
from aw_api.soappy_toolkit import SanityCheck
self.__web_services = None
elif self.__config['soap_lib'] == ZSI:
from aw_api import API_VERSIONS
from aw_api.zsi_toolkit import SanityCheck
if op_config['version'] in API_VERSIONS:
module = '%s_services' % self.__class__.__name__
try:
web_services = __import__('aw_api.zsi_toolkit.%s.%s'
% (op_config['version'], module), globals(),
locals(), [''])
except ImportError, e:
# If one of library's required modules is missing, re raise exception.
if str(e).find(module) < 0:
raise ImportError(e)
msg = ('The version \'%s\' is not compatible with \'%s\'.'
% (op_config['version'], self.__class__.__name__))
raise ValidationError(msg)
else:
msg = 'Invalid API version, not one of %s.' % str(list(API_VERSIONS))
raise ValidationError(msg)
self.__web_services = web_services
self.__loc = eval('web_services.%sLocator()' % self.__class__.__name__)
self.__sanity_check = SanityCheck
def DownloadBulkJob(self, job_id, wait_secs=30, max_polls=20):
"""Return results of the bulk mutate job or None if there was a failure.
Args:
job_id: str a bulk mutate job id.
wait_secs: int the time in seconds to wait between each poll.
max_polls: int the maximum number of polls to perform.
Returns:
list results of the bulk mutate job or None if there was a failure.
"""
glob_sanity_check.ValidateTypes(((job_id, (str, unicode)),
(wait_secs, int), (max_polls, int)))
# Wait for bulk muate job to complete.
selector = {
'jobIds': [job_id]
}
job = self.Get(selector)[0]
status = job['status']
num_parts = job['numRequestParts']
num_parts_recieved = job['numRequestPartsReceived']
# Were all parts of the job uploaded?
if num_parts != num_parts_recieved:
return None
num_polls = 0
while (status != 'COMPLETED' and status != 'FAILED' and
num_polls < max_polls):
if Utils.BoolTypeConvert(self.__config['debug']):
print 'Bulk mutate job status: %s' % status
time.sleep(wait_secs)
status = self.Get(selector)[0]['status']
if status == 'FAILED':
if Utils.BoolTypeConvert(self.__config['debug']):
print 'Bulk mutate job failed'
return None
if Utils.BoolTypeConvert(self.__config['debug']):
print 'Bulk mutate job completed successfully'
# Get results for each part of the job.
res = []
for part in xrange(int(num_parts)):
selector = {
'jobIds': [job_id],
'resultPartIndex': str(part)
}
res.append(self.Get(selector)[0])
return res
def Get(self, selector):
"""Return a list of bulk mutate jobs.
List of bulk mutate jobs specified by a job selector.
Args:
selector: dict filter to run campaign criteria through.
Returns:
tuple list of bulk mutate jobs meeting all the criteria of the selector.
"""
method_name = 'getBulkMutateJob'
if self.__config['soap_lib'] == SOAPPY:
msg = ('The \'%s\' request via %s is currently not supported for '
'use with SOAPpy toolkit.' % (Utils.GetCurrentFuncName(),
self.__op_config['version']))
raise ApiVersionNotSupportedError(msg)
elif self.__config['soap_lib'] == ZSI:
web_services = self.__web_services
self.__sanity_check.ValidateSelector(selector, web_services)
request = eval('web_services.%sRequest()' % method_name)
return self.__service.CallMethod(method_name, (({'selector': selector},)),
'BulkMutateJob', self.__loc, request)
def Mutate(self, op):
"""Add or update bulk mutate job.
Args:
op: dict operation.
Returns:
tuple mutated bulk mutate job.
"""
method_name = 'mutateBulkMutateJob'
if self.__config['soap_lib'] == SOAPPY:
msg = ('The \'%s\' request via %s is currently not supported for '
'use with SOAPpy toolkit.' % (Utils.GetCurrentFuncName(),
self.__op_config['version']))
raise ApiVersionNotSupportedError(msg)
elif self.__config['soap_lib'] == ZSI:
web_services = self.__web_services
op = self.__sanity_check.ValidateOperation(op, web_services)
request = eval('web_services.%sRequest()' % method_name)
return self.__service.CallMethod(method_name, (({'operation': op},)),
'BulkMutateJob', self.__loc, request)
|