认证、权限、频率

认证

登录接口

# modles.py
from django.db import models

class User(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=32)
    user_type = models.IntegerField(choices=((1, '超级用户'), (2, '普通用户'), (3, '二笔用户')))

class UserToken(models.Model):
    user = models.OneToOneField(to='User', on_delete=models.CASCADE)
    token = models.CharField(max_length=64)

# serializer.py
from rest_framework import serializers
from app01 import models
class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.User
        fields = '__all__'

# views.py
import uuid
from app01 import models
from app01 import serializer
from rest_framework.viewsets import ViewSetMixin
from rest_framework.generics import CreateAPIView
from rest_framework.decorators import action
from rest_framework.response import Response

class UserView(ViewSetMixin, CreateAPIView):
    queryset = models.User.objects.all()
    serializer_class = serializer.UserSerializer

    @action(methods=['POST'], detail=False)
    def login(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user = models.User.objects.filter(username=username, password=password).first()
        token = uuid.uuid4()
        # 根据user去查询,如果能查到,就修改token,如果查不到,就新增一条
        models.UserToken.objects.update_or_create(defaults={'token': token}, user=user)
        if user:
            return Response({'msg': '登录成功', "token": token})
        else:
            return Response({'status': 101, 'msg': '用户名或密码错误'})

# urls.py
from django.contrib import admin
from django.urls import path, include
from rest_framework.routers import SimpleRouter
from app01 import views

router = SimpleRouter()
router.register('user', views.UserView)
urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include(router.urls)),
]

认证的使用

认证类的编写
# 认证类:用来校验用户是否登录,如果登录了,继续往下走,如果没有登录,直接返回
# 编写步骤:
  1.写一个类继承BaseAuthentication,
  2.重写authenticate在方法中做校验校验是否登录返回两个值没有登录抛异常
  3.全局配置局部配置
    全局配置配置文件中
        REST_FRAMEWORK={
        "DEFAULT_AUTHENTICATION_CLASSES":["app01.auth.LoginAuth",]
        }
    局部配置在视图类中
        class UserView(ViewSet):
          authentication_classes = [LoginAuth]

    局部禁用
         class UserView(ViewSet):
            authentication_classes = []

# 认证类中返回的两个变量
   返回的第一个给了request.user就是当前登录用户
   返回的第二个给了request.auth就是token串
# 在 app01 下创建 auth.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from app01 import models

class LoginAuth(BaseAuthentication):
    def authenticate(self, request):
        token = request.query_params.get('token')
        user_token = models.UserToken.objects.filter(token=token).first()
        if user_token:
            return user_token.user, token
        else:
            raise AuthenticationFailed('您没有登录')
认证类的使用
# 全局使用 在 settings.py 中配置 ---> 所有接口都需要认证
  REST_FRAMEWORK={
      "DEFAULT_AUTHENTICATION_CLASSES":["app01.auth.LoginAuth",]
  }

# 局部禁用 在视图类中添加(如登录的时候就不能要求认证)
  authentication_classes = []

# 局部使用
	from app01 import auth
  authentication_classes = [auth.LoginAuth,]

image-20220405161826558

image-20220405162315439

权限

权限类的编写

# 登录成功 ---> 所有必须登录能访问 ---> 每个视图类上加认证类
# 用户是普通用户 ---> 普通用户可以访问所有和单条
# 普通管理员和超级用户可以操作所有,除了访问单条和所有的那个视图类,加上认证类
# books:查看一条,和所有
# booksdetail 路由下有:删除,新增,修改 ---> 权限类加在这里

# book 5个接口,必须登录才能访问
# 5个接口分成了俩视图写:
    BookView获取所有获取单条
    BookDetailView删除修改新增
    这俩视图都需要登录authentication_classes = [LoginAuth, ]
    BookView只要登陆就可以操作
    BookDetailView必须有权限才能加了一个权限permission_classes = [UserPermission, ]

# 跟写认证类步骤差不多
第一步写一个类继承BasePermission重写has_permission判断如果有权限返回True如果没有权限返回False
第二步局部使用和全局使用
    局部使用
    class BookDetailView(GenericViewSet, CreateModelMixin, DestroyModelMixin, UpdateModelMixin):
      permission_classes = [UserPermission, ]

    全局使用
      REST_FRAMEWORK={
        "DEFAULT_PERMISSION_CLASSES":["app01.auth.UserPermission",]
      }
# 在 app01 下创建 auth.py
from app01 import models
from rest_framework.permissions import BasePermission

class MyPermission(BasePermission):
    def has_permission(self, request, view):
        self.message = "您是: %s,权限不足" % request.user.get_user_type_display()  # 没有权限的提示信息
        user_type = request.user.user_type
				# 如果有权限,返回True,没有权限返回False
        # 权限类,在认证类之后,request.user有了当前登录用户
        if user_type > 2:  # 是 1 和 2 就没有权限
            return False
        else:
            return True

权限类的使用

# 局部使用(在视图类中加)
  from app01 import auth
	permission_classes = [auth.MyPermission,]
# 全局使用(在配置文件中配置)
  REST_FRAMEWORK={
      "DEFAULT_PERMISSION_CLASSES":["app01.auth.MyPermission",],
  }

使用一个普通用户登录的 token 访问需要权限的视图类就会失败 image-20220405163147257

频率

频率类的编写

# 认证,权限都通过以后,现在某个接口的访问频率 ---> 一般根据ip或者用户限制

# 使用步骤
  第一步写一个类继承SimpleRateThrottle重写类属性scope和get_cache_key方法
    get_cache_key返回什么就以什么做现在scope配置文件中要用

  第二步在配置文件中配置
    'DEFAULT_THROTTLE_RATES': {
        'minute_3': '3/m'  # minute_3是scope的字符串,一分钟访问3次
        'minute_5''5/m'
    }

  局部使用 ---> 视图类中
  class BookView(GenericViewSet, ListModelMixin, RetrieveModelMixin):
    throttle_classes = [IPThrottle]
  全局使用--配置文件中
     'DEFAULT_THROTTLE_CLASSES': (  # 全局配置频率类
        'app01.auth.IPThrottle'
     ),
# 在 app01 下创建 auth.py
# 以客户端IP限制
class IPThrottle(SimpleRateThrottle):
    scope = 'min_throttle'
    def get_cache_key(self, request, view):
        return self.get_ident(request)  # 真实的客户端IP
        # return request.META.get('REMOTE_ADDR')  # 有代理的IP地址


# 以用户限制
class UserThrottle(SimpleRateThrottle):
    scope = 'user_throttle'
    def get_cache_key(self, request, view):
        return request.user.id

频率类的使用

# 全局使用 settings.py
## 以IP地址限制
REST_FRAMEWORK = {
    "DEFAULT_THROTTLE_CLASSES": ["app01.auth.IPThrottle",],
    "DEFAULT_THROTTLE_RATES": {
        'min_throttle': '3/m',  # 每分钟三次的访问
    }
}

## 以用户限制
REST_FRAMEWORK = {
    "DEFAULT_THROTTLE_CLASSES": ["app01.auth.UserThrottle",],
    "DEFAULT_THROTTLE_RATES": {
        'user_throttle': '5/m',,  # 每分钟五次的访问
    }
}

# 局部使用 视图类中写 throttle_classes 属性
  from app01 import auth
  # 以IP限制
  throttle_classes = [auth.IPThrottle]
  # 以用户限制
  throttle_classes = [auth.UserThrottle]
  # settings.py 文件中的 "DEFAULT_THROTTLE_RATES" 还是需要设置

综合使用

models.py

from django.db import models


class User(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=32)
    user_type = models.IntegerField(choices=((1, "超级用户"), (2, "管理用户"), (3, "普通用户")))


class UserToken(models.Model):
    user = models.OneToOneField(to="User", on_delete=models.CASCADE)
    token = models.CharField(max_length=64)


class Books(models.Model):
    name = models.CharField(max_length=32)
    price = models.DecimalField(max_digits=8, decimal_places=2)
    publish = models.ForeignKey(to="Publish", on_delete=models.CASCADE)
    authors = models.ManyToManyField(to="Author")

    def __str__(self):
        return self.name

    @property
    def publish_list(self):
        return {'name': self.publish.name, 'city': self.publish.city, 'email': self.publish.email}

    @property
    def authors_list(self):
        return [{'name': author.name, 'age': author.age, 'phone': author.author_detail.tel,
                 'address': author.author_detail.addr} for author in self.authors.all()]


class Publish(models.Model):
    name = models.CharField(max_length=32)
    city = models.CharField(max_length=32)
    email = models.EmailField()

    def __str__(self):
        return self.name


class Author(models.Model):
    name = models.CharField(max_length=32)
    age = models.IntegerField()
    author_detail = models.OneToOneField(to="AuthorDetail", on_delete=models.CASCADE)

    def __str__(self):
        return self.name

    def tel(self):
        return self.author_detail.tel

    def addr(self):
        return self.author_detail.addr

    @property
    def detail_info(self):
        return {'phone': self.author_detail.tel, 'address': self.author_detail.addr}


