Source code for pytds.login

# vim: set fileencoding=utf8 :
"""
.. module:: login
   :platform: Unix, Windows, MacOSX
   :synopsis: Login classes

.. moduleauthor:: Mikhail Denisenko <denisenkom@gmail.com>
"""
import socket
import logging

logger = logging.getLogger(__name__)


[docs]class SspiAuth(object): """ SSPI authentication :platform: Windows Required parameters are server_name and port or spn :keyword user_name: User name, if not provided current security context will be used :type user_name: str :keyword password: User password, if not provided current security context will be used :type password: str :keyword server_name: MSSQL server host name :type server_name: str :keyword port: MSSQL server port :type port: int :keyword spn: Service name :type spn: str """ def __init__(self, user_name='', password='', server_name='', port=None, spn=None): from . import sspi # parse username/password informations if '\\' in user_name: domain, user_name = user_name.split('\\') else: domain = '' if domain and user_name: self._identity = sspi.make_winnt_identity( domain, user_name, password) else: self._identity = None # build SPN if spn: self._sname = spn else: primary_host_name, _, _ = socket.gethostbyname_ex(server_name) self._sname = 'MSSQLSvc/{0}:{1}'.format(primary_host_name, port) # using Negotiate system will use proper protocol (either NTLM or Kerberos) self._cred = sspi.SspiCredentials( package='Negotiate', use=sspi.SECPKG_CRED_OUTBOUND, identity=self._identity) self._flags = sspi.ISC_REQ_CONFIDENTIALITY | sspi.ISC_REQ_REPLAY_DETECT | sspi.ISC_REQ_CONNECTION def create_packet(self): from . import sspi import ctypes buf = ctypes.create_string_buffer(4096) self._ctx, status, bufs = self._cred.create_context( flags=self._flags, byte_ordering='network', target_name=self._sname, output_buffers=[(sspi.SECBUFFER_TOKEN, buf)]) if status == sspi.Status.SEC_I_COMPLETE_AND_CONTINUE: self._ctx.complete_auth_token(bufs) return bufs[0][1] def handle_next(self, packet): from . import sspi import ctypes buf = ctypes.create_string_buffer(4096) status, buffers = self._ctx.next( flags=self._flags, byte_ordering='network', target_name=self._sname, input_buffers=[(sspi.SECBUFFER_TOKEN, packet)], output_buffers=[(sspi.SECBUFFER_TOKEN, buf)]) return buffers[0][1] def close(self): self._ctx.close() self._cred.close()
[docs]class NtlmAuth(object): """ NTLM authentication, uses Python implementation :param user_name: User name :type user_name: str :param password: User password :type password: str """ def __init__(self, user_name, password): if '\\' in user_name: self._domain, self._user = user_name.split('\\', 1) else: self._domain = 'workspace' self._user = user_name self._password = password def create_packet(self): from . import ntlm return ntlm.create_NTLM_NEGOTIATE_MESSAGE_raw(socket.gethostname(), self._domain) def handle_next(self, packet): from . import ntlm nonce, flags = ntlm.parse_NTLM_CHALLENGE_MESSAGE_raw(packet) return ntlm.create_NTLM_AUTHENTICATE_MESSAGE_raw(nonce, self._user, self._domain, self._password, flags) def close(self): pass