[入门] 一个lock-free的Dispatcher, 各位大侠给看看安全不!
hurd
2008-12-23
http://dlang.group.iteye.com/topics/download/f0e126cf-34b8-3f5e-bdb6-2645126d7f0d
import tango.time.Clock, tango.util.log.Trace, tango.io.Console, tango.core.sync.Semaphore, tango.core.Atomic, tango.core.Thread; void main(){ alias Dispatcher!(myActor, 5) disp; disp.start; int datai = 0; int num = 12 ; myActor.Msg* m; while(true){ Trace.formatln("\n一次投递{}个消息 现在有{}个消息待处理\n", num, disp.count).flush; for(int i = 0; i < num; i ++ ){ m = new myActor.Msg; m.data = i; m.t = Clock.now; disp.push(m); } Thread.sleep(2); } } class Dispatcher(A, int ThreadNumber){ private static{ Semaphore sem; A[ThreadNumber] actors; MsgInfo* EmptyMsgInfo; int _msg_count; } static this(){ newActor!( ThreadNumber - 1); sem = new Semaphore(); Msg* EmptyMsg = new Msg; EmptyMsg.nextMsg = &Msg.init; EmptyMsg.preMsg = &Msg.init; EmptyMsg.msg = &A.Msg.init; EmptyMsgInfo = new MsgInfo; EmptyMsgInfo.top = EmptyMsg; EmptyMsgInfo.bottom = EmptyMsg; ver.store(EmptyMsgInfo); } final static{ // 开始 Actor 进程 void start(){ for(int i =0; i < ThreadNumber; i ++ ){ pool[i].start; } Thread.sleep(0.1); } int count(){ return atomicLoad(_msg_count); } // 投递一个新消息, lock-free void push(in A.Msg* _msg){ MsgInfo* omi; MsgInfo* nmi = new MsgInfo; Msg* m = new Msg; m.msg = _msg; m.nextMsg = &Msg.init; nmi.bottom = m; do{ omi = ver.load(); if( omi.top.msg == &A.Msg.init ){ //Trace.formatln("\n添加到栈顶 _msg {} ", _msg).flush; nmi.top = m; nmi.bottom = m; m.preMsg = &Msg.init; // 栈定消息的前一消息 = 默认消息 }else{ //Trace.formatln("\n添加到栈底 _msg {} ", _msg).flush; omi.bottom.nextMsg = m; //老栈底下一个消息= 新消息 m.preMsg = omi.bottom; //新栈底前一消息 = 老栈底 nmi.top = omi.top; //新栈顶 = 老栈顶 nmi.bottom = m; //新栈底 = 新消息 } }while( !ver.storeIf(nmi, omi) ); //计数器加1 atomicIncrement(_msg_count); sem.notify; } // 取出栈底消息, lock-free private bool pop(out A.Msg* pmsg){ MsgInfo* omi, nmi; nmi = new MsgInfo; Msg* msg; msg = new Msg; msg.preMsg = &Msg.init; do{ omi = ver.load(); if( omi.top.msg == &A.Msg.init ){ // 没有需要处理的消息 return false; }else{ pmsg = omi.top.msg ; //取出第一个消息 if( omi.top.nextMsg == &Msg.init ){ //取出最后一个消息 nmi.top = EmptyMsgInfo.top; //新栈顶 = 空栈顶 nmi.bottom = EmptyMsgInfo.bottom; //新栈底 = 空栈底 //Trace.formatln("pop 最后一个消息 {}", pmsg).flush; }else{ nmi.top = omi.top.nextMsg ; // 新栈顶 = 老栈顶下一个消息 nmi.top.preMsg = &Msg.init; // 新栈顶 前一消息 = 空消息 if( nmi.top.nextMsg == &Msg.init ){ // 新栈顶 下一消息为空 nmi.bottom = nmi.top; // 新栈底 = 新栈顶 }else{ nmi.bottom = omi.bottom; // 新栈底 = 老栈底 } //Trace.formatln("pop 一个消息 {}", pmsg).flush; } } }while( !ver.storeIf(nmi, omi) ); //计数器减1 atomicDecrement(_msg_count); return true; } private { Thread[ThreadNumber] pool; struct Msg{ A.Msg* msg; Msg* preMsg; Msg* nextMsg; } struct MsgInfo{ Msg* top; Msg* bottom; } Atomic!(MsgInfo*) ver; // 初始化线程 void newActor(int i)(){ static if( i > 0 ){ newActor!(i-1); } actors[i] = new A(i); pool[i] = new Thread(&Runner!(i)); } // Actor 线程循环 void Runner(int id)(){ //Trace.formatln("Actor {} 第一次等待信号量", id).flush; sem.wait; int i = 0; while(true){ i++; A.Msg* pmsg; if( !pop(pmsg) ){ //Trace.formatln("Actor {} 收取失败 等待信号量", id).flush; sem.wait; continue; } actors[id].handle(pmsg); } } } } } class myActor{ struct Msg{ //自定义的消息类型 int data; Time t; } int id; this(int i){ id = i; } void handle(in Msg* msg){ //消息处理函数 //Trace.formatln("Actor {} 开始 handle 消息 {}!", id, msg.data).flush; Thread.sleep(1); Trace.formatln("Actor {} 完成 handle 消息 {}!{}ms", id, msg.data, (Clock.now - msg.t).millis).flush; } } |
|
hurd
2008-12-24
上面的程序取消息不需要同步,添加消息是需要同步的。 所以添加消息是需要在一个线程里进行。
下面的的是一个固定容量的双向链表,可以多线程无锁添加删除消息,不过每次添加删除消息都会至少发生一次 (表容量*4+16) 个字节的内存申请。 import tango.core.Memory, tango.core.Atomic; class FreeList(MSG, uint LEN){ private{ alias MSG* Node; alias Node[LEN] NodeList; static const uint VoidSize = LEN * Node.sizeof ; private struct Nodes{ private { uint __count; uint first; uint last; const void* node; } static Nodes* init(){ Nodes* li = new Nodes; li.node = GC.malloc(VoidSize); return li; } static Nodes* clone(in Nodes* old){ Nodes* li = new Nodes; li.first = old.first; li.last = old.last; li.__count = old.__count; li.node = GC.malloc(VoidSize); memcpy(li.node, old.node, VoidSize); return li; } uint count(){ return __count; } bool push(in MSG* p){ if( __count is LEN ){ return false; } (*(cast(NodeList*) node))[last] = p; if( ++last >= LEN ){ last = 0 ; } ++__count; return true; } bool pop(out MSG* p){ if( __count is 0 ){ return false; } if( last is 0 ){ last = LEN - 1 ; }else{ last = last - 1; } p = (*(cast(NodeList*) node))[last] ; --__count; return true; } bool unshift(in MSG* p){ if( __count is LEN ){ return false; } if( first is 0 ){ first = LEN - 1 ; }else{ first = first - 1; } (*(cast(NodeList*) node))[first] = p; ++__count; Trace.formatln("unshift : {} ", p); return true; } bool shift(out MSG* p){ if( __count is 0 ){ return false; } p = (*(cast(NodeList*) node))[first] ; if( ++first >= LEN ){ first = 0 ; } --__count; return true; } } Atomic!(Nodes*) ver; bool ch(char[] act)(ref MSG* p){ Nodes* NewNodes, OldNodes; bool isDone; do{ OldNodes = ver.load(); if( OldNodes.count >= LEN ){ return false; } NewNodes = Nodes.clone(OldNodes); mixin("isDone = NewNodes." ~ act ~ " (p);"); }while( !ver.storeIf(NewNodes, OldNodes)); return isDone; } } public{ this(){ static assert( VoidSize < uint.max); ver.store(Nodes.init); Trace.formatln("Nodes.sizeof {} {}", Node.sizeof , Nodes.sizeof ); } alias ch!("pop") pop; alias ch!("push") push; alias ch!("shift") shift; alias ch!("unshift") unshift; } } |
|
hurd
2008-12-24
又一个不固定容量的无锁双向连表, 每次添加删除节点时申请的内存=当前消息数*4。
这个比上面哪个效率高多了,但是不知道安全不。 import tango.core.Memory, tango.core.Atomic; extern (C) void * memcpy (void *dst, void *src, uint); class FreeList(MSG){ private{ struct Node{ MSG* m; } struct NodeList{ Node* n; uint c; } Atomic!(NodeList*) ver; } this(){ ver.store(&NodeList.init); } uint count(){ NodeList* now = ver.load(); return now.c; } void unshift(in MSG* p){ NodeList* ol, nl; nl = new NodeList; uint size; Node* n; do{ ol = ver.load(); nl.c = ol.c + 1; n = cast(Node*) GC.malloc( Node.sizeof * nl.c ); if( ol.c > 0 ){ memcpy(cast(void*)n + Node.sizeof , cast(void*)ol.n, Node.sizeof * ol.c ); } nl.n = n; n.m = p; }while( !ver.storeIf(nl, ol) ); } void push(in MSG* p){ NodeList* ol, nl; nl = new NodeList; uint size; Node* n; do{ ol = ver.load(); nl.c = ol.c + 1; n = cast(Node*) GC.malloc( Node.sizeof * nl.c ); if( ol.c > 0 ){ memcpy(cast(void*)n, cast(void*)ol.n, Node.sizeof * ol.c ); } nl.n = n; (n+ol.c).m = p; }while( !ver.storeIf(nl, ol) ); } bool pop(out MSG* p){ NodeList* ol, nl; nl = new NodeList; uint size; Node* n; do{ ol = ver.load(); if( ol.c is 0 ){ return false; } nl.c = ol.c - 1; p = ( ol.n + nl.c ).m; if( nl.c > 0 ){ n = cast(Node*) GC.malloc( Node.sizeof * nl.c ); memcpy(cast(void*)n , cast(void*)ol.n , Node.sizeof * nl.c ); nl.n = n; }else{ nl = &NodeList.init; } }while( !ver.storeIf(nl, ol) ); return true; } bool shift(out MSG* p){ NodeList* ol, nl; nl = new NodeList; uint size; Node* n; do{ ol = ver.load(); if( ol.c is 0 ){ return false; } nl.c = ol.c - 1; p = ol.n .m; if( nl.c > 0 ){ n = cast(Node*) GC.malloc( Node.sizeof * nl.c ); memcpy(cast(void*)n , cast(void*)ol.n + Node.sizeof , Node.sizeof * nl.c ); nl.n = n; }else{ nl = &NodeList.init; } }while( !ver.storeIf(nl, ol) ); return true; } } |
|
hurd
2008-12-24
针对上面FreeList的多线程投递,多线程处理代码。
import tango.util.log.Trace, tango.core.Atomic, tango.core.Thread, tango.time.Clock, tango.core.sync.Semaphore; FreeList!(Msg) li; Semaphore sem; struct Msg{ int i; Time now; } int AddNum = 20; int di; void main(){ li = new FreeList!(Msg); sem = new Semaphore; for(int i = 0; i < 100; i++){ auto t = new Thread(&get); t.start; } for(int i = 0; i < 10; i++){ auto t = new Thread(&add); t.start; } } void add(){ Msg* p; while(true){ for(int i = 0; i < AddNum; i++){ p = new Msg; p.i = atomicIncrement(di); p.now = Clock.now; li.push(p); sem.notify(); } auto c = li.count; if( c > 0 ) Trace.formatln("现在有{}个消息", c).flush; Thread.sleep(2); } } void get(){ Msg* p; while(true){ if( !li.pop(p) ){ sem.wait; continue; } if( p.i % 100 is 0 ) Trace.formatln("{} {}ms",p.i, (Clock.now - p.now).millis).flush; Thread.sleep(.1); } } |
|
redsea
2008-12-24
第一个:
应该不是线程安全的, push 和 pop 动作, 都不是一条 atomic 指令完成了工作, 而中间又没有锁, 所以我认为不安全. |
|
hurd
2008-12-25
第一个的pop应该命名为shift才对,因为这是一个弹出最早添加的消息的动作。
第一个在pop时,我仔细检查下还是觉得对已有数据的修改只有一次。就是在ver.storeIf(nmi, omi)时,这样应该是线程安全的吧? 第一个在push时,是非线程安全的。 适用于一个线程监听,然后交给其线成去做。 第二个例子效率非常差,第三个例子在阻塞的消息数不大于3000的时候效率还是很不错的。 阻塞消息数大于一万在我的机器上cpu占用就比较大了。 |
|
hurd
2008-12-25
一个使用Semaphore阻塞进出操作的堆, 不知道Atomic&Semaphore和锁的效率哪个更好点。
import tango.core.Memory, tango.core.sync.Semaphore, tango.core.Atomic; private{ extern (C) void * memcpy (void *dst, void *src, uint); } class HeapList(MSG){ private{ struct Node{ MSG* msg; } struct NodeList{ Node[] list; uint size; uint count; uint first; } Atomic!(bool) isDown; Semaphore sem; NodeList* nodeList; uint GrowStep; } this(in uint step = 1024){ sem = new Semaphore; nodeList = new NodeList; nodeList.list = new Node[step]; nodeList.size = step; GrowStep = step; isDown.store(true); sem.notify; } bool shift(out MSG* p){ while( !isDown.load() ){ sem.wait; } isDown.store(false); scope(exit){ isDown.store(true); sem.notify; } if( nodeList.count is 0 ){ return false; } p = nodeList.list[nodeList.first].msg; ++nodeList.first; --nodeList.count; return true; } void push(in MSG* p){ while( !isDown.load() ){ sem.wait; } isDown.store(false); scope(exit){ isDown.store(true); sem.notify; } uint i = nodeList.count + nodeList.first; if( i >= nodeList.size ) { if( nodeList.first > nodeList.count ){ memcpy(&nodeList.list[0], &nodeList.list[nodeList.first], nodeList.count * Node.sizeof ); }else{ nodeList.size = nodeList.size + GrowStep; auto list = new Node[nodeList.size]; memcpy(&list[0], &nodeList.list[nodeList.first], nodeList.count * Node.sizeof ); nodeList.list = list; } nodeList.first = 0; i = nodeList.count; } nodeList.list[i].msg = p ; ++nodeList.count; } } |