Diffstat (limited to 'google-appengine/google/appengine/datastore/datastore_sqlite_stub.py')
-rw-r--r--  google-appengine/google/appengine/datastore/datastore_sqlite_stub.py  1501
1 files changed, 1501 insertions, 0 deletions
diff --git a/google-appengine/google/appengine/datastore/datastore_sqlite_stub.py b/google-appengine/google/appengine/datastore/datastore_sqlite_stub.py
new file mode 100644
index 0000000..8994d85
--- /dev/null
+++ b/google-appengine/google/appengine/datastore/datastore_sqlite_stub.py
@@ -0,0 +1,1501 @@
+#!/usr/bin/env python
+#
+# Copyright 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SQlite-based stub for the Python datastore API.
+
+Entities are stored in an sqlite database in a similar fashion to the production
+datastore.
+
+Transactions are serialized through __tx_lock. Each transaction acquires it
+when it begins and releases it when it commits or rolls back.
+"""
+
+
+
+
+
+
+import array
+import itertools
+import logging
+import md5
+import sys
+import threading
+
+from google.appengine.datastore import entity_pb
+from google.appengine.api import api_base_pb
+from google.appengine.api import apiproxy_stub
+from google.appengine.api import apiproxy_stub_map
+from google.appengine.api import datastore_errors
+from google.appengine.datastore import datastore_index
+from google.appengine.datastore import datastore_pb
+from google.appengine.datastore import sortable_pb_encoder
+from google.appengine.runtime import apiproxy_errors
+
+try:
+ import pysqlite2.dbapi2 as sqlite3
+except ImportError:
+ import sqlite3
+
+try:
+ __import__('google.appengine.api.labs.taskqueue.taskqueue_service_pb')
+ taskqueue_service_pb = sys.modules.get(
+ 'google.appengine.api.labs.taskqueue.taskqueue_service_pb')
+except ImportError:
+ from google.appengine.api.taskqueue import taskqueue_service_pb
+
+
+import __builtin__
+buffer = __builtin__.buffer
+
+
+entity_pb.Reference.__hash__ = lambda self: hash(self.Encode())
+datastore_pb.Query.__hash__ = lambda self: hash(self.Encode())
+datastore_pb.Transaction.__hash__ = lambda self: hash(self.Encode())
+datastore_pb.Cursor.__hash__ = lambda self: hash(self.Encode())
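+# The four __hash__ assignments above let these protocol buffers be used as
+# dictionary keys and set members below (for example, Query PBs key the query
+# history, Cursor PBs key the open cursors, and entity key References populate
+# the transactional write and delete buffers).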
+
+
+_MAXIMUM_RESULTS = 1000
+
+
+_MAX_QUERY_COMPONENTS = 63
+
+
+_BATCH_SIZE = 20
+
+
+_MAX_ACTIONS_PER_TXN = 5
+
+
+_MAX_TIMEOUT = 5.0
+
+
+_OPERATOR_MAP = {
+ datastore_pb.Query_Filter.LESS_THAN: '<',
+ datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=',
+ datastore_pb.Query_Filter.EQUAL: '=',
+ datastore_pb.Query_Filter.GREATER_THAN: '>',
+ datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=',
+}
+
+
+_ORDER_MAP = {
+ datastore_pb.Query_Order.ASCENDING: 'ASC',
+ datastore_pb.Query_Order.DESCENDING: 'DESC',
+}
+
+_CORE_SCHEMA = """
+CREATE TABLE IF NOT EXISTS Apps (
+ app_id TEXT NOT NULL PRIMARY KEY,
+ indexes BLOB);
+
+CREATE TABLE IF NOT EXISTS Namespaces (
+ app_id TEXT NOT NULL,
+ name_space TEXT NOT NULL,
+ PRIMARY KEY (app_id, name_space));
+
+CREATE TABLE IF NOT EXISTS IdSeq (
+ prefix TEXT NOT NULL PRIMARY KEY,
+ next_id INT NOT NULL);
+"""
+
+_NAMESPACE_SCHEMA = """
+CREATE TABLE "%(prefix)s!Entities" (
+ __path__ BLOB NOT NULL PRIMARY KEY,
+ kind TEXT NOT NULL,
+ entity BLOB NOT NULL);
+CREATE INDEX "%(prefix)s!EntitiesByKind" ON "%(prefix)s!Entities" (
+ kind ASC,
+ __path__ ASC);
+
+CREATE TABLE "%(prefix)s!EntitiesByProperty" (
+ kind TEXT NOT NULL,
+ name TEXT NOT NULL,
+ value BLOB NOT NULL,
+ __path__ BLOB NOT NULL REFERENCES Entities,
+ PRIMARY KEY(kind ASC, name ASC, value ASC, __path__ ASC) ON CONFLICT IGNORE);
+CREATE INDEX "%(prefix)s!EntitiesByPropertyDesc"
+ ON "%(prefix)s!EntitiesByProperty" (
+ kind ASC,
+ name ASC,
+ value DESC,
+ __path__ ASC);
+CREATE INDEX "%(prefix)s!EntitiesByPropertyKey"
+ ON "%(prefix)s!EntitiesByProperty" (
+ __path__ ASC);
+
+INSERT OR IGNORE INTO Apps (app_id) VALUES ('%(app_id)s');
+INSERT INTO Namespaces (app_id, name_space)
+ VALUES ('%(app_id)s', '%(name_space)s');
+INSERT OR IGNORE INTO IdSeq VALUES ('%(prefix)s', 1);
+"""
+
+
+def ReferencePropertyToReference(refprop):
+ ref = entity_pb.Reference()
+ ref.set_app(refprop.app())
+ if refprop.has_name_space():
+ ref.set_name_space(refprop.name_space())
+ for pathelem in refprop.pathelement_list():
+ ref.mutable_path().add_element().CopyFrom(pathelem)
+ return ref
+
+
+class QueryCursor(object):
+ """Encapsulates a database cursor and provides methods to fetch results."""
+
+ def __init__(self, query, db_cursor):
+ """Constructor.
+
+ Args:
+ query: A Query PB.
+ db_cursor: An SQLite cursor returning n+2 columns. The first 2 columns
+ must be the path of the entity and the entity itself, while the
+ remaining columns must be the sort columns for the query.
+ """
+ self.__query = query
+ self.app = query.app()
+ self.__cursor = db_cursor
+ self.__seen = set()
+
+ self.__position = ''
+
+ self.__next_result = (None, None)
+
+ if query.has_limit():
+ self.limit = query.limit() + query.offset()
+ else:
+ self.limit = None
+
+ def Count(self):
+ """Counts results, up to the query's limit.
+
+ Note this method does not deduplicate results, so the query it was generated
+ from should have the 'distinct' clause applied.
+
+ Returns:
+ int: Result count.
+ """
+ count = 0
+ while self.limit is None or count < self.limit:
+ row = self.__cursor.fetchone()
+ if not row:
+ break
+ count += 1
+ return count
+
+ def _EncodeCompiledCursor(self, cc):
+ """Encodes the current position in the query as a compiled cursor.
+
+ Args:
+ cc: The compiled cursor to fill out.
+ """
+ position = cc.add_position()
+ position.set_start_key(self.__position)
+
+ def _GetResult(self):
+ """Returns the next result from the result set, without deduplication.
+
+ Returns:
+ (path, value): The path and value of the next result.
+ """
+ if not self.__cursor:
+ return None, None
+ row = self.__cursor.fetchone()
+ if not row:
+ self.__cursor = None
+ return None, None
+ path, data, position_parts = str(row[0]), row[1], row[2:]
+ self.__position = ''.join(str(x) for x in position_parts)
+ return path, data
+
+ def _Next(self):
+ """Fetches the next unique result from the result set.
+
+ Returns:
+ A datastore_pb.EntityProto instance.
+ """
+ entity = None
+ path, data = self.__next_result
+ self.__next_result = None, None
+ while self.__cursor and not entity:
+ if path and path not in self.__seen:
+ self.__seen.add(path)
+ entity = entity_pb.EntityProto(data)
+ else:
+ path, data = self._GetResult()
+ return entity
+
+ def Skip(self, count):
+ """Skips the specified number of unique results.
+
+ Args:
+ count: Number of results to skip.
+ """
+ for unused_i in xrange(count):
+ self._Next()
+
+ def ResumeFromCompiledCursor(self, cc):
+ """Resumes a query from a compiled cursor.
+
+ Args:
+ cc: The compiled cursor to resume from.
+ """
+ target_position = cc.position(0).start_key()
+ while self.__position <= target_position and self.__cursor:
+ self.__next_result = self._GetResult()
+
+ def PopulateQueryResult(self, count, result):
+ """Populates a QueryResult PB with results from the cursor.
+
+ Args:
+ count: The number of results to retrieve.
+ result: out: A query_result PB.
+ """
+ if count > _MAXIMUM_RESULTS:
+ count = _MAXIMUM_RESULTS
+
+ result.set_keys_only(self.__query.keys_only())
+
+ result_list = result.result_list()
+ while len(result_list) < count:
+ if self.limit is not None and len(self.__seen) >= self.limit:
+ break
+ entity = self._Next()
+ if entity is None:
+ break
+ result_list.append(entity)
+
+ result.set_more_results(len(result_list) == count)
+ self._EncodeCompiledCursor(result.mutable_compiled_cursor())
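+
+  # Rough usage sketch (mirroring __GetQueryCursor and _Dynamic_RunQuery in
+  # DatastoreSqliteStub below; sql_stmt and params stand for a generated
+  # query plan):
+  #   cursor = QueryCursor(query, conn.execute(sql_stmt, params))
+  #   cursor.Skip(query.offset())
+  #   cursor.PopulateQueryResult(count, query_result)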
+
+
+class DatastoreSqliteStub(apiproxy_stub.APIProxyStub):
+ """Persistent stub for the Python datastore API.
+
+ Stores all entities in an SQLite database. A DatastoreSqliteStub instance
+ handles a single app's data.
+ """
+
+ WRITE_ONLY = entity_pb.CompositeIndex.WRITE_ONLY
+ READ_WRITE = entity_pb.CompositeIndex.READ_WRITE
+ DELETED = entity_pb.CompositeIndex.DELETED
+ ERROR = entity_pb.CompositeIndex.ERROR
+
+ _INDEX_STATE_TRANSITIONS = {
+ WRITE_ONLY: frozenset((READ_WRITE, DELETED, ERROR)),
+ READ_WRITE: frozenset((DELETED,)),
+ ERROR: frozenset((DELETED,)),
+ DELETED: frozenset((ERROR,)),
+ }
+
+ READ_ERROR_MSG = ('Data in %s is corrupt or a different version. '
+ 'Try running with the --clear_datastore flag.\n%r')
+
+ def __init__(self,
+ app_id,
+ datastore_file,
+ require_indexes=False,
+ verbose=False,
+ service_name='datastore_v3',
+ trusted=False):
+ """Constructor.
+
+ Initializes the SQLite database if necessary.
+
+ Args:
+ app_id: string
+ datastore_file: string, path to sqlite database. Use None to create an
+ in-memory database.
+ require_indexes: bool, default False. If True, composite indexes must
+ exist in index.yaml for queries that need them.
+ verbose: bool, default False. If True, logs all select statements.
+ service_name: Service name expected for all calls.
+ trusted: bool, default False. If True, this stub allows an app to access
+ the data of another app.
+ """
+ apiproxy_stub.APIProxyStub.__init__(self, service_name)
+
+ assert isinstance(app_id, basestring) and app_id
+ self.__app_id = app_id
+ self.__datastore_file = datastore_file
+ self.SetTrusted(trusted)
+
+ self.__tx_actions = []
+
+ self.__require_indexes = require_indexes
+ self.__verbose = verbose
+
+ self.__id_map = {}
+ self.__id_lock = threading.Lock()
+
+ self.__connection = sqlite3.connect(
+ self.__datastore_file or ':memory:',
+ timeout=_MAX_TIMEOUT,
+ check_same_thread=False)
+ self.__connection_lock = threading.RLock()
+ self.__current_transaction = None
+ self.__next_tx_handle = 1
+
+ self.__tx_writes = {}
+ self.__tx_deletes = set()
+
+ self.__next_cursor_id = 1
+ self.__cursor_lock = threading.Lock()
+ self.__cursors = {}
+
+ self.__namespaces = set()
+
+ self.__indexes = {}
+ self.__index_lock = threading.Lock()
+
+ self.__query_history = {}
+
+ try:
+ self.__Init()
+ except sqlite3.DatabaseError, e:
+ raise datastore_errors.InternalError(self.READ_ERROR_MSG %
+ (self.__datastore_file, e))
+
+ def __Init(self):
+ self.__connection.executescript(_CORE_SCHEMA)
+ self.__connection.commit()
+
+ c = self.__connection.execute('SELECT app_id, name_space FROM Namespaces')
+ self.__namespaces = set(c.fetchall())
+
+ c = self.__connection.execute('SELECT app_id, indexes FROM Apps')
+ for app_id, index_proto in c.fetchall():
+ index_map = self.__indexes.setdefault(app_id, {})
+ if not index_proto:
+ continue
+ indexes = datastore_pb.CompositeIndices(index_proto)
+ for index in indexes.index_list():
+ index_map.setdefault(index.definition().entity_type(), []).append(index)
+
+ def Clear(self):
+ """Clears the datastore."""
+ conn = self.__GetConnection(None)
+ try:
+ c = conn.execute(
+ "SELECT tbl_name FROM sqlite_master WHERE type = 'table'")
+ for row in c.fetchall():
+ conn.execute('DROP TABLE "%s"' % row)
+ conn.commit()
+ finally:
+ self.__ReleaseConnection(conn, None)
+
+ self.__namespaces = set()
+ self.__indexes = {}
+ self.__cursors = {}
+ self.__query_history = {}
+
+ self.__Init()
+
+ def Read(self):
+ """Reads the datastore from disk.
+
+ Noop for compatibility with file stub.
+ """
+ pass
+
+ def Write(self):
+ """Writes the datastore to disk.
+
+ Noop for compatibility with file stub.
+ """
+ pass
+
+ def SetTrusted(self, trusted):
+ """Set/clear the trusted bit in the stub.
+
+ This bit indicates that the app calling the stub is trusted. A
+ trusted app can write to datastores of other apps.
+
+ Args:
+ trusted: boolean.
+ """
+ self.__trusted = trusted
+
+ @staticmethod
+ def __MakeParamList(size):
+ """Returns a comma separated list of sqlite substitution parameters.
+
+ Args:
+ size: Number of parameters in returned list.
+ Returns:
+ A comma separated list of substitution parameters.
+ """
+ return ','.join('?' * size)
+
+ @staticmethod
+ def __GetEntityKind(key):
+ if isinstance(key, entity_pb.EntityProto):
+ key = key.key()
+ return key.path().element_list()[-1].type()
+
+ @staticmethod
+ def __EncodeIndexPB(pb):
+ if isinstance(pb, entity_pb.PropertyValue) and pb.has_uservalue():
+ userval = entity_pb.PropertyValue()
+ userval.mutable_uservalue().set_email(pb.uservalue().email())
+ userval.mutable_uservalue().set_auth_domain(pb.uservalue().auth_domain())
+ userval.mutable_uservalue().set_gaiaid(0)
+ pb = userval
+ encoder = sortable_pb_encoder.Encoder()
+ pb.Output(encoder)
+ return buffer(encoder.buffer().tostring())
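+
+  # The sortable_pb_encoder encoding is assumed to preserve datastore ordering
+  # in its byte strings, which is what lets encoded property values and
+  # __path__ values be compared directly with the SQL operators in
+  # _OPERATOR_MAP. User values are re-encoded above with a zeroed gaiaid so
+  # that they compare by email and auth domain only.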
+
+ @staticmethod
+ def __AddQueryParam(params, param):
+ params.append(param)
+ return len(params)
+
+ @staticmethod
+ def __CreateFilterString(filter_list, params):
+ """Transforms a filter list into an SQL WHERE clause.
+
+ Args:
+      filter_list: The list of (property, operator, value) filters
+        to transform.
+ params: out: A list of parameters to pass to the query.
+ Returns:
+ An SQL 'where' clause.
+ """
+ clauses = []
+ for prop, operator, value in filter_list:
+ sql_op = _OPERATOR_MAP[operator]
+
+ value_index = DatastoreSqliteStub.__AddQueryParam(params, value)
+ clauses.append('%s %s :%d' % (prop, sql_op, value_index))
+
+ filters = ' AND '.join(clauses)
+ if filters:
+ filters = 'WHERE ' + filters
+ return filters
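+
+  # Illustrative example (hypothetical filter values): with
+  #   filter_list = [('kind', EQUAL, 'Greeting'), ('value', GREATER_THAN, v)]
+  # (EQUAL and GREATER_THAN abbreviating the datastore_pb.Query_Filter
+  # constants) and an initially empty params list, this returns
+  #   'WHERE kind = :1 AND value > :2'
+  # and leaves params == ['Greeting', v].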
+
+ @staticmethod
+ def __CreateOrderString(order_list):
+ """Returns an 'ORDER BY' clause from the given list of orders.
+
+ Args:
+ order_list: A list of (field, order) tuples.
+ Returns:
+ An SQL ORDER BY clause.
+ """
+ orders = ', '.join('%s %s' % (x[0], _ORDER_MAP[x[1]]) for x in order_list)
+ if orders:
+ orders = 'ORDER BY ' + orders
+ return orders
+
+ def __ValidateAppId(self, app_id):
+ """Verify that this is the stub for app_id.
+
+ Args:
+ app_id: An application ID.
+
+ Raises:
+ datastore_errors.BadRequestError: if this is not the stub for app_id.
+ """
+ assert app_id
+ if not self.__trusted and app_id != self.__app_id:
+ raise datastore_errors.BadRequestError(
+ 'app %s cannot access app %s\'s data' % (self.__app_id, app_id))
+
+ def __ValidateTransaction(self, tx):
+ """Verify that this transaction exists and is valid.
+
+ Args:
+ tx: datastore_pb.Transaction
+
+ Raises:
+      datastore_errors.BadRequestError: if the tx is not valid or doesn't
+        exist.
+ """
+ assert isinstance(tx, datastore_pb.Transaction)
+ self.__ValidateAppId(tx.app())
+ if tx.handle() != self.__current_transaction:
+ raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
+ 'Transaction %s not found' % tx)
+
+ def __ValidateKey(self, key):
+ """Validate this key.
+
+ Args:
+ key: entity_pb.Reference
+
+ Raises:
+ datastore_errors.BadRequestError: if the key is invalid
+ """
+ assert isinstance(key, entity_pb.Reference)
+
+ self.__ValidateAppId(key.app())
+
+ for elem in key.path().element_list():
+ if elem.has_id() == elem.has_name():
+ raise datastore_errors.BadRequestError(
+ 'each key path element should have id or name but not both: %r'
+ % key)
+
+ def __GetConnection(self, transaction):
+ """Retrieves a connection to the SQLite DB.
+
+ If a transaction is supplied, the transaction's connection is returned;
+ otherwise a fresh connection is returned.
+
+ Args:
+ transaction: A Transaction PB.
+ Returns:
+ An SQLite connection object.
+ """
+ self.__connection_lock.acquire()
+ request_tx = transaction and transaction.handle()
+ if request_tx == 0:
+ request_tx = None
+ if request_tx != self.__current_transaction:
+ raise apiproxy_errors.ApplicationError(
+ datastore_pb.Error.BAD_REQUEST,
+ 'Only one concurrent transaction per thread is permitted.')
+ return self.__connection
+
+ def __ReleaseConnection(self, conn, transaction, rollback=False):
+ """Releases a connection for use by other operations.
+
+ If a transaction is supplied, no action is taken.
+
+ Args:
+ conn: An SQLite connection object.
+ transaction: A Transaction PB.
+ rollback: If True, roll back the database TX instead of committing it.
+ """
+ if not transaction or not transaction.has_handle():
+ if rollback:
+ conn.rollback()
+ else:
+ conn.commit()
+ self.__connection_lock.release()
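+
+  # Concurrency model: there is a single SQLite connection guarded by the
+  # re-entrant __connection_lock. __GetConnection acquires the lock and
+  # __ReleaseConnection always releases it, committing (or rolling back) only
+  # when no transaction handle is involved; _Dynamic_BeginTransaction takes an
+  # additional hold on the lock that is only dropped by _Dynamic_Commit or
+  # _Dynamic_Rollback, which serializes transactions.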
+
+ def __ConfigureNamespace(self, conn, prefix, app_id, name_space):
+ """Ensures the relevant tables and indexes exist.
+
+ Args:
+ conn: An SQLite database connection.
+ prefix: The namespace prefix to configure.
+ app_id: The app ID.
+ name_space: The per-app namespace name.
+ """
+ format_args = {'app_id': app_id, 'name_space': name_space, 'prefix': prefix}
+ conn.executescript(_NAMESPACE_SCHEMA % format_args)
+ conn.commit()
+
+ def __WriteIndexData(self, conn, app):
+ """Writes index data to disk.
+
+ Args:
+ conn: An SQLite connection.
+ app: The app ID to write indexes for.
+ """
+ indices = datastore_pb.CompositeIndices()
+ for indexes in self.__indexes[app].values():
+ indices.index_list().extend(indexes)
+
+    conn.execute('UPDATE Apps SET indexes = ? WHERE app_id = ?',
+                 (indices.Encode(), app))
+
+ def __GetTablePrefix(self, data):
+ """Returns the namespace prefix for a query.
+
+ Args:
+ data: An Entity, Key or Query PB, or an (app_id, ns) tuple.
+ Returns:
+ A valid table prefix
+ """
+ if isinstance(data, entity_pb.EntityProto):
+ data = data.key()
+ if not isinstance(data, tuple):
+ data = (data.app(), data.name_space())
+ prefix = ('%s!%s' % data).replace('"', '""')
+ if data not in self.__namespaces:
+ self.__namespaces.add(data)
+ self.__ConfigureNamespace(self.__connection, prefix, *data)
+ return prefix
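+
+  # Illustrative example (hypothetical ids): an entity in app 'myapp' and
+  # namespace 'ns' yields the prefix 'myapp!ns'; the first time that pair is
+  # seen, __ConfigureNamespace creates "myapp!ns!Entities" and
+  # "myapp!ns!EntitiesByProperty" and seeds the IdSeq row for the prefix.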
+
+ def __DeleteRows(self, conn, paths, table):
+ """Deletes rows from a table.
+
+ Args:
+ conn: An SQLite connection.
+ paths: Paths to delete.
+ table: The table to delete from.
+ Returns:
+ The number of rows deleted.
+ """
+ c = conn.execute('DELETE FROM "%s" WHERE __path__ IN (%s)'
+ % (table, self.__MakeParamList(len(paths))),
+ paths)
+ return c.rowcount
+
+ def __DeleteEntityRows(self, conn, keys, table):
+ """Deletes rows from the specified table that index the keys provided.
+
+ Args:
+ conn: A database connection.
+ keys: A list of keys to delete index entries for.
+ table: The table to delete from.
+ Returns:
+ The number of rows deleted.
+ """
+    rows_deleted = 0
+    keys = sorted((x.app(), x.name_space(), x) for x in keys)
+    for (app_id, ns), group in itertools.groupby(keys, lambda x: x[:2]):
+      path_strings = [self.__EncodeIndexPB(x[2].path()) for x in group]
+      prefix = self.__GetTablePrefix((app_id, ns))
+      rows_deleted += self.__DeleteRows(conn, path_strings,
+                                        '%s!%s' % (prefix, table))
+    return rows_deleted
+
+ def __DeleteIndexEntries(self, conn, keys):
+ """Deletes entities from the index.
+
+ Args:
+ conn: An SQLite connection.
+ keys: A list of keys to delete.
+ """
+ self.__DeleteEntityRows(conn, keys, 'EntitiesByProperty')
+
+ def __InsertEntities(self, conn, entities):
+ """Inserts or updates entities in the DB.
+
+ Args:
+ conn: A database connection.
+ entities: A list of entities to store.
+ """
+
+ def RowGenerator(entities):
+ for unused_prefix, e in entities:
+ yield (self.__EncodeIndexPB(e.key().path()),
+ self.__GetEntityKind(e),
+ buffer(e.Encode()))
+
+ entities = sorted((self.__GetTablePrefix(x), x) for x in entities)
+ for prefix, group in itertools.groupby(entities, lambda x: x[0]):
+ conn.executemany(
+ 'INSERT OR REPLACE INTO "%s!Entities" VALUES (?, ?, ?)' % prefix,
+ RowGenerator(group))
+
+ def __InsertIndexEntries(self, conn, entities):
+ """Inserts index entries for the supplied entities.
+
+ Args:
+ conn: A database connection.
+ entities: A list of entities to create index entries for.
+ """
+
+ def RowGenerator(entities):
+ for unused_prefix, e in entities:
+ for p in e.property_list():
+ yield (self.__GetEntityKind(e),
+ p.name(),
+ self.__EncodeIndexPB(p.value()),
+ self.__EncodeIndexPB(e.key().path()))
+ entities = sorted((self.__GetTablePrefix(x), x) for x in entities)
+ for prefix, group in itertools.groupby(entities, lambda x: x[0]):
+ conn.executemany(
+ 'INSERT INTO "%s!EntitiesByProperty" VALUES (?, ?, ?, ?)' % prefix,
+ RowGenerator(group))
+
+ def __AllocateIds(self, conn, prefix, size):
+ """Allocates IDs.
+
+ Args:
+ conn: An Sqlite connection object.
+ prefix: A table namespace prefix.
+ size: Number of IDs to allocate.
+ Returns:
+ int: The beginning of a range of size IDs
+ """
+ self.__id_lock.acquire()
+ next_id, block_size = self.__id_map.get(prefix, (0, 0))
+ if size >= block_size:
+ block_size = max(1000, size)
+ c = conn.execute(
+ 'UPDATE IdSeq SET next_id = next_id + ? WHERE prefix = ?',
+ (block_size, prefix))
+ assert c.rowcount == 1
+ c = conn.execute('SELECT next_id FROM IdSeq WHERE prefix = ? LIMIT 1',
+ (prefix,))
+ next_id = c.fetchone()[0] - block_size
+
+ ret = next_id
+
+ next_id += size
+ block_size -= size
+ self.__id_map[prefix] = (next_id, block_size)
+ self.__id_lock.release()
+
+ return ret
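+
+  # Worked example (hypothetical numbers): with no cached block for a prefix,
+  # a request for size=1 bumps IdSeq.next_id by max(1000, 1) = 1000, returns
+  # the previous next_id value, and caches the remaining 999 ids in __id_map,
+  # so subsequent allocations for that prefix are served from memory.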
+
+ def MakeSyncCall(self, service, call, request, response):
+ """The main RPC entry point. service must be 'datastore_v3'."""
+ self.AssertPbIsInitialized(request)
+ try:
+ apiproxy_stub.APIProxyStub.MakeSyncCall(self, service, call, request,
+ response)
+ except sqlite3.OperationalError, e:
+ if e.args[0] == 'database is locked':
+ raise datastore_errors.Timeout('Database is locked.')
+ else:
+ raise
+ self.AssertPbIsInitialized(response)
+
+ def AssertPbIsInitialized(self, pb):
+ """Raises an exception if the given PB is not initialized and valid."""
+ explanation = []
+ assert pb.IsInitialized(explanation), explanation
+ pb.Encode()
+
+ def QueryHistory(self):
+ """Returns a dict that maps Query PBs to times they've been run."""
+ return dict((pb, times) for pb, times in self.__query_history.items() if
+ pb.app() == self.__app_id)
+
+ def __PutEntities(self, conn, entities):
+ self.__DeleteIndexEntries(conn, [e.key() for e in entities])
+ self.__InsertEntities(conn, entities)
+ self.__InsertIndexEntries(conn, entities)
+
+ def __DeleteEntities(self, conn, keys):
+ self.__DeleteIndexEntries(conn, keys)
+ self.__DeleteEntityRows(conn, keys, 'Entities')
+
+ def _Dynamic_Put(self, put_request, put_response):
+ conn = self.__GetConnection(put_request.transaction())
+ try:
+ entities = put_request.entity_list()
+ for entity in entities:
+ self.__ValidateKey(entity.key())
+
+ for prop in itertools.chain(entity.property_list(),
+ entity.raw_property_list()):
+ if prop.value().has_uservalue():
+ uid = md5.new(prop.value().uservalue().email().lower()).digest()
+ uid = '1' + ''.join(['%02d' % ord(x) for x in uid])[:20]
+ prop.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(uid)
+
+ assert entity.has_key()
+ assert entity.key().path().element_size() > 0
+
+ last_path = entity.key().path().element_list()[-1]
+ if last_path.id() == 0 and not last_path.has_name():
+ id_ = self.__AllocateIds(conn, self.__GetTablePrefix(entity.key()), 1)
+ last_path.set_id(id_)
+
+ assert entity.entity_group().element_size() == 0
+ group = entity.mutable_entity_group()
+ root = entity.key().path().element(0)
+ group.add_element().CopyFrom(root)
+
+ else:
+ assert (entity.has_entity_group() and
+ entity.entity_group().element_size() > 0)
+
+ if put_request.transaction().handle():
+ self.__tx_writes[entity.key()] = entity
+ self.__tx_deletes.discard(entity.key())
+
+ if not put_request.transaction().handle():
+ self.__PutEntities(conn, entities)
+ put_response.key_list().extend([e.key() for e in entities])
+ finally:
+ self.__ReleaseConnection(conn, put_request.transaction())
+
+ def _Dynamic_Get(self, get_request, get_response):
+ conn = self.__GetConnection(get_request.transaction())
+ try:
+ for key in get_request.key_list():
+ self.__ValidateAppId(key.app())
+ prefix = self.__GetTablePrefix(key)
+ c = conn.execute(
+ 'SELECT entity FROM "%s!Entities" WHERE __path__ = ?' % (prefix,),
+ (self.__EncodeIndexPB(key.path()),))
+ group = get_response.add_entity()
+ row = c.fetchone()
+ if row:
+ group.mutable_entity().ParseFromString(row[0])
+ finally:
+ self.__ReleaseConnection(conn, get_request.transaction())
+
+ def _Dynamic_Delete(self, delete_request, delete_response):
+ conn = self.__GetConnection(delete_request.transaction())
+ try:
+ for key in delete_request.key_list():
+ self.__ValidateAppId(key.app())
+ if delete_request.transaction().handle():
+ self.__tx_deletes.add(key)
+ self.__tx_writes.pop(key, None)
+
+ if not delete_request.transaction().handle():
+ self.__DeleteEntities(conn, delete_request.key_list())
+ finally:
+ self.__ReleaseConnection(conn, delete_request.transaction())
+
+ def __GenerateFilterInfo(self, filters, query):
+ """Transform a list of filters into a more usable form.
+
+ Args:
+ filters: A list of filter PBs.
+ query: The query to generate filter info for.
+ Returns:
+ A dict mapping property names to lists of (op, value) tuples.
+ """
+ filter_info = {}
+ for filt in filters:
+ assert filt.property_size() == 1
+ prop = filt.property(0)
+ value = prop.value()
+ if prop.name() == '__key__':
+ value = ReferencePropertyToReference(value.referencevalue())
+ assert value.app() == query.app()
+ assert value.name_space() == query.name_space()
+ value = value.path()
+ filter_info.setdefault(prop.name(), []).append(
+ (filt.op(), self.__EncodeIndexPB(value)))
+ return filter_info
+
+ def __GenerateOrderInfo(self, orders):
+ """Transform a list of orders into a more usable form.
+
+ Args:
+ orders: A list of order PBs.
+ Returns:
+ A list of (property, direction) tuples.
+ """
+ orders = [(order.property(), order.direction()) for order in orders]
+ if orders and orders[-1] == ('__key__', datastore_pb.Query_Order.ASCENDING):
+ orders.pop()
+ return orders
+
+ def __GetPrefixRange(self, prefix):
+ """Returns a (min, max) range that encompasses the given prefix.
+
+ Args:
+      prefix: A protocol buffer (e.g. a key path) whose __EncodeIndexPB
+        encoding is used as the prefix to filter for.
+ Returns:
+ (min, max): Start and end string values to filter on.
+ """
+ ancestor_min = self.__EncodeIndexPB(prefix)
+ ancestor_max = buffer(str(ancestor_min) + '\xfb\xff\xff\xff\x89')
+ return ancestor_min, ancestor_max
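+
+  # Ancestor queries use this range: every descendant's encoded __path__
+  # starts with the encoded ancestor path, so a filter of
+  # __path__ >= ancestor_min AND __path__ < ancestor_max (see __KindQuery and
+  # __StarSchemaQueryPlan) selects the ancestor and its descendants.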
+
+ def __KindQuery(self, query, filter_info, order_info):
+ """Performs kind only, kind and ancestor, and ancestor only queries."""
+ if not (set(filter_info.keys()) |
+ set(x[0] for x in order_info)).issubset(['__key__']):
+ return None
+ if len(order_info) > 1:
+ return None
+
+ filters = []
+ filters.extend(('__path__', op, value) for op, value
+ in filter_info.get('__key__', []))
+ if query.has_kind():
+ filters.append(('kind', datastore_pb.Query_Filter.EQUAL, query.kind()))
+ if query.has_ancestor():
+ amin, amax = self.__GetPrefixRange(query.ancestor().path())
+ filters.append(('__path__',
+ datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL, amin))
+ filters.append(('__path__', datastore_pb.Query_Filter.LESS_THAN, amax))
+
+ if order_info:
+ orders = [('__path__', order_info[0][1])]
+ else:
+ orders = [('__path__', datastore_pb.Query_Order.ASCENDING)]
+
+ params = []
+ query = ('SELECT Entities.__path__, Entities.entity, %s '
+ 'FROM "%s!Entities" AS Entities %s %s' % (
+ ','.join(x[0] for x in orders),
+ self.__GetTablePrefix(query),
+ self.__CreateFilterString(filters, params),
+ self.__CreateOrderString(orders)))
+ return query, params
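+
+  # Illustrative output (hypothetical kind and prefix): a kind-only query for
+  # kind 'Greeting' under the prefix 'myapp!ns' produces roughly
+  #   SELECT Entities.__path__, Entities.entity, __path__
+  #   FROM "myapp!ns!Entities" AS Entities
+  #   WHERE kind = :1 ORDER BY __path__ ASC
+  # with params == ['Greeting'].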
+
+ def __SinglePropertyQuery(self, query, filter_info, order_info):
+ """Performs queries satisfiable by the EntitiesByProperty table."""
+ property_names = set(filter_info.keys())
+ property_names.update(x[0] for x in order_info)
+ property_names.discard('__key__')
+ if len(property_names) != 1:
+ return None
+
+ property_name = property_names.pop()
+ filter_ops = filter_info.get(property_name, [])
+
+ if len([1 for o, _ in filter_ops
+ if o == datastore_pb.Query_Filter.EQUAL]) > 1:
+ return None
+
+ if len(order_info) > 1 or (order_info and order_info[0][0] == '__key__'):
+ return None
+
+ if query.has_ancestor():
+ return None
+
+ if not query.has_kind():
+ return None
+
+ prefix = self.__GetTablePrefix(query)
+ filters = []
+ filters.append(('EntitiesByProperty.kind',
+ datastore_pb.Query_Filter.EQUAL, query.kind()))
+ filters.append(('name', datastore_pb.Query_Filter.EQUAL, property_name))
+ for op, value in filter_ops:
+ if property_name == '__key__':
+ filters.append(('EntitiesByProperty.__path__', op, value))
+ else:
+ filters.append(('value', op, value))
+
+ orders = [('EntitiesByProperty.kind', datastore_pb.Query_Order.ASCENDING),
+ ('name', datastore_pb.Query_Order.ASCENDING)]
+ if order_info:
+ orders.append(('value', order_info[0][1]))
+ else:
+ orders.append(('value', datastore_pb.Query_Order.ASCENDING))
+ orders.append(('EntitiesByProperty.__path__',
+ datastore_pb.Query_Order.ASCENDING))
+
+ params = []
+ format_args = (
+ ','.join(x[0] for x in orders[2:]),
+ prefix,
+ prefix,
+ self.__CreateFilterString(filters, params),
+ self.__CreateOrderString(orders))
+ query = ('SELECT Entities.__path__, Entities.entity, %s '
+ 'FROM "%s!EntitiesByProperty" AS EntitiesByProperty INNER JOIN '
+ '"%s!Entities" AS Entities USING (__path__) %s %s' % format_args)
+ return query, params
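+
+  # Illustrative shape (hypothetical names): a query on kind 'Greeting'
+  # ordered by a single property 'date' joins "myapp!ns!EntitiesByProperty"
+  # with "myapp!ns!Entities" USING (__path__), filters on kind and
+  # name = 'date', and sorts on the encoded value column, so the existing
+  # EntitiesByProperty index can drive the scan.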
+
+ def __StarSchemaQueryPlan(self, query, filter_info, order_info):
+ """Executes a query using a 'star schema' based on EntitiesByProperty.
+
+ A 'star schema' is a join between an objects table (Entities) and multiple
+ instances of a facts table (EntitiesByProperty). Ideally, this will result
+ in a merge join if the only filters are inequalities and the sort orders
+ match those in the index for the facts table; otherwise, the DB will do its
+ best to satisfy the query efficiently.
+
+ Args:
+ query: The datastore_pb.Query PB.
+ filter_info: A dict mapping properties filtered on to (op, value) tuples.
+ order_info: A list of (property, direction) tuples.
+ Returns:
+ (query, params): An SQL query string and list of parameters for it.
+ """
+ filter_sets = []
+ for name, filter_ops in filter_info.items():
+ filter_sets.extend((name, [x]) for x in filter_ops
+ if x[0] == datastore_pb.Query_Filter.EQUAL)
+ ineq_ops = [x for x in filter_ops
+ if x[0] != datastore_pb.Query_Filter.EQUAL]
+ if ineq_ops:
+ filter_sets.append((name, ineq_ops))
+
+ for prop, _ in order_info:
+ if prop == '__key__':
+ continue
+ if prop not in filter_info:
+ filter_sets.append((prop, []))
+
+ prefix = self.__GetTablePrefix(query)
+
+ joins = []
+ filters = []
+ join_name_map = {}
+ for name, filter_ops in filter_sets:
+ join_name = 'ebp_%d' % (len(joins),)
+ join_name_map.setdefault(name, join_name)
+ joins.append(
+ 'INNER JOIN "%s!EntitiesByProperty" AS %s '
+ 'ON Entities.__path__ = %s.__path__'
+ % (prefix, join_name, join_name))
+ filters.append(('%s.kind' % join_name, datastore_pb.Query_Filter.EQUAL,
+ query.kind()))
+ filters.append(('%s.name' % join_name, datastore_pb.Query_Filter.EQUAL,
+ name))
+ for op, value in filter_ops:
+ filters.append(('%s.value' % join_name, op, buffer(value)))
+ if query.has_ancestor():
+ amin, amax = self.__GetPrefixRange(query.ancestor().path())
+ filters.append(('%s.__path__' % join_name,
+ datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL, amin))
+ filters.append(('%s.__path__' % join_name,
+ datastore_pb.Query_Filter.LESS_THAN, amax))
+
+ orders = []
+ for prop, order in order_info:
+ if prop == '__key__':
+ orders.append(('Entities.__path__', order))
+ else:
+ prop = '%s.value' % (join_name_map[prop],)
+ orders.append((prop, order))
+ if not order_info or order_info[-1][0] != '__key__':
+ orders.append(('Entities.__path__', datastore_pb.Query_Order.ASCENDING))
+
+ params = []
+ format_args = (
+ ','.join(x[0] for x in orders),
+ prefix,
+ ' '.join(joins),
+ self.__CreateFilterString(filters, params),
+ self.__CreateOrderString(orders))
+ query = ('SELECT Entities.__path__, Entities.entity, %s '
+ 'FROM "%s!Entities" AS Entities %s %s %s' % format_args)
+ return query, params
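+
+  # Illustrative shape (hypothetical properties): two equality filters, say on
+  # 'author' and 'year', produce one join per filter set, roughly
+  #   FROM "myapp!ns!Entities" AS Entities
+  #   INNER JOIN "myapp!ns!EntitiesByProperty" AS ebp_0 ON ...
+  #   INNER JOIN "myapp!ns!EntitiesByProperty" AS ebp_1 ON ...
+  # with each ebp_N constrained to the query's kind, one property name and
+  # that property's filter values.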
+
+ def __MergeJoinQuery(self, query, filter_info, order_info):
+ if order_info:
+ return None
+ if query.has_ancestor():
+ return None
+ if not query.has_kind():
+ return None
+ for filter_ops in filter_info.values():
+ for op, _ in filter_ops:
+ if op != datastore_pb.Query_Filter.EQUAL:
+ return None
+
+ return self.__StarSchemaQueryPlan(query, filter_info, order_info)
+
+ def __LastResortQuery(self, query, filter_info, order_info):
+ """Last resort query plan that executes queries requring composite indexes.
+
+ Args:
+ query: The datastore_pb.Query PB.
+ filter_info: A dict mapping properties filtered on to (op, value) tuples.
+ order_info: A list of (property, direction) tuples.
+ Returns:
+ (query, params): An SQL query string and list of parameters for it.
+ """
+ if self.__require_indexes:
+ index = self.__FindIndexForQuery(query)
+ if not index:
+ raise apiproxy_errors.ApplicationError(
+ datastore_pb.Error.NEED_INDEX,
+ 'This query requires a composite index that is not defined. '
+ 'You must update the index.yaml file in your application root.')
+ return self.__StarSchemaQueryPlan(query, filter_info, order_info)
+
+ def __FindIndexForQuery(self, query):
+ """Finds an index that can be used to satisfy the provided query.
+
+ Args:
+ query: A datastore_pb.Query PB.
+ Returns:
+      An entity_pb.CompositeIndex PB, if a suitable index exists; otherwise
+      None.
+ """
+ unused_required, kind, ancestor, props, num_eq_filters = (
+ datastore_index.CompositeIndexForQuery(query))
+ required_key = (kind, ancestor, props)
+ indexes = self.__indexes.get(query.app(), {}).get(kind, [])
+
+ eq_filters_set = set(props[:num_eq_filters])
+ remaining_filters = props[num_eq_filters:]
+ for index in indexes:
+ definition = datastore_index.ProtoToIndexDefinition(index)
+ index_key = datastore_index.IndexToKey(definition)
+ if required_key == index_key:
+ return index
+ if num_eq_filters > 1 and (kind, ancestor) == index_key[:2]:
+ this_props = index_key[2]
+ this_eq_filters_set = set(this_props[:num_eq_filters])
+ this_remaining_filters = this_props[num_eq_filters:]
+ if (eq_filters_set == this_eq_filters_set and
+ remaining_filters == this_remaining_filters):
+ return index
+
+ _QUERY_STRATEGIES = [
+ __KindQuery,
+ __SinglePropertyQuery,
+ __MergeJoinQuery,
+ __LastResortQuery,
+ ]
+
+ def __GetQueryCursor(self, conn, query):
+ """Returns an SQLite query cursor for the provided query.
+
+ Args:
+ conn: The SQLite connection.
+ query: A datastore_pb.Query protocol buffer.
+ Returns:
+ A QueryCursor object.
+ """
+ if query.has_transaction() and not query.has_ancestor():
+ raise apiproxy_errors.ApplicationError(
+ datastore_pb.Error.BAD_REQUEST,
+ 'Only ancestor queries are allowed inside transactions.')
+
+ num_components = len(query.filter_list()) + len(query.order_list())
+ if query.has_ancestor():
+ num_components += 1
+ if num_components > _MAX_QUERY_COMPONENTS:
+ raise apiproxy_errors.ApplicationError(
+ datastore_pb.Error.BAD_REQUEST,
+          ('query is too large. may not have more than %s filters'
+           ' + sort orders + ancestor total' % _MAX_QUERY_COMPONENTS))
+
+ app_id = query.app()
+ self.__ValidateAppId(app_id)
+
+ filters, orders = datastore_index.Normalize(query.filter_list(),
+ query.order_list())
+
+ filter_info = self.__GenerateFilterInfo(filters, query)
+ order_info = self.__GenerateOrderInfo(orders)
+
+ for strategy in DatastoreSqliteStub._QUERY_STRATEGIES:
+ result = strategy(self, query, filter_info, order_info)
+ if result:
+ break
+ else:
+ raise apiproxy_errors.ApplicationError(
+ datastore_pb.Error.BAD_REQUEST,
+ 'No strategy found to satisfy query.')
+
+ sql_stmt, params = result
+
+ if self.__verbose:
+ logging.info("Executing statement '%s' with arguments %r",
+ sql_stmt, [str(x) for x in params])
+ db_cursor = conn.execute(sql_stmt, params)
+ cursor = QueryCursor(query, db_cursor)
+ if query.has_compiled_cursor() and query.compiled_cursor().position_size():
+ cursor.ResumeFromCompiledCursor(query.compiled_cursor())
+ if query.has_offset():
+ cursor.Skip(query.offset())
+
+ clone = datastore_pb.Query()
+ clone.CopyFrom(query)
+ clone.clear_hint()
+ clone.clear_limit()
+ clone.clear_count()
+ clone.clear_offset()
+ self.__query_history[clone] = self.__query_history.get(clone, 0) + 1
+
+ return cursor
+
+ def _Dynamic_RunQuery(self, query, query_result):
+ conn = self.__GetConnection(query.transaction())
+ try:
+ cursor = self.__GetQueryCursor(conn, query)
+
+ self.__cursor_lock.acquire()
+ cursor_id = self.__next_cursor_id
+ self.__next_cursor_id += 1
+ self.__cursor_lock.release()
+
+ cursor_pb = query_result.mutable_cursor()
+ cursor_pb.set_app(query.app())
+ cursor_pb.set_cursor(cursor_id)
+
+ if query.has_count():
+ count = query.count()
+ elif query.has_limit():
+ count = query.limit()
+ else:
+ count = _BATCH_SIZE
+
+ cursor.PopulateQueryResult(count, query_result)
+ self.__cursors[cursor_pb] = cursor
+ finally:
+ self.__ReleaseConnection(conn, query.transaction())
+
+ def _Dynamic_Next(self, next_request, query_result):
+ self.__ValidateAppId(next_request.cursor().app())
+
+ try:
+ cursor = self.__cursors[next_request.cursor()]
+ except KeyError:
+ raise apiproxy_errors.ApplicationError(
+ datastore_pb.Error.BAD_REQUEST,
+ 'Cursor %d not found' % next_request.cursor().cursor())
+
+ assert cursor.app == next_request.cursor().app()
+
+ count = _BATCH_SIZE
+ if next_request.has_count():
+ count = next_request.count()
+ cursor.PopulateQueryResult(count, query_result)
+
+ def _Dynamic_Count(self, query, integer64proto):
+ if query.has_limit():
+ query.set_limit(min(query.limit(), _MAXIMUM_RESULTS))
+ else:
+ query.set_limit(_MAXIMUM_RESULTS)
+
+ conn = self.__GetConnection(query.transaction())
+ try:
+ cursor = self.__GetQueryCursor(conn, query)
+ integer64proto.set_value(cursor.Count())
+ finally:
+ self.__ReleaseConnection(conn, query.transaction())
+
+ def _Dynamic_BeginTransaction(self, request, transaction):
+ self.__ValidateAppId(request.app())
+
+ self.__connection_lock.acquire()
+ assert self.__current_transaction is None
+ handle = self.__next_tx_handle
+ self.__next_tx_handle += 1
+
+ transaction.set_app(request.app())
+ transaction.set_handle(handle)
+ self.__current_transaction = handle
+
+ def _Dynamic_AddActions(self, request, _):
+
+ if ((len(self.__tx_actions) + request.add_request_size()) >
+ _MAX_ACTIONS_PER_TXN):
+ raise apiproxy_errors.ApplicationError(
+ datastore_pb.Error.BAD_REQUEST,
+ 'Too many messages, maximum allowed %s' % _MAX_ACTIONS_PER_TXN)
+
+ new_actions = []
+ for add_request in request.add_request_list():
+ self.__ValidateTransaction(add_request.transaction())
+ clone = taskqueue_service_pb.TaskQueueAddRequest()
+ clone.CopyFrom(add_request)
+ clone.clear_transaction()
+ new_actions.append(clone)
+
+ self.__tx_actions.extend(new_actions)
+
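+  # Inside a transaction, puts and deletes are buffered in __tx_writes and
+  # __tx_deletes (see _Dynamic_Put and _Dynamic_Delete) rather than applied
+  # immediately; _Dynamic_Commit flushes them to SQLite and dispatches any
+  # queued transactional tasks, while _Dynamic_Rollback simply discards them.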
+ def _Dynamic_Commit(self, transaction, _):
+ assert self.__current_transaction == transaction.handle()
+ conn = self.__connection
+
+ try:
+ self.__PutEntities(conn, self.__tx_writes.values())
+ self.__DeleteEntities(conn, self.__tx_deletes)
+
+ for action in self.__tx_actions:
+ try:
+ apiproxy_stub_map.MakeSyncCall(
+ 'taskqueue', 'Add', action, api_base_pb.VoidProto())
+ except apiproxy_errors.ApplicationError, e:
+ logging.warning('Transactional task %s has been dropped, %s',
+ action, e)
+ finally:
+ self.__current_transaction = None
+ self.__tx_actions = []
+ self.__tx_writes = {}
+ self.__tx_deletes = set()
+ self.__ReleaseConnection(conn, None)
+
+ def _Dynamic_Rollback(self, transaction, _):
+ conn = self.__GetConnection(transaction)
+ self.__current_transaction = None
+ self.__tx_actions = []
+ self.__tx_writes = {}
+ self.__tx_deletes = set()
+ self.__ReleaseConnection(conn, None, True)
+
+ def _Dynamic_GetSchema(self, req, schema):
+ conn = self.__GetConnection(None)
+ try:
+ prefix = self.__GetTablePrefix(req)
+
+ filters = []
+ if req.has_start_kind():
+ filters.append(('kind', datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
+ req.start_kind()))
+ if req.has_end_kind():
+ filters.append(('kind', datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
+ req.end_kind()))
+
+ params = []
+ if req.properties():
+ sql_stmt = ('SELECT kind, name, value FROM "%s!EntitiesByProperty" %s '
+ 'GROUP BY kind, name, substr(value, 1, 1) ORDER BY kind'
+ % (prefix, self.__CreateFilterString(filters, params)))
+ else:
+ sql_stmt = ('SELECT kind FROM "%s!Entities" %s GROUP BY kind'
+ % (prefix, self.__CreateFilterString(filters, params)))
+ c = conn.execute(sql_stmt, params)
+
+ kind = None
+ current_name = None
+ kind_pb = None
+ for row in c.fetchall():
+ if row[0] != kind:
+ if kind_pb:
+ schema.kind_list().append(kind_pb)
+ kind = row[0].encode('utf-8')
+ kind_pb = entity_pb.EntityProto()
+ kind_pb.mutable_key().set_app('')
+ kind_pb.mutable_key().mutable_path().add_element().set_type(kind)
+ kind_pb.mutable_entity_group()
+
+ if req.properties():
+ name, value_data = row[1:]
+ if current_name != name:
+ current_name = name
+ prop_pb = kind_pb.add_property()
+ prop_pb.set_name(name.encode('utf-8'))
+ prop_pb.set_multiple(False)
+
+ value_decoder = sortable_pb_encoder.Decoder(
+ array.array('B', str(value_data)))
+ value_pb = prop_pb.mutable_value()
+ value_pb.Merge(value_decoder)
+
+ if value_pb.has_int64value():
+ value_pb.set_int64value(0)
+ if value_pb.has_booleanvalue():
+ value_pb.set_booleanvalue(False)
+ if value_pb.has_stringvalue():
+ value_pb.set_stringvalue('none')
+ if value_pb.has_doublevalue():
+ value_pb.set_doublevalue(0.0)
+ if value_pb.has_pointvalue():
+ value_pb.mutable_pointvalue().set_x(0.0)
+ value_pb.mutable_pointvalue().set_y(0.0)
+ if value_pb.has_uservalue():
+ value_pb.mutable_uservalue().set_gaiaid(0)
+ value_pb.mutable_uservalue().set_email('none')
+ value_pb.mutable_uservalue().set_auth_domain('none')
+ value_pb.mutable_uservalue().clear_nickname()
+ value_pb.mutable_uservalue().clear_obfuscated_gaiaid()
+ if value_pb.has_referencevalue():
+ value_pb.clear_referencevalue()
+ value_pb.mutable_referencevalue().set_app('none')
+ pathelem = value_pb.mutable_referencevalue().add_pathelement()
+ pathelem.set_type('none')
+ pathelem.set_name('none')
+
+ if kind_pb:
+ schema.kind_list().append(kind_pb)
+ finally:
+ self.__ReleaseConnection(conn, None)
+
+ def _Dynamic_AllocateIds(self, allocate_ids_request, allocate_ids_response):
+ conn = self.__GetConnection(None)
+
+ model_key = allocate_ids_request.model_key()
+ size = allocate_ids_request.size()
+
+ self.__ValidateAppId(model_key.app())
+
+ first_id = self.__AllocateIds(conn, self.__GetTablePrefix(model_key), size)
+ allocate_ids_response.set_start(first_id)
+ allocate_ids_response.set_end(first_id + size - 1)
+
+ self.__ReleaseConnection(conn, None)
+
+ def __FindIndex(self, index):
+ """Finds an existing index by definition.
+
+ Args:
+ index: entity_pb.CompositeIndex
+
+ Returns:
+ entity_pb.CompositeIndex, if it exists; otherwise None
+ """
+ app_indexes = self.__indexes.get(index.app_id(), {})
+ for stored_index in app_indexes.get(index.definition().entity_type(), []):
+ if index.definition() == stored_index.definition():
+ return stored_index
+
+ return None
+
+ def _Dynamic_CreateIndex(self, index, id_response):
+ app_id = index.app_id()
+ kind = index.definition().entity_type()
+
+ self.__ValidateAppId(app_id)
+ if index.id() != 0:
+ raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
+ 'New index id must be 0.')
+
+ self.__index_lock.acquire()
+ try:
+ if self.__FindIndex(index):
+ raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
+ 'Index already exists.')
+
+ next_id = max([idx.id() for x in self.__indexes.get(app_id, {}).values()
+ for idx in x] + [0]) + 1
+ index.set_id(next_id)
+ id_response.set_value(next_id)
+
+ clone = entity_pb.CompositeIndex()
+ clone.CopyFrom(index)
+ self.__indexes.setdefault(app_id, {}).setdefault(kind, []).append(clone)
+
+ conn = self.__GetConnection(None)
+ try:
+ self.__WriteIndexData(conn, app_id)
+ finally:
+ self.__ReleaseConnection(conn, None)
+ finally:
+ self.__index_lock.release()
+
+ def _Dynamic_GetIndices(self, app_str, composite_indices):
+ self.__ValidateAppId(app_str.value())
+
+ index_list = composite_indices.index_list()
+ for indexes in self.__indexes.get(app_str.value(), {}).values():
+ index_list.extend(indexes)
+
+ def _Dynamic_UpdateIndex(self, index, _):
+ self.__ValidateAppId(index.app_id())
+ my_index = self.__FindIndex(index)
+ if not my_index:
+ raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
+ "Index doesn't exist.")
+ elif (index.state() != my_index.state() and
+ index.state() not in self._INDEX_STATE_TRANSITIONS[my_index.state()]):
+ raise apiproxy_errors.ApplicationError(
+ datastore_pb.Error.BAD_REQUEST,
+ 'Cannot move index state from %s to %s' %
+ (entity_pb.CompositeIndex.State_Name(my_index.state()),
+ (entity_pb.CompositeIndex.State_Name(index.state()))))
+
+ self.__index_lock.acquire()
+ try:
+ my_index.set_state(index.state())
+ finally:
+ self.__index_lock.release()
+
+ def _Dynamic_DeleteIndex(self, index, _):
+ app_id = index.app_id()
+ kind = index.definition().entity_type()
+ self.__ValidateAppId(app_id)
+
+ my_index = self.__FindIndex(index)
+ if not my_index:
+ raise apiproxy_errors.ApplicationError(datastore_pb.Error.BAD_REQUEST,
+ "Index doesn't exist.")
+
+    self.__index_lock.acquire()
+    try:
+      self.__indexes[app_id][kind].remove(my_index)
+    finally:
+      self.__index_lock.release()
+
+    conn = self.__GetConnection(None)
+    try:
+      self.__WriteIndexData(conn, app_id)
+    finally:
+      self.__ReleaseConnection(conn, None)