使用原生 asyncio 和 zeroMQ 的异步 RPC 库 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
firejoke
V2EX    Python

使用原生 asyncio 和 zeroMQ 的异步 RPC 库

  •  
  •   firejoke 2024-01-26 11:28:45 +08:00 1658 次点击
    这是一个创建于 691 天前的主题,其中的信息可能已经有所发展或是发生改变。

    gate-rpc

    之前使用过 zerorpc ,但 zerorpc 是用的 gevent ,在没有 asyncio 的时候很好用,而在原生 asyncio 更友好的现在,更愿意使用原生 asyncio 来调试,所以使用 ZeroMQ 和 asyncio 写了一个 rpc 库。

    • 使用 msgpack 序列化消息
    • 支持在线程池和进程池里运行普通函数
    • 当函数返回的是 Generator 或 AsyncGenerator 时,会换成流式传输,接收端通过遍历 StreamReply 实例即可获得
    • 当函数返回的值过大时,会转换成 HugeData 对象,压缩返回值并分块流式传输,默认块大小通过 Settings 的 HUGE_DATA_SIZEOF 来设置
    • 使用队列传输和记录日志,避免因为日志导致事件循环被阻塞
    • 使用可以异步设置键值和异步获取键值的 BoundedDict 来简化超时等待获取
    • 在每次实例化 Worker 、Server 、AMajordomo 、Client 各类之前,通过修改 Settings 的属性来修改运行配置

    安装

    可以直接使用 pip 安装,或直接下载源码随意放置。

    pip install gaterpc 

    配置

    在实例化 Worker 、Service 、AMajordomo 、Client 各类之前,需要运行 Settings.setup 函数来配置全局配置 [^f1] ,
    特殊返回值的序列化通过 MessagePack 的全局实例来定制 [^f2]

     # 可能会修改的几个主要配置 Settings.MESSAGE_MAX: int = Worker 和 Client 实例里等待处理的消息最大数量 Settings.HUGE_DATA_SIZEOF: int = 每次传输的结果值的最大大小,超过该值的将会被压缩并分片传输 Settings.HUGE_DATA_COMPRESS_MODULE: str = 使用的压缩模块的名称 [^f1] Settings.SERVICE_DEFAULT_NAME: str = 默认的服务名,当在实例化 Service 时如果不提供 name 参数则会以这个为服务名 Settings.MDP_INTERNAL_SERVICE_PREFIX: bytes = MDP 内部服务的前缀 Settings.MDP_HEARTBEAT_INTERVAL: int = 服务端和客户端相对于中间代理的心跳间隔时间,默认 1500 毫秒 Settings.MDP_HEARTBEAT_LIVENESS: int = 判定掉线的丢失心跳次数,即当超过该次数*心跳时间没有收到心跳则认为已经掉线,默认 3 次 Settings.REPLY_TIMEOUT: float = 客户端调用远程方法时,等待回复的超时时间,应设置的远远大于心跳时间,默认是一分钟 Settings.STREAM_REPLY_MAXSIZE: int = 流式数据使用的缓存队列的最大长度(使用的 asyncio.Queue ) Settings.REPLY_TIMEOUT: float = 获取回复的超时时间,也是流式传输的每一个子回复的超时时间 Settings.ZAP_PLAIN_DEFAULT_USER: str = ZAP 的 PLAIN 机制的默认用户名 Settings.ZAP_PLAIN_DEFAULT_PASSWORD: str = ZAP 的 PLAIN 机制的默认密码 Settings.ZAP_ADDR: str = ZAP 服务绑定的地址 Settings.ZAP_REPLY_TIMEOUT: float = 等待 ZAP 服务的回复的超时时间 Settings.setup() # 特殊返回值的序列化配置 [^f2] from gaterpc.utils.message_pack message_pack.prepare_pack = 在使用 msgpack.packb 时,传递给 default 参数的可执行对象 message_pack.unpack_object_hook = 在使用 msgpack.unpackb 时,传递给 object_hook 的可执行对象 message_pack.unpack_object_pairs_hook = 在使用 msgpack.unpackb 时,传递给 object_pairs_hook 的可执行对象 message_pack.unpack_object_list_hook = 在使用 msgpack.unpackb 时,传递给 list_hook 的可执行对象 

    [^f1]: Settings.HUGE_DATA_COMPRESS_MODULE 除了内置的 gzip ,bz2 ,lzma ,还可以使用外部模块,只要模块提供 compressor 和 decompressor 方法即可,
    compressor 需要返回一个带有 compress 方法的增量压缩器对象,decompressor 需要返回一个带有 decompress 的增量解压缩器对象
    [^f2]: 单一返回值和生成器的元素返回值,以及巨型返回值都会使用 utils.msg_pack 和 utils.msg_unpack 来序列化和反序列化,
    这两个方法内部是使用的 utils.MessagePack 的全局实例,如果不能返回常规的“字符串”,“列表”,“字典”返回值,建议配置这几个配置

    测试示范

    实例化 ZAP 服务后,需要配置校验策略,也可以不用 ZAP 服务,Majordomo 代理就不会对任何请求做校验。

     zap = AsyncZAPService() zap.configure_plain( Settings.ZAP_DEFAULT_DOMAIN, { Settings.ZAP_PLAIN_DEFAULT_USER: Settings.ZAP_PLAIN_DEFAULT_PASSWORD } ) zap.start() 

    继承 Worker 类,用 interface 装饰希望被远程调用的方法,然后实例化一个 Server 来创建 Worker 的实例,这个 worker 实例的描述信息由 server 实例提供。

     # Worker class GRWorker(Worker): @interface async def atest(self, *args, **kwargs): loop = self._get_loop() return { "name": "async atest", "args": args, "kwargs": kwargs, "loop_time": loop.time() } @interface def test(self, *args, **kwargs): return { "name": "test", "args": args, "kwargs": kwargs, "loop_time": time() } @interface def test_generator(self, maximum: int): i = 0 while i < maximum: yield i i += 1 @interface async def test_agenerator(self, maximum: int): i = 0 while i < maximum: await asyncio.sleep(0.1) yield i i += 1 Settings.setup() gr = Service(name="SRkv") gr_worker = gr.create_worker( GRWorker, "tcp://127.0.0.1:5555", zap_mechanism=Settings.ZAP_MECHANISM_PLAIN.decode("utf-8"), zap_credentials=( Settings.ZAP_PLAIN_DEFAULT_USER, Settings.ZAP_PLAIN_DEFAULT_PASSWORD ) ) gr_worker.run() 

    当要执行 IO 密集或 CPU 密集型操作时,可以在方法内使用执行器来执行,可以使用自带的两个执行器,也可以使用自定义的;
    另,所有同步的函数都会使用默认执行器执行,默认执行器是 ThreadPoolExecutor 实例,可以修改。

     @interface async def test_io(): result = await self.run_in_executor(self.thread_executor, func, *args, **kwargs) return result @nterface async def test_cpu(): # 如果需要和 CPU 密集型执行器里的方法交换数据,可以使用 utils.SyncManager 来创建代理对象使用。 queue = SyncManager.Queue() result = await self.run_in_executor(self.process_executor, func, queue, *args, **kwargs) return result 

    实例化代理时会绑定两个地址,一个用于给后端服务连接上来,一个给前端客户端连接上来,bind 方法是绑定的给客户端访问的地址也就是前端地址。

     # Majordomo Settings.setup() gr_majordomo = AMajordomo( backend_addr="tcp://127.0.0.1:5555", zap_mechanism=Settings.ZAP_MECHANISM_PLAIN.decode("utf-8"), zap_addr=Settings.ZAP_ADDR ) gr_majordomo.bind("tcp://127.0.0.1:777") gr_majordomo.run() 

    客户端直接连接代理地址,使用点语法调用远程方法,一般格式是 client.服务名.方法名,当直接使用 client.方法名时,会使用默认服务名调用。

     # Client Settings.setup() gr_cli = Client( broker_addr="tcp://127.0.0.1:777" zap_mechanism=Settings.ZAP_MECHANISM_PLAIN.decode("utf-8"), zap_credentials=( Settings.ZAP_PLAIN_DEFAULT_USER, Settings.ZAP_PLAIN_DEFAULT_PASSWORD ) ) await gr_cli.SRkv.test("a", "b", "c", time=time()) await gr_cli.SRkv.atest("a", "b", "c", time=time()) async for i in await gr_cli.SRkv.test_agenerator(10): print(i) 

    客户端调用的远程方法后,会创建一个延迟回调用来删掉缓存的已经执行完毕的请求,包括超时没拿到回复的请求,
    而流式回复会每次回调时都检查一次该 StreamReply 实例是否已经结束,没结束就再创建一个延迟回调后续再检查。

    注意点

    客户端和服务端对请求和回复的异步处理是使用的 utils.BoundedDict 异步字典来处理

     # 请求远程方法 request_id = await request(service_name, body) respOnse= await requests.aget(request_id, timeout=reply_timeout) # 接收回复 respOnse= await socket.recv_multipart() await requests.aset(request_id, response) 

    如果自定义方法的返回对象的大小无法使用 sys.getsizeof 准确获取,建议用 HugeData 包装后再返回

     # data 必须要是 bytes 或 bytearray ,简言之能用 memoryview 包装的 hd = HugeData(data=data, compress_module="gzip", compress_level=9) # 或者不提供 data ,HugeData 初始化时会创建一个 Queue 的跨进程代理对象,往这个跨进程队列里传输数据即可 hd = HugeData(compress_module="gzip", compress_level=9) d = process_data() hd.data.put(d) 

    HugeData 的 compress 和 decompress 方法都会在进程池里执行增量压缩和增量解压缩,
    返回的生成器每次获取的字节数大小不会超过 Settings.HUGE_DATA_SIZEOF ,
    compress 方法对每一块返回的大小的限制是 HugeData 内部实现,
    decompress 方法对每一块返回的大小限制则是由压缩模块来实现,会在调用解压缩器实例的 decompress 方法时传递一个 max_length 位置参数。

    1 条回复    2024-01-30 09:30:45 +08:00
    Maerd
        1
    Maerd  
       2024-01-30 09:30:45 +08:00
    性能如何呢?之前用过 aiozmq ,每秒并发也就 2000 左右,性能实在不堪一提
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1312 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 25ms UTC 17:08 PVG 01:08 LAX 09:08 JFK 12:08
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86