Twisted Defer and DeferredQueue

写在最前面

这篇文章本来是想用英文写的,但是最近英文水平下降的和狗一样。还是怂一波吧。

写在前面

最近在用Twisted库写一个诡异的项目,具体内容暂且不在这里讨论。在写的过程中,被Twisted里面的一个重要概念 —— defer,折腾的不行。最终通过阅读twisted的部分源码,以及与代码做斗争的丰富经验,最终算是解决了问题。

本文算是使用twisted开发踩坑的一个小小总结,如果一切顺利,后面会有大菜。:)

Twisted介绍

Twisted is an event-driven networking engine written in Python.

Twisted是一个基于事件驱动的网络框架。那么什么是“事件驱动”呢?

事件驱动指的是将事件与事件回调绑定起来,在程序运行时根据实时的事件触发相应的响应的一种机制。

例如select/poll/epoll这些IO复用函数,在文件描述符(fd)可读/可写/出错时,会立即返回,由相应的处理函数来对新事件进行处理。事实上,twisted的事件驱动功能,正是由这些IO复用函数提供的。

但与IO复用函数不同的是,twisted中的事件可以是“更高层次的事件”,即对网络的读 …

more ...

Mosca源码阅读

先在前面

最近心血来潮看了看一个比较有名的开源MQTT broker —— Mosca。不读不知道,读完才恍然大悟 —— 这是啥破玩意(哈哈)。

由于我是nodejs的超级初学者,所以本文会比较浅显,并且只关注big picture,不陷入细节。

这里先规定几个缩写,让后面行文时少打一点字:

  • MQ - MessageQueue
  • Asco - Ascoltatori

Ascoltatori - 听者

Ascoltatori是一个意大利语单词,翻译成英文就是listener。

这里严重吐槽作者取名字的方式,mosca这种短小的外语单词我们是可以接受的,你说ascoltatori这么长的意大利语单词,你让我们怎么记。
差评,退款,邮费也要退!

Asco模块的作用是提供一个一致的MQ的抽象,供上层broker使用。

这里我们只分析基于Redis的实现,原因是Redis我相对比较熟悉,功能也比较简单。

接口分析

RedisAscoltatore有三个半接口:

  • subscribe
  • unsubscribe
  • publish

剩下的那半个是模块的构造函数。接下来我们分别分析接口的功能及其实现。

Subscribe接口

this._subRedisAscoltatore用来subscribe的连接。首先我们要向MQ订阅指定的topic …

more ...

总结 - phxrpc代码阅读(8)

写在前面

这应该是phxrpc代码阅读系列正文的最后一篇。通过阅读代码,发现了自己在知识上的若干不足。

临渊羡鱼,不如退而结网。接下来可能会在网络编程方面再下一点工夫。请大家期待下一个系列吧。

其实真没人读,我就是在骗自己。

先补充一点 - 代码生成

protobuf并不包含RPC的实现,但是它可以声明rpc。客户端和服务端需要实现RPC接口,来实现通信。

phxrpc使用proto文件来定义接口,然后解析并使用代码模板进行生成。

这里我们不讨论代码生成的细节,因为pb实在太过流行,代码生成的方法也有不少的流派。并且用C++来做代码生成,真心不是我的菜。

想了解更多,可以参考这篇博客

工作流程 - 客户端

客户端与服务器的通信有如下的特点:

  1. 连接少
  2. 负载少
  3. 通信的主动方

所以,所有的网络交互相关的内容可以托管给网络库中的协程。每个协程主动运行一段时间后,主动放弃CPU时间,将控制权交还给主控制流的epoll。

所以协程中不能有CPU密集的运算,幸好面对开发者,phxrpc并不暴露内部函数,而是将CPU密集的运算分配给工作线程来完成。

工作流程 - 服务端

服务器的通信有以下的特点:

  1. 连接多
  2. 负载多
  3. 通信的被动方
  4. 响应时间敏感

这里说一下响应时间的问题 …

more ...

RPC - phxrpc代码阅读(7)

