IT技术互动交流平台

服务器主逻辑代码的重构

作者:archy_yu  来源:IT165收集  发布日期:2014-04-29 20:28:01

      不知道前主程是处于什么目的,总之我接手这套程序的时候,出现了超级多的问题,也发现了超级多的问题。

     比如说吧,接受网络消息逻辑是线程独立的,而发送消息给客户端缺阻塞在了逻辑线程里面;原本可以放在一个进程里面处理的逻辑,却分散在了四个进程里面去处理,导致我完成一个功能,大部分时间要话费了进程之间的玩家信息的同步上面,在我无法忍受的情况下,我终于是用NIO将网络底层从新写了,而且将四个进程合并,但是在很多逻辑上还是尽量保持了和原逻辑处理的兼容!

      先说说这个重构的底层吧!

     我们看下最重要的ClientHandle类,主要处理每个连接的收发数据的!

public class ClientHandle implements ISession
{
	public final static int RW_BUFFER_SIZE = 1024;
	private SocketChannel socket = null;
	
	private java.nio.ByteBuffer reader = java.nio.ByteBuffer.allocate(RW_BUFFER_SIZE);
	private java.nio.ByteBuffer writer = java.nio.ByteBuffer.allocate(4*RW_BUFFER_SIZE);
	
	BlockingQueue<ByteBuffer> writeQueue = new LinkedBlockingQueue<ByteBuffer>();
	
	private IPlayer player = null;
	
	private boolean active = false;

   包含SocketChannel对象不用说了,reader和writer是用来做消息收发的缓冲的,因为服务器广播的压力会大一些,所以将writer的大小设置为reader的4倍,当然这个可以调整。

     writeQueue是用来存储需要发送给客户端的ByteBuffer,每次在这个链接可以写数据的时候,就将writeQueue里面存储的数据转移到writer中,并且一次发送,减少了writer的系统调用次数。ByteBuffer的结构简单说下,不同于java.nio.ByteBuffer,而是自己封装的一个消息解析器,给出源代码

package NetBase;

/**
 * 类说明:字节缓存类,字节操作高位在前,低位在后
 * 
 * @version 1.0
 * @author fxxxysh <hanshuang@linekong.com>
 */

public class ByteBuffer
{

	/* static fields */
	/** 默认的初始容量大小 */
	public static final int CAPACITY = 32;

	/** 默认的动态数据或文字的最大长度,400k */
	public static final int MAX_DATA_LENGTH = 400 * 1024;

	/* fields */
	/** 字节数组 */
	byte[] bytes;

	/** 字节缓存的长度 */
	int top;

	/** 字节缓存的偏移量 */
	int offset;

	/* constructors */
	/** 按默认的大小构造一个字节缓存对象 */
	public ByteBuffer()
	{
		this(CAPACITY);
	}

	/** 按指定的大小构造一个字节缓存对象 */
	public ByteBuffer(int capacity)
	{
		if (capacity < 1)
			throw new IllegalArgumentException(getClass().getName()
					+ " <init>, invalid capatity:" + capacity);
		bytes = new byte[capacity];
		top = 0;
		offset = 0;
	}

	/** 按指定的字节数组构造一个字节缓存对象 */
	public ByteBuffer(byte[] data)
	{
		if (data == null)
			throw new IllegalArgumentException(getClass().getName()
					+ " <init>, null data");
		bytes = data;
		top = data.length;
		offset = 0;
	}

	/** 按指定的字节数组构造一个字节缓存对象 */
	public ByteBuffer(byte[] data, int index, int length)
	{
		if (data == null)
			throw new IllegalArgumentException(getClass().getName()
					+ " <init>, null data");
		if (index < 0 || index > data.length)
			throw new IllegalArgumentException(getClass().getName()
					+ " <init>, invalid index:" + index);
		if (length < 0 || data.length < index + length)
			throw new IllegalArgumentException(getClass().getName()
					+ " <init>, invalid length:" + length);
		bytes = data;
		top = index + length;
		offset = index;
	}

