Source code for oauth2.store.dynamodb

# -*- coding: utf-8 -*-

from oauth2.datatype import AccessToken
from oauth2.error import AccessTokenNotFound
from oauth2.store import AccessTokenStore


class DynamodbStore(object):
    """
    Uses dynamodb to store access tokens and auth tokens.

    This Store supports ``dynamodb``. Arguments are passed to the
    underlying client implementation. For connect to dynamodb use python boto library.
    http://boto.cloudhackers.com/en/latest/dynamodb_tut.html

    Initialization::

        from oauth2.store.dynamodb import TokenStore
        # oauth_token = Table('oauth_access_token')
        oauth_token = Table.create('users',
                                   schema=[HashKey('token_key')],
                                   global_indexes=[
                                     GlobalAllIndex('RefreshToken-index', parts=[HashKey('refresh_token')])
                                   ])

        token_store = TokenStore(oauth_token)
    """
    def __init__(self, connect):
        self.connect = connect


[docs]class TokenStore(AccessTokenStore, DynamodbStore): """Dynamodb Access Token Store""" def save_token(self, access_token): """ Stores the access token and additional data in redis. See :class:`oauth2.store.AccessTokenStore`. """ unique_token_key = self._unique_token_key(access_token.client_id, access_token.grant_type, access_token.user_id) storing_unique_token = access_token.__dict__ storing_unique_token.update({'token_key': unique_token_key}) self.connect.put_item(**storing_unique_token) def delete_refresh_token(self, refresh_token): """ Deletes a refresh token after use :param refresh_token: The refresh token to delete. """ access_token = self.fetch_by_refresh_token(refresh_token) self.connect.delete({'token_key': access_token.token}) def fetch_by_refresh_token(self, refresh_token): """ Find oauth tokens by refresh token. """ tokens_data = self.connect.query2(refresh_token__eq=refresh_token, index='RefreshToken-index', limit=1) tokens_res = list(tokens_data) if not tokens_res: raise error.AccessTokenNotFound token_data = tokens_res.pop() if token_data is None: raise error.AccessTokenNotFound data = token_data._data.get('token') # pylint: disable=protected-access del data['token_key'] return datatype.AccessToken(**data) def fetch_existing_token_of_user(self, client_id, grant_type, user_id): """ Find oauth access token by client_id, grant_type, user_id. """ unique_token_key = self._unique_token_key(client_id=client_id, grant_type=grant_type, user_id=user_id) token_data = self.connect.get_item(token_key=unique_token_key) if token_data is None: raise error.AccessTokenNotFound return datatype.AccessToken(**token_data) @classmethod def _unique_token_key(cls, client_id, grant_type, user_id): return "{0}_{1}_{2}".format(client_id, grant_type, user_id)