xiaobaoqiu Blog

Think More, Code Less

动态数据源

1.数据库层面的动态数据源

比如多个读库(Read),可以配置一个虚拟IP,每次读数据库的请求被均衡的分配到各个读库(使用Keepalived等).

这中方式对应用程序是透明的.

2.数据库Proxy

很多现成的数据库Proxy,在Proxy中可以做负载均衡(一般使用LVS,Keepalived等现成应用),权限验证,过滤,业务缓存等控制逻辑.比如:

1.360基于mysql-proxy的Atlas
Atlas是由 Qihoo 360,Web平台部基础架构团队开发维护的一个基于MySQL协议的数据中间层项目。它是在mysql-proxy 0.8.2版本的基础上,对其进行了优化,增加了一些新的功能特性。360内部使用Atlas运行的mysql业务,每天承载的读写请求数达几十亿条。
github:https://github.com/Qihoo360/Atlas
参考:https://github.com/Qihoo360/Atlas/wiki/Atlas%E7%9A%84%E6%9E%B6%E6%9E%84

2.阿里的DRDS
分布式关系型数据库服务(Distribute Relational Database Service,简称DRDS)是一种水平拆分、可平滑扩缩容、读写分离的在线分布式数据库服务。前身为淘宝开源的TDDL,是近千个应用首选组件,已稳定服务了七年以上。已经商业化.
参考:http://docs.aliyun.com/?spm=5176.7622920.9.2.qGx2nq#/pub/drds/brief-manual/summary&summary

3.网易的分布式数据库中间件DDB
DDB(Distributed database)是网易杭研院立项最早,应用最为广泛的后台产品之一,也是国内最早出现的基于现有database之上开发的分布式数据库中间件,目前依然在为网易易信,云音乐,云阅读等大型互联网产品提供稳定的数据库服务。
参考:http://www.majin163.com/2014/09/24/ddb-introduce/

4.百度DDBS

3.代码层面

使用Spring实现数据源动态配置,可以在代码层面达到Master-Slave数据库分离的效果.

其实原理很简单,需要做两个工作:

1.实现一个根据不同key使用不同实际DataSource的自定义DataSource;
2.在请求的ThreadLocal中保存当前请求需要使用的数据库的key;

其中第一个工作Spring已经帮我们做了,见类org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource.

3.1 AbstractRoutingDataSource简介

下面简单介绍这个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
 * 抽象的javax.sql.DataSource实现,可以完成基于一个查找key来路由 #getConnection()到某些特性目标DataSourcesd的一个。一般通过绑定线程上下文来决定。
 */
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
private Map<Object, Object> targetDataSources;

    private Object defaultTargetDataSource;

    private boolean lenientFallback = true;

    private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();

    private Map<Object, DataSource> resolvedDataSources;

    private DataSource resolvedDefaultDataSource;

    /**
     * 设置目标DataSources的map映射,其中查找key作为 map的key。
     * 这个映射的value可以是对象的DataSource实例,或者是一个数据源name的字符串(可以被DataSourceLookup解析)。
     *
     * key可以是任意的类型,只要实现了普通的查找处理。
     * 具体的key表示形式,将会被resolveSpecifiedLookupKey和determineCurrentLookupKey处理
     *
     */
    public void setTargetDataSources(Map<Object, Object> targetDataSources) {
        this.targetDataSources = targetDataSources;
    }

    /**
     * 设置默认目标数据源。如果我们在map中找不到对应的key时,则会使用这里设置的默认数据源
     */
    public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
        this.defaultTargetDataSource = defaultTargetDataSource;
    }

    /**
     * 指定默认的DataSource,当通过指定的查找key不能找到对应的DataSource。
     * 如果为false,则直接返回失败,如果为true,则使用默认的数据源。默认为true
     */
    public void setLenientFallback(boolean lenientFallback) {
        this.lenientFallback = lenientFallback;
    }

    /**
     * 设置DataSourceLookup的实现类,该实现类可以把字符串配置的数据源,解析成我们需要的DataSource类.默认使用JndiDataSourceLookup。
     *
     * JndiDataSourceLookup方法使用ref bean方式获取配置文件中配置的dataSource数据源,也就是我们一般使用xml中配置datasource的方式就是jndi。
     */
    public void setDataSourceLookup(DataSourceLookup dataSourceLookup) {
        this.dataSourceLookup = (dataSourceLookup != null ? dataSourceLookup : new JndiDataSourceLookup());
    }

    /**
    * 初始化,将targetDataSources转换成resolvedDataSources
    */
    public void afterPropertiesSet() {
        if (this.targetDataSources == null) {
            throw new IllegalArgumentException("Property 'targetDataSources' is required");
        }
        this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size());
        for (Map.Entry entry : this.targetDataSources.entrySet()) {
            Object lookupKey = resolveSpecifiedLookupKey(entry.getKey());
            DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
            this.resolvedDataSources.put(lookupKey, dataSource);
        }
        if (this.defaultTargetDataSource != null) {
            this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
        }
    }

    /**
     * 根据lookupKey获取map中存放的key值,默认两者是一样的
     */
    protected Object resolveSpecifiedLookupKey(Object lookupKey) {
        return lookupKey;
    }
    /**
     * 转换从获取map中存放的dataSource
     */
    protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
        if (dataSource instanceof DataSource) {
            return (DataSource) dataSource;
        }
        else if (dataSource instanceof String) {
            return this.dataSourceLookup.getDataSource((String) dataSource);
        }
        else {
            throw new IllegalArgumentException(
                    "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
        }
    }

    /**
    * 这里就是抽象类给我们实现的接口方法,根据我们的配置上下文,抽象类决定实现哪个连接
    */
    public Connection getConnection() throws SQLException {
        return determineTargetDataSource().getConnection();
    }

    public Connection getConnection(String username, String password) throws SQLException {
        return determineTargetDataSource().getConnection(username, password);
    }

    protected DataSource determineTargetDataSource() {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
        Object lookupKey = determineCurrentLookupKey();
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        }
        return dataSource;
    }

    /**
    * 这里是我们使用这个抽象类需要实现的方法,主要就是告诉该抽象类,当前需要使用的数据源的key是什么,这样抽象类就可以知道使用哪个数据库连接
    */
    protected abstract Object determineCurrentLookupKey();
}