	/* properties */
	/** 得到字节缓存的容积 */
	public int capacity()
	{
		return bytes.length;
	}

	/** 设置字节缓存的容积,只能扩大容积 */
	public void setCapacity(int len)
	{
		int c = bytes.length;
		if (len <= c)
			return;
		for (; c < len; c = (c << 1) + 1)
			;
		byte[] temp = new byte[c];
		System.arraycopy(bytes, 0, temp, 0, top);
		bytes = temp;
	}

	/** 得到字节缓存的长度 */
	public int top()
	{
		return top;
	}

	/** 设置字节缓存的长度 */
	public void setTop(int top)
	{
		if (top < offset)
			throw new IllegalArgumentException(this + " setTop, invalid top:"
					+ top);
		if (top > bytes.length)
			setCapacity(top);
		this.top = top;
	}

	/** 得到字节缓存的偏移量 */
	public int offset()
	{
		return offset;
	}

	/** 设置字节缓存的偏移量 */
	public void setOffset(int offset)
	{
		if (offset < 0 || offset > top)
			throw new IllegalArgumentException(this
					+ " setOffset, invalid offset:" + offset);
		this.offset = offset;
	}

	/** 得到字节缓存的使用长度 */
	public int length()
	{
		return top - offset;
	}

	/** 得到字节缓存的字节数组,一般使用toArray()方法 */
	public byte[] getByteArray()
	{
		return bytes;
	}

	/* methods */
	/* byte methods */
	/** 得到指定偏移位置的字节 */
	public byte read(int pos)
	{
		return bytes[pos];
	}

	/** 设置指定偏移位置的字节 */
	public void write(int b, int pos)
	{
		bytes[pos] = (byte) b;
	}

	/* read methods */
	/**
	 * 按当前偏移位置读入指定的字节数组
	 * 
	 * @param data
	 *            指定的字节数组
	 * @param pos
	 *            指定的字节数组的起始位置
	 * @param len
	 *            读入的长度
	 */
	public void read(byte[] data, int pos, int len)
	{
		System.arraycopy(bytes, offset, data, pos, len);
		offset += len;
	}

	/** 读出一个布尔值 */
	public boolean readBoolean()
	{
		return (bytes[offset++] != 0);
	}

	/** 读出一个字节 */
	public byte readByte()
	{
		return bytes[offset++];
	}

	/** 读出一个无符号字节 */
	public int readUnsignedByte()
	{
		return bytes[offset++] & 0xff;
	}

	/** 读出一个字符 */
	public char readChar()
	{
		return (char) readUnsignedShort();
	}

	/** 读出一个短整型数值 */
	public short readShort()
	{
		return (short) readUnsignedShort();
	}

	/** 读出一个无符号的短整型数值 */
	public int readUnsignedShort()
	{
		int pos = offset;
		offset += 2;
		return (bytes[pos + 1] & 0xff) + ((bytes[pos] & 0xff) << 8);
	}

	/** 读出一个整型数值 */
	public int readInt()
	{
		int pos = offset;
		offset += 4;
		return (bytes[pos + 3] & 0xff) + ((bytes[pos + 2] & 0xff) << 8)
				+ ((bytes[pos + 1] & 0xff) << 16) + ((bytes[pos] & 0xff) << 24);
	}

	/** 读出一个浮点数值 */
	public float readFloat()
	{
		return Float.intBitsToFloat(readInt());
	}

	/** 读出一个长整型数值 */
	public long readLong()
	{
		int pos = offset;
		offset += 8;
		return (bytes[pos + 7] & 0xffL) + ((bytes[pos + 6] & 0xffL) << 8)
				+ ((bytes[pos + 5] & 0xffL) << 16)
				+ ((bytes[pos + 4] & 0xffL) << 24)
				+ ((bytes[pos + 3] & 0xffL) << 32)
				+ ((bytes[pos + 2] & 0xffL) << 40)
				+ ((bytes[pos + 1] & 0xffL) << 48)
				+ ((bytes[pos] & 0xffL) << 56);
	}

