xiaobaoqiu Blog

Think More, Code Less

Java并发容器

这篇博客的目的是了解Java Concurrent包中几个主要的并发容器的实现原理.

这里将这些并发容器分为4类:

(1).BlockingQueue系列

BlockingQueue
BlockingDeque
ArrayBlockingQueue
LinkedBlockingDeque
LinkedBlockingQueue
PriorityBlockingQueue
DelayQueue

(2).Concurrent系列

ConcurrentHashMap
ConcurrentLinkedQueue
ConcurrentMap
ConcurrentNavigableMap

(3).SkipList系列

ConcurrentSkipListMap
ConcurrentSkipListSet

(4).CopyOnWrite系列

CopyOnWriteArrayList
CopyOnWriteArraySet

后续系列将分别介绍:

1.Java并发容器之BlockingQueue

2.Java并发容器之Concurrent

3.Java并发容器之SkipList

4.Java并发容器之CopyOnWrite

揭秘sun.misc.Unsafe

Java最初被设计为一种安全的受控环境.尽管如此,Java HotSpot还是包含了一个“后门”,它提供了一些可以直接操控内存和线程的低层次操作.这个后门类就是sun.misc.Unsafe,它被JDK广泛用于自己的包中,如java.nio和java.util.concurrent.但是丝毫不建议在生产环境中使用这个后门.因为这个API十分不安全、不轻便、而且不稳定.这个不安全的类提供了一个观察HotSpot JVM内部结构并且可以对其进行修改.

1.源代码及文档

源码:http://www.docjar.com/html/api/sun/misc/Unsafe.java.html

文档:http://www.docjar.com/docs/api/sun/misc/Unsafe.html

2.Unsafe身影

典型的在concurrent包中的Atomic系列的实现中,比如compareAndSet等Atomic实现就是借助于Unsafe类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();    //
    private static final long valueOffset;

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
}

3.Unsafe初始化

sun.misc.Unsafe这个类是如此地不安全,以至于JDK开发者增加了很多特殊限制来访问它.它的构造器是私有的,工厂方法getUnsafe()的调用器只能被Bootloader加载.

在使用Unsafe之前,我们需要创建Unsafe对象的实例.这并不像Unsafe unsafe = new Unsafe()这么简单,因为Unsafe的构造器是私有的.它也有一个静态的getUnsafe()方法,但如果你直接调用Unsafe.getUnsafe(),你可能会得到SecurityException异常.只能从受信任的代码中使用这个方法:

public static Unsafe getUnsafe() {
    Class cc = sun.reflect.Reflection.getCallerClass(2);
    if (cc.getClassLoader() != null)
        throw new SecurityException("Unsafe");
    return theUnsafe;
}

Unsafe类包含一个私有的、名为theUnsafe的实例,我们可以通过Java反射窃取该变量:

1
2
3
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);

4.Unsafe API

sun.misc.Unsafe类包含105个方法.实际上,对各种实体操作有几组重要方法:

4.1 Info-仅返回一些低级的内存信息

addressSize
pageSize

4.2 Objects-提供用于操作对象及其字段的方法

allocateInstance
objectFieldOffset

4.3 Classes-提供用于操作类及其静态字段的方法

staticFieldOffset
defineClass
defineAnonymousClass
ensureClassInitialized

4.4 Arrays-操作数组

arrayBaseOffset
arrayIndexScale

4.5 Synchronization-低级的同步原语

monitorEnter
tryMonitorEnter
monitorExit
compareAndSwapInt
putOrderedInt

4.6 Memory-直接内存访问方法

allocateMemory
copyMemory
freeMemory
getAddress
getInt
putInt

5.有趣的Case

5.1 避免初始化

当你想要跳过对象初始化阶段,或绕过构造器的安全检查,或实例化一个没有任何公共构造器的类,allocateInstance方法是非常有用的.考虑以下类:

1
2
3
4
5
6
7
8
9
10
class A {
    private long a; // not initialized value

    //构造器
    public A() {
        this.a = 1; // initialization
    }

    public long a() { return this.a; }
}

使用构造器、反射和unsafe初始化它,将得到不同的结果.

1
2
3
4
5
6
7
8
A o1 = new A(); // 构造器
o1.a(); // prints 1

A o2 = A.class.newInstance(); // 反射
o2.a(); // prints 1

A o3 = (A) unsafe.allocateInstance(A.class); // unsafe
o3.a(); // prints 0

另外一种情况,当构造器的代价十分昂贵,我们可以绕过构造器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class ClassWithExpensiveConstructor {

    private final int value;

    private ClassWithExpensiveConstructor() {
    value = doExpensiveLookup();
    }

    private int doExpensiveLookup() {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 1;
    }

    public int getValue() {
    return value;
    }
}

绕过昂贵的构造器:

1
2
3
4
5
6
@Test
public void testObjectCreation() throws Exception {
    ClassWithExpensiveConstructor instance = (ClassWithExpensiveConstructor)
    unsafe.allocateInstance(ClassWithExpensiveConstructor.class);
    assertEquals(0, instance.getValue());
}

5.2 内存崩溃(Memory corruption)

这对于每个C程序员来说是常见的.顺便说一下,它是绕过安全的常用技术.

考虑下那些用于检查“访问规则”的简单类:

1
2
3
4
5
6
7
class Guard {
    private int ACCESS_ALLOWED = 1;

    public boolean giveAccess() {
        return 42 == ACCESS_ALLOWED;
    }
}

客户端代码是非常安全的,并且通过调用giveAccess()来检查访问规则.可惜,对于客户,它总是返回false.只有特权用户可以以某种方式改变ACCESS_ALLOWED常量的值并且得到访问,即giveAccess()方法返回true.

实际上,这并不是真的.演示代码如下:

1
2
3
4
5
6
7
8
9
Guard guard = new Guard();
guard.giveAccess();   // false, no access

// by pass
Unsafe unsafe = getUnsafe();
Field f = guard.getClass().getDeclaredField("ACCESS_ALLOWED");
unsafe.putInt(guard, unsafe.objectFieldOffset(f), 42); // memory corruption

guard.giveAccess(); // true, access granted

现在所有的客户都拥有无限制的访问权限.

实际上,反射可以实现相同的功能.但值得关注的是,我们可以修改任何对象,甚至没有这些对象的引用.

例如,有一个guard对象,所在内存中的位置紧接着在当前guard对象之后.我们可以用以下代码来修改它的ACCESS_ALLOWED字段:

1
unsafe.putInt(guard, 16 + unsafe.objectFieldOffset(f), 42); // memory corruption

注意:我们不必持有这个对象的引用.16是Guard对象在32位架构上的大小.我们可以手工计算它,或者通过使用sizeOf方法.

5.2 sizeOf

使用objectFieldOffset方法可以实现C-风格(C-style)的sizeof方法.这个实现返回对象的自身内存大小:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static long sizeOf(Object o) {
    Unsafe u = getUnsafe();
    HashSet<Field> fields = new HashSet<Field>();
    Class c = o.getClass();
    while (c != Object.class) {
        for (Field f : c.getDeclaredFields()) {
            if ((f.getModifiers() & Modifier.STATIC) == 0) {
                fields.add(f);
            }
        }
        c = c.getSuperclass();
    }

    // get offset
    long maxSize = 0;
    for (Field f : fields) {
        long offset = u.objectFieldOffset(f);
        if (offset > maxSize) {
            maxSize = offset;
        }
    }

    return ((maxSize/8) + 1) * 8;   // padding
}

