`
longzhun
  • 浏览: 360680 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

MemcachedClient

 
阅读更多

1. SET & ADD & REPLACE
a. 如果sanitizeKeys为true则key转变为UTF-8编码的字符串,遇异常如果errorHandler不为null则回调其
handleErrorOnSet函数,返回FALSE,value为null退出。
b. 找SockIOPool.SockIO根据key & hashCode获取一个SockIO,如果SockIO为null退出,回调handleErrorOnSet
函数,返回FALSE。如果传入的expiry为null则设置成0,表示这个操作永远不失效,如果不是则在指定到秒的时间
点把这个KEY-VALUE从MAP中清除而不是退一步。
c. value到byte[]的转换:如果NativeHandler能处理value,且参数asString=true表示value可以用其
toString()表示则其byte[]为value.toString的UTF-8编码的byte[],如不用toString表示则使用NativeHandler编码后
的byte[],同时flag标记NativeHandler编码;否则使用JAVA序列化机制实现到byte[]的步骤,同时flag标记序列化
过。如果上面的过程出错则回调errorHandler的handleErrorOnSet函数且还回b获取的SockIO,操作结束。接下
来对有必要压缩的byte[]使用GZIP进行压缩,如果出错只回调handleErrorOnSet而继续使用原先的byte[]。
d. 构造memcached指令“%s %s %d %d %d\r\n$value.byte[]$\r\n”依次填入指令名(set|add|replace)- key
- flag – expiry – byte[].length,使用byte[]填充$value.byte[]$,通过SockIO发送出去;然后读取来自服务器的响应
,使用READLINE,如果读取的STR为STORED表示操作成功,则还回SockIO,返回TRUE,如果是NOT_STORED则
表示操作没成功,如果是其它不应该出现的响应头文ERROR级别日志记录下,对于后两种情况还回SockIO,返回
FALSE。如果在上述网络操作中出现由于网络问题而引起的IOException则先回调handleErrorOnSet再ERROR级别
日志记录,接着关闭SockIO。
-------------------
2.GET & EXITS
a. 如果sanitizeKeys为true则key转变为UTF-8编码的字符串,遇异常如果errorHandler不为null则回调其
handleErrorOnGet函数,返回NULL。
b. 找SockIOPool.SockIO根据key & hashCode获取一个SockIO,如果SockIO为null退出,回调handleErrorOnGet
函数,返回NULL。
c. 构造memcached指令“get $key$\r\n”用key填充$key$,通过SockIO发送出去;然后读取来自服务器的响应,
死循环进行READLINE,如果读取的STR是END则退出循环;如果是以START开头的则以$1 $2 $3的规则解析STR,
其中$2是flag,$3是value.byte[].length。使用SockIO将长度为$3的byte[]填满,再清除其后的\r\n。如果flag标记
压缩则进行GZIP解压操作,过程中出现问题回调handleErrorOnGet函数并抛NestedIOException;如果flag标记序列
化过,则使用ObjectInputStream来读取流中的对象,为了解决分布式提供了ContextObjectInputStream,自己注入
ClassLoader来解决资源加载问题,序列化过程出错回调handleErrorOnGet函数,如果没有标记序列化过则在看
primitiveAsString|asString为true则表示value是以String的形式存放的则转换成String,否则使用NativeHandler
解码,解码过程中出错回调handleErrorOnGet函数并抛NestedIOException。在处理完START后继续读取期待END
的出现。在顺利处理完服务器输出后,还回SockIO,并返回解析出的对象。在做网络操作过程或者
NestedIOException导致的IOException则先回调handleErrorOnGet再关闭SockIO。
d. 对于EXITS而言其实就是GET操作,如果没有KEY的映射直接读取到END得到NULL,根据GET操作返回的是不
是NULL来判断是否存在。
-------------------
3. DELETE
a. 如果sanitizeKeys为true则key转变为UTF-8编码的字符串,遇异常如果errorHandler不为null则回调其
handleErrorOnDelete函数,返回FALSE。
b. 找SockIOPool.SockIO根据key & hashCode获取一个SockIO,如果SockIO为null退出,回调
handleErrorOnDelete函数,返回FALSE。
c. 构造memcached指令“delete $key$ [$expiry$]\r\n”使用key填充$key$,如果有失效时间的话则使用失效
d. 时间填充$expiry$。通过SockIO发送出去;然后读取来自服务器的响应,进行READLINE,如果STR是DELETED
则表示删除操作成功,然后还回SockIO并返回TRUE;如果是其他的还回SockIO并返回FALSE。在上述过程中如果
由于网络原因出现IOException则回调其handleErrorOnDelete函数,并关闭SockIO。
-------------------
4. COUNTER
针对计数器的操作有四类,storeCounter、getCounter、addOrIncr|addOrDecr、incr|decr。
a. storeCounter操作主要是将key-counter通过SET操作放入缓存中,无有效时间设置。
b. getCounter操作主要是通过GET操作以字符串形式获取key对应的counter,再进行到long的转换,如果在
GET操作中出现异常则调用handleErrorOnGet函数,并且返回-1。
c. addOrIncr|addOrDecr操作会先通过ADD操作,如果值已经存在则返回FALSE,则进一步处理,如果KEY不存
在则形成映射。这样在分布式或者多线程环境下通过memcached的原子性操作保证线程安全。跟zookeeper分布式
提供的函数一样。如果key已经存在则走incrdecr逻辑:
--------
    1. 如果sanitizeKeys为true则key转变为UTF-8编码的字符串,遇异常如果errorHandler不为null则回调其
    handleErrorOnGet函数,返回-1。
    2. 找SockIOPool.SockIO根据key & hashCode获取一个SockIO,如果SockIO为null退出,回调handleErrorOnSet
    函数,返回-1。
    3. 构造memcached指令“%s %s %d\r\n”依次填入指令名incr|decr,key, value。通过SockIO发送出去;然后
    读取来自服务器的响应,进行READLINE,如果读取的STR符合模式“\\d+”则表示操作成功,返回的服务器进行
    incr|decr操作后counter的值,然后还回SockIO,并返回转化为long的值,如果超过long的值范围回调
    handleErrorOnGet函数,返回-1;如果STR不符合模式“\\d+”则还回SockIO且返回-1。在上述过程中如果由于网络
    原因出现IOException则回调其handleErrorOnGet函数,并关闭SockIO且返回-1。
--------
d. incr|decr操作主要是针对已经存在的key的counter的操作,过程见上面的描述,如果在指令执行点key不
存在了则读取的STR是NOT_FOUND得到-1。
-------------------
5. MULTI
a. 如果sanitizeKeys为true则将String[] keys全部转换为UTF-8编码的字符串,对于其中出异常如果
errorHandler不为null则回调其handleErrorOnGet函数,或者key为NULL操作继续;如果keys存在对应hashCode的
根据key & hashCode获取一个SockIO,如果SockIO为null则不处理,操作继续;根据SockIO.host将keys按照host
分类,形成host -> “get keys[n] keys[m] …”的映射,在处理完后还回所有在这个步骤借用的SockIO。
b. 上步得到了每个机器对应的命令的MAP,再实例化一个用来存放最终结果的MAP。
c. 使用NIOLoader并发的做GET操作
-------
    1. Selector.open()获取用来这次并发GET的Selector,迭代每个机器对应的命令的MAP,针对每一个host获取
    一个SockIO,如果其中一个SockIO为NULL则退出整个获取操作[退出前进行Selector的关闭,所有
    SocketChannel关闭,所有SockIO关闭操作]。为每个SockIO建立底层数据结构:一个用来接收数据的
    List<ByteBuffer>,发往此机器的memcached指令,网络通信的SocketChannel句柄,且设置其为非阻塞模式,
    并到Selector上注册OP_WRITE事件。
    2. 进行while循环轮询Selector上监听的事件,退出的条件有两个:所有服务器的数据都读取完成或者超过了
    maxBusy时间设置。当SocketChannel能处理WRITE时,将发往此机器的memcached指令写入,再到Selector上
    注册OP_READ事件;当SocketChannel能处理READ时,到用来接收数据的List<ByteBuffer>获取当前用来接收数
    据的ByteBuffer,读入,这个链表是每次实例化8KB的空间,每次读取了数据后会检查读取的最后的几个字节是
    不是“END\r\n”,如果是接收数据完成,while循环检查退出处服务器的数据读取完成数目++,撤销Selector上
    监听。
    3. 步骤2结束有三种情况:在做网络操作时发生异常,回调handleErrorOnGet函数,并且结束整个MULTI
    操作[退出前进行Selector的关闭,所有SocketChannel关闭,所有SockIO关闭操作];顺利完成所有指令发生和
    数据的接收,则进行Selector的关闭,所有SocketChannel设置阻塞模式,所有SockIO归还操作,再进行数据整理
    操作;因为超时退出,则进行Selector的关闭,对于没有完成的进行所有SocketChannel关闭和所有SockIO关闭操
    作,对于完成进行所有SocketChannel设置阻塞模式和所有SockIO归还操作,再对完成的进行数据整理工作。
    4. 数据整理步骤,使用ByteBufArrayInputStream封装用来接收数据的List<ByteBuffer>,循环进行READLINE
    的操作,如果以VALUE开头则按模式“$0 $1 $2 $3”分割读取的STR,$0是VALUE,$1是key,$2是flag,$3是
    value.length,按照这些信息读取和解析value,然后将key-value放入用来存放最终结果的MAP中;如果STR是
    END则退出循环。
-------
d. MULTI操作分为两种,一种返回一个对应的Object[];一种是只返回正常流程走下来的key的放最终结果的MAP。
-------------------
6. FLUSHALL
a. 可以知道定点清除哪些服务器的数据或者全部的服务器,迭代需要清除的服务器,到连接池获取SockIO,如果
不为NULL,则构造memcached指令”flush_all\r\n”发送过去,再使用SockIO进行READLINE操作,如果读取的STR
是OK则清空操作成功。如果操作中没有发送网络异常则归还SockIO,否则关闭SockIO。
-------------------
7. 这是MemcachedClient操作层的行为,主要是SET & ADD & REPLACE,GET & EXITS,DELETE,COUNTER,
MULTI,FLUSHALL操作,其中COUNTER使用分步操作保证数据安全。这一层主要是使用底层的SockIOPool的
SockIO的池化管理,其使用的主要是根据key&hashCode或者host借用一个SockIO,用完以后归还SockIO,
或者在操作过程中发生网络异常或者数据由于超时关闭SockIO。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics