[新闻] dactor项目

qiezi 2009-05-11
根据自己对网络编程、并发、并行、异步、分布式的理解,特别是来自Erlang的一些想法,建立了这个项目。

目前这个项目的c++版本已经用在公司项目中,效果还不错。有些新的想法,使用C++来实现代码量会比较多,还有些架构和局部优化,在公司项目中修改也可能带来不稳定性,所以打算使用D语言来另外实现一份,当然和公司的项目有很大差异。

要实现的特性和进度如下:
  • Coroutine 及调度器[完成]
  • IO事件调度[完成]
  • AIO集成[初步可用,待封装]
  • Socket封装,使用IO事件[未]
  • File封装,使用AIO实现[未]
  • Actor模型,类似erlang的消息机制[未]
  • 分布式Actor,类似erlang的分布式消息机制[未]
  • 并行执行(非并行调度器)[未]

(以上特性已经在公司项目中使用)
  • 类似Erlang的match语法[未]
  • 并行调度器[未]
  • 类似Erlang的内存管理(私有堆)及资源管理[未]

当然公司项目中根据应用做了很多扩充,本项目目前只把目标定位在轻量级线程运行平台。

遇到比较大的麻烦是测试不方便,涉及到IO事件、超时、分布式这些特性,感觉自动化测试不是很方便,目前还没找到好的方法。

项目链接:dactor

有兴趣的可参与讨论
hurd 2009-05-11
首先感谢你在DNS那里给我的提示,现在已经完成了。

我写的iocp小程序里,GetQueuedCompletionStatus后根据各注册Key关联的fiber来执行程序。
	static void callback(void* ov, DWORD ret,  DWORD bytes){
		HttpContext* context	= cast(HttpContext*) ov;
		context.iocp_bytes		= bytes;
		context.iocp_ret		= ret;
		scope(exit){
			if( context.fiber.state == Fiber.State.TERM ){
				context.fiber.reset;
			}
		}
		context.fiber.call;
	}


程序里需要各种读、写、关闭、打开、接受、等异步的地方就Fiber.getThis.yield,然后检测下连接和更新下timer。
一个线程生成AcceptEx用的Socket上下文时生成一个Fiber, 在其他Dns,文件io等需要异步的地方用Fiber.getThis注册到他们自己的重叠Key里。所有操作没有使用状态机。现在对如何包装Sqlite进去还没主意。。。。

使用时发现响应并完成客户请求时大部分使用0ms就完成了,偶尔会出现15ms的情况,我猜测是因为GC的原因。

我对Erlang和ioLanguage等都没了解,不知道采用Actor形式有什么优势?
qiezi 2009-05-11
举个例子吧,用Actor模型编写代码(以下代码应该和dactor后面要完成的形式相似):
void client_loop(Socket sock) {
  File file = new File("1.txt", O_RDWR, 0600);
  while (true) {
    char[] data = new char[1024];
    int ret = sock.Recv(data, 1000); // timeout is 1000 ms
    if (ret <= 0) {
      break;
    }
    file.Write(data);
    sock.Send(data);
  }
}

void server_loop(Socket sock) {
  while(true) {
    Socket client = sock.Accept();
    spawn(&client_loop, client);
  }
}

上面的代码看起来很简单,逻辑也很连贯。它有几个优势:
*类似多线程程序的写法,却没有本地线程数限制导致的并发能力限制
*基于事件调度
*文件使用异步机制实现,看起来却是同步调用语法

这只是其中一部分特性,分布式Actor更强大,可以简单了解一下erlang。这里给个简单的例子:
void echo_service() {
  while (true) {
    auto msg = Receive();
    Send(msg.sender, msg.data);
  }
}

void test_client() {
  while(true) {
    Send("echo_service", "abc");
    auto msg = Receive();
    writefln(msg);
  }
}

void test_remote_client() {
  while(true) {
    Send("echo_service@127.0.0.1", "abc");
    auto msg = Receive();
    writefln(msg);
  }
}

