[入门] 一个lock-free的Dispatcher, 各位大侠给看看安全不!

hurd 2008-12-23
http://dlang.group.iteye.com/topics/download/f0e126cf-34b8-3f5e-bdb6-2645126d7f0d


import
	tango.time.Clock,
	tango.util.log.Trace,
	tango.io.Console,
	tango.core.sync.Semaphore,
	tango.core.Atomic,
	tango.core.Thread;
	
void main(){
	alias Dispatcher!(myActor, 5) disp;
	disp.start;
	int datai	= 0;
	int num		= 12 ;
	myActor.Msg* 	m;
	while(true){
		
		Trace.formatln("\n一次投递{}个消息 现在有{}个消息待处理\n", num, disp.count).flush;
		for(int i = 0; i < num; i ++ ){
			m	= new myActor.Msg;
			m.data	= i;
			m.t	= Clock.now;
			disp.push(m);
		}
		
		Thread.sleep(2);
	}

}


class Dispatcher(A, int ThreadNumber){
	private static{
		Semaphore		sem;
		A[ThreadNumber]		actors;
		MsgInfo* 		EmptyMsgInfo;
		int			_msg_count;
	}
	static this(){
		newActor!( ThreadNumber - 1);
		
		sem			= new Semaphore();
		
		Msg* 	EmptyMsg	= new Msg;
		
		EmptyMsg.nextMsg	= &Msg.init;
		EmptyMsg.preMsg		= &Msg.init;
		EmptyMsg.msg		= &A.Msg.init;
		
		EmptyMsgInfo		= new MsgInfo;
		EmptyMsgInfo.top	= EmptyMsg;
		EmptyMsgInfo.bottom	= EmptyMsg;
		ver.store(EmptyMsgInfo);

	}
	final static{
		// 开始 Actor 进程
		void start(){
			for(int i =0; i < ThreadNumber; i ++ ){
				pool[i].start;
			}
			Thread.sleep(0.1);
		}
		
		int count(){
			return atomicLoad(_msg_count);
		}
		
		// 投递一个新消息, lock-free
		void push(in A.Msg* _msg){
			MsgInfo* omi;
			MsgInfo* nmi	= new MsgInfo;
			
			Msg* m		= new Msg;
			m.msg		= _msg;
			m.nextMsg	= &Msg.init;
			
			nmi.bottom	= m;
			do{
				omi	= ver.load();
				if( omi.top.msg == &A.Msg.init ){
					//Trace.formatln("\n添加到栈顶 _msg {} ", _msg).flush;
					nmi.top		= m;
					nmi.bottom	= m;
					m.preMsg	= &Msg.init; 		// 栈定消息的前一消息 = 默认消息
				}else{
					//Trace.formatln("\n添加到栈底 _msg {} ", _msg).flush;
					omi.bottom.nextMsg	= m;		//老栈底下一个消息= 新消息
					m.preMsg		= omi.bottom;	//新栈底前一消息 = 老栈底
					
					nmi.top			= omi.top; 	//新栈顶 = 老栈顶
					nmi.bottom		= m;		//新栈底 = 新消息
				}
			}while( !ver.storeIf(nmi, omi) );
			
			//计数器加1
			atomicIncrement(_msg_count);
			sem.notify;
		}
		
		// 取出栈底消息, lock-free
		private bool pop(out A.Msg* pmsg){
			MsgInfo* omi, nmi;
			nmi		= new MsgInfo;
			
			Msg* msg;
			msg		= new Msg;
			msg.preMsg	= &Msg.init;
			
			do{
				omi	= ver.load();
				if( omi.top.msg	 == &A.Msg.init ){
					// 没有需要处理的消息
					return false;
				}else{
					pmsg	= omi.top.msg	; //取出第一个消息

					if( omi.top.nextMsg == &Msg.init ){ 		//取出最后一个消息
						nmi.top		= EmptyMsgInfo.top;	//新栈顶 = 空栈顶
						nmi.bottom	= EmptyMsgInfo.bottom;	//新栈底 = 空栈底 
						//Trace.formatln("pop 最后一个消息 {}", pmsg).flush;
					}else{
						nmi.top		= omi.top.nextMsg ;	// 新栈顶	= 老栈顶下一个消息
						nmi.top.preMsg	= &Msg.init;		// 新栈顶 前一消息 = 空消息
						
						if( nmi.top.nextMsg == &Msg.init ){ 	// 新栈顶 下一消息为空
							nmi.bottom	= nmi.top; 	// 新栈底 = 新栈顶 
						}else{
							nmi.bottom	= omi.bottom;	// 新栈底	= 老栈底
						}
						//Trace.formatln("pop 一个消息 {}", pmsg).flush;
					}
				}
			}while( !ver.storeIf(nmi, omi) );
			//计数器减1
			atomicDecrement(_msg_count);
			return true;
		}
		
		private {
			Thread[ThreadNumber]	pool;
			struct Msg{
				A.Msg*	msg;
				Msg*	preMsg;
				Msg*	nextMsg;
			}
			struct MsgInfo{
				Msg* top;
				Msg* bottom;
			}
			Atomic!(MsgInfo*)	ver;
			
			// 初始化线程 
			void newActor(int i)(){
				static if( i > 0 ){
					newActor!(i-1);
				}
				actors[i]	= new A(i);
				pool[i]		= new Thread(&Runner!(i));
			}
			
			// Actor 线程循环
			void Runner(int id)(){
				//Trace.formatln("Actor {} 第一次等待信号量", id).flush;
				sem.wait;
				int i	= 0;
				while(true){
					i++;
					A.Msg* pmsg;
					if( !pop(pmsg) ){
						//Trace.formatln("Actor {} 收取失败 等待信号量", id).flush;
						sem.wait;
						continue;
					}
					actors[id].handle(pmsg);
				}
			}
			
		}
	}
}