我们可以继承这个类实现自己的RoutingDataSource,只需要实现determineCurrentLookupKey()方法.下面是一个实现示例,其中DataSourceKeyHolder用来在ThreadLocal中保存key,而我们的RoutingDataSource就可以从ThreadLocal中获取key来决定使用那个具体的数据源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class RoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceKeyHolder.getDataSource();
    }
}

public class DataSourceKeyHolder {
    private static final ThreadLocal<String> holder = new ThreadLocal<String>();

    public static String getDataSource() {
        return holder.get();
    }

    public static void putDataSource(String value) {
        holder.set(value);
    }

    public static void clear(){
        holder.remove();
    }
}

3.2 自己实现RoutingDataSource

Spring自带的AbstractRoutingDataSource功能强大,但是略先麻烦(最终使用的其实只有resolvedDataSources这个Map).我们完全可以自己实现一个简单的RoutingDataSource,比如我们要实现读写分离的动态数据库,一个简单实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
public class RoutingDataSource extends AbstractDataSource implements InitializingBean {
    /**
     * 写库, key是String
     */
    private Map<String, DataSource> writeDataSourceMap;
    /**
     * 读库, key是String
     */
    private Map<String, DataSource> readDataSourceMap;

    @Override
    public Connection getConnection() throws SQLException {
        //从上下文中获取key
        ConnectionKey connectionKey = ConnectionKeyHolder.get();
        try {
            //默认情况,返回随机一个写库
            if (connectionKey == null) {
                return fetchConnection(writeDataSourceMap, null);
            }

            //只读
            if (connectionKey.getType().equals(ConnectionKey.READ)) {
                return fetchConnection(readDataSourceMap, connectionKey.getKey());
            }

            //可读可写
            if (connectionKey.getType().equals(ConnectionKey.READ_WRITE)) {
                return fetchConnection(writeDataSourceMap, connectionKey.getKey());
            }
        } catch (Exception e) {
            logger.error("getConnectionError", e);
            if (e instanceof SQLException) {
                throw (SQLException) e;
            } else {
                throw new SQLException(e);
            }
        }

        // impossible code
        throw new IllegalArgumentException("invalid connection type: " + connectionKey.getType() + ", key: "
                + connectionKey.getKey());
    }

    /**
     * 根据key从对应的Map中获取数据库连接
     *
     * @param dbSourceMap
     * @param key
     * @return
     * @throws SQLException
     */
    private Connection fetchConnection(final Map<String, DataSource> dbSourceMap, String key) throws SQLException {
        if (key == null || key.length() == 0) { // null key, return a random read connection
            key = randomKey(dbSourceMap);
        }
        if (dbSourceMap.get(key) == null) {
            key = randomKey(dbSourceMap);
        }
        return dbSourceMap.get(key).getConnection();
    }

    /**
     * 随机获取一个Key
     * @return
     */
    private String randomKey(final Map<String, DataSource> dbSourceMap) {
        String[] keys = dbSourceMap.keySet().toArray(new String[0]);
        int size = dbSourceMap.size();
        int rand = new Random().nextInt(size);
        return keys[rand];
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Preconditions.checkArgument(MapUtils.isNotEmpty(writeDataSourceMap));
        Preconditions.checkArgument(MapUtils.isNotEmpty(readDataSourceMap));
    }