void main() {
  // init environment
  Pid pid = spawn(&echo_service);
  register("echo_service", pid);
  spawn(&test_client);
  //
}


大致是这样的。io language的处理方式比较简单,如果不了解erlang的底层,看看这个也行,它集成了socket/aio,使用libevent来做事件调度,dactor和它差不多,不过它的aio处理方式不太好。
hqs7636 2009-05-11
 
hurd 2009-05-11
没看到在那里调用EventManager.run的,是不是AioEvent.__aioEventCb就在EventManager.run的线程里运行?

dactor里也没同步的代码,是不是全在一个线程里运行?
qiezi 2009-05-11
hurd 写道
没看到在那里调用EventManager.run的,是不是AioEvent.__aioEventCb就在EventManager.run的线程里运行?

dactor里也没同步的代码,是不是全在一个线程里运行?

spawn(&EventManager.instance.run);

这行是在test.d里的,目前测试先放在这里,以后可能要把这种框架初始化的代码放到一组init/fini/run/stop函数里去调用。

轻量级线程是在同一个线程里运行的,aio用信号来通知,所以目前没有用到多线程。以后线程池、并行调度器会使用多线程的。
qiezi 2009-05-11
目前aio仅支持linux〜〜
davesun 2009-05-14

能不能给coroutine也加个池什么的,spawn时从Coroutine Pool 中提取一个空闲的协程来调度,调度结束后再reset一下放回到池里,这样效率不就更高了吗?

上次你给我的多线程调度Fiber的例子,我改了一下使用dmd2.0.29编译在ubuntu上测试,不知道怎麽回事,调度一段时间后,有时会出现“segmentation fault”,好像是在线程切换时发生的,不知道为什么在windows下确没事儿,请教了!! 我的机器是1个G内存。启动了30000个纤程,再多一些好像就不行了。

private import core.thread; 
private import std.stdio; 

private class SyncQueue(T) 
{ 

    private T[] queue; 

    public synchronized void push(T o) 
    { 
        queue ~= o; 
    } 

    public synchronized T pop() 
    { 
        T o = null; 
        if(queue.length > 1){ 
            o = queue[0]; 
            queue = queue[1..$]; 
        } 
        else if (queue.length == 1) 
        { 
            o = queue[0]; 
            queue = []; 
        } 
        return o; 
    } 
} 

private class Scheduler 
{ 
    private SyncQueue!(Fiber) runQueue; 
    public this () 
    { 
        runQueue  = new SyncQueue!(Fiber); 
    } 

    public void addFiber(Fiber fiber) 
    { 
        runQueue.push(fiber); 
    } 

    public void run(int threadNum =1) 
    { 
        ThreadGroup tg = new ThreadGroup;
		for(int i =0;i<threadNum;i++){
		  tg.create({
		    while (true) 
            { 
                Fiber fiber = runQueue.pop(); 
                if (fiber is null) 
                { 
                   continue; 
                } 
                else 
                { 
                  fiber.call(); 
                  if (fiber.state != Fiber.State.TERM) 
                     runQueue.push(fiber); 
                } 
            } 
		  });
		}
		tg.joinAll;
    } 
} 

class WorkFiber:Fiber 
{ 
    this(){ 
        super(&run); 
    } 

    void run() 
    { 
        while (true) 
        { 
             writefln("Thread:%p,Fiber:%p",cast(void*)Thread.getThis,cast(void*)Fiber.getThis);
             Fiber.yield(); 
        } 
    } 
} 


void main() 
{ 
    Scheduler p = new Scheduler; 
	for(int i=0;i<30000;i++)
      p.addFiber(new WorkFiber); 
    p.run(4); 
}

 

qiezi 2009-05-14
性能优化尽量晚一些再做的,一般Coroutine的创建和销毁也不会太频繁。
qiezi 2009-05-14
从代码中没看出什么问题,很可能是Fiber的实现问题〜core dump时堆栈都乱了
Global site tag (gtag.js) - Google Analytics