目录

Python学习笔记

Python知识图谱

学习课堂

荔枝微课

https://m.lizhiweike.com/account3/mywk

Python学习参考

参考资料

编译python最新版本

如果是更高版本的python安装【源代码编译安装】,还需要 gcc升级gcc编译 8.1以上版本。
–enable-optimizations(升级gcc至8.1.0才能使用)
yum install -y gcc gcc-c++ \
     zlib-devel libffi-devel openssl openssl-devel \
     bzip2-devel gdbm-devel xz-devel readline-devel \
     sqlite-devel libuuid-devel tk-devel tcl-devel
 
export CC=/opt/gcc84/bin/gcc ; 
export CXX=/opt/gcc84/bin/g++ ; 
ldconfig
 
./configure --prefix=/opt/python3.10.6 --with-openssl=/opt/openssl \
  --enable-shared  --enable-optimizations  \
  --enable-loadable-sqlite-extensions

vim针对python的配置:

set tabstop=2
set shiftwidth=2   " 默认用2个空格
set softtabstop=2  " sts用2个空格
set autoindent     " 自动缩进
set expandtab      " tab键转换成空格
set listchars=tab:>~,trail:.   " 把空格和tab显示为.
set list

Python多核场景与方法总结

Python 在某些场景下可以通过多进程、多线程或异步编程利用多核资源,但由于全局解释器锁(GIL)的存在,多线程在 CPU 密集型任务中无法充分利用多核。以下是 Python 中可以利用多核的主要场景及方法:

CPU 密集型(多进程)

  1. 并行计算(如蒙特卡洛模拟、数值积分)。
  2. 大规模数据处理(使用 multiprocessing.Pool 或 concurrent.futures.ProcessPoolExecutor)。
  3. 图像/视频处理(如 Pillow 或 OpenCV 结合多进程)。
from multiprocessing import Pool
 
def compute_square(n):
    return n * n
 
if __name__ == "__main__":
    with Pool(4) as p:  # 使用4个进程
        results = p.map(compute_square, [1, 2, 3, 4])

I/O 密集型(多线程/异步)

from concurrent.futures import ThreadPoolExecutor
import requests
 
def fetch_url(url):
    response = requests.get(url)
    return response.status_code
 
urls = ["https://example.com"] * 10
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(fetch_url, urls)

科学计算与数据处理(专用库)

import numpy as np
# NumPy 的矩阵运算可能自动使用多核
a = np.random.rand(10000, 10000)
b = np.random.rand(10000, 10000)
c = np.dot(a, b)  # 底层可能并行执行

分布式任务与批处理(框架支持)

# 使用 Dask 并行处理大型数据集
import dask.array as da
x = da.random.random((100000, 100000), chunks=(1000, 1000))
y = x + x.T
result = y.mean().compute()  # 自动并行计算

Web 服务与并发请求

# 启动 Flask 服务,使用4个 worker 进程
gunicorn -w 4 app:app

机器学习模型训练

from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(n_estimators=100, n_jobs=4)  # 使用4核
model.fit(X_train, y_train)

多进程 vs 多线程 注意事项:

通过合理选择并发模型和工具,Python 可以高效利用多核资源提升性能。

Python图形界面

PySimpleGUI

PySimpleGUI 一个建立在Tkinter, wxPython, PyQt 甚至是Remi web之上简单但功能强大的GUI。

PyWebview

wxPython+wxFormBuilder

pyQT5+wxPython+kivy

PyScript浏览器wasm 和 beeware

BeeWare 正在积极的发展,在2024年会支持Android/iOS平台。未来可期

Flet写flutter图形app

Rio是个值得关注的项目 🐍

Blender三维引擎

Python游戏编程

Minecraft

Minecraft Pi:是Minecraft为树莓派(Raspberry Pi)而设计的«我的世界»版本。该版本基于携带版但进行了些许删减,同时包含一些改进过的特性以及对多种编程语言提供支持等。树莓派版旨在为编程新手提供一种教学工具。

Turtle【最简单 + 入门】

海龟绘图(trutle)是向孩子们介绍编程的一种流行方式。它是Wally Feurzig和Seymour Papert于1966年开发的Logo编程语言的一部分。Logo语言是一种早期的编程语言,也是一种与自然语言非常接近的编程语言,它通过“绘图”的方式来学习编程,对初学者特别是青少年儿童进行寓教于乐的教学方式。

想象一只小海龟,在一个横轴为x、纵轴为y的坐标系中。以坐标原点(0,0)开始,它根据输入的一组代码指令,在平面坐标系中移动。从而在它爬行的路径上绘制了各种酷炫、奇特的图形。

turtle代码

Arcade【简单】

Arcade 是一个简单易用创建 2D 游戏的 Python 第三方库,非常适合初学程序员或想要在不学习复杂框架的情况下创建2D游戏的程序员。它基于 Pyglet 和OpenGL,提供了动画、精灵、装饰、场景等一系列功能,适合作为桌面系统的游戏开发平台。更重要的是,正如其设计理念,使用 Acrade 开发非常简单!

Pyglet

Pyglet 是一个面向对象方式编写游戏或其他富媒体应用的第三方多媒体框架,轻量级可以跨平台使用,支持 OpenGL、图像、音视频、事件处理等。Pyglet 是一个较为底层的开发框架,不适合直接作为游戏开发平台,在其基础上封装的 cocos2d 和 Arcade 更为合适。但这两个学起来更难,学习成本更大。

Pyglet对中文支持极差,两个汉字只占一个字符,粘在一起,不推荐使用 !!!!

Pymunk

Pymunk(派曼k) 是一个简单易用的 2D 物理规则第三方库,用来在游戏或演示中增加符合物理规则的动态效果。另外一个有名的就是Pybox2D

Pymunk 不是一个游戏框架,比较适合为游戏提供相关功能,可以与 Arcade 游戏开发框架直接兼容使用。想想“愤怒的小鸟”,就知道物理有多重要了!

Pygame【强大+专业】

Pygame 是 Python 语言当之无愧的游戏入门开发第三方库,它基于 SDL 直接访问音视频硬件及输入输出外设,形成了基本的游戏开发引擎。Pygame 是进一步学习其他游戏引擎的基础。这个库尽管简单,却很强大,能够用于开发专业耐玩的桌面游戏。

Kivy【跨平台+多触控】

Kivy是Python的跨平台GUI库,不仅支持windows,macOS ,linux,还支持android和iOS。凭这一点就非常吸引我。Kivy的核心思想是界面逻辑分离,kivy的kv文件控制界面显示部分,python控制逻辑部分。kivy现在在github上有9000+ star,并且还在继续更新维护,所以值得学习和了解一下。

最简单的示例:

import kivy
from kivy.app import App
from kivy.uix.label import Label
from kivy.uix.gridlayout import GridLayout
from kivy.uix.textinput import TextInput
 
class app(App):
  def build(self):
    return Label(text="我是中国人",font_name="Songti.ttc",font_size="48sp")
 
if __name__ == "__main__":
  app().run()
利用buildozer打包android/ios下的软件包: 最好是ubuntu 20.04+buildozer或者 Virtual Box + Ubuntu 20.04.2.0 LTS
sudo apt -y update
sudo apt install -y python3-pip git zip unzip openjdk-8-jdk python3-pip autoconf libtool pkg-config zlib1g-dev libncurses5-dev libncursesw5-dev libtinfo5 cmake libffi-dev libssl-dev lld
 
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple buildozer virtualenv
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple --user --upgrade Cython==0.29.19
export PATH=$PATH:~/.local/bin/

buildozer.spec

[app]
title = Beats
package.name = Beatme
package.domain = org.test
source.dir = .
source.include_exts = py,png,jpg,kv,atlas,ttf
version = 0.1
requirements = python3,kivy,sqlite3,pygame,jnius,sdl2_image,sdl2_mixer,sdl2_ttf,png,jpeg
icon.filename = %(source.dir)s/data/icon.png
orientation = all
osx.python_version = 3
osx.kivy_version = 1.9.1
fullscreen = 0
android.arch = armeabi-v7a
ios.kivy_ios_url = https://github.com/kivy/kivy-ios
ios.kivy_ios_branch = master
ios.ios_deploy_url = https://github.com/phonegap/ios-deploy
ios.ios_deploy_branch = 1.7.0
 
[buildozer]
log_level = 2
warn_on_root = 1

一个kivy示例

from kivy.app import App
from kivy.uix.label import Label
from kivy.uix.button import Button
from kivy.uix.boxlayout import BoxLayout
 
class FirstApp(App):
    pass
 
if __name__ == "__main__":
    FirstApp().run()
 
 
#################################
 
<FirstLayout@BoxLayout>:
    count: 0
    orientation: "vertical"
    padding: 10
 
    Label:
        text: "Button Not Pressed"if root.count==0 else "Button is pressed " +str(root.count)+" time!" if root.count==1 else "Button is pressed " +str(root.count)+" times!"
 
    Button:
        text: "Press Me"
        size_hint: (1,0.3)
        on_press: root.count=root.count+1
FirstLayout:
 buildozer -v android debug

Processing有趣的艺术交互

Python动画框架

Python最快WEB框架

QraphQL的演示

HappyX全栈nim框架

HappyX 中你可以通过-d:httpx和-d:micro
Web框架的要素:asynchttpserver、ws、jsony、tiny_sqlitelimdb

Socketify最快

运行实例

from socketify import App, AppOptions, AppListenOptions
from prisma import Prisma
from urllib.parse import parse_qs
from jinja2_templates import Jinja2Template
import ujson
 
DEBUG = False
db = Prisma(auto_register=True)
 
App = App()
App.template(Jinja2Template("./templates", encoding="utf-8", followlinks=False))
App.json_serializer(ujson)
App.static("/static", "./static")
 
app = App.router()
 
def corksend(res, result="", status=200):
  res.cork(lambda res: res.write_status(f"{status}").end(f"{result}"))
 
@app.get("/")
async def head(res, req):
  if not db.is_connected(): db.connect()
  try:
    posts = db.post.find_many(take=10, order={"id":'desc'}, include={"author": True} )
    context = {
      "framework": "Robyn",
      "templating_engine": "Jinja2",
      "data": posts
    }
    if DEBUG: print(context)
 
    res.render("index.html", **context)
  except Exception as e:
    pass
 
@app.post("/submit")
async def addpost(res, req):
  content_type = req.get_header("content-type")
 
  if content_type == "application/json":
    data = await res.get_json()
  elif content_type == "application/x-www-form-urlencoded":
    data = await res.get_form_urlencoded()
  else:
    data = await res.get_text()
  data = parse_qs(data)
 
  message = data.get("message",[None])[0]
  if DEBUG: 
    print(f"post < {data}, message: {message}")
    response = "<h2>定时任务发布成功!</h2>"
    corksend(res, response)
  else:
    corksend(res)
 
###########################################
if __name__ == "__main__":
  App.listen(
    AppListenOptions(host="0.0.0.0", port=8080),
    lambda config: print( "Listening on port http://%s:%d now\n" % (config.host, config.port))
  ).run()

Sanic with Pony ORM数据库

DEBUG = False
REMOTE_HOST = "192.168.144.73"
FILENAME = "/tmp/.wireguard_config.json"
COMMAND = "uptime"
 
import subprocess
from json import dumps
import wireguard_config as wg_cfg
 
from sanic.response import json
from sanic import Sanic, request, response
from sanic_ext import Extend, openapi
from sanic_openapi import doc
from textwrap import dedent
 
app = Sanic("wireguard_keygen")
app.ext.openapi.describe(
    "WireGuard API",
    version="0.6.18",
    description=dedent("""
        ## 动态生成wireguard的公钥和密钥对
"""),
)
 
from pony.orm import *
db = Database()
 
class UserMetadata(db.Entity):
  id = PrimaryKey(int, size=64, auto=True)
  user_id = Required(int, size=64, default=0)
  already_fist_visit = Required(int, size=8, default=0)
  metadata = Optional(Json, nullable=True)
  _table_ = "user_metadata"
 
async def getmeta(user_id):
  result = None
  if DEBUG: print( f"1 -------------------{user_id}" )
  try:
    db.bind(provider='mysql', host='192.168.144.73', user='root', passwd='xxxx', db='test')
    db.generate_mapping(create_tables=True)
  except Exception as e:
    pass
 
  try:
    with db_session:
      #for p in UserMetadata.select(lambda p: p.user_id == user_id):
      #  print(p.metadata)
 
      #count = select(u for u in UserMetadata if u.user_id == user_id).count()
      user = UserMetadata.get(user_id=user_id)
      if user:
        if DEBUG: print(f"2 ------------------ {user.metadata}")
        if user.metadata is None or user.metadata == "None" :
          result = { "wireguard_info": wg_cfg.getnewpair() }
          if DEBUG: print(f"3 ------------------ {result}")
 
          # 执行原始 SQL
          try:
            user.metadata = result
          except Exception as e:
            user.metadata = dumps(result)
        else:
          if DEBUG: print(f"4 ------------------ exist")
          result = True
      return result
  except Exception as e:
    raise e
 
