博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Socket连接池
阅读量:6247 次
发布时间:2019-06-22

本文共 17525 字,大约阅读时间需要 58 分钟。

  “池”这个概念好像最早是在操作系统的课上听过的,到后来出来工作的第二天组长也跟我提起“池”这个东东。它给我的感觉是某种对象的集合,如果要用的话就取出,不用的话就放回。在学多线程的时候有接触过线程池,在写《》的时候想到了Socket连接池这回事,不过在网上谷歌了一下,发现这类的文章貌似不多,看了一下园友的博文《》,获益良多,下了份源码来看,虽然有一部分看不明白,而且由于个人水平跑不了那份代码,但是从中我学到了不少,至少我写的“池”有一部分是用了这位田大哥的思想。

  先来分析各个类之间的结构,整个连接池里面实际上是有两个池,一个是在异步通信中可以重复利用的SocketAsyncEventArgs池(当然处于别的方面考虑,池里面并不是单纯放SocketAsyncEventArgs的实例集合),另一个是接收数据时用到的byte[]缓冲池。而这两个池是外部不能访问的,外部通过一个控制(或者叫管理)的类进行操作。继承了前两篇博文中的异步通信的思想,最底的一个类存放了一次连接的信息,包括两个SocketAsyncEventArgs实例和通信另一端的Socket实例。下面将逐部分介绍。

 

1     ///  2     /// 连接单元 3     ///  4      class ConnectionUnit:IDisposable 5     { 6         private string _uid;//单元的编号,默认为-1 7         private bool _state;//单元的状态,true表示使用,false表示空闲 8         private SocketAsyncEventArgs _sendArg;//专用于发送 9         private SocketAsyncEventArgs _recArg;//专用于接收10         internal Socket client { get; set; }//客户端的socket实例11         internal ArrayList tempArray { get; set; }//暂存已接收的数据,避免粘包用的12 13 14         public ConnectionUnit(string UID)15         {16             _uid = UID;17             tempArray = new ArrayList();18         }19 20         public ConnectionUnit() : this("-1") { }21 22         public void Dispose()23         {24             if (_sendArg != null)25                 _sendArg.Dispose();26             if (_recArg != null)27                 _recArg.Dispose();28 29             _sendArg = null;30             _recArg = null;31         }32     }

 

  这个与之前一篇讲异步通信的博文一样,一个Socket两个SocketAsyncEventArgs。为了取客户端的Socket方便一点,两个SocketAsyncEventArgs一个用于收,一个用于发,田大哥说这样可以实现双工。的确是这样,当初把类定成这样也是为了在收的同时也能发。以上代码删掉了把字段装成属性的那部分。

  SocketAsyncEventArgsPool这个类就是上面链接单元的一个池,这个池才是整个连接池里最核心的池,也是真正意义上的池。池里面用了两个集合来存放这一堆连接单元。分别是空闲栈(freeCollecton)和在线字典集(busyCollection)。对于这样的一个池,我就认为,只要在我需要用的时候把对象取出来,顶多在去的时候给一些参数,让池帮忙把这个对象配置好,我就拿来使用,等到我用完了,把他放回池里,池就把对象还原,等待下次再使用。正体感觉这个池对外有点像个栈,取的时候就Pop,放的时候就Push,此外还提供了两个与栈不相干的的方法,根据编号获取在线的连接单元和获取所有在线连接单元的编号。