	/** 读出一个双浮点数值 */
	public double readDouble()
	{
		return Double.longBitsToDouble(readLong());
	}

	/**
	 * 读出动态长度, 数据大小采用动态长度,整数类型下,最大为512M 1xxx,xxxx表示(0~0x80) 0~128B
	 * 01xx,xxxx,xxxx,xxxx表示(0~0x4000) 0~16K
	 * 001x,xxxx,xxxx,xxxx,xxxx,xxxx,xxxx,xxxx表示(0~0x20000000) 0~512M
	 */
	public int readLength()
	{
		int n = bytes[offset] & 0xff;
		if (n >= 0x80)
		{
			offset++;
			return n - 0x80;
		}
		else if (n >= 0x40)
			return readUnsignedShort() - 0x4000;
		else if (n >= 0x20)
			return readInt() - 0x20000000;
		else
			throw new IllegalArgumentException(this
					+ " readLength, invalid number:" + n);
	}

	/** 读出一个指定长度的字节数组,可以为null */
	public byte[] readData()
	{
		int len = readLength() - 1;
		if (len < 0)
			return null;
		if (len > MAX_DATA_LENGTH)
			throw new IllegalArgumentException(this
					+ " readData, data overflow:" + len);
		byte[] data = new byte[len];
		read(data, 0, len);
		return data;
	}

	/** 读出一个短字节数组,长度不超过254 */
	public byte[] readShortData()
	{
		int len = readUnsignedByte();
		if (len == 255)
			return null;
		byte[] data = new byte[len];
		if (len != 0)
			read(data, 0, len);
		return data;
	}

	/** 读出一个指定长度的字符串 */
	public String readString(int len)
	{
		byte[] data = new byte[len];
		if (len == 0)
			return "";
		read(data, 0, len);
		return new String(data);
	}

	/** 读出一个短字符串,长度不超过254 */
	public String readShortString()
	{
		int len = readUnsignedByte();
		if (len == 255)
			return null;
		return readString(len);
	}

	/** 读出一个字符串,长度不超过65534 */
	public String readString()
	{
		int len = readUnsignedShort();
		if (len == 65535)
			return null;
		return readString(len);
	}

	/** 读出一个指定长度和编码类型的字符串 */
	public String readUTF(String charsetName)
	{
		int len = readLength() - 1;
		if (len < 0)
			return null;
		if (len > MAX_DATA_LENGTH)
			throw new IllegalArgumentException(this
					+ " readUTF, data overflow:" + len);
		byte[] data = new byte[len];
		read(data, 0, len);
		if (charsetName == null)
			return new String(data);
		try
		{
			return new String(data, charsetName);
		}
		catch (Exception e)
		{
			throw new IllegalArgumentException(this
					+ " readUTF, invalid charsetName:" + charsetName);
		}
	}

	/** 读出一个指定长度的utf字符串 */
	public String readUTF()
	{
		int len = readLength() - 1;
		if (len < 0)
			return null;
		if (len == 0)
			return "";
		if (len > MAX_DATA_LENGTH)
			throw new IllegalArgumentException(this
					+ " readUTF, data overflow:" + len);
		StringBuffer sb = new StringBuffer(len);
		int pos = ByteKit.readUTF(bytes, offset, len, sb);
		if (pos > 0)
			throw new IllegalArgumentException(this
					+ " readUTF, format err, len=" + len + ", pos:" + pos);
		offset += len;
		return sb.toString();
	}

	/* write methods */
	/**
	 * 写入指定字节数组
	 * 
	 * @param data
	 *            指定的字节数组
	 * @param pos
	 *            指定的字节数组的起始位置
	 * @param len
	 *            写入的长度
	 */
	public void write(byte[] data, int pos, int len)
	{
		if (bytes.length < top + len)
			setCapacity(top + len);
		System.arraycopy(data, pos, bytes, top, len);
		top += len;
	}