算法如下:通过所有非静态字段(包含父类的),获取每个字段的偏移量(offset),找到偏移最大值并填充字节数(padding).

如果我们仅读取对象的类结构大小值,sizeOf的实现可以更简单,这位于JVM 1.7 32 bit中的偏移量12.

1
2
3
4
public static long sizeOf(Object object){
    return getUnsafe().getAddress(
        normalize(getUnsafe().getInt(object, 4L)) + 12L);
}

normalize是一个为了正确内存地址使用,将有符号的int类型强制转换成无符号的long类型的方法.

1
2
3
4
private static long normalize(int value) {
    if(value >= 0) return value;
    return (~0L >>> 32) & value;
}

真棒,这个方法返回的结果与我们之前的sizeof方法一样.

实际上,对于良好、安全、准确的sizeof方法,最好使用 java.lang.instrument包,但这需要在JVM中指定agent选项.

5.3 浅拷贝(Shallow copy)

为了实现计算对象自身内存大小,我们可以简单地添加拷贝对象方法.标准的解决方案是使用Cloneable修改你的代码,或者在你的对象中实现自定义的拷贝方法,但它不会是多用途的方法.

浅拷贝:

1
2
3
4
5
6
7
static Object shallowCopy(Object obj) {
    long size = sizeOf(obj);
    long start = toAddress(obj);
    long address = getUnsafe().allocateMemory(size);
    getUnsafe().copyMemory(start, address, size);
    return fromAddress(address);
}

toAddress和fromAddress将对象转换为其在内存中的地址,反之亦然.

1
2
3
4
5
6
7
8
9
10
11
12
static long toAddress(Object obj) {
    Object[] array = new Object[] {obj};
    long baseOffset = getUnsafe().arrayBaseOffset(Object[].class);
    return normalize(getUnsafe().getInt(array, baseOffset));
}

static Object fromAddress(long address) {
    Object[] array = new Object[] {null};
    long baseOffset = getUnsafe().arrayBaseOffset(Object[].class);
    getUnsafe().putLong(array, baseOffset, address);
    return array[0];
}

这个拷贝方法可以用来拷贝任何类型的对象,动态计算它的大小.注意,在拷贝后,你需要将对象转换成特定的类型.

5.4 隐藏密码(Hide Password)

在Unsafe中,一个更有趣的直接内存访问的用法是,从内存中删除不必要的对象.

检索用户密码的大多数API的签名为byte[]或char[],为什么是数组呢?

这完全是出于安全的考虑,因为我们可以删除不需要的数组元素.如果将用户密码检索成字符串,这可以像一个对象一样在内存中保存,而删除该对象只需执行解除引用的操作.但是,这个对象仍然在内存中,由GC决定的时间来执行清除.

创建具有相同大小、假的String对象,来取代在内存中原来的String对象的技巧:

1
2
3
4
5
6
7
8
9
10
String password = new String("l00k@myHor$e");
String fake = new String(password.replaceAll(".", "?"));
System.out.println(password); // l00k@myHor$e
System.out.println(fake); // ????????????

getUnsafe().copyMemory(
          fake, 0L, null, toAddress(password), sizeOf(password));

System.out.println(password); // ????????????
System.out.println(fake); // ????????????

感觉很安全.

其实这并不安全.为了真正的安全,我们需要通过反射删除后台char数组:

1
2
3
4
5
6
Field stringValue = String.class.getDeclaredField("value");
stringValue.setAccessible(true);
char[] mem = (char[]) stringValue.get(password);
for (int i=0; i < mem.length; i++) {
  mem[i] = '?';
}

5.5 多继承(Multiple Inheritance)

Java中没有多继承,这是对的,除非我们可以将任意类型转换成我们想要的其他类:

long intClassAddress = normalize(getUnsafe().getInt(new Integer(0), 4L));
long strClassAddress = normalize(getUnsafe().getInt("", 4L));
getUnsafe().putAddress(intClassAddress + 36, strClassAddress);

这个代码片段将String类型添加到Integer超类中,因此我们可以强制转换,且没有运行时异常:

(String) (Object) (new Integer(666))

有一个问题,我们必须预先强制转换对象,以欺骗编译器.

5.6 动态类(Dynamic classes)

我们可以在运行时创建一个类,比如从已编译的class文件中.将类内容读取为字节数组,并正确地传递给defineClass方法.

byte[] classContents = getClassContent();
Class c = getUnsafe().defineClass(
              null, classContents, 0, classContents.length);
c.getMethod("a").invoke(c.newInstance(), null); // 1

从定义文件(class文件)中读取(代码)如下:

1
2
3
4
5
6
7
8
private static byte[] getClassContent() throws Exception {
    File f = new File("/home/mishadoff/tmp/A.class");
    FileInputStream input = new FileInputStream(f);
    byte[] content = new byte[(int)f.length()];
    input.read(content);
    input.close();
    return content;
}

当你必须动态创建类,而现有代码中有一些代理, 这是很有用的.

5.7 抛出异常(Throw an Exception)

不喜欢受检异常?没问题

getUnsafe().throwException(new IOException());

该方法抛出受检异常,但你的代码不必捕捉或重新抛出它,正如运行时异常一样.

5.8 大数组(Big Arrays)

正如你所知,Java数组大小的最大值为Integer.MAX_VALUE.使用直接内存分配,我们创建的数组大小受限于堆大小.

SuperArray的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SuperArray {
    private final static int BYTE = 1;

    private long size;
    private long address;

    public SuperArray(long size) {
        this.size = size;
        address = getUnsafe().allocateMemory(size * BYTE);
    }

    public void set(long i, byte value) {
        getUnsafe().putByte(address + i * BYTE, value);
    }

    public int get(long idx) {
        return getUnsafe().getByte(address + idx * BYTE);
    }

    public long size() {
        return size;
    }
}

简单用法:

long SUPER_SIZE = (long)Integer.MAX_VALUE * 2;
SuperArray array = new SuperArray(SUPER_SIZE);
System.out.println("Array size:" + array.size()); // 4294967294
for (int i = 0; i < 100; i++) {
    array.set((long)Integer.MAX_VALUE + i, (byte)3);
    sum += array.get((long)Integer.MAX_VALUE + i);
}
System.out.println("Sum of 100 elements:" + sum);  // 300

实际上,这是堆外内存(off-heap memory)技术,在java.nio包中部分可用.

这种方式的内存分配不在堆上,且不受GC管理,所以必须小心Unsafe.freeMemory()的使用.它也不执行任何边界检查,所以任何非法访问可能会导致JVM崩溃.

这可用于数学计算,代码可操作大数组的数据.此外,这可引起实时程序员的兴趣,可打破GC在大数组上延迟的限制.

5.9 并发(Concurrency)

关于Unsafe的并发性.compareAndSwap方法是原子的,并且可用来实现高性能的、无锁的数据结构.