def exec_remote_cmd(host, port=65422, user="root", command="uptime"):
  ssh_command = f"ssh -i /root/.ssh/hzup.ssh.2018 -o StrictHostKeyChecking=no -p{port} {user}@{host} {command}"
  result = subprocess.run(ssh_command, shell=True, capture_output=True, text=True)
  return result.stdout, result.stderr
 
def upload_remote_file(host, port=65422, user="root", filename=""):
  scp_command = f"scp -i /root/.ssh/hzup.ssh.2018 -o StrictHostKeyChecking=no -P{port} {filename} {host}:/root/"
  result = subprocess.run(scp_command, shell=True, capture_output=True, text=True)
  return result.stdout, result.stderr
 
class UserID:
  uid: int
 
#@app.get("/users/<user_id:int>")
#async def getuserid(request: request.Request, user_id):
@app.post("/users/")
@openapi.summary("动态指定用户uid")
@openapi.description("# 接口描述")
@openapi.body(
    { 
      "application/x-www-form-urlencoded" : UserID,
    },
    description="输入真实有效的用户id",
    required=True,
)
async def getuserid(request):
  """
  openapi:
  ---
  parameters:
  responses:
    '200':
      description: 成功创建公密钥对
    '201':
      description: 公密钥对已经存在
    '400':
      description: 请指定用户正确ID
    '401':
      description: 查无此用户!!!
    '402':
      description: 用户必须真实有效
  """
  try:
    user_id = request.form.get("uid")
  except Exception as e:
    user_id = None
 
  if user_id is None:
    return json({"error": "请指定用户正确ID ?"}, status=400)
 
  try:
    user_id = int(user_id)
    if user_id <= 0:
      return json({"error": "用户必须真实有效 !"}, status=402)
    else:
      res = await getmeta(user_id)
      if res is None:
        return json({"error": "查无此用户   X"}, status=401)
      elif res is True:
        return json({"success": "公密钥对已经存在 #"}, status=201)
      else:
        ssh_res, ssh_err = exec_remote_cmd(REMOTE_HOST, command=COMMAND)
        scp_res, scp_err = upload_remote_file(REMOTE_HOST, filename=FILENAME)
        if DEBUG:
          with open("/tmp/xxx", "w") as f:
            f.write(f"------------------------- \n")
            f.write(f"{ssh_res}  ---  {ssh_err} \n")
            f.write(f"{scp_res}  ---  {scp_err} \n")
 
        return json({"success": "成功创建公密钥对。"}, status=200)
  except ValueError:
    return json({"error": "Parameter 'user_id' must be an integer"}, status=402)
 
 
if __name__ == "__main__":
  app.run(host="0.0.0.0", port=8888, workers=2, auto_reload=True)
在使用 Pony ORM 时,直接插入 JSON 格式的数据并不被支持。不过,你可以将 JSON 数据转换成 Python 字典,然后逐个字段插入到数据库中。

SQLAlchemy 支持 JSON 类型的读写。

Sanic支持热加载代码,真是贴心棒!

Sanic 内部本身有很多开箱即用的配置,直接就在 app.config

print(app.config.keys())

PeeWee异步ORM真好用

DEBUG = True
REMOTE_HOST = "192.168.144.73"
FILENAME = "/tmp/.wireguard_config.json"
COMMAND = ""
 
import logging
 
# 配置日志记录器
logging.basicConfig(
  level=logging.INFO,  # 设置日志级别
  format='%(asctime)s - %(levelname)s - %(message)s',  # 设置日志格式
  handlers=[
    logging.FileHandler('/tmp/xxx.log'),  # 日志输出到文件
  ]
)
 
import subprocess
from json import dumps
import wireguard_config as wg_cfg
 
from sanic.response import json
from sanic import Sanic, request, response
from sanic_ext import Extend, openapi
from sanic_openapi import doc
from textwrap import dedent
 
app = Sanic("wireguard_keygen")
app.ext.openapi.describe(
    "WireGuard API",
    version="0.6.18",
    description=dedent("""
      ## 动态生成wireguard的公钥和密钥对
"""),
)
 
from peewee import *
from playhouse.mysql_ext import JSONField
from peewee_async import Manager, PooledMySQLDatabase, AioModel
 
MySQLPool = PooledMySQLDatabase('holdon',
      host='192.168.5.22', port=3306,
      user='holdon', password='qPxgU!y3p2',
      max_connections=20,
      connect_timeout=3600,
)
 
MySQLPool2 = PooledMySQLDatabase('test',
      host='192.168.144.73', port=3306,
      user='root', password='6jAEGZjh5f',
      max_connections=20,
      connect_timeout=360,
)
 
 
class UserMetadata(AioModel):
  id = AutoField()  # primary_key=True 
  user_id = BigIntegerField(default=0, unique=True)
  already_fist_visit = BooleanField(default=0)
  metadata = JSONField(null=True)
  class Meta:
    database = MySQLPool
    table_name = 'user_metadata' # 自定义表名
 
class UserID:
  uid: int
 
db = Manager(MySQLPool)
 
with db.allow_sync():
  MySQLPool.create_tables([UserMetadata]) # 创建表用同步操作
 
MySQLPool.set_allow_sync(False)          # 禁止使用同步操作
 
 
async def getmeta(user_id):
  result = None
 
  try:
    if DEBUG: logging.warning(f'1 ->  获取UserID: {user_id}')
    query = await UserMetadata.select().where(UserMetadata.user_id == user_id).aio_execute()
 
    if query:
      for user in query:
        if DEBUG: logging.warning(f'2 ->  获取meta: {user.metadata}')
        if user.metadata is None or user.metadata == "None" :
          result = { "wireguard_info": wg_cfg.getnewpair() }
          if DEBUG: logging.warning(f'3 ->  获取公密钥: {result}')
 
          # 执行原始 SQL
          try:
            await UserMetadata.update(metadata=result).where(UserMetadata.user_id == user_id).aio_execute()
          except Exception as e:
            await UserMetadata.update(metadata=dumps(result)).where(UserMetadata.user_id == user_id).aio_execute()
        else:
          if DEBUG: logging.warning(f'4 ->  公密钥 已经存在.')
          result = True
    return result
  except Exception as e:
    raise e
 
def exec_remote_cmd(host, port=65422, user="root", command="uptime"):
  ssh_command = f"ssh -i /root/.ssh/hzup.ssh.2018 -o StrictHostKeyChecking=no -p{port} {user}@{host} {command}"
  result = subprocess.run(ssh_command, shell=True, capture_output=True, text=True)
  return result.stdout, result.stderr
 
def upload_remote_file(host, port=65422, user="root", filename=""):
  scp_command = f"scp -i /root/.ssh/hzup.ssh.2018 -o StrictHostKeyChecking=no -P{port} {filename} {host}:/root/"
  result = subprocess.run(scp_command, shell=True, capture_output=True, text=True)
  return result.stdout, result.stderr
 
@app.middleware('request')
async def handle_request(request):
  pass
 
@app.middleware('response')
async def handle_response(request, response):
  pass
 
#@app.get("/users/<user_id:int>")
#async def getuserid(request: request.Request, user_id):
@app.post("/users/")
@openapi.summary("动态指定用户uid")
@openapi.description("# 接口描述")
@openapi.body(
    { 
      "application/x-www-form-urlencoded" : UserID,
    },
    description="输入真实有效的用户id",
    required=True,
)
async def getuserid(request):
  """
  openapi:
  ---
  parameters:
  responses:
    '200':
      description: 成功创建公密钥对
    '201':
      description: 公密钥对已经存在
    '400':
      description: 输入正确的用户ID
    '401':
      description: 查无此用户!!!
    '402':
      description: 用户必须真实有效
  """
  try:
    user_id = request.form.get("uid")
  except Exception as e:
    if DEBUG: logging.error(f'400 #  输入正确的用户ID ?')
    return json({"error":   "输入正确的用户ID ?"}, status=400)
 
  try:
    user_id = int(user_id)
 
    if user_id <= 0:
      if DEBUG: logging.error(f'402 #  用户必须真实有效 !')
      return json({"error": "用户必须真实有效 !"}, status=402)
    else:
      res = await getmeta(user_id)
      if res is None:
        if DEBUG:logging.error(f'401 #  查询无此用户ID.  X')
        return json({"error": "查询无此用户ID.  X"}, status=401)
      elif res is True:
        if DEBUG:logging.info(f'201 #  用户信息已经存在 #')
        return json({"success": "用户信息已经存在 #"}, status=201)
      else:
        if COMMAND:       # 如果有远程命令要执行
          ssh_res, ssh_err = exec_remote_cmd(REMOTE_HOST, command=COMMAND)
          scp_res, scp_err = upload_remote_file(REMOTE_HOST, filename=FILENAME)
          if DEBUG:logging.info(f'206 #  {ssh_res}:{ssh_err}')
        if DEBUG:logging.info(f'200 #  成功创建公密钥对。')
        return json({"success": "成功创建公密钥对。"}, status=200)
  except Exception as e:
    if DEBUG: logging.error(f'402 #  用户必须真实有效!!')
    return json({"error": "用户必须真实有效 !"}, status=402)
 
 
if __name__ == "__main__":
  app.run(host="0.0.0.0", port=8888, workers=2, auto_reload=True)
peewee-async是一个专为Peewee ORM设计的异步接口,基于Python 3.8+版本的asyncio库构建。这一创新性结合,使得原本同步的数据操作能够无缝融入到你的异步工作流中,显著提升应用程序在处理I/O密集型任务时的效率。

方法同步转异步:以aio_为前缀,提供了一系列与Peewee同步方法对应的异步方法,保持了代码结构的一致性和易读性。

from peewee_async import Manager, PooledMySQLDatabase, AioModel
....
class Users(AioModel):
....
 
result = await Users.select().where(Users.uid == uid).aio_execute()
await Users.update(metadata=result).where(Users.uid == uid).aio_execute()
 
db = Manager(MySQLPool)
 
with db.allow_sync():
  MySQLPool.create_tables([Users])
 
# 如果希望只有异步调用,并将同步视为不需要的或错误, 需要设置
db.allow_sync = False

🏆Robyn with HTMX

Robyn - 一个带有Rust运行时的异步Python Web框架

Robyn+Prisma+Jinja2+HTMX

编译python新版本

安装robyn依赖模块

requirements.txt

anyio==3.6.2
certifi==2022.12.7
charset-normalizer==2.1.1
click==8.1.3
dill==0.3.6
h11==0.14.0
helpers==0.2.0
httpcore==0.16.2
httpx==0.23.3
idna==3.4
inquirerpy==0.3.4
Jinja2==3.0.1
MarkupSafe==2.1.1
multiprocess==0.70.14
nestd==0.3.1
pfzy==0.3.4
prisma==0.7.1
prompt-toolkit==3.0.39
pydantic==1.10.2
python-dotenv==0.21.0
requests==2.28.1
rfc3986==1.5.0
robyn==0.44.0
sniffio==1.3.0
tomlkit==0.11.6
typing_extensions==4.4.0
urllib3==1.26.13
uvloop==0.17.0
watchdog==2.2.1
wcwidth==0.2.6
 
pip3.10 install -r requirements.txt

某些特殊需要指定的版本

pip3.10 install robyn==0.24.0 -U
pip3.10 install "robyn[templating]" -U
 
pip3.10 install prisma httpx certifi urllib3 -U -i https://pypi.tuna.tsinghua.edu.cn/simple

给Robyn加个限速的中间件

pip install robyn robyn-rate-limits
from robyn import Robyn, Robyn
from robyn_rate_limits import InMemoryStore
from robyn_rate_limits import RateLimiter
 
app = Robyn(__file__)
limiter = RateLimiter(store=InMemoryStore, calls_limit=3, limit_ttl=100)
 
@app.before_request()
def middleware(request: Request):
    return limiter.handle_request(app, request)

给Robyn加个日志记录

from robyn import Robyn, Request, Response
from robyn.logger import Logger
from datetime import datetime
app = Robyn(__file__)
 
logger = Logger()
 
class LoggingMiddleware:
 
    def request_info(request: Request):
        ip_address = request.ip_addr
        request_url = request.url.host
        request_path = request.url.path
        request_method = request.method
        request_time = str(datetime.now())
 
        return {
            "ip_address": ip_address,
            "request_url": request_url,
            "request_path": request_path,
            "request_method": request_method,
            "request_time": request_time,
 
        }
 
    def response_info(response: Response):
        status_code = response.status_code
        response_type = response.response_type
 
        return {
            "status_code": status_code,
            "response_type": response_type
        }
 
 
@app.before_request()
def log_request(request: Request):
 
    logger.info(f"Received request: %s", 
        LoggingMiddleware.request_info(request))
 
    return request
 
@app.after_request()
def log_response(response: Response):
 
    logger.info(f"Sending response: %s", 
        LoggingMiddleware.response_info(response))
    return response
 
@app.get("/")
def h():
    return "Hello, World!"
 
app.start(port=8080)

如何升级robyn

#!/bin/sh
cp -a requirements.txt requirements.bak 
pip3.10 install -U pip
pip3.10 install "robyn" -U
pip3.10 install "robyn[templating]" -U
prisma db push --schema sqldb/schema.prisma 
supervisorctl stop robyn
/opt/python3.10.6/bin/python3 /root/robyn_demo/main.py 
supervisorctl restart robyn
supervisorctl status
python3.10 -m pip freeze > requirements.txt

