造个Python轮子,实现根据Excel生成Model和数据导入脚本

python,轮子,实现,根据,excel,生成,model,数据,导入,脚本 · 浏览次数 : 714

小编点评

当然,以下内容可以帮助生成内容时带简单的排版: 1.在定义每个字段的长度之前,先进行一下排版,例如: ```python field.max_length = item_len + 32 if item_len > field.max_length else field.max_length print(field.verbose_name, field.name, field.max_length) ``` 2.在遍历数据确定每个字段的最大长度之后,再进行一些排版,例如: ```python if item_len > field.max_length: field.max_length = item_len + 32 print(field.verbose_name, field.name, field.max_length) ``` 3.在生成文件时,可以将一些排版信息写入到文件中,例如: ```python context.update({'model': Model( model_name, verbose_name, self.find_field_by_verbose_name(id_field_verbose_name), self.fields ), 'excel_filepath': self.filepath, 'excel_header': self.header_index, }) print(context.get('model')) print(context.get('excel_filepath')) print(context.get('excel_header')) ``` 4.最后,在生成文件时,可以将一些排版信息写入到文件中,例如: ```python with open(output_filepath, 'w+', encoding='utf-8') as f: render_result = template.render(context) f.write(render_result) ```

正文

前言

最近遇到一个需求,有几十个Excel,每个的字段都不一样,然后都差不多是第一行是表头,后面几千上万的数据,需要把这些Excel中的数据全都加入某个已经上线的Django项目

这就需要每个Excel建个表,然后一个个导入了

这样的效率太低,不能忍

所以我造了个自动生成 Model 和导入脚本的轮子

思路

首先拿出 pandas,它的 DataFrame 用来处理数据很方便

pandas 加载 Excel 之后,提取表头,我们要通过表头来生成数据表的字段。有些 Excel 的表头是中文的,需要先做个转换。

一开始我是想用翻译API,全都翻译成英文,不过发现免费的很慢有限额,微软、DeepL都要申请,很麻烦。索性用个拼音转换库,全都转换成拼音得了~

然后字段的长度也要确定,或者全部用不限制长度的 TextField

权衡一下,我还是做一下字段长度判定的逻辑,遍历整个表,找出各个字段最长的数据,然后再加一个偏移量,作为最大长度。

接着生成 Model 类,这里我用 jinja2 模板语言,先把大概的模板写好,然后根据提取出来的字段名啥的生成。

最后生成 admin 配置和导入脚本,同理,也是用 jinja2 模板。

实现

简单介绍下思路,现在开始上代码。

就几行而已,Python很省代码~

模型

首先定义俩模型

字段模型

class Field(object):
    def __init__(self, name: str, verbose_name: str, max_length: int = 128):
        self.name = name
        self.verbose_name = verbose_name
        self.max_length = max_length

    def __str__(self):
        return f'<Field>{self.name}:{self.verbose_name}'

    def __repr__(self):
        return self.__str__()

Model模型

为了符合Python关于变量的命名规范,snake_name 属性是用正则表达式实现驼峰命名转蛇形命名

class Model(object):
    def __init__(self, name: str, verbose_name: str, id_field: Field, fields: List[Field]):
        self.name = name
        self.verbose_name = verbose_name
        self.id_field = id_field
        self.fields: List[Field] = fields

    @property
    def snake_name(self):
        import re
        pattern = re.compile(r'(?<!^)(?=[A-Z])')
        name = pattern.sub('_', self.name).lower()
        return name

    def __str__(self):
        return f'<Model>{self.name}:{self.verbose_name}'

    def __repr__(self):
        return self.__str__()

代码模板

使用 jinja2 实现。

本身 jinja2 是 Flask、Django 之类的框架用来渲染网页的。

不过单独使用的效果也不错,我的 DjangoStarter 框架也是用这个 jinja2 来自动生成 CRUD 代码~

Model模板

# -*- coding:utf-8 -*-
from django.db import models

class {{ model.name }}(models.Model):
    """{{ model.verbose_name }}"""
    {% for field in model.fields -%}
    {{ field.name }} = models.CharField('{{ field.verbose_name }}', default='', null=True, blank=True, max_length={{ field.max_length }})
    {% endfor %}
    class Meta:
        db_table = '{{ model.snake_name }}'
        verbose_name = '{{ model.verbose_name }}'
        verbose_name_plural = verbose_name

Admin配置模板

@admin.register({{ model.name }})
class {{ model.name }}Admin(admin.ModelAdmin):
    list_display = [{% for field in model.fields %}'{{ field.name }}', {% endfor %}]
    list_display_links = None

    def has_add_permission(self, request):
        return False

    def has_delete_permission(self, request, obj=None):
        return False

    def has_view_permission(self, request, obj=None):
        return False

数据导入脚本

这里做了几件事:

  • 使用 pandas 处理空值,填充空字符串
  • 已有数据进行批量更新
  • 新数据批量插入

更新逻辑麻烦一点,因为数据库一般都有每次最大更新数量的限制,所以我做了分批处理,通过 update_data_once_max_lines 控制每次最多同时更新多少条数据。