比如,考虑问题:在使用大量线程的共享对象上增长值.

首先,我们定义简单的Counter接口:

1
2
3
4
interface Counter {
    void increment();
    long getCounter();
}

然后,我们定义使用Counter的工作线程CounterClient:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class CounterClient implements Runnable {
    private Counter c;
    private int num;

    public CounterClient(Counter c, int num) {
        this.c = c;
        this.num = num;
    }

    @Override
    public void run() {
        for (int i = 0; i < num; i++) {
            c.increment();
        }
    }
}

测试代码:

int NUM_OF_THREADS = 1000;
int NUM_OF_INCREMENTS = 100000;
ExecutorService service = Executors.newFixedThreadPool(NUM_OF_THREADS);
Counter counter = ... // creating instance of specific counter
long before = System.currentTimeMillis();
for (int i = 0; i < NUM_OF_THREADS; i++) {
    service.submit(new CounterClient(counter, NUM_OF_INCREMENTS));
}
service.shutdown();
service.awaitTermination(1, TimeUnit.MINUTES);
long after = System.currentTimeMillis();
System.out.println("Counter result: " + c.getCounter());
System.out.println("Time passed in ms:" + (after - before));

第一个无锁版本的计数器:

1
2
3
4
5
6
7
8
9
10
11
12
13
class StupidCounter implements Counter {
    private long counter = 0;

    @Override
    public void increment() {
        counter++;
    }

    @Override
    public long getCounter() {
        return counter;
    }
}

输出:

Counter result: 99542945
Time passed in ms: 679

运行快,但没有线程管理,结果是不准确的.

第二次尝试,添加上最简单的java式同步:

1
2
3
4
5
6
7
8
9
10
11
12
13
class SyncCounter implements Counter {
    private long counter = 0;

    @Override
    public synchronized void increment() {
        counter++;
    }

    @Override
    public long getCounter() {
        return counter;
    }
}

输出:

Counter result: 100000000
Time passed in ms: 10136

激进的同步有效,但耗时长.

试试ReentrantReadWriteLock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class LockCounter implements Counter {
    private long counter = 0;
    private WriteLock lock = new ReentrantReadWriteLock().writeLock();

    @Override
    public void increment() {
        lock.lock();
        counter++;
        lock.unlock();
    }

    @Override
    public long getCounter() {
        return counter;
    }
}

输出:

Counter result: 100000000
Time passed in ms: 8065

仍然正确,耗时较短.

atomics的运行效果如何?

1
2
3
4
5
6
7
8
9
10
11
12
13
class AtomicCounter implements Counter {
    AtomicLong counter = new AtomicLong(0);

    @Override
    public void increment() {
        counter.incrementAndGet();
    }

    @Override
    public long getCounter() {
        return counter.get();
    }
}

输出:

Counter result: 100000000
Time passed in ms: 6552

AtomicCounter的运行结果更好.

最后,试试Unsafe原始的compareAndSwapLong,看看它是否真的只有特权才能使用它?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class CASCounter implements Counter {
    private volatile long counter = 0;
    private Unsafe unsafe;
    private long offset;

    public CASCounter() throws Exception {
        unsafe = getUnsafe();
        offset = unsafe.objectFieldOffset(CASCounter.class.getDeclaredField("counter"));
    }

    @Override
    public void increment() {
        long before = counter;
        while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
            before = counter;
        }
    }

    @Override
    public long getCounter() {
        return counter;
    }

输出:

Counter result: 100000000
Time passed in ms: 6454

看起来似乎等价于atomics,实际上,atomics正是使用Unsafe实现.

实际上,这个例子很简单,但它展示了Unsafe的一些能力.

如我所说,CAS原语可以用来实现无锁的数据结构.背后的原理很简单:

有一些状态
创建它的副本
修改它
执行CAS
如果失败,重复尝试

实际上,现实中比你现象的更难.存在着许多问题,如ABA问题、指令重排序等.

如果你真的感兴趣,可以参考lock-free HashMap的精彩展示.

6.结论

即使Unsafe对应用程序很有用,但建议不要使用它.

参考

http://mishadoff.com/blog/java-magic-part-4-sun-dot-misc-dot-unsafe/ http://java.dzone.com/articles/understanding-sunmiscunsafe

Memcached网络模型

这篇文章的目的是学习Memcached网络模型相关的源代码.

Memcached采用了很典型的Master-Worker模型,采用的是多线程而不是多进程. 主线程(Master)接收连接, 然后把连接平分派给工作线程(Worker),工作线程处理业务逻辑.

核心的共享数据是消息队列,主线程会把收到的事件请求放入队列,随后调度程序会选择一个空闲的Worker线程来从队列中取出事件请求进行处理.

1.libevent简介

Memcached使用libevent实现事件循环,libevent在Linux环境下默认采用epoll作为IO多路复用方法. 用户线程使用libevent则通常按以下步骤: (1).用户线程通过event_init()函数创建一个event_base对象。event_base对象管理所有注册到自己内部的IO事件。多线程环境下,event_base对象不能被多个线程共享,即一个event_base对象只能对应一个线程。 (2).然后该线程通过event_add函数,将与自己感兴趣的文件描述符相关的IO事件,注册到event_base对象,同时指定事件发生时所要调用的事件处理函数(event handler)。服务器程序通常监听套接字(socket)的可读事件。比如,服务器线程注册套接字sock1的EV_READ事件,并指定event_handler1()为该事件的回调函数。libevent将IO事件封装成struct event类型对象,事件类型用EV_READ/EV_WRITE等常量标志。 (3).注册完事件之后,线程调用event_base_loop进入循环监听(monitor)状态。该循环内部会调用epoll等IO复用函数进入阻塞状态,直到描述符上发生自己感兴趣的事件。此时,线程会调用事先指定的回调函数处理该事件。例如,当套接字sock1发生可读事件,即sock1的内核buff中已有可读数据时,被阻塞的线程立即返回(wake up)并调用event_handler1()函数来处理该次事件。 (4).处理完这次监听获得的事件后,线程再次进入阻塞状态并监听,直到下次事件发生。

2.Memcached网络模型

大致的图示如下:

2.1主要数据结构

首先是CQ_ITEM, CQ_ITEM实际上是主线程accept后返回的已建立连接的fd的封装:

thread.c
1
2
3
4
5
6
7
8
9
10
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    CQ_ITEM          *next;
};

CQ是一个管理CQ_ITEM的单向链表:

thread.c
1
2
3
4
5
6
7
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};

LIBEVENT_THREAD是Memcached对线程结构的封装,每个线程都包含一个CQ队列,一条通知管道pipe 和一个libevent的实例event_base :

thread.c
1
2
3
4
5
6
7
8
9
10
11
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* CQ队列 */
    cache_t *suffix_cache;      /* suffix cache */
    uint8_t item_lock_type;     /* use fine-grained or global item lock */
} LIBEVENT_THREAD;

2.2主流程

在memcached.c的main函数中展示了客户端请求处理的主流程:

(1).对主线程的libevent做了初始化

1
2
/* initialize main thread libevent instance */
 main_base = event_init();

(2).初始化所有的线程(包括Master和Worker线程),并启动