supervisord托管

[program:robyn]
command = /opt/python3.10.6/bin/python3 /root/robyn_demo/app.py --processes 4 --workers 4 
autostart = true
autorestart = true
stopsignal = KILL
stopasgroup = true
stdout_logfile = NONE
stderr_logfile = NONE

创建prisma表结构

prisma db push --schema sqldb/schema.prisma 

schema.prisma

generator client {
  provider             = "prisma-client-py"
  interface = "sync"
  recursive_type_depth = -1
}
 
datasource db {
  provider = "sqlite"
  url      = "file:./sqlite.db"
}
 
model User {
  id        Int      @id @default(autoincrement())
  email     String?  @unique
  createdAt DateTime @default(now())
  name      String
  posts     Post[]
}
 
model Post {
  id        Int      @id @default(autoincrement())
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
  published Boolean  @default(false)
  title     String   @unique
  views     Int      @default(0)
  authorId  Int
  author    User     @relation(fields: [authorId], references: [id])
}

prisma 时经常使用的命令

  • prisma format :顾名思义,它格式化架构文件并验证其内容。
  • prisma db pull :从数据库中获取模型并生成客户端。
  • prisma generate :生成客户端,在您想要修改模型而不推送数据库中的更改的拉策略中很有用。
  • prisma db push :当您测试应用程序时,这是您应该用来重置数据库并生成客户端的命令。
  • prisma migrate dev –name <name> :当您确定您的更改时,在db push之后使用。它将创建一个迁移文件,将其应用于您的本地环境并生成客户端。
  • prisma migrate deploy :在生产环境中应用迁移文件。
  • prisma py version :显示有关 python 客户端、支持的 prisma 版本、二进制文件等的调试信息…

robyn主程序

from robyn import Robyn, serve_file, serve_html, jsonify, WebSocket
from robyn.robyn import Response, Request
from robyn.templating import JinjaTemplate
from urllib.parse import parse_qs
from prisma import Prisma
from os import path
import asyncio
 
DEBUG=True
db = Prisma(auto_register=True)
db.connect()
 
app = Robyn(__file__)
websocket = WebSocket(app, "/webst")
 
current_dir = path.dirname(__file__)
jinja_template = JinjaTemplate(path.join(current_dir, "templates"))
 
 
@app.get("/")
@app.get("/post")
async def head(req):
        try:
                #posts = db.post.query_raw('''select * from Post order by title desc''')
                posts = db.post.find_many( order={"title":'asc'}, include={"author": True} )
 
                context = {
                        "framework": "Robyn",
                        "templating_engine": "Jinja2",
                        "data": posts
                }
                if DEBUG: print(context)
 
                response = jinja_template.render_template(template_name="index.html", **context)
                return response
        except Exception as e:
                raise(e)
 
 
@app.post("/submit")
async def addpost(req: Request):
        data = parse_qs(req.body) # (bytearray(req.get("body")).decode("utf-8")) unused
 
        title = data.get("title")[0]
        author = data.get("author")[0]
 
        if title is None or author is None:
                return jsonify({
                        "error": "you need to provide title and author!"
                })
 
        user = db.user.find_first(where={'name': author})
        if user is None:
                user = db.user.create(
                        data={  
                                "name": author,
                        }
                )
 
        post = db.post.create(
                data={  
                        'title': title,
                        'authorId': user.id,
                }
        )
        if DEBUG: print("post <", data, " -> ", title, author, post.id)
 
        response = f"""<tr>
                <td>{post.title}</td>
                <td>{post.views}</td>
                <td>{author}</td>
                <td>{post.updatedAt}</td>
                <td>
                <button class='btn btn-warning' hx-put='/edit/{post.id}'> Edit </button>
                <button class='btn btn-danger' hx-put='/delete/{post.id}' hx-confirm='Are you sure?'> Delete </button>
                </td>
                </tr>"""
        return response
 
 
@app.get("/view/:id")
async def getpost(req):
        try:
                postId = int(req.path_params.get("id")) #(req.get("path_params").get("id")) unused
                if DEBUG: print(postId, "view")
 
                post = db.post.find_unique(where={'id': postId}, include={'author': True})
                if DEBUG: print(post)
 
                if post is None:
                        return jsonify({"error": "Post doesn't exist"})
 
                response = f"""<tr>
                <td>{post.title}</td>
                <td>{post.views}</td>
                <td>{post.author.name}</td>
                <td>{post.updatedAt}</td>
                <td>
                <button class='btn btn-warning' hx-put='/edit/{post.id}'> Edit </button>
                <button class='btn btn-danger' hx-put='/delete/{post.id}' hx-confirm='Are you sure?'> Delete </button>
                </td>
                </tr>"""
                return response
        except Exception as e:
                raise(e)
 
 
@app.put("/edit/:id")
async def edit(req):
        postId = int(req.path_params.get("id"))
        if DEBUG: print(postId," edit")
 
        post = db.post.find_unique(where={'id': postId}, include={'author': True})
        if DEBUG: print(post)
 
        response = f"""
                <tr hx-trigger='cancel' class='editing' hx-get='/post/{post.id}' hx-swap='outerHTML'>
                <td><input name='title' value='{post.title}'/></td>
                <td>{post.views}</td>
                <td>{post.author.name}</td>
                <td>{post.updatedAt}</td>
                <td>
                        <button class='btn btn-info' hx-get='/view/{post.id}'> Cancel </button>
                        <button class='btn btn-primary' hx-put='/update/{post.id}' hx-include='closest tr'> Save </button>
                </td>
    </tr>"""
        return response
 
 
@app.put("/update/:id")
async def update(req):
        try:
                postId = int(req.path_params.get("id"))
                if DEBUG: print(postId," update")
 
                data = parse_qs(req.body)
                title = data.get("title")[0]
 
                post = db.post.find_unique(where={'id': postId}, include={'author': True})
                if post.title != title:
                        post = db.post.update(where={"id": postId }, include={"author": True}, data={"title": title,"views":{"increment": 1}})
                if DEBUG: print(post)
 
                response = f"""
          <tr>  
                <td>{title}</td>
                <td>{post.views}</td>
                <td>{post.author.name}</td>
                <td>{post.updatedAt}</td>
                <td>
                <button class='btn btn-warning' hx-put='/edit/{post.id}'> Edit </button>
                <button class='btn btn-danger' hx-put='/delete/{post.id}' hx-confirm='Are you sure?'> Delete </button>
                </td>
                </tr>"""
                return response
        except Exception as e:
                raise(e)
 
@app.put("/delete/:id")
async def delete(req: Request):
        postId = int(req.path_params.get("id"))
        if DEBUG: print(postId," delete")
 
        post = db.post.delete(where={'id': postId})
        if post is None:
                return jsonify({"error": "Post doesn't exist."})
        return ""
 
# simple user demo
@app.get("/user")
async def getuser(req):
        try:
                users = db.user.find_many(order={"name":'desc'}, include={"posts": False})
                if DEBUG: print(users)
 
                return jsonify({
                        "data": [user.json() for user in users]
                })
        except Exception as e:
                raise(e)
 
@app.post("/user")
async def adduser(req):
        try:
                data = parse_qs(req.body)
 
                name = data.get("name")[0]
                if name is None:
                        return jsonify({
                                "error": "you need to provide name"
                        })
 
                user = db.user.create(data={"name": name,})
                return user.json(indent=2)
        except: 
                return jsonify({
                        "error": "wrong post json path_params!"
                })
###########################################
i = -1
# simple websocket demo
@websocket.on("message")
def connect():
        global i
        i += 1
        if i == 0:
                return "Hello ?"
        elif i == 1:
                return "Who are u?"
        elif i == 2:
                return "haha, I'm shy."
        elif i == 3:
                i = -1
                return "again!"
 
@websocket.on("close")
def close():
        return "Goodbye world, from ws"
 
@websocket.on("connect")
def message():
        return "Hello world, from ws"
 
app.start(port=5555, host="0.0.0.0") # host defaults to 127.0.0.1

jinja/htmx模板

{# templates/index.html #}
 
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>Robyn HTMX Demo</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.0-beta1/dist/css/bootstrap.min.css" rel="stylesheet">
    <!-- HTMX -->
    <script src="https://unpkg.com/htmx.org@1.5.0"></script>
    <script src="https://cdn.jsdelivr.net/gh/alpinejs/alpine@v2.8.2/dist/alpine.min.js" defer></script>
    <style>
        body{
            padding: 20px;
        }
        table {
            font-family: arial, sans-serif;
            border-collapse: collapse;
            width: 100%;
        }
 
        tr.htmx-swapping td {
            opacity: 0;
            transition: opacity 0.3s ease-out;
        }
 
        td, th {
            border: 1px solid #383737;
            text-align: left;
            padding: 8px;
        }
 
        tr:nth-child(even) {
            background-color: #dddddd;
        }
    </style>
</head>
 
<!-- Place <body> </body> code here -->
<body>
    <h1>{{framework}} � {{templating_engine}} �  HTMX</h1>
 
    <div x-data="{show: false}">
        <button @click="show=!show" class="btn btn-primary">New Post</button>
        <hr>
        <div x-show="show">
        <form hx-post="/submit" hx-swap="beforeend" hx-target="#new-book" class="mb-3">
        <input type="text" placeholder="Post Title" name="title" class="form-control mb-3" />
        <input type="text" placeholder="Post Author" name="author" class="form-control mb-3" />
        <button type="submit" class="btn btn-primary">Submit</button>
        </form>
        </div>
    </div>
 
    <table class="table">
        <thead>
          <tr>
            <th scope="col">Post Title</th>
            <th scope="col">Post Views</th>
            <th scope="col">Post Author</th>
            <th scope="col">LastModify Time</th>
            <th scope="col">Operation</th>
          </tr>
        </thead>
        <tbody id="new-book" hx-target="closest tr" hx-swap="outerHTML swap:0.3s"> 
            {%for post in data%}
            <tr>
                {%if not post.published %}
                <td>{{post.title}}</td>
                <td>{{post.views}}</td>
                <td>{{post.author.name}}</td>
                <td>{{post.updatedAt}}</td>
                <td>
                <button class="btn btn-warning" hx-put="/edit/{{post.id}}"> Edit </button>
                <button class="btn btn-danger" hx-put="/delete/{{post.id}}" hx-confirm='Are you sure?'> Delete </button>
                </td>
                {%endif%}
            </tr> 
            {%endfor%}
        </tbody>
    </table>
 
</body>
 
</html>

Robyn的容器化

docker pull python:3.10-slim
docker run -tid --name robyn-py3 python:3.10-slim
 
apt update && apt upgrade -y
 
# 安装 prisma 需要的数据库
apt install curl build-essential gcc make sqlite3 -y
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
pip3 install -r requirements.txt
 
# 导出 / 导入新的容器镜像
docker export efe5493b8e5d > robyn_py3.tar
cat robyn_py3.tar | docker import - robyn-python3.10:latest
 
docker run -tid -v /app:/app -p 5555:5555 --name robyn-py3 robyn-python3.10 sh -c "prisma db push && python3 /root/app/app.py"
pip install robyn

无法适配于 python3.11和alpine,最佳镜像:python:3.10-slim

Dockerfile

FROM python:3.12.7-slim-bookworm
 
WORKDIR /workspace
 
COPY . .
 
RUN pip install --no-cache-dir --upgrade -r requirements.txt
 
EXPOSE 8080
 
CMD ["python3", "app.py", "--log-level=DEBUG"]
 
#docker build -t "upyun/robyn-wireguard-api" --rm .
#docker save upyun/robyn-wireguard-api:latest > robyn-wireguard.tar
#docker run -tid --restart=always -v /root/robyn_wireguard_api:/workspace -p 80:8080 upyun/robyn-wireguard-api

Robyn & SQLAlchemy 分发公密钥

DEBUG = True
 
import wireguard_config as wg_cfg
from urllib.parse import parse_qs
import json
 
from robyn import Robyn, jsonify
from robyn.robyn import Response, Request
app = Robyn(__file__)
 
from sqlalchemy import create_engine, Boolean, BIGINT, Column, Text, JSON, MetaData, Table, select, update
from sqlalchemy.orm import sessionmaker
 
'''
from sqlalchemy.orm import declarative_base
db = declarative_base()
#db.metadata.create_all(engine)
 
class UserMetadata(db):
    id = Column(BIGINT, primary_key=True)
    user_id = Column(BIGINT, nullable=False, default=0)
    already_fist_visit = Column(Boolean, nullable=False, default=0)
    metadata = Column(JSON)  # 使用 JSON 类型
    __tablename__ = "user_metadata"
'''
 
# 因为表结构里metadata与sqlalchemy 关键字冲突了,所以使用Table & MetaData方式
db = MetaData()
engine = create_engine('mysql+pymysql://root:6jAEGZjh5f@192.168.144.73/test')
 
#AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
Session = sessionmaker(bind=engine)
 
UserMetadata = Table('user_metadata', db,
  Column('id', BIGINT, primary_key=True),
  Column('user_id', BIGINT, nullable=False, default=0),
  Column('already_fist_visit', Boolean, nullable=False, default=0),
  Column('metadata', JSON)
)
 
try: # 创建表
  db.create_all(engine)
except Exception as e:
  print(f"Error occurred while creating tables: {e}")
 
 
def getmeta(user_id):
  result = None
  if DEBUG: print( f"1 -------------------{user_id}" )
 
  try:
    session = Session()
 
    # 查询数据, 当你使用 Table 定义表结构时,应该使用 select() 方法而不是 session.query()
    query = select(UserMetadata).where(UserMetadata.c.user_id == user_id)
    try:
      #user = session.query(UserMetadata.metadata).filter_by(user_id=user_id).scalar()
      user = session.execute(query).fetchall()[0]
    except IndexError:
      return None
 
    if user:
      if DEBUG: print(f"2 ------------------ {user.metadata}")
      if user.metadata is None or user.metadata == "None":
        result = { "wireguard_info": wg_cfg.getnewpair() }
        if DEBUG: print(f"3 ------------------ {result}")
 
        try: # 插入/更新 数据
          query = update(UserMetadata).where(UserMetadata.c.user_id == user_id).values(metadata=result)
          session.execute(query)
        except Exception as e:
          query = update(UserMetadata).where(UserMetadata.c.user_id == user_id).values(metadata=json.dumps(result))
          session.execute(query)
        session.commit()
      else:
        if DEBUG: print(f"4 --------------- exist")
        result = True
    return result
  except Exception as e:
    session.rollback()  # 如果出现错误,回滚事务
    raise e
  finally:
    session.close()
 
@app.get("/userid/:id")
async def getuserid(request: Request):
  try:
    user_id = int(request.path_params.get("id")) #(req.get("path_params").get("id")) unused
 
    if user_id is None:
      return jsonify({"error": "Missing parameter 'user_id'"})
    elif user_id <= 0:
      return jsonify({"error": "User id must be an integer"})
    else:
      res = getmeta(user_id)
 
      if res is None:
        return jsonify({"error": "No such user id"})
      elif res is True:
        return jsonify({"success": "User metadata is exist."})
      else:
        return jsonify({"success": "User metadata updated"})
  except Exception as e:
    return jsonify({"error": "User id must be an integer"})
 
###########################################
if __name__ == "__main__":
  app.start(port=8888, host="0.0.0.0") # host defaults to 127.0.0.1

Robyn & peewee & openapi 分发公密钥

from robyn import Robyn, OpenAPI, Request, Response
from robyn.openapi import OpenAPIInfo
from robyn.templating import JinjaTemplate
 
from peewee import *
from playhouse.mysql_ext import JSONField
from peewee_async import Manager, PooledMySQLDatabase, AioModel
 
from os import path, getenv
from typing import TypedDict
from dotenv import load_dotenv, find_dotenv, dotenv_values
 
import subprocess, json, logging, datetime
import wireguard_config as wg_cfg
 
 
DEBUG = True
load_dotenv(find_dotenv(), override=True)  # 加载 .env 文件到环境变量
 
# 配置日志记录器
logging.basicConfig(
  level=logging.INFO,  # 设置日志级别
  format='%(asctime)s - %(levelname)s - %(message)s',  # 设置日志格式
  handlers=[
    logging.FileHandler(getenv("Logfile")),  # 日志输出到文件
  ]
)
 
app = Robyn(
  file_object=__file__,
  openapi=OpenAPI(
    info=OpenAPIInfo(
      title=getenv("Title"),
      description=getenv("Description"),
      version=getenv("Version"),
    )
  )
)
 
'''
current_dir = path.dirname(__file__)
static_dir = path.join(current_dir, "static")
jinja_template = JinjaTemplate(path.join(current_dir, "templates"))
 
# 读取环境变量并保存到字典
config = dotenv_values(find_dotenv())
'''
 
MySQLPool = PooledMySQLDatabase(getenv("DB_Name"),
  host=getenv("DB_Host"), port=int(getenv("DB_Port")),
  user=getenv("DB_User"), password=getenv("DB_Pass"),
  max_connections=int(getenv("DB_Pool")),
  connect_timeout=int(getenv("DB_Timeout")),
)
 
 
class UserMetadata(AioModel):
  id = AutoField()  # primary_key=True
  user_id = BigIntegerField(default=0, unique=True)
  info = JSONField(null=True)
  create_at=DateTimeField(default=datetime.datetime.now)
  update_at=DateTimeField(default=datetime.datetime.now)
  class Meta:
    database = MySQLPool
    table_name = getenv("DB_Table") # 自定义表名
 
 
class UserID(TypedDict):
  dcid: int
  uid: int
 
 
def HTTPException(status=403, headers={}):
  responses = {
    401: {"description": "查询无此用户ID. X"},
    402: {"description": "用户必须真实有效!"},
    403: {"description": "拒绝访问!"},
    200: {"description": "成功创建公密钥对:"},
    201: {"description": "用户信息已经存在#"},
  }
  if DEBUG: logging.info(f'{status} #  {responses.get(status).get("description")}')
  return Response(status_code=status, description=responses.get(status).get("description"), headers=headers)
 
async def postmeta(user_id, dc_id=0):
  result = content = None
  try:
    if DEBUG: logging.warning(f'1 ->  获取UserID: {user_id} 机房: {dc_id}')
    query = await UserMetadata.select().where(UserMetadata.user_id == user_id).aio_execute()
    if query:
      for user in query:
        if DEBUG: logging.warning(f'2 ->  获取meta: {user.user_id} -> {user.info}')
 
        if user.info is None or not user.info.get("wireguard_info", None):
          result = "Success"
          content = {
                "dc_no": dc_id,
                "server_pubkey": "1b5j0BafMgF9ofXuOCvRRznv00GuHXbfw94f6h8peRk=",
                "wireguard_info": wg_cfg.getnewpair(dc_id)
                }
          if DEBUG: logging.warning(f'3 ->  获取公密钥: ***********')
 
          # 执行原始 SQL
          try:
            await UserMetadata.update(info=content).where(UserMetadata.user_id == user_id).aio_execute()
          except Exception as e:
            await UserMetadata.update(info=json.dumps(content)).where(UserMetadata.user_id == user_id).aio_execute()
        else:
          if DEBUG: logging.warning(f'4 ->  公密钥 已经存在.')
          result = True
          content = { "wireguard_info": f"{user.info}" }
    return result, content
  except Exception as e:
    raise e
 
def exec_remote_cmd(host, port=65422, user="root", command="uptime"):
  ssh_command = f"ssh {getenv('SSH_OPS')} -p{port} {user}@{host} {command}"
  result = subprocess.run(ssh_command, shell=True, capture_output=True, text=True)
  return result.stdout, result.stderr
 
def upload_remote_file(host, port=65422, user="root", filename=""):
  scp_command = f"scp {getenv('SSH_OPS')} -P{port} {filename} {host}:/root/wireguard_config.json"
  result = subprocess.run(scp_command, shell=True, capture_output=True, text=True)
  return result.stdout, result.stderr
 
@app.get("/users/", openapi_name="Get User ID", openapi_tags=["UserID"])
async def GetUserID(r: Request, query_params=UserID):
  try:
    user_id = int(r.query_params.get("uid"))
    if user_id <= 0:
      return HTTPException(status=403)
 
    res, content = await postmeta(user_id)
    if res is True:
      return content
    else:
      return HTTPException(status=401)
  except Exception as e:
    return HTTPException(status=402)
 
@app.post("/users/", openapi_name="Post User ID", openapi_tags=["UserID"])
async def PostUserID(r: Request, body=UserID, description="XXXXXXXX"):
  try:
    post = r.json()
    user_id = int(post.get("uid"))
    dc_id = int(post.get("dcid"))
 
    if user_id <= 0 or dc_id > 1000 :
      return HTTPException(status=403)
 
    FILENAME = f"/tmp/.wireguard_config.json.{dc_id}"
    REMOTE_HOST = f"HOST_{dc_id}"
 
    res, content = await postmeta(user_id, dc_id)
 
    if res is None:
      return HTTPException(status=401)
    elif res is True:
      return HTTPException(status=201)
    else:
      if getenv("COMMAND") and getenv(REMOTE_HOST): # 如果有远程命令要执行
        scp_res, scp_err = upload_remote_file( getenv(REMOTE_HOST), filename=FILENAME  )
        ssh_res, ssh_err = exec_remote_cmd( getenv(REMOTE_HOST), command=getenv("COMMAND")  )
        if DEBUG:logging.info(f'206 #  {dc_id }/{getenv(REMOTE_HOST)} {ssh_res}:{ssh_err}')
      return HTTPException(status=200)
  except Exception as e:
    return HTTPException(status=402)
 
 
if __name__ == "__main__":
  db = Manager(MySQLPool)
  with db.allow_sync():
    MySQLPool.create_tables([UserMetadata]) # 创建表用同步操作
  MySQLPool.set_allow_sync(False)          # 禁止使用同步操作
 
  app.start(port=8080, host="0.0.0.0")  # host defaults to 127.0.0.1

FastHTML with HTMX

FastAPI with HTMX = FastHTML ?

FastUI也是一个简单实用的用户界面库,由大名鼎鼎的pydantic作者写的。

NextPy: Reflex + ReactPy + FastApi

Reflex应用程序编译为React前端应用程序和FastAPI后端应用程序。只有UI被编译为next.js;所有的应用程序逻辑和状态管理都保持在Python中,并在服务器上运行。Reflex使用WebSockets将事件从前端发送到后端,并将状态更新从后端发送到前端。

🤔 Nextpy:应用开发的未来

轻松快速地构建任何 Web 应用 — ⚡。它简化了从后端到前端(是的,用 Python 构建视觉上惊艳的前端!)、AI 集成、API 等的 Pythonic 开发,赋予人类和 AI 代理以力量。

⏰ 你可以在1小时内构建什么: 美观的作品集、门户、数据应用、内部工具、API 等。

📚 最棒的部分?可转移的知识: 使用 nextpy 进行构建会逐步教会你最好的 python 库——FastAPI、Pydantic、SQLModel、Pandas、Jinja2、SQLAlchemy、Reflex、Reactpy等。

Nextpy依赖的bun,需要更高版本的glibc !!!

FastAPI ASGI with PyPy

安装fastapi

pip3.7 install fastapi pydantic uvicorn

最简单的get/post

main.py

from fastapi import FastAPI, Query
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
 
app = FastAPI()
 
 
# 定义数据模型,可以做一些条件约束
class People(BaseModel):
	name: str
	age: int = Field(le=100)
	address: str = Field(max_length=10)
	salary: float = Field(le=100000)
 
 
Gender = Enum('Gender', {'male': 'boy', 'female': 'girl'})
 
 
@app.get('/')
def index():
	return {'msg': '欢迎使用 FastAPI !'}
 
 
@app.get('/query/{uid}')  # 做数据类型检验
def query(uid: Optional[int] = None):
	msg = f'You query ID is {uid}'
	return {'msg': msg, 'success': True}
 
 
@app.get('/gender/{sex}')  # 这是可选的类型
async def gender(sex: Optional[Gender] = None):
	return {'msg': sex}
 
 
@app.post('/insert')
async def insert(people: People):
	salary = people.salary * 12
	msg = f'你叫 {people.name},今年: {people.age},全年收入为{salary}'
	return {'msg': msg, 'success': True}
 
 
@app.get('/search')  # 参数验证和判断
async def search(q: str = Query(None, max_length=6)):
	return {'msg': q}
 
 
if __name__ == '__main__':
	import uvicorn
 
	uvicorn.run(app="__main__:app", port=8000, reload=True)

给FastAPI添加中间件

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
 
API_KEY = "my_secret_api_key"
app = FastAPI()
 
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)
 
 
# Middleware: API key authentication
@app.middleware("http")
async def api_key_authentication(request: Request, call_next):
    api_key = request.headers.get("X-Api-Key")
    if api_key != API_KEY:
        raise HTTPException(status_code=401, detail="Unauthorized: Invalid API Key")
 
    response = await call_next(request)
    response.headers["X-Custom-Header"] = "CustomHeaderValue"
    return response

FastAPI的依赖注入

app = FastAPI()
 