class AuthorDetail(models.Model):
    tel = models.CharField(max_length=32)
    addr = models.CharField(max_length=32)

    def __str__(self):
        return self.addr

serializer.py

from rest_framework import serializers
from app01 import models


class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.User
        fields = "__all__"


class BookSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.Books
        fields = "__all__"
        extra_kwargs = {
            "publish": {'write_only': True},
            "authors": {'write_only': True},
        }

    publish_list = serializers.DictField(read_only=True)
    authors_list = serializers.ListField(read_only=True)


class PublishSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.Publish
        fields = "__all__"


class AuthorSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.Author
        fields = ["id", "name", "age", "tel", "addr"]

    tel = serializers.CharField()
    addr = serializers.CharField()

    # 重写了 create 和 update 方法 在插入作者的时候可以将详情以前插入
    def create(self, validated_data):
        detail = models.AuthorDetail.objects.create(tel=validated_data.get("tel"), addr=validated_data.get("addr"))
        author = models.Author.objects.create(author_detail=detail, name=validated_data.get('name'),
                                              age=validated_data.get('age'))
        return author

    def update(self, instance, validated_data):
        name = validated_data.get('name')
        age = validated_data.get('age')
        tel = validated_data.get('tel')
        addr = validated_data.get('addr')
        instance.author_detail.tel = tel
        instance.author_detail.addr = addr
        instance.name = name
        instance.age = age
        instance.save()
        return instance

auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from app01 import models
from rest_framework.permissions import BasePermission
from rest_framework.throttling import BaseThrottle, SimpleRateThrottle


class LoginAuth(BaseAuthentication):
    def authenticate(self, request):
        token = request.query_params.get('token')
        user_token = models.UserToken.objects.filter(token=token).first()
        if user_token:
            return user_token.user, token
        else:
            raise AuthenticationFailed('您没有登录')


class MyPermission(BasePermission):
    def has_permission(self, request, view):
        self.message = "您是: %s,权限不足" % request.user.get_user_type_display()
        user_type = request.user.user_type
        if user_type > 2:  # 不是1 2 就是3 没有权限
            return False
        else:
            return True


# 以客户端IP
class IPThrottle(SimpleRateThrottle):
    scope = 'min_throttle'

    def get_cache_key(self, request, view):
        return self.get_ident(request)  # 真实的客户端IP
        # return request.META.get('REMOTE_ADDR')  # 客户端ip


# 以用户限制
class UserThrottle(SimpleRateThrottle):
    scope = 'user_throttle'

    def get_cache_key(self, request, view):
        return request.user.id

views.py

import uuid
from rest_framework.response import Response
from app01 import models, serializer, auth
from rest_framework.viewsets import ViewSetMixin, ModelViewSet
from rest_framework.generics import CreateAPIView
from rest_framework.decorators import action


class UserView(ViewSetMixin, CreateAPIView):
    authentication_classes = []  # 登录接口禁用认证类
    queryset = models.User.objects.all()
    serializer_class = serializer.UserSerializer

    @action(methods=['POST'], detail=False)
    def login(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user = models.User.objects.filter(username=username, password=password).first()
        token = uuid.uuid4()
        models.UserToken.objects.update_or_create(user=user, defaults={'token': token})
        if user:
            return Response({'msg': '登录成功', 'token': token})
        else:
            return Response({'status': 101, 'msg': '用户名或密码错误'})


class BookView(ModelViewSet):
    queryset = models.Books.objects.all()
    serializer_class = serializer.BookSerializer


class PublishView(ModelViewSet):
    permission_classes = [auth.MyPermission, ]  # 有管理员权限才可以操作该接口
    queryset = models.Publish.objects.all()
    serializer_class = serializer.PublishSerializer


class AuthorView(ModelViewSet):
    queryset = models.Author.objects.all()
    serializer_class = serializer.AuthorSerializer

urls.py

from django.contrib import admin
from django.urls import path, include
from app01 import views
from rest_framework.routers import SimpleRouter

router = SimpleRouter()
router.register('user', views.UserView)
router.register('books', views.BookView)
router.register('publish', views.PublishView)
router.register('authors', views.AuthorView)
urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include(router.urls)),
]

settings.py

# drf配置
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": ["app01.auth.LoginAuth", ],
    "DEFAULT_THROTTLE_CLASSES": ["app01.auth.IPThrottle",],  # 使用的是以IP为限制
    # "DEFAULT_THROTTLE_CLASSES": ["app01.auth.UserThrottle",],
    "DEFAULT_THROTTLE_RATES": {
        'min_throttle': '3/m',  # 每分钟只允许访问三次
        # 'user_throttle': '5/m',
    }
}