	/** 写入一个布尔值 */
	public void writeBoolean(boolean b)
	{
		if (bytes.length < top + 1)
			setCapacity(top + CAPACITY);
		bytes[top++] = (byte) (b ? 1 : 0);
	}

	/** 写入一个字节 */
	public void writeByte(int b)
	{
		if (bytes.length < top + 1)
			setCapacity(top + CAPACITY);
		bytes[top++] = (byte) b;
	}

	/** 写入一个字符 */
	public void writeChar(int c)
	{
		writeShort(c);
	}

	/** 写入一个短整型数值 */
	public void writeShort(int s)
	{
		int pos = top;
		if (bytes.length < pos + 2)
			setCapacity(pos + CAPACITY);
		bytes[pos] = (byte) (s >>> 8);
		bytes[pos + 1] = (byte) s;
		top += 2;
	}

	/** 在指定位置写入一个短整型数值,length不变 */
	public void writeShort(int s, int pos)
	{
		if (bytes.length < pos + 2)
			setCapacity(pos + CAPACITY);
		bytes[pos] = (byte) (s >>> 8);
		bytes[pos + 1] = (byte) s;
	}

	/** 写入一个整型数值 */
	public void writeInt(int i)
	{
		int pos = top;
		if (bytes.length < pos + 4)
			setCapacity(pos + CAPACITY);
		bytes[pos] = (byte) (i >>> 24);
		bytes[pos + 1] = (byte) (i >>> 16);
		bytes[pos + 2] = (byte) (i >>> 8);
		bytes[pos + 3] = (byte) i;
		top += 4;
	}

	/** 在指定位置写入一个整型数值,length不变 */
	public void writeInt(int i, int pos)
	{
		if (bytes.length < pos + 4)
			setCapacity(pos + CAPACITY);
		bytes[pos] = (byte) (i >>> 24);
		bytes[pos + 1] = (byte) (i >>> 16);
		bytes[pos + 2] = (byte) (i >>> 8);
		bytes[pos + 3] = (byte) i;
	}

	/** 写入一个浮点数值 */
	public void writeFloat(float f)
	{
		writeInt(Float.floatToIntBits(f));
	}

	/** 写入一个长整型数值 */
	public void writeLong(long l)
	{
		int pos = top;
		if (bytes.length < pos + 8)
			setCapacity(pos + CAPACITY);
		bytes[pos] = (byte) (l >>> 56);
		bytes[pos + 1] = (byte) (l >>> 48);
		bytes[pos + 2] = (byte) (l >>> 40);
		bytes[pos + 3] = (byte) (l >>> 32);
		bytes[pos + 4] = (byte) (l >>> 24);
		bytes[pos + 5] = (byte) (l >>> 16);
		bytes[pos + 6] = (byte) (l >>> 8);
		bytes[pos + 7] = (byte) l;
		top += 8;
	}

	/** 写入一个双浮点数值 */
	public void writeDouble(double d)
	{
		writeLong(Double.doubleToLongBits(d));
	}

	/** 写入动态长度 */
	public void writeLength(int len)
	{
		if (len >= 0x20000000 || len < 0)
			throw new IllegalArgumentException(this
					+ " writeLength, invalid len:" + len);
		if (len >= 0x4000)
			writeInt(len + 0x20000000);
		else if (len >= 0x80)
			writeShort(len + 0x4000);
		else
			writeByte(len + 0x80);
	}

	/** 写入一个字节数组,可以为null */
	public void writeData(byte[] data)
	{
		writeData(data, 0, (data != null) ? data.length : 0);
	}

	/** 写入一个字节数组,可以为null */
	public void writeData(byte[] data, int pos, int len)
	{
		if (data == null)
		{
			writeLength(0);
			return;
		}
		writeLength(len + 1);
		write(data, pos, len);
	}

	/** 写入一个字符串,可以为null */
	public void writeString(String s)
	{
		if (s != null)
		{
			byte[] temp = s.getBytes();
			if (temp.length > 65534)
				throw new IllegalArgumentException(getClass().getName()
						+ " writeString, invalid s:" + s);
			writeShort(temp.length);
			if (temp.length != 0)
				write(temp, 0, temp.length);
		}
		else
			writeShort(65535);
	}

