Skip to content
This repository has been archived by the owner on Dec 15, 2020. It is now read-only.

Patch cqlengine: add async query functions #70

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ __pycache__/
*.egg-info/
.coverage
htmlcov
.pytest_cache/

# generic files to ignore
*~
*.lock
*.DS_Store
*.swp
*.out
.idea/

.tox/
deps/
Expand Down
73 changes: 73 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,79 @@ Usage

Python 3.5+ is required


Using with cqlengine
-----

.. code-block:: python

import asyncio
import uuid
import os

from aiocassandra import aiosession, AioModel
from cassandra.cluster import Cluster
from cassandra.cqlengine import columns, connection, management

cluster = Cluster()
session = cluster.connect()


class User(AioModel):
user_id = columns.UUID(primary_key=True)
username = columns.Text()


async def main():
aiosession(session)

# Set aiosession for cqlengine
session.set_keyspace('example_keyspace')
connection.set_session(session)

# Model.objects.create() and Model.create() in async way:
user_id = uuid.uuid4()
await User.objects.async_create(user_id=user_id, username='user1')
# also can use: await User.async_create(user_id=user_id, username='user1)

# Model.objects.all() and Model.all() in async way:
print(list(await User.async_all()))
print(list(await User.objects.filter(user_id=user_id).async_all()))

# Model.object.update() in async way:
await User.objects(user_id=user_id).async_update(username='updated-user1')

# Model.objects.get() and Model.get() in async way:
user = await User.objects.async_get(user_id=user_id)
assert user.user_id == (await User.async_get(user_id=user_id)).user_id
print(user, user.username)

# obj.save() in async way:
user.username = 'saved-user1'
await user.async_save()

# obj.delete() in async way:
await user.async_delete()

# Didn't break original functions
print('Left users: ', len(User.objects.all()))


def create_keyspace(keyspace):
os.environ['CQLENG_ALLOW_SCHEMA_MANAGEMENT'] = 'true'
connection.register_connection('cqlengine', session=session, default=True)
management.create_keyspace_simple(keyspace, replication_factor=1)
management.sync_table(User, keyspaces=[keyspace])


create_keyspace('example_keyspace')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
cluster.shutdown()
loop.close()


Thanks
------

Expand Down
4 changes: 4 additions & 0 deletions aiocassandra/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from aiocassandra.cqlengine.models import AioModel
from aiocassandra.session import aiosession

__all__ = ['aiosession', 'AioModel']
Empty file.
139 changes: 139 additions & 0 deletions aiocassandra/cqlengine/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from cassandra.cqlengine.models import Model, PolymorphicModelException
from cassandra.cqlengine.query import ValidationError

from aiocassandra.cqlengine.query import AioDMLQuery, AioQuerySet


class AioModel(Model):
__abstract__ = True
__dmlquery__ = AioDMLQuery
__queryset__ = AioQuerySet

@classmethod
async def async_create(cls, **kwargs):
extra_columns = set(kwargs.keys()) - set(cls._columns.keys())
if extra_columns:
raise ValidationError(
"Incorrect columns passed: {0}".format(extra_columns))
return await cls.objects.async_create(**kwargs)

async def async_delete(self):
"""
Deletes the object from the database
"""
await self.__dmlquery__(
self.__class__,
self,
batch=self._batch,
timestamp=self._timestamp,
consistency=self.__consistency__,
timeout=self._timeout,
conditional=self._conditional,
if_exists=self._if_exists).async_delete()

@classmethod
async def async_all(cls):
"""
Returns a queryset representing all stored objects.

This is a pass-through to the model objects().async_all()
"""
return await cls.objects.async_all()

@classmethod
async def async_get(cls, *args, **kwargs):
"""
Returns a single object based on the passed filter constraints.

This is a pass-through to the model objects().
:method:`~cqlengine.queries.get`.
"""
return await cls.objects.async_get(*args, **kwargs)

async def async_save(self):
# handle polymorphic models
if self._is_polymorphic:
if self._is_polymorphic_base:
raise PolymorphicModelException(
'cannot save polymorphic base model')
else:
setattr(self, self._discriminator_column_name,
self.__discriminator_value__)

self.validate()
await self.__dmlquery__(
self.__class__,
self,
batch=self._batch,
ttl=self._ttl,
timestamp=self._timestamp,
consistency=self.__consistency__,
if_not_exists=self._if_not_exists,
conditional=self._conditional,
timeout=self._timeout,
if_exists=self._if_exists).async_save()

self._set_persisted()
self._timestamp = None
return self

async def async_update(self, **values):
"""
Performs an update on the model instance. You can pass in values to
set on the model for updating, or you can call without values to
execute an update against any modified fields.
If no fields on the model have been modified since loading,
no query will be performed. Model validation is performed normally.
Setting a value to `None` is equivalent to running a CQL `DELETE` on
that column.

It is possible to do a blind update, that is, to update a field without
having first selected the object out of the database.
See :ref:`Blind Updates <blind_updates>`
"""
for column_id, v in values.items():
col = self._columns.get(column_id)

# check for nonexistant columns
if col is None:
raise ValidationError(
"{0}.{1} has no column named: {2}".format(
self.__module__, self.__class__.__name__, column_id))

# check for primary key update attempts
if col.is_primary_key:
current_value = getattr(self, column_id)
if v != current_value:
raise ValidationError(
"Cannot apply update to primary key '{0}' for {1}.{2}".
format(column_id, self.__module__,
self.__class__.__name__))

setattr(self, column_id, v)

# handle polymorphic models
if self._is_polymorphic:
if self._is_polymorphic_base:
raise PolymorphicModelException(
'cannot update polymorphic base model')
else:
setattr(self, self._discriminator_column_name,
self.__discriminator_value__)

self.validate()
await self.__dmlquery__(
self.__class__,
self,
batch=self._batch,
ttl=self._ttl,
timestamp=self._timestamp,
consistency=self.__consistency__,
conditional=self._conditional,
timeout=self._timeout,
if_exists=self._if_exists).async_update()

self._set_persisted()

self._timestamp = None

return self
Loading