[新闻] dactor项目
qiezi
2009-05-11
根据自己对网络编程、并发、并行、异步、分布式的理解,特别是来自Erlang的一些想法,建立了这个项目。
目前这个项目的c++版本已经用在公司项目中,效果还不错。有些新的想法,使用C++来实现代码量会比较多,还有些架构和局部优化,在公司项目中修改也可能带来不稳定性,所以打算使用D语言来另外实现一份,当然和公司的项目有很大差异。 要实现的特性和进度如下:
(以上特性已经在公司项目中使用)
当然公司项目中根据应用做了很多扩充,本项目目前只把目标定位在轻量级线程运行平台。 遇到比较大的麻烦是测试不方便,涉及到IO事件、超时、分布式这些特性,感觉自动化测试不是很方便,目前还没找到好的方法。 项目链接:dactor 有兴趣的可参与讨论 |
|
hurd
2009-05-11
首先感谢你在DNS那里给我的提示,现在已经完成了。
我写的iocp小程序里,GetQueuedCompletionStatus后根据各注册Key关联的fiber来执行程序。 static void callback(void* ov, DWORD ret, DWORD bytes){ HttpContext* context = cast(HttpContext*) ov; context.iocp_bytes = bytes; context.iocp_ret = ret; scope(exit){ if( context.fiber.state == Fiber.State.TERM ){ context.fiber.reset; } } context.fiber.call; } 程序里需要各种读、写、关闭、打开、接受、等异步的地方就Fiber.getThis.yield,然后检测下连接和更新下timer。 一个线程生成AcceptEx用的Socket上下文时生成一个Fiber, 在其他Dns,文件io等需要异步的地方用Fiber.getThis注册到他们自己的重叠Key里。所有操作没有使用状态机。现在对如何包装Sqlite进去还没主意。。。。 使用时发现响应并完成客户请求时大部分使用0ms就完成了,偶尔会出现15ms的情况,我猜测是因为GC的原因。 我对Erlang和ioLanguage等都没了解,不知道采用Actor形式有什么优势? |
|
qiezi
2009-05-11
举个例子吧,用Actor模型编写代码(以下代码应该和dactor后面要完成的形式相似):
void client_loop(Socket sock) { File file = new File("1.txt", O_RDWR, 0600); while (true) { char[] data = new char[1024]; int ret = sock.Recv(data, 1000); // timeout is 1000 ms if (ret <= 0) { break; } file.Write(data); sock.Send(data); } } void server_loop(Socket sock) { while(true) { Socket client = sock.Accept(); spawn(&client_loop, client); } } 上面的代码看起来很简单,逻辑也很连贯。它有几个优势: *类似多线程程序的写法,却没有本地线程数限制导致的并发能力限制 *基于事件调度 *文件使用异步机制实现,看起来却是同步调用语法 这只是其中一部分特性,分布式Actor更强大,可以简单了解一下erlang。这里给个简单的例子: void echo_service() { while (true) { auto msg = Receive(); Send(msg.sender, msg.data); } } void test_client() { while(true) { Send("echo_service", "abc"); auto msg = Receive(); writefln(msg); } } void test_remote_client() { while(true) { Send("echo_service@127.0.0.1", "abc"); auto msg = Receive(); writefln(msg); } } void main() { // init environment Pid pid = spawn(&echo_service); register("echo_service", pid); spawn(&test_client); // } 大致是这样的。io language的处理方式比较简单,如果不了解erlang的底层,看看这个也行,它集成了socket/aio,使用libevent来做事件调度,dactor和它差不多,不过它的aio处理方式不太好。 |
|
hqs7636
2009-05-11
|
|
hurd
2009-05-11
没看到在那里调用EventManager.run的,是不是AioEvent.__aioEventCb就在EventManager.run的线程里运行?
dactor里也没同步的代码,是不是全在一个线程里运行? |
|
qiezi
2009-05-11
hurd 写道 没看到在那里调用EventManager.run的,是不是AioEvent.__aioEventCb就在EventManager.run的线程里运行?
dactor里也没同步的代码,是不是全在一个线程里运行? spawn(&EventManager.instance.run); 这行是在test.d里的,目前测试先放在这里,以后可能要把这种框架初始化的代码放到一组init/fini/run/stop函数里去调用。 轻量级线程是在同一个线程里运行的,aio用信号来通知,所以目前没有用到多线程。以后线程池、并行调度器会使用多线程的。 |
|
qiezi
2009-05-11
目前aio仅支持linux〜〜
|
|
davesun
2009-05-14
能不能给coroutine也加个池什么的,spawn时从Coroutine Pool 中提取一个空闲的协程来调度,调度结束后再reset一下放回到池里,这样效率不就更高了吗? private import core.thread; private import std.stdio; private class SyncQueue(T) { private T[] queue; public synchronized void push(T o) { queue ~= o; } public synchronized T pop() { T o = null; if(queue.length > 1){ o = queue[0]; queue = queue[1..$]; } else if (queue.length == 1) { o = queue[0]; queue = []; } return o; } } private class Scheduler { private SyncQueue!(Fiber) runQueue; public this () { runQueue = new SyncQueue!(Fiber); } public void addFiber(Fiber fiber) { runQueue.push(fiber); } public void run(int threadNum =1) { ThreadGroup tg = new ThreadGroup; for(int i =0;i<threadNum;i++){ tg.create({ while (true) { Fiber fiber = runQueue.pop(); if (fiber is null) { continue; } else { fiber.call(); if (fiber.state != Fiber.State.TERM) runQueue.push(fiber); } } }); } tg.joinAll; } } class WorkFiber:Fiber { this(){ super(&run); } void run() { while (true) { writefln("Thread:%p,Fiber:%p",cast(void*)Thread.getThis,cast(void*)Fiber.getThis); Fiber.yield(); } } } void main() { Scheduler p = new Scheduler; for(int i=0;i<30000;i++) p.addFiber(new WorkFiber); p.run(4); }
|
|
qiezi
2009-05-14
性能优化尽量晚一些再做的,一般Coroutine的创建和销毁也不会太频繁。
|
|
qiezi
2009-05-14
从代码中没看出什么问题,很可能是Fiber的实现问题〜core dump时堆栈都乱了
|