    public void setWriteDataSource(Map<String, DataSource> writeDataSourceMap) {
        this.writeDataSourceMap = writeDataSourceMap;
    }

    public void setReadDataSourceMap(Map<String, DataSource> readDataSourceMap) {
        this.readDataSourceMap = readDataSourceMap;
    }
}

public class ConnectionKey {
    /**
     * 读库,只读
     */
    public static final String READ = "R";
    /**
     * 写库,可读可写
     */
    public static final String READ_WRITE = "RW";

    private String type;
    private String key;

    public ConnectionKey() {
    }

    public ConnectionKey(String type, String key) {
        this.type = type;
        this.key = key;
    }

    public String getType() {
        return type;
    }

    public String getKey() {
        return key;
    }

    public void setType(String type) {
        this.type = type;
    }

    public void setKey(String key) {
        this.key = key;
    }
}

public class ConnectionKeyHolder {
    private static ThreadLocal<ConnectionKey> connType = new ThreadLocal<ConnectionKey>();

    public static void set(ConnectionKey type) {
        connType.set(type);
    }

    public static ConnectionKey get() {
        return connType.get();
    }

    public static void release() {
        connType.remove();
    }
}

3.3 使用实现动态数据源

以我们自己实现的为例子,我们简单展示一下如何使用Spring加上AOP使用我们的动态数据源.

首先定义使用读库和使用写库的注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 使用读库的注解,value()代表key
*/
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Read {
    String value() default "";
}
/**
* 使用写库的注解,value()代表key
*/
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Write {
    String value() default "";
}

然后定义Aspect,逻辑很简单,只是在执行我们的业务逻辑之前给线程上文写入ConnectionKey信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class RoutingAop {

    protected Logger log = LoggerFactory.getLogger(getClass());

    /**
     * 解析读库的注解
     *
     * @param pjp
     * @param read
     * @return
     * @throws Throwable
     */
    public Object aroundRead(ProceedingJoinPoint pjp, Read read) throws Throwable {
        ConnectionKey origType = ConnectionKeyHolder.get();
        try {
            ConnectionKey newType = ConnectionKey.buildReadConnectionKey(read.value());
            ConnectionKeyHolder.set(newType);
            return pjp.proceed();
        } catch (Throwable throwable) {
            log.warn("error while processing read method", throwable);
            throw throwable;
        } finally {
            if (origType != null) {
                ConnectionKeyHolder.set(origType);
            } else {
                ConnectionKeyHolder.release();
            }
        }
    }

    /**
     * 解析写库的注解
     *
     * @param pjp
     * @param write
     * @return
     * @throws Throwable
     */
    public Object aroundWrite(ProceedingJoinPoint pjp, Write write) throws Throwable {
        ConnectionKey origType = ConnectionKeyHolder.get();
        try {
            ConnectionKey newType = ConnectionKey.buildWriteConnectionKey(write.value());
            ConnectionKeyHolder.set(newType);
            return pjp.proceed();
        } catch (Throwable throwable) {
            log.warn("error while processing write method", throwable);
            throw throwable;
        } finally {
            if (origType != null) {
                ConnectionKeyHolder.set(origType);
            } else {
                ConnectionKeyHolder.release();
            }
        }
    }
}

在Spring配置文件中定义好各个读库和各个写库,定义我们的RoutingDataSource的bean,之后扫描注解就可以了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<!-- DataSource 写库  -->
<bean id="writeDataSource" class="...">

<!-- DataSource 读库  -->
<bean id="readDataSource" class="...">

<bean id="routingDataSource" class="com.Xxx.RoutingDataSource">
    <property name="writeDataSourceMap">
        <map key-type="java.lang.String" value-type="javax.sql.DataSource">
            <entry key="write1" value-ref="writeDataSource"/>
        </map>
    </property>
    <property name="readDataSourceMap">
        <map key-type="java.lang.String" value-type="javax.sql.DataSource">
            <entry key="read1" value-ref="readDataSource"/>
        </map>
    </property>
</bean>

<bean id="routingAop" class="com.Xxx.RoutingAop"/>
<!-- 定义Read Write注解扫描  -->
<aop:config>
    <aop:aspect ref="routingAop">
        <aop:around method="aroundRead" arg-names="read" pointcut="@annotation(read) && execution(public * com.Xxx.controller.*.*(..))" />
    </aop:aspect>

    <aop:aspect ref="routingAop">
        <aop:around method="aroundWrite" arg-names="write" pointcut="@annotation(write) && execution(public * com.Xxx.controller.*.*(..)) " />
    </aop:aspect>
</aop:config>