1
2
/* start up worker threads if MT mode */
thread_init(settings.num_threads, main_base);

其中settings.num_threads表示线程数目,默认是4个:

1
settings.num_threads = 4;         /* N workers */

下面简单分析thread_init的核心代码(thread.c):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {

    ...//省略若干代码

    //threads的声明在thread.c头部,用于保存所有的线程
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {    //创建管道
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];  //读端
        threads[i].notify_send_fd = fds[1];     //写端

        //创建所有workers线程的libevent实例
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    //创建线程
    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    //等待所有线程启动起来之后,这个函数再返回
    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}

thread_init首先malloc线程的空间,然后第一个threads作为主线程,其余都是workers线程 然后为每个线程创建一个pipe,这个pipe被用来作为主线程通知workers线程有新的连接到达.

其中pipe()函数用于创建管道,管道两端可分别用描述字fds[0]以及fds[1]来描述.需要注意的是,管道的两端是固定的。即一端只能用于读,由描述字fds[0]表示,称其为管道读端;另一端则只能用于写,由描述字fds[1]来表示,称其为管道写端.

setup_thread主要是创建所有workers线程的libevent实例(主线程的libevent实例在main函数中已经建立),setup_thread()的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    //注意这里只有notify_receive_fd,即读端口
    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    ...
}

这里会为所有worker thread线程注册与notify_event_fd描述符有关的IO事件,这里的notify_event_fd描述符是该worker thread线程与main thread线程通信的管道的接收端(读)描述符。通过注册与该描述符有关的IO事件,worker thread线程就能监听main thread线程发给自己的数据(即事件).

注意这里event_set中的thread_libevent_process参数,其意义在于监听Worker线程与main thread线程通信的管道上的可读事件,并指定用thread_libevent_process()函数处理该事件,即每次管道读端有数据刻度,即触发thread_libevent_process过程.

thread_libevent_process的代码如下,其中最重要的一个就是数据为c的,后续会详细分析这块代码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    //从管道中读数据
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0]) {
    case 'c':   //c表示有新的连接请求被主线程分配到当前Worker线程
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    /* we were told to flip the lock type and report in */
    case 'l':
        ...
    case 'g':
        ...
    }
}

thread_init函数中create_worker实际上就是真正启动了线程, create_worker的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

pthread_create是创建线程函数,第三个参数是线程运行函数的起始地址,这里即worker_libevent函数,该方法执行event_base_loop启动该线程的libevent.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    /* set an indexable thread-specific memory item for the lock type.
     * this could be unnecessary if we pass the conn *c struct through
     * all item_lock calls...
     */
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);

    register_thread_initialized();

    event_base_loop(me->base, 0);
    return NULL;
}

这里我们需要记住每个workers线程目前只在自己线程的管道的读端有数据时可读时触发,并调用 thread_libevent_process方法.

(3).主线程调用

1
2
/* create the listening socket, bind it, and init */
server_sockets(settings.port, tcp_transport, portnumber_file)

在worker thread线程启动后,main thread线程就要创建监听套接字(listening socket)来等待客户端连接请求。这个方法主要是封装了创建监听socket,绑定地址,设置非阻塞模式并注册监听socket的libevent 读事件等一系列操作.

套接字被封装成conn对象,表示与客户端的连接,定义十分庞大(见memcached.h).

端口号默认是11211:

1
settings.port = 11211;

server_sockets函数主要调用server_socket()函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
 * Create a socket and bind it to a specific port number
 * @param interface the interface to bind to
 * @param port the port number to bind to
 * @param transport the transport protocol (TCP / UDP)
 * @param portnumber_file A filepointer to write the port numbers to
 *        when they are successfully added to the list of ports we
 *        listen on.
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {

    ...//省略若干代码

    //主机名到地址解析,结果存在ai中,为addrinfo的链表
    error= getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM)
          fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        else
          perror("getaddrinfo()");
        return 1;
    }

    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {   //创建socket
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE) {
                /* ...unless we're out of fds */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }
        //IPV4地址,设置socket选项
        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
        ...//省略若干代码

        //socket和地址绑定
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            close(sfd);
            continue;
        } else {
            success++;
            ...//省略若干代码
        }

        if (IS_UDP(transport)) {
            ...//省略若干代码
        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}

conn_new()是这里的最关键的一个函数,此函数负责将原始套接字封装成为一个conn对象,同时会注册与该conn对象相关的IO事件,并指定该连接(conn)的初始状态。这里要注意的是listening socket的conn对象被初始化为conn_listening状态.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {

    ...//省略若干代码

    //设置fd和初始状态
    c->sfd = sfd;
    c->state = init_state;

    //注册与该连接有关的IO事件
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }

    ...//
}

所有conn对象IO事件相关的处理函数都是event_handler()函数,这个函数主要是调用drive_machine()函数:

1
2
3
4
5
6
void event_handler(const int fd, const short which, void *arg) {
    conn *c;
    ...
    drive_machine(c);
    ...
}

drive_machine这个函数就全权负责处理与客户连接相关的事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static void drive_machine(conn *c) {
    ...
    assert(c != NULL);

    while (!stop) {
        switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            ...

            if (settings.maxconns_fast &&
                ...
            } else {
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;
        case conn_waiting:
        ...
        case conn_read:
        ...
        case conn_parse_cmd:
        ...
        case conn_new_cmd:
        ...
        case conn_nread:
        ...
        case conn_swallow:
        ...
        case conn_write:
        ...
        case conn_mwrite:
        ...
        case conn_closing:
        ...
        case conn_closed:
    ...
}

drive_machine中就是conn对象的state字段发挥作用的地方了,drive_machine()函数是一个巨大的switch语句,它根据conn对象的当前状态,即state字段的值选择执行不同的分支,因为listening socket的conn对象被初始化为conn_listening状态,所以drive_machine()函数会执行switch语句中case conn_listenning的分支,即接受客户端连接并通过dispatch_conn_new()函数将连接分派给Worker线程.

dispatch_conn_new代码如下(thread.c):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();  //新申请一个CQ_ITEM
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }

    //分发给Worker线程
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;  //注意这里的状态为conn_new_cmd
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    //把新申请的CQ_ITEM放到被分配的Worker线程的队列中
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    //向worker thread线程的管道写入一字节的数据
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

向Worker线程写一个字符的意义在于触发Worker线程管道的读端,即notify_receive_fd描述符的可读事件.

主线程在新连接到来的时候是如何选择处理副线程的呢?很简单,有一个计数器last_thread, 每次将last_thread加一,再模线程数来选择线程ID.

通过之前的分析,我们知道,Worker线程的管道有读时间触发的时候,会调用thread_libevent_process来处理,这里详细分析一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    //从管道中读数据
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0]) {
    case 'c':   //c表示有新的连接请求被主线程分配到当前Worker线程
    //从当前Worker线程的连接请求队列中弹出一个请求
    //此对象即先前main thread线程推入new_conn_queue队列的对象
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        //根据这个CQ_ITEM对象,创建并初始化conn对象
        //该对象负责客户端与该worker thread线程之间的通信
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    /* we were told to flip the lock type and report in */
    case 'l':
        ...
    case 'g':
        ...
    }
}