# Dependency Injections
async def get_record(person_id: int)):
    record = await Person.get(person_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return record
 
# FastAPI function
@app.get("/items/{person_id}")
async def read_item(person: Person = Depends(get_record)):
    return item

FastAPI最简单的RBAC角色权限访问控制

from fastapi import FastAPI, HTTPException, Request
from starlette.middleware.base import BaseHTTPMiddleware
 
app = FastAPI()
 
# Define role-based access control (RBAC) structure
RESOURCES_FOR_ROLES = {
    'admin': {
        'resource1': ['read', 'write', 'delete'],
        'resource2': ['read', 'write'],
    },
    'user': {
        'resource1': ['read'],
        'resource2': ['read', 'write'],
    }
}
 
# 这个可以改进从数据库里动态的获取用户信息和角色
USERS = {
    'user1': {'username': 'user1', 'role': 'user'},
    'admin1': {'username': 'admin1', 'role': 'admin'}
}
 
# Optionally, define paths to be excluded from checking for permissions
EXLUDED_PATHS = ['docs', 'openapi.json']
 
#Map request methods to actions
def translate_method_to_action(method: str) -> str:
    method_permission_mapping = {
        'GET': 'read',
        'POST': 'write',
        'PUT': 'update',
        'DELETE': 'delete',
    }
    return method_permission_mapping.get(method.upper(), 'read')
 
# CHeck if permission granted or not
def has_permission(user_role, resource_name, required_permission):
    if user_role in RESOURCES_FOR_ROLES and resource_name in RESOURCES_FOR_ROLES[user_role]:
        return required_permission in RESOURCES_FOR_ROLES[user_role][resource_name]
    return False
 
# Define a custom Middleware for handling RBAC
class RBACMiddleware(BaseHTTPMiddleware):
  async def dispatch(self, request: Request, call_next):
      request_method = str(request.method).upper()
      action = translate_method_to_action(request_method)
      resource = request.url.path[1:]
      if not resource in EXLUDED_PATHS:
            admin1 = USERS['admin1'] # 通过切换不同的用户实例来判断
            # user1 = USERS['user1'].
            if not has_permission(admin1['role'], resource, action):
                raise HTTPException(status_code=403, detail="Insufficient permissions")
      response = await call_next(request)
      return response
 
# Add the middleware to FastAPI
app.add_middleware(RBACMiddleware)
 
# Example protected route for resource 1
@app.get("/resource1")
async def resource1():
    return {"message": "This is a resource1 route"}
 
@app.post("/resource1")
async def add_resource1(add: int):
    return {"message": add}
@app.delete("/resource1")
async def delete_resource1():
    return {"message": "This resource1 is deleted"}
 
# Example protected route for resource 2
@app.get("/resource2")
async def resource2():
    return {"message": "This is an resource2 route"}
@app.post("/resource2")
async def add_resource2(add: int):
    return {"message": add}
@app.delete("/resource2")
async def delete_resource2():
    return {"message": "This resource2 is deleted"}
 
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

🏆FastAPI+PeeWee真好用

DEBUG = True
 
from os import getenv
from dotenv import load_dotenv, find_dotenv, dotenv_values
 
def load_env_vars(file_path):
  load_dotenv(file_path, override=True) # 加载 .env 文件到环境变量
  config = dotenv_values(file_path)     # 读取环境变量并保存到字典
  return config
 
# 现在 config 是一个字典,包含所有的环境变量
config = load_env_vars( find_dotenv() )
 
import wireguard_config as wg_cfg
import uvicorn, subprocess, json, logging
 
# 配置日志记录器
logging.basicConfig(
  level=logging.INFO,  # 设置日志级别
  format='%(asctime)s - %(levelname)s - %(message)s',  # 设置日志格式
  handlers=[
    logging.FileHandler(getenv("Logfile")),  # 日志输出到文件
  ]
)
 
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, conint
 
app = FastAPI(
    title = getenv("Title"),
    description = getenv("Description"),
    version = getenv("Version"),
)
 
from peewee import *
from playhouse.mysql_ext import JSONField
from peewee_async import Manager, PooledMySQLDatabase, AioModel
 
MySQLPool = PooledMySQLDatabase(getenv("DB_Name"),
      host=getenv("DB_Host"), port=int(getenv("DB_Port")),
      user=getenv("DB_User"), password=getenv("DB_Pass"),
      max_connections=int(getenv("DB_Pool")),
      connect_timeout=int(getenv("DB_Timeout")),
)
 
class UserMetadata(AioModel):
  id = AutoField()  # primary_key=True
  user_id = BigIntegerField(default=0, unique=True)
  already_fist_visit = BooleanField(default=0)
  metadata = JSONField(null=True)
  class Meta:
    database = MySQLPool
    table_name = getenv("DB_Table") # 自定义表名
 
db = Manager(MySQLPool)
 
with db.allow_sync():
  MySQLPool.create_tables([UserMetadata]) # 创建表用同步操作
 
MySQLPool.set_allow_sync(False)          # 禁止使用同步操作
 
 
async def getmeta(user_id, dc_id):
  result = None
 
  try:
    if DEBUG: logging.warning(f'1 ->  获取UserID: {user_id} 机房: {dc_id}')
    query = await UserMetadata.select().where(UserMetadata.user_id == user_id).aio_execute()
 
    if query:
      for user in query:
        if DEBUG: logging.warning(f'2 ->  获取meta: {user.user_id} -> {user.metadata}')
        if user.metadata is None or user.metadata == "None" :
          result = { "dc_no": dc_id, "wireguard_info": wg_cfg.getnewpair(dc_id) }
          if DEBUG: logging.warning(f'3 ->  获取公密钥: ***********')
 
          # 执行原始 SQL
          try:
            await UserMetadata.update(metadata=result).where(UserMetadata.user_id == user_id).aio_execute()
          except Exception as e:
            await UserMetadata.update(metadata=json.dumps(result)).where(UserMetadata.user_id == user_id).aio_execute()
        else:
          if DEBUG: logging.warning(f'4 ->  公密钥 已经存在.')
          result = True
    return result
  except Exception as e:
    raise e
 
def exec_remote_cmd(host, port=65422, user="root", command="uptime"):
  ssh_command = f"ssh {getenv('SSH_OPS')} -p{port} {user}@{host} {command}"
  result = subprocess.run(ssh_command, shell=True, capture_output=True, text=True)
  return result.stdout, result.stderr
 
def upload_remote_file(host, port=65422, user="root", filename=""):
  scp_command = f"scp {getenv('SSH_OPS')} -P{port} {filename} {host}:/root/"
  result = subprocess.run(scp_command, shell=True, capture_output=True, text=True)
  return result.stdout, result.stderr
 
 
class UserID(BaseModel):
  dcid: conint(ge=100, le=999)
  uid: conint(gt=0)
  model_config = {"extra": "forbid"}
 
responses = {
    401: {"description": "查询无此用户ID. X"},
    402: {"description": "用户必须真实有效!"},
    200: {"description": "成功创建公密钥对:"},
    201: {"description": "用户信息已经存在#"},
}
 
#@app.get("/users/{user_id:UserID}")
@app.post("/users/", include_in_schema=True, responses={**responses})
async def PostUserID(userid: UserID):
  # Dependency function not ready
  try:
    user_id = int(userid.uid)
    dc_id = int(userid.dcid)
    FILENAME = f"/tmp/.wireguard_config.json.{dc_id}"
 
    res = await getmeta(user_id, dc_id)
 
    if res is None:
      if DEBUG: logging.error(f'401 #  {responses.get(401)}')
      return HTTPException(detail=responses.get(401), status_code=401)
    elif res is True:
      if DEBUG: logging.info(f'201 #  {responses.get(201)}')
      return HTTPException(detail=responses.get(201), status_code=201)
    else:
      if COMMAND:       # 如果有远程命令要执行
        ssh_res, ssh_err = exec_remote_cmd( getenv("REMOTE_HOST"), command=getenv("COMMAND") )
        scp_res, scp_err = upload_remote_file( getenv("REMOTE_HOST"), filename=FILENAME)
        if DEBUG:logging.info(f'206 #  {ssh_res}:{ssh_err}')
 
      if DEBUG: logging.info(f'200 #  {responses.get(200)} {res}')
      return HTTPException(detail=responses.get(200), status_code=200)
  except Exception as e:
    if DEBUG: logging.error(f'402 #  {responses.get(402)}')
    return HTTPException(detail=responses.get(402), status_code=402)
 
 
if __name__ == "__main__":
  uvicorn.run("main:app", host="0.0.0.0", port=8888, workers=4, reload=DEBUG)

查看swagger doc

http://127.0.0.1:8000/docs
http://127.0.0.1:8000/redoc

用ASGI异步运行

python3.7 -m uvicorn main:app --reload
 
Server Software:        uvicorn
Server Hostname:        127.0.0.1
Server Port:            8000
 
Document Path:          /query/1234
Document Length:        45 bytes
 
Concurrency Level:      100
Time taken for tests:   4.408 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      1700000 bytes
HTML transferred:       450000 bytes
Requests per second:    2268.75 [#/sec] (mean)
Time per request:       44.077 [ms] (mean)
Time per request:       0.441 [ms] (mean, across all concurrent requests)
Transfer rate:          376.65 [Kbytes/sec] received
 
Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   0.8      1       6
Processing:     3   43  12.2     40     114
Waiting:        1   37  11.7     35     113
Total:          4   44  12.2     40     115

使用进程管理器,如gunicorn,supervisord,nginx

gunicorn -w 4 -k uvicorn.workers.UvicornWorker  main:app --reload 
 
Server Software:        uvicorn
Server Hostname:        127.0.0.1
Server Port:            8000
 
Document Path:          /query/1234
Document Length:        45 bytes
 
Concurrency Level:      100
Time taken for tests:   1.448 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      1700000 bytes
HTML transferred:       450000 bytes
Requests per second:    6903.71 [#/sec] (mean)
Time per request:       14.485 [ms] (mean)
Time per request:       0.145 [ms] (mean, across all concurrent requests)
Transfer rate:          1146.12 [Kbytes/sec] received
 
Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   0.7      1       5
Processing:     2   14  20.5     11     220
Waiting:        2   12  19.2     10     220
Total:          3   14  20.5     12     221
gunicorn -w 4 -k uvicorn.workers.UvicornH11Worker  main:app --reload 
 
Server Software:        uvicorn
Server Hostname:        127.0.0.1
Server Port:            8000
 
Document Path:          /query/1234
Document Length:        45 bytes
 
Concurrency Level:      100
Time taken for tests:   2.721 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      1890000 bytes
HTML transferred:       450000 bytes
Requests per second:    3675.03 [#/sec] (mean)
Time per request:       27.211 [ms] (mean)
Time per request:       0.272 [ms] (mean, across all concurrent requests)
Transfer rate:          678.30 [Kbytes/sec] received
 
Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   2.6      0     184
Processing:     4   27  20.2     24     220
Waiting:        3   22  15.9     20     217
Total:          4   27  20.4     24     221

Falcon WSGI with PyPy

Falcon根本不适合提供HTML页面。适用于RESTful API。

Python静态编译器

PyPy的优缺点

PyPy是Python解释器CPython的直接替代品。CPython将Python编译为中间字节码然后由虚拟机解释,而PyPy使用实时(JIT)编译将Python代码转换为本地机器的汇编语言。

根据正在执行的任务,性能提升可能会非常显着。平均而言,PyPy将Python加速了大约7.6倍,一些任务加速了50倍或更多。CPython解释器根本不会执行与PyPy一样的优化方式,并且可能永远不会,因为这不是它的设计目标之一。

最好的部分是开发人员需要很少甚至不需要努力来解锁PyPy提供的收益。只需将CPython替换为PyPy,并且大部分都已完成。下面讨论了一些例外,但是PyPy的目标是运行现有的,并且未经修改的Python代码并为其提供自动化的速度提升。

PyPy最适合纯Python的应用程序

PyPy在“纯”Python应用程序中表现最佳,换句话说也就是用Python编写的没有夹杂其他语言的应用程序中表现最佳。由于PyPy模仿CPython的本机二进制接口的方式,与C库(如NumPy)接口的Python包也没有那么出类拔萃了。

PyPy的开发人员已经解决了这个问题,并使PyPy与大多数依赖于C扩展的Python包更加兼容。例如Numpy现在与PyPy兼容的非常好。但是,如果你希望与C的扩展最大程度地兼容,请使用CPython。

PyPy适用于运行时间较长的程序

PyPy优化Python程序的一个副作用是,运行时间较长的程序通过PyPy的优化获益最多。程序运行的时间越长,PyPy可以收集的运行时类型信息就越多,它可以进行的优化就越多。

一劳永逸的Python脚本不会从这种事情中受益。例如受益的Python应用程序通常具有长时间循环运行的行为,或者在Web框架的后台中连续运行。

PyPy没有预编译

PyPy编译Python代码,但它不是Python代码的编译器。由于PyPy执行其优化的方式和Python的固有动态特点,因此无法将生成的JITted代码作为独立二进制文件发出并重新使用它。

每次运行都必须编译每个程序。如果你想将Python编译成可以作为独立应用程序运行的更快的代码,那么还是请使用Cython、Numba或当前实验性的Nuitka项目。

Numba

Numba,是一款可以让python速度提升百倍,将python函数编译为机器代码的JIT编译器,经过Numba编译的python代码(仅限数组运算),其运行速度可以接近C或FORTRAN语言。

from numba import jit
import time
 
@jit(nopython=True)
def count(n):
    sum = 0
    for i in range(n):
        sum += i
    return sum
 
start = time.time()
sum = count(1000000001)
end = time.time()
print(f"sum is {sum}, used time is {end - start}")

Python最佳数据分析工具

Excel分析

PowerPoint处理

Word文字编辑

Pandas数据分析

import pandas as pd
import matplotlib.pyplot as plt
 
# 中文支持,不会显示小方框
plt.rcParams['font.sans-serif'] = ['SimHei', 'Songti SC', 'STFangsong']
# 正常显示负号
plt.rcParams['axes.unicode_minus'] = False
# 设置线条宽度
plt.rcParams['lines.linewidth'] = 50
# 设置线条颜色
plt.rcParams['lines.color'] = 'red'
# 设置线条样式,样式的各类还有 `--`为虚线,`-.`为点虚线
plt.rcParams['lines.linestyle'] = '--' # 直线
 
# 1. 创建一个示例 DataFrame
data = {
    'Name': ['张三', '李四', '王五', '赵六', '田七'],
    'Age': [24, 27, 22, 32, 29],
    'Salary': [5000, 8500, 6800, 7200, 6500],
    'Department': ['HR', 'IT', 'Finance', 'IT', 'HR']
}
 
df = pd.DataFrame(data)
 
# 2. 数据查看
print(f"DataFrame: \n{df} \n")
 
# 3. 基本统计
print(f"基本统计: \n {df.describe()} \n")
 
# 4. 数据选择
print(f"选择 'Name' 和 'Salary' 列: \n {df[ ['Name', 'Salary']]} \n")
 
# 5. 添加新列
df['Bonus'] = df['Salary'] * 0.1
print(f"添加 'Bonus' 列: \n {df} \n")
 
# 6. 条件筛选
print(f"筛选出薪水大于 6000 的员工: \n {df[df['Salary'] > 6000]} \n")
 
# 7. 分组统计
grouped = df.groupby('Department').mean(numeric_only=True)  # 只计算数值列
print(f"按部门分组的平均值: \n {grouped} \n")
 
# 8. 处理缺失值
df.loc[1, 'Salary'] = None  # 人为制造缺失值
print(f"处理缺失值: \n {df.fillna( df['Salary'].mean())} \n")
 
# 9. 数据排序
print(f"按年龄排序: \n {df.sort_values(by='Age')}")
 
# 10. 数据可视化
plt.figure(figsize=(10, 5))
plt.xlabel('姓名')
plt.ylabel('薪资')
plt.title('员工薪水')
plt.bar(df['Name'], df['Salary'], color='blue')
plt.show()

Python技巧收集

🏆Python之父教你写main函数

class Usage(Exception):

  def __init__(self, msg):
      self.msg = msg

def main(argv=None):

  if argv is None:
      argv = sys.argv
  try:
      try:
          opts, args = getopt.getopt(argv[1:], "h", ["help"])
      except getopt.error, msg:
           raise Usage(msg)
      # more code, unchanged
  except Usage, err:
      print >>sys.stderr, err.msg
      print >>sys.stderr, "for help use --help"
      return 2

if name == “main”:

  sys.exit(main())

</code>

🏆二行命令打造Python的vim

curl -O https://raw.githubusercontent.com/vince67/v7_config/master/vim.sh
bash vim.sh

🏆最简实用的文件打开

with open…as f 语句打开和关闭文件,如文件异常,则抛出一个内部块异常。
for line in f 文件对象f视为一个迭代器,会自动的采用缓冲IO和内存管理,适用于大小文件。

#with open(newfile, 'w') as outfile, open(oldfile, 'r', encoding='utf-8') as infile:
with open('chenji.txt') as f:
  for line in f:
    if not line:  #判断结尾
      break
    elif line.strip() == '': #判断空行     
      pass
    else:
      print(line) 

with可以用来简化try finally代码,看起来可以比try finally更清晰。

with如何工作?

这看起来充满魔法,但不仅仅是魔法,Python对with的处理还很聪明。
基本思想是with所求值的对象必须有一个enter()方法,一个exit()方法。
紧跟with后面的语句被求值后,返回对象的enter()方法被调用,这个方法的返回值将被赋值给as后面的变量。当with后面的代码块全部被执行完之后,将调用前面返回对象的exit()方法。

#!/usr/bin/env python
# with_example01.py
 
class Sample:
    def __enter__(self):
        print "In __enter__()"
        return "Foo"
 
    def __exit__(self, type, value, trace):
        print "In __exit__()"
 
def get_sample():
    return Sample()
 
with get_sample() as sample:
    print "sample:", sample

代码输出结果如下:

In __enter__()
sample: Foo
In __exit__()

🏆利用google fire变身Python CLI

pip install fire
#!/usr/bin/env python
# encoding: utf8
import fire
 
class People:
    def __init__(self,name='nobody',sex='boy',age=30):
        self.name = name
        self.sex = sex
        self.age = age
    def say(self):
        return ('hello {},my age is {}'.format(self.name,self.age))
 
#chenmr = People(name='chenmr',sex='girl',age=40)
 
if __name__ == '__main__':
    fire.Fire(People)
python demo1.py say --name=shaohy --sex=boy --age=40

python之ctypes妙用

定时更改桌面背景

import ctypes
 
# Set the path of the wallpaper image
wallpaper_path = "C:\\path\\to\\wallpaper.jpg"
 
# Set the style of the wallpaper
# 0: Center, 1: Stretch, 2: Tile, 6: Fit
wallpaper_style = 0
 
# Load the image
SPI_SETDESKWALLPAPER = 20
image = ctypes.c_wchar_p(wallpaper_path)
 
# Set the wallpaper
ctypes.windll.user32.SystemParametersInfoW(SPI_SETDESKWALLPAPER, 0, image, wallpaper_style)

全屏黑色无边框窗口

import tkinter as tk
import tkinter.ttk as ttk
import ctypes
 
# Get user32 library
user32 = ctypes.windll.user32
 
# Get the size of the screen
screensize = user32.GetSystemMetrics(0), user32.GetSystemMetrics(1)
 
# Create the window
root = tk.Tk()
 
# Set the window size to the size of the screen
root.geometry('%dx%d+0+0' % screensize)
 
# Set the window to be borderless
root.overrideredirect(1)
 
# Set the background color to black
style = ttk.Style()
style.configure('TFrame', background='black')
 
# Run the window
root.mainloop()

背景透明无边框时钟

功能特色

from tkinter import *
import time, datetime
from time import gmtime, strftime
 
root = Tk()
 
# Window Attributes
root.overrideredirect(1)  # 隐藏窗口边框
root.wm_attributes("-transparentcolor", "gray99")  # 设置透明背景色
 
running = True  # 运行状态
 
# 关闭窗口函数
def close(event):
    global running
    running = False
 
root.bind('<Escape>', close)  # 绑定Esc键关闭窗口
 
screen_width = root.winfo_screenwidth()  # 获取屏幕宽度
screen_height = root.winfo_screenheight()  # 获取屏幕高度
 
timeframe = Frame(root, width=screen_width, height=screen_height, bg="gray99")  # 创建主框架
timeframe.grid(row=0,column=0)
 
tkintertime = StringVar()  # 创建时间变量
timelabel = Label(timeframe, textvariable=tkintertime, fg="white", bg="gray99", font=("NovaMono", 40))  # 创建时间标签
timelabel.place(y=screen_height/2 - 60, x=screen_width/2, anchor="center")  # 设置时间标签位置
 
tkinterdate = StringVar()  # 创建日期变量
datelabel = Label(timeframe, textvariable=tkinterdate, fg="white", bg="gray99", font=("Bahnschrift", 15))  # 创建日期标签
datelabel.place(y=screen_height/2 + 60, x=screen_width/2, anchor="center")  # 设置日期标签位置
 
while running:
    tkintertime.set(value=strftime("%H:%M:%S"))  # 更新时间
    tkinterdate.set(value=strftime("%A, %e %B"))  # 更新日期
    root.update_idletasks()  # 更新窗口
    root.update()  # 更新窗口
    time.sleep(1)  # 延迟1秒

全屏canvas绘图

import tkinter as tk
def toggle_fs(dummy=None):
    state = False if root.attributes('-fullscreen') else True
    root.attributes('-fullscreen', state)
    if not state:
        root.geometry('300x300+100+100')
root = tk.Tk()
canvas = tk.Canvas(root, bg='cyan', highlightthickness=0)
canvas.pack(expand=True, fill=tk.BOTH)
root.attributes('-fullscreen', True)
root.bind('<Escape>', toggle_fs)
root.mainloop()

显示消息在最上层

import ctypes
 
def main():
    WS_EX_TOPMOST = 0x40000
    windowTitle = "Python Windows Message Box Test"
    message = "Hello, world! This Message box was created by calling the Windows API using the ctypes library."
 
    # display a message box; execution will stop here until user acknowledges
    ctypes.windll.user32.MessageBoxExW(None, message, windowTitle, WS_EX_TOPMOST)
 
    print("User clicked OK.")
 
if __name__ == "__main__":
    main()

开启/关闭屏保

def turn_screen_saver_off():
    if sys.platform == "win32":
        print("turn_screen_saver_off")
        ctypes.windll.kernel32.SetThreadExecutionState(0x80000002)
 
 
def turn_screen_saver_on():
    if sys.platform == "win32":
        print("turn_screen_saver_on")
        ctypes.windll.kernel32.SetThreadExecutionState(0x80000000)
SPI_GETSCREENSAVERRUNNING: 获取屏保有没有启动

高级特性突破

&,|,~ 和 and,or的区别

如果a,b是数值变量, 则&, |, ~表示位运算, and,or则依据是否非0来决定输出

1 & 2         # 输出为0 
1 | 2         # 输出为3
 
2 and 0   # 返回0
2 and 1   # 返回1
1 and 2   # 返回2
 
2 or 0    # 返回2
2 or 1    # 返回2
0 or 1    # 返回1 

如何a, b是逻辑变量, 则两类的用法基本一致

(3>0) or (3<1)   # True
(3>0) | (3<1)    # True
 
(3>0) and (3<1)  # False
(3>0) & (3<1)    # False

数据结构

名称特点顺序遍历切片
列表可变
元组不可变
字典键值对
集合去重复

正则表达式

import re
 
str = '有多个中文86的情况也适用'
rr = re.compile(r'^[^0-9]*([0-9]*)[^0-9]*',re.I|re.S|re.M)
print(re.match(rr,str).group(1))
 
 
str = 'abe(ac)ad)'
p1 = re.compile(r'\((.*?)\)', re.S) #最小匹配
p2 = re.compile(r'\((.*)\)', re.S)  #贪婪匹配
print(re.findall(p1,str))
print(re.findall(p2,str))
加了?是最小匹配,不加是贪婪匹配

类的封装、继承、多态

面向对象编程有三大重要特征:封装、继承和多态。

参考资料

封装

封装是指将数据与具体操作的实现代码放在某个对象内部,使这些代码的实现细节不被外界发现,外界只能通过接口使用该对象,而不能通过任何形式修改对象内部实现,正是由于封装机制,程序在使用某一对象时不需要关心该对象的数据结构细节及实现操作的方法。使用封装能隐藏对象实现细节,使代码更易维护,同时因为不能直接调用、修改对象内部的私有信息,在一定程度上保证了系统安全性。

类通过将函数和变量封装在内部,实现了比函数更高一级的封装。
class Student:
    classroom = '101'
    address = 'beijing' 
 
    def __init__(self, name, age):
        self.name = name
        self.age = age
 
    def print_age(self):
        print('%s: %s' % (self.name, self.age))

以下是错误的用法: 类将它内部的变量和方法封装起来,阻止外部的直接访问

print(classroom)

print(adress)

print_age()

print(Student.classroom)
 
dongjun = Student('东俊',12)
dongjun.print_age()

继承

继承来源于现实世界,一个最简单的例子就是孩子会具有父母的一些特征,即每个孩子都会继承父亲或者母亲的某些特征,当然这只是最基本的继承关系,现实世界中还存在着更复杂的继承。继承机制实现了代码的复用,多个类公用的代码部分可以只在一个类中提供,而其他类只需要继承这个类即可。

在OOP程序设计中,当我们定义一个新类的时候,新的类称为子类(Subclass),而被继承的类称为基类、父类或超类(Base class、Super class)。继承最大的好处是子类获得了父类的全部变量和方法的同时,又可以根据需要进行修改、拓展。其语法结构如下:

class Foo(superA, superB,superC....):
class DerivedClassName(modname.BaseClassName):  ## 当父类定义在另外的模块时

Python支持多父类的继承机制,所以需要注意圆括号中基类的顺序,若是基类中有相同的方法名,并且在子类使用时未指定,Python会从左至右搜索基类中是否包含该方法。一旦查找到则直接调用,后面不再继续查找。

# 父类定义
class people:
    def __init__(self, name, age, weight):
        self.name = name
        self.age = age
        self.__weight = weight
 
    def speak(self):
        print("%s 说: 我 %d 岁。 我的体重是 %d" % (self.name, self.age, self.__weight))
 
# 单继承示例
class student(people):
    def __init__(self, name, age, weight, grade):
        # 调用父类的实例化方法
        super().__init__(name, age, weight)
        self.grade = grade
 
    # 重写父类的speak方法
    def speak(self):
        print("%s 说: 我 %d 岁了,我在读 %d 年级" % (self.name, self.age, self.grade))
 
s = student('shaodj', 10, 30, 3)
s.speak()

为什么要用super()函数

我们都知道,在子类中如果有与父类同名的成员,那就会覆盖掉父类里的成员。那如果你想强制调用父类的成员呢?使用super()函数! 这是一个非常重要的函数,最常见的就是通过super调用父类的实例化方法init

class A(object):
    def __init__(self, name):
        self.name = name
        print("父类的__init__方法被执行了!")
    def show(self):
        print("父类的show方法被执行了!")
 
class B(A):
    def __init__(self, name, age):
        super().__init__(name=name)
        self.age = age
 
    def show(self):
        super().show()
 
obj = B("jack", 18)
obj.show()
所谓经典类就是什么都不用继承的类,如 class A:
新类就是所有类都必须要有继承的类,如果什么都不想继承,就继承到object类,如 class A(object):

Python 3 和 Python 2 的另一个区别是:

Python 3 可以使用直接使用 super().xxx 代替 super(Class, self).xxx

多态

先看下面的代码:

class Animal:
    def kind(self):
        print("i am animal")
 
class Dog(Animal):
    def kind(self):
        print("i am a dog")
 
class Cat(Animal):
    def kind(self):
        print("i am a cat")
 
class Pig(Animal):
    def kind(self):
        print("i am a pig")
 
# 这个函数接收一个animal参数,并调用它的kind方法
 
def show_kind(animal):
    animal.kind()
 
d = Dog()
c = Cat()
p = Pig()
 
show_kind(d)
show_kind(c)
show_kind(p)

打印结果:

i am a dog
i am a cat
i am a pig

狗、猫、猪都继承了动物类,并各自重写了kind方法。show_kind()函数接收一个animal参数,并调用它的kind方法。可以看出,无论我们给animal传递的是狗、猫还是猪,都能正确的调用相应的方法,打印对应的信息。这就是多态。

实际上,由于Python的动态语言特性,传递给函数show_kind()的参数animal可以是任何的类型,只要它有一个kind()的方法即可。动态语言调用实例方法时不检查类型,只要方法存在,参数正确,就可以调用。这就是动态语言的“鸭子类型”,它并不要求严格的继承体系,一个对象只要“看起来像鸭子,走起路来像鸭子”,那它就可以被看做是鸭子。

class Job:
    def kind(self):
        print("i am not animal, i am a job")
 
j = Job()
show_kind(j)

举个栗子

import random
 
class Flower:
    def __init__(self,color,du):
        self.color = color
        self.__du = du # 定义为私有变量,只能通过内部方法调用
 
    # 内部方法来获取 __du 这个属性
    def getdu(self):
        return self.__du
 
class Person:
    def __init__(self,name):
        self.name = name
 
class Tfboy(Person): # 这个就是类的继承
    count = 0
    def __init__(self,name,age):
        super().__init__(name)  # 用super()调用父类的方法初始化
        self.age = age
 
    def touch(self,du): # 统计他摸了几朵无毒的花
        if du == "无毒":
            Tfboy.count += 1
 
    def total(self):
        print("=> {} {}岁,共摸过了 {} 朵花是无毒的".format(self.name, self.age, Tfboy.count))
 
color = ['红色','绿色','蓝色','黄色','粉色']
du = ['有毒','无毒']
 
dongjun = Tfboy("东俊",12)
 
for _ in range(5):
    f = Flower( random.choice(color),   random.choice(du)  )
    print( "花的颜色是{},花是{}".format( f.color, f.getdu() ) )
    dongjun.touch( f.getdu() )
 
dongjun.total()

新的类初始化

在 Python 3.7 版本中增加了数据类(data class)这一新功能, 类似于其他语言中的 struct

它就是一个使用装饰器创建的类:

from dataclasses import dataclass
 
@dataclass
class people:
    name: str
    age: int=0
    __weight: int=150
如果一个参数有默认值,那它之后的所有参数也必须有默认值
class people:
    def __init__(self,name,age,weight=150):
        self.name = name
        self.age = age
        self.__weight = weight
    def saying(self):
        print('{}说: 我{} 岁了,体重是{}kg'.format(self.name,self.age,self.__weight))
 
class student(people):
    def __init__(self,name,age,weight,grade):
        people.__init__(self,name,age,weight)
        self.grade = grade
    def saying(self):
        print('{}说:我{}岁了,今年读{}年级'.format(self.name,self.age,self.grade))
 
shaohy = people('shaohy',30)
shaohy.saying()
 
shaodj = student('shao dongjun',11,60,5)
shaodj.saying()
20行代码,共580个字符
from dataclasses import dataclass
 
@dataclass
class people:
    name: str
    age: int=0
    __weight: int=150
    def saying(self):
        print('{}说: 我{} 岁了,体重是{}kg'.format(self.name,self.age,self.__weight))
 
@dataclass
class student(people):
    grade: int=0
    def saying(self):
        print('{}说:我{}岁了,今年读{}年级'.format(self.name,self.age,self.grade))
 
shaohy = people('shaohy',30)
shaohy.saying()
shaodj = student('shao dongjun',11,60,5)
shaodj.saying()
20行代码,共460个字符,节省了 ~20% 的代码

星号作用

星号*把序列/集合解包(unpack)成位置参数,两个星号**把字典解包成关键字参数。

def foo(*args, **kwarg):
  for item in args:
    print(item)
  for k,v in kwarg.items():
    print(k, v)
  print('*' * 50)
if __name__ == '__main__':
  #foo(1, 2, 3, a=4, b=5)
  #foo(2, 3, a=4, b=5, c=1)
  v = (1, 2, 4)
  v2 = [11, 15, 23]
  d = {'a':1,'b':12}
  foo(v, d)
  foo(*v, **d)
  foo(v2, d)
  foo(*v2, **d)
输出结果
(1, 2, 4)
{'a': 1, 'b': 12}
**************************************************
1
2
4
a 1
b 12
**************************************************
[11, 15, 23]
{'a': 1, 'b': 12}
**************************************************
11
15
23
a 1
b 12
**************************************************

闭包

内部函数中对外部作用域的变量进行引用,(并且一般外部函数的返回值为内部函数),那么内部函数就被认为是闭包。

闭包的套路

# 累加计数器
def counter():
  cnt = 0
  def addone():
    nonlocal cnt
    cnt += 1
    return cnt
  return addone
 
# a*x + b = y
def aline(a,b):
  def arg_y(x):
     return a*x+b
  return arg_y
  • 在内嵌函数中使用nonlocal关键字对外部变量进行声明,可以允许内嵌函数修改外部变量。
def aline(a,b):
  return lambda x: a*x+b
 
print(type(aline))
 
line1 = aline(3,5)
print(line1(10))
 
line2 = aline(5,8)
print(line2(15))

利用lambda来解决猴子选大王的问题

m = 41  # 一共有多少只猴子
k = 3  # 报数
 
monkey = list(range(1, m+1))
 
n = 1  # 报数循环
i = 0  # 猴子的索引
 
while True:
    if i == m:
        i = 0  # ⭕ 到了末尾就从头开始
 
    if n < k and monkey[i] != 0:
        n += 1
    elif monkey[i] != 0:
        monkey[i] = 0
        n = 1
    i += 1
 
    print(f"{monkey}, i:{i}, n:{n}, k:{k} ")  # 调试输出
 
    #if len(list(filter(lambda x: x != 0, monkey))) == 1:    # filter+lambda 速度奇慢
    if len(set(monkey)) == 2:
        print(max(monkey))
        break

列表推导式

list1 = [11,22,33,88,77,55,99]
dict1 = {'k1':[],'k2':[]}
 
for i in list1:
  if i < 66:
    dict1['k1'].append(i)
  else:
    dict1['k2'].append(i)
print(dict1)
 
dict1['k1'] = list(filter(lambda x: x<66, list1))
dict1['k2'] = list(filter(lambda x: x>66, list1))
print(dict1)
 
dict1 = {'k1':[],'k2':[]}
[ dict1['k1'].append(i) if i < 66 else dict1['k2'].append(i)  for i in list1 ]
print(dict1)
避免编写for循环,用列表或函数替换for循环
doubled_list = map(lambda x: x * 2, old_list)
summation = reduce(lambda x, y: x + y, numbers)

字典推导式

mcase = {'a': 10, 'b': 34, 'A': 7, 'Z': 3}
 
mcase_frequency = {
    k.lower(): mcase.get(k.lower(), 0) + mcase.get(k.upper(), 0)
    for k in mcase.keys()
    if k.lower() in ['a','b']
}
 
print(mcase_frequency)

生成器

通过列表生成式,我们可以直接创建一个列表。但是,受到内存限制,列表容量肯定是有限的。而且,创建一个包含100万个元素的列表,不仅占用很大的存储空间,如果我们仅仅需要访问前面几个元素,那后面绝大多数元素占用的空间都白白浪费了。

所以,如果列表元素可以按照某种算法推算出来,那我们是否可以在循环的过程中不断推算出后续的元素呢?这样就不必创建完整的list,从而节省大量的空间。在Python中,这种一边循环一边计算的机制,称为生成器:generator。

generator保存的是算法,通过for循环来迭代它,并且不需要关心StopIteration的错误。

# coding: utf-8
# 第一种生成器方式是用()把列表推导式转化成 生成器
g = (i*i for i in range(10))
# 使用 for 循环把next元素输出
for x in g:
    print(x)
 
# 第二种方式是用 yield(),变成generator的函数,
# 在每次调用next()的时候执行,遇到yield语句返回,
# 再次执行时从上次返回的yield语句处继续执行。
def fab(n): # 斐波拉契数列的推算规则
    m,a,b = 0,1,1
    while m < n :
        yield(a)
        a,b = b,a+b
        m += 1
 
for i in fab(5):
    print(i)

迭代器

Python的迭代器(Iterator)对象表示的是一个数据流,Iterator对象可以被next()函数调用并不断返回下一个数据,直到没有数据时抛出StopIteration错误。可以把这个数据流看做是一个有序序列,但我们却不能提前知道序列的长度,只能不断通过next()函数实现按需计算下一个数据,所以Iterator的计算是惰性的,只有在需要返回下一个数据时它才会计算。

迭代器(Iterator)甚至可以表示一个无限大的数据流,例如全体自然数。而使用list是永远不可能存储全体自然数的。

装饰器

装饰器本质上是一个 Python 函数或类,类似传递函数的闭包

它的作用就是为已经存在的对象在不需要做任何代码修改的前提下添加额外的功能,装饰器的返回值也是一个函数/类对象。

from functools import wraps
 
def catch_specific_exceptions(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print(f"捕获到异常: {e}")
            return None
    return wrapper
 
@catch_specific_exceptions
def specific_operation(x, y):
    return x / y
 
@catch_specific_exceptions
def read_file(file):
    f = open(file)
    data = json.load(f)
    f.close()
 
specific_operation(10, 0)
read_file("x.json")
#!/usr/bin/env python3
import logging
logging.basicConfig(level = logging.DEBUG)
 
min = 18.5
normal = 24.9
max = 29.9
 
def log_fun(level):
    def decorator(f):
        def wrapper(*args, **kwargs):
            if level == "warn":
                logging.warning(f"{f.__name__} is running")
            elif level == "debug":
                logging.debug(f"{args} is running")
            return f(*args, **kwargs)
        return wrapper
    return decorator
 
 
@log_fun(level="debug")
def cal_bmi(h, w):
    BMI = float(w/(h**2))
    print(f"BMI值为:{BMI:.2f}")
 
    #公式
    if BMI < min:
        print('身体状态:偏瘦')
    elif BMI < normal:
        print('身体状态:正常')
    elif BMI < max:
        print('身体状态:超重')
    elif BMI > max:
        print('身体状态:肥胖')
 
if __name__ == "__main__":
    print('----欢迎使用BMI计算程序----')
    height = float(input('请键入您的身高(m):'))
    weight = float(input('请键入您的体重(kg):'))
 
    cal_bmi(height, weight)

上下文管理器

多线程

多线程类似于同时执行多个不同程序,多线程运行有如下优点:

  1. 使用线程可以把占据长时间的程序中的任务放到后台去处理
  2. 用户体验更加友好,比如用户点击了一个按钮去触发某些事件的处理,可以弹出进度条来显示处理的进度
  3. 程序的运行速度可能加快
  4. 在一些等待的任务实现上如用户输入、文件读写和网络收发数据等,线程就比较有用了。在这种情况下我们可以释放一些珍贵的资源如内存占用等等。

线程在执行过程中与进程还是有区别的。

  1. 每个独立的进程有一个程序运行的入口、顺序执行序列和程序的出口。
  2. 线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
  3. 每个线程都有他自己的一组CPU寄存器,称为线程的上下文,该上下文反映了线程上次运行该线程的CPU寄存器的状态。
  4. 线程可以被抢占(中断)。
  5. 在其他线程正在运行时,线程可以暂时搁置(也称为睡眠) – 这就是线程的退让。
  6. 线程总是在进程得到上下文中运行的,指令指针和堆栈指针寄存器是线程上下文中两个最重要的寄存器,这些地址都用于标志拥有线程的进程地址空间中的内存。
import threading
import queue
 
data = [
    [1,2,3],
    [3,4],
    [5,6]
]
def job(L,q):
    print(threading.currentThread())
    q.put([x*x for x in L])
""" 
print(
    list(
        map(job,[[1,2,3],[4,5,6]])
        )
    )
"""
 
def multithread(data):
    q = queue.Queue()
    threads = []
    for i in range(len(data)):
        t = threading.Thread(target=job,args=(data[i],q))
        t.start()
        threads.append(t)
    # 这一段比较巧妙,利用threads池,把线程异步化 
    for _ in threads:
        _.join()
 
    for i in range(q.qsize()):
        print(q.get())
 
 
if __name__ == '__main__':
    multithread(data)

多进程

import multiprocessing as mp
 
data = range(10)
data2 = range(5)
 
def job(L,q):
    q.put([x*x for x in L]) 
 
def job2(L):
    return([x*x for x in L]) 
 
def multicore():
    queue = mp.Queue()
    p1 = mp.Process(target=job,args=(data,queue))
    p2 = mp.Process(target=job,args=(data2,queue))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    for i in range(queue.qsize()):
        print(queue.get())
 
    pool = mp.Pool()
    print(pool.map(job2,(data,)))
 
if __name__ == "__main__":
    multicore()

协程

做一个小结,一个简单的做法是,启动程序后,分别创建一个进程池(进程数小于等于可用核数)、线程池和 ioloop,ioloop 负责调度一切的协程,遇到阻塞的调用时,I/O 型的扔进线程池,CPU 型的扔进进程池,这样代码逻辑简单,还能尽可能的利用机器性能。

消息队列和生产消费

Queue队列

Python中,队列是线程间最常用的交换数据的形式。Queue模块是提供队列操作的模块,虽然简单易用,但是不小心的话,还是会出现一些意外。Queue是线程安全的,自带锁,使用的时候,不用对队列加锁操作。

Python Queue模块有三种队列及构造函数,它们的区别仅仅是条目取回的顺序:

  1. FIFO队列: 先进先出。 class Queue.Queue(maxsize)
  2. LIFO类似于堆,先进后出。 class Queue.LifoQueue(maxsize)
  3. 优先级队列: 级别越低越先出来。 class Queue.PriorityQueue(maxsize)

数据分析

数据可视化

python代码审核和优化

$ pip install --upgrade autopep8
$ autopep8 --in-place --aggressive --aggressive <filename>

性能火焰图

py-spy record -o profile.svg --pid 12345
# 或者
py-spy record -o profile.svg -- python myprogram.py
  1. 左侧数字表示每行代码被触发的次数
  2. 长方框表示最近被触发的代码行
  3. 方框越长表示触发次数越多
  4. 颜色越浅表示最近被触发次数越多
  5. 没有记录具体的执行时间

Casbin RBAC权限管理

Python调用Luajit

import lupa,time
from lupa import LuaRuntime
 
lua = LuaRuntime(unpack_returned_tuples=False)
 
lua_func = lua.eval('function(f, n) return f(n) end')
def py_add(n): return n*2
 
print(lua_func(py_add, 10))   # 打印出翻倍的结果
 
lua_fib = lua.eval('''
  function(n)
    a, b = 0, 1
 
    for i = 1, n do
      a, b = b, a + b
    end
    return a
  end
''')
 
start = time.time()
x = 50
res = lua_fib(x)
elapsed = time.time() - start
print("Py3 Computed fib(%s)=%s in %0.2f seconds" % (x, res, elapsed))

Python调用Nim函数

纯Python递归求“斐波那契”数列

import time
 
def fib2(n):
    if n <= 2:
        return 1
    else:
        return fib2(n-1) + fib2(n-2)
 
if __name__ == "__main__":
    x = 40
    start = time.time()
    res = fib2(x)
    elapsed = time.time() - start
    print("Py3 Computed fib(%s)=%s in %0.2f seconds" % (x, res, elapsed))
Py3 Computed fib(40)=102334155 in 29.92 seconds

nim函数导出py模块

import nimpy
 
proc fib(n: int): int {.exportpy.} =
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)
如果没有nimpy模块,使用
nimble install nimpy

然后导出c库函数,源文件名和库函数名要一致

nim c -d:release --app:lib --threads:on --out:fib_nimpy.so fib_nimpy.nim

新python调用nim库

import time
from fib_nimpy import fib
 
def fib2(n):
    if n <= 2:
        return 1
    else:
        return fib2(n-1) + fib2(n-2)
 
if __name__ == "__main__":
    x = 40
    start = time.time()
    #res = fib2(x)
    res = fib(x)
    elapsed = time.time() - start
    print("Py3 Computed fib(%s)=%s in %0.2f seconds" % (x, res, elapsed))
Py3 Computed fib(40)=102334155 in 0.57 seconds
如果不想nim提前编译成so文件供python调用,也可以安装nimporter
pip3 install nimporter  # 第一次会编译运行

Python调用百度翻译

from random import randrange
from hashlib import md5
from requests import get
from os import path,linesep
from sys import argv
from time import sleep
 
 
# 百度在线翻译引擎的ID和密码
apiURL = "http://api.fanyi.baidu.com/api/trans/vip/translate"
appID = '20210324000740987'
secretKey = 'AQoxdv0Uhz1VmTtUhngu'
file = "dongchi.txt"
dir = path.dirname(argv[0])
dict_words = {}  # 读取单词文本,存入字典
 
 
def baiduAPI_translate(query_str, to_lang):
    '''
    传入待翻译的字符串和目标语言类型,请求 apiURL,自动检测传入的语言类型获得翻译结果
    :param query_str: 待翻译的字符串
    :param to_lang: 目标语言类型
    :return: 翻译结果字典
    '''
    salt = str(randrange(32768, 65536, 3))  # 生成随机的 salt 值
    pre_sign = appID + query_str + salt + secretKey  # 准备计算 sign 值需要的字符串
    sign = md5(pre_sign.encode()).hexdigest()  # 计算 md5 生成 sign
    params = {  # 请求 apiURL 所有需要的参数
        'q': query_str,
        'from': 'auto',
        'to': to_lang,
        'appid': appID,
        'salt': salt,
        'sign': sign
    }
    try:
        response = get(apiURL, params=params)
        result_dict = response.json()  # 获取返回的 json 数据
        if 'trans_result' in result_dict:  # 得到的结果正常则 return
            return result_dict
        else:
            print('Some errors occured:\n', result_dict)
    except Exception as e:
        print('Some errors occured: ', e)
 
 
def translat(query_str, dst_lang='zh'):
    '''
    解析翻译结果后输出,默认实现英汉互译
    :param query_str: 待翻译的字符串,必填
    :param dst_lang: 目标语言类型,可缺省
    :return: 翻译后的字符串
    '''
    result_dict = baiduAPI_translate(query_str, dst_lang)  # 指定了目标语言类型,则直接翻译成指定语言
    return result_dict['trans_result'][0]['dst']  # 提取翻译结果字符串,并输出返回
 
''' 遍历文件分解成字典并处理异常 '''
with open(path.join(dir, file), 'r', encoding='utf-8') as f:
    for line in f:
        try:
            line = line.strip().split(':')
            dict_words[line[0]] = line[1]
        except:
            dict_words[line[0]] = ""
 
f = open('new.txt',"a+",encoding="utf-8")
for word in dict_words:
    res = f"{word}:{ translat(word)}\n"
    print(res)
    f.write(res)
    sleep(1)
f.close()

YouTube上下载视频

from pytube import YouTube
import fire
 
class YT:
    def __init__(self,yt_url):
      self.url = yt_url
 
    def download(self):
      yt = YouTube(self.url)
      yt.streams.filter(progressive=True,file_extension='mp4').get_highest_resolution().download()
      #yt.streams.filter(progressive=False,file_extension='webm').get_highest_resolution().download()
 
 
if __name__ == '__main__':
  fire.Fire(YT)  
 
#source ~/myDemo/python_apps/venv/bin/activate.fish
#python3.8 youtube_get.py download --yt_url="https://www.youtube.com/watch?v=jO6qQDNa2UY" 
pytube 对1080p以上的流下载支持还不行, 如果需要高清建议使用 youtube-dl
#youtube-dl "https://www.youtube.com/watch?v=1lmRNoLC3Qk" --list-formats    
#youtube-dl "https://www.youtube.com/watch?v=1lmRNoLC3Qk" -f 315

YouTube的视频转音频mp3

brew install youtube-dl
youtube-dl --extract-audio --audio-format mp3 http://www.youtube.com/watch?v=YOURVIDEO 
 
ffmpeg -i video.mp4 -vn audio.mp3

ffmpeg具体用法实例

AVI转FLV

ffmpeg -i test.avi -ab 56 -ar 22050 -b 500 -r 15 -s 320×240 test.flv

抓图JPG

ffmpeg -i 2.wmv -y -f image2 -ss 8 -t 0.001 -s 350×240 test.jpg

ffmpeg去掉一部电影的前面20秒

ffmpeg -i inputmovie.mp4 -ss 20 -c copy outputmovie.mp4

  • -i input_movie.mp4 指定输入文件
  • -ss 20 跳过前面20秒
  • -c copy 使用“copy”编解码器来快速复制视频和音频轨道,以避免重新编码
  • output_movie.mp4 指定输出文件名

批量转换mp4为flv脚本

#!/bin/sh
SCALE="1.5" # 视频流冗余率
FILR="./good.txt"
 
while read LINE;do
        rm -rf log.log
        filename=`echo $LINE|awk '{print $5}'`
        newfile=`echo $filename|cut -d- -f1`
        ffmpeg -i $filename 2>>log.log
 
        bitrate=`grep "bitrate" log.log|sed -r 's@.*bitrate:(.*)kb.*@\1@g'|sed -r 's: ::g'`
        bitrate=$(echo "$bitrate*$SCALE"|bc)"kb"
        #echo $bitrate
 
        [ -s "$newfile.flv" ] && rm -rf $newfile.flv
        ffmpeg -i $filename -b $bitrate -acodec libmp3lame -ar 44100 -ac 2 $newfile.flv
done < $FILE

使用Pelican发布静态博客

安装python3并激活虚拟环境(略)

cd /Users/apple/myDemo/Python39_Demos
source .venv/bin/activate.fish   

安装pelican

pip install pelican markdown ghp-import
...
 
pelican-quickstart

快速配置 参数选项

AUTHOR = 'DJ.shao'
SITENAME = "ShaoDJ's World"
SITEURL = "https://shaodongjun.github.io"
TIMEZONE = 'Asia/Shanghai'
DEFAULT_LANG = 'zh-cn'
PATH = "content"
 
ARTICLE_URL = 'posts/{date:%Y}/{date:%m}/{date:%d}/{slug}/'
ARTICLE_SAVE_AS = 'posts/{date:%Y}/{date:%m}/{date:%d}/{slug}/index.html'
PAGE_URL = 'pages/{slug}/'
PAGE_SAVE_AS = 'pages/{slug}/index.html'
 
# Feed generation is usually not desired when developing
FEED_ALL_ATOM = 'feeds/all.atom.xml'
FEED_ALL_RSS = 'feeds/all.rss.xml'
RSS_FEED_SUMMARY_ONLY = False
 
# Blogroll
LINKS = (
    ("Shaohy's Blog", "http://shaohy.upyun.com/doku.php"),
)
 
# Social widget
SOCIAL = (
    ("Another social link", "#"),
)
 
DEFAULT_PAGINATION = 10

创建blog文章

在content文件夹用markdown写文章,然后测试。

vimrc中加入对pelican的支持
autocmd bufnewfile *.md call HeaderMD()
" add generic header to markdown files
function HeaderMD() 
    let l = 0
    let l:current_date = strftime("%Y-%m-%d %H:%M")
    let l = l + 1 | call setline(l,'Title: ')
    let l = l + 1 | call setline(l,'Slug: ')
    let l = l + 1 | call setline(l,'Date: ' . l:current_date)
    let l = l + 1 | call setline(l,'Category: ')
    let l = l + 1 | call setline(l,'Tags: blog,')
endfunction 
make html && make serve
pelican --autoreload --listen
Usage:                                                                    
   make html                           (re)generate the web site          
   make clean                          remove the generated files         
   make regenerate                     regenerate files upon modification 
   make publish                        generate using production settings 
   make serve [PORT=8000]              serve site at http://localhost:8000
   make serve-global [SERVER=0.0.0.0]  serve (as root) to :80    
   make devserver [PORT=8000]          start/restart develop_server.sh    
   make stopserver                     stop local server                  
   make ssh_upload                     upload the web site via SSH        
   make rsync_upload                   upload the web site via rsync+ssh  
   make dropbox_upload                 upload the web site via Dropbox    
   make ftp_upload                     upload the web site via FTP        
   make s3_upload                      upload the web site via S3         
   make cf_upload                      upload the web site via Cloud Files
   make github                         upload the web site via gh-pages   
文章静态图片修改相对路径→绝对路径,可以在Makefile中的make html:
grep 'src="./images' $(OUTPUTDIR)/posts/ -r -l | xargs -I{} sed -i '' '/src=".\/images\//s@./images/@/images/@g' {}

博客托管到GitHub

首先在 GitHub 上新建一个名为username.github.io的仓库。(必须要同名)

这里很多人会推荐使用一个叫作ghp-import的包,用于将output文件夹的内容提交到Blog文件夹内仓库的master分支。

在pelican文件夹新建Git仓库,并绑定远程GitHub仓库地址。

git remote add origin https://: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@github.com/shaodongjun/shaodongjun.github.io.git
 
# 从Makefile里可以看到github包括了一组操作命令
make github