class myActor{
	struct Msg{	//自定义的消息类型
		int data;
		Time t;
	}
	int id;
	
	this(int i){
		id	= i;
	}
	
	void handle(in Msg* msg){ //消息处理函数
		//Trace.formatln("Actor {} 开始 handle  消息 {}!", id, msg.data).flush;
		Thread.sleep(1);
		Trace.formatln("Actor {} 完成 handle  消息 {}!{}ms", id, msg.data, (Clock.now - msg.t).millis).flush;
	}
}
hurd 2008-12-24
上面的程序取消息不需要同步,添加消息是需要同步的。 所以添加消息是需要在一个线程里进行。

下面的的是一个固定容量的双向链表,可以多线程无锁添加删除消息,不过每次添加删除消息都会至少发生一次 (表容量*4+16) 个字节的内存申请。

import
	tango.core.Memory,
	tango.core.Atomic;
class FreeList(MSG, uint LEN){
	
	private{
		alias MSG* 		Node;
		alias Node[LEN]	NodeList;
		
		static	const uint VoidSize	= LEN * Node.sizeof ;
		
		private struct Nodes{
			private {
				uint		__count;
				uint		first;
				uint		last;
				const void*	node;
			}
			
			static Nodes* init(){
				Nodes*	li	= new Nodes;
				li.node		= GC.malloc(VoidSize);
				return li;
			}
			
			static Nodes* clone(in Nodes* old){
				Nodes*	li	= new Nodes;
				
				li.first	= old.first;
				li.last		= old.last;
				li.__count	= old.__count;
				
				li.node		= GC.malloc(VoidSize);
				memcpy(li.node,  old.node, VoidSize);
				return li;
			}
			
			uint count(){
				return __count;
			}
			
			bool push(in MSG* p){
				if( __count is LEN ){
					return false;
				}
				(*(cast(NodeList*) node))[last]	= p;
				if( ++last >= LEN ){
					last	= 0 ;
				}
				++__count;
				return true;
			}
			
			bool pop(out MSG* p){
				if( __count is 0 ){
					return false;
				}
				if( last is 0 ){
					last	= LEN - 1 ;
				}else{
					last	= last - 1;
				}
				p		= (*(cast(NodeList*) node))[last] ;
				--__count;
				return true;
			}
			
			
			bool unshift(in MSG* p){
				if( __count is LEN ){
					return false;
				}
				if( first is 0 ){
					first	= LEN - 1 ;
				}else{
					first	= first - 1;
				}
				(*(cast(NodeList*) node))[first]	= p;
				++__count;
				
				
				Trace.formatln("unshift : {} ", p);
				return true;
			}
			
			bool shift(out MSG* p){
				if( __count is 0 ){
					return false;
				}
				p		= (*(cast(NodeList*) node))[first] ;
				if( ++first >= LEN ){
					first	= 0 ;
				}
				--__count;
				
				return true;
			}
		}
		
		Atomic!(Nodes*)		ver;
		
		
		bool ch(char[] act)(ref MSG* p){
			Nodes* NewNodes, OldNodes;
			bool isDone;
			do{
				OldNodes	= ver.load();
				if( OldNodes.count >= LEN ){
					return false;
				}
				NewNodes	= Nodes.clone(OldNodes);
				mixin("isDone	= NewNodes." ~ act ~ " (p);");
			}while( !ver.storeIf(NewNodes, OldNodes));
			return isDone;
		}
		
	}
	
	
	public{
		this(){
			static assert( VoidSize < uint.max);
			ver.store(Nodes.init);
			Trace.formatln("Nodes.sizeof  {} {}", Node.sizeof , Nodes.sizeof );
		}
		
		alias ch!("pop") 	pop;
		alias ch!("push")	push;
		alias ch!("shift")	shift;
		alias ch!("unshift")	unshift;
	}
}