到这里,Worker线程就建立了和客户端的连接.

conn_new的一个值得注意的地方就是会设置线程的事件处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    ...
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
        event_base_set(base, &c->event);
        c->ev_flags = event_flags;

        if (event_add(&c->event, 0) == -1) {
            perror("event_add");
            return NULL;
        }
    ...
}

我们可以看到,Worker线程也是使用event_handler函数来处理客户端请求过来的数据,根当前请求连接的状态来处理.

(4).事件循环

1
2
/* enter the event loop */
event_base_loop(main_base, 0);

这时主线程启动开始通过libevent来接受外部连接请求,整个启动过程完毕.

3.总结

Memcached中采用的就是所谓的半同步-半异步模式,最早应该是由ACE的作者提出,原文在这里.

简单示意图如下:

3.1半同步-半异步模式

几个模块的之间的交互为:

(1).异步模块接收可能会异步到来的各种事件(I/O,信号等),然后将它们放入队列中;
(2).同步模块一般只有一种动作,就是不停的从队列中取出消息进行处理;

半同步-半异步模式的出现是为了给服务器的功能进行划分,尽可能将的可能阻塞的操作放在同步模块中,这样不会影响到异步模块的处理.

举个例子说明:

假设现在有一个服务器,在接收完客户端请求之后会去数据库查询,这个查询可能会很慢.这时,如果还是采用的把接收客户端的连接和处理客户端的请求(在这里这个处理就是查询数据库)放在一个模块中来处理,很可能将会有很多连接的处理响应非常慢.

此时,考虑使用半同步半异步的模式,开一个进程,使用多路复用IO(如epoll/select)等监听客户端的连接,接收到新的连接请求之后就将这些请求存放到通过某种IPC方式实现的消息队列中,同时,还有N个处理进程,它们所做的工作就是不停的从消息队列中取出消息进行处理.这样的划分,将接收客户端请求和处理客户端请求划分为不同的模块,相互之间的通过IPC进行通讯,将对彼此功能的影响限制到最小.

优点

(1).接收操作只在主循环中处理,因此不会出现惊群现象;
(2).主副线程分工明确, 主线程仅负责I/O, 副线程负责业务逻辑处理;
(3).多个副线程之间不会有影响,因为大家都有各自独立的连接队列;

缺点

假如业务逻辑是类似于web服务器之类的, 那么一个简单的请求也需要这个比较繁琐的操作的话(最重要的是,很可能一个进程就能处理完的事情,非得从一个线程接收再到另一个线程去处理), 那么显然代价是不值得的.

XMemcached Client

1.XMemcached是什么

XMemcached是众多Memcached Client中的后起之秀,而且还是一个Chinese主导的.写这篇Blog的目的就是研究下这个Memcached Client的代码,顺便研究下传说中的一致性Hash.

XMemcached项目:https://code.google.com/p/xmemcached/

源代码:https://github.com/killme2008/xmemcached

使用文档:http://blog.sina.com.cn/s/blog_6094008a0102v6gj.html

特点:

(1).支持所有的Memcached文本协议(text based protocols)和二进制协议(二进制协议从1.2.0开始支持);
(2).支持分布式的Memcached,包括标准Hash和一致性哈希策略;
(3).支持JMX,从而允许使用者监控和控制XMemcached Client的行为;同时可以修改优化参数,动态添加或者删除服务器;
(4).支持待权重的服务器配置;
(5).支持连接池,使用Java的nio,对同一个Memcached服务器使用者能够创建更多的连接;
(6).支持故障模式和备用节点;
(7).支持和Spring框架和hibernate-memcached的整合;
(8).高性能;
(9).支持和kestrel(一个Scala实现的MQ)和TokyoTyrant的对话;

2.XMemcached的类图

XMemcached的类图(几个主要的类):

3.XMemcached的示例

源代码中自带了例子,我们分析一个SimpleExample.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class SimpleExample {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("Useage:java SimpleExample [servers]");
            System.exit(1);
        }
        MemcachedClient memcachedClient = getMemcachedClient(args[0]);
        if (memcachedClient == null) {
            throw new NullPointerException("Null MemcachedClient,please check memcached has been started");
        }
        try {
            // add a,b,c
            System.out.println("Add a,b,c");
            memcachedClient.set("a", 0, "Hello,xmemcached");
            memcachedClient.set("b", 0, "Hello,xmemcached");
            memcachedClient.set("c", 0, "Hello,xmemcached");
            // get a
            String value = memcachedClient.get("a");
            System.out.println("get a=" + value);
            System.out.println("delete a");
            // delete a
            memcachedClient.delete("a");
            // reget a
            value = memcachedClient.get("a");
            System.out.println("after delete,a=" + value);

            System.out.println("Iterate all keys...");
            // iterate all keys
            KeyIterator it = memcachedClient.getKeyIterator(AddrUtil.getOneAddress(args[0]));
            while (it.hasNext()) {
                System.out.println(it.next());
            }
            System.out.println(memcachedClient.touch("b", 1000));

        } catch (MemcachedException e) {
            System.err.println("MemcachedClient operation fail");
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.err.println("MemcachedClient operation timeout");
            e.printStackTrace();
        } catch (InterruptedException e) {
            // ignore
        }
        try {
            memcachedClient.shutdown();
        } catch (Exception e) {
            System.err.println("Shutdown MemcachedClient fail");
            e.printStackTrace();
        }
    }

    public static MemcachedClient getMemcachedClient(String servers) {
        try {
            //默认是Text协议
            MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(servers));
            return builder.build();
        } catch (IOException e) {
            System.err.println("Create MemcachedClient fail");
            e.printStackTrace();
        }
        return null;
    }
}

本地先起来两个Memcached实例:

1
2
/usr/bin/memcached -m 256 -p 11211 -u memcache -l 127.0.0.1 -d
/usr/bin/memcached -m 256 -p 11222 -u memcache -l 127.0.0.1 -d

在IDEA下运行,配上运行参数,即Memcached服务器的IP和端口,格式如下(解析代码见AddrUtil.java):

1
127.0.0.1:11211 127.0.0.1:11222

Debug代码,可以发现其运行过程,下面是get的一个运行序列图:

4.XMemcached的分布式实现

memcached虽然称为“分布式”缓存服务器,但服务器端并没有"“分布式”功能.至于memcached的分布式,则是完全由客户端程序库实现的.

4.1 Memcached的分布式是什么意思

这里多次使用了“分布式”这个词,但并未做详细解释.现在开始简单地介绍一下其原理,各个客户端的实现基本相同.

下面假设memcached服务器有node1~node3三台, 应用程序要保存键名为“tokyo”“kanagawa”“chiba”“saitama”“gunma” 的数据.

首先向memcached中添加“tokyo”.将“tokyo”传给客户端程序库后, 客户端实现的算法就会根据“键”来决定保存数据的memcached服务器. 服务器选定后,即命令它保存“tokyo”及其值.

同样,“kanagawa”“chiba”“saitama”“gunma”都是先选择服务器再保存.

接下来获取保存的数据.获取时也要将要获取的键“tokyo”传递给函数库. 函数库通过与数据保存时相同的算法,根据“键”选择服务器. 使用的算法相同,就能选中与保存时相同的服务器,然后发送get命令. 只要数据没有因为某些原因被删除,就能获得保存的值.