def import_{{ model.snake_name }}():
    file_path = path_proc(r'{{ excel_filepath }}')

    logger.info(f'读取文件: {file_path}')
    xlsx = pd.ExcelFile(file_path)
    df = pd.read_excel(xlsx, 0, header={{ excel_header }})
    df.fillna('', inplace=True)

    logger.info('开始处理数据')

    id_field_list = {{ model.name }}.objects.values_list('{{ model.id_field.name }}', flat=True)
    item_list = list({{ model.name }}.objects.all())

    def get_item(id_value):
        for i in item_list:
            if i.shen_qing_ren_zheng_jian_hao_ma == id_value:
                return i
        return None

    insert_data = []
    update_data_once_max_lines = 100
    update_data_sub_set_index = 0
    update_data = [[]]
    update_fields = set()

    for index, row in df.iterrows():
        if '{{ model.id_field.verbose_name }}' not in row:
            logger.error('id_field {} is not existed'.format('{{ model.id_field.verbose_name }}'))
            continue

        if row['{{ model.id_field.verbose_name }}'] in id_field_list:
            item = get_item(row['{{ model.id_field.verbose_name }}'])
            {% for field in model.fields -%}
            if '{{ field.verbose_name }}' in row:
                if item.{{ field.name }} != row['{{ field.verbose_name }}']:
                    item.{{ field.name }} = row['{{ field.verbose_name }}']
                    update_fields.add('{{ field.name }}')
            {% endfor %}
            if len(update_data[update_data_sub_set_index]) >= update_data_once_max_lines:
                update_data_sub_set_index += 1
                update_data.append([])
            update_data[update_data_sub_set_index].append(item)
        else:
            # {% for field in model.fields -%}{{ field.verbose_name }},{%- endfor %}
            model_obj = {{ model.name }}()
            {% for field in model.fields -%}
            if '{{ field.verbose_name }}' in row:
                model_obj.{{ field.name }} = row['{{ field.verbose_name }}']
            {% endfor %}
            insert_data.append(model_obj)

    logger.info('开始批量导入')
    {{ model.name }}.objects.bulk_create(insert_data)
    logger.info('导入完成')

    if len(update_data[update_data_sub_set_index]) > 0:
        logger.info('开始批量更新')
        for index, update_sub in enumerate(update_data):
            logger.info(f'正在更新 {index * update_data_once_max_lines}-{(index + 1) * update_data_once_max_lines} 条数据')
            {{ model.name }}.objects.bulk_update(update_sub, list(update_fields))
        logger.info('更新完成')

主体代码

剩下的全是核心代码了

引用依赖

先把用到的库导入

import os
import re
from typing import List, Optional

from pypinyin import pinyin, lazy_pinyin, Style
from jinja2 import Environment, PackageLoader, FileSystemLoader

或者后面直接去我的完整代码里面拿也行~

老规矩,我封装了一个类。

构造方法需要指定 Excel 文件地址,还有表头的行索引。

class ExcelToModel(object):
    def __init__(self, filepath, header_index=0):
        self.filepath = filepath
        self.header_index = header_index
        self.columns = []
        self.fields: List[Field] = []

        self.base_dir = os.path.dirname(os.path.abspath(__file__))
        self.template_path = os.path.join(self.base_dir, 'templates')
        self.jinja2_env = Environment(loader=FileSystemLoader(self.template_path))

        self.load_file()

这里面有个 self.load_file() 后面再贴。

字段名中文转拼音

用了 pypinyin 这个库,感觉还不错。

转换后用正则表达式,去除符号,只保留英文和数字。

代码如下,也是放在 ExcelToModel 类里边。

@staticmethod
def to_pinyin(text: str) -> str:
    pattern = r'~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
    text = re.sub(r"[%s]+" % pattern, "", text)
    return '_'.join(lazy_pinyin(text, style=Style.NORMAL))

加载文件

拿出万能的 pandas,按照前面说的思路,提取表头转换成字段,并且遍历数据确定每个字段的最大长度,我这里偏移值是32,即在当前数据最大长度基础上加上32个字符。

def load_file(self):
    import pandas as pd
    xlsx = pd.ExcelFile(self.filepath)
    df = pd.read_excel(xlsx, 0, header=self.header_index)
    df.fillna('', inplace=True)
    self.columns = list(df.columns)
    for col in self.columns:
        field = Field(self.to_pinyin(col), col)
        self.fields.append(field)
        for index, row in df.iterrows():
            item_len = len(str(row[col]))
            if item_len > field.max_length:
                field.max_length = item_len + 32

        print(field.verbose_name, field.name, field.max_length)

如果觉得这样生成表太慢,可以把确定最大长度的这块代码去掉,就下面这块代码

for index, row in df.iterrows():
    item_len = len(str(row[col]))
    if item_len > field.max_length:
        field.max_length = item_len + 32

手动指定最大长度或者换成不限制长度的 TextField 就行。

生成文件

先构造个 context 然后直接用 jinja2 的 render 功能生成代码。

