# Source code for the memcache-backed OAuth2 token store (module path lost in extraction).

# -*- coding: utf-8 -*-
import memcache

from oauth2.datatype import AccessToken, AuthorizationCode
from oauth2.error import AccessTokenNotFound, AuthCodeNotFound
from oauth2.store import AccessTokenStore, AuthCodeStore

# NOTE(review): this class was recovered from a rendering that dropped every
# ``self.mc...`` expression and a few other dotted names. The memcache calls
# below are reconstructed from the surrounding code -- confirm the exact key
# layout against the upstream module before deploying next to existing data.
class TokenStore(AccessTokenStore, AuthCodeStore):
    """
    Uses memcache to store access tokens and auth tokens.

    This Store supports ``python-memcached``. Arguments are passed to the
    underlying client implementation.

    Initialization by passing an object::

        # This example uses python-memcached
        import memcache

        # Somewhere in your application
        mc = memcache.Client(servers=['127.0.0.1:11211'], debug=0)
        # ...
        token_store = TokenStore(mc=mc)

    Initialization using ``python-memcached``::

        token_store = TokenStore(servers=['127.0.0.1:11211'], debug=0)

    """
    def __init__(self, mc=None, prefix="oauth2", *args, **kwargs):
        """
        :param mc: An already configured memcache client. If ``None``, a
                   ``memcache.Client`` is created from ``*args`` and
                   ``**kwargs``.
        :param prefix: String prepended to the cache keys built by
                       ``_generate_cache_key``.
        """
        self.prefix = prefix

        if mc is not None:
            self.mc = mc
        else:
            self.mc = memcache.Client(*args, **kwargs)

    def fetch_by_code(self, code):
        """
        Returns the data belonging to an authorization code from memcache.

        See :class:`AuthCodeStore`.

        :param code: The authorization code.
        :return: An ``AuthorizationCode`` built from the cached data.
        :raises AuthCodeNotFound: If no data is stored for ``code``.
        """
        code_data = self.mc.get(self._generate_cache_key(code))

        if code_data is None:
            raise AuthCodeNotFound

        return AuthorizationCode(**code_data)

    def save_code(self, authorization_code):
        """
        Stores the data belonging to an authorization code token in memcache.

        See :class:`AuthCodeStore`.

        :param authorization_code: The authorization code object to store.
        """
        key = self._generate_cache_key(authorization_code.code)

        # The dict keys mirror the keyword arguments ``fetch_by_code`` passes
        # back into ``AuthorizationCode(**code_data)``.
        self.mc.set(key, {"client_id": authorization_code.client_id,
                          "code": authorization_code.code,
                          "expires_at": authorization_code.expires_at,
                          "redirect_uri": authorization_code.redirect_uri,
                          "scopes": authorization_code.scopes,
                          "data": authorization_code.data,
                          "user_id": authorization_code.user_id})

    def delete_code(self, code):
        """
        Deletes an authorization code after use

        :param code: The authorization code.
        """
        self.mc.delete(self._generate_cache_key(code))

    def save_token(self, access_token):
        """
        Stores the access token and additional data in memcache.

        The same data is written under up to three keys: the token itself,
        the (client, grant type, user) combination and, if present, the
        refresh token.

        See :class:`AccessTokenStore`.

        :param access_token: The access token object to store.
        """
        token_data = access_token.__dict__

        self.mc.set(self._generate_cache_key(access_token.token), token_data)

        # NOTE(review): the prefixed form of this key is a reconstruction; it
        # must match the key read in ``fetch_existing_token_of_user``.
        unique_token_key = self._unique_token_key(access_token.client_id,
                                                  access_token.grant_type,
                                                  access_token.user_id)
        self.mc.set(self._generate_cache_key(unique_token_key), token_data)

        if access_token.refresh_token is not None:
            rft_key = self._generate_cache_key(access_token.refresh_token)
            self.mc.set(rft_key, token_data)

    def delete_refresh_token(self, refresh_token):
        """
        Deletes a refresh token after use

        Removes both the entry of the access token the refresh token belongs
        to and the entry of the refresh token itself.

        :param refresh_token: The refresh token to delete.
        :raises AccessTokenNotFound: If no data is stored for
                                     ``refresh_token``.
        """
        access_token = self.fetch_by_refresh_token(refresh_token)

        self.mc.delete(self._generate_cache_key(access_token.token))
        self.mc.delete(self._generate_cache_key(refresh_token))

    def fetch_by_refresh_token(self, refresh_token):
        """
        Returns the access token data stored for a refresh token.

        :param refresh_token: The refresh token.
        :return: An ``AccessToken`` built from the cached data.
        :raises AccessTokenNotFound: If no data is stored for
                                     ``refresh_token``.
        """
        # ``save_token`` stores the entry under the prefixed key, so the
        # lookup has to use the prefixed key as well.
        token_data = self.mc.get(self._generate_cache_key(refresh_token))

        if token_data is None:
            raise AccessTokenNotFound

        return AccessToken(**token_data)

    def fetch_existing_token_of_user(self, client_id, grant_type, user_id):
        """
        Returns the access token stored for a client/grant type/user
        combination.

        :param client_id: Identifier of the client.
        :param grant_type: The grant type the token was issued for.
        :param user_id: Identifier of the user.
        :return: An ``AccessToken`` built from the cached data.
        :raises AccessTokenNotFound: If no matching data is stored.
        """
        unique_token_key = self._unique_token_key(client_id, grant_type,
                                                  user_id)
        # Same prefixed key as written by ``save_token``.
        data = self.mc.get(self._generate_cache_key(unique_token_key))

        if data is None:
            raise AccessTokenNotFound

        return AccessToken(**data)

    def _unique_token_key(self, client_id, grant_type, user_id):
        """Builds the identifier of a client/grant type/user combination."""
        return "{0}_{1}_{2}".format(client_id, grant_type, user_id)

    def _generate_cache_key(self, identifier):
        """Prepends the configured prefix to ``identifier``."""
        return "{0}_{1}".format(self.prefix, identifier)