这样,将不同的键保存到不同的服务器上,就实现了memcached的分布式. memcached服务器增多后,键就会分散,即使一台memcached服务器发生故障 无法连接,也不会影响其他的缓存,系统依然能继续运行.

4.2 余数哈希

XMemcached默认使用的是Native Hash,见ArrayMemcachedSessionLocator.java:

1
2
3
4
public final long getHash(int size, String key) {
    long hash = this.hashAlgorighm.hash(key);
    return hash % size;
}

ArrayMemcachedSessionLocator中使用的hashAlgorighm是HashAlgorithm.NATIVE_HASH,而HashAlgorithm.NATIVE_HASH的hash()函数的实现:

1
2
3
4
5
6
7
public long hash(final String k) {
    long rv = 0;
    switch (this) {
    case NATIVE_HASH:
        rv = k.hashCode();
        break;
    ...

即XMemcached的Native Hash就是通常的余数哈希:首先求得字符串的hash值,根据该值除以服务器节点数目得到的余数决定服务器.

根据这种hash方式,上门这几个key分布的服务器如下(三台服务器分别为 0 1 2):

1
2
3
4
5
tokyo --> 2
kanagawa --> 1
chiba --> 2
saitama --> 1
gunma --> 2

优点 简单高效:计算简单,数据的分散性也相当优秀

缺点 当添加或移除服务器时,缓存重组的代价相当巨大. 添加服务器后,余数就会产生巨变,这样就无法获取与保存时相同的服务器, 从而影响缓存的命中率.

下面简单验证这个:首先在3台服务器的情况下将“a”到“z”的键保存到memcached并访问的情况;接下来增加一台memcached服务器;计算缓存命中率;

简单测试代码(忍不住吐槽下java真是一门罗嗦的语言):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Hash {

    /**
     * 默认的Hash算法
     */
    private static HashAlgorithm hashAlgorithm = HashAlgorithm.NATIVE_HASH;

    /**
     * Memcached服务器数目
     */
    private static final int SERVER_SIZE = 3;

    public static void main(String[] args) {
        // 初始化
        Map<Long, List<String>> itemMap = new HashMap<Long, List<String>>();
        for (long i = 0; i < SERVER_SIZE; i++) {
            itemMap.put(i, new LinkedList<String>());
        }

        char letters[] = new char[27];
        letters[0] = 'a';
        for (int i = 0; i < 26; i++) {
            letters[i + 1] = (char) (letters[i] + 1);
            long hash = getHash(SERVER_SIZE, String.valueOf(letters[i]));
            itemMap.get(hash).add(String.valueOf(letters[i]));
        }

        for (Map.Entry<Long, List<String>> e : itemMap.entrySet()) {
            System.out.println(e.getKey() + " : " + join(e.getValue()));
        }

    }

    public static long getHash(int size, String key) {
        long hash = hashAlgorithm.hash(key);
        return hash % size;
    }

    private static String join(final List<String> list) {
        StringBuilder builder = new StringBuilder();
        for(String s : list) {
            builder.append(s + ' ');
        }

        return builder.toString();
    }
}

我们得到3台服务器(分别为0,1,2)的时候,26个字母的分布如下:

0 : c f i l o r u x 
1 : a d g j m p s v y 
2 : b e h k n q t w z 

4台服务器(分别为0,1,2,3)的时候,26个字母的分布如下:

0 : d h l p t x 
1 : a e i m q u y 
2 : b f j n r v z 
3 : c g k o s w

根据这两份数据,我们可以得到,加拉一台服务器之后,26个字母只有8个命中了.像这样,添加节点后键分散到的服务器会发生巨大变化.

同样,减少服务器(比如某一台服务器down机),也会导致大量的Miss,基本失去了缓存的作用.

带来的问题就是,在Web应用程序中使用memcached时, 在添加memcached服务器的瞬间缓存效率会大幅度下降,有可能会发生无法提供正常服务的情况.

4.3 一致性哈希(Consistent Hashing)

一致性哈希能够很大程度上(注意不是完全解决)解决余数哈希的增加服务器导致缓存失效的问题.

4.3.1 一致性哈希原理 Consistent Hashing如下所示:首先求出memcached服务器(节点)的哈希值, 并将其配置到0~2SUP(32)的圆(continuum)上. 然后用同样的方法求出存储数据的键的哈希值,并映射到圆上. 然后从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个服务器上. 如果超过2SUP(32)仍然找不到服务器,就会保存到第一台memcached服务器上.

从上图的状态中添加一台memcached服务器.余数分布式算法由于保存键的服务器会发生巨大变化 而影响缓存的命中率,但Consistent Hashing中,只有在continuum上增加服务器的地点逆时针方向的 第一台服务器上的键会受到影响.

因此,Consistent Hashing最大限度地抑制了键的重新分布. 而且,有的Consistent Hashing的实现方法还采用了虚拟节点的思想. 使用一般的hash函数的话,服务器的映射地点的分布非常不均匀. 因此,使用虚拟节点的思想,为每个物理节点(服务器) 在continuum上分配100~200个点.这样就能抑制分布不均匀, 最大限度地减小服务器增减时的缓存重新分布.

4.3.2 虚拟节点 一致性哈希算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜问题.例如我们的系统中有两台 server,其环分布如下:

此时必然造成大量数据集中到Server 1上,而只有极少量会定位到Server 2上.为了解决这种数据倾斜问题,一致性哈希算法引入了虚拟节点机制.

虚拟节点(virtual node)是实际节点(服务器)在 hash 空间的复制品,一个实际节点(服务器)对应了若干个虚拟节点,这个对应个数也称为为复制个数,虚拟节点在 hash 环中以hash值排列.

例如为上面的每台服务器设置三个虚拟节点:

在实际应用中,一个物理节点对应多少的虚拟节点才能达到比较好的均衡效果,有一个效果图:

纵轴为物理服务器的数目,横轴为虚拟节点的数目,可以看出,当物理服务器的数量很小时,需要更大的虚拟节点,反之则需要更少的节点,从图上可以看出,在物理服务器有10台时,差不多需要为每台服务器增加100~200个虚拟节点效果比较好.

4.4 XMemcached一致性哈希的实现

XMemcached Client中实现了一致性哈希,见KetamaMemcachedSessionLocator.java,使用的Hash算法是KETAMA HASH算法.

下面主要分析一下KetamaMemcachedSessionLocator.java代码,去掉了一些无关的代码:

4.4.1 根据服务器列表生成Hash环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private final void buildMap(Collection<Session> list, HashAlgorithm alg) {
    TreeMap<Long, List<Session>> sessionMap = new TreeMap<Long, List<Session>>();

    for (Session session : list) {  //遍历每个服务器
        String sockStr = null;  //根据服务器地址得到的字符串,用于hash
        if (this.cwNginxUpstreamConsistent) {
            ... //生成sockStr
        } else {
            ... //生成sockStr
        }
        /**
         * Duplicate 160 X weight references
         */
        int numReps = NUM_REPS; //虚拟节点数目, 默认160
        //考虑权重,权重小的服务器最终生成的虚拟节点就少,数据打到这个服务器的概率就小
        if (session instanceof MemcachedTCPSession) {
            numReps *= ((MemcachedSession) session).getWeight();
        }
        if (alg == HashAlgorithm.KETAMA_HASH) {     //一致性Hash
            for (int i = 0; i < numReps / 4; i++) {
                //计算Hash值,将虚拟节点映射到当前session代表的服务器
                byte[] digest = HashAlgorithm.computeMd5(sockStr + "-" + i);
                for (int h = 0; h < 4; h++) {
                    long k = (long) (digest[3 + h * 4] & 0xFF) << 24 | (long) (digest[2 + h * 4] & 0xFF) << 16
                            | (long) (digest[1 + h * 4] & 0xFF) << 8 | digest[h * 4] & 0xFF;
                    this.getSessionList(sessionMap, k).add(session);
                }

            }
        } else {
            ...
        }
    }
    this.ketamaSessions = sessionMap;
    this.maxTries = list.size();
}

private List<Session> getSessionList(TreeMap<Long, List<Session>> sessionMap, long k) {
    List<Session> sessionList = sessionMap.get(k);
    if (sessionList == null) {
        sessionList = new ArrayList<Session>();
        sessionMap.put(k, sessionList);
    }
    return sessionList;
}

buildMap的结果就是在0-232这个圈上生成一些列的虚拟节点,每个虚拟节点都指向一个真实服务器.

例如本机只开一个服务器情况下,没有设置权重,会生成160个虚拟节点,每个虚拟节点都指向中一台服务器:

4.4.2 对Key做Hash

做get或者set的时候,首先都会通过key找到服务器:

1
public Session getSessionByKey(final String key);

其实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//通过key找真实到服务器
public final Session getSessionByKey(final String key) {
    if (this.ketamaSessions == null || this.ketamaSessions.size() == 0) {
        return null;
    }
    long hash = this.hashAlg.hash(key);         //计算key的Hash值
    Session rv = this.getSessionByHash(hash);   //通过Hash值找服务器
    int tries = 0;
    while (!this.failureMode && (rv == null || rv.isClosed())
            && tries++ < this.maxTries) {
        hash = this.nextHash(hash, key, tries);
        rv = this.getSessionByHash(hash);
    }
    return rv;
}

public final Session getSessionByHash(final long hash) {
    TreeMap<Long, List<Session>> sessionMap = this.ketamaSessions;
    if (sessionMap.size() == 0) {
        return null;
    }
    Long resultHash = hash;
    if (!sessionMap.containsKey(hash)) {
        //下面的逻辑是找到大于hash值的第一个虚拟节点(即ceiling)
        //Java 1.6使用ceilingKey可以实现,为兼容jdk5,使用tailMap,tailMap为所有大于hash值的虚拟节点
        SortedMap<Long, List<Session>> tailMap = sessionMap.tailMap(hash);
        if (tailMap.isEmpty()) {//没有比hash值大的节点,则取第一个虚拟节点
            resultHash = sessionMap.firstKey();
        } else {
            resultHash = tailMap.firstKey();//取tailMap第一个,即最大于hash值最小节点
        }
    }

    List<Session> sessionList = sessionMap.get(resultHash);
    if (sessionList == null || sessionList.size() == 0) {
        return null;
    }
    int size = sessionList.size();
    return sessionList.get(this.random.nextInt(size));
}

这里调试时候key是'a',其Hash值是3111502092:

getSessionByHash的逻辑其实就是在Hash环上找第一个大于key对应hash值的虚拟节点,通过虚拟节点找到真实服务器.

这里的tailMap即为大于3111502092的虚拟节点,如下:

因此我们要找的虚拟节点就是3164521287对应的虚拟节点,其服务器指向的是127.0.0.1:11211.因此当前key(这里为'a')的请求被打到这台服务器上.

4.5 使用XMemcached的一致性哈希

默认情况下XMemcached使用的是余数哈希,如下使用一致性哈希:

1
2
3
4
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(servers));
// 设置使用一致性hash
builder.setSessionLocator(new KetamaMemcachedSessionLocator());
return builder.build();

Memcached内存存储

早就听说过Memcached独特的内存管理方式,写着篇文章的目的就是了解Memcached的内存管理,学习其源代码.

1.什么是Slab Allocator

memcached默认情况下采用了名为Slab Allocator的机制分配、管理内存,Slab Allocator的基本原理是按照预先规定的大小,将分配的内存分割成特定长度的块,以期望完全解决内存碎片问题。而且,slab allocator还有重复使用已分配的内存的目的。 也就是说,分配到的内存不会释放,而是重复利用。

2.Slab Allocation的主要术语

Page        分配给Slab的内存空间,默认是1MB,分配给Slab之后根据slab的大小切分成chunk
Chunk       用于缓存记录的内存空间
Slab Class  特定大小的chunk的组

3.Slab初始化

在Memcached启动时候会调用slab的初始化代码(详见memcached.c中main函数调用slabs_init函数).

slabs_init函数声明:

1
2
3
4
5
6
7
/** Init the subsystem. 1st argument is the limit on no. of bytes to allocate,
    0 if no limit. 2nd argument is the growth factor; each slab will use a chunk
    size equal to the previous slab's chunk size times this factor.
    3rd argument specifies if the slab allocator should allocate all memory
    up front (if true), or allocate memory in chunks as it is needed (if false)
*/
void slabs_init(const size_t limit, const double factor, const bool prealloc);

其中limit表示memcached最大使用内存;factor表示slab中chunk size的增长因子,slab中chunk size的大小等于前一个slab的chunk size乘以factor;

memcached.c中main函数调用slabs_init函数:

1
slabs_init(settings.maxbytes, settings.factor, preallocate);

其中settings.maxbytes默认值为64M,启动memcached使用选项-m设置;settings.factor默认为1.25,启动memcached时候使用-f设置;preallocate指的是启动memcached的时候默认为每种类型slab预先分配一个page的内存,默认是false;

1
2
3
4
5
settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
...
settings.factor = 1.25;
...
preallocate = false

slabs_init函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 * Determines the chunk sizes and initializes the slab class descriptors
 * accordingly.
 */
void slabs_init(const size_t limit, const double factor, const bool prealloc) {
    int i = POWER_SMALLEST - 1;
    //真实占用大小=对象大小+48
    unsigned int size = sizeof(item) + settings.chunk_size;

    mem_limit = limit;

    //开启预分配,则首先将limit大小(默认64M)的内存全部申请
    if (prealloc) {
        /* Allocate everything in a big chunk with malloc */
        mem_base = malloc(mem_limit);
        if (mem_base != NULL) {
            mem_current = mem_base;
            mem_avail = mem_limit;
        } else {
            fprintf(stderr, "Warning: Failed to allocate requested memory in"
                    " one large chunk.\nWill allocate in smaller chunks\n");
        }
    }

    //清空所有的slab
    memset(slabclass, 0, sizeof(slabclass));

    while (++i < POWER_LARGEST && size <= settings.item_size_max / factor) {
        /* Make sure items are always n-byte aligned */
        if (size % CHUNK_ALIGN_BYTES)
            size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);

        slabclass[i].size = size;
        slabclass[i].perslab = settings.item_size_max / slabclass[i].size;
        size *= factor;
        if (settings.verbose > 1) {
            fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
                    i, slabclass[i].size, slabclass[i].perslab);
        }
    }

    //最大chunksize的一个slab,chunksize为settings.item_size_max(默认1M)
    power_largest = i;
    slabclass[power_largest].size = settings.item_size_max;
    slabclass[power_largest].perslab = 1;
    if (settings.verbose > 1) {
        fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
                i, slabclass[i].size, slabclass[i].perslab);
    }

    //记录已分配的空间大小
    /* for the test suite:  faking of how much we've already malloc'd */
    {
        char *t_initial_malloc = getenv("T_MEMD_INITIAL_MALLOC");
        if (t_initial_malloc) {
            mem_malloced = (size_t)atol(t_initial_malloc);
        }
    }

    //开启了预分配,则为每种slab都分配一个page的空间
    if (prealloc) {
        slabs_preallocate(power_largest);
    }
}