为了在导入时判断数据存不存在,生成代码时要指定 id_field_verbose_name,即Excel文件中类似“证件号码”、“编号”之类的列名,注意是Excel中的表头列名。

def find_field_by_verbose_name(self, verbose_name) -> Optional[Field]:
    for field in self.fields:
        if field.verbose_name == verbose_name:
            return field
    return None

def generate_file(self, model_name: str, verbose_name: str, id_field_verbose_name: str, output_filepath: str):
    template = self.jinja2_env.get_template('output.jinja2')
    context = {
        'model': Model(
            model_name, verbose_name,
            self.find_field_by_verbose_name(id_field_verbose_name),
            self.fields
        ),
        'excel_filepath': self.filepath,
        'excel_header': self.header_index,
    }
    with open(output_filepath, 'w+', encoding='utf-8') as f:
        render_result = template.render(context)
        f.write(render_result)

使用

看代码。

tool = ExcelToModel('file.xlsx')
tool.generate_file('CitizenFertility', '房价与居民生育率', '证件号码', 'output/citizen_fertility.py')

生成出来的代码都在一个文件里,请根据实际情况放到项目的各个位置。

完整代码

发布到Github了

地址: https://github.com/Deali-Axy/excel_to_model

小结

目前看来完美契合需求,极大节省工作量~

实际跑起来,不得不吐槽 Python 羸弱的性能,占内存还大… 凑合着用吧。也许后面有时间会优化一下~

与造个Python轮子,实现根据Excel生成Model和数据导入脚本相似的内容:

造个Python轮子,实现根据Excel生成Model和数据导入脚本

前言 最近遇到一个需求,有几十个Excel,每个的字段都不一样,然后都差不多是第一行是表头,后面几千上万的数据,需要把这些Excel中的数据全都加入某个已经上线的Django项目 这就需要每个Excel建个表,然后一个个导入了 这样的效率太低,不能忍 所以我造了个自动生成 Model 和导入脚本的轮

前端文件上传的几种交互造轮子

前端文件上传本来是一个常规交互操作,没什么特殊性可言,但是最近在做文件上传,需要实现截图粘贴上传,去找了下有没有什么好用的组件,网上提供的方法有,但是没找完整的组件来支持cv上传,经过了解发现可以用剪贴板功能让自己的cv实现文件上传,于是自己就整合了目前几种文件上传的交互方式,码了一个支持cv的vue3文件上传组件(造个轮子)。

树莓派安装向日葵教程

树莓派安装向日葵教程 Raspberry Pi版本:2024-03-15-raspios-bookworm-arm64-full.img 下载麒麟arm版本客户端 向日葵远程控制app官方下载 - 贝锐向日葵官网 安装依赖包 sudo apt-get update sudo apt-get inst

造轮子之消息实时推送

前面我们的EventBus已经弄好了,那么接下来通过EventBus来实现我们的消息推送就是自然而然的事情了。说到消息推送,很多人肯定会想到Websocket,既然我们使用Asp.net core,那么SignalR肯定是我们的首选。接下来就用SignalR来实现我们的消息实时推送。 Notific

造轮子之EventBus

前面基础管理的功能基本开发完了,接下来我们来优化一下开发功能,来添加EventBus功能。EventBus也是我们使用场景非常广的东西。这里我会实现一个本地的EventBus以及分布式的EventBus。分别使用MediatR和Cap来实现。 现在简单介绍一下这两者:MediatR是一个轻量级的中介

造轮子之菜单管理

前面完成了基础管理的相关API,接下来就得做一个菜单管理了,用于对接管理后台前端界面。 设计菜单结构 菜单是一个多级结构,所以我们得设计一个树形的。包含自己上级和下级的属性。同时预留Permission用于做可选的权限限制。 namespace Wheel.Domain.Menus { ///

造轮子之多语言管理

多语言也是我们经常能用到的东西,asp.net core中默认支持了多语言,可以使用.resx资源文件来管理多语言配置。但是在修改资源文件后,我们的应用服务无法及时更新,属实麻烦一些。我们可以通过扩展IStringLocalizer,实现我们想要的多语言配置方式,比如Json配置,PO 文件配置,E

造轮子之权限管理

上文已经完成了自定义授权策略,那么接下来就得完善我们的权限管理了。不然没有数据,如何鉴权~ 表设计 创建我们的表实体类: namespace Wheel.Domain.Permissions { public class PermissionGrant : Entity { public

造轮子之自定义授权策略

前面我们已经弄好了用户角色这块内容,接下来就是我们的授权策略。在asp.net core中提供了自定义的授权策略方案,我们可以按照需求自定义我们的权限过滤。这里我的想法是,不需要在每个Controller或者Action打上AuthorizeAttribute,自动根据ControllerName和

造轮子之asp.net core identity

在前面我们完成了应用最基础的功能支持以及数据库配置,接下来就是我们的用户角色登录等功能了,在asp.net core中原生Identity可以让我们快速完成这个功能的开发,在.NET8中,asp.net core identity支持了WebApi的注册登录。这让我们在WebApi中可以更爽快的使用