Innovenergy_trunk/firmware/Cerbo_Release/CerboReleaseFiles/pika-0.13.1/pika/credentials.py

121 lines
4.2 KiB
Python

"""The credentials classes are used to encapsulate all authentication
information for the :class:`~pika.connection.ConnectionParameters` class.
The :class:`~pika.credentials.PlainCredentials` class returns the properly
formatted username and password to the :class:`~pika.connection.Connection`.
To authenticate with Pika, create a :class:`~pika.credentials.PlainCredentials`
object passing in the username and password and pass it as the credentials
argument value to the :class:`~pika.connection.ConnectionParameters` object.
If you are using :class:`~pika.connection.URLParameters` you do not need a
credentials object, one will automatically be created for you.
If you are looking to implement SSL certificate style authentication, you would
extend the :class:`~pika.credentials.ExternalCredentials` class implementing
the required behavior.
"""
from .compat import as_bytes
import logging
LOGGER = logging.getLogger(__name__)
class PlainCredentials(object):
"""A credentials object for the default authentication methodology with
RabbitMQ.
If you do not pass in credentials to the ConnectionParameters object, it
will create credentials for 'guest' with the password of 'guest'.
If you pass True to erase_on_connect the credentials will not be stored
in memory after the Connection attempt has been made.
:param str username: The username to authenticate with
:param str password: The password to authenticate with
:param bool erase_on_connect: erase credentials on connect.
"""
TYPE = 'PLAIN'
def __init__(self, username, password, erase_on_connect=False):
"""Create a new instance of PlainCredentials
:param str username: The username to authenticate with
:param str password: The password to authenticate with
:param bool erase_on_connect: erase credentials on connect.
"""
self.username = username
self.password = password
self.erase_on_connect = erase_on_connect
def __eq__(self, other):
return (isinstance(other, PlainCredentials) and
other.username == self.username and
other.password == self.password and
other.erase_on_connect == self.erase_on_connect)
def __ne__(self, other):
return not self == other
def response_for(self, start):
"""Validate that this type of authentication is supported
:param spec.Connection.Start start: Connection.Start method
:rtype: tuple(str|None, str|None)
"""
if as_bytes(PlainCredentials.TYPE) not in\
as_bytes(start.mechanisms).split():
return None, None
return (PlainCredentials.TYPE,
b'\0' + as_bytes(self.username) +
b'\0' + as_bytes(self.password))
def erase_credentials(self):
"""Called by Connection when it no longer needs the credentials"""
if self.erase_on_connect:
LOGGER.info("Erasing stored credential values")
self.username = None
self.password = None
class ExternalCredentials(object):
"""The ExternalCredentials class allows the connection to use EXTERNAL
authentication, generally with a client SSL certificate.
"""
TYPE = 'EXTERNAL'
def __init__(self):
"""Create a new instance of ExternalCredentials"""
self.erase_on_connect = False
def __eq__(self, other):
return (isinstance(other, ExternalCredentials) and
other.erase_on_connect == self.erase_on_connect)
def __ne__(self, other):
return not self == other
def response_for(self, start):
"""Validate that this type of authentication is supported
:param spec.Connection.Start start: Connection.Start method
:rtype: tuple(str or None, str or None)
"""
if as_bytes(ExternalCredentials.TYPE) not in\
as_bytes(start.mechanisms).split():
return None, None
return ExternalCredentials.TYPE, b''
def erase_credentials(self):
"""Called by Connection when it no longer needs the credentials"""
LOGGER.debug('Not supported by this Credentials type')
# Append custom credential types to this list for validation support
VALID_TYPES = [PlainCredentials, ExternalCredentials]