其中settings.chunk_size默认为48:

settings.chunk_size = 48;         /* space for a modest key and value */

POWER_LARGEST指slab种类的最大值,默认只为200,在memcached.c中设置

#define POWER_LARGEST  200

settings.item_size_max就是每个page的大小,默认1M,在memcached.c中初始化:

settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */

默认不开启预分配,因为很多时候Memcached只存储一种类型的数据(即其大小相对比较固定),这时候其他类型的预分配的slab空间就会浪费.

预分配的逻辑就是从最小的slab开始,为每类slab分配一个Page大小的空间(空间不足时停止分配):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void slabs_preallocate (const unsigned int maxslabs) {
    int i;
    unsigned int prealloc = 0;

    /* pre-allocate a 1MB slab in every size class so people don't get
       confused by non-intuitive "SERVER_ERROR out of memory"
       messages.  this is the most common question on the mailing
       list.  if you really don't want this, you can rebuild without
       these three lines.  */

    for (i = POWER_SMALLEST; i <= POWER_LARGEST; i++) {
        if (++prealloc > maxslabs)
            return;
        if (do_slabs_newslab(i) == 0) {
            fprintf(stderr, "Error while preallocating slab memory!\n"
                "If using -L or other prealloc options, max memory must be "
                "at least %d megabytes.\n", power_largest);
            exit(1);
        }
    }

}