	/** 写入一个字符串,以指定的字符进行编码 */
	public void writeUTF(String str, String charsetName)
	{
		if (str == null)
		{
			writeLength(0);
			return;
		}
		byte[] data;
		if (charsetName != null)
		{
			try
			{
				data = str.getBytes(charsetName);
			}
			catch (Exception e)
			{
				throw new IllegalArgumentException(this
						+ " writeUTF, invalid charsetName:" + charsetName);
			}
		}
		else
			data = str.getBytes();
		writeLength(data.length + 1);
		write(data, 0, data.length);
	}

	/** 写入一个utf字符串,可以为null */
	public void writeUTF(String str)
	{
		writeUTF(str, 0, (str != null) ? str.length() : 0);
	}

	/** 写入一个utf字符串中指定的部分,可以为null */
	public void writeUTF(String str, int index, int length)
	{
		if (str == null)
		{
			writeLength(0);
			return;
		}
		int len = ByteKit.getUTFLength(str, index, length);
		writeLength(len + 1);
		int pos = top;
		if (bytes.length < pos + len)
			setCapacity(pos + len);
		ByteKit.writeUTF(str, index, length, bytes, pos);
		top += len;
	}

	/** 检查是否为相同类型的实例 */
	public boolean checkClass(Object obj)
	{
		return (obj instanceof ByteBuffer);
	}

	/** 在指定位置写入一个字节,length不变 */
	public void writeByte(int b, int pos)
	{
		if (bytes.length < pos + 1)
			setCapacity(pos + CAPACITY);
		bytes[pos] = (byte) b;
	}

	/** 得到字节缓存当前长度的字节数组 */
	public byte[] toByteArray()
	{
		byte[] data = new byte[top - offset];
		System.arraycopy(bytes, offset, data, 0, data.length);
		return data;
	}

	/** 清除字节缓存对象 */
	public void clear()
	{
		top = 0;
		offset = 0;
	}

	/* common methods */
	public int hashCode()
	{
		int hash = 17;
		for (int i = top - 1; i >= 0; i--)
			hash = 65537 * hash + bytes[i];
		return hash;
	}

	public boolean equals(Object obj)
	{
		if (this == obj)
			return true;
		if (!checkClass(obj))
			return false;
		ByteBuffer bb = (ByteBuffer) obj;
		if (bb.top != top)
			return false;
		if (bb.offset != offset)
			return false;
		for (int i = top - 1; i >= 0; i--)
		{
			if (bb.bytes[i] != bytes[i])
				return false;
		}
		return true;
	}

	public String toString()
	{
		return super.toString() + "[" + top + "," + offset + "," + bytes.length
				+ "] ";
	}
}

   下面看下 ClientHandle的可读逻辑:

public int handleRead() throws IOException 
	{
		int r = this.socket.read(this.reader);
		if(r <= 0)
		{
			return -1;
		}
		this.reader.flip();
		
		ByteBuffer data = this.createBuffer();
		while(data != null)
		{
			this.reader.get(data.getByteArray(), data.top(), data.capacity());
			this.processData(data);
			data = this.createBuffer();
		}
		
		this.reader.clear();
		return 0;
	}

  依次将数据读入到reader中,并且按照LC(L表示长度,C表示内容)结构将reader中的数据解析成一个个ByteBuffer对象处理。下面是createBuffer函数和processData函数:

private ByteBuffer createBuffer()
	{
		if(reader.remaining() < 4)
		{
			return null;
		}
		
		int len = reader.getInt();
		if(len > reader.remaining())
		{
			reader.clear();
			return null;
		}
		
		if (len > 0 && len <= 10 * 1024)
		{
			return new ByteBuffer(len);
		}
		
		return null;
	}

public void processData(ByteBuffer data)
{
       player.insertData(data);
}

