# coding=utf-8
"""
    @project: MaxKB
    @Author:虎虎
    @file: login.py
    @date:2025/4/14 11:08
    @desc: Serializers for system-user login and login captcha generation.
"""
import base64
import datetime
import json

from captcha.image import ImageCaptcha
from django.core import signing
from django.core.cache import cache
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers

from common.constants.authentication_type import AuthenticationType
from common.constants.cache_version import Cache_Version
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.exception.app_exception import AppApiException
from common.utils.common import password_encrypt, get_random_chars
from maxkb.const import CONFIG
from users.models import User


class LoginRequest(serializers.Serializer):
    """Request payload of the login endpoint."""
    username = serializers.CharField(required=True, max_length=64, help_text=_("Username"), label=_("Username"))
    password = serializers.CharField(required=True, max_length=128, label=_("Password"))
    # Optional: only checked once the failed-login threshold has been reached.
    captcha = serializers.CharField(required=False, max_length=64, label=_('captcha'), allow_null=True,
                                    allow_blank=True)


# Cache version and key builder shared by the failed-login counters below.
system_version, system_get_key = Cache_Version.SYSTEM.value


class LoginResponse(serializers.Serializer):
    """
    Login response object
    """
    token = serializers.CharField(required=True, label=_("token"))


def record_login_fail(username: str, expire: int = 3600):
    """Record one more failed login attempt for ``username``.

    :param username: login name the attempt was made with; nothing is recorded when it is empty
    :param expire: cache timeout, in seconds, applied when the counter is created
    """
    if not username:
        return
    fail_key = system_get_key(f'system_{username}')
    # ``cache.add`` only stores the value when the key is absent, so the first
    # failure creates the counter without a separate get-then-set step.
    if cache.add(fail_key, 1, timeout=expire, version=system_version):
        return
    try:
        cache.incr(fail_key, 1, version=system_version)
    except ValueError:
        # ``incr`` raises ValueError for a missing key: the counter expired
        # between ``add`` and ``incr``, so start counting again.
        cache.set(fail_key, 1, timeout=expire, version=system_version)


def _load_auth_setting() -> dict:
    """Return the stored ``auth_setting`` as a dict, or ``{}`` when it is missing or unusable."""
    auth_setting_model = DatabaseModelManage.get_model('auth_setting')
    if not auth_setting_model:
        return {}
    setting_obj = auth_setting_model.objects.filter(param_key='auth_setting').first()
    if not setting_obj:
        return {}
    try:
        auth_setting = json.loads(setting_obj.param_value)
    except (TypeError, ValueError):
        return {}
    # Valid JSON that is not an object (list, number, null, ...) cannot be queried with ``.get``.
    return auth_setting if isinstance(auth_setting, dict) else {}


def _get_max_attempts(auth_setting: dict) -> int:
    """Return the configured ``max_attempts`` as an int, defaulting to 1 when absent or not numeric."""
    try:
        return int(auth_setting.get("max_attempts", 1))
    except (TypeError, ValueError):
        return 1


class LoginSerializer(serializers.Serializer):
    @staticmethod
    def login(instance):
        """Authenticate a system user and issue a signed token.

        :param instance: mapping with ``username``, ``password`` and optionally ``captcha``
        :return: ``{'token': <signed token>}``
        :raises AppApiException: 1005 when a required captcha is missing, wrong or expired,
            or when the user is disabled; 500 when username/password do not match
        :raises Exception: whatever ``LoginRequest`` validation raises for a malformed payload
        """
        username = instance.get("username", "")
        try:
            LoginRequest(data=instance).is_valid(raise_exception=True)
        except Exception:
            # A malformed request still counts as a failed attempt for this username.
            record_login_fail(username)
            raise

        max_attempts = _get_max_attempts(_load_auth_setting())
        password = instance.get("password")
        captcha = instance.get("captcha", "")

        # Decide whether a captcha is needed: -1 (like any non-positive value)
        # disables it, a positive value is the number of failures allowed first.
        need_captcha = False
        if max_attempts > 0:
            fail_count = cache.get(system_get_key(f'system_{username}'), version=system_version) or 0
            need_captcha = fail_count >= max_attempts

        if need_captcha:
            if not captcha:
                raise AppApiException(1005, _("Captcha is required"))
            captcha_key = Cache_Version.CAPTCHA.get_key(captcha=f"system_{username}")
            captcha_version = Cache_Version.CAPTCHA.get_version()
            captcha_cache = cache.get(captcha_key, version=captcha_version)
            # ``str`` guards against a numeric captcha value in the raw payload.
            if captcha_cache is None or str(captcha).lower() != captcha_cache:
                raise AppApiException(1005, _("Captcha code error or expiration"))
            # A solved captcha is single-use; otherwise one answer could be replayed
            # for any number of password guesses until the cache entry expires.
            cache.delete(captcha_key, version=captcha_version)

        user = QuerySet(User).filter(username=username, password=password_encrypt(password)).first()
        if user is None:
            record_login_fail(username)
            raise AppApiException(500, _('The username or password is incorrect'))
        if not user.is_active:
            record_login_fail(username)
            raise AppApiException(1005, _("The user has been disabled, please contact the administrator!"))

        # Successful login: reset the failure counter.
        cache.delete(system_get_key(f'system_{username}'), version=system_version)
        token = signing.dumps({'username': user.username,
                               'id': str(user.id),
                               'email': user.email,
                               'type': AuthenticationType.SYSTEM_USER.value})
        version, get_key = Cache_Version.TOKEN.value
        timeout = CONFIG.get_session_timeout()
        # The user object is cached under the token for the session timeout.
        cache.set(get_key(token), user, timeout=timeout, version=version)
        return {'token': token}


class CaptchaResponse(serializers.Serializer):
    """
    Captcha response object
    """
    captcha = serializers.CharField(required=True, label=_("captcha"))


class CaptchaSerializer(serializers.Serializer):
    @staticmethod
    def generate(username: str, type: str = 'system'):
        """Generate a captcha image for ``username`` and remember its answer.

        The lower-cased answer is cached for 300 seconds under a key built from
        ``type`` and ``username``.

        :param username: login name the captcha is issued for
        :param type: key prefix distinguishing captcha scopes; ``login`` reads the ``system`` scope
        :return: ``{'captcha': <base64 PNG data URI>}``
        """
        chars = get_random_chars()
        image = ImageCaptcha()
        data = image.generate(chars)
        captcha = base64.b64encode(data.getbuffer())
        cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(),
                  timeout=300, version=Cache_Version.CAPTCHA.get_version())
        return {'captcha': 'data:image/png;base64,' + captcha.decode()}