hurd 2008-12-24
又一个不固定容量的无锁双向连表, 每次添加删除节点时申请的内存=当前消息数*4。
这个比上面哪个效率高多了,但是不知道安全不。


import
	
	tango.core.Memory,
	tango.core.Atomic;
	
extern (C)  void * memcpy (void *dst, void *src, uint);

class FreeList(MSG){
	
	private{
		struct Node{
			MSG*		m;
		}
		struct	NodeList{
			Node*	n;
			uint	c;
		}
		Atomic!(NodeList*)	ver;
	}
	
	this(){
		ver.store(&NodeList.init);
	}
	
	uint count(){
		NodeList* now	= ver.load();
		return	now.c;
	}
	
	void unshift(in MSG* p){
		NodeList* ol, nl;
		nl	= new NodeList;
		uint	size;
		Node*	n;
		do{
			ol	= ver.load();
			nl.c	= ol.c + 1;
			n	= cast(Node*) GC.malloc( Node.sizeof *  nl.c );
			if( ol.c > 0 ){
				memcpy(cast(void*)n + Node.sizeof , cast(void*)ol.n, Node.sizeof *  ol.c );
			}
			nl.n	= n;
			n.m	= p;
		}while( !ver.storeIf(nl, ol) );
	}
		
	void push(in MSG* p){
		NodeList* ol, nl;
		nl	= new NodeList;
		uint	size;
		Node*	n;
		do{
			ol	= ver.load();
			nl.c	= ol.c + 1;
			n	= cast(Node*) GC.malloc( Node.sizeof *  nl.c );
			if( ol.c > 0 ){
				memcpy(cast(void*)n, cast(void*)ol.n, Node.sizeof *  ol.c );
			}
			nl.n		= n;
			(n+ol.c).m	= p;
		}while( !ver.storeIf(nl, ol) );
	}
	
	bool pop(out MSG* p){
		NodeList* ol, nl;
		nl	= new NodeList;
		uint	size;
		Node*	n;
		do{
			ol	= ver.load();
			if( ol.c is 0 ){
				return false;
			}
			nl.c	= ol.c - 1;
			p	= ( ol.n + nl.c ).m;
			if( nl.c > 0 ){
				n	= cast(Node*) GC.malloc( Node.sizeof *  nl.c );
				memcpy(cast(void*)n , cast(void*)ol.n  , Node.sizeof *  nl.c );
				nl.n		= n;
			}else{
				nl	= &NodeList.init;
			}
		}while( !ver.storeIf(nl, ol) );
		return true;
	}
	
	bool shift(out MSG* p){
		NodeList* ol, nl;
		nl	= new NodeList;
		uint	size;
		Node*	n;
		do{
			ol	= ver.load();
			if( ol.c is 0 ){
				return false;
			}
			nl.c	= ol.c - 1;
			p	= ol.n .m;
			if( nl.c > 0 ){
				n	= cast(Node*) GC.malloc( Node.sizeof *  nl.c );
				memcpy(cast(void*)n , cast(void*)ol.n + Node.sizeof , Node.sizeof *  nl.c );
				nl.n		= n;
			}else{
				nl	= &NodeList.init;
			}
		}while( !ver.storeIf(nl, ol) );
		return true;
	}
	
}

