Source code for oauth2.tokengenerator

"""
Provides various implementations of algorithms to generate an Access Token or Refresh Token.
"""

import hashlib
import os
import uuid

import itsdangerous
from oauth2.error import AccessTokenNotFound


[docs]class TokenGenerator(object): """ Base class of every token generator. """ def __init__(self): """ Create a new instance of a token generator. """ self.expires_in = {} self.refresh_expires_in = 0
[docs] def create_access_token_data(self, data, scopes, grant_type, user_id, client_id): """ Create data needed by an access token. :param data: Arbitrary data as returned by the ``authenticate()`` method of a ``SiteAdapter``. :type data: dict :param grant_type: :type grant_type: str :param user_id: Identifier of the current user as returned by the ``authenticate()`` method of a ``SiteAdapter`` :type user_id: int :param client_id: Identifier of the current client. :type client_id: str :return: A ``dict`` containing the ``access_token`` and the ``token_type``. If the value of ``TokenGenerator.expires_in`` is larger than 0, a ``refresh_token`` will be generated too. :rtype: dict """ result = {"access_token": self.generate(grant_type, data, scopes, user_id, client_id), "token_type": "Bearer"} grant_type_expires_in = self.expires_in.get(grant_type) if grant_type_expires_in: result["refresh_token"] = self.refresh_generate(grant_type, data, scopes, user_id, client_id) result["expires_in"] = grant_type_expires_in return result
[docs] def generate(self, grant_type=None, data=None, scopes=None, user_id=None, client_id=None): """ Implemented by generators extending this base class. :param grant_type: Identifier token grant_type :type grant_type: str :param data: Arbitrary data as returned by the ``authenticate()`` method of a ``SiteAdapter``. :type data: dict :param scopes: scopes for oauth session :type scopes: dict :param user_id: Identifier of the current user as returned by the ``authenticate()`` method of a ``SiteAdapter`` :type user_id: int :param client_id: Identifier of the current client. :type client_id: str :raises NotImplementedError: """ raise NotImplementedError
[docs] def refresh_generate(self, grant_type=None, data=None, scopes=None, user_id=None, client_id=None): """ Implemented by refresh generators extending this base class. :param grant_type: Identifier token grant_type :type grant_type: str :param data: Arbitrary data as returned by the ``authenticate()`` method of a ``SiteAdapter``. :type data: dict :param scopes: scopes for oauth session :type scopes: dict :param user_id: Identifier of the current user as returned by the ``authenticate()`` method of a ``SiteAdapter`` :type user_id: int :param client_id: Identifier of the current client. :type client_id: str :raises NotImplementedError: """ raise NotImplementedError
[docs]class StatelessTokenGenerator(TokenGenerator): """ Generate a token using JSON Web Tokens tokens. """ def __init__(self, secret_key): self.serializer = itsdangerous.URLSafeTimedSerializer(secret_key) TokenGenerator.__init__(self) def json_serialize(self, data): _data = dict((k, v) for k, v in data.items() if v) # Remove empty val return self.serializer.dumps(_data) def unserialize(self, serialized): try: payload, timestamp = self.serializer.loads(serialized, return_timestamp=True) payload["refresh_expires_at"] = timestamp return payload except (itsdangerous.BadSignature, itsdangerous.SignatureExpired): raise AccessTokenNotFound def validate_token(self, token, token_type): payload = self.unserialize(token) if payload['type'] != token_type: raise AccessTokenNotFound return payload
[docs] def generate(self, grant_type=None, data=None, scopes=None, user_id=None, client_id=None): """ :return: A new token :rtype: str """ # We use the same generator for code and access_token # JWT will return the same code for different user user_id = user_id if user_id else str(uuid.uuid4()) return self.json_serialize(dict(type='access_token', grant_type=grant_type, user_id=user_id, data=data, scopes=scopes, client_id=client_id))
[docs] def refresh_generate(self, grant_type=None, data=None, scopes=None, user_id=None, client_id=None): """ :return: A new refresh token :rtype: str """ return self.json_serialize(dict(type='refresh_token', grant_type=grant_type, user_id=user_id, data=data, scopes=scopes, client_id=client_id))
[docs]class URandomTokenGenerator(TokenGenerator): """ Create a token using ``os.urandom()``. """ def __init__(self, length=40): self.token_length = length TokenGenerator.__init__(self)
[docs] def generate(self, grant_type=None, data=None, scopes=None, user_id=None, client_id=None): """ :return: A new token :rtype: str """ random_data = os.urandom(100) hash_gen = hashlib.new("sha512") hash_gen.update(random_data) return hash_gen.hexdigest()[:self.token_length]
refresh_generate = generate
[docs]class Uuid4TokenGenerator(TokenGenerator): """ Generate a token using uuid4. """
[docs] def generate(self, grant_type=None, data=None, scopes=None, user_id=None, client_id=None): """ :return: A new token :rtype: str """ return str(uuid.uuid4())
refresh_generate = generate