前言

看了这么久代码,终于我们要接近phxrpc的核心部分了。

但是出人意料的是,rpc部分并没有过多的概念和magic trick。而且因为ucontext已经被封装好了,所以在rpc里的操作,可以完全按照同步的写法来搞,开发者们不需要切换同步异步的思维模式,就可以在底层的封装之上,做自己想做的事了。

线程安全(?)的队列 - ThreadQueue

我不知道开发者为啥要起ThdQueue这样令人迷惑的名字,这种诡异的命名风格贯穿了整个代码。咋一看这个类是maintain一堆线程的,类似于线程池,但其实这个类就是一个BlockingQueue的实现。

之后,这个队列有三种操作,pushpluckbreak_out。push操作不用多说,pluck对应的我们所理解的pop操作,即从队列中弹出元素(pluck这个词貌似是从grpc里面来的,那我就不吐槽了,毕竟Google爸爸)。

更令人疑惑的是break_out这个操作。从代码来看,像是清空队列,并且在dtor中也显式的调用了这个函数。

但是有以下的几个问题。

一,break_out_是一个bool变量,且在不同线程间共享,问题在于这个变量可能被cache住,直接访问可能会造成非预期的结果,可能需要 …

more ...

非阻塞TCP流和HttpClient - phxrpc代码阅读(6)

写在前面

其实这点东西有点鸡肋。因为TCP流在前面已经讲过,难点在于“流”和“流缓冲区”部分。而HttpClient只是TCP流的一个应用,代码不多,且重点在于HTTP协议的调教上面。

不过因为前面有写阻塞TCP流,还是前后呼应,把非阻塞TCP流也小小的讲解一下。顺便饶一段HttpClient的讲解,算是充实一下内容吧。

非阻塞TCP流缓冲区 - UThreadTcpStreamBuf

这个其实没啥可讲的,传入一个socket,然后读写分别调用UThreadRecvUThreadSend,IO复用和协程切换的复杂操作都被封装在里面了。剩下的操作都由基类函数来解决。

非阻塞TCP流 - UThreadTcpStream

确实没啥可说的,你们自己去读代码吧。。。

非阻塞TCP流和阻塞TCP流的区别是~~它不阻塞~~,在阻塞TCP流中,我们传入的是一个TCP流,而非阻塞TCP流传入的是一个协程调度器和一个TCP流。

这个很好理解,一个阻塞流自然会占满一个线程的IO和CPU —— 在阻塞流IO读写时,CPU空闲;在CPU忙时,IO空闲。

而非阻塞流会将自己IO wait的时间托管给epoll,把剩下的时间用于CPU计算(和一些overhead上)。所以一个线程可以handle多个socket,协程调度器就是必须的了。之后的读写操作就交由我们前面讨论过的epoll和ucontext协程来共同完成了。

HttpClient …

more ...

使用epoll驱动ucontext - phxrpc代码阅读(5)

用pipe叫醒你 — EpollNotifier

class EpollNotifier类型封装了一个使用pipe传递信号的Notifier类。

Run()函数(其实我觉得叫Register或Activate会更好)首先声明了两个单向的pipe:pipe_fds_,从文档中我们可以知道pipe_fds_[0]是读管道,而pipe_fds_[1]是写管道。这里有一丁点反直觉,就是pipe拿了两个fd,但是仍旧是单工的。

然后将读fd设为O_NONBLOCK以供epoll调度,最后将Func()函数传入scheduler_中。

这里跑个题,想起了当年我大一的时候上过的通信导论的选修课。那会我还没有沉迷代码,还是一个积极乐观好好学习的新时代大学生。自从开始写了代码,人就越来越废物了,连女朋友都找不到了。
年轻人们啊,有饭辙干点啥都行,千万别写码啊。

Func()函数做的事情很简单,就是从管道里尝试poll一段数据,拿到数据后直接扔掉。因为管道里传来的数据并没有实际意义,这样设计的主要意义在于唤醒epoll。

我们可以从Notify()函数中看出 …

more ...