hurd 2008-12-24
针对上面FreeList的多线程投递,多线程处理代码。
import
	tango.util.log.Trace,
	tango.core.Atomic,
	tango.core.Thread,
	tango.time.Clock,
	tango.core.sync.Semaphore;

	FreeList!(Msg)	li;
	Semaphore	sem;
	struct Msg{
		int 	i;
		Time	now;
	}
	int AddNum	= 20;
	int	di;
void main(){
	li	= new FreeList!(Msg);
	sem	= new Semaphore;
	
	
	for(int i = 0; i < 100; i++){
		auto t	= new Thread(&get);
		t.start;
	}
	
	for(int i = 0; i < 10; i++){
		auto t	= new Thread(&add);
		t.start;
	}
}

void add(){
	Msg* p;
	while(true){
		for(int i = 0; i < AddNum; i++){
			p	= new Msg;
			p.i	= atomicIncrement(di);
			p.now	= Clock.now;
			li.push(p);
			sem.notify();
		}
		auto	c	= li.count;
		if( c > 0 )
			Trace.formatln("现在有{}个消息", c).flush;
		Thread.sleep(2);
	}
}

void get(){
	Msg* p;
	while(true){
		if( !li.pop(p) ){
			sem.wait;
			continue;
		}
		if( p.i % 100 is 0 )
			Trace.formatln("{} {}ms",p.i, (Clock.now - p.now).millis).flush;
		Thread.sleep(.1);
	}
}
redsea 2008-12-24
第一个:

应该不是线程安全的, push 和 pop 动作, 都不是一条 atomic 指令完成了工作, 而中间又没有锁, 所以我认为不安全.
hurd 2008-12-25
第一个的pop应该命名为shift才对,因为这是一个弹出最早添加的消息的动作。

第一个在pop时,我仔细检查下还是觉得对已有数据的修改只有一次。就是在ver.storeIf(nmi, omi)时,这样应该是线程安全的吧?

第一个在push时,是非线程安全的。 适用于一个线程监听,然后交给其线成去做。


第二个例子效率非常差,第三个例子在阻塞的消息数不大于3000的时候效率还是很不错的。 阻塞消息数大于一万在我的机器上cpu占用就比较大了。
hurd 2008-12-25
一个使用Semaphore阻塞进出操作的堆, 不知道Atomic&Semaphore和锁的效率哪个更好点。

import
	tango.core.Memory,
	tango.core.sync.Semaphore,
	tango.core.Atomic;
	
private{
	extern (C)  void * memcpy (void *dst, void *src, uint);
}

class HeapList(MSG){
	private{
		struct Node{
			MSG*		msg;
		}
		struct	NodeList{
			Node[]		list;
			uint		size;
			uint		count;
			uint		first;
		}
		Atomic!(bool)	isDown;
		Semaphore	sem;
		NodeList*	nodeList;
		uint		GrowStep;

	}
	this(in uint step = 1024){
		sem		= new Semaphore;
		nodeList	= new NodeList;
		nodeList.list	= new Node[step];
		nodeList.size	= step;
		GrowStep	= step;
		isDown.store(true);
		sem.notify;
	}
	
	bool shift(out MSG* p){
		while( !isDown.load() ){ sem.wait; }
		isDown.store(false);
		scope(exit){
			isDown.store(true);
			sem.notify;
		}
		if( nodeList.count is 0 ){
			return false;
		}
		p	= nodeList.list[nodeList.first].msg;
		++nodeList.first;
		--nodeList.count;
		return true;
	}
	
	void push(in MSG* p){
		while( !isDown.load() ){ sem.wait; }
		isDown.store(false);
		scope(exit){
			isDown.store(true);
			sem.notify;
		}
		uint	i	= nodeList.count + nodeList.first;
		if( i >= nodeList.size ) {
			if( nodeList.first > nodeList.count ){
				memcpy(&nodeList.list[0], &nodeList.list[nodeList.first], nodeList.count * Node.sizeof );
			}else{
				nodeList.size	= nodeList.size + GrowStep;
				auto	list	= new Node[nodeList.size];
				memcpy(&list[0], &nodeList.list[nodeList.first], nodeList.count * Node.sizeof );
				nodeList.list	= list;
			}
			nodeList.first	= 0;
			i		= nodeList.count;
		}
		nodeList.list[i].msg	= p ;
		++nodeList.count;
	}
}
Global site tag (gtag.js) - Google Analytics