提交 1dce8602 作者: 王静

zzsnkgpltV1.0

上级
import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'application.settings')
django.setup()
from channels.auth import AuthMiddlewareStack
from channels.http import AsgiHandler
from channels.routing import ProtocolTypeRouter, URLRouter
websocket_urlpatterns = [
]
application = ProtocolTypeRouter({
"http": AsgiHandler(),
# Just HTTP for now. (We can add other protocols later.)
'websocket': AuthMiddlewareStack(URLRouter(websocket_urlpatterns, )),
})
import os
import django
from celery import Celery, platforms
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', "application.settings")
django.setup()
app = Celery(f"dvadmin")
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
platforms.C_FORCE_ROOT = True
"""application URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/1.11/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: url(r'^$', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.conf.urls import url, include
2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls'))
"""
from captcha.conf import settings as ca_settings
from captcha.helpers import captcha_image_url, captcha_audio_url
from captcha.models import CaptchaStore
from django.conf import settings
from django.urls import re_path, include
from django.views.static import serve
from rest_framework.views import APIView
from apps.vadmin.op_drf.response import SuccessResponse
from rest_framework import permissions
from drf_yasg.views import get_schema_view
from drf_yasg import openapi
schema_view = get_schema_view(
openapi.Info(
title="Snippets API",
default_version='v1',
description="Test description",
terms_of_service="https://www.google.com/policies/terms/",
contact=openapi.Contact(email="contact@snippets.local"),
license=openapi.License(name="BSD License"),
),
public=True,
permission_classes=[permissions.AllowAny],
)
class CaptchaRefresh(APIView):
authentication_classes = []
permission_classes = []
def get(self, request):
new_key = CaptchaStore.pick()
to_json_response = {
"key": new_key,
"image_url": captcha_image_url(new_key),
"audio_url": captcha_audio_url(new_key) if ca_settings.CAPTCHA_FLITE_PATH else None,
}
return SuccessResponse(to_json_response)
urlpatterns = [
re_path(r'media/(?P<path>.*)', serve, {"document_root": settings.MEDIA_ROOT}),
re_path(r'^admin/', include('apps.vadmin.urls')),
re_path(r'^$', schema_view.with_ui('swagger', cache_timeout=0), name='schema-swagger-ui'),
re_path(r'^swagger(?P<format>\.json|\.yaml)$', schema_view.without_ui(cache_timeout=0), name='schema-json'),
re_path(r'^swagger/$', schema_view.with_ui('swagger', cache_timeout=0), name='schema-swagger-ui'),
re_path(r'^redoc/$', schema_view.with_ui('redoc', cache_timeout=0), name='schema-redoc'),
]
[uwsgi]
chdir = /dvadmin-backend
wsgi-file = /dvadmin-backend/application/wsgi.py
master = true
processes = 8
http-socket = 0.0.0.0:8000
module = application.wsgi:application
vacuum = true
log-maxsize = 20000000
log-reopen = true
buffer-size = 65536
"""
WSGI config for application project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/1.11/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "application.settings")
application = get_wsgi_application()
from django.apps import AppConfig
class DpCmdbConfig(AppConfig):
name = 'apps.vadmin.celery'
import django_filters
from django_celery_beat.models import IntervalSchedule, CrontabSchedule, PeriodicTask
class IntervalScheduleFilter(django_filters.rest_framework.FilterSet):
class Meta:
model = IntervalSchedule
fields = '__all__'
class CrontabScheduleFilter(django_filters.rest_framework.FilterSet):
class Meta:
model = CrontabSchedule
exclude = ('timezone',)
class PeriodicTaskFilter(django_filters.rest_framework.FilterSet):
class Meta:
model = PeriodicTask
fields = '__all__'
from django_celery_beat.models import IntervalSchedule, CrontabSchedule, PeriodicTask
from rest_framework import serializers
from apps.vadmin.op_drf.serializers import CustomModelSerializer
from apps.vadmin.utils.exceptions import APIException
class IntervalScheduleSerializer(CustomModelSerializer):
class Meta:
model = IntervalSchedule
fields = '__all__'
class CrontabScheduleSerializer(CustomModelSerializer):
def save(self, **kwargs):
queryset = CrontabSchedule.objects.filter(**self.validated_data)
if queryset.count() and (
queryset.count() == 1 and self.initial_data.get('id', None) not in queryset.values_list('id',
flat=True)):
raise APIException(message='值已存在!!!')
super().save(**kwargs)
class Meta:
model = CrontabSchedule
exclude = ('timezone',)
class PeriodicTaskSerializer(CustomModelSerializer):
interval_list = serializers.SerializerMethodField(read_only=True)
crontab_list = serializers.SerializerMethodField(read_only=True)
def get_interval_list(self, obj):
return IntervalScheduleSerializer(obj.interval).data if obj.interval else {}
def get_crontab_list(self, obj):
return CrontabScheduleSerializer(obj.crontab).data if obj.crontab else {}
class Meta:
model = PeriodicTask
fields = '__all__'
from django.test import TestCase
# Create your tests here.
from django.conf.urls import url
from rest_framework.routers import DefaultRouter
from rest_framework.urlpatterns import format_suffix_patterns
from apps.vadmin.celery.views import IntervalScheduleModelViewSet, CrontabScheduleModelViewSet, PeriodicTaskModelViewSet, TasksAsChoices, \
OperateCeleryTask
router = DefaultRouter()
# 调度间隔
router.register(r'intervalschedule', IntervalScheduleModelViewSet)
router.register(r'crontabschedule', CrontabScheduleModelViewSet)
router.register(r'periodictask', PeriodicTaskModelViewSet)
urlpatterns = format_suffix_patterns([
# 获取所有 tasks 名称
url(r'^tasks_as_choices/', TasksAsChoices.as_view()),
url(r'^operate_celery/', OperateCeleryTask.as_view()),
])
urlpatterns += router.urls
from django_celery_beat.admin import TaskSelectWidget
from django_celery_beat.models import IntervalSchedule, CrontabSchedule, PeriodicTask
from rest_framework.views import APIView
from apps.vadmin.celery.filters import IntervalScheduleFilter, CrontabScheduleFilter, PeriodicTaskFilter
from apps.vadmin.celery.serializers import IntervalScheduleSerializer, CrontabScheduleSerializer, PeriodicTaskSerializer
from apps.vadmin.op_drf.views import CustomAPIView
from apps.vadmin.op_drf.viewsets import CustomModelViewSet
from apps.vadmin.op_drf.response import SuccessResponse
class IntervalScheduleModelViewSet(CustomModelViewSet):
"""
IntervalSchedule 调度间隔模型
every 次数
period 时间(天,小时,分钟,秒.毫秒)
"""
queryset = IntervalSchedule.objects.all()
serializer_class = IntervalScheduleSerializer
create_serializer_class = IntervalScheduleSerializer
update_serializer_class = IntervalScheduleSerializer
filter_class = IntervalScheduleFilter
search_fields = ('every', 'period')
ordering = 'every' # 默认排序
class CrontabScheduleModelViewSet(CustomModelViewSet):
"""
CrontabSchedule crontab调度模型
minute 分钟
hour 小时
day_of_week 每周的周几
day_of_month 每月的某一天
month_of_year 每年的某一个月
"""
queryset = CrontabSchedule.objects.all()
serializer_class = CrontabScheduleSerializer
filter_class = CrontabScheduleFilter
search_fields = ('minute', 'hour')
ordering = 'minute' # 默认排序
class PeriodicTaskModelViewSet(CustomModelViewSet):
"""
PeriodicTask celery 任务数据模型
name 名称
task celery任务名称
interval 频率
crontab 任务编排
args 形式参数
kwargs 位置参数
queue 队列名称
exchange 交换
routing_key 路由密钥
expires 过期时间
enabled 是否开启
"""
queryset = PeriodicTask.objects.exclude(name="celery.backend_cleanup")
serializer_class = PeriodicTaskSerializer
filter_class = PeriodicTaskFilter
search_fields = ('name', 'task', 'date_changed')
ordering = 'date_changed' # 默认排序
class TasksAsChoices(APIView):
def get(self, request):
"""
获取所有 tasks 名称
:param request:
:return:
"""
lis = []
def get_data(datas):
for item in datas:
if isinstance(item, (str, int)) and item:
lis.append(item)
else:
get_data(item)
get_data(TaskSelectWidget().tasks_as_choices())
return SuccessResponse(list(set(lis)))
class OperateCeleryTask(CustomAPIView):
def post(self, request):
req_data = request.data
task = req_data.get('celery_name', '')
data = {
'task': ''
}
test = f"""
from {'.'.join(task.split('.')[:-1])} import {task.split('.')[-1]}
task = {task.split('.')[-1]}.delay()
"""
exec(test, data)
return SuccessResponse({'task_id': data.get('task').id})
from django.apps import AppConfig
class MonitorConfig(AppConfig):
name = 'apps.vadmin.monitor'
verbose_name = "系统监控"
import django_filters
from apps.vadmin.monitor.models import Server, Monitor
class ServerFilter(django_filters.rest_framework.FilterSet):
"""
服务器信息 简单过滤器
"""
class Meta:
model = Server
fields = '__all__'
class MonitorFilter(django_filters.rest_framework.FilterSet):
"""
服务器监控信息 简单过滤器
"""
class Meta:
model = Monitor
fields = '__all__'
# Generated by Django 2.0 on 2021-10-23 09:44
import apps.vadmin.op_drf.fields
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Monitor',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('description', apps.vadmin.op_drf.fields.DescriptionField(blank=True, default='', help_text='描述', null=True, verbose_name='描述')),
('modifier', apps.vadmin.op_drf.fields.ModifierCharField(blank=True, help_text='该记录最后修改者', max_length=255, null=True, verbose_name='修改者')),
('dept_belong_id', models.CharField(blank=True, max_length=64, null=True, verbose_name='数据归属部门')),
('update_datetime', apps.vadmin.op_drf.fields.UpdateDateTimeField(auto_now=True, help_text='修改时间', null=True, verbose_name='修改时间')),
('create_datetime', apps.vadmin.op_drf.fields.CreateDateTimeField(auto_now_add=True, help_text='创建时间', null=True, verbose_name='创建时间')),
('cpu_num', models.CharField(max_length=8, verbose_name='CPU核数')),
('cpu_sys', models.CharField(max_length=8, verbose_name='CPU已使用率')),
('mem_num', models.CharField(max_length=32, verbose_name='内存总数(KB)')),
('mem_sys', models.CharField(max_length=32, verbose_name='内存已使用大小(KB)')),
('seconds', models.CharField(max_length=32, verbose_name='系统已运行时间')),
],
options={
'verbose_name': '服务器监控信息',
'verbose_name_plural': '服务器监控信息',
},
),
migrations.CreateModel(
name='Server',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(blank=True, max_length=256, null=True, verbose_name='服务器名称')),
('ip', models.CharField(max_length=32, verbose_name='ip地址')),
('os', models.CharField(max_length=32, verbose_name='操作系统')),
('remark', models.CharField(blank=True, max_length=256, null=True, verbose_name='备注')),
('update_datetime', apps.vadmin.op_drf.fields.UpdateDateTimeField(auto_now=True, help_text='修改时间', null=True, verbose_name='修改时间')),
('create_datetime', apps.vadmin.op_drf.fields.CreateDateTimeField(auto_now_add=True, help_text='创建时间', null=True, verbose_name='创建时间')),
],
options={
'verbose_name': '服务器信息',
'verbose_name_plural': '服务器信息',
},
),
migrations.CreateModel(
name='SysFiles',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('description', apps.vadmin.op_drf.fields.DescriptionField(blank=True, default='', help_text='描述', null=True, verbose_name='描述')),
('modifier', apps.vadmin.op_drf.fields.ModifierCharField(blank=True, help_text='该记录最后修改者', max_length=255, null=True, verbose_name='修改者')),
('dept_belong_id', models.CharField(blank=True, max_length=64, null=True, verbose_name='数据归属部门')),
('update_datetime', apps.vadmin.op_drf.fields.UpdateDateTimeField(auto_now=True, help_text='修改时间', null=True, verbose_name='修改时间')),
('create_datetime', apps.vadmin.op_drf.fields.CreateDateTimeField(auto_now_add=True, help_text='创建时间', null=True, verbose_name='创建时间')),
('dir_name', models.CharField(max_length=32, verbose_name='磁盘路径')),
('sys_type_name', models.CharField(max_length=400, verbose_name='系统文件类型')),
('type_name', models.CharField(max_length=32, verbose_name='盘符类型')),
('total', models.CharField(max_length=32, verbose_name='磁盘总大小(KB)')),
('disk_sys', models.CharField(max_length=32, verbose_name='已使用大小(KB)')),
],
options={
'verbose_name': '系统磁盘',
'verbose_name_plural': '系统磁盘',
},
),
]
# Generated by Django 2.0 on 2021-10-23 09:44
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
('monitor', '0001_initial'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AddField(
model_name='sysfiles',
name='creator',
field=models.ForeignKey(db_constraint=False, null=True, on_delete=django.db.models.deletion.SET_NULL, related_query_name='creator_query', to=settings.AUTH_USER_MODEL, verbose_name='创建者'),
),
migrations.AddField(
model_name='sysfiles',
name='monitor',
field=models.ForeignKey(db_constraint=False, on_delete=django.db.models.deletion.CASCADE, to='monitor.Monitor', verbose_name='关联服务器监控信息'),
),
migrations.AddField(
model_name='monitor',
name='creator',
field=models.ForeignKey(db_constraint=False, null=True, on_delete=django.db.models.deletion.SET_NULL, related_query_name='creator_query', to=settings.AUTH_USER_MODEL, verbose_name='创建者'),
),
migrations.AddField(
model_name='monitor',
name='server',
field=models.ForeignKey(db_constraint=False, on_delete=django.db.models.deletion.CASCADE, to='monitor.Server', verbose_name='关联服务器信息'),
),
]
from apps.vadmin.monitor.models.monitor import Monitor
from apps.vadmin.monitor.models.server import Server
from apps.vadmin.monitor.models.sys_files import SysFiles
from django.db.models import CharField, ForeignKey, CASCADE
from apps.vadmin.op_drf.models import CoreModel
class Monitor(CoreModel):
cpu_num = CharField(max_length=8, verbose_name='CPU核数')
cpu_sys = CharField(max_length=8, verbose_name='CPU已使用率')
mem_num = CharField(max_length=32, verbose_name='内存总数(KB)')
mem_sys = CharField(max_length=32, verbose_name='内存已使用大小(KB)')
seconds = CharField(max_length=32, verbose_name='系统已运行时间')
server = ForeignKey(to='monitor.Server', on_delete=CASCADE, verbose_name="关联服务器信息", db_constraint=False)
class Meta:
verbose_name = '服务器监控信息'
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.server and self.server.name and self.server.ip}监控信息"
from django.db import models
from django.db.models import CharField
from apps.vadmin.op_drf.fields import UpdateDateTimeField, CreateDateTimeField
class Server(models.Model):
name = CharField(max_length=256, verbose_name='服务器名称', null=True, blank=True)
ip = CharField(max_length=32, verbose_name="ip地址")
os = CharField(max_length=32, verbose_name="操作系统")
remark = CharField(max_length=256, verbose_name="备注", null=True, blank=True)
update_datetime = UpdateDateTimeField() # 修改时间
create_datetime = CreateDateTimeField() # 创建时间
class Meta:
verbose_name = '服务器信息'
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.name and self.ip}"
from django.db.models import CharField, ForeignKey, CASCADE
from apps.vadmin.op_drf.models import CoreModel
class SysFiles(CoreModel):
dir_name = CharField(max_length=32, verbose_name='磁盘路径')
sys_type_name = CharField(max_length=400, verbose_name='系统文件类型')
type_name = CharField(max_length=32, verbose_name='盘符类型')
total = CharField(max_length=32, verbose_name='磁盘总大小(KB)')
disk_sys = CharField(max_length=32, verbose_name='已使用大小(KB)')
monitor = ForeignKey(to='monitor.Monitor', on_delete=CASCADE, verbose_name="关联服务器监控信息", db_constraint=False)
class Meta:
verbose_name = '系统磁盘'
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.creator and self.creator.name}"
from apps.vadmin.monitor.models import Server, Monitor
from apps.vadmin.op_drf.serializers import CustomModelSerializer
# ================================================= #
# ************** 服务器信息 序列化器 ************** #
# ================================================= #
class ServerSerializer(CustomModelSerializer):
"""
服务器信息 简单序列化器
"""
class Meta:
model = Server
fields = ("id", "ip", "name", "os", "remark")
class UpdateServerSerializer(CustomModelSerializer):
"""
服务器信息 简单序列化器
"""
class Meta:
model = Server
fields = ("name", "remark")
# ================================================= #
# ************** 服务器监控信息 序列化器 ************** #
# ================================================= #
class MonitorSerializer(CustomModelSerializer):
"""
服务器监控信息 简单序列化器
"""
class Meta:
model = Monitor
fields = '__all__'
import datetime
import logging
import sys
import time
import psutil
from apps.vadmin.monitor.models import Server, Monitor, SysFiles
from apps.vadmin.op_drf.response import SuccessResponse
from apps.vadmin.system.models import ConfigSettings
from apps.vadmin.utils.decorators import BaseCeleryApp
logger = logging.getLogger(__name__)
from platform import platform
def getIP():
"""获取ipv4地址"""
dic = psutil.net_if_addrs()
ipv4_list = []
for adapter in dic:
snicList = dic[adapter]
for snic in snicList:
if snic.family.name == 'AF_INET':
ipv4 = snic.address
if ipv4 != '127.0.0.1':
ipv4_list.append(ipv4)
if len(ipv4_list) >= 1:
return ipv4_list[0]
else:
return None
@BaseCeleryApp(name='apps.vadmin.monitor.tasks.get_monitor_info', save_success_logs=False)
def get_monitor_info():
"""
定时获取系统监控信息
:return:
"""
# 获取服务器
ip = getIP()
if not ip:
logger.error("无法获取到IP")
return
server_obj, create = Server.objects.get_or_create(ip=ip)
if create:
server_obj.name = ip
terse = ('terse' in sys.argv or '--terse' in sys.argv)
aliased = (not 'nonaliased' in sys.argv and not '--nonaliased' in sys.argv)
server_obj.os = platform(aliased, terse)
server_obj.save()
# 获取CPU和内存信息
mem = psutil.virtual_memory()
monitor_obj = Monitor()
monitor_obj.cpu_num = psutil.cpu_count()
monitor_obj.cpu_sys = float(psutil.cpu_percent(0.1))
monitor_obj.mem_num = int(mem.total / 1024)
monitor_obj.mem_sys = int(mem.used / 1024)
monitor_obj.seconds = time.strftime("%d天 %H 小时 %M 分 %S 秒", time.gmtime(int(time.time()) - int(psutil.boot_time())))
monitor_obj.server = server_obj
monitor_obj.save()
# 保存磁盘信息
for ele in psutil.disk_partitions():
disk = psutil.disk_usage('/')
sys_files_obj = SysFiles()
sys_files_obj.dir_name = ele.mountpoint
sys_files_obj.sys_type_name = ele.opts
sys_files_obj.type_name = ele.fstype
sys_files_obj.total = disk.total
sys_files_obj.disk_sys = disk.used
sys_files_obj.monitor = monitor_obj
sys_files_obj.save()
return SuccessResponse(msg="")
@BaseCeleryApp(name='apps.vadmin.monitor.tasks.clean_surplus_monitor_info')
def clean_surplus_monitor_info():
"""
定时清理多余 系统监控信息
:return:
"""
config_settings_obj = ConfigSettings.objects.filter(configKey='sys.monitor.info.save_days').first()
Monitor.objects.filter(
update_datetime__lt=datetime.timedelta(days=int(config_settings_obj.configValue or 30))).delete()
logger.info(f"成功清空{config_settings_obj.configValue}天前数据")
from django.test import TestCase
# Create your tests here.
from django.urls import re_path
from rest_framework.routers import DefaultRouter
from apps.vadmin.monitor.views import ServerModelViewSet, MonitorModelViewSet
router = DefaultRouter()
router.register(r'server', ServerModelViewSet)
router.register(r'monitor', MonitorModelViewSet)
urlpatterns = [
re_path('monitor/info/(?P<pk>.*)/', MonitorModelViewSet.as_view({'get': 'get_monitor_info'})),
re_path('monitor/rate/(?P<pk>.*)/', MonitorModelViewSet.as_view({'get': 'get_rate_info'})),
re_path('monitor/enabled/', MonitorModelViewSet.as_view({'get': 'enabled_monitor_info'})),
re_path('monitor/clean/', MonitorModelViewSet.as_view({'get': 'clean_all'})),
]
urlpatterns += router.urls
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
from rest_framework.request import Request
from apps.vadmin.monitor.filters import ServerFilter, MonitorFilter
from apps.vadmin.monitor.models import Server, Monitor, SysFiles
from apps.vadmin.monitor.serializers import ServerSerializer, MonitorSerializer, UpdateServerSerializer
from apps.vadmin.op_drf.response import SuccessResponse, ErrorResponse
from apps.vadmin.op_drf.viewsets import CustomModelViewSet
from apps.vadmin.permission.permissions import CommonPermission
from apps.vadmin.system.models import ConfigSettings
class ServerModelViewSet(CustomModelViewSet):
"""
服务器信息 模型的CRUD视图
"""
queryset = Server.objects.all()
serializer_class = ServerSerializer
update_serializer_class = UpdateServerSerializer
# extra_filter_backends = [DataLevelPermissionsFilter]
filter_class = ServerFilter
update_extra_permission_classes = (CommonPermission,)
destroy_extra_permission_classes = (CommonPermission,)
create_extra_permission_classes = (CommonPermission,)
ordering = '-create_datetime' # 默认排序
class MonitorModelViewSet(CustomModelViewSet):
"""
服务器监控信息 模型的CRUD视图
"""
queryset = Monitor.objects.all()
serializer_class = MonitorSerializer
# extra_filter_backends = [DataLevelPermissionsFilter]
filter_class = MonitorFilter
update_extra_permission_classes = (CommonPermission,)
destroy_extra_permission_classes = (CommonPermission,)
create_extra_permission_classes = (CommonPermission,)
ordering = '-create_datetime' # 默认排序
def get_rate_info(self, request: Request, *args, **kwargs):
"""
获取使用率 监控信息
:param request:
:param args:
:param kwargs:
:return:
"""
pk = kwargs.get("pk")
queryset = self.filter_queryset(self.get_queryset())
queryset = queryset.filter(server__id=pk).order_by("create_datetime")
# 间隔取值
queryset_count = queryset.count()
Interval_num = 1 if queryset_count < 200 else int(queryset_count / 100)
queryset = queryset.values('cpu_sys', 'mem_num', 'mem_sys', 'create_datetime')[::Interval_num][:100]
data = {
"cpu": [],
"memory": [],
"datetime": [],
}
for ele in queryset:
data["cpu"].append(round(float(ele.get('cpu_sys', 0)), 2))
data["datetime"].append(ele.get('create_datetime').strftime('%Y-%m-%d %H:%M:%S'))
data["memory"].append(round(float(ele.get('mem_num', 0)), 4) and round(float(ele.get('mem_sys', 0)) /
float(ele.get('mem_num', 0)) * 100,
2))
return SuccessResponse(data=data)
def get_monitor_info(self, request: Request, *args, **kwargs):
"""
最新的服务器状态信息
:param request:
:param args:
:param kwargs:
:return:
"""
pk = kwargs.get("pk")
instance = Monitor.objects.filter(server__id=pk).order_by("-create_datetime").first()
if not instance:
return ErrorResponse(msg="未找到服务器信息id")
serializer = self.get_serializer(instance)
data = serializer.data
return SuccessResponse(data={
"cpu": {
"total": int(data.get('cpu_num'), 0),
"used": "", # cpu核心 可不传,如指cpu当前主频,该值可以传
"rate": float(data.get('cpu_sys', 0)) / 100,
"unit": "核心", # 默认单位 核心
},
"memory": {
"total": int(int(data.get('mem_num', 0)) / 1024),
"used": int(int(data.get('mem_sys', 0)) / 1024),
"rate": int(data.get('mem_num', 0)) and round(int(data.get('mem_sys', 0)) /
int(data.get('mem_num', 0)), 4),
"unit": "MB", # 默认单位 MB
},
"disk": [{
"dir_name": ele.get('dir_name'),
"total": int(int(ele.get('total', 0)) / 1024 / 1024 / 1024),
"used": int(int(ele.get('disk_sys', 0)) / 1024 / 1024 / 1024),
"rate": int(ele.get('total', 0)) and round(int(ele.get('disk_sys', 0)) / int(ele.get('total', 0)),
4),
"unit": "GB", # 默认单位 GB
} for ele in SysFiles.objects.filter(monitor=instance).values('dir_name', 'total', 'disk_sys')]
})
def enabled_monitor_info(self, request: Request, *args, **kwargs):
"""
更新和获取监控信息
:param request:
:param args:
:param kwargs:
:return:
"""
enabled = request.query_params.get('enabled', None)
save_days = request.query_params.get('save_days', None)
interval = request.query_params.get('interval', None)
# 定时获取系统监控信息
periodictask_obj = PeriodicTask.objects.filter(task='apps.vadmin.monitor.tasks.get_monitor_info').first()
if not periodictask_obj:
intervalschedule_obj, _ = IntervalSchedule.objects.get_or_create(every=5, period="seconds")
periodictask_obj = PeriodicTask()
periodictask_obj.task = "apps.vadmin.monitor.tasks.get_monitor_info"
periodictask_obj.name = "定时获取系统监控信息"
periodictask_obj.interval = intervalschedule_obj
periodictask_obj.enabled = False
periodictask_obj.save()
# 定时清理多余 系统监控信息
clean_task_obj = PeriodicTask.objects.filter(
task='apps.vadmin.monitor.tasks.clean_surplus_monitor_info').first()
if not clean_task_obj:
crontab_obj, _ = CrontabSchedule.objects.get_or_create(day_of_month="*", day_of_week="*", hour="1",
minute="0", month_of_year="*")
clean_task_obj = PeriodicTask()
clean_task_obj.task = "apps.vadmin.monitor.tasks.clean_surplus_monitor_info"
clean_task_obj.name = "定时清理多余-系统监控信息"
clean_task_obj.crontab = crontab_obj
clean_task_obj.enabled = True
clean_task_obj.save()
# 配置添加
config_obj = ConfigSettings.objects.filter(configKey='sys.monitor.info.save_days').first()
if not config_obj:
config_obj = ConfigSettings()
config_obj.configKey = "sys.monitor.info.save_days"
config_obj.configName = "定时清理多余系统监控信息"
config_obj.configValue = "30"
config_obj.configType = "Y"
config_obj.status = "1"
config_obj.remark = "定时清理多余-系统监控信息,默认30天"
config_obj.save()
if enabled:
# 更新监控状态
periodictask_obj.enabled = True if enabled == "1" else False
periodictask_obj.save()
# 更新 定时清理多余 系统监控信息 状态
clean_task_obj.enabled = True if enabled == "1" else False
clean_task_obj.save()
# 更新保留天数
if save_days and config_obj:
config_obj.configValue = save_days
config_obj.save()
# 更新监控获取频率
if interval:
periodictask_obj.interval.every = interval
periodictask_obj.interval.save()
return SuccessResponse(data={
"enabled": periodictask_obj.enabled,
"interval": periodictask_obj.interval.every,
"save_days": config_obj.configValue if config_obj else "30",
})
def clean_all(self, request: Request, *args, **kwargs):
"""
清空监控信息
:param request:
:param args:
:param kwargs:
:return:
"""
self.get_queryset().delete()
return SuccessResponse(msg="清空成功")
import logging
from django.apps import AppConfig
logger = logging.getLogger(__name__)
class OpDrfConfig(AppConfig):
name = 'apps.vadmin.op_drf'
verbose_name = "OP DRF"
def ready(self):
logging.info("OP DRF框架检测完成:success")
from django.contrib.auth import get_user_model
from django.db import models
from django.db.models import SET_NULL
from apps.vadmin.utils.string_util import uuid_8, uuid_16, uuid_32, uuid_36
class IdField(models.CharField):
"""
id = IdField()
"""
def __init__(self, *args, **kwargs):
kwargs['max_length'] = kwargs.get('max_length', 8)
kwargs['primary_key'] = kwargs.get('primary_key', True)
kwargs['unique'] = kwargs.get('unique', True)
kwargs['db_index'] = kwargs.get('db_index', True)
kwargs['default'] = kwargs.get('default', uuid_8)
kwargs['null'] = kwargs.get('null', False)
kwargs['blank'] = kwargs.get('blank', False)
kwargs['verbose_name'] = kwargs.get('verbose_name', 'ID')
kwargs['help_text'] = kwargs.get('help_text', 'ID')
super().__init__(*args, **kwargs)
class UUID8Field(models.CharField):
def __init__(self, *args, **kwargs):
kwargs['max_length'] = kwargs.get('max_length', 8)
kwargs['default'] = kwargs.get('default', uuid_8)
kwargs['verbose_name'] = kwargs.get('verbose_name', 'UUID')
kwargs['help_text'] = kwargs.get('help_text', 'UUID')
super().__init__(*args, **kwargs)
class UUID16Field(models.CharField):
def __init__(self, *args, **kwargs):
kwargs['max_length'] = kwargs.get('max_length', 16)
kwargs['default'] = kwargs.get('default', uuid_16)
kwargs['verbose_name'] = kwargs.get('verbose_name', 'UUID')
kwargs['help_text'] = kwargs.get('help_text', 'UUID')
super().__init__(*args, **kwargs)
class UUID32Field(models.CharField):
def __init__(self, *args, **kwargs):
kwargs['max_length'] = kwargs.get('max_length', 32)
kwargs['default'] = kwargs.get('default', uuid_32)
kwargs['verbose_name'] = kwargs.get('verbose_name', 'UUID')
kwargs['help_text'] = kwargs.get('help_text', 'UUID')
super().__init__(*args, **kwargs)
class UUID36Field(models.CharField):
def __init__(self, *args, **kwargs):
kwargs['max_length'] = kwargs.get('max_length', 36)
kwargs['unique'] = kwargs.get('unique', True)
kwargs['default'] = kwargs.get('default', uuid_36)
kwargs['verbose_name'] = kwargs.get('verbose_name', 'UUID')
kwargs['help_text'] = kwargs.get('help_text', 'UUID')
super().__init__(*args, **kwargs)
class DescriptionField(models.TextField):
"""
description = DescriptionField()
"""
def __init__(self, *args, **kwargs):
if kwargs.get('null', True):
kwargs['default'] = kwargs.get('default', '')
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
kwargs['verbose_name'] = kwargs.get('verbose_name', '描述')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '描述')
super().__init__(*args, **kwargs)
class UserForeignKeyField(models.ForeignKey):
"""
user = UserForeignKeyField()
"""
def __init__(self, to=None, on_delete=None, related_name=None, related_query_name=None, limit_choices_to=None,
parent_link=False, to_field=None, db_constraint=False, **kwargs):
if to is None:
to = get_user_model()
if to_field is None:
to_field = 'id'
if on_delete is None:
on_delete = SET_NULL
kwargs['verbose_name'] = kwargs.get('verbose_name', '关联的用户')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '关联的用户')
kwargs['editable'] = kwargs.get('default', False)
super().__init__(to, on_delete, related_name, related_query_name, limit_choices_to, parent_link, to_field,
db_constraint, **kwargs)
class UpdateDateTimeField(models.DateTimeField):
"""
update_datetime = ModifyDateTimeField()
"""
def __init__(self, verbose_name=None, name=None, auto_now=True, auto_now_add=False, **kwargs):
verbose_name = verbose_name or '修改时间'
kwargs['help_text'] = kwargs.get('help_text', '修改时间')
kwargs['editable'] = kwargs.get('default', False)
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
super().__init__(verbose_name, name, auto_now, auto_now_add, **kwargs)
class CreateDateTimeField(models.DateTimeField):
"""
create_datetime = CreateDateTimeField()
"""
def __init__(self, verbose_name=None, name=None, auto_now=False, auto_now_add=True, **kwargs):
verbose_name = verbose_name or '创建时间'
kwargs['help_text'] = kwargs.get('help_text', '创建时间')
kwargs['editable'] = kwargs.get('default', False)
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
super().__init__(verbose_name, name, auto_now, auto_now_add, **kwargs)
class CreatorCharField(models.CharField):
"""
creator = CreatorCharField()
"""
def __init__(self, *args, **kwargs):
kwargs['max_length'] = kwargs.get('max_length', 255)
kwargs['null'] = kwargs.get('null', True)
kwargs['blank'] = kwargs.get('blank', True)
kwargs['verbose_name'] = kwargs.get('verbose_name', '创建者')
kwargs['help_text'] = kwargs.get('help_text', '该记录的创建者')
super().__init__(*args, **kwargs)
class ModifierCharField(models.CharField):
"""
modifier = ModifierCharField()
"""
def __init__(self, *args, **kwargs):
kwargs['max_length'] = kwargs.get('max_length', 255)
kwargs['null'] = kwargs.get('null', True)
kwargs['blank'] = kwargs.get('blank', True)
kwargs['verbose_name'] = kwargs.get('verbose_name', '修改者')
kwargs['help_text'] = kwargs.get('help_text', '该记录最后修改者')
super().__init__(*args, **kwargs)
"""
常用的过滤器以及DRF的过滤器
"""
import json
import logging
import operator
from functools import reduce
from django.utils import six
from mongoengine.queryset import visitor
from rest_framework.filters import BaseFilterBackend, SearchFilter, OrderingFilter
from apps.vadmin.utils.model_util import get_dept
logger = logging.getLogger(__name__)
def get_as_kwargs(request):
params = request.GET.dict()
if 'as' not in params:
return {}
as_params = json.loads(params.get('as', '{}'))
return as_params
class MongoSearchFilter(SearchFilter):
"""
适配Mongo模型视图的Search过滤器
"""
def filter_queryset(self, request, queryset, view):
search_fields = getattr(view, 'search_fields', None)
search_terms = self.get_search_terms(request)
if not search_fields or not search_terms:
return queryset
orm_lookups = [
self.construct_search(six.text_type(search_field))
for search_field in search_fields
]
if not orm_lookups:
return queryset
conditions = []
for search_term in search_terms:
queries = [
visitor.Q(**{orm_lookup: search_term})
for orm_lookup in orm_lookups
]
conditions.append(reduce(operator.or_, queries))
queryset = queryset.filter(reduce(operator.and_, conditions))
return queryset
class MongoOrderingFilter(OrderingFilter):
"""
适配Mongo模型视图的Search过滤器
"""
def get_valid_fields(self, queryset, view, context={}):
valid_fields = getattr(view, 'ordering_fields', self.ordering_fields)
if valid_fields is None:
return self.get_default_valid_fields(queryset, view, context)
elif valid_fields == '__all__':
# View explicitly allows filtering on any model field
model = view.get_serializer().__class__.Meta.model
valid_fields = [
(field_name, getattr(field, 'verbose_name', field_name)) for field_name, field in model._fields.items()
]
else:
valid_fields = [
(item, item) if isinstance(item, six.string_types) else item
for item in valid_fields
]
return valid_fields
class AdvancedSearchFilter(BaseFilterBackend):
"""
高级搜索过滤器
"""
def filter_queryset(self, request, queryset, view):
as_kwargs = get_as_kwargs(request)
if as_kwargs:
queryset = queryset.filter(**as_kwargs)
return queryset
class MongoAdvancedSearchFilter(BaseFilterBackend):
"""
mongo高级搜索过滤器
"""
def filter_queryset(self, request, queryset, view):
as_kwargs = get_as_kwargs(request)
if as_kwargs:
queryset = queryset.filter(**as_kwargs)
return queryset
class DataLevelPermissionsFilter(BaseFilterBackend):
"""
数据 级权限过滤器
0. 获取用户的部门id,没有部门则返回空
1. 判断过滤的数据是否有创建人所在部门 "creator" 字段,没有则返回全部
2. 如果用户没有关联角色则返回本部门数据
3. 根据角色的最大权限进行数据过滤(会有多个角色,进行去重取最大权限)
3.1 判断用户是否为超级管理员角色/如果有1(所有数据) 则返回所有数据
4. 只为仅本人数据权限时只返回过滤本人数据,并且部门为自己本部门(考虑到用户会变部门,只能看当前用户所在的部门数据)
5. 自定数据权限 获取部门,根据部门过滤
"""
def filter_queryset(self, request, queryset, view):
# 0. 获取用户的部门id,没有部门则返回空
user_dept_id = getattr(request.user, 'dept_id')
if not user_dept_id:
return queryset.none()
# 1. 判断过滤的数据是否有创建人所在部门 "dept_belong_id" 字段
if not getattr(queryset.model, 'dept_belong_id', None):
return queryset
# 2. 如果用户没有关联角色则返回本部门数据
if not hasattr(request.user, 'role'):
return queryset.filter(dept_belong_id=user_dept_id)
# 3. 根据所有角色 获取所有权限范围
role_list = request.user.role.filter(status='1').values('admin', 'dataScope')
dataScope_list = []
for ele in role_list:
# 3.1 判断用户是否为超级管理员角色/如果有1(所有数据) 则返回所有数据
if '1' == ele.get('dataScope') or ele.get('admin') == True:
return queryset
dataScope_list.append(ele.get('dataScope'))
dataScope_list = list(set(dataScope_list))
# 4. 只为仅本人数据权限时只返回过滤本人数据,并且部门为自己本部门(考虑到用户会变部门,只能看当前用户所在的部门数据)
if dataScope_list == ['5']:
return queryset.filter(creator=request.user, dept_belong_id=user_dept_id)
# 5. 自定数据权限 获取部门,根据部门过滤
dept_list = []
for ele in dataScope_list:
if ele == '2':
dept_list.extend(request.user.role.filter(status='1').values_list('dept__id', flat=True))
elif ele == '3':
dept_list.append(user_dept_id)
elif ele == '4':
dept_list.extend(get_dept(user_dept_id, ))
return queryset.filter(dept_belong_id__in=list(set(dept_list)))
import logging
class RedisHandler(logging.StreamHandler):
def emit(self, record):
msg = self.format(record)
print(msg)
import logging
from django.db.models import Model
from rest_framework.request import Request
from rest_framework.views import APIView
logger = logging.getLogger(__name__)
class ViewLogger(object):
"""
基于View视图的日志
"""
def __init__(self, view=None, request=None, *args, **kwargs) -> None:
super().__init__()
self.view = view
self.request = request
self.model = None
self.log_prefix: str = ''
if self.view and hasattr(self.view.get_queryset(), 'model'):
self.model: Model = self.view.get_queryset().model
elif self.view and hasattr(self.view.get_serializer(), 'Meta') and hasattr(self.view.get_serializer().Meta,
'model'):
self.model: Model = self.view.get_serializer().Meta.model
if self.model:
request.session['model_name'] = str(getattr(self.model, '_meta').verbose_name)
def handle(self, request: Request, *args, **kwargs):
pass
def logger(self, msg):
"""
:param msg:
:return: logger
"""
self.request.session['request_msg'] = msg
return logger
class APIViewLogger(ViewLogger):
"""
(1)仅在op_drf.views.CustomAPIView的子类中生效
(2)使用: 请勿直接配置view_logger_classes = (APIViewLogger, ), 这样无效,
如有需求, 需要继承APIViewLogger重写其相应的方法
(3)重写handle()方法,所有请求均触发此方法
重写handle_get()方法,仅GET请求均触发此方法
重写handle_post()方法,仅POST请求均触发此方法
重写handle_put()方法,仅PUT请求均触发此方法
重写handle_delete()方法,仅DELETE请求均触发此方法
重写handle_xxx()方法,仅xxx请求均触发此方法
"""
def __init__(self, view=None, request=None, *args, **kwargs) -> None:
super().__init__(view, request, *args, **kwargs)
self.view: APIView = view
self.request: Request = request
self.user = request.user
class ModelViewLogger(APIViewLogger):
"""
基础模型操作日志
(1)仅在op_drf.viewsets.GenericViewSet的子类中生效
(1)在CustomModelViewSet子类中配置: view_logger_classes = [ModelViewLogger, ]
(2)不要在op_drf中继续写具体的日志逻辑代码,
如有需求, 应该继承ModelViewLogger并且重写相应的方法, 例如CustomerModelViewLogger(涉及到其他模块时不要将代码放入op_drf)
"""
def __init__(self, view=None, request=None, *args, **kwargs) -> None:
super().__init__(view, request, *args, **kwargs)
class RelationshipViewLogger(APIViewLogger):
"""
关联关系模型操作日志
(1)在ModelRelationshipView子类中配置: view_logger_classes = [RelationshipViewLogger, ]
(2)不要在op_drf中继续写具体的日志逻辑代码,
如有需求, 应该继承RelationshipViewLogger并且重写相应的方法, 例如CustomerRelationshipViewLogger(涉及到其他模块时不要将代码放入op_drf)
"""
def __init__(self, view=None, request=None, instanceId=None, *args, **kwargs) -> None:
super().__init__(view, request)
self.instanceId: str = instanceId
def handle(self, request: Request, *args, **kwargs):
"""
每一次请求都会触发此方法
"""
pass
class CustomerRelationshipViewLogger(RelationshipViewLogger):
"""
(1)在ModelRelationshipView子类中配置: view_logger_classes = [CustomerRelationshipViewLogger, ]
"""
def __init__(self, view=None, request=None, instanceId=None, *args, **kwargs) -> None:
super().__init__(view, request, instanceId, *args, **kwargs)
self.log_prefix: str = 'RelationshipView日志系统:'
def handle_get(self, request: Request, *args, **kwargs):
"""
仅GET请求才会触发此方法
"""
pass
def handle_post(self, request: Request, *args, **kwargs):
"""
仅POST请求才会触发此方法
"""
operator = self.user.username
model_name = getattr(self.view.model, '_meta').verbose_name
to_field_name = self.view.to_field_name
to_model_name = getattr(self.view.relationship_model, '_meta').verbose_name
self.logger(
f'{self.log_prefix}用户[username={operator}]新增, {model_name}实例[{to_field_name}={self.instanceId}]与{to_model_name}的关联关系')
def handle_put(self, request: Request, *args, **kwargs):
"""
仅PUT请求才会触发此方法
"""
operator = self.user.username
model_name = getattr(self.view.model, '_meta').verbose_name
to_field_name = self.view.to_field_name
to_model_name = getattr(self.view.relationship_model, '_meta').verbose_name
self.logger(
f'{self.log_prefix}用户[username={operator}]重置, {model_name}实例[{to_field_name}={self.instanceId}]与{to_model_name}的关联关系')
def handle_delete(self, request: Request, *args, **kwargs):
"""
仅DELETE请求才会触发此方法
"""
operator = self.user.username
model_name = getattr(self.view.model, '_meta').verbose_name
to_field_name = self.view.to_field_name
to_model_name = getattr(self.view.relationship_model, '_meta').verbose_name
self.logger(
f'{self.log_prefix}用户[username={operator}]移除, {model_name}实例[{to_field_name}={self.instanceId}]与{to_model_name}的关联关系')
class CustomerModelViewLogger(ModelViewLogger):
def __init__(self, view=None, request=None, *args, **kwargs) -> None:
super().__init__(view, request, *args, **kwargs)
self.log_prefix: str = 'CustomModelViewSet日志系统:'
def handle(self, request: Request, *args, **kwargs):
pass
def handle_retrieve(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅retrieve(GET)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
self.logger(f'{self.log_prefix}用户[username={operator}]检索{model_name}:[{instance}]')
def handle_list(self, request: Request, *args, **kwargs):
"""
仅list(GET)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
self.logger(f'{self.log_prefix}用户[username={operator}]查询{model_name}')
def handle_create(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅create(POST)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
self.logger(f'{self.log_prefix}用户[username={operator}]创建{model_name}:[{instance}]')
def handle_update(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅update(PUT)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
self.logger(f'{self.log_prefix}用户[username={operator}]更新{model_name}:[{instance}]')
def handle_partial_update(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅partial_update(PATCH)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
self.logger(f'{self.log_prefix}用户[username={operator}]部分更新{model_name}:[{instance}]')
def handle_destroy(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅destroy(DELETE)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
self.logger(f'{self.log_prefix}用户[username={operator}]删除{model_name}:[{instance}]')
def handle_other(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅 其他 请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
self.logger(f'{self.log_prefix}用户[username={operator}]其他请求{model_name}:[{instance}]')
import logging
import os
import shutil
from django.core.management.base import BaseCommand
logger = logging.getLogger(__name__)
from application.settings import BASE_DIR
class Command(BaseCommand):
"""
创建App命令:
python manage.py createapp app名
python manage.py createapp app01 app02 ...
python manage.py createapp 一级文件名/app01 ... # 支持多级目录建app
"""
def add_arguments(self, parser):
parser.add_argument('app_name', nargs='*', type=str, )
def handle(self, *args, **options):
app_name = options.get('app_name')
for name in app_name:
names = name.split('/')
dnames = ".".join(names)
app_path = os.path.join(BASE_DIR, "apps", *names)
# 判断app是否存在
if os.path.exists(app_path):
print(f"创建失败,App {name} 已存在!")
break
source_path = os.path.join(BASE_DIR, "apps", "vadmin", "template")
target_path = app_path
if not os.path.exists(target_path):
# 如果目标路径不存在原文件夹的话就创建
os.makedirs(target_path)
if os.path.exists(source_path):
# 如果目标路径存在原文件夹的话就先删除
shutil.rmtree(target_path)
shutil.copytree(source_path, target_path)
# 修改app中的apps 内容
content = f"""from django.apps import AppConfig
class {name[-1].capitalize()}Config(AppConfig):
name = 'apps.{dnames}'
verbose_name = "{names[-1]}App"
"""
with open(os.path.join(app_path, "apps.py"), 'w', encoding='UTF-8') as f:
f.write(content)
f.close()
# 注册app到 settings.py 中
injection(os.path.join(BASE_DIR, "application", "settings.py"), f" 'apps.{dnames}',\n", "INSTALLED_APPS",
"]")
# 注册app到 urls.py 中
injection(os.path.join(BASE_DIR, "application", "urls.py"),
f" re_path(r'^{name}/', include('apps.{dnames}.urls')),\n", "urlpatterns = [",
"]")
print(f"创建 {name} App成功")
def injection(file_path, insert_content, startswith, endswith):
with open(file_path, "r+", encoding="utf-8") as f:
data = f.readlines()
with open(file_path, 'w', encoding='UTF-8') as f1:
is_INSTALLED_APPS = False
is_insert = False
for content in data:
# 判断文件是否 INSTALLED_APPS 开头
if not is_insert and content.startswith(startswith):
is_INSTALLED_APPS = True
if not is_insert and content.startswith(endswith) and is_INSTALLED_APPS:
# 给前一行插入数据
content = insert_content + content
is_insert = True
f1.writelines(content)
"""
django中间件
"""
import json
import logging
import os
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.utils.deprecation import MiddlewareMixin
from apps.vadmin.op_drf.response import ErrorJsonResponse
from apps.vadmin.permission.models import Menu
from apps.vadmin.system.models import OperationLog
from apps.vadmin.utils.request_util import get_request_ip, get_request_data, get_request_path, get_browser, get_os, \
get_login_location, get_request_canonical_path, get_request_user, get_verbose_name
logger = logging.getLogger(__name__)
class ApiLoggingMiddleware(MiddlewareMixin):
"""
用于记录API访问日志中间件
"""
def __init__(self, get_response=None):
super().__init__(get_response)
self.enable = getattr(settings, 'API_LOG_ENABLE', None) or False
self.methods = getattr(settings, 'API_LOG_METHODS', None) or set()
@classmethod
def __handle_request(cls, request):
request.request_ip = get_request_ip(request)
request.request_data = get_request_data(request)
request.request_path = get_request_path(request)
@classmethod
def __handle_response(cls, request, response):
# request_data,request_ip由PermissionInterfaceMiddleware中间件中添加的属性
body = getattr(request, 'request_data', {})
# 请求含有password则用*替换掉(暂时先用于所有接口的password请求参数)
if isinstance(body, dict) and body.get('password', ''):
body['password'] = '*' * len(body['password'])
if not hasattr(response, 'data') or not isinstance(response.data, dict):
response.data = {}
if not response.data and response.content:
try:
content = json.loads(response.content.decode())
response.data = content if isinstance(content, dict) else {}
except:
pass
user = get_request_user(request)
info = {
'request_ip': getattr(request, 'request_ip', 'unknown'),
'creator': user if not isinstance(user, AnonymousUser) else None,
'dept_belong_id': getattr(request.user, 'dept_id', None),
'request_method': request.method,
'request_path': request.request_path,
'request_body': body,
'response_code': response.data.get('code'),
'request_location': get_login_location(request),
'request_os': get_os(request),
'request_browser': get_browser(request),
'request_msg': request.session.get('request_msg'),
'status': True if response.data.get('code') in [200, 204] else False,
'json_result': {"code": response.data.get('code'), "msg": response.data.get('msg')},
'request_modular': request.session.get('model_name'),
}
log = OperationLog(**info)
log.save()
def process_view(self, request, view_func, view_args, view_kwargs):
if hasattr(view_func, 'cls') and hasattr(view_func.cls, 'queryset'):
request.session['model_name'] = get_verbose_name(view_func.cls.queryset)
return
def process_request(self, request):
self.__handle_request(request)
def process_response(self, request, response):
"""
主要请求处理完之后记录
:param request:
:param response:
:return:
"""
if self.enable:
if self.methods == 'ALL' or request.method in self.methods:
self.__handle_response(request, response)
return response
class PermissionModeMiddleware(MiddlewareMixin):
"""
权限模式拦截判断
"""
def process_request(self, request):
return
def has_interface_permission(self, request, method, view_path, user=None):
"""
接口权限验证,优先级:
(1)接口是否接入权限管理, 是:继续; 否:通过
(2)认证的user是否superuser, 是:通过; 否:继续
(3)user的角色有该接口权限, 是:通过, 否:不通过
auth_code含义: auth_code >=0, 表示接口认证通过; auth_code < 0, 表示无接口访问权限, 具体含义如下
-1:
-10: 该请求已认证的用户没有这个接口的访问权限
0:
1: 白名单
10: 该接口没有录入权限系统, 放行 请求中认证的用户为超级管理员, 直接放行
20: 请求中认证的用户是superuser放行
30: 请求中认证的用户对应的角色中,某个角色包含了该接口的访问权限, 放行
1. 先获取所有录入系统的接口
2 判断此用户是否为 superuser
3. 获取此用户所请求的接口
4. 获取此用户关联角色所有有权限的接口
:param interface: 接口模型
:param path: 接口路径
:param method: 请求方法
:param project: 接口所属项目
:param args:
:param kwargs:
:return:
"""
interface_dict = Menu.get_interface_dict()
# (1) 接口是否接入权限管理, 是:继续; 否:通过
if not view_path in interface_dict.get(method, []):
return 10
# (2)认证的user是否superuser, 是:通过; 否:继续
if user.is_superuser or (hasattr(user, 'role') and user.role.filter(status='1', admin=True).count()):
return 20
# (3)user的角色有该接口权限, 是:通过, 否:不通过
if view_path in user.get_user_interface_dict.get(method, []):
return 30
return -10
def process_view(self, request, view_func, view_args, view_kwargs):
# 判断环境变量中,是否为演示模式(正常可忽略此判断)
white_list = ['/admin/logout/', '/admin/login/', '/admin/api-auth/login/']
if os.getenv('DEMO_ENV') and not request.method in ['GET', 'OPTIONS'] and request.path not in white_list:
return ErrorJsonResponse(data={}, msg=f'演示模式,不允许操作!')
if not settings.INTERFACE_PERMISSION:
return
user = get_request_user(request)
if user and not isinstance(user, AnonymousUser):
method = request.method.upper()
if method == 'GET': # GET 不设置接口权限
return
view_path = get_request_canonical_path(request, *view_args, **view_kwargs)
auth_code = self.has_interface_permission(request, method, view_path, user)
logger.info(f"[{user.username}] {method}:{view_path}, 权限认证:{auth_code}")
if auth_code >= 0:
return
return ErrorJsonResponse(data={}, msg=f'无接口访问权限!')
def process_response(self, request, response):
"""
主要请求处理完之后记录
:param request:
:param response:
:return:
"""
return response
from django.conf import settings
from django.db import models
from django.db.models import SET_NULL
from .fields import CreateDateTimeField, DescriptionField, ModifierCharField, UpdateDateTimeField
class BaseModel(models.Model):
"""
标准抽象模型模型,可直接继承使用
"""
description = DescriptionField() # 描述
update_datetime = UpdateDateTimeField() # 修改时间
create_datetime = CreateDateTimeField() # 创建时间
class Meta:
abstract = True
verbose_name = '基本模型'
verbose_name_plural = verbose_name
class CoreModel(models.Model):
"""
核心标准抽象模型模型,可直接继承使用
增加审计字段, 覆盖字段时, 字段名称请勿修改, 必须统一审计字段名称
"""
description = DescriptionField() # 描述
creator = models.ForeignKey(to=settings.AUTH_USER_MODEL, related_query_name='creator_query', null=True,
verbose_name='创建者', on_delete=SET_NULL, db_constraint=False) # 创建者
modifier = ModifierCharField() # 修改者
dept_belong_id = models.CharField(max_length=64, verbose_name="数据归属部门", null=True, blank=True)
update_datetime = UpdateDateTimeField() # 修改时间
create_datetime = CreateDateTimeField() # 创建时间
class Meta:
abstract = True
verbose_name = '核心模型'
verbose_name_plural = verbose_name
from collections import OrderedDict
from rest_framework.pagination import PageNumberPagination, _positive_int
from rest_framework.utils.urls import replace_query_param
from .response import SuccessResponse
class Pagination(PageNumberPagination):
"""
标准分页器
"""
page_size_query_param = 'pageSize'
other_page_size_query_param = []
# other_page_size_query_param = ['pageSize', ]
page_query_param = "pageNum"
other_page_query_param = []
# other_page_query_param = ['currentPage', 'pageNum']
max_page_size = 1000
page_size = 10
def paginate_queryset(self, queryset, request, view=None):
page_num = request.query_params.get(self.page_query_param)
# 判断,如果 pageNum 为all 则取消分页返回所有
if page_num == 'all':
return None
return super().paginate_queryset(queryset, request, view)
def get_next_link(self):
return super().get_next_link()
def get_previous_link(self):
return super().get_previous_link()
def get_page_size(self, request):
"""
获取页大小
:param request:
:return:
"""
if self.other_page_size_query_param:
for param_name in self.other_page_size_query_param:
if param_name in request.query_params:
return _positive_int(
request.query_params[param_name],
strict=True,
cutoff=self.max_page_size
)
return super().get_page_size(request)
def get_page_num(self, request):
"""
获取页码
:param request:
:return:
"""
if self.other_page_query_param:
for param_name in self.other_page_query_param:
if param_name in request.query_params:
return _positive_int(request.query_params[param_name], strict=True)
page_num = request.query_params.get(self.page_query_param, 1)
return _positive_int(page_num, strict=True)
def get_paginated_response(self, data, search_fields=''):
return SuccessResponse(OrderedDict([
('count', self.page.paginator.count),
('next', self.get_next_link()),
('previous', self.get_previous_link()),
('results', data)
]))
class JsonPagination(Pagination):
"""
Json数据内存分页器
"""
def __init__(self, count=0) -> None:
super().__init__()
self._page_size = 0
self._page_num = 0
self._count = count
self.data_count = 0
def get_page_size(self, request):
_page_size = super().get_page_size(request)
self._page_size = _page_size
return _page_size
def get_page_num(self, request):
_page_num = super().get_page_num(request)
self._page_num = _page_num
return _page_num
def get_next_link(self):
if self._page_size * self._page_num >= self.data_count:
return None
url = self.request.build_absolute_uri()
url = replace_query_param(url, self.page_query_param, self._page_num + 1)
url = replace_query_param(url, self.page_size_query_param, self._page_size)
return url
def get_previous_link(self, count=0):
if self._page_num <= 1:
return None
url = self.request.build_absolute_uri()
url = replace_query_param(url, self.page_query_param, self._page_num - 1)
url = replace_query_param(url, self.page_size_query_param, self._page_size)
return url
def paginate_queryset(self, queryset, request, view=None):
self.get_page_size(request)
self.get_page_num(request)
return super().paginate_queryset(queryset, request, view)
def get_paginated_response(self, data, search_fields=''):
return SuccessResponse(OrderedDict([
('count', self.data_count),
('next', self.get_next_link()),
('previous', self.get_previous_link()),
('results', data)
]))
"""
常用的Response以及Django的Response、DRF的Response
"""
from django.http.response import DjangoJSONEncoder, JsonResponse
from rest_framework.response import Response
class OpDRFJSONEncoder(DjangoJSONEncoder):
"""
重写DjangoJSONEncoder
(1)默认返回支持中文格式的json字符串
"""
def __init__(self, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False,
indent=None, separators=None, default=None):
super().__init__(skipkeys=skipkeys, ensure_ascii=False, check_circular=check_circular,
allow_nan=allow_nan, sort_keys=sort_keys, indent=indent, separators=separators,
default=default)
class SuccessResponse(Response):
"""
标准响应成功的返回, SuccessResponse(data)或者SuccessResponse(data=data)
(1)默认错误码返回200, 不支持指定其他返回码
"""
def __init__(self, data=None, msg='success', status=None, template_name=None, headers=None, exception=False,
content_type=None):
self.std_data = {
"code": 200,
"data": data,
"msg": msg,
"status": 'success'
}
super().__init__(self.std_data, status, template_name, headers, exception, content_type)
def __str__(self):
return str(self.std_data)
class ErrorResponse(Response):
"""
标准响应错误的返回,ErrorResponse(msg='xxx')
(1)默认错误码返回201, 也可以指定其他返回码:ErrorResponse(code=xxx)
"""
def __init__(self, data=None, msg='error', code=201, status=None, template_name=None, headers=None,
exception=False, content_type=None):
self.std_data = {
"code": code,
"data": data,
"msg": msg,
"status": 'error'
}
super().__init__(self.std_data, status, template_name, headers, exception, content_type)
def __str__(self):
return str(self.std_data)
class SuccessJsonResponse(JsonResponse):
"""
标准JsonResponse, SuccessJsonResponse(data)SuccessJsonResponse(data=data)
(1)仅SuccessResponse无法使用时才能推荐使用SuccessJsonResponse
"""
def __init__(self, data, msg='success', encoder=DjangoJSONEncoder, safe=True, json_dumps_params=None, **kwargs):
std_data = {
"code": 200,
"data": data,
"msg": msg,
"status": 'success'
}
super().__init__(std_data, encoder, safe, json_dumps_params, **kwargs)
class ErrorJsonResponse(JsonResponse):
"""
标准JsonResponse, 仅ErrorResponse无法使用时才能使用ErrorJsonResponse
(1)默认错误码返回2001, 也可以指定其他返回码:ErrorJsonResponse(code=xxx)
"""
def __init__(self, data, msg='error', code=201, encoder=OpDRFJSONEncoder, safe=True, json_dumps_params=None,
**kwargs):
std_data = {
"code": code,
"data": data,
"msg": msg,
"status": 'error'
}
super().__init__(std_data, encoder, safe, json_dumps_params, **kwargs)
"""
标准返回
"""
def funcSuccess(data=None, msg='success', **kwargs):
"""
普通函数成功返回格式
:param data:
:param msg:
:return:
"""
return {
"result": True,
"msg": msg,
"data": data,
}
def funcError(data=None, msg='error', **kwargs):
"""
普通函数失败返回格式
:param data:
:param msg:
:return:
"""
return {
"result": False,
"msg": msg,
"data": data,
}
def funcResult(result=True, data=None, msg='success', **kwargs):
"""
普通函数返回格式
:param result:
:param data:
:param msg:
:return:
"""
if result:
return funcSuccess(data=data, msg=msg)
return funcError(data=data, msg=msg)
def paginate(data=None, count=0, next=None, previous=None, msg='success', code=2000):
"""
标准分页返回, 分页错误时:应该直接使用pagination()即可,无需传入任何参数
:param data: 默认为[]
:param count: 默认为len(data); 总计值, 不是data的元素个数,相当于count(*);仅当不传入count时, count==len(data)
:param next: 默认为None, 下一页
:param previous: 默认为None, 上一页
:param msg: 默认为success
:param code: 默认为2000, 建议不传入参数,使用默认即可
:return:
"""
if not data:
data = []
if not count:
count = len(data)
return {
"code": code,
"data": {
"count": count,
"next": next,
"previous": previous,
"results": data
},
"msg": msg,
"status": "success"
}
from django.utils.functional import cached_property
from rest_framework import serializers
from rest_framework.fields import empty
from rest_framework.request import Request
from rest_framework.serializers import ModelSerializer
from rest_framework.utils.serializer_helpers import BindingDict
class CustomModelSerializer(ModelSerializer):
"""
增强DRF的ModelSerializer,可自动更新模型的审计字段记录
(1)仅当op_drf.generics.GenericAPIView的子类里使用时有效
(2)非op_drf.generics.GenericAPIView里使用时, 与ModelSerializer作用一样,没人任何增强
(3)self.request能获取到rest_framework.request.Request对象
"""
# 修改人的审计字段名称, 默认modifier, 继承使用时可自定义覆盖
modifier_field_name = 'modifier'
# 创建人的审计字段名称, 默认creator, 继承使用时可自定义覆盖
creator_field_name = 'creator'
# 数据所属部门字段
dept_belong_id_field_name = 'dept_belong_id'
# 添加默认时间返回格式
create_datetime = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S", required=False, read_only=True)
update_datetime = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S", required=False, read_only=True)
creator_name = serializers.SlugRelatedField(slug_field="username", source="creator", read_only=True)
def __init__(self, instance=None, data=empty, request=None, **kwargs):
super().__init__(instance, data, **kwargs)
self.request: Request = request
def save(self, **kwargs):
return super().save(**kwargs)
def create(self, validated_data):
if self.context.get('request'):
self.request = self.context.get('request')
if self.request:
username = self.get_request_username()
if self.modifier_field_name in self.fields.fields:
validated_data[self.modifier_field_name] = username
if self.creator_field_name in self.fields.fields:
validated_data[self.creator_field_name] = self.request.user
if self.dept_belong_id_field_name in self.fields.fields:
validated_data[self.dept_belong_id_field_name] = getattr(self.request.user, 'dept_id', None)
return super().create(validated_data)
def update(self, instance, validated_data):
if self.request:
if hasattr(self.instance, self.modifier_field_name):
self.instance.modifier = self.get_request_username()
return super().update(instance, validated_data)
def get_request_username(self):
if getattr(self.request, 'user', None):
return getattr(self.request.user, 'username', None)
return None
@cached_property
def fields(self):
fields = BindingDict(self)
for key, value in self.get_fields().items():
fields[key] = value
if not hasattr(self, '_context'):
return fields
is_root = self.root == self
parent_is_list_root = self.parent == self.root and getattr(self.parent, 'many', False)
if not (is_root or parent_is_list_root):
return fields
try:
request = self.request or self.context['request']
except KeyError:
return fields
params = getattr(
request, 'query_params', getattr(request, 'GET', None)
)
if params is None:
pass
try:
filter_fields = params.get('_fields', None).split(',')
except AttributeError:
filter_fields = None
try:
omit_fields = params.get('_omit', None).split(',')
except AttributeError:
omit_fields = []
existing = set(fields.keys())
if filter_fields is None:
allowed = existing
else:
allowed = set(filter(None, filter_fields))
omitted = set(filter(None, omit_fields))
for field in existing:
if field not in allowed:
fields.pop(field, None)
if field in omitted:
fields.pop(field, None)
return fields
"""
重写校验器返回字段
"""
from rest_framework.validators import UniqueValidator, qs_exists
from apps.vadmin.utils.exceptions import APIException
class CustomUniqueValidator(UniqueValidator):
"""
继承,重写必填字段的验证器结果,防止字段暴露
"""
def __call__(self, value, serializer_field):
# Determine the underlying model field name. This may not be the
# same as the serializer field name if `source=<>` is set.
field_name = serializer_field.source_attrs[-1]
# Determine the existing instance, if this is an update operation.
instance = getattr(serializer_field.parent, 'instance', None)
queryset = self.queryset
queryset = self.filter_queryset(value, queryset, field_name)
queryset = self.exclude_current_instance(queryset, instance)
if qs_exists(queryset):
raise APIException(message=self.message)
def __repr__(self):
return super().__repr__()
import logging
import traceback
from types import FunctionType, MethodType
from rest_framework.exceptions import APIException as DRFAPIException
from rest_framework.request import Request
from rest_framework.views import APIView
from apps.vadmin.utils import exceptions
from .response import ErrorResponse
logger = logging.getLogger(__name__)
def op_exception_handler(ex, context):
"""
统一异常拦截处理
目的:(1)取消所有的500异常响应,统一响应为标准错误返回
(2)准确显示错误信息
:param ex:
:param context:
:return:
"""
msg = ''
if isinstance(ex, DRFAPIException):
# set_rollback()
msg = ex.detail
elif isinstance(ex, exceptions.APIException):
msg = ex.message
elif isinstance(ex, Exception):
logger.error(traceback.format_exc())
msg = str(ex)
return ErrorResponse(msg=msg)
class CustomAPIView(APIView):
"""
继承、增强DRF的APIView
"""
extra_permission_classes = ()
# 仅当GET方法时会触发该权限的校验
GET_permission_classes = ()
# 仅当POST方法时会触发该权限的校验
POST_permission_classes = ()
# 仅当DELETE方法时会触发该权限的校验
DELETE_permission_classes = ()
# 仅当PUT方法时会触发该权限的校验
PUT_permission_classes = ()
view_logger_classes = ()
def initial(self, request: Request, *args, **kwargs):
super().initial(request, *args, **kwargs)
self.check_extra_permissions(request)
self.check_method_extra_permissions(request)
def get_view_loggers(self, request: Request, *args, **kwargs):
logger_classes = self.view_logger_classes or []
if not logger_classes:
return []
view_loggers = [logger_class(view=self, request=request, *args, **kwargs) for logger_class in logger_classes]
return view_loggers
def handle_logging(self, request: Request, *args, **kwargs):
view_loggers = self.get_view_loggers(request, *args, **kwargs)
method = request.method.lower()
for view_logger in view_loggers:
view_logger.handle(request, *args, **kwargs)
logger_fun = getattr(view_logger, f'handle_{method}', f'handle_other')
if logger_fun and isinstance(logger_fun, (FunctionType, MethodType)):
logger_fun(request, *args, **kwargs)
def get_extra_permissions(self):
return [permission() for permission in self.extra_permission_classes]
def check_extra_permissions(self, request: Request):
for permission in self.get_extra_permissions():
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)
def get_method_extra_permissions(self):
_name = self.request.method.upper()
method_extra_permission_classes = getattr(self, f"{_name}_permission_classes", None)
if not method_extra_permission_classes:
return []
return [permission() for permission in method_extra_permission_classes]
def check_method_extra_permissions(self, request):
for permission in self.get_method_extra_permissions():
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)
from django.apps import AppConfig
class PermissionConfig(AppConfig):
name = 'apps.vadmin.permission'
verbose_name = "权限管理"
import django_filters
from django.contrib.auth import get_user_model
from apps.vadmin.permission.models import Menu, Dept, Post, Role
from apps.vadmin.utils.model_util import get_dept
UserProfile = get_user_model()
class MenuFilter(django_filters.rest_framework.FilterSet):
"""
菜单管理 简单序过滤器
"""
name = django_filters.CharFilter(lookup_expr='icontains')
class Meta:
model = Menu
exclude = ('description', 'creator', 'modifier')
class DeptFilter(django_filters.rest_framework.FilterSet):
"""
部门管理 简单序过滤器
"""
deptName = django_filters.CharFilter(lookup_expr='icontains')
class Meta:
model = Dept
exclude = ('description', 'creator', 'modifier')
class PostFilter(django_filters.rest_framework.FilterSet):
"""
岗位管理 简单序过滤器
"""
postName = django_filters.CharFilter(lookup_expr='icontains')
class Meta:
model = Post
exclude = ('description', 'creator', 'modifier')
class RoleFilter(django_filters.rest_framework.FilterSet):
"""
角色管理 简单序过滤器
"""
roleName = django_filters.CharFilter(lookup_expr='icontains')
class Meta:
model = Role
exclude = ('description', 'creator', 'modifier')
class UserProfileFilter(django_filters.rest_framework.FilterSet):
"""
用户管理 简单序过滤器
"""
username = django_filters.CharFilter(lookup_expr='icontains')
mobile = django_filters.CharFilter(lookup_expr='icontains')
deptId = django_filters.CharFilter(method='filter_deptId')
def filter_deptId(self, queryset, name, value):
return queryset.filter(dept__id__in=get_dept(dept_id=value))
class Meta:
model = UserProfile
exclude = ('secret', 'password',)
import logging
import os
from django.conf import settings
from django.core.management.base import BaseCommand
from django.db import connection
from apps.vadmin.scripts import getSql
logger = logging.getLogger(__name__)
class Command(BaseCommand):
"""
项目初始化命令: python manage.py init
"""
def customSql(self, sql_list, model_name, table_name, is_yes):
"""
批量执行sql
:param sql_list:
:param table_name: 表名
:return:
"""
with connection.cursor() as cursor:
num = 0
for ele in table_name.split(','):
cursor.execute("select count(*) from {}".format(ele))
result = cursor.fetchone()
num += result[0]
if num > 0:
while True:
if is_yes is None:
inp = input(f'[{model_name}]模型已初始化完成,继续将清空[{table_name}]表中所有数据,是否继续初始化?【 Y/N 】')
else:
inp = 'Y' if is_yes == True else 'N'
if inp.upper() == 'N':
return False
elif inp.upper() == 'Y':
logger.info(f'正在清空[{table_name}]中数据...')
cursor.execute("SET foreign_key_checks = 0")
for ele in table_name.split(','):
cursor.execute("truncate table {};".format(ele))
cursor.execute("SET foreign_key_checks = 1")
connection.commit()
logger.info(f'清空[{table_name}]中数据{result[0]}条')
break
for sql in sql_list:
try:
cursor.execute(sql)
except Exception as e:
print(e)
connection.commit()
return True
def init(self, sql_filename, model_name, table_name, is_yes):
"""
初始化
:param sql_filename: sql存放位置
:param model_name: 模块名
:param table_name: 表名
:return:
"""
logger.info(f'正在初始化[{model_name}]中...')
if self.customSql(getSql(sql_filename), model_name, table_name, is_yes):
logger.info(f'[{model_name}]初始化完成!')
else:
logger.info(f'已取消[{table_name}]初始化')
def add_arguments(self, parser):
parser.add_argument('init_name', nargs='*', type=str, )
parser.add_argument('-y', nargs='*')
parser.add_argument('-Y', nargs='*')
parser.add_argument('-n', nargs='*')
parser.add_argument('-N', nargs='*')
def handle(self, *args, **options):
user_name = "_".join(settings.AUTH_USER_MODEL.lower().split("."))
init_dict = {
'system_dictdata': [os.path.join('system', 'system_dictdata.sql'), '字典管理', 'system_dictdata'],
'system_dictdetails': [os.path.join('system', 'system_dictdetails.sql'), '字典详情', 'system_dictdetails'],
'system_configsettings': [os.path.join('system', 'system_configsettings.sql'), '参数设置',
'system_configsettings'],
'permission_post': [os.path.join('permission', 'permission_post.sql'), '岗位管理', 'permission_post'],
'permission_dept': [os.path.join('permission', 'permission_dept.sql'), '部门管理', 'permission_dept'],
'permission_menu': [os.path.join('permission', 'permission_menu.sql'), '菜单管理', 'permission_menu'],
'permission_role': [os.path.join('permission', 'permission_role.sql'), '角色管理',
','.join(['permission_role', 'permission_role_dept', 'permission_role_menu'])],
'permission_userprofile': [os.path.join('permission', 'permission_userprofile.sql'), '用户管理', ','.join(
[f'{user_name}_groups', f'{user_name}', f'{user_name}_role', f'{user_name}_post'])]
}
init_name = options.get('init_name')
is_yes = None
if isinstance(options.get('y'), list) or isinstance(options.get('Y'), list):
is_yes = True
if isinstance(options.get('n'), list) or isinstance(options.get('N'), list):
is_yes = False
if init_name:
[self.init(*init_dict[ele], is_yes=is_yes) for ele in init_name if ele in init_dict]
else:
for ele in init_dict.values():
self.init(*ele, is_yes=is_yes)
from apps.vadmin.permission.models.dept import Dept
from apps.vadmin.permission.models.menu import Menu
from apps.vadmin.permission.models.post import Post
from apps.vadmin.permission.models.role import Role
from apps.vadmin.permission.models.users import UserProfile
from django.db.models import CASCADE
from django.db.models import CharField, IntegerField, ForeignKey
from apps.vadmin.op_drf.models import CoreModel
class Dept(CoreModel):
deptName = CharField(max_length=64, verbose_name="部门名称")
orderNum = IntegerField(verbose_name="显示排序")
owner = CharField(max_length=32, verbose_name="负责人", null=True, blank=True)
phone = CharField(max_length=32, verbose_name="联系电话", null=True, blank=True)
email = CharField(max_length=32, verbose_name="邮箱", null=True, blank=True)
status = CharField(max_length=8, verbose_name="部门状态", null=True, blank=True)
parentId = ForeignKey(to='permission.Dept', on_delete=CASCADE, default=False, verbose_name="上级部门",
db_constraint=False, null=True, blank=True)
class Meta:
verbose_name = '部门管理'
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.deptName}"
from django.core.cache import cache
from django.db.models import IntegerField, ForeignKey, CharField, CASCADE, Q
from application import settings
from apps.vadmin.op_drf.models import CoreModel
class Menu(CoreModel):
# MENU_TYPE_CHOICES = (
# ("0", "目录"),
# ("1", "菜单"),
# ("2", "按钮"),
# )
# METHOD_CHOICE = (
# ('GET', 'GET'),
# ('POST', 'POST'),
# ('PUT', 'PUT'),
# ('PATCH', 'PATCH'),
# ('DELETE', 'DELETE'),
# ('HEAD', 'HEAD'),
# ('OPTIONS', 'OPTIONS'),
# ('TRACE', 'TRACE'),
# )
parentId = ForeignKey(to='Menu', on_delete=CASCADE, verbose_name="上级菜单", null=True, blank=True, db_constraint=False)
menuType = CharField(max_length=8, verbose_name="菜单类型")
icon = CharField(max_length=64, verbose_name="菜单图标", null=True, blank=True)
name = CharField(max_length=64, verbose_name="菜单名称")
orderNum = IntegerField(verbose_name="显示排序")
isFrame = CharField(max_length=8, verbose_name="是否外链")
web_path = CharField(max_length=128, verbose_name="前端路由地址", null=True, blank=True)
component_path = CharField(max_length=128, verbose_name="前端组件路径", null=True, blank=True)
interface_path = CharField(max_length=256, verbose_name="后端接口路径", null=True, blank=True)
interface_method = CharField(max_length=16, default='GET', verbose_name="接口请求方式")
perms = CharField(max_length=256, verbose_name="权限标识", null=True, blank=True)
status = CharField(max_length=8, verbose_name="菜单状态")
visible = CharField(max_length=8, verbose_name="显示状态")
isCache = CharField(max_length=8, verbose_name="是否缓存")
@classmethod
def get_interface_dict(cls):
"""
获取所有接口列表
:return:
"""
try:
interface_dict = cache.get('permission_interface_dict', {}) if getattr(settings, "REDIS_ENABLE",
False) else {}
except:
interface_dict = {}
if not interface_dict:
for ele in Menu.objects.filter(~Q(interface_path=''), ~Q(interface_path=None), status='1', ).values(
'interface_path', 'interface_method'):
if ele.get('interface_method') in interface_dict:
interface_dict[ele.get('interface_method', '')].append(ele.get('interface_path'))
else:
interface_dict[ele.get('interface_method', '')] = [ele.get('interface_path')]
if getattr(settings, "REDIS_ENABLE", False):
cache.set('permission_interface_dict', interface_dict, 84600)
return interface_dict
@classmethod
def delete_cache(cls):
"""
清空缓存中的接口列表
:return:
"""
if getattr(settings, "REDIS_ENABLE", False):
cache.delete('permission_interface_dict')
class Meta:
verbose_name = '菜单管理'
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.name}"
from django.db.models import IntegerField, BooleanField, CharField, TextField
from apps.vadmin.op_drf.models import CoreModel
class Post(CoreModel):
postName = CharField(null=False, max_length=64, verbose_name="岗位名称")
postCode = CharField(max_length=32, verbose_name="岗位编码")
postSort = IntegerField(verbose_name="岗位顺序")
status = CharField(max_length=8, verbose_name="岗位状态")
remark = TextField(verbose_name="备注", help_text="备注", null=True, blank=True)
class Meta:
verbose_name = '岗位管理'
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.postName}"
from django.db.models import IntegerField, BooleanField, CharField, TextField, ManyToManyField
from apps.vadmin.op_drf.models import CoreModel
class Role(CoreModel):
DATASCOPE_CHOICES = (
('1', "全部数据权限"),
('2', "自定数据权限"),
('3', "本部门数据权限"),
('4', "本部门及以下数据权限"),
('5', "仅本人数据权限"),
)
roleName = CharField(max_length=64, verbose_name="角色名称")
roleKey = CharField(max_length=64, verbose_name="权限字符")
roleSort = IntegerField(verbose_name="角色顺序")
status = CharField(max_length=8, verbose_name="角色状态")
admin = BooleanField(default=False, verbose_name="是否为admin")
dataScope = CharField(max_length=8, default='1', choices=DATASCOPE_CHOICES, verbose_name="权限范围", )
remark = TextField(verbose_name="备注", help_text="备注", null=True, blank=True)
dept = ManyToManyField(to='permission.Dept', verbose_name='数据权限-关联部门', db_constraint=False)
menu = ManyToManyField(to='permission.Menu', verbose_name='关联菜单权限', db_constraint=False)
class Meta:
verbose_name = '角色管理'
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.roleName}"
from uuid import uuid4
from django.conf import settings
from django.contrib.auth.models import UserManager, AbstractUser
from django.core.cache import cache
from django.db.models import IntegerField, ForeignKey, CharField, TextField, ManyToManyField, CASCADE
from apps.vadmin.op_drf.models import CoreModel
class UserProfile(AbstractUser, CoreModel):
USER_TYPE_CHOICES = (
(0, "后台用户"),
(1, "前台用户"),
)
objects = UserManager()
username = CharField(max_length=150, unique=True, db_index=True, verbose_name='用户账号')
secret = CharField(max_length=255, default=uuid4, verbose_name='加密秘钥')
email = CharField(max_length=255, verbose_name="邮箱", null=True, blank=True)
mobile = CharField(max_length=255, verbose_name="电话", null=True, blank=True)
avatar = TextField(verbose_name="头像", null=True, blank=True)
name = CharField(max_length=40, verbose_name="姓名")
gender = CharField(max_length=8, verbose_name="性别", null=True, blank=True)
remark = TextField(verbose_name="备注", null=True)
user_type = IntegerField(default=0, verbose_name="用户类型")
post = ManyToManyField(to='permission.Post', verbose_name='关联岗位', db_constraint=False)
role = ManyToManyField(to='permission.Role', verbose_name='关联角色', db_constraint=False)
dept = ForeignKey(to='permission.Dept', verbose_name='归属部门', on_delete=CASCADE, db_constraint=False, null=True,
blank=True)
@property
def get_user_interface_dict(self):
interface_dict = cache.get(f'permission_interface_dict_{self.username}', {}) if \
getattr(settings, "REDIS_ENABLE", False) else {}
if not interface_dict:
for ele in self.role.filter(status='1', menu__status='1').values('menu__interface_path',
'menu__interface_method').distinct():
interface_path = ele.get('menu__interface_path')
if interface_path is None or interface_path == '':
continue
if ele.get('menu__interface_method') in interface_dict:
interface_dict[ele.get('menu__interface_method', '')].append(interface_path)
else:
interface_dict[ele.get('menu__interface_method', '')] = [interface_path]
if getattr(settings, "REDIS_ENABLE", False):
cache.set(f'permission_interface_dict_{self.username}', interface_dict, 84600)
return interface_dict
@property
def delete_cache(self):
"""
清空缓存中的接口列表
:return:
"""
if not getattr(settings, "REDIS_ENABLE", False): return ""
return cache.delete(f'permission_interface_dict_{self.username}')
class Meta:
abstract = settings.AUTH_USER_MODEL != 'permission.UserProfile'
verbose_name = '用户管理'
verbose_name_plural = verbose_name
def __str__(self):
if self.name:
return f"{self.username}({self.name})"
return f"{self.username}"
"""
常用的Permission以及DRF的Permission
@author: ruoxing
"""
import logging
from django.contrib.auth import get_user_model
from rest_framework.permissions import (BasePermission,
)
from rest_framework.request import Request
from rest_framework.views import APIView
from apps.vadmin.permission.models import Dept
from apps.vadmin.utils.model_util import get_dept
logger = logging.getLogger(__name__)
User = get_user_model()
class CustomPermission(BasePermission):
def __init__(self, message=None) -> None:
super().__init__()
self.message = getattr(self.__class__, 'message', '无权限')
self.user: User = None
def init_permission(self, request: Request, view: APIView):
self.user = request.user
def has_permission(self, request: Request, view: APIView):
return True
def has_object_permission(self, request: Request, view: APIView, obj):
return True
class CommonPermission(CustomPermission):
"""
通用权限类,判断用户是否有这条数据的查询权限,如没有则直接没有操作权限
0. 获取用户的部门id,没有部门则返回 False
1. 判断过滤的数据是否有创建人所在部门 "dept_belong_id" 字段,没有则返回 True
2. 如果用户没有关联角色则校验该数据是否属于本部门
3. 根据所有角色 获取所有权限范围
3.1 判断用户是否为超级管理员角色/如果有1(所有数据) 则有权限操作
4. 只为仅本人数据权限时只有操作本人数据权限,并且部门为自己本部门(考虑到用户会变部门,只能看当前用户所在的部门数据)
5. 自定数据权限 获取部门,根据部门判断,是否有权限操作
"""
message = '没有有操作权限'
def check_queryset(self, request, instance):
# 0. 获取用户的部门id,没有部门则返回 False
user_dept_id = getattr(request.user, 'dept_id')
if not user_dept_id:
self.message = "该用户无部门,无权限操作!"
return False
# 1. 判断过滤的数据是否有创建人所在部门 "dept_belong_id" 字段,没有则返回 True
if not getattr(instance, 'dept_belong_id', None):
return True
# 2. 如果用户没有关联角色则校验该数据是否属于本部门
if not hasattr(request.user, 'role'):
self.message = "该用户无角色,无权限操作!"
return False
# 3. 根据所有角色 获取所有权限范围
role_list = request.user.role.filter(status='1').values('admin', 'dataScope')
dataScope_list = []
for ele in role_list:
# 3.1 判断用户是否为超级管理员角色/如果有1(所有数据) 则有权限操作
if '1' == ele.get('dataScope') or ele.get('admin') == True:
return True
dataScope_list.append(ele.get('dataScope'))
dataScope_list = list(set(dataScope_list))
# 4. 只为仅本人数据权限时只有操作本人数据权限,并且部门为自己本部门(考虑到用户会变部门,只能看当前用户所在的部门数据)
if dataScope_list == ['5']:
return int(instance.dept_belong_id) == user_dept_id and request.user == instance.creator
# 5. 自定数据权限 获取部门,根据部门判断,是否有权限操作
dept_list = []
for ele in dataScope_list:
if ele == '2':
dept_list.extend(request.user.role.filter(status='1').values_list('dept__id', flat=True))
elif ele == '3':
dept_list.append(user_dept_id)
elif ele == '4':
dept_list.extend(get_dept(user_dept_id, ))
return int(instance.dept_belong_id) in list(set(dept_list))
def has_permission(self, request: Request, view: APIView):
return True
def has_object_permission(self, request: Request, view: APIView, instance):
self.message = f"没有此数据操作权限!"
res = self.check_queryset(request, instance)
return res
class DeptDestroyPermission(CustomPermission):
"""
部门删除权限校验:判断部门下是否有用户存在,存在不可删除
"""
message = '没有有操作权限'
def has_permission(self, request: Request, view: APIView):
return True
def check_queryset(self, request, instance):
if list(filter(None, instance.values_list('userprofile', flat=True))):
self.message = "该部门下有关联用户,无法删除!"
return False
if Dept.objects.filter(parentId__in=instance).count() > 0:
self.message = "该部门下有下级部门,请先删除下级部门!"
return False
return True
def has_object_permission(self, request: Request, view: APIView, instance):
res = self.check_queryset(request, instance)
return res
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论