1     class SocketAsyncEventArgsPool:IDisposable 2     { 3         private Dictionary
busyCollection; 4 private Stack
freeCollecton; 5 6 internal SocketAsyncEventArgsPool() 7 { 8 busyCollection = new Dictionary
(); 9 freeCollecton = new Stack
();10 }11 12 ///
13 /// 取出14 /// 15 internal ConnectionUnit Pop(string uid)16 {17 ConnectionUnit unit = freeCollecton.Pop();18 unit.State = true;19 unit.Uid = uid;20 busyCollection.Add(uid, unit);21 return unit;22 }23 24 ///
25 /// 放回26 /// 27 internal void Push(ConnectionUnit unit)28 {29 if (!string.IsNullOrEmpty(unit.Uid) && unit.Uid != "-1")30 busyCollection.Remove(unit.Uid);31 unit.Uid = "-1";32 unit.State = false;33 freeCollecton.Push(unit);34 }35 36 ///
37 /// 获取38 /// 39 internal ConnectionUnit GetConnectionUnitByUID(string uid)40 {41 if (busyCollection.ContainsKey(uid))42 return busyCollection[uid];43 return null;44 }45 46 ///
47 /// 48 /// 49 internal string[] GetOnLineList()50 {51 return busyCollection.Keys.ToArray();52 }53 54 public void Dispose()55 {56 foreach (KeyValuePair
item in busyCollection)57 item.Value.Dispose();58 59 busyCollection.Clear();60 61 while (freeCollecton.Count > 0)62 freeCollecton.Pop().Dispose();63 }

 

  BufferManager这个是专给接收的SocketAsyncEventArgs用的缓冲池,整一个连接池里面所有接收用的缓冲区都用这个BufferManager,参照田大哥的思想,现在内存里开辟一大片区域存放byte,然后给每一个接收用的SocketAsyncEventArgs分配一块。