   这里要注意,1:解析reader中的消息一定要做容错处理;2:将解析的待处理包放到玩家身上,让玩家自己处理!

     发送函数的处理:

public int handleWrite() throws IOException
	{
		ByteBuffer data = writeQueue.poll();
		while(data != null)
		{
			this.writer.putInt(data.length());
			this.writer.put(data.toByteArray(), 0, data.length());
			data = writeQueue.poll();
		}
		
		this.writer.flip();
		if(!this.writer.hasRemaining())
		{
			this.writer.limit(this.writer.capacity());
			return 0;
		}
		
		this.socket.write(writer);
		
		if(this.writer.hasRemaining())
		{
			this.writer.compact();
			this.writer.position(this.writer.limit());
			this.writer.limit(this.writer.capacity());
		}
		else
		{
			this.writer.compact();
			this.writer.limit(this.writer.capacity());
		}
		
		return 0;
	}

  发送函数的处理相对复杂些,首先要做的就是每个连接的发送函数每100ms(可以调整)触发一次,每次触发时候,要将待发送的数据包bytebuffer填充到writer缓冲区,然后一次发送!

     管理协调这些链接的新建和处理都是使用了java nio的selector结构,具体的代码就不贴出来了,想要的可以联系我,需要注意的有两点,1:对于空闲连接的处理,2:对于发送数据的处理

     大致讲完了网络线程,那么讲一讲主逻辑线程,逻辑线程采用线程绑定地图的设计;在服务器启动之时,启动n(可以调整)个地图线程,每个地图线程绑定N(可以调整)个地图,这N个地图上的所有玩家的逻辑处理,都有地图所在线程来处理,具体处理方式:

     地图线程的主逻辑:

public class SceneThread implements Runnable
{
	
	List<IScene> scenes = new ArrayList<IScene>();
	private int index = 0;
	
	@Override
	public void run()
	{
		while(true)
		{
			try
			{
				synchronized (scenes)
				{
					for(IScene scene : this.scenes)
					{
						scene.beatHeart();
					}	
				}
				Thread.sleep(100);
			}
			catch (Exception e)
			{
				e.printStackTrace();
			}
		}
	}
	
	public void addScene(IScene scene)
	{
		synchronized (scenes)
		{
			this.scenes.add(scene);
		}
	}
	
	public void removeScene(IScene scene)
	{
		synchronized (scenes)
		{
			this.scenes.remove(scene);
		}
	}
	
	public void setIndex(int index)
	{
		this.index = index;
	}
	
	public String toString()
	{
		return "SceneThread : " + index;
	}
	
}

  场景Scene的心跳函数:

public class Scene implements IScene
{
        public void beatHeart()
	{
		long now = System.currentTimeMillis();
		List<IPlayer> players = null;
		synchronized (idPlayerMap)
		{
			players = new ArrayList<IPlayer>(idPlayerMap.values());	
		}
		
		for(IPlayer player : players)
		{
			player.beatHeart(now);
		}
		
	}
		
}

  玩家的心跳函数:

BlockingQueue<ByteBuffer> dataToProcess = new LinkedBlockingDeque<ByteBuffer>();
	
	public void insertData(ByteBuffer data)
	{
		this.dataToProcess.offer(data);
	}

public void beatHeart(long now)
{
		
		ByteBuffer data = this.dataToProcess.poll();
		while(data != null)
		{
			this.processData(data);
			data = this.dataToProcess.poll();
		}
                //.....处理心跳定时器,上一篇有讲到
} 

  好了,大概的服务器的主逻辑就这些了,是不是精简小巧。晚上的时候还做了一下广播压力测试,效果还不错!

     欢迎大家提出宝贵意见!

Tag标签: 逻辑   代码   服务器  
  • 专题推荐

About IT165 - 广告服务 - 隐私声明 - 版权申明 - 免责条款 - 网站地图 - 网友投稿 - 联系方式
本站内容来自于互联网,仅供用于网络技术学习,学习中请遵循相关法律法规