do_slabs_newslab的工作就是为某一个slab分配空间,并将空间划分乘固定大小的chunk:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int do_slabs_newslab(const unsigned int id) {
    slabclass_t *p = &slabclass[id];
    int len = settings.slab_reassign ? settings.item_size_max
        : p->size * p->perslab;
    char *ptr;

    if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0) ||
        (grow_slab_list(id) == 0) ||
        ((ptr = memory_allocate((size_t)len)) == 0)) {  //申请内存

        MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
        return 0;
    }

    memset(ptr, 0, (size_t)len);
    //将内存划分乘chunk
    split_slab_page_into_freelist(ptr, id);

    //维护slab链表
    p->slab_list[p->slabs++] = ptr;
    mem_malloced += len;
    MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id);

    return 1;
}

split_slab_page_into_freelist的主要控制就是Page划分乘chunk并清空:

1
2
3
4
5
6
7
8
static void split_slab_page_into_freelist(char *ptr, const unsigned int id) {
    slabclass_t *p = &slabclass[id];
    int x;
    for (x = 0; x < p->perslab; x++) {
        do_slabs_free(ptr, 0, id);
        ptr += p->size;
    }
}

memcached的内存分配策略就是:按slab需求分配page,各slab按需使用chunk存储.

按需分配的意思就是某一类slab没有对象可存,就不会分配(非preallocate模式),某类slab存储对象很多,就会分配多个slab形成链表.

这里有几个特点要注意:

1.Memcached分配出去的page不会被回收或者重新分配;
2.Memcached申请的内存不会被释放;
3.slab空闲的chunk不会借给任何其他slab使用(新版本memcached有slab_reassign,slab_automove的功能);

slab内存结构图,二维数组链表:

4.往Slab中缓存记录

memcached根据收到的数据的大小,选择最适合数据大小的slab. memcached中保存着slab内空闲chunk的列表,根据该列表选择chunk, 然后将数据缓存于其中.

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
 * Figures out which slab class (chunk size) is required to store an item of
 * a given size.
 *
 * Given object size, return id to use when allocating/freeing memory for object
 * 0 means error: can't store such a large object
 */
unsigned int slabs_clsid(const size_t size) {
    int res = POWER_SMALLEST;   //最小slab编号

    if (size == 0)
        return 0;
    while (size > slabclass[res].size)
        if (res++ == power_largest)     /* won't fit in the biggest slab */
            return 0;
    return res;
}

参数是待存储对象的大小,根据这个大小,从最小的Chunk Size开始查找,找到第一个(即最小的)能放下size大小的对象的Chunk.找不到(size大于最大的Chunk Size)返回0(这就是为什么slab class从1开始而不是从0开始).

如果某个Slab没有剩余的Chunk了,系统便会给这个Slab分配一个新的Page以供使用,如果没有Page可用,系统就会触发LRU机制,通过删除冷数据来为新数据腾出空间,这里有一点需要注意的是:LRU不是全局的,而是针对Slab而言的.

slab内存分配示例:

5.Slab Allocator的缺点

由于Slab Allocator分配的是特定长度的内存,因此无法有效利用分配的内存。 例如,将100字节的数据缓存到128字节的chunk中,剩余的28字节就浪费了。

6.Memcached减少内存浪费

4.1:调整growth factor

(1).估算我们item的大小
key键长+suffix+value值长+结构大小(48字节)
(2).逐步调整growth factor,使得某个slab的大小和我们的item大小接近(必须大于我们item的大小)

7.过期数据

(1).LRU过期策略;
(2).在slab级别上执行LRU策略;
(3).查看是否过去是在get的时候,即懒惰(lazy)检查;

8.memcached-tool脚本

memcached-tool脚本可以方便地获得slab的使用情况 (它将memcached的返回值整理成容易阅读的格式),可以从下面的地址获得脚本: http://www.netingcn.com/demo/memcached-tool.zip

使用方法也极其简单:

1
perl memcached-tool server_ip:prot option

比如:

1
2
3
4
5
perl memcached-tool 10.0.0.5:11211 display    # shows slabs
perl memcached-tool 10.0.0.5:11211            # same.  (default is display)
perl memcached-tool 10.0.0.5:11211 stats      # shows general stats
perl memcached-tool 10.0.0.5:11211 move 7 9   # takes 1MB slab from class #7
                                              # to class #9.

输出示例:

1
2
3
4
#  Item_Size   Max_age  1MB_pages Count   Full?
 1     104 B  1394292 s    1215 12249628    yes
 2     136 B  1456795 s      52  400919     yes
 ...

各列的含义为:

#           slab class编号
Item_Size   Chunk大小
Max_age     LRU内最旧的记录的生存时间
1MB_pages   分配给Slab的页数
Count       Slab内的记录数
Full?       Slab内是否含有空闲chunk