# 2016-04-08 Cornelius Kölbel <cornelius@privacyidea.org>
# Avoid consecutive if-statements
# 2014-10-03 fix getUsername function
# Cornelius Kölbel <cornelius@privcyidea.org>
#
# May, 08 2014 Cornelius Kölbel
# http://www.privacyidea.org
#
# product: LinOTP2
# module: useridresolver
# tool: PasswdIdResolver
# edition: Comunity Edition
#
# Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
# License: AGPLv3
# contact: http://www.linotp.org
# http://www.lsexperts.de
# linotp@lsexperts.de
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Description: This file is part of the privacyidea service
This module implements the communication interface
for resolvin user info to the /etc/passwd user base
Dependencies: -
"""
import re
import os
import logging
import codecs
from passlib.context import CryptContext
from privacyidea.lib.utils import convert_column_to_unicode
from .UserIdResolver import UserIdResolver
log = logging.getLogger(__name__)
ENCODING = "utf-8"
crypt_ctx = CryptContext(schemes=["sha512_crypt", "sha256_crypt", "bcrypt"])
def tokenise(r):
def _(s):
ret = None
st = s.strip()
m = re.match("^" + r, st)
if m:
ret = (st[:m.end()].strip(), st[m.end():].strip())
return ret
return _
[docs]
class IdResolver (UserIdResolver):
fields = {"username": 1, "userid": 1,
"description": 0,
"phone": 0, "mobile": 0, "email": 0,
"givenname": 0, "surname": 0, "gender": 0
}
search_fields = {"username": "text",
"userid": "numeric",
"description": "text",
"email": "text"
}
search_field_indices = {"username": 0,
"cryptpass": 1,
"userid": 2,
"description": 4,
"email": 4,
}
[docs]
@staticmethod
def setup(config=None, cache_dir=None):
"""
this setup hook is triggered, when the server
starts to serve the first request
:param config: the privacyidea config
:type config: the privacyidea config dict
"""
log.info("Setting up the PasswdResolver")
return
def __init__(self):
"""
simple constructor
"""
self.name = "etc-passwd"
self.file_name = ""
self.name = "P"
self.name_dict = {}
self.description_dict = {}
self.reverse_dict = {}
self.pass_dict = {}
self.office_phone_dict = {}
self.home_phone_dict = {}
self.surname_dict = {}
self.given_name_dict = {}
self.email_dict = {}
[docs]
def load_file(self):
"""
Loads the data of the file initially.
if the self.fileName is empty, it loads /etc/passwd.
Empty lines are ignored.
"""
if not self.file_name:
self.file_name = "/etc/passwd"
log.info(f'loading users from file {self.file_name!s} from within {os.getcwd()!r}')
with codecs.open(self.file_name, "r", ENCODING) as file_handle:
ID = self.search_field_indices["userid"]
NAME = self.search_field_indices["username"]
PASS = self.search_field_indices["cryptpass"]
DESCRIPTION = self.search_field_indices["description"]
for line in file_handle:
line = line.strip()
if not line:
# continue on an empty line
continue
fields = line.split(":", 7)
self.name_dict[fields[NAME]] = fields[ID]
# for speed reason - build a revers lookup
self.reverse_dict[fields[ID]] = fields[NAME]
# for full info store the line
self.description_dict[fields[ID]] = fields
# store the crypted password
self.pass_dict[fields[ID]] = fields[PASS]
# store surname, givenname and phones
descriptions = fields[DESCRIPTION].split(",")
name = descriptions[0]
names = name.split(' ', 1)
self.given_name_dict[fields[ID]] = names[0]
self.surname_dict[fields[ID]] = ""
self.office_phone_dict[fields[ID]] = ""
self.home_phone_dict[fields[ID]] = ""
self.email_dict[fields[ID]] = ""
if len(names) >= 2:
self.surname_dict[fields[ID]] = names[1]
if len(descriptions) >= 4:
self.office_phone_dict[fields[ID]] = descriptions[2]
self.home_phone_dict[fields[ID]] = descriptions[3]
if len(descriptions) >= 5:
for field in descriptions[4:]:
# very basic e-mail regex
email_match = re.search(r'.+@.+\..+', field)
if email_match:
self.email_dict[fields[ID]] = email_match.group(0)
[docs]
def checkPass(self, uid, password):
"""
This function checks the password for a given uid.
returns true in case of success
false if password does not match
We do not support shadow passwords. so the seconds column
of the passwd file needs to contain the encrypted password
If the password is a unicode object, it is encoded according
to ENCODING first.
:param uid: The uid of the user
:type uid: int
:param password: The password in cleartext
:type password: sting
:return: True or False
:rtype: bool
"""
log.info(f"checking password for user uid {uid!s}")
cryptedpasswd = self.pass_dict.get(uid)
log.debug(f"We found the encrypted pass {cryptedpasswd!s} for uid {uid!s}")
if cryptedpasswd:
if cryptedpasswd in ['x', '*']:
err = "Sorry, currently no support for shadow passwords"
log.error(f"{err!s}")
raise NotImplementedError(err)
if crypt_ctx.verify(password, cryptedpasswd):
log.info(f"successfully authenticated user uid {uid!s}")
return True
else:
log.warning(f"user uid {uid!s} failed to authenticate")
return False
else:
log.warning("Failed to verify password. No encrypted password "
"found in file")
return False
[docs]
def get_user_info(self, user_id: int or str, attributes: list[str] = None, no_passwd: bool = False) -> dict:
"""
get some info about the user
as we only have the loginId, we have to traverse the dict for the value
:param user_id: the to be searched user
:param attributes: list of attribute names to be returned for the user. If None or an empty list, all
attributes are returned.
:param no_passwd: return no password
:return: dict of user info
"""
ret = {}
if user_id in self.reverse_dict:
fields = self.description_dict.get(user_id, [])
for key in self.search_field_indices:
if (no_passwd and key == "cryptpass") or (attributes and key not in attributes):
continue
index = self.search_field_indices[key]
if index < len(fields):
ret[key] = fields[index]
if not attributes or "givenname" in attributes:
ret['givenname'] = self.given_name_dict.get(user_id)
if not attributes or "surname" in attributes:
ret['surname'] = self.surname_dict.get(user_id)
if not attributes or "phone" in attributes:
ret['phone'] = self.home_phone_dict.get(user_id)
if not attributes or "mobile" in attributes:
ret['mobile'] = self.office_phone_dict.get(user_id)
if not attributes or "email" in attributes:
ret['email'] = self.email_dict.get(user_id)
else:
log.debug("User with user ID %s could not be found.", user_id)
return ret
[docs]
def get_available_info_keys(self) -> list[str]:
"""
This function returns a list of known privacyIDEA user attributes which can be used, e.g. for getUserList or
get_user_info
:return: list of possible keys for searching users
"""
attributes = ["givenname", "surname", "phone", "mobile", "email"]
attributes.extend(self.search_field_indices.keys())
return attributes
[docs]
def getUsername(self, userId):
'''
Returns the username/loginname for a given userid
:param userid: The userid in this resolver
:type userid: string
:return: username
:rtype: str
'''
fields = self.description_dict.get(userId, [])
index = self.search_field_indices["username"]
username = ""
if index < len(fields):
username = fields[index]
else:
log.debug("Username for user ID %s could not be found.", userId)
return username
[docs]
def getUserId(self, LoginName):
"""
search the user id from the login name
:param LoginName: the login of the user (as unicode)
:return: the userId
:rtype: str
"""
# We do not encode the LoginName anymore, as we are
# storing unicode in nameDict now.
if LoginName in self.name_dict:
return convert_column_to_unicode(self.name_dict.get(LoginName, ""))
else:
return ""
[docs]
def get_search_fields(self, search_dict=None):
"""
show, which search fields this userIdResolver supports
TODO: implementation is not completed
:param search_dict: fields, which can be queried
:type search_dict: dict
:return: dict of all searchFields
:rtype: dict
"""
if search_dict is not None:
for search in search_dict:
pattern = search_dict[search]
log.debug("searching for %s:%s",
search, pattern)
return self.search_fields
[docs]
def getUserList(self, search_dict: dict = None, attributes: list[str] = None) -> list[dict]:
"""
get a list of all users matching the search criteria of the searchdict
:param search_dict: dict of search expressions
:param attributes: list of attributes to be returned for each user
"""
ret = []
search_dict = search_dict or {}
# first check if the searches are in the searchDict
for _id, line in self.description_dict.items():
ok = True
for search in search_dict:
if search not in self.search_fields:
ok = False
break
pattern = search_dict.get(search)
log.debug("searching for %s:%s", search, pattern)
if search in ["username", "description", "email"]:
ok = self.check_attribute(line, pattern, search)
elif search == "userid":
ok = self.check_user_id(line, pattern)
if ok is not True:
break
if ok is True:
uid_index = self.search_field_indices["userid"]
uid = ""
if uid_index < len(line):
uid = line[self.search_field_indices["userid"]]
info = self.get_user_info(uid, attributes=attributes, no_passwd=True)
ret.append(info)
return ret
[docs]
def check_attribute(self, line: list[str], pattern: str, attribute_name: str) -> bool:
"""
Checks if a given attribute matches a pattern.
:param line: the list of user attributes
:param pattern: the pattern to match
:param attribute_name: the name of the attribute to check
:return: True if the attribute matches the pattern, False otherwise
"""
index = self.search_field_indices.get(attribute_name)
if index is None:
log.debug("Unknown search field: %s", attribute_name)
return False
attribute = ""
if index < len(line):
attribute = line[index]
ret = self._string_match(attribute, pattern)
return ret
@staticmethod
def _string_match(value, pattern):
"""
internal function to match strings.
:param value: The string to match
:param pattern: the pattern
:return: If the sting matches
:rtype: bool
"""
ret = False
e = s = ""
string = value.lower()
pattern = pattern.lower()
if pattern.startswith("*"):
e = "e"
pattern = pattern[1:]
if pattern.endswith("*"):
s = "s"
pattern = pattern[:-1]
if e == "e" and s == "s" and string.find(pattern) != -1:
return True
elif e == "e" and string.endswith(pattern):
return True
elif s == "s" and string.startswith(pattern):
return True
elif string == pattern:
return True
return ret
[docs]
def check_user_id(self, line, pattern):
"""
Check if a userid matches a pattern.
A pattern can be "=1000", ">=1000",
"<2000" or "between 1000,2000".
:param line: the dictionary of a user
:type line: dict
:param pattern: match pattern with <, <=...
:type pattern: string
:return: True or False
:rtype: bool
"""
ret = False
try:
cUserId = int(line[self.search_field_indices["userid"]])
except ValueError: # pragma: no cover
return ret
(op, val) = tokenise(">=|<=|>|<|=|between")(pattern)
if op == "between":
(lVal, hVal) = val.split(",", 2)
try:
ilVal = int(lVal.strip())
ihVal = int(hVal.strip())
if ihVal < ilVal:
v = ihVal
ihVal = ilVal
ilVal = v
except ValueError: # pragma: no cover
return ret
if ilVal <= cUserId <= ihVal:
ret = True
else:
try:
ival = int(val)
except ValueError: # pragma: no cover
return ret
if op == "=" and cUserId == ival:
ret = True
elif op == ">" and cUserId > ival:
ret = True
elif op == ">=" and cUserId >= ival:
ret = True
elif op == "<" and cUserId < ival:
ret = True
elif op == "<=" and cUserId <= ival:
ret = True
return ret
#############################################################
# server info methods
#############################################################
[docs]
def getResolverId(self):
"""
return the resolver identifier string, which in fact is
filename, where it points to.
"""
return self.file_name
[docs]
@staticmethod
def getResolverClassType():
return 'passwdresolver'
[docs]
@staticmethod
def getResolverType():
return IdResolver.getResolverClassType()
[docs]
@classmethod
def getResolverClassDescriptor(cls):
'''
return the descriptor of the resolver, which is
- the class name and
- the config description
:return: resolver description dict
:rtype: dict
'''
descriptor = {}
typ = cls.getResolverClassType()
descriptor['clazz'] = "useridresolver.PasswdIdResolver.IdResolver"
descriptor['config'] = {'fileName': 'string'}
return {typ: descriptor}
[docs]
@staticmethod
def getResolverDescriptor():
return IdResolver.getResolverClassDescriptor()
[docs]
def loadConfig(self, config):
""" loadConfig(configDict)
The UserIdResolver could be configured
from the pylons app config - here
this could be the passwd file ,
whether it is /etc/passwd or /etc/shadow
"""
self.file_name = config.get("fileName", config.get("filename"))
self.load_file()
return self