Python学习笔记
- 🏆Python Classes and Objects Youtube最有未来科技感的Python教学
Python知识图谱
学习课堂
荔枝微课
Python学习参考
参考资料
- Django Book Online 一个基于Python的web框架
编译python最新版本
–enable-optimizations(升级gcc至8.1.0才能使用)
yum install -y gcc gcc-c++ \ zlib-devel libffi-devel openssl openssl-devel \ bzip2-devel gdbm-devel xz-devel readline-devel \ sqlite-devel libuuid-devel tk-devel tcl-devel export CC=/opt/gcc84/bin/gcc ; export CXX=/opt/gcc84/bin/g++ ; ldconfig ./configure --prefix=/opt/python3.10.6 --with-openssl=/opt/openssl \ --enable-shared --enable-optimizations \ --enable-loadable-sqlite-extensions
vim针对python的配置:
set tabstop=2 set shiftwidth=2 " 默认用2个空格 set softtabstop=2 " sts用2个空格 set autoindent " 自动缩进 set expandtab " tab键转换成空格 set listchars=tab:>~,trail:. " 把空格和tab显示为. set list
Python多核场景与方法总结
Python 在某些场景下可以通过多进程、多线程或异步编程利用多核资源,但由于全局解释器锁(GIL)的存在,多线程在 CPU 密集型任务中无法充分利用多核。以下是 Python 中可以利用多核的主要场景及方法:
CPU 密集型(多进程)
- 场景:需要大量计算(如数值计算、数据处理、密码学运算)。
- 方法:使用 multiprocessing 模块绕过 GIL,启动多个进程,每个进程运行在独立的 CPU 核心上。
- 示例:
- 并行计算(如蒙特卡洛模拟、数值积分)。
- 大规模数据处理(使用 multiprocessing.Pool 或 concurrent.futures.ProcessPoolExecutor)。
- 图像/视频处理(如 Pillow 或 OpenCV 结合多进程)。
from multiprocessing import Pool def compute_square(n): return n * n if __name__ == "__main__": with Pool(4) as p: # 使用4个进程 results = p.map(compute_square, [1, 2, 3, 4])
I/O 密集型(多线程/异步)
- 场景:涉及网络请求、文件读写、数据库查询等 I/O 等待操作。
- 方法:使用多线程(threading)或异步编程(asyncio)避免阻塞主线程,通过并发提高效率。
- 注意:虽然多线程受 GIL 限制,但 I/O 等待期间会释放 GIL,因此多线程仍可有效利用时间。
from concurrent.futures import ThreadPoolExecutor import requests def fetch_url(url): response = requests.get(url) return response.status_code urls = ["https://example.com"] * 10 with ThreadPoolExecutor(max_workers=4) as executor: results = executor.map(fetch_url, urls)
科学计算与数据处理(专用库)
- 场景:使用 NumPy、Pandas、SciPy 等库进行大规模数值计算。
- 方法:部分底层库(如 NumPy、BLAS)通过 C 扩展实现多线程并行,无需手动编码。
- 优化:设置环境变量(如 OMPNUMTHREADS)控制线程数。
import numpy as np # NumPy 的矩阵运算可能自动使用多核 a = np.random.rand(10000, 10000) b = np.random.rand(10000, 10000) c = np.dot(a, b) # 底层可能并行执行
分布式任务与批处理(框架支持)
- 场景:处理分布式任务队列(如 Celery、Dask、Ray)。
- 方法:利用框架自动分配任务到多核或多节点。
# 使用 Dask 并行处理大型数据集 import dask.array as da x = da.random.random((100000, 100000), chunks=(1000, 1000)) y = x + x.T result = y.mean().compute() # 自动并行计算
Web 服务与并发请求
- 场景:高并发的 Web 服务(如 Flask、Django)。
- 方法:使用多进程部署(如 Gunicorn 多 worker 模式)。
# 启动 Flask 服务,使用4个 worker 进程 gunicorn -w 4 app:app
机器学习模型训练
- 场景:训练机器学习模型(如 Scikit-learn、XGBoost)。
- 方法:设置 n_jobs 参数利用多核加速。
from sklearn.ensemble import RandomForestClassifier model = RandomForestClassifier(n_estimators=100, n_jobs=4) # 使用4核 model.fit(X_train, y_train)
多进程 vs 多线程 注意事项:
- CPU 密集型任务用多进程(绕过 GIL)。
- I/O 密集型任务用多线程或异步(减少等待时间)。
- 资源开销:进程间通信(IPC)成本较高,需谨慎设计。
- 第三方库:优先使用已支持并行的库(如 joblib、Dask)。
通过合理选择并发模型和工具,Python 可以高效利用多核资源提升性能。
Python图形界面
PySimpleGUI
PySimpleGUI 一个建立在Tkinter, wxPython, PyQt 甚至是Remi web之上简单但功能强大的GUI。
PyWebview
🏆Python使用pywebview做终端app 基于Chromium Embedded Framework (CEF),内置浏览器引擎,尺寸稍大,但响应比eel要快 🏆Python使用eel做终端app 最近M$的edge浏览器也使用了Chromium引擎,eel会更加流行(速度慢)
wxPython+wxFormBuilder
pyQT5+wxPython+kivy
PyScript浏览器wasm 和 beeware
BeeWare 正在积极的发展,在2024年会支持Android/iOS平台。未来可期
Flet写flutter图形app
Blender三维引擎
Python游戏编程
Minecraft
Minecraft Pi:是Minecraft为树莓派(Raspberry Pi)而设计的«我的世界»版本。该版本基于携带版但进行了些许删减,同时包含一些改进过的特性以及对多种编程语言提供支持等。树莓派版旨在为编程新手提供一种教学工具。
Turtle【最简单 + 入门】
海龟绘图(trutle)是向孩子们介绍编程的一种流行方式。它是Wally Feurzig和Seymour Papert于1966年开发的Logo编程语言的一部分。Logo语言是一种早期的编程语言,也是一种与自然语言非常接近的编程语言,它通过“绘图”的方式来学习编程,对初学者特别是青少年儿童进行寓教于乐的教学方式。
想象一只小海龟,在一个横轴为x、纵轴为y的坐标系中。以坐标原点(0,0)开始,它根据输入的一组代码指令,在平面坐标系中移动。从而在它爬行的路径上绘制了各种酷炫、奇特的图形。
Arcade【简单】
Arcade 是一个简单易用创建 2D 游戏的 Python 第三方库,非常适合初学程序员或想要在不学习复杂框架的情况下创建2D游戏的程序员。它基于 Pyglet 和OpenGL,提供了动画、精灵、装饰、场景等一系列功能,适合作为桌面系统的游戏开发平台。更重要的是,正如其设计理念,使用 Acrade 开发非常简单!
Pyglet
Pyglet 是一个面向对象方式编写游戏或其他富媒体应用的第三方多媒体框架,轻量级可以跨平台使用,支持 OpenGL、图像、音视频、事件处理等。Pyglet 是一个较为底层的开发框架,不适合直接作为游戏开发平台,在其基础上封装的 cocos2d 和 Arcade 更为合适。但这两个学起来更难,学习成本更大。Pyglet对中文支持极差,两个汉字只占一个字符,粘在一起,不推荐使用 !!!!
Pymunk
Pymunk(派曼k) 是一个简单易用的 2D 物理规则第三方库,用来在游戏或演示中增加符合物理规则的动态效果。另外一个有名的就是Pybox2D
Pymunk 不是一个游戏框架,比较适合为游戏提供相关功能,可以与 Arcade 游戏开发框架直接兼容使用。想想“愤怒的小鸟”,就知道物理有多重要了!
Pygame【强大+专业】
Pygame 是 Python 语言当之无愧的游戏入门开发第三方库,它基于 SDL 直接访问音视频硬件及输入输出外设,形成了基本的游戏开发引擎。Pygame 是进一步学习其他游戏引擎的基础。这个库尽管简单,却很强大,能够用于开发专业耐玩的桌面游戏。
Kivy【跨平台+多触控】
Kivy是Python的跨平台GUI库,不仅支持windows,macOS ,linux,还支持android和iOS。凭这一点就非常吸引我。Kivy的核心思想是界面逻辑分离,kivy的kv文件控制界面显示部分,python控制逻辑部分。kivy现在在github上有9000+ star,并且还在继续更新维护,所以值得学习和了解一下。
最简单的示例:
import kivy from kivy.app import App from kivy.uix.label import Label from kivy.uix.gridlayout import GridLayout from kivy.uix.textinput import TextInput class app(App): def build(self): return Label(text="我是中国人",font_name="Songti.ttc",font_size="48sp") if __name__ == "__main__": app().run()
sudo apt -y update sudo apt install -y python3-pip git zip unzip openjdk-8-jdk python3-pip autoconf libtool pkg-config zlib1g-dev libncurses5-dev libncursesw5-dev libtinfo5 cmake libffi-dev libssl-dev lld pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple buildozer virtualenv pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --user --upgrade Cython==0.29.19 export PATH=$PATH:~/.local/bin/
buildozer.spec
[app] title = Beats package.name = Beatme package.domain = org.test source.dir = . source.include_exts = py,png,jpg,kv,atlas,ttf version = 0.1 requirements = python3,kivy,sqlite3,pygame,jnius,sdl2_image,sdl2_mixer,sdl2_ttf,png,jpeg icon.filename = %(source.dir)s/data/icon.png orientation = all osx.python_version = 3 osx.kivy_version = 1.9.1 fullscreen = 0 android.arch = armeabi-v7a ios.kivy_ios_url = https://github.com/kivy/kivy-ios ios.kivy_ios_branch = master ios.ios_deploy_url = https://github.com/phonegap/ios-deploy ios.ios_deploy_branch = 1.7.0 [buildozer] log_level = 2 warn_on_root = 1
一个kivy示例
from kivy.app import App from kivy.uix.label import Label from kivy.uix.button import Button from kivy.uix.boxlayout import BoxLayout class FirstApp(App): pass if __name__ == "__main__": FirstApp().run() ################################# <FirstLayout@BoxLayout>: count: 0 orientation: "vertical" padding: 10 Label: text: "Button Not Pressed"if root.count==0 else "Button is pressed " +str(root.count)+" time!" if root.count==1 else "Button is pressed " +str(root.count)+" times!" Button: text: "Press Me" size_hint: (1,0.3) on_press: root.count=root.count+1 FirstLayout:
buildozer -v android debug
Processing有趣的艺术交互
Python动画框架
- Making this animation in Python Manim 初学
Python最快WEB框架
QraphQL的演示
HappyX全栈nim框架
HappyX 中你可以通过-d:httpx和-d:micro
Socketify最快
运行实例
from socketify import App, AppOptions, AppListenOptions from prisma import Prisma from urllib.parse import parse_qs from jinja2_templates import Jinja2Template import ujson DEBUG = False db = Prisma(auto_register=True) App = App() App.template(Jinja2Template("./templates", encoding="utf-8", followlinks=False)) App.json_serializer(ujson) App.static("/static", "./static") app = App.router() def corksend(res, result="", status=200): res.cork(lambda res: res.write_status(f"{status}").end(f"{result}")) @app.get("/") async def head(res, req): if not db.is_connected(): db.connect() try: posts = db.post.find_many(take=10, order={"id":'desc'}, include={"author": True} ) context = { "framework": "Robyn", "templating_engine": "Jinja2", "data": posts } if DEBUG: print(context) res.render("index.html", **context) except Exception as e: pass @app.post("/submit") async def addpost(res, req): content_type = req.get_header("content-type") if content_type == "application/json": data = await res.get_json() elif content_type == "application/x-www-form-urlencoded": data = await res.get_form_urlencoded() else: data = await res.get_text() data = parse_qs(data) message = data.get("message",[None])[0] if DEBUG: print(f"post < {data}, message: {message}") response = "<h2>定时任务发布成功!</h2>" corksend(res, response) else: corksend(res) ########################################### if __name__ == "__main__": App.listen( AppListenOptions(host="0.0.0.0", port=8080), lambda config: print( "Listening on port http://%s:%d now\n" % (config.host, config.port)) ).run()
Sanic with Pony ORM数据库
DEBUG = False REMOTE_HOST = "192.168.144.73" FILENAME = "/tmp/.wireguard_config.json" COMMAND = "uptime" import subprocess from json import dumps import wireguard_config as wg_cfg from sanic.response import json from sanic import Sanic, request, response from sanic_ext import Extend, openapi from sanic_openapi import doc from textwrap import dedent app = Sanic("wireguard_keygen") app.ext.openapi.describe( "WireGuard API", version="0.6.18", description=dedent(""" ## 动态生成wireguard的公钥和密钥对 """), ) from pony.orm import * db = Database() class UserMetadata(db.Entity): id = PrimaryKey(int, size=64, auto=True) user_id = Required(int, size=64, default=0) already_fist_visit = Required(int, size=8, default=0) metadata = Optional(Json, nullable=True) _table_ = "user_metadata" async def getmeta(user_id): result = None if DEBUG: print( f"1 -------------------{user_id}" ) try: db.bind(provider='mysql', host='192.168.144.73', user='root', passwd='xxxx', db='test') db.generate_mapping(create_tables=True) except Exception as e: pass try: with db_session: #for p in UserMetadata.select(lambda p: p.user_id == user_id): # print(p.metadata) #count = select(u for u in UserMetadata if u.user_id == user_id).count() user = UserMetadata.get(user_id=user_id) if user: if DEBUG: print(f"2 ------------------ {user.metadata}") if user.metadata is None or user.metadata == "None" : result = { "wireguard_info": wg_cfg.getnewpair() } if DEBUG: print(f"3 ------------------ {result}") # 执行原始 SQL try: user.metadata = result except Exception as e: user.metadata = dumps(result) else: if DEBUG: print(f"4 ------------------ exist") result = True return result except Exception as e: raise e def exec_remote_cmd(host, port=65422, user="root", command="uptime"): ssh_command = f"ssh -i /root/.ssh/hzup.ssh.2018 -o StrictHostKeyChecking=no -p{port} {user}@{host} {command}" result = subprocess.run(ssh_command, shell=True, capture_output=True, text=True) return result.stdout, result.stderr def upload_remote_file(host, port=65422, user="root", filename=""): scp_command = f"scp -i /root/.ssh/hzup.ssh.2018 -o StrictHostKeyChecking=no -P{port} {filename} {host}:/root/" result = subprocess.run(scp_command, shell=True, capture_output=True, text=True) return result.stdout, result.stderr class UserID: uid: int #@app.get("/users/<user_id:int>") #async def getuserid(request: request.Request, user_id): @app.post("/users/") @openapi.summary("动态指定用户uid") @openapi.description("# 接口描述") @openapi.body( { "application/x-www-form-urlencoded" : UserID, }, description="输入真实有效的用户id", required=True, ) async def getuserid(request): """ openapi: --- parameters: responses: '200': description: 成功创建公密钥对 '201': description: 公密钥对已经存在 '400': description: 请指定用户正确ID '401': description: 查无此用户!!! '402': description: 用户必须真实有效 """ try: user_id = request.form.get("uid") except Exception as e: user_id = None if user_id is None: return json({"error": "请指定用户正确ID ?"}, status=400) try: user_id = int(user_id) if user_id <= 0: return json({"error": "用户必须真实有效 !"}, status=402) else: res = await getmeta(user_id) if res is None: return json({"error": "查无此用户 X"}, status=401) elif res is True: return json({"success": "公密钥对已经存在 #"}, status=201) else: ssh_res, ssh_err = exec_remote_cmd(REMOTE_HOST, command=COMMAND) scp_res, scp_err = upload_remote_file(REMOTE_HOST, filename=FILENAME) if DEBUG: with open("/tmp/xxx", "w") as f: f.write(f"------------------------- \n") f.write(f"{ssh_res} --- {ssh_err} \n") f.write(f"{scp_res} --- {scp_err} \n") return json({"success": "成功创建公密钥对。"}, status=200) except ValueError: return json({"error": "Parameter 'user_id' must be an integer"}, status=402) if __name__ == "__main__": app.run(host="0.0.0.0", port=8888, workers=2, auto_reload=True)
SQLAlchemy 支持 JSON 类型的读写。
Sanic支持热加载代码,真是贴心棒!
Sanic 内部本身有很多开箱即用的配置,直接就在 app.config
print(app.config.keys())
PeeWee异步ORM真好用
DEBUG = True REMOTE_HOST = "192.168.144.73" FILENAME = "/tmp/.wireguard_config.json" COMMAND = "" import logging # 配置日志记录器 logging.basicConfig( level=logging.INFO, # 设置日志级别 format='%(asctime)s - %(levelname)s - %(message)s', # 设置日志格式 handlers=[ logging.FileHandler('/tmp/xxx.log'), # 日志输出到文件 ] ) import subprocess from json import dumps import wireguard_config as wg_cfg from sanic.response import json from sanic import Sanic, request, response from sanic_ext import Extend, openapi from sanic_openapi import doc from textwrap import dedent app = Sanic("wireguard_keygen") app.ext.openapi.describe( "WireGuard API", version="0.6.18", description=dedent(""" ## 动态生成wireguard的公钥和密钥对 """), ) from peewee import * from playhouse.mysql_ext import JSONField from peewee_async import Manager, PooledMySQLDatabase, AioModel MySQLPool = PooledMySQLDatabase('holdon', host='192.168.5.22', port=3306, user='holdon', password='qPxgU!y3p2', max_connections=20, connect_timeout=3600, ) MySQLPool2 = PooledMySQLDatabase('test', host='192.168.144.73', port=3306, user='root', password='6jAEGZjh5f', max_connections=20, connect_timeout=360, ) class UserMetadata(AioModel): id = AutoField() # primary_key=True user_id = BigIntegerField(default=0, unique=True) already_fist_visit = BooleanField(default=0) metadata = JSONField(null=True) class Meta: database = MySQLPool table_name = 'user_metadata' # 自定义表名 class UserID: uid: int db = Manager(MySQLPool) with db.allow_sync(): MySQLPool.create_tables([UserMetadata]) # 创建表用同步操作 MySQLPool.set_allow_sync(False) # 禁止使用同步操作 async def getmeta(user_id): result = None try: if DEBUG: logging.warning(f'1 -> 获取UserID: {user_id}') query = await UserMetadata.select().where(UserMetadata.user_id == user_id).aio_execute() if query: for user in query: if DEBUG: logging.warning(f'2 -> 获取meta: {user.metadata}') if user.metadata is None or user.metadata == "None" : result = { "wireguard_info": wg_cfg.getnewpair() } if DEBUG: logging.warning(f'3 -> 获取公密钥: {result}') # 执行原始 SQL try: await UserMetadata.update(metadata=result).where(UserMetadata.user_id == user_id).aio_execute() except Exception as e: await UserMetadata.update(metadata=dumps(result)).where(UserMetadata.user_id == user_id).aio_execute() else: if DEBUG: logging.warning(f'4 -> 公密钥 已经存在.') result = True return result except Exception as e: raise e def exec_remote_cmd(host, port=65422, user="root", command="uptime"): ssh_command = f"ssh -i /root/.ssh/hzup.ssh.2018 -o StrictHostKeyChecking=no -p{port} {user}@{host} {command}" result = subprocess.run(ssh_command, shell=True, capture_output=True, text=True) return result.stdout, result.stderr def upload_remote_file(host, port=65422, user="root", filename=""): scp_command = f"scp -i /root/.ssh/hzup.ssh.2018 -o StrictHostKeyChecking=no -P{port} {filename} {host}:/root/" result = subprocess.run(scp_command, shell=True, capture_output=True, text=True) return result.stdout, result.stderr @app.middleware('request') async def handle_request(request): pass @app.middleware('response') async def handle_response(request, response): pass #@app.get("/users/<user_id:int>") #async def getuserid(request: request.Request, user_id): @app.post("/users/") @openapi.summary("动态指定用户uid") @openapi.description("# 接口描述") @openapi.body( { "application/x-www-form-urlencoded" : UserID, }, description="输入真实有效的用户id", required=True, ) async def getuserid(request): """ openapi: --- parameters: responses: '200': description: 成功创建公密钥对 '201': description: 公密钥对已经存在 '400': description: 输入正确的用户ID '401': description: 查无此用户!!! '402': description: 用户必须真实有效 """ try: user_id = request.form.get("uid") except Exception as e: if DEBUG: logging.error(f'400 # 输入正确的用户ID ?') return json({"error": "输入正确的用户ID ?"}, status=400) try: user_id = int(user_id) if user_id <= 0: if DEBUG: logging.error(f'402 # 用户必须真实有效 !') return json({"error": "用户必须真实有效 !"}, status=402) else: res = await getmeta(user_id) if res is None: if DEBUG:logging.error(f'401 # 查询无此用户ID. X') return json({"error": "查询无此用户ID. X"}, status=401) elif res is True: if DEBUG:logging.info(f'201 # 用户信息已经存在 #') return json({"success": "用户信息已经存在 #"}, status=201) else: if COMMAND: # 如果有远程命令要执行 ssh_res, ssh_err = exec_remote_cmd(REMOTE_HOST, command=COMMAND) scp_res, scp_err = upload_remote_file(REMOTE_HOST, filename=FILENAME) if DEBUG:logging.info(f'206 # {ssh_res}:{ssh_err}') if DEBUG:logging.info(f'200 # 成功创建公密钥对。') return json({"success": "成功创建公密钥对。"}, status=200) except Exception as e: if DEBUG: logging.error(f'402 # 用户必须真实有效!!') return json({"error": "用户必须真实有效 !"}, status=402) if __name__ == "__main__": app.run(host="0.0.0.0", port=8888, workers=2, auto_reload=True)
方法同步转异步:以aio_为前缀,提供了一系列与Peewee同步方法对应的异步方法,保持了代码结构的一致性和易读性。
from peewee_async import Manager, PooledMySQLDatabase, AioModel .... class Users(AioModel): .... result = await Users.select().where(Users.uid == uid).aio_execute() await Users.update(metadata=result).where(Users.uid == uid).aio_execute() db = Manager(MySQLPool) with db.allow_sync(): MySQLPool.create_tables([Users]) # 如果希望只有异步调用,并将同步视为不需要的或错误, 需要设置 db.allow_sync = False
🏆Robyn with HTMX
编译python新版本
安装robyn依赖模块
requirements.txt
anyio==3.6.2 certifi==2022.12.7 charset-normalizer==2.1.1 click==8.1.3 dill==0.3.6 h11==0.14.0 helpers==0.2.0 httpcore==0.16.2 httpx==0.23.3 idna==3.4 inquirerpy==0.3.4 Jinja2==3.0.1 MarkupSafe==2.1.1 multiprocess==0.70.14 nestd==0.3.1 pfzy==0.3.4 prisma==0.7.1 prompt-toolkit==3.0.39 pydantic==1.10.2 python-dotenv==0.21.0 requests==2.28.1 rfc3986==1.5.0 robyn==0.44.0 sniffio==1.3.0 tomlkit==0.11.6 typing_extensions==4.4.0 urllib3==1.26.13 uvloop==0.17.0 watchdog==2.2.1 wcwidth==0.2.6 pip3.10 install -r requirements.txt
某些特殊需要指定的版本
pip3.10 install robyn==0.24.0 -U pip3.10 install "robyn[templating]" -U pip3.10 install prisma httpx certifi urllib3 -U -i https://pypi.tuna.tsinghua.edu.cn/simple
给Robyn加个限速的中间件
pip install robyn robyn-rate-limits
from robyn import Robyn, Robyn from robyn_rate_limits import InMemoryStore from robyn_rate_limits import RateLimiter app = Robyn(__file__) limiter = RateLimiter(store=InMemoryStore, calls_limit=3, limit_ttl=100) @app.before_request() def middleware(request: Request): return limiter.handle_request(app, request)
给Robyn加个日志记录
from robyn import Robyn, Request, Response from robyn.logger import Logger from datetime import datetime app = Robyn(__file__) logger = Logger() class LoggingMiddleware: def request_info(request: Request): ip_address = request.ip_addr request_url = request.url.host request_path = request.url.path request_method = request.method request_time = str(datetime.now()) return { "ip_address": ip_address, "request_url": request_url, "request_path": request_path, "request_method": request_method, "request_time": request_time, } def response_info(response: Response): status_code = response.status_code response_type = response.response_type return { "status_code": status_code, "response_type": response_type } @app.before_request() def log_request(request: Request): logger.info(f"Received request: %s", LoggingMiddleware.request_info(request)) return request @app.after_request() def log_response(response: Response): logger.info(f"Sending response: %s", LoggingMiddleware.response_info(response)) return response @app.get("/") def h(): return "Hello, World!" app.start(port=8080)
如何升级robyn
#!/bin/sh cp -a requirements.txt requirements.bak pip3.10 install -U pip pip3.10 install "robyn" -U pip3.10 install "robyn[templating]" -U prisma db push --schema sqldb/schema.prisma supervisorctl stop robyn /opt/python3.10.6/bin/python3 /root/robyn_demo/main.py supervisorctl restart robyn supervisorctl status python3.10 -m pip freeze > requirements.txt
supervisord托管
[program:robyn] command = /opt/python3.10.6/bin/python3 /root/robyn_demo/app.py --processes 4 --workers 4 autostart = true autorestart = true stopsignal = KILL stopasgroup = true stdout_logfile = NONE stderr_logfile = NONE
创建prisma表结构
prisma db push --schema sqldb/schema.prisma
schema.prisma
generator client { provider = "prisma-client-py" interface = "sync" recursive_type_depth = -1 } datasource db { provider = "sqlite" url = "file:./sqlite.db" } model User { id Int @id @default(autoincrement()) email String? @unique createdAt DateTime @default(now()) name String posts Post[] } model Post { id Int @id @default(autoincrement()) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt published Boolean @default(false) title String @unique views Int @default(0) authorId Int author User @relation(fields: [authorId], references: [id]) }
prisma 时经常使用的命令
- prisma format :顾名思义,它格式化架构文件并验证其内容。
- prisma db pull :从数据库中获取模型并生成客户端。
- prisma generate :生成客户端,在您想要修改模型而不推送数据库中的更改的拉策略中很有用。
- prisma db push :当您测试应用程序时,这是您应该用来重置数据库并生成客户端的命令。
- prisma migrate dev –name <name> :当您确定您的更改时,在db push之后使用。它将创建一个迁移文件,将其应用于您的本地环境并生成客户端。
- prisma migrate deploy :在生产环境中应用迁移文件。
- prisma py version :显示有关 python 客户端、支持的 prisma 版本、二进制文件等的调试信息…
robyn主程序
from robyn import Robyn, serve_file, serve_html, jsonify, WebSocket from robyn.robyn import Response, Request from robyn.templating import JinjaTemplate from urllib.parse import parse_qs from prisma import Prisma from os import path import asyncio DEBUG=True db = Prisma(auto_register=True) db.connect() app = Robyn(__file__) websocket = WebSocket(app, "/webst") current_dir = path.dirname(__file__) jinja_template = JinjaTemplate(path.join(current_dir, "templates")) @app.get("/") @app.get("/post") async def head(req): try: #posts = db.post.query_raw('''select * from Post order by title desc''') posts = db.post.find_many( order={"title":'asc'}, include={"author": True} ) context = { "framework": "Robyn", "templating_engine": "Jinja2", "data": posts } if DEBUG: print(context) response = jinja_template.render_template(template_name="index.html", **context) return response except Exception as e: raise(e) @app.post("/submit") async def addpost(req: Request): data = parse_qs(req.body) # (bytearray(req.get("body")).decode("utf-8")) unused title = data.get("title")[0] author = data.get("author")[0] if title is None or author is None: return jsonify({ "error": "you need to provide title and author!" }) user = db.user.find_first(where={'name': author}) if user is None: user = db.user.create( data={ "name": author, } ) post = db.post.create( data={ 'title': title, 'authorId': user.id, } ) if DEBUG: print("post <", data, " -> ", title, author, post.id) response = f"""<tr> <td>{post.title}</td> <td>{post.views}</td> <td>{author}</td> <td>{post.updatedAt}</td> <td> <button class='btn btn-warning' hx-put='/edit/{post.id}'> Edit </button> <button class='btn btn-danger' hx-put='/delete/{post.id}' hx-confirm='Are you sure?'> Delete </button> </td> </tr>""" return response @app.get("/view/:id") async def getpost(req): try: postId = int(req.path_params.get("id")) #(req.get("path_params").get("id")) unused if DEBUG: print(postId, "view") post = db.post.find_unique(where={'id': postId}, include={'author': True}) if DEBUG: print(post) if post is None: return jsonify({"error": "Post doesn't exist"}) response = f"""<tr> <td>{post.title}</td> <td>{post.views}</td> <td>{post.author.name}</td> <td>{post.updatedAt}</td> <td> <button class='btn btn-warning' hx-put='/edit/{post.id}'> Edit </button> <button class='btn btn-danger' hx-put='/delete/{post.id}' hx-confirm='Are you sure?'> Delete </button> </td> </tr>""" return response except Exception as e: raise(e) @app.put("/edit/:id") async def edit(req): postId = int(req.path_params.get("id")) if DEBUG: print(postId," edit") post = db.post.find_unique(where={'id': postId}, include={'author': True}) if DEBUG: print(post) response = f""" <tr hx-trigger='cancel' class='editing' hx-get='/post/{post.id}' hx-swap='outerHTML'> <td><input name='title' value='{post.title}'/></td> <td>{post.views}</td> <td>{post.author.name}</td> <td>{post.updatedAt}</td> <td> <button class='btn btn-info' hx-get='/view/{post.id}'> Cancel </button> <button class='btn btn-primary' hx-put='/update/{post.id}' hx-include='closest tr'> Save </button> </td> </tr>""" return response @app.put("/update/:id") async def update(req): try: postId = int(req.path_params.get("id")) if DEBUG: print(postId," update") data = parse_qs(req.body) title = data.get("title")[0] post = db.post.find_unique(where={'id': postId}, include={'author': True}) if post.title != title: post = db.post.update(where={"id": postId }, include={"author": True}, data={"title": title,"views":{"increment": 1}}) if DEBUG: print(post) response = f""" <tr> <td>{title}</td> <td>{post.views}</td> <td>{post.author.name}</td> <td>{post.updatedAt}</td> <td> <button class='btn btn-warning' hx-put='/edit/{post.id}'> Edit </button> <button class='btn btn-danger' hx-put='/delete/{post.id}' hx-confirm='Are you sure?'> Delete </button> </td> </tr>""" return response except Exception as e: raise(e) @app.put("/delete/:id") async def delete(req: Request): postId = int(req.path_params.get("id")) if DEBUG: print(postId," delete") post = db.post.delete(where={'id': postId}) if post is None: return jsonify({"error": "Post doesn't exist."}) return "" # simple user demo @app.get("/user") async def getuser(req): try: users = db.user.find_many(order={"name":'desc'}, include={"posts": False}) if DEBUG: print(users) return jsonify({ "data": [user.json() for user in users] }) except Exception as e: raise(e) @app.post("/user") async def adduser(req): try: data = parse_qs(req.body) name = data.get("name")[0] if name is None: return jsonify({ "error": "you need to provide name" }) user = db.user.create(data={"name": name,}) return user.json(indent=2) except: return jsonify({ "error": "wrong post json path_params!" }) ########################################### i = -1 # simple websocket demo @websocket.on("message") def connect(): global i i += 1 if i == 0: return "Hello ?" elif i == 1: return "Who are u?" elif i == 2: return "haha, I'm shy." elif i == 3: i = -1 return "again!" @websocket.on("close") def close(): return "Goodbye world, from ws" @websocket.on("connect") def message(): return "Hello world, from ws" app.start(port=5555, host="0.0.0.0") # host defaults to 127.0.0.1
jinja/htmx模板
{# templates/index.html #}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Robyn HTMX Demo</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.0-beta1/dist/css/bootstrap.min.css" rel="stylesheet">
<!-- HTMX -->
<script src="https://unpkg.com/htmx.org@1.5.0"></script>
<script src="https://cdn.jsdelivr.net/gh/alpinejs/alpine@v2.8.2/dist/alpine.min.js" defer></script>
<style>
body{
padding: 20px;
}
table {
font-family: arial, sans-serif;
border-collapse: collapse;
width: 100%;
}
tr.htmx-swapping td {
opacity: 0;
transition: opacity 0.3s ease-out;
}
td, th {
border: 1px solid #383737;
text-align: left;
padding: 8px;
}
tr:nth-child(even) {
background-color: #dddddd;
}
</style>
</head>
<!-- Place <body> </body> code here -->
<body>
<h1>{{framework}} � {{templating_engine}} � HTMX</h1>
<div x-data="{show: false}">
<button @click="show=!show" class="btn btn-primary">New Post</button>
<hr>
<div x-show="show">
<form hx-post="/submit" hx-swap="beforeend" hx-target="#new-book" class="mb-3">
<input type="text" placeholder="Post Title" name="title" class="form-control mb-3" />
<input type="text" placeholder="Post Author" name="author" class="form-control mb-3" />
<button type="submit" class="btn btn-primary">Submit</button>
</form>
</div>
</div>
<table class="table">
<thead>
<tr>
<th scope="col">Post Title</th>
<th scope="col">Post Views</th>
<th scope="col">Post Author</th>
<th scope="col">LastModify Time</th>
<th scope="col">Operation</th>
</tr>
</thead>
<tbody id="new-book" hx-target="closest tr" hx-swap="outerHTML swap:0.3s">
{%for post in data%}
<tr>
{%if not post.published %}
<td>{{post.title}}</td>
<td>{{post.views}}</td>
<td>{{post.author.name}}</td>
<td>{{post.updatedAt}}</td>
<td>
<button class="btn btn-warning" hx-put="/edit/{{post.id}}"> Edit </button>
<button class="btn btn-danger" hx-put="/delete/{{post.id}}" hx-confirm='Are you sure?'> Delete </button>
</td>
{%endif%}
</tr>
{%endfor%}
</tbody>
</table>
</body>
</html>
Robyn的容器化
docker pull python:3.10-slim docker run -tid --name robyn-py3 python:3.10-slim apt update && apt upgrade -y # 安装 prisma 需要的数据库 apt install curl build-essential gcc make sqlite3 -y curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh pip3 install -r requirements.txt # 导出 / 导入新的容器镜像 docker export efe5493b8e5d > robyn_py3.tar cat robyn_py3.tar | docker import - robyn-python3.10:latest docker run -tid -v /app:/app -p 5555:5555 --name robyn-py3 robyn-python3.10 sh -c "prisma db push && python3 /root/app/app.py"
pip install robyn
无法适配于 python3.11和alpine,最佳镜像:python:3.10-slim
Dockerfile
FROM python:3.12.7-slim-bookworm WORKDIR /workspace COPY . . RUN pip install --no-cache-dir --upgrade -r requirements.txt EXPOSE 8080 CMD ["python3", "app.py", "--log-level=DEBUG"] #docker build -t "upyun/robyn-wireguard-api" --rm . #docker save upyun/robyn-wireguard-api:latest > robyn-wireguard.tar #docker run -tid --restart=always -v /root/robyn_wireguard_api:/workspace -p 80:8080 upyun/robyn-wireguard-api
Robyn & SQLAlchemy 分发公密钥
DEBUG = True import wireguard_config as wg_cfg from urllib.parse import parse_qs import json from robyn import Robyn, jsonify from robyn.robyn import Response, Request app = Robyn(__file__) from sqlalchemy import create_engine, Boolean, BIGINT, Column, Text, JSON, MetaData, Table, select, update from sqlalchemy.orm import sessionmaker ''' from sqlalchemy.orm import declarative_base db = declarative_base() #db.metadata.create_all(engine) class UserMetadata(db): id = Column(BIGINT, primary_key=True) user_id = Column(BIGINT, nullable=False, default=0) already_fist_visit = Column(Boolean, nullable=False, default=0) metadata = Column(JSON) # 使用 JSON 类型 __tablename__ = "user_metadata" ''' # 因为表结构里metadata与sqlalchemy 关键字冲突了,所以使用Table & MetaData方式 db = MetaData() engine = create_engine('mysql+pymysql://root:6jAEGZjh5f@192.168.144.73/test') #AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False) Session = sessionmaker(bind=engine) UserMetadata = Table('user_metadata', db, Column('id', BIGINT, primary_key=True), Column('user_id', BIGINT, nullable=False, default=0), Column('already_fist_visit', Boolean, nullable=False, default=0), Column('metadata', JSON) ) try: # 创建表 db.create_all(engine) except Exception as e: print(f"Error occurred while creating tables: {e}") def getmeta(user_id): result = None if DEBUG: print( f"1 -------------------{user_id}" ) try: session = Session() # 查询数据, 当你使用 Table 定义表结构时,应该使用 select() 方法而不是 session.query() query = select(UserMetadata).where(UserMetadata.c.user_id == user_id) try: #user = session.query(UserMetadata.metadata).filter_by(user_id=user_id).scalar() user = session.execute(query).fetchall()[0] except IndexError: return None if user: if DEBUG: print(f"2 ------------------ {user.metadata}") if user.metadata is None or user.metadata == "None": result = { "wireguard_info": wg_cfg.getnewpair() } if DEBUG: print(f"3 ------------------ {result}") try: # 插入/更新 数据 query = update(UserMetadata).where(UserMetadata.c.user_id == user_id).values(metadata=result) session.execute(query) except Exception as e: query = update(UserMetadata).where(UserMetadata.c.user_id == user_id).values(metadata=json.dumps(result)) session.execute(query) session.commit() else: if DEBUG: print(f"4 --------------- exist") result = True return result except Exception as e: session.rollback() # 如果出现错误,回滚事务 raise e finally: session.close() @app.get("/userid/:id") async def getuserid(request: Request): try: user_id = int(request.path_params.get("id")) #(req.get("path_params").get("id")) unused if user_id is None: return jsonify({"error": "Missing parameter 'user_id'"}) elif user_id <= 0: return jsonify({"error": "User id must be an integer"}) else: res = getmeta(user_id) if res is None: return jsonify({"error": "No such user id"}) elif res is True: return jsonify({"success": "User metadata is exist."}) else: return jsonify({"success": "User metadata updated"}) except Exception as e: return jsonify({"error": "User id must be an integer"}) ########################################### if __name__ == "__main__": app.start(port=8888, host="0.0.0.0") # host defaults to 127.0.0.1
Robyn & peewee & openapi 分发公密钥
from robyn import Robyn, OpenAPI, Request, Response from robyn.openapi import OpenAPIInfo from robyn.templating import JinjaTemplate from peewee import * from playhouse.mysql_ext import JSONField from peewee_async import Manager, PooledMySQLDatabase, AioModel from os import path, getenv from typing import TypedDict from dotenv import load_dotenv, find_dotenv, dotenv_values import subprocess, json, logging, datetime import wireguard_config as wg_cfg DEBUG = True load_dotenv(find_dotenv(), override=True) # 加载 .env 文件到环境变量 # 配置日志记录器 logging.basicConfig( level=logging.INFO, # 设置日志级别 format='%(asctime)s - %(levelname)s - %(message)s', # 设置日志格式 handlers=[ logging.FileHandler(getenv("Logfile")), # 日志输出到文件 ] ) app = Robyn( file_object=__file__, openapi=OpenAPI( info=OpenAPIInfo( title=getenv("Title"), description=getenv("Description"), version=getenv("Version"), ) ) ) ''' current_dir = path.dirname(__file__) static_dir = path.join(current_dir, "static") jinja_template = JinjaTemplate(path.join(current_dir, "templates")) # 读取环境变量并保存到字典 config = dotenv_values(find_dotenv()) ''' MySQLPool = PooledMySQLDatabase(getenv("DB_Name"), host=getenv("DB_Host"), port=int(getenv("DB_Port")), user=getenv("DB_User"), password=getenv("DB_Pass"), max_connections=int(getenv("DB_Pool")), connect_timeout=int(getenv("DB_Timeout")), ) class UserMetadata(AioModel): id = AutoField() # primary_key=True user_id = BigIntegerField(default=0, unique=True) info = JSONField(null=True) create_at=DateTimeField(default=datetime.datetime.now) update_at=DateTimeField(default=datetime.datetime.now) class Meta: database = MySQLPool table_name = getenv("DB_Table") # 自定义表名 class UserID(TypedDict): dcid: int uid: int def HTTPException(status=403, headers={}): responses = { 401: {"description": "查询无此用户ID. X"}, 402: {"description": "用户必须真实有效!"}, 403: {"description": "拒绝访问!"}, 200: {"description": "成功创建公密钥对:"}, 201: {"description": "用户信息已经存在#"}, } if DEBUG: logging.info(f'{status} # {responses.get(status).get("description")}') return Response(status_code=status, description=responses.get(status).get("description"), headers=headers) async def postmeta(user_id, dc_id=0): result = content = None try: if DEBUG: logging.warning(f'1 -> 获取UserID: {user_id} 机房: {dc_id}') query = await UserMetadata.select().where(UserMetadata.user_id == user_id).aio_execute() if query: for user in query: if DEBUG: logging.warning(f'2 -> 获取meta: {user.user_id} -> {user.info}') if user.info is None or not user.info.get("wireguard_info", None): result = "Success" content = { "dc_no": dc_id, "server_pubkey": "1b5j0BafMgF9ofXuOCvRRznv00GuHXbfw94f6h8peRk=", "wireguard_info": wg_cfg.getnewpair(dc_id) } if DEBUG: logging.warning(f'3 -> 获取公密钥: ***********') # 执行原始 SQL try: await UserMetadata.update(info=content).where(UserMetadata.user_id == user_id).aio_execute() except Exception as e: await UserMetadata.update(info=json.dumps(content)).where(UserMetadata.user_id == user_id).aio_execute() else: if DEBUG: logging.warning(f'4 -> 公密钥 已经存在.') result = True content = { "wireguard_info": f"{user.info}" } return result, content except Exception as e: raise e def exec_remote_cmd(host, port=65422, user="root", command="uptime"): ssh_command = f"ssh {getenv('SSH_OPS')} -p{port} {user}@{host} {command}" result = subprocess.run(ssh_command, shell=True, capture_output=True, text=True) return result.stdout, result.stderr def upload_remote_file(host, port=65422, user="root", filename=""): scp_command = f"scp {getenv('SSH_OPS')} -P{port} {filename} {host}:/root/wireguard_config.json" result = subprocess.run(scp_command, shell=True, capture_output=True, text=True) return result.stdout, result.stderr @app.get("/users/", openapi_name="Get User ID", openapi_tags=["UserID"]) async def GetUserID(r: Request, query_params=UserID): try: user_id = int(r.query_params.get("uid")) if user_id <= 0: return HTTPException(status=403) res, content = await postmeta(user_id) if res is True: return content else: return HTTPException(status=401) except Exception as e: return HTTPException(status=402) @app.post("/users/", openapi_name="Post User ID", openapi_tags=["UserID"]) async def PostUserID(r: Request, body=UserID, description="XXXXXXXX"): try: post = r.json() user_id = int(post.get("uid")) dc_id = int(post.get("dcid")) if user_id <= 0 or dc_id > 1000 : return HTTPException(status=403) FILENAME = f"/tmp/.wireguard_config.json.{dc_id}" REMOTE_HOST = f"HOST_{dc_id}" res, content = await postmeta(user_id, dc_id) if res is None: return HTTPException(status=401) elif res is True: return HTTPException(status=201) else: if getenv("COMMAND") and getenv(REMOTE_HOST): # 如果有远程命令要执行 scp_res, scp_err = upload_remote_file( getenv(REMOTE_HOST), filename=FILENAME ) ssh_res, ssh_err = exec_remote_cmd( getenv(REMOTE_HOST), command=getenv("COMMAND") ) if DEBUG:logging.info(f'206 # {dc_id }/{getenv(REMOTE_HOST)} {ssh_res}:{ssh_err}') return HTTPException(status=200) except Exception as e: return HTTPException(status=402) if __name__ == "__main__": db = Manager(MySQLPool) with db.allow_sync(): MySQLPool.create_tables([UserMetadata]) # 创建表用同步操作 MySQLPool.set_allow_sync(False) # 禁止使用同步操作 app.start(port=8080, host="0.0.0.0") # host defaults to 127.0.0.1
FastHTML with HTMX
FastUI也是一个简单实用的用户界面库,由大名鼎鼎的pydantic作者写的。
NextPy: Reflex + ReactPy + FastApi
Reflex应用程序编译为React前端应用程序和FastAPI后端应用程序。只有UI被编译为next.js;所有的应用程序逻辑和状态管理都保持在Python中,并在服务器上运行。Reflex使用WebSockets将事件从前端发送到后端,并将状态更新从后端发送到前端。
🤔 Nextpy:应用开发的未来
轻松快速地构建任何 Web 应用 — ⚡。它简化了从后端到前端(是的,用 Python 构建视觉上惊艳的前端!)、AI 集成、API 等的 Pythonic 开发,赋予人类和 AI 代理以力量。
⏰ 你可以在1小时内构建什么: 美观的作品集、门户、数据应用、内部工具、API 等。
📚 最棒的部分?可转移的知识: 使用 nextpy 进行构建会逐步教会你最好的 python 库——FastAPI、Pydantic、SQLModel、Pandas、Jinja2、SQLAlchemy、Reflex、Reactpy等。
Nextpy依赖的bun,需要更高版本的glibc !!!
FastAPI ASGI with PyPy
- FastAPI framework, high performance, easy to learn, fast to code, ready for production
安装fastapi
pip3.7 install fastapi pydantic uvicorn
最简单的get/post
main.py
from fastapi import FastAPI, Query from enum import Enum from typing import Optional from pydantic import BaseModel, Field app = FastAPI() # 定义数据模型,可以做一些条件约束 class People(BaseModel): name: str age: int = Field(le=100) address: str = Field(max_length=10) salary: float = Field(le=100000) Gender = Enum('Gender', {'male': 'boy', 'female': 'girl'}) @app.get('/') def index(): return {'msg': '欢迎使用 FastAPI !'} @app.get('/query/{uid}') # 做数据类型检验 def query(uid: Optional[int] = None): msg = f'You query ID is {uid}' return {'msg': msg, 'success': True} @app.get('/gender/{sex}') # 这是可选的类型 async def gender(sex: Optional[Gender] = None): return {'msg': sex} @app.post('/insert') async def insert(people: People): salary = people.salary * 12 msg = f'你叫 {people.name},今年: {people.age},全年收入为{salary}' return {'msg': msg, 'success': True} @app.get('/search') # 参数验证和判断 async def search(q: str = Query(None, max_length=6)): return {'msg': q} if __name__ == '__main__': import uvicorn uvicorn.run(app="__main__:app", port=8000, reload=True)
给FastAPI添加中间件
from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware API_KEY = "my_secret_api_key" app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], # Allows all origins allow_credentials=True, allow_methods=["*"], # Allows all methods allow_headers=["*"], # Allows all headers ) # Middleware: API key authentication @app.middleware("http") async def api_key_authentication(request: Request, call_next): api_key = request.headers.get("X-Api-Key") if api_key != API_KEY: raise HTTPException(status_code=401, detail="Unauthorized: Invalid API Key") response = await call_next(request) response.headers["X-Custom-Header"] = "CustomHeaderValue" return response
FastAPI的依赖注入
app = FastAPI() # Dependency Injections async def get_record(person_id: int)): record = await Person.get(person_id) if record is None: raise HTTPException(status_code=404, detail="Item not found") return record # FastAPI function @app.get("/items/{person_id}") async def read_item(person: Person = Depends(get_record)): return item
FastAPI最简单的RBAC角色权限访问控制
from fastapi import FastAPI, HTTPException, Request from starlette.middleware.base import BaseHTTPMiddleware app = FastAPI() # Define role-based access control (RBAC) structure RESOURCES_FOR_ROLES = { 'admin': { 'resource1': ['read', 'write', 'delete'], 'resource2': ['read', 'write'], }, 'user': { 'resource1': ['read'], 'resource2': ['read', 'write'], } } # 这个可以改进从数据库里动态的获取用户信息和角色 USERS = { 'user1': {'username': 'user1', 'role': 'user'}, 'admin1': {'username': 'admin1', 'role': 'admin'} } # Optionally, define paths to be excluded from checking for permissions EXLUDED_PATHS = ['docs', 'openapi.json'] #Map request methods to actions def translate_method_to_action(method: str) -> str: method_permission_mapping = { 'GET': 'read', 'POST': 'write', 'PUT': 'update', 'DELETE': 'delete', } return method_permission_mapping.get(method.upper(), 'read') # CHeck if permission granted or not def has_permission(user_role, resource_name, required_permission): if user_role in RESOURCES_FOR_ROLES and resource_name in RESOURCES_FOR_ROLES[user_role]: return required_permission in RESOURCES_FOR_ROLES[user_role][resource_name] return False # Define a custom Middleware for handling RBAC class RBACMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): request_method = str(request.method).upper() action = translate_method_to_action(request_method) resource = request.url.path[1:] if not resource in EXLUDED_PATHS: admin1 = USERS['admin1'] # 通过切换不同的用户实例来判断 # user1 = USERS['user1']. if not has_permission(admin1['role'], resource, action): raise HTTPException(status_code=403, detail="Insufficient permissions") response = await call_next(request) return response # Add the middleware to FastAPI app.add_middleware(RBACMiddleware) # Example protected route for resource 1 @app.get("/resource1") async def resource1(): return {"message": "This is a resource1 route"} @app.post("/resource1") async def add_resource1(add: int): return {"message": add} @app.delete("/resource1") async def delete_resource1(): return {"message": "This resource1 is deleted"} # Example protected route for resource 2 @app.get("/resource2") async def resource2(): return {"message": "This is an resource2 route"} @app.post("/resource2") async def add_resource2(add: int): return {"message": add} @app.delete("/resource2") async def delete_resource2(): return {"message": "This resource2 is deleted"} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
🏆FastAPI+PeeWee真好用
DEBUG = True from os import getenv from dotenv import load_dotenv, find_dotenv, dotenv_values def load_env_vars(file_path): load_dotenv(file_path, override=True) # 加载 .env 文件到环境变量 config = dotenv_values(file_path) # 读取环境变量并保存到字典 return config # 现在 config 是一个字典,包含所有的环境变量 config = load_env_vars( find_dotenv() ) import wireguard_config as wg_cfg import uvicorn, subprocess, json, logging # 配置日志记录器 logging.basicConfig( level=logging.INFO, # 设置日志级别 format='%(asctime)s - %(levelname)s - %(message)s', # 设置日志格式 handlers=[ logging.FileHandler(getenv("Logfile")), # 日志输出到文件 ] ) from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field, conint app = FastAPI( title = getenv("Title"), description = getenv("Description"), version = getenv("Version"), ) from peewee import * from playhouse.mysql_ext import JSONField from peewee_async import Manager, PooledMySQLDatabase, AioModel MySQLPool = PooledMySQLDatabase(getenv("DB_Name"), host=getenv("DB_Host"), port=int(getenv("DB_Port")), user=getenv("DB_User"), password=getenv("DB_Pass"), max_connections=int(getenv("DB_Pool")), connect_timeout=int(getenv("DB_Timeout")), ) class UserMetadata(AioModel): id = AutoField() # primary_key=True user_id = BigIntegerField(default=0, unique=True) already_fist_visit = BooleanField(default=0) metadata = JSONField(null=True) class Meta: database = MySQLPool table_name = getenv("DB_Table") # 自定义表名 db = Manager(MySQLPool) with db.allow_sync(): MySQLPool.create_tables([UserMetadata]) # 创建表用同步操作 MySQLPool.set_allow_sync(False) # 禁止使用同步操作 async def getmeta(user_id, dc_id): result = None try: if DEBUG: logging.warning(f'1 -> 获取UserID: {user_id} 机房: {dc_id}') query = await UserMetadata.select().where(UserMetadata.user_id == user_id).aio_execute() if query: for user in query: if DEBUG: logging.warning(f'2 -> 获取meta: {user.user_id} -> {user.metadata}') if user.metadata is None or user.metadata == "None" : result = { "dc_no": dc_id, "wireguard_info": wg_cfg.getnewpair(dc_id) } if DEBUG: logging.warning(f'3 -> 获取公密钥: ***********') # 执行原始 SQL try: await UserMetadata.update(metadata=result).where(UserMetadata.user_id == user_id).aio_execute() except Exception as e: await UserMetadata.update(metadata=json.dumps(result)).where(UserMetadata.user_id == user_id).aio_execute() else: if DEBUG: logging.warning(f'4 -> 公密钥 已经存在.') result = True return result except Exception as e: raise e def exec_remote_cmd(host, port=65422, user="root", command="uptime"): ssh_command = f"ssh {getenv('SSH_OPS')} -p{port} {user}@{host} {command}" result = subprocess.run(ssh_command, shell=True, capture_output=True, text=True) return result.stdout, result.stderr def upload_remote_file(host, port=65422, user="root", filename=""): scp_command = f"scp {getenv('SSH_OPS')} -P{port} {filename} {host}:/root/" result = subprocess.run(scp_command, shell=True, capture_output=True, text=True) return result.stdout, result.stderr class UserID(BaseModel): dcid: conint(ge=100, le=999) uid: conint(gt=0) model_config = {"extra": "forbid"} responses = { 401: {"description": "查询无此用户ID. X"}, 402: {"description": "用户必须真实有效!"}, 200: {"description": "成功创建公密钥对:"}, 201: {"description": "用户信息已经存在#"}, } #@app.get("/users/{user_id:UserID}") @app.post("/users/", include_in_schema=True, responses={**responses}) async def PostUserID(userid: UserID): # Dependency function not ready try: user_id = int(userid.uid) dc_id = int(userid.dcid) FILENAME = f"/tmp/.wireguard_config.json.{dc_id}" res = await getmeta(user_id, dc_id) if res is None: if DEBUG: logging.error(f'401 # {responses.get(401)}') return HTTPException(detail=responses.get(401), status_code=401) elif res is True: if DEBUG: logging.info(f'201 # {responses.get(201)}') return HTTPException(detail=responses.get(201), status_code=201) else: if COMMAND: # 如果有远程命令要执行 ssh_res, ssh_err = exec_remote_cmd( getenv("REMOTE_HOST"), command=getenv("COMMAND") ) scp_res, scp_err = upload_remote_file( getenv("REMOTE_HOST"), filename=FILENAME) if DEBUG:logging.info(f'206 # {ssh_res}:{ssh_err}') if DEBUG: logging.info(f'200 # {responses.get(200)} {res}') return HTTPException(detail=responses.get(200), status_code=200) except Exception as e: if DEBUG: logging.error(f'402 # {responses.get(402)}') return HTTPException(detail=responses.get(402), status_code=402) if __name__ == "__main__": uvicorn.run("main:app", host="0.0.0.0", port=8888, workers=4, reload=DEBUG)
查看swagger doc
http://127.0.0.1:8000/docs http://127.0.0.1:8000/redoc
用ASGI异步运行
python3.7 -m uvicorn main:app --reload Server Software: uvicorn Server Hostname: 127.0.0.1 Server Port: 8000 Document Path: /query/1234 Document Length: 45 bytes Concurrency Level: 100 Time taken for tests: 4.408 seconds Complete requests: 10000 Failed requests: 0 Total transferred: 1700000 bytes HTML transferred: 450000 bytes Requests per second: 2268.75 [#/sec] (mean) Time per request: 44.077 [ms] (mean) Time per request: 0.441 [ms] (mean, across all concurrent requests) Transfer rate: 376.65 [Kbytes/sec] received Connection Times (ms) min mean[+/-sd] median max Connect: 0 1 0.8 1 6 Processing: 3 43 12.2 40 114 Waiting: 1 37 11.7 35 113 Total: 4 44 12.2 40 115
使用进程管理器,如gunicorn,supervisord,nginx
gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app --reload Server Software: uvicorn Server Hostname: 127.0.0.1 Server Port: 8000 Document Path: /query/1234 Document Length: 45 bytes Concurrency Level: 100 Time taken for tests: 1.448 seconds Complete requests: 10000 Failed requests: 0 Total transferred: 1700000 bytes HTML transferred: 450000 bytes Requests per second: 6903.71 [#/sec] (mean) Time per request: 14.485 [ms] (mean) Time per request: 0.145 [ms] (mean, across all concurrent requests) Transfer rate: 1146.12 [Kbytes/sec] received Connection Times (ms) min mean[+/-sd] median max Connect: 0 1 0.7 1 5 Processing: 2 14 20.5 11 220 Waiting: 2 12 19.2 10 220 Total: 3 14 20.5 12 221
gunicorn -w 4 -k uvicorn.workers.UvicornH11Worker main:app --reload Server Software: uvicorn Server Hostname: 127.0.0.1 Server Port: 8000 Document Path: /query/1234 Document Length: 45 bytes Concurrency Level: 100 Time taken for tests: 2.721 seconds Complete requests: 10000 Failed requests: 0 Total transferred: 1890000 bytes HTML transferred: 450000 bytes Requests per second: 3675.03 [#/sec] (mean) Time per request: 27.211 [ms] (mean) Time per request: 0.272 [ms] (mean, across all concurrent requests) Transfer rate: 678.30 [Kbytes/sec] received Connection Times (ms) min mean[+/-sd] median max Connect: 0 0 2.6 0 184 Processing: 4 27 20.2 24 220 Waiting: 3 22 15.9 20 217 Total: 4 27 20.4 24 221
Falcon WSGI with PyPy
- Falcon: 是一个适用于小型应用程序,应用程序后端和更高级别框架的微框架。它鼓励遵循REST概念,因此,在使用Falcon进行开发时,您应该考虑映射到HTTP方法的资源和状态转换。Falcon是Python中速度最快的Web框架之一。
Python静态编译器
PyPy的优缺点
PyPy是Python解释器CPython的直接替代品。CPython将Python编译为中间字节码然后由虚拟机解释,而PyPy使用实时(JIT)编译将Python代码转换为本地机器的汇编语言。
根据正在执行的任务,性能提升可能会非常显着。平均而言,PyPy将Python加速了大约7.6倍,一些任务加速了50倍或更多。CPython解释器根本不会执行与PyPy一样的优化方式,并且可能永远不会,因为这不是它的设计目标之一。
最好的部分是开发人员需要很少甚至不需要努力来解锁PyPy提供的收益。只需将CPython替换为PyPy,并且大部分都已完成。下面讨论了一些例外,但是PyPy的目标是运行现有的,并且未经修改的Python代码并为其提供自动化的速度提升。
PyPy最适合纯Python的应用程序
PyPy在“纯”Python应用程序中表现最佳,换句话说也就是用Python编写的没有夹杂其他语言的应用程序中表现最佳。由于PyPy模仿CPython的本机二进制接口的方式,与C库(如NumPy)接口的Python包也没有那么出类拔萃了。
PyPy的开发人员已经解决了这个问题,并使PyPy与大多数依赖于C扩展的Python包更加兼容。例如Numpy现在与PyPy兼容的非常好。但是,如果你希望与C的扩展最大程度地兼容,请使用CPython。
PyPy适用于运行时间较长的程序
PyPy优化Python程序的一个副作用是,运行时间较长的程序通过PyPy的优化获益最多。程序运行的时间越长,PyPy可以收集的运行时类型信息就越多,它可以进行的优化就越多。
一劳永逸的Python脚本不会从这种事情中受益。例如受益的Python应用程序通常具有长时间循环运行的行为,或者在Web框架的后台中连续运行。
PyPy没有预编译
PyPy编译Python代码,但它不是Python代码的编译器。由于PyPy执行其优化的方式和Python的固有动态特点,因此无法将生成的JITted代码作为独立二进制文件发出并重新使用它。
每次运行都必须编译每个程序。如果你想将Python编译成可以作为独立应用程序运行的更快的代码,那么还是请使用Cython、Numba或当前实验性的Nuitka项目。
Numba
Numba,是一款可以让python速度提升百倍,将python函数编译为机器代码的JIT编译器,经过Numba编译的python代码(仅限数组运算),其运行速度可以接近C或FORTRAN语言。
from numba import jit import time @jit(nopython=True) def count(n): sum = 0 for i in range(n): sum += i return sum start = time.time() sum = count(1000000001) end = time.time() print(f"sum is {sum}, used time is {end - start}")
Python最佳数据分析工具
Excel分析
PowerPoint处理
Word文字编辑
Pandas数据分析
import pandas as pd import matplotlib.pyplot as plt # 中文支持,不会显示小方框 plt.rcParams['font.sans-serif'] = ['SimHei', 'Songti SC', 'STFangsong'] # 正常显示负号 plt.rcParams['axes.unicode_minus'] = False # 设置线条宽度 plt.rcParams['lines.linewidth'] = 50 # 设置线条颜色 plt.rcParams['lines.color'] = 'red' # 设置线条样式,样式的各类还有 `--`为虚线,`-.`为点虚线 plt.rcParams['lines.linestyle'] = '--' # 直线 # 1. 创建一个示例 DataFrame data = { 'Name': ['张三', '李四', '王五', '赵六', '田七'], 'Age': [24, 27, 22, 32, 29], 'Salary': [5000, 8500, 6800, 7200, 6500], 'Department': ['HR', 'IT', 'Finance', 'IT', 'HR'] } df = pd.DataFrame(data) # 2. 数据查看 print(f"DataFrame: \n{df} \n") # 3. 基本统计 print(f"基本统计: \n {df.describe()} \n") # 4. 数据选择 print(f"选择 'Name' 和 'Salary' 列: \n {df[ ['Name', 'Salary']]} \n") # 5. 添加新列 df['Bonus'] = df['Salary'] * 0.1 print(f"添加 'Bonus' 列: \n {df} \n") # 6. 条件筛选 print(f"筛选出薪水大于 6000 的员工: \n {df[df['Salary'] > 6000]} \n") # 7. 分组统计 grouped = df.groupby('Department').mean(numeric_only=True) # 只计算数值列 print(f"按部门分组的平均值: \n {grouped} \n") # 8. 处理缺失值 df.loc[1, 'Salary'] = None # 人为制造缺失值 print(f"处理缺失值: \n {df.fillna( df['Salary'].mean())} \n") # 9. 数据排序 print(f"按年龄排序: \n {df.sort_values(by='Age')}") # 10. 数据可视化 plt.figure(figsize=(10, 5)) plt.xlabel('姓名') plt.ylabel('薪资') plt.title('员工薪水') plt.bar(df['Name'], df['Salary'], color='blue') plt.show()
Python技巧收集
- 🏆🏆 利用uv管理Python虚拟环境 venv+venv-pack 也是不错的选择
🏆Python之父教你写main函数
- Python之父教你写main()函数 <code bash> import sys import getopt
class Usage(Exception):
def __init__(self, msg):
self.msg = msg
def main(argv=None):
if argv is None:
argv = sys.argv
try:
try:
opts, args = getopt.getopt(argv[1:], "h", ["help"])
except getopt.error, msg:
raise Usage(msg)
# more code, unchanged
except Usage, err:
print >>sys.stderr, err.msg
print >>sys.stderr, "for help use --help"
return 2
if name == “main”:
sys.exit(main())
</code>
🏆二行命令打造Python的vim
curl -O https://raw.githubusercontent.com/vince67/v7_config/master/vim.sh bash vim.sh
🏆最简实用的文件打开
with open…as f 语句打开和关闭文件,如文件异常,则抛出一个内部块异常。
for line in f 文件对象f视为一个迭代器,会自动的采用缓冲IO和内存管理,适用于大小文件。
#with open(newfile, 'w') as outfile, open(oldfile, 'r', encoding='utf-8') as infile: with open('chenji.txt') as f: for line in f: if not line: #判断结尾 break elif line.strip() == '': #判断空行 pass else: print(line)
with可以用来简化try finally代码,看起来可以比try finally更清晰。
with如何工作?
这看起来充满魔法,但不仅仅是魔法,Python对with的处理还很聪明。
基本思想是with所求值的对象必须有一个enter()方法,一个exit()方法。
紧跟with后面的语句被求值后,返回对象的enter()方法被调用,这个方法的返回值将被赋值给as后面的变量。当with后面的代码块全部被执行完之后,将调用前面返回对象的exit()方法。
#!/usr/bin/env python # with_example01.py class Sample: def __enter__(self): print "In __enter__()" return "Foo" def __exit__(self, type, value, trace): print "In __exit__()" def get_sample(): return Sample() with get_sample() as sample: print "sample:", sample
代码输出结果如下:
In __enter__() sample: Foo In __exit__()
🏆利用google fire变身Python CLI
pip install fire
#!/usr/bin/env python # encoding: utf8 import fire class People: def __init__(self,name='nobody',sex='boy',age=30): self.name = name self.sex = sex self.age = age def say(self): return ('hello {},my age is {}'.format(self.name,self.age)) #chenmr = People(name='chenmr',sex='girl',age=40) if __name__ == '__main__': fire.Fire(People)
python demo1.py say --name=shaohy --sex=boy --age=40
python之ctypes妙用
定时更改桌面背景
import ctypes # Set the path of the wallpaper image wallpaper_path = "C:\\path\\to\\wallpaper.jpg" # Set the style of the wallpaper # 0: Center, 1: Stretch, 2: Tile, 6: Fit wallpaper_style = 0 # Load the image SPI_SETDESKWALLPAPER = 20 image = ctypes.c_wchar_p(wallpaper_path) # Set the wallpaper ctypes.windll.user32.SystemParametersInfoW(SPI_SETDESKWALLPAPER, 0, image, wallpaper_style)
全屏黑色无边框窗口
import tkinter as tk import tkinter.ttk as ttk import ctypes # Get user32 library user32 = ctypes.windll.user32 # Get the size of the screen screensize = user32.GetSystemMetrics(0), user32.GetSystemMetrics(1) # Create the window root = tk.Tk() # Set the window size to the size of the screen root.geometry('%dx%d+0+0' % screensize) # Set the window to be borderless root.overrideredirect(1) # Set the background color to black style = ttk.Style() style.configure('TFrame', background='black') # Run the window root.mainloop()
背景透明无边框时钟
功能特色
- 简洁美观:桌面时钟采用极简的设计风格,界面清爽,完美融入您的桌面环境。
- 轻量高效:仅需极少的系统资源,不会对电脑性能造成任何负担,即使长时间运行也毫无压力。
- 实时更新:桌面时钟会实时更新时间和日期,确保您始终掌握准确的时间信息。
- 易于使用:无需繁琐的配置,只需简单的安装步骤即可轻松使用。
from tkinter import * import time, datetime from time import gmtime, strftime root = Tk() # Window Attributes root.overrideredirect(1) # 隐藏窗口边框 root.wm_attributes("-transparentcolor", "gray99") # 设置透明背景色 running = True # 运行状态 # 关闭窗口函数 def close(event): global running running = False root.bind('<Escape>', close) # 绑定Esc键关闭窗口 screen_width = root.winfo_screenwidth() # 获取屏幕宽度 screen_height = root.winfo_screenheight() # 获取屏幕高度 timeframe = Frame(root, width=screen_width, height=screen_height, bg="gray99") # 创建主框架 timeframe.grid(row=0,column=0) tkintertime = StringVar() # 创建时间变量 timelabel = Label(timeframe, textvariable=tkintertime, fg="white", bg="gray99", font=("NovaMono", 40)) # 创建时间标签 timelabel.place(y=screen_height/2 - 60, x=screen_width/2, anchor="center") # 设置时间标签位置 tkinterdate = StringVar() # 创建日期变量 datelabel = Label(timeframe, textvariable=tkinterdate, fg="white", bg="gray99", font=("Bahnschrift", 15)) # 创建日期标签 datelabel.place(y=screen_height/2 + 60, x=screen_width/2, anchor="center") # 设置日期标签位置 while running: tkintertime.set(value=strftime("%H:%M:%S")) # 更新时间 tkinterdate.set(value=strftime("%A, %e %B")) # 更新日期 root.update_idletasks() # 更新窗口 root.update() # 更新窗口 time.sleep(1) # 延迟1秒
全屏canvas绘图
import tkinter as tk def toggle_fs(dummy=None): state = False if root.attributes('-fullscreen') else True root.attributes('-fullscreen', state) if not state: root.geometry('300x300+100+100') root = tk.Tk() canvas = tk.Canvas(root, bg='cyan', highlightthickness=0) canvas.pack(expand=True, fill=tk.BOTH) root.attributes('-fullscreen', True) root.bind('<Escape>', toggle_fs) root.mainloop()
显示消息在最上层
import ctypes def main(): WS_EX_TOPMOST = 0x40000 windowTitle = "Python Windows Message Box Test" message = "Hello, world! This Message box was created by calling the Windows API using the ctypes library." # display a message box; execution will stop here until user acknowledges ctypes.windll.user32.MessageBoxExW(None, message, windowTitle, WS_EX_TOPMOST) print("User clicked OK.") if __name__ == "__main__": main()
开启/关闭屏保
def turn_screen_saver_off(): if sys.platform == "win32": print("turn_screen_saver_off") ctypes.windll.kernel32.SetThreadExecutionState(0x80000002) def turn_screen_saver_on(): if sys.platform == "win32": print("turn_screen_saver_on") ctypes.windll.kernel32.SetThreadExecutionState(0x80000000)
高级特性突破
&,|,~ 和 and,or的区别
如果a,b是数值变量, 则&, |, ~表示位运算, and,or则依据是否非0来决定输出
1 & 2 # 输出为0 1 | 2 # 输出为3 2 and 0 # 返回0 2 and 1 # 返回1 1 and 2 # 返回2 2 or 0 # 返回2 2 or 1 # 返回2 0 or 1 # 返回1如何a, b是逻辑变量, 则两类的用法基本一致
(3>0) or (3<1) # True (3>0) | (3<1) # True (3>0) and (3<1) # False (3>0) & (3<1) # False
数据结构
| 名称 | 特点 | 顺序 | 遍历 | 切片 | 增 | 删 | 改 | 查 |
|---|---|---|---|---|---|---|---|---|
| 列表 | 可变 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| 元组 | 不可变 | ✓ | ✓ | ✓ | ✗ | ✗ | ✗ | ✓ |
| 字典 | 键值对 | ✗ | ✓ | ✗ | ✓ | ✓ | ✓ | ✓ |
| 集合 | 去重复 | ✗ | ✓ | ✗ | ✓ | ✓ | ✓ | ✓ |
正则表达式
import re str = '有多个中文86的情况也适用' rr = re.compile(r'^[^0-9]*([0-9]*)[^0-9]*',re.I|re.S|re.M) print(re.match(rr,str).group(1)) str = 'abe(ac)ad)' p1 = re.compile(r'\((.*?)\)', re.S) #最小匹配 p2 = re.compile(r'\((.*)\)', re.S) #贪婪匹配 print(re.findall(p1,str)) print(re.findall(p2,str))
类的封装、继承、多态
面向对象编程有三大重要特征:封装、继承和多态。
参考资料
封装
封装是指将数据与具体操作的实现代码放在某个对象内部,使这些代码的实现细节不被外界发现,外界只能通过接口使用该对象,而不能通过任何形式修改对象内部实现,正是由于封装机制,程序在使用某一对象时不需要关心该对象的数据结构细节及实现操作的方法。使用封装能隐藏对象实现细节,使代码更易维护,同时因为不能直接调用、修改对象内部的私有信息,在一定程度上保证了系统安全性。
class Student:
classroom = '101'
address = 'beijing'
def __init__(self, name, age):
self.name = name
self.age = age
def print_age(self):
print('%s: %s' % (self.name, self.age))
✗ 以下是错误的用法: 类将它内部的变量和方法封装起来,阻止外部的直接访问
✗ print(classroom)
✗ print(adress)
✗ print_age()
print(Student.classroom) dongjun = Student('东俊',12) dongjun.print_age()
继承
继承来源于现实世界,一个最简单的例子就是孩子会具有父母的一些特征,即每个孩子都会继承父亲或者母亲的某些特征,当然这只是最基本的继承关系,现实世界中还存在着更复杂的继承。继承机制实现了代码的复用,多个类公用的代码部分可以只在一个类中提供,而其他类只需要继承这个类即可。
在OOP程序设计中,当我们定义一个新类的时候,新的类称为子类(Subclass),而被继承的类称为基类、父类或超类(Base class、Super class)。继承最大的好处是子类获得了父类的全部变量和方法的同时,又可以根据需要进行修改、拓展。其语法结构如下:
class Foo(superA, superB,superC....): class DerivedClassName(modname.BaseClassName): ## 当父类定义在另外的模块时
Python支持多父类的继承机制,所以需要注意圆括号中基类的顺序,若是基类中有相同的方法名,并且在子类使用时未指定,Python会从左至右搜索基类中是否包含该方法。一旦查找到则直接调用,后面不再继续查找。
# 父类定义 class people: def __init__(self, name, age, weight): self.name = name self.age = age self.__weight = weight def speak(self): print("%s 说: 我 %d 岁。 我的体重是 %d" % (self.name, self.age, self.__weight)) # 单继承示例 class student(people): def __init__(self, name, age, weight, grade): # 调用父类的实例化方法 super().__init__(name, age, weight) self.grade = grade # 重写父类的speak方法 def speak(self): print("%s 说: 我 %d 岁了,我在读 %d 年级" % (self.name, self.age, self.grade)) s = student('shaodj', 10, 30, 3) s.speak()
为什么要用super()函数
我们都知道,在子类中如果有与父类同名的成员,那就会覆盖掉父类里的成员。那如果你想强制调用父类的成员呢?使用super()函数! 这是一个非常重要的函数,最常见的就是通过super调用父类的实例化方法init!
class A(object): def __init__(self, name): self.name = name print("父类的__init__方法被执行了!") def show(self): print("父类的show方法被执行了!") class B(A): def __init__(self, name, age): super().__init__(name=name) self.age = age def show(self): super().show() obj = B("jack", 18) obj.show()
新类就是所有类都必须要有继承的类,如果什么都不想继承,就继承到object类,如 class A(object):
Python 3 和 Python 2 的另一个区别是:
Python 3 可以使用直接使用 super().xxx 代替 super(Class, self).xxx
多态
先看下面的代码:
class Animal:
def kind(self):
print("i am animal")
class Dog(Animal):
def kind(self):
print("i am a dog")
class Cat(Animal):
def kind(self):
print("i am a cat")
class Pig(Animal):
def kind(self):
print("i am a pig")
# 这个函数接收一个animal参数,并调用它的kind方法
def show_kind(animal):
animal.kind()
d = Dog()
c = Cat()
p = Pig()
show_kind(d)
show_kind(c)
show_kind(p)
打印结果:
i am a dog
i am a cat
i am a pig
狗、猫、猪都继承了动物类,并各自重写了kind方法。show_kind()函数接收一个animal参数,并调用它的kind方法。可以看出,无论我们给animal传递的是狗、猫还是猪,都能正确的调用相应的方法,打印对应的信息。这就是多态。
实际上,由于Python的动态语言特性,传递给函数show_kind()的参数animal可以是任何的类型,只要它有一个kind()的方法即可。动态语言调用实例方法时不检查类型,只要方法存在,参数正确,就可以调用。这就是动态语言的“鸭子类型”,它并不要求严格的继承体系,一个对象只要“看起来像鸭子,走起路来像鸭子”,那它就可以被看做是鸭子。
class Job:
def kind(self):
print("i am not animal, i am a job")
j = Job()
show_kind(j)
举个栗子
import random
class Flower:
def __init__(self,color,du):
self.color = color
self.__du = du # 定义为私有变量,只能通过内部方法调用
# 内部方法来获取 __du 这个属性
def getdu(self):
return self.__du
class Person:
def __init__(self,name):
self.name = name
class Tfboy(Person): # 这个就是类的继承
count = 0
def __init__(self,name,age):
super().__init__(name) # 用super()调用父类的方法初始化
self.age = age
def touch(self,du): # 统计他摸了几朵无毒的花
if du == "无毒":
Tfboy.count += 1
def total(self):
print("=> {} {}岁,共摸过了 {} 朵花是无毒的".format(self.name, self.age, Tfboy.count))
color = ['红色','绿色','蓝色','黄色','粉色']
du = ['有毒','无毒']
dongjun = Tfboy("东俊",12)
for _ in range(5):
f = Flower( random.choice(color), random.choice(du) )
print( "花的颜色是{},花是{}".format( f.color, f.getdu() ) )
dongjun.touch( f.getdu() )
dongjun.total()
新的类初始化
在 Python 3.7 版本中增加了数据类(data class)这一新功能, 类似于其他语言中的 struct
它就是一个使用装饰器创建的类:
from dataclasses import dataclass @dataclass class people: name: str age: int=0 __weight: int=150
class people: def __init__(self,name,age,weight=150): self.name = name self.age = age self.__weight = weight def saying(self): print('{}说: 我{} 岁了,体重是{}kg'.format(self.name,self.age,self.__weight)) class student(people): def __init__(self,name,age,weight,grade): people.__init__(self,name,age,weight) self.grade = grade def saying(self): print('{}说:我{}岁了,今年读{}年级'.format(self.name,self.age,self.grade)) shaohy = people('shaohy',30) shaohy.saying() shaodj = student('shao dongjun',11,60,5) shaodj.saying()
from dataclasses import dataclass @dataclass class people: name: str age: int=0 __weight: int=150 def saying(self): print('{}说: 我{} 岁了,体重是{}kg'.format(self.name,self.age,self.__weight)) @dataclass class student(people): grade: int=0 def saying(self): print('{}说:我{}岁了,今年读{}年级'.format(self.name,self.age,self.grade)) shaohy = people('shaohy',30) shaohy.saying() shaodj = student('shao dongjun',11,60,5) shaodj.saying()
星号作用
星号*把序列/集合解包(unpack)成位置参数,两个星号**把字典解包成关键字参数。
def foo(*args, **kwarg): for item in args: print(item) for k,v in kwarg.items(): print(k, v) print('*' * 50) if __name__ == '__main__': #foo(1, 2, 3, a=4, b=5) #foo(2, 3, a=4, b=5, c=1) v = (1, 2, 4) v2 = [11, 15, 23] d = {'a':1,'b':12} foo(v, d) foo(*v, **d) foo(v2, d) foo(*v2, **d)
输出结果 (1, 2, 4) {'a': 1, 'b': 12} ************************************************** 1 2 4 a 1 b 12 ************************************************** [11, 15, 23] {'a': 1, 'b': 12} ************************************************** 11 15 23 a 1 b 12 **************************************************
闭包
内部函数中对外部作用域的变量进行引用,(并且一般外部函数的返回值为内部函数),那么内部函数就被认为是闭包。
闭包的套路
# 累加计数器 def counter(): cnt = 0 def addone(): nonlocal cnt cnt += 1 return cnt return addone # a*x + b = y def aline(a,b): def arg_y(x): return a*x+b return arg_y
- 在内嵌函数中使用nonlocal关键字对外部变量进行声明,可以允许内嵌函数修改外部变量。
- 可以用lambda 函数来代替内部函数 极客时间: lambda
def aline(a,b): return lambda x: a*x+b print(type(aline)) line1 = aline(3,5) print(line1(10)) line2 = aline(5,8) print(line2(15))
利用lambda来解决猴子选大王的问题
m = 41 # 一共有多少只猴子 k = 3 # 报数 monkey = list(range(1, m+1)) n = 1 # 报数循环 i = 0 # 猴子的索引 while True: if i == m: i = 0 # ⭕ 到了末尾就从头开始 if n < k and monkey[i] != 0: n += 1 elif monkey[i] != 0: monkey[i] = 0 n = 1 i += 1 print(f"{monkey}, i:{i}, n:{n}, k:{k} ") # 调试输出 #if len(list(filter(lambda x: x != 0, monkey))) == 1: # filter+lambda 速度奇慢 if len(set(monkey)) == 2: print(max(monkey)) break
列表推导式
list1 = [11,22,33,88,77,55,99] dict1 = {'k1':[],'k2':[]} for i in list1: if i < 66: dict1['k1'].append(i) else: dict1['k2'].append(i) print(dict1) dict1['k1'] = list(filter(lambda x: x<66, list1)) dict1['k2'] = list(filter(lambda x: x>66, list1)) print(dict1) dict1 = {'k1':[],'k2':[]} [ dict1['k1'].append(i) if i < 66 else dict1['k2'].append(i) for i in list1 ] print(dict1)
doubled_list = map(lambda x: x * 2, old_list) summation = reduce(lambda x, y: x + y, numbers)
字典推导式
mcase = {'a': 10, 'b': 34, 'A': 7, 'Z': 3} mcase_frequency = { k.lower(): mcase.get(k.lower(), 0) + mcase.get(k.upper(), 0) for k in mcase.keys() if k.lower() in ['a','b'] } print(mcase_frequency)
生成器
通过列表生成式,我们可以直接创建一个列表。但是,受到内存限制,列表容量肯定是有限的。而且,创建一个包含100万个元素的列表,不仅占用很大的存储空间,如果我们仅仅需要访问前面几个元素,那后面绝大多数元素占用的空间都白白浪费了。
所以,如果列表元素可以按照某种算法推算出来,那我们是否可以在循环的过程中不断推算出后续的元素呢?这样就不必创建完整的list,从而节省大量的空间。在Python中,这种一边循环一边计算的机制,称为生成器:generator。
generator保存的是算法,通过for循环来迭代它,并且不需要关心StopIteration的错误。
# coding: utf-8 # 第一种生成器方式是用()把列表推导式转化成 生成器 g = (i*i for i in range(10)) # 使用 for 循环把next元素输出 for x in g: print(x) # 第二种方式是用 yield(),变成generator的函数, # 在每次调用next()的时候执行,遇到yield语句返回, # 再次执行时从上次返回的yield语句处继续执行。 def fab(n): # 斐波拉契数列的推算规则 m,a,b = 0,1,1 while m < n : yield(a) a,b = b,a+b m += 1 for i in fab(5): print(i)
迭代器
Python的迭代器(Iterator)对象表示的是一个数据流,Iterator对象可以被next()函数调用并不断返回下一个数据,直到没有数据时抛出StopIteration错误。可以把这个数据流看做是一个有序序列,但我们却不能提前知道序列的长度,只能不断通过next()函数实现按需计算下一个数据,所以Iterator的计算是惰性的,只有在需要返回下一个数据时它才会计算。
迭代器(Iterator)甚至可以表示一个无限大的数据流,例如全体自然数。而使用list是永远不可能存储全体自然数的。
装饰器
装饰器本质上是一个 Python 函数或类,类似传递函数的闭包。
它的作用就是为已经存在的对象在不需要做任何代码修改的前提下添加额外的功能,装饰器的返回值也是一个函数/类对象。
from functools import wraps def catch_specific_exceptions(func): @wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: print(f"捕获到异常: {e}") return None return wrapper @catch_specific_exceptions def specific_operation(x, y): return x / y @catch_specific_exceptions def read_file(file): f = open(file) data = json.load(f) f.close() specific_operation(10, 0) read_file("x.json")
#!/usr/bin/env python3 import logging logging.basicConfig(level = logging.DEBUG) min = 18.5 normal = 24.9 max = 29.9 def log_fun(level): def decorator(f): def wrapper(*args, **kwargs): if level == "warn": logging.warning(f"{f.__name__} is running") elif level == "debug": logging.debug(f"{args} is running") return f(*args, **kwargs) return wrapper return decorator @log_fun(level="debug") def cal_bmi(h, w): BMI = float(w/(h**2)) print(f"BMI值为:{BMI:.2f}") #公式 if BMI < min: print('身体状态:偏瘦') elif BMI < normal: print('身体状态:正常') elif BMI < max: print('身体状态:超重') elif BMI > max: print('身体状态:肥胖') if __name__ == "__main__": print('----欢迎使用BMI计算程序----') height = float(input('请键入您的身高(m):')) weight = float(input('请键入您的体重(kg):')) cal_bmi(height, weight)
上下文管理器
多线程
多线程类似于同时执行多个不同程序,多线程运行有如下优点:
- 使用线程可以把占据长时间的程序中的任务放到后台去处理
- 用户体验更加友好,比如用户点击了一个按钮去触发某些事件的处理,可以弹出进度条来显示处理的进度
- 程序的运行速度可能加快
- 在一些等待的任务实现上如用户输入、文件读写和网络收发数据等,线程就比较有用了。在这种情况下我们可以释放一些珍贵的资源如内存占用等等。
线程在执行过程中与进程还是有区别的。
- 每个独立的进程有一个程序运行的入口、顺序执行序列和程序的出口。
- 线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
- 每个线程都有他自己的一组CPU寄存器,称为线程的上下文,该上下文反映了线程上次运行该线程的CPU寄存器的状态。
- 线程可以被抢占(中断)。
- 在其他线程正在运行时,线程可以暂时搁置(也称为睡眠) – 这就是线程的退让。
- 线程总是在进程得到上下文中运行的,指令指针和堆栈指针寄存器是线程上下文中两个最重要的寄存器,这些地址都用于标志拥有线程的进程地址空间中的内存。
import threading import queue data = [ [1,2,3], [3,4], [5,6] ] def job(L,q): print(threading.currentThread()) q.put([x*x for x in L]) """ print( list( map(job,[[1,2,3],[4,5,6]]) ) ) """ def multithread(data): q = queue.Queue() threads = [] for i in range(len(data)): t = threading.Thread(target=job,args=(data[i],q)) t.start() threads.append(t) # 这一段比较巧妙,利用threads池,把线程异步化 for _ in threads: _.join() for i in range(q.qsize()): print(q.get()) if __name__ == '__main__': multithread(data)
多进程
import multiprocessing as mp data = range(10) data2 = range(5) def job(L,q): q.put([x*x for x in L]) def job2(L): return([x*x for x in L]) def multicore(): queue = mp.Queue() p1 = mp.Process(target=job,args=(data,queue)) p2 = mp.Process(target=job,args=(data2,queue)) p1.start() p2.start() p1.join() p2.join() for i in range(queue.qsize()): print(queue.get()) pool = mp.Pool() print(pool.map(job2,(data,))) if __name__ == "__main__": multicore()
协程
消息队列和生产消费
Queue队列
Python中,队列是线程间最常用的交换数据的形式。Queue模块是提供队列操作的模块,虽然简单易用,但是不小心的话,还是会出现一些意外。Queue是线程安全的,自带锁,使用的时候,不用对队列加锁操作。
Python Queue模块有三种队列及构造函数,它们的区别仅仅是条目取回的顺序:
- FIFO队列: 先进先出。 class Queue.Queue(maxsize)
- LIFO类似于堆,先进后出。 class Queue.LifoQueue(maxsize)
- 优先级队列: 级别越低越先出来。 class Queue.PriorityQueue(maxsize)
数据分析
- NumPy: 是基础的数学计算库,包括基本的四则运行,方程式计算,微积分什么的,还有很多其他数学方面的计算,它是Python的矩阵处理器,但目前随着Pandas更新,大部分功能已经和Pandas融合了。
- Pandas: 偏重于上层做数据分析用的,主要是做表格数据呈现,属于一款很适合做数据分析的框架,可以理解成表格+时间戳。目前属于不少对冲基金标配数据分析。用Python处理Excel数据,中文全基础系列教程 我用Python展示Excel中常用的20个操作
- SciPy: 是在NumPy基础上,封装了一层科学计算库,没有那么纯数学,提供方法直接计算结果,同样可以和Pandas一起调用。
数据可视化
- Bokeh: Bokeh 是 Python 中交互性最强的库之一,可用于为 Web 浏览器构建描述性的图形表示形式。它可以轻松处理庞大的数据集,通过定义完善的特征,能够构建交互式的图表、仪表板和数据应用程序。通过与 Flask 和 Django 的集成,您可以在应用程序上表达特定的可视化效果。通过提供对于可视化文件的支持,用户可以将其转换为诸如 Matplotlib、Seaborn、以及 ggplot 等其他库。
- Matplotlib: Matplotlib 是 Python 中最基本的数据可视化软件包。它支持诸如:直方图、条形图、功率谱、误差图等各类图形。提供面向对象的 API 模块,可通过诸如 Tkinter、wxPython、以及 Qt 等 GUI 工具,将图形集成到应用程序中。
- Plotly: 作为知名的图形 Python 库之一,Ploty 通过交互式图形,以方便用户了解目标变量和预测变量之间的依赖性。它可以被用于分析与可视化统计,针对财务、商业和科学数据领域,生成清晰明了的图形、子图、热图、以及 3D 图表等。
python代码审核和优化
$ pip install --upgrade autopep8 $ autopep8 --in-place --aggressive --aggressive <filename>
性能火焰图
- Py-Spy: 是Python程序的抽样分析器。 它允许您可视化查看Python程序在哪些地方花了更多时间,整个监控方式无需重新启动程序或以任何方式修改工程代码。 Py-Spy的开销非常低:它是用Rust编写的,速度与编译的Python程序不在同一个进程中运行。 这意味着Py-Spy可以安全地用于生成生产环境中的Python应用调优分析。
py-spy record -o profile.svg --pid 12345 # 或者 py-spy record -o profile.svg -- python myprogram.py
- Heartrate 是一个 Python 的工具库,可以实时可视化 Python 程序的执行过程。监控运行中的 Python 程序如图:
左侧数字表示每行代码被触发的次数 长方框表示最近被触发的代码行 方框越长表示触发次数越多 颜色越浅表示最近被触发次数越多 没有记录具体的执行时间
- https://github.com/csurfer/pyheat pprofile + matplotlib = 对Python程序进行分析形成一张热图
Casbin RBAC权限管理
Python调用Luajit
import lupa,time from lupa import LuaRuntime lua = LuaRuntime(unpack_returned_tuples=False) lua_func = lua.eval('function(f, n) return f(n) end') def py_add(n): return n*2 print(lua_func(py_add, 10)) # 打印出翻倍的结果 lua_fib = lua.eval(''' function(n) a, b = 0, 1 for i = 1, n do a, b = b, a + b end return a end ''') start = time.time() x = 50 res = lua_fib(x) elapsed = time.time() - start print("Py3 Computed fib(%s)=%s in %0.2f seconds" % (x, res, elapsed))
Python调用Nim函数
纯Python递归求“斐波那契”数列
import time def fib2(n): if n <= 2: return 1 else: return fib2(n-1) + fib2(n-2) if __name__ == "__main__": x = 40 start = time.time() res = fib2(x) elapsed = time.time() - start print("Py3 Computed fib(%s)=%s in %0.2f seconds" % (x, res, elapsed))
Py3 Computed fib(40)=102334155 in 29.92 seconds
nim函数导出py模块
import nimpy proc fib(n: int): int {.exportpy.} = if n <= 2: return 1 else: return fib(n - 1) + fib(n - 2)
nimble install nimpy
然后导出c库函数,源文件名和库函数名要一致:
nim c -d:release --app:lib --threads:on --out:fib_nimpy.so fib_nimpy.nim
新python调用nim库
import time from fib_nimpy import fib def fib2(n): if n <= 2: return 1 else: return fib2(n-1) + fib2(n-2) if __name__ == "__main__": x = 40 start = time.time() #res = fib2(x) res = fib(x) elapsed = time.time() - start print("Py3 Computed fib(%s)=%s in %0.2f seconds" % (x, res, elapsed))
Py3 Computed fib(40)=102334155 in 0.57 seconds
pip3 install nimporter # 第一次会编译运行
Python调用百度翻译
from random import randrange from hashlib import md5 from requests import get from os import path,linesep from sys import argv from time import sleep # 百度在线翻译引擎的ID和密码 apiURL = "http://api.fanyi.baidu.com/api/trans/vip/translate" appID = '20210324000740987' secretKey = 'AQoxdv0Uhz1VmTtUhngu' file = "dongchi.txt" dir = path.dirname(argv[0]) dict_words = {} # 读取单词文本,存入字典 def baiduAPI_translate(query_str, to_lang): ''' 传入待翻译的字符串和目标语言类型,请求 apiURL,自动检测传入的语言类型获得翻译结果 :param query_str: 待翻译的字符串 :param to_lang: 目标语言类型 :return: 翻译结果字典 ''' salt = str(randrange(32768, 65536, 3)) # 生成随机的 salt 值 pre_sign = appID + query_str + salt + secretKey # 准备计算 sign 值需要的字符串 sign = md5(pre_sign.encode()).hexdigest() # 计算 md5 生成 sign params = { # 请求 apiURL 所有需要的参数 'q': query_str, 'from': 'auto', 'to': to_lang, 'appid': appID, 'salt': salt, 'sign': sign } try: response = get(apiURL, params=params) result_dict = response.json() # 获取返回的 json 数据 if 'trans_result' in result_dict: # 得到的结果正常则 return return result_dict else: print('Some errors occured:\n', result_dict) except Exception as e: print('Some errors occured: ', e) def translat(query_str, dst_lang='zh'): ''' 解析翻译结果后输出,默认实现英汉互译 :param query_str: 待翻译的字符串,必填 :param dst_lang: 目标语言类型,可缺省 :return: 翻译后的字符串 ''' result_dict = baiduAPI_translate(query_str, dst_lang) # 指定了目标语言类型,则直接翻译成指定语言 return result_dict['trans_result'][0]['dst'] # 提取翻译结果字符串,并输出返回 ''' 遍历文件分解成字典并处理异常 ''' with open(path.join(dir, file), 'r', encoding='utf-8') as f: for line in f: try: line = line.strip().split(':') dict_words[line[0]] = line[1] except: dict_words[line[0]] = "" f = open('new.txt',"a+",encoding="utf-8") for word in dict_words: res = f"{word}:{ translat(word)}\n" print(res) f.write(res) sleep(1) f.close()
YouTube上下载视频
from pytube import YouTube
import fire
class YT:
def __init__(self,yt_url):
self.url = yt_url
def download(self):
yt = YouTube(self.url)
yt.streams.filter(progressive=True,file_extension='mp4').get_highest_resolution().download()
#yt.streams.filter(progressive=False,file_extension='webm').get_highest_resolution().download()
if __name__ == '__main__':
fire.Fire(YT)
#source ~/myDemo/python_apps/venv/bin/activate.fish
#python3.8 youtube_get.py download --yt_url="https://www.youtube.com/watch?v=jO6qQDNa2UY"
#youtube-dl "https://www.youtube.com/watch?v=1lmRNoLC3Qk" --list-formats #youtube-dl "https://www.youtube.com/watch?v=1lmRNoLC3Qk" -f 315
YouTube的视频转音频mp3
brew install youtube-dl youtube-dl --extract-audio --audio-format mp3 http://www.youtube.com/watch?v=YOURVIDEO ffmpeg -i video.mp4 -vn audio.mp3
ffmpeg具体用法实例
AVI转FLV
ffmpeg -i test.avi -ab 56 -ar 22050 -b 500 -r 15 -s 320×240 test.flv
抓图JPG
ffmpeg -i 2.wmv -y -f image2 -ss 8 -t 0.001 -s 350×240 test.jpg
ffmpeg去掉一部电影的前面20秒
ffmpeg -i inputmovie.mp4 -ss 20 -c copy outputmovie.mp4
- -i input_movie.mp4 指定输入文件
- -ss 20 跳过前面20秒
- -c copy 使用“copy”编解码器来快速复制视频和音频轨道,以避免重新编码
- output_movie.mp4 指定输出文件名
批量转换mp4为flv脚本
#!/bin/sh SCALE="1.5" # 视频流冗余率 FILR="./good.txt" while read LINE;do rm -rf log.log filename=`echo $LINE|awk '{print $5}'` newfile=`echo $filename|cut -d- -f1` ffmpeg -i $filename 2>>log.log bitrate=`grep "bitrate" log.log|sed -r 's@.*bitrate:(.*)kb.*@\1@g'|sed -r 's: ::g'` bitrate=$(echo "$bitrate*$SCALE"|bc)"kb" #echo $bitrate [ -s "$newfile.flv" ] && rm -rf $newfile.flv ffmpeg -i $filename -b $bitrate -acodec libmp3lame -ar 44100 -ac 2 $newfile.flv done < $FILE
使用Pelican发布静态博客
安装python3并激活虚拟环境(略)
cd /Users/apple/myDemo/Python39_Demos source .venv/bin/activate.fish
安装pelican
pip install pelican markdown ghp-import
...
pelican-quickstart
快速配置 参数选项
AUTHOR = 'DJ.shao' SITENAME = "ShaoDJ's World" SITEURL = "https://shaodongjun.github.io" TIMEZONE = 'Asia/Shanghai' DEFAULT_LANG = 'zh-cn' PATH = "content" ARTICLE_URL = 'posts/{date:%Y}/{date:%m}/{date:%d}/{slug}/' ARTICLE_SAVE_AS = 'posts/{date:%Y}/{date:%m}/{date:%d}/{slug}/index.html' PAGE_URL = 'pages/{slug}/' PAGE_SAVE_AS = 'pages/{slug}/index.html' # Feed generation is usually not desired when developing FEED_ALL_ATOM = 'feeds/all.atom.xml' FEED_ALL_RSS = 'feeds/all.rss.xml' RSS_FEED_SUMMARY_ONLY = False # Blogroll LINKS = ( ("Shaohy's Blog", "http://shaohy.upyun.com/doku.php"), ) # Social widget SOCIAL = ( ("Another social link", "#"), ) DEFAULT_PAGINATION = 10
创建blog文章
在content文件夹用markdown写文章,然后测试。
autocmd bufnewfile *.md call HeaderMD() " add generic header to markdown files function HeaderMD() let l = 0 let l:current_date = strftime("%Y-%m-%d %H:%M") let l = l + 1 | call setline(l,'Title: ') let l = l + 1 | call setline(l,'Slug: ') let l = l + 1 | call setline(l,'Date: ' . l:current_date) let l = l + 1 | call setline(l,'Category: ') let l = l + 1 | call setline(l,'Tags: blog,') endfunction
make html && make serve pelican --autoreload --listen
Usage: make html (re)generate the web site make clean remove the generated files make regenerate regenerate files upon modification make publish generate using production settings make serve [PORT=8000] serve site at http://localhost:8000 make serve-global [SERVER=0.0.0.0] serve (as root) to :80 make devserver [PORT=8000] start/restart develop_server.sh make stopserver stop local server make ssh_upload upload the web site via SSH make rsync_upload upload the web site via rsync+ssh make dropbox_upload upload the web site via Dropbox make ftp_upload upload the web site via FTP make s3_upload upload the web site via S3 make cf_upload upload the web site via Cloud Files make github upload the web site via gh-pages
grep 'src="./images' $(OUTPUTDIR)/posts/ -r -l | xargs -I{} sed -i '' '/src=".\/images\//s@./images/@/images/@g' {}
博客托管到GitHub
首先在 GitHub 上新建一个名为username.github.io的仓库。(必须要同名)
这里很多人会推荐使用一个叫作ghp-import的包,用于将output文件夹的内容提交到Blog文件夹内仓库的master分支。
在pelican文件夹新建Git仓库,并绑定远程GitHub仓库地址。
git remote add origin https://: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@github.com/shaodongjun/shaodongjun.github.io.git # 从Makefile里可以看到github包括了一组操作命令 make github