1     class BufferManager:IDisposable 2     { 3         private byte[] buffers;//缓冲池 4         private int bufferSize;//每个单元使用的大小 5         private int allSize;//池的总大小 6         private int currentIndex;//当前可用的索引 7         private Stack
freeIndexs;//已使用过的空闲索引 8 9 ///
10 /// 构造缓存池11 /// 12 ///
池总大小13 ///
默认单元大小14 internal BufferManager(int buffersSize, int defaultSize) 15 {16 this.bufferSize=defaultSize;17 this.allSize=buffersSize;18 currentIndex=0;19 this.buffers = new byte[allSize];20 freeIndexs = new Stack
();21 }22 23 ///
24 /// 给SocketAsyncEventArgs设置缓冲区25 /// 26 internal bool SetBuffer(SocketAsyncEventArgs e)27 {28 //首先看看空闲栈里有没有空闲的区域,有就使用29 if (freeIndexs.Count > 0)30 {31 e.SetBuffer(buffers, freeIndexs.Pop(), bufferSize);32 }33 else34 {35 //没有就得从buffers里取,如果buffers用光了当然取不了36 if ((allSize - currentIndex) < bufferSize) return false;37 e.SetBuffer(buffers, currentIndex, bufferSize);38 currentIndex += bufferSize;39 }40 return true;41 }42 43 ///
44 /// 释放SocketAsyncEventArgs的缓冲区45 /// 46 ///
47 internal void FreeBuffer(SocketAsyncEventArgs e)48 {49 //把索引放到空闲索引栈里面,供下次取的时候重复利用50 freeIndexs.Push(e.Offset);51 //同时清空这部分区域的数据免得上次使用时遗留的数据会掺52 //和到下次读取的数据中53 for (int i = e.Offset; i < e.Offset + bufferSize; i++)54 {55 if (buffers[i] == 0) break;56 buffers[i] = 0;57 }58 e.SetBuffer(null, 0, 0);59 }60 61 public void Dispose()62 {63 buffers = null;64 freeIndexs = null;65 }66 }

 

  其实上面两个池都是很大程度参照了《》中的内容。下面这个SocketPoolController是对外的类,这个类的设计参照的就没那么多了。而对于一个Socket通信(服务端的)来说,无非都是三件事,接受连接,接收数据,发送数据。这三件事我再操作类里面是这样做的

  接受连接:接受再运行池的时候就开始了,异步循环地接受,执行一次异步接受就阻塞,等到接受完成才被唤醒。

1         ///  2         /// 异步Accept客户端的连接 3         ///  4         void MyAsyncAccept() 5         { 6             //这里使用Action的方式异步循环接受客户端的连接 7             //模仿同事的做法没开线程,不知这种方式是好是坏 8             Action callback = new Action(delegate() 9             {10                 while (true)11                 {12                     //每次接受都要新开一个SocketAsyncEventArgs,否则会报错13                     //其实我也想重复利用的14                     SocketAsyncEventArgs e = new SocketAsyncEventArgs();15                     e.Completed += new EventHandler
(Accept_Completed);16 17 acceptLock.Reset();18 server.AcceptAsync(e);19 //在异步接受完成之前阻塞当前线程20 acceptLock.WaitOne();21 }22 });23 callback.BeginInvoke(null, null);24 }25 26 void Accept_Completed(object sender, SocketAsyncEventArgs e)27 {28 Socket client = e.AcceptSocket;29 try30 {31 if (client.Connected)32 {33 IPEndPoint point = client.RemoteEndPoint as IPEndPoint;34 string uid = point.Address + ":" + point.Port;35 ConnectionUnit unit = pool.Pop(uid);36 unit.client = client;37 unit.State = true;38 unit.Uid = uid;39 unit.RecArg.UserToken = unit;40 unit.SendArg.UserToken = unit;41 buffer.SetBuffer(unit.RecArg);42 43 //在接受成功之后就开始接收数据了44 client.ReceiveAsync(unit.RecArg);45 //设置并发限制信号和增加当前连接数46 semaphoreAccept.WaitOne();47 Interlocked.Increment(ref currentConnect);48 49 if (OnAccept != null) OnAccept(uid);50 }51 else if (client != null)52 {53 client.Close();54 client.Dispose();55 client = null;56 }57 }58 catch (Exception ex) { Console.WriteLine(ex.ToString()); }59 //设置Accept信号,以便下次Accept的执行60 acceptLock.Set();61 e.Dispose();62 }

 

  接收消息:在异步接受成功的时候开始接收,每次接收完成之后就进行下一次接收,直到客户端断开连接才终止。

1         void RecArg_Completed(object sender, SocketAsyncEventArgs e) 2         { 3             Socket client = sender as Socket; 4             ConnectionUnit unit = e.UserToken as ConnectionUnit; 5             //这里大致与上一篇异步通信的一样,只是对缓冲区的处理有一点差异 6             if (e.SocketError == SocketError.Success) 7             { 8                 int rec = e.BytesTransferred; 9                 if (rec == 0)10                 {11                     CloseSocket(unit);12                     return;13                 }14                 if (client.Available > 0)15                 {16                     unit.tempArray.AddRange(e.Buffer);17                     buffer.FreeBuffer(unit.RecArg);18                     buffer.SetBuffer(unit.RecArg);19                     client.SendAsync(unit.RecArg);20                     return;21                 }22                 byte[] data = e.Buffer;23                 int len = rec;24                 int offset = e.Offset;25                 if (unit.tempArray.Count != 0)26                 {27                     foreach (byte item in data)28                     {29                         if (item == 0) break;30                         unit.tempArray.Add(item);31                     }32                     data = unit.tempArray.ToArray(typeof(byte)) as byte[];33                     rec = data.Length;34                     offset = 0;35                     unit.tempArray.Clear();36                 }37 38                 string dataStr = Encoding.ASCII.GetString(data,offset,len);39                 if (OnReceive != null)40                     OnReceive(unit.Uid, dataStr);41 42                 if (!unit.State) return;43                 buffer.FreeBuffer(e);44                 buffer.SetBuffer(e);45                 client.ReceiveAsync(e);46             }47             //这里还多个了一个关闭当前连接48             else49             {50                 CloseSocket(unit);51             }52         }53 54         /// 55         /// 关闭一个连接单元56         /// 57         private void CloseSocket( ConnectionUnit unit )58         {59             //关闭并释放客户端socket的字眼60             if (unit.client != null)61             {62                 unit.client.Shutdown(SocketShutdown.Both);63                 unit.client.Dispose();64                 unit.client = null;65             }66             //Console.WriteLine(unit.Uid+" disconnect ");67             //把连接放回连接池68             pool.Push(unit);69             //释放并发信号70             semaphoreAccept.Release();71             //减少当前连接数72             Interlocked.Decrement(ref currentConnect);73         }

 

  发送消息:外放方法,在需要的时候自行调用方法发送。

1         ///  2         /// 发送消息 3         ///  4         ///  5         ///  6         public void SendMessage(string uid, string message) 7         { 8             sendLock.Reset(); 9             ConnectionUnit unit=pool.GetConnectionUnitByUID(uid);10             //如果获取不了连接单元就不发送了,11             if (unit == null)12             { 13                 if(OnSend!=null) OnSend(uid,"100");14                 sendLock.Set();15                 return;16             }17             byte[] datas = Encoding.ASCII.GetBytes(message);18             unit.SendArg.SetBuffer(datas, 0, datas.Length);19             unit.client.SendAsync(unit.SendArg);20             //阻塞当前线程,等到发送完成才释放21             sendLock.WaitOne();22         }23 24         void SendArg_Completed(object sender, SocketAsyncEventArgs e)25         {26             Socket client = sender as Socket;27             ConnectionUnit unit = e.UserToken as ConnectionUnit;28             //这里的消息码有三个,2字头的是成功的,1字头是不成功的29             //101是未知错误,100是客户端不在线30             if (e.SocketError == SocketError.Success)31                 if (OnSend != null) OnSend(unit.Uid, "200");32             else if (OnSend != null) OnSend(unit.Uid, "101");33             //释放信号,以便下次发送消息执行34             sendLock.Set();35         }

 

  下面则是类里面的一些字段信息和构造函数

1         ///  2         /// 初始化池的互斥体 3         ///  4         private Mutex mutex = new Mutex(); 5  6         ///  7         /// Accept限制信号 8         ///  9         private Semaphore semaphoreAccept;10 11         /// 12         /// Accept信号13         /// 14         private static ManualResetEvent acceptLock = new ManualResetEvent(false);15 16         /// 17         /// Send信号18         /// 19         private static ManualResetEvent sendLock = new ManualResetEvent(false);20 21         /// 22         /// 最大并发数(连接数)23         /// 24         private int maxConnect;25 26         /// 27         /// 当前连接数(并发数)28         /// 29         private int currentConnect;30 31         /// 32         /// 缓冲池33         /// 34         private BufferManager buffer;35 36         /// 37         /// SocketasyncEventArgs池38         /// 39         private SocketAsyncEventArgsPool pool;40 41         /// 42         /// 服务端Socket43         /// 44         private Socket server;45 46         /// 47         /// 完成接受的委托48         /// 49         public delegate void AcceptHandler(string uid);50 51         /// 52         /// 完成发送的委托53         /// 54         public delegate void SendHandler(string uid, string result);55 56         /// 57         /// 完成接收的委托58         /// 59         public delegate void RecevieHandler(string uid, string data);60 61         /// 62         /// 完成接受事件63         /// 64         public event AcceptHandler OnAccept;65 66         /// 67         /// 完成发送事件68         /// 69         public event SendHandler OnSend;70 71         /// 72         /// 完成接收事件73         /// 74         public event RecevieHandler OnReceive;75 76         /// 77         /// 构造函数78         /// 79         /// 单元缓冲区大小80         /// 并发总数81         public SocketPoolController(int buffersize, int maxCount)82         {83             buffer = new BufferManager(buffersize * maxCount,buffersize);84             this.currentConnect = 0;85             this.maxConnect = maxCount;86             this.currentConnect = 0;87             this.pool = new SocketAsyncEventArgsPool();88             //设置并发数信号,经试验过是并发数-1才对89             this.semaphoreAccept = new Semaphore(maxCount-1, maxCount-1);90             InitPool();91         }

 

构造函数里用到的方法

1         ///  2         /// 初始化SocketAsyncEventArgs池 3         /// 这里主要是给空闲栈填充足够的实例 4         ///  5         private void InitPool() 6         { 7             ConnectionUnit unit = null; 8             for (int i = 0; i < maxConnect; i++) 9             {10                 unit = new ConnectionUnit();11                 unit.Uid = "-1";12                 unit.RecArg = new SocketAsyncEventArgs();13                 unit.RecArg.Completed += new EventHandler
(RecArg_Completed);14 unit.SendArg = new SocketAsyncEventArgs();15 unit.SendArg.Completed += new EventHandler
(SendArg_Completed);16 this.pool.Push(unit);17 }18 }

 

  其他外放专门控制池的方法

1         ///  2         /// 启动池 3         ///  4         /// 服务端的IP 5         /// 端口 6         public void RunPool(string ipAddress, int port) 7         { 8             IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse(ipAddress), port); 9             server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);10             server.Bind(endpoint);11             server.Listen(100);12 13             //调用方法异步Accept客户端的连接14             MyAsyncAccept();15             //设置信号,防止再池在已经启动的情况下再次启动16             mutex.WaitOne();17         }18 19         /// 20         /// 停止池21         /// 22         public void StopPool()23         {24             //把服务端的socket关了25             if (server != null)26                 server.Close();27             //释放互斥信号,等待下次启动28             mutex.ReleaseMutex();29             //释放资源30             Dispose();31         }

 

  要把这个操作类与现实的事物类比的话,这个与我们平常跟某个或一堆人聊天时差不多,别人说的东西,不用你自己控制你都会听得到(排除带上耳机,塞住耳朵这种极端的情况),所以接收消息那个方法就不需要了,而你要说什么这个要靠个人控制,而事件那些也好类比,OnAccept就相当于某人加入到这次聊天中你会做些什么(say hello 或者无视),OnRecevie就听到别人说什么自己有什么反应,OnSend就自己说完话之后有什么反应(有时候发现说错了会纠正,有时觉得自己说得好笑的也笑一场诸如此类)。

  在使用的时候可以这样子

1             SocketPoolController pool; 2             pool = new SocketPoolController(32 * 1024, 1000); 3             pool.OnReceive += new SocketPoolController.RecevieHandler(pool_OnReceive); 4             pool.OnSend += new SocketPoolController.SendHandler(pool_OnSend); 5             pool.OnAccept += new SocketPoolController.AcceptHandler(pool_OnAccept); 6             pool.RunPool("127.0.0.1", 8081); 7             Console.WriteLine("Pool has run\r\npress any key to stop..."); 8             Console.ReadKey(); 9             pool.StopPool();10             Console.WriteLine("Pool has stop\r\npress any key to exit...");11             Console.ReadLine();

 

  在池里面有一部分地方看似跟用同步的差不多,像接受客户端的连接,发送消息这些地方。可是用这种异步,万一客户端突然断开连接也不会有同步那样马上抛异常。还有一点的是在这份代码里面缺少了对异常的捕捉,有一部分错误我在测试的过程中设了判断避开了。以前跟某只猫提过Socket编程会与多线程一起使用,我也觉得是这样,在之前一篇博文 《Socket一对多通信》里我也用到线程,后来的异步通信也是有线程的,不过不是.net framework自行创建的。看了田大哥的博文给我的另一个收获是信号量的使用,在以前不懂得使用信号量,只会设置一大堆标识状态的布尔值或整形的变量来计数判断。田大哥的博文介绍的时高性能的Socket,而我的这个应该性能不会高到哪里去。上面有什么说错的请各位指出,有什么说漏的,请各位提点,多多指导。谢谢!

整份源码

 

转载于:https://www.cnblogs.com/zhaolizhe/p/6923567.html

你可能感兴趣的文章
Android之等比例显示图片
查看>>
HTML5 data-* 自定义属性
查看>>
linux系统装windows时需要注意的问题
查看>>
android textview 行间距
查看>>
HDU-4529 郑厂长系列故事——N骑士问题 状态压缩DP
查看>>
[JS5] 利用onload执行脚本
查看>>
剑指OFFER之矩形覆盖(九度OJ1390)
查看>>
Scrum 学习笔记
查看>>
Ext.form.ComboBox常用属性详解
查看>>
HTTP Header 详解
查看>>
Java中的HashMap 浅析
查看>>
调和映射的Bochner公式
查看>>
windows批处理
查看>>
网页上的摄影展:等高响应布局实现
查看>>
IE无法打开internet网站已终止操作的解决的方法
查看>>
Plus One leetcode java
查看>>
FineUI(专业版)公测版发布(这速度,真TM快!)
查看>>
boost中asio网络库多线程并发处理实现,以及asio在多线程模型中线程的调度情况和线程安全。...
查看>>
LINQ To SQL 语法及实例大全
查看>>
sqlserver锁机制详解(sqlserver查看锁)
查看>>