I/O 模型 通常输入操作的两个阶段:
等待数据准备好
从内核缓存中读取数据
对于网络(套接字)上的输入操作这两个阶段分别为:
等待数据从网络到达,并将等待分组的数据(如果有的话)复制到内核中的缓冲区
将数据从内核缓冲区复制到应用进程缓冲区
实际上,第一个阶段在时间上的消耗远远要大于第二个阶段,所以操作系统历史演进的过程中也是抓主要矛盾,用各种手段解决第一阶段的时间消耗问题(事实上也就是下面5种I/O模型的前4种都是用这种思路解决问题)。
以下的图片来自 UNIX网络编程 卷1:套接字联网API
阻塞I/O(blocking I/O)
非阻塞I/O(Non-blocking I/O)
I/O 复用(select poll)
信号驱动的I/O(SIGIO)
异步I/O(POSIX 的aio_系列函数)
五种I/O模型的对比
用生活中的例子来类比这个I/O模型 举一个在餐厅点餐的例子,餐厅点餐会分为两个阶段:
排队等待点餐
点餐、付款以及等餐
对应的就是输入操作的两个阶段:
等待数据准备好
从内核缓存中读取数据
那么这5种I/O模型对应的就是下面的点餐模式:
阻塞I/O 顾客排队依次等待点餐(等待的过程也不做其他的事情),点完餐以后就一直站在队列前等待,直到点的餐做好取走餐后才离开,然后接待下一个顾客。
非阻塞I/O 顾客排队等待的过程觉得时间太长,所以他先去做些其他事情过程中每隔一段时间看下队有没有排到自己,如果排到了就开始点餐,然后等到餐完成后取餐离开。
I/O 复用(select poll epoll) 这样每个人每隔一段时间过来看下队有没有排到自己,似乎还是很低效的,而事实上多数情况下都是组队来的,所以可以委托一个人专门在这里排队批量处理这一队人的排队问题(多路复用 multiplexing),轮训(select、poll)或者查看排队完成提示(epoll)来通知对应的人来点餐。
信号驱动的I/O(SIGIO) 大家发现这种排队的方式实在是太落后了,决定开发一套排队系统。顾客在排队系统上取了号(sigaction系统调用)之后就去做其他的事情,等到快排到自己的时候排队系统会通知(SIGIO)顾客过来点餐。
异步I/O 后来大家还是不满足于这种排队系统带来的便利,除了排队,在系统上支持点单和支付的业务支持(aio ),这样就做到了在线上搞定排队点餐支付的事情,系统在餐准备好后通知顾客来就餐(听起来是不是太方便了)。
I/O复用模型内部实现细节 在C10k问题提出之前,互联网应用在单位时间处理的连接数还不够大,因此基于select
和poll
的I/O复用模型还是足够应付的, 从时间的顺序上来说是: select
(1984) -> poll
(1997) -> epoll
(2002)
异步编程在各个技术生态的发展 Java stack 几个容易搞混淆的技术
Tomcat、Jetty、Netty、WebFlux 首先按照在Java生态中所处的层次分一下类: 应用容器: Tomcat、Jetty 网络编程框架: Netty Spring web framework: Spring WebFlux, Spring Web MVC
WebFlux 是Spring 框架中的web框架模块,支持以Reactive Stream的方式进行网络编程。 WebFlux = Non-Blocking I/O + 函数式编程 什么是Reactive? 从网络角度的Reactive的角度就是 IO 事件驱动的设计模式 + 非阻塞式的编程(NonBlocking back pressure)
Spring WebFlux framework choice
异步在JavaEE标准的演进
Servlet 3.0 之前版本: 仅支持同步处理请求,并且只支持Blocking IO
Servlet 3.0 版本 : 支持异步处理请求,但是只支持Blocking IO
Servlet 3.1、4.0 版本 : 支持异步处理请求,同时支持NoBlocking IO
Wikipedia - Java Servlet
在Android生态中的应用 我们都知道,在Android中有很重要的一块知识点是需要和事件以及消息打交道,这里就离不开事件循环,我们都知道任何的一个事件循环都是通过一个类似下面形式的循环实现的:
1 2 3 4 5 6 for (;;;) { message = obtainMessage(); dispatchMessage(message); }
当然Android中也不例外,从Looper的源代码我们就可以探知一二(为了方便查看主要逻辑,我将与本主题无关的代码以\\ ......
略去):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public static void loop () { final Looper me = myLooper(); if (me == null ) { throw new RuntimeException ("No Looper; Looper.prepare() wasn't called on this thread." ); } final MessageQueue queue = me.mQueue; for (;;) { Message msg = queue.next(); if (msg == null ) { return ; } try { msg.target.dispatchMessage(msg); if (observer != null ) { observer.messageDispatched(token, msg); } dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0 ; } catch (Exception exception) { if (observer != null ) { observer.dispatchingThrewException(token, msg, exception); } throw exception; } finally { ThreadLocalWorkSource.restore(origWorkSource); if (traceTag != 0 ) { Trace.traceEnd(traceTag); } } msg.recycleUnchecked(); } }
我们注意到MessageQueue的对象queue在for循环体内有调用方法next()
, 而且在代码注释中有提到// might block
, 那么到底是怎样block住的呢, 我们需要看下MessageQueue
中next方法的内部实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @UnsupportedAppUsage Message next () { final long ptr = mPtr; if (ptr == 0 ) { return null ; } for (;;) { nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this ) { } } }
起关键作用的就是这个 nativePollOnce
方法,到底这个native的方法做了什么使得这个for循环可以不占用CPU时钟block住呢? 我们通过在Android Code Search 中查找nativePollOnce
的jni定义找到了这样的定义android / platform / frameworks / base / master / . / core / jni / android_os_MessageQueue.cpp
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 #define LOG_TAG "MessageQueue-JNI" #include <nativehelper/JNIHelp.h> #include <android_runtime/AndroidRuntime.h> #include <utils/Looper.h> #include <utils/Log.h> #include "android_os_MessageQueue.h" #include "core_jni_helpers.h" namespace android {static struct { jfieldID mPtr; jmethodID dispatchEvents; } gMessageQueueClassInfo; static const int CALLBACK_EVENT_INPUT = 1 << 0 ;static const int CALLBACK_EVENT_OUTPUT = 1 << 1 ;static const int CALLBACK_EVENT_ERROR = 1 << 2 ;class NativeMessageQueue : public MessageQueue, public LooperCallback {public : NativeMessageQueue (); virtual ~NativeMessageQueue (); virtual void raiseException (JNIEnv* env, const char * msg, jthrowable exceptionObj) ; void pollOnce (JNIEnv* env, jobject obj, int timeoutMillis) ; void wake () ; void setFileDescriptorEvents (int fd, int events) ; virtual int handleEvent (int fd, int events, void * data) ; private : JNIEnv* mPollEnv; jobject mPollObj; jthrowable mExceptionObj; }; NativeMessageQueue::NativeMessageQueue () : mPollEnv (NULL ), mPollObj (NULL ), mExceptionObj (NULL ) { mLooper = Looper::getForThread (); if (mLooper == NULL ) { mLooper = new Looper (false ); Looper::setForThread (mLooper); } } NativeMessageQueue::~NativeMessageQueue () { } void NativeMessageQueue::pollOnce (JNIEnv* env, jobject pollObj, int timeoutMillis) { mPollEnv = env; mPollObj = pollObj; mLooper->pollOnce (timeoutMillis); mPollObj = NULL ; mPollEnv = NULL ; if (mExceptionObj) { env->Throw (mExceptionObj); env->DeleteLocalRef (mExceptionObj); mExceptionObj = NULL ; } } void NativeMessageQueue::wake () { mLooper->wake (); } static void android_os_MessageQueue_nativePollOnce (JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast <NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce (env, obj, timeoutMillis); } static void android_os_MessageQueue_nativeWake (JNIEnv* env, jclass clazz, jlong ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast <NativeMessageQueue*>(ptr); nativeMessageQueue->wake (); } static const JNINativeMethod gMessageQueueMethods[] = { { "nativeInit" , "()J" , (void *)android_os_MessageQueue_nativeInit }, { "nativeDestroy" , "(J)V" , (void *)android_os_MessageQueue_nativeDestroy }, { "nativePollOnce" , "(JI)V" , (void *)android_os_MessageQueue_nativePollOnce }, { "nativeWake" , "(J)V" , (void *)android_os_MessageQueue_nativeWake }, { "nativeIsPolling" , "(J)Z" , (void *)android_os_MessageQueue_nativeIsPolling }, { "nativeSetFileDescriptorEvents" , "(JII)V" , (void *)android_os_MessageQueue_nativeSetFileDescriptorEvents }, }; int register_android_os_MessageQueue (JNIEnv* env) { int res = RegisterMethodsOrDie (env, "android/os/MessageQueue" , gMessageQueueMethods, NELEM (gMessageQueueMethods)); jclass clazz = FindClassOrDie (env, "android/os/MessageQueue" ); gMessageQueueClassInfo.mPtr = GetFieldIDOrDie (env, clazz, "mPtr" , "J" ); gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie (env, clazz, "dispatchEvents" , "(II)I" ); return res; } }
完整代码参见 https://cs.android.com | android_os_MessageQueue.cpp
为了便于说明我将与本话题无关的代码使用// ......
进行了处理,实际上这部分代码也只是JNI实现的Native方法注册部分,更多的实现细节还需要查看<utils/Looper.h>
对应的实现 https://cs.android.com | Looper.cpp
看到源代码中的第一行注释,我们好像终于知道了些内部细节:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #define LOG_TAG "Looper" #define DEBUG_POLL_AND_WAKE 0 #define DEBUG_CALLBACKS 0 #include <utils/Looper.h> #include <sys/eventfd.h> #include <cinttypes>
原来Android 的Looper内部是基于epoll这个IO模型实现的。 至于为什么pollOnce
方法为什么能够等待下一个消息的到来而不让CPU空转,这里有一篇文章做了很好的解释 Stackoverflow - android - what is message queue native poll once in android?
Python 各种 CGI 是什么,他们与servlet又有什么关系 CGI通俗的来理解就是web server 与服务器上处理请求的程序之间的统一接口标准Wikipedia - Common Gateway Interface 。
当然上面说的是一个统一接口集,到具体的实现上其实有很多种:
后来Python在抛弃了语言的通用性基础上提出了
WSGI - web server与Python语言实现的应用服务标准
标准和协议的内容由PEP-3333 规定。
当然随着Python提出自己的标准,其他语言也陆续提出类似的标准Wikipedia - WSGI - see also
当然Java EE 也有规定自己的接口标准,这个标准在JavaEE一系列标准中叫servlet,当然servlet是基于线程建立的连接模型。
Go Golang build-in runtime 包中提供netpoll的epoll实现:
golang.org - Source file src/runtime/netpoll_epoll.go
参考文章 UNIX网络编程 卷1:套接字联网API
Shichao’s Notes - Unix Network Programming
Thousands of Threads and Blocking I/O
Java servlet async
Spring Framework - Web on Reactive Stack
Java EE - Servlet Nonblocking I/O sample
Async IO on Linux: select, poll, and epoll
Stackoverflow - Why is epoll faster than select?
Three mechanisms for IO multiplexing Select, Poll, Epoll
简书 - nginx的epoll
Wikipedia - Epoll