xiaobaoqiu Blog

Think More, Code Less

R Tree

1.R-Tree简介

Btree以及它的变体B+tree对数据库的发展可谓是功不可没,但是,Btree良好的性能却仅仅只能在一维数据上产生效果,如果涉及到二维数据甚至多维数据,那么Btree也无能为力了。然而随着GIS(地理信息系统)和CAD(计算机辅助设计)的广泛应用,多维空间数据(spatial data)的处理变得相当普遍,因此必须在二维和多维数据上找到一种有效的索引方法,于是Rtree就出现了.

1984年,加州大学伯克利分校的Guttman发表了一篇题为“R-trees: a dynamic index structure for spatial searching”的论文,向世人介绍了R树这种处理高维空间存储问题的数据结构。

简单的说,就是将空间对象按某种空间关系进行划分,以后对空间对象的存取都基于划分块进行。

2.R-Tree原理

R树是一种多级平衡树,它是B树在多维空间上的扩展。在R树中存放的数据并不是原始数据,而是这些数据的最小边界矩形(Minimal-Bounding-Box, MBR),空间对象的MBR被包含于R树的叶结点中。从叶子结点开始用矩形(rectangle)将空间框起来,父节点的矩形需要包含字节点的矩形,结点越往上框住的空间就越大,以此对空间进行分割。

下图展示了一些列对象的边界矩形:

一个矩形框住一个或者多个节点,比如矩形R8框的是一个不规则对象,而矩形R6包含了矩形R15和R16;

R-Tee满足以下属性:

  1. 每个叶子节点包含m到M个索引记录,除非它是根节点;
  2. 每个叶子节点的索引记录(I, tuple-identifier),I是最小的可以在空间中完全覆盖这些记录所代表的点的矩形(高维空间矩形).
  3. 每个非叶子节点包含m到M个子节点,除非它是根节点;
  4. 对于在非叶子结点上的每一个条目(Entry),I是最小的可以在空间中完全包含所有子节点矩形的最小矩形;
  5. 根节点至少有两个孩子节点,除非它是叶子节点;
  6. 所有的叶子节点在同一层;

需要保证,m <= M/2;

叶子节点:

叶子结点所保存的数据形式为:(I, tuple-identifier),其中,tuple-identifier表示的是一个存放于数据库中的tuple,也就是一条记录,它是n维的。I是一个n维空间的矩形,它可以恰好框住这个叶子结点中所有记录代表的n维空间中的点。

非叶子节点

叶子结点所保存的数据形式为:(I, child-pointer),其中child-pointer是子节点指针,I是可以覆盖所有子节点的最小n维空间的矩形;

3.R-Tree应用

典型的PostGis,这使得Postgresql支持空间索引.

4.R-Tree实现

一个网络上的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
public class RTree<T> {

    public enum SeedPicker {
        LINEAR, QUADRATIC
    }

    /**
     * 每个节点包含的最大entry数目
     */
    private final int M;

    /**
     * 每个节点包含的最小entry数目
     */
    private final int m;

    /**
     * 空间的维度,如平面则为2,空间点则为3
     */
    private final int D;

    private final float[] pointDims;

    private final SeedPicker seedPicker;

    /**
     * 根节点
     */
    private Node root;

    private volatile int size;

    /**
     * 构造默认R-Tree的参数
     * DEFAULT_M : M
     * DEFAULT_m : m
     * DEFAULT_D : D
     */
    private static final int DEFAULT_M = 50;
    private static final int DEFAULT_m = 2;
    private static final int DEFAULT_D = 2;

    /**
     * 创建一颗新的R-Tree
     * 
     * @param M maximum number of entries per node
     * @param m minimum number of entries per node (except for the root node)
     * @param D the number of dimensions of the RTree.
     */
    public RTree(int M, int m, int D, SeedPicker seedPicker) {
        assert (m <= (M / 2));
        this.D = D;
        this.M = M;
        this.m = m;
        this.seedPicker = seedPicker;
        pointDims = new float[D];
        root = buildRoot(true);
    }

    public RTree(int M, int m, int D) {
        this(M, m, D, SeedPicker.LINEAR);
    }

    /**
     * 使用默认参数构造一个R-Tree
     */
    public RTree() {
        this(DEFAULT_M, DEFAULT_m, DEFAULT_D, SeedPicker.LINEAR);
    }

    /**
     * 构造根节点
     *
     * @param asLeaf : 根节点是否为叶子节点
     * @return
     */
    private Node buildRoot(boolean asLeaf) {
        float[] initCoords = new float[D];
        float[] initDimensions = new float[D];
        for (int i = 0; i < this.D; i++) {
            initCoords[i] = (float) Math.sqrt(Float.MAX_VALUE);
            initDimensions[i] = -2.0f * (float) Math.sqrt(Float.MAX_VALUE);
        }
        return new Node(initCoords, initDimensions, asLeaf);
    }

    /**
     * getter
     */
    public int getMaxEntries() {
        return M;
    }

    public int getMinEntries() {
        return m;
    }

    public int getNumDims() {
        return D;
    }

    public int size() {
        return size;
    }

    /**
     * 在R-Tee中搜索和给定矩形有重叠(overlapping)的对象
     * 
     * @param coords 矩形的一个顶点(比如左上角)
     * @param dimensions 矩形长度.
     * 返回对象List,这些对象的最小边界矩形(MBR)和给定矩形重叠
     */
    public List<T> search(float[] coords, float[] dimensions) {
        assert (coords.length == D);
        assert (dimensions.length == D);

        LinkedList<T> results = new LinkedList<T>();
        search(coords, dimensions, root, results);

        return results;
    }

    private void search(float[] coords, float[] dimensions, Node n, LinkedList<T> results) {
        if (n.leaf) {
            for (Node e : n.children) {
                if (isOverlap(coords, dimensions, e.coords, e.dimensions)) {
                    results.add(((Entry) e).entry);
                }
            }
        } else {
            for (Node c : n.children) {
                if (isOverlap(coords, dimensions, c.coords, c.dimensions)) {
                    search(coords, dimensions, c, results); //继续在孩子节点中搜索
                }
            }
        }
    }

    /**
     * 删除R-Tree中在矩形rect内的数据entry
     * 
     * @param coords : 矩形的一个顶点(比如左上角)
     * @param dimensions : 矩形长度
     * @param entry : 待删除数据
     * 删除成功返回true
     */
    public boolean delete(float[] coords, float[] dimensions, T entry) {
        assert (coords.length == D);
        assert (dimensions.length == D);
        Node l = findLeaf(root, coords, dimensions, entry);
        if (l == null) {
            System.out.println("WTF?");
            findLeaf(root, coords, dimensions, entry);
        }
        assert (l != null) : "Could not find leaf for entry to delete";
        assert (l.leaf) : "Entry is not found at leaf?!?";
        ListIterator<Node> li = l.children.listIterator();
        T removed = null;
        while (li.hasNext()) {
            @SuppressWarnings("unchecked")
            Entry e = (Entry) li.next();
            if (e.entry.equals(entry)) {
                removed = e.entry;
                li.remove();
                break;
            }
        }
        if (removed != null) {
            condenseTree(l);
            size--;
        }
        if (size == 0) {
            root = buildRoot(true);
        }
        return (removed != null);
    }

    public boolean delete(float[] coords, T entry) {
        return delete(coords, pointDims, entry);
    }

    /**
     * 查找数据 entry 对应的叶子节点
     * @param n
     * @param coords
     * @param dimensions
     * @param entry
     * @return
     */
    private Node findLeaf(Node n, float[] coords, float[] dimensions, T entry) {
        if (n.leaf) {
            for (Node c : n.children) {
                if (((Entry) c).entry.equals(entry)) {
                    return n;
                }
            }
            return null;
        } else {
            for (Node c : n.children) {
                if (isOverlap(c.coords, c.dimensions, coords, dimensions)) {
                    Node result = findLeaf(c, coords, dimensions, entry);
                    if (result != null) {
                        return result;
                    }
                }
            }
            return null;
        }
    }

    /**
     * 压缩树
     * @param n
     */
    private void condenseTree(Node n) {
        Set<Node> q = new HashSet<Node>();
        while (n != root) {
            if (n.leaf && (n.children.size() < m)) {
                q.addAll(n.children);
                n.parent.children.remove(n);
            } else if (!n.leaf && (n.children.size() < m)) {
                // probably a more efficient way to do this...
                LinkedList<Node> toVisit = new LinkedList<Node>(n.children);
                while (!toVisit.isEmpty()) {
                    Node c = toVisit.pop();
                    if (c.leaf) {
                        q.addAll(c.children);
                    } else {
                        toVisit.addAll(c.children);
                    }
                }
                n.parent.children.remove(n);
            } else {
                tighten(n);
            }
            n = n.parent;
        }
        if (root.children.size() == 0) {
            root = buildRoot(true);
        } else if ((root.children.size() == 1) && (!root.leaf)) {
            root = root.children.get(0);
            root.parent = null;
        } else {
            tighten(root);
        }
        for (Node ne : q) {
            @SuppressWarnings("unchecked")
            Entry e = (Entry) ne;
            insert(e.coords, e.dimensions, e.entry);
        }
        size -= q.size();
    }

    /**
     * 清空整个R-Tree
     */
    public void clear() {
        root = buildRoot(true);
        //让GC做剩余的事情...
    }

    /**
     * 在RTree中插入数据entry, 和矩形匹配.
     * 
     * @param coords : 矩形的一个顶点(比如左上角)
     * @param dimensions : 矩形长度
     * @param entry : 待出入数据
     */
    public void insert(float[] coords, float[] dimensions, T entry) {
        assert (coords.length == D);
        assert (dimensions.length == D);
        Entry e = new Entry(coords, dimensions, entry);
        Node l = chooseLeaf(root, e);
        l.children.add(e);
        size++;
        e.parent = l;
        if (l.children.size() > M) {
            Node[] splits = splitNode(l);
            adjustTree(splits[0], splits[1]);
        } else {
            adjustTree(l, null);
        }
    }

    /**
     * Convenience method for inserting a point
     * 
     * @param coords
     * @param entry
     */
    public void insert(float[] coords, T entry) {
        insert(coords, pointDims, entry);
    }

    private void adjustTree(Node n, Node nn) {
        if (n == root) {
            if (nn != null) {
                // build new root and add children.
                root = buildRoot(false);
                root.children.add(n);
                n.parent = root;
                root.children.add(nn);
                nn.parent = root;
            }
            tighten(root);
            return;
        }
        tighten(n);
        if (nn != null) {
            tighten(nn);
            if (n.parent.children.size() > M) {
                Node[] splits = splitNode(n.parent);
                adjustTree(splits[0], splits[1]);
            }
        }
        if (n.parent != null) {
            adjustTree(n.parent, null);
        }
    }

    private Node[] splitNode(Node n) {
        // TODO: this class probably calls "tighten" a little too often.
        // For instance the call at the end of the "while (!cc.isEmpty())" loop
        // could be modified and inlined because it's only adjusting for the addition
        // of a single node. Left as-is for now for readability.
        @SuppressWarnings("unchecked")
        Node[] nn = new RTree.Node[] { n, new Node(n.coords, n.dimensions, n.leaf) };
        nn[1].parent = n.parent;
        if (nn[1].parent != null) {
            nn[1].parent.children.add(nn[1]);
        }
        LinkedList<Node> cc = new LinkedList<Node>(n.children);
        n.children.clear();
        Node[] ss = seedPicker == SeedPicker.LINEAR ? lPickSeeds(cc) : qPickSeeds(cc);
        nn[0].children.add(ss[0]);
        nn[1].children.add(ss[1]);
        tighten(nn);
        while (!cc.isEmpty()) {
            if ((nn[0].children.size() >= m) && (nn[1].children.size() + cc.size() == m)) {
                nn[1].children.addAll(cc);
                cc.clear();
                tighten(nn); // Not sure this is required.
                return nn;
            } else if ((nn[1].children.size() >= m) && (nn[0].children.size() + cc.size() == m)) {
                nn[0].children.addAll(cc);
                cc.clear();
                tighten(nn); // Not sure this is required.
                return nn;
            }
            Node c = seedPicker == SeedPicker.LINEAR ? lPickNext(cc) : qPickNext(cc, nn);
            Node preferred;
            float e0 = getRequiredExpansion(nn[0].coords, nn[0].dimensions, c);
            float e1 = getRequiredExpansion(nn[1].coords, nn[1].dimensions, c);
            if (e0 < e1) {
                preferred = nn[0];
            } else if (e0 > e1) {
                preferred = nn[1];
            } else {
                float a0 = getArea(nn[0].dimensions);
                float a1 = getArea(nn[1].dimensions);
                if (a0 < a1) {
                    preferred = nn[0];
                } else if (e0 > a1) {
                    preferred = nn[1];
                } else {
                    if (nn[0].children.size() < nn[1].children.size()) {
                        preferred = nn[0];
                    } else if (nn[0].children.size() > nn[1].children.size()) {
                        preferred = nn[1];
                    } else {
                        preferred = nn[(int) Math.round(Math.random())];
                    }
                }
            }
            preferred.children.add(c);
            tighten(preferred);
        }
        return nn;
    }

    // Implementation of Quadratic PickSeeds
    private RTree<T>.Node[] qPickSeeds(LinkedList<Node> nn) {
        @SuppressWarnings("unchecked")
        RTree<T>.Node[] bestPair = new RTree.Node[2];
        float maxWaste = -1.0f * Float.MAX_VALUE;
        for (Node n1 : nn) {
            for (Node n2 : nn) {
                if (n1 == n2)
                    continue;
                float n1a = getArea(n1.dimensions);
                float n2a = getArea(n2.dimensions);
                float ja = 1.0f;
                for (int i = 0; i < D; i++) {
                    float jc0 = Math.min(n1.coords[i], n2.coords[i]);
                    float jc1 = Math.max(n1.coords[i] + n1.dimensions[i], n2.coords[i] + n2.dimensions[i]);
                    ja *= (jc1 - jc0);
                }
                float waste = ja - n1a - n2a;
                if (waste > maxWaste) {
                    maxWaste = waste;
                    bestPair[0] = n1;
                    bestPair[1] = n2;
                }
            }
        }
        nn.remove(bestPair[0]);
        nn.remove(bestPair[1]);
        return bestPair;
    }

    /**
     * Implementation of QuadraticPickNext
     * 
     * @param cc the children to be divided between the new nodes, one item will be removed from this list.
     * @param nn the candidate nodes for the children to be added to.
     */
    private Node qPickNext(LinkedList<Node> cc, Node[] nn) {
        float maxDiff = -1.0f * Float.MAX_VALUE;
        Node nextC = null;
        for (Node c : cc) {
            float n0Exp = getRequiredExpansion(nn[0].coords, nn[0].dimensions, c);
            float n1Exp = getRequiredExpansion(nn[1].coords, nn[1].dimensions, c);
            float diff = Math.abs(n1Exp - n0Exp);
            if (diff > maxDiff) {
                maxDiff = diff;
                nextC = c;
            }
        }
        assert (nextC != null) : "No node selected from qPickNext";
        cc.remove(nextC);
        return nextC;
    }

    // Implementation of LinearPickSeeds
    private RTree<T>.Node[] lPickSeeds(LinkedList<Node> nn) {
        @SuppressWarnings("unchecked")
        RTree<T>.Node[] bestPair = new RTree.Node[2];
        boolean foundBestPair = false;
        float bestSep = 0.0f;
        for (int i = 0; i < D; i++) {
            float dimLb = Float.MAX_VALUE, dimMinUb = Float.MAX_VALUE;
            float dimUb = -1.0f * Float.MAX_VALUE, dimMaxLb = -1.0f * Float.MAX_VALUE;
            Node nMaxLb = null, nMinUb = null;
            for (Node n : nn) {
                if (n.coords[i] < dimLb) {
                    dimLb = n.coords[i];
                }
                if (n.dimensions[i] + n.coords[i] > dimUb) {
                    dimUb = n.dimensions[i] + n.coords[i];
                }
                if (n.coords[i] > dimMaxLb) {
                    dimMaxLb = n.coords[i];
                    nMaxLb = n;
                }
                if (n.dimensions[i] + n.coords[i] < dimMinUb) {
                    dimMinUb = n.dimensions[i] + n.coords[i];
                    nMinUb = n;
                }
            }
            float sep = (nMaxLb == nMinUb) ? -1.0f : Math.abs((dimMinUb - dimMaxLb) / (dimUb - dimLb));
            if (sep >= bestSep) {
                bestPair[0] = nMaxLb;
                bestPair[1] = nMinUb;
                bestSep = sep;
                foundBestPair = true;
            }
        }
        // In the degenerate case where all points are the same, the above
        // algorithm does not find a best pair. Just pick the first 2
        // children.
        if (!foundBestPair) {
            bestPair = new RTree.Node[] { nn.get(0), nn.get(1) };
        }
        nn.remove(bestPair[0]);
        nn.remove(bestPair[1]);
        return bestPair;
    }

    /**
     * Implementation of LinearPickNext
     * 
     * @param cc the children to be divided between the new nodes, one item will be removed from this list.
     */
    private Node lPickNext(LinkedList<Node> cc) {
        return cc.pop();
    }

    private void tighten(Node... nodes) {
        assert (nodes.length >= 1) : "Pass some nodes to tighten!";
        for (Node n : nodes) {
            assert (n.children.size() > 0) : "tighten() called on empty node!";
            float[] minCoords = new float[D];
            float[] maxCoords = new float[D];
            for (int i = 0; i < D; i++) {
                minCoords[i] = Float.MAX_VALUE;
                maxCoords[i] = Float.MIN_VALUE;

                for (Node c : n.children) {
                    // we may have bulk-added a bunch of children to a node (eg. in
                    // splitNode)
                    // so here we just enforce the child->parent relationship.
                    c.parent = n;
                    if (c.coords[i] < minCoords[i]) {
                        minCoords[i] = c.coords[i];
                    }
                    if ((c.coords[i] + c.dimensions[i]) > maxCoords[i]) {
                        maxCoords[i] = (c.coords[i] + c.dimensions[i]);
                    }
                }
            }
            for (int i = 0; i < D; i++) {
                // Convert max coords to dimensions
                maxCoords[i] -= minCoords[i];
            }
            System.arraycopy(minCoords, 0, n.coords, 0, D);
            System.arraycopy(maxCoords, 0, n.dimensions, 0, D);
        }
    }

    private RTree<T>.Node chooseLeaf(RTree<T>.Node n, RTree<T>.Entry e) {
        if (n.leaf) {
            return n;
        }
        float minInc = Float.MAX_VALUE;
        Node next = null;
        for (RTree<T>.Node c : n.children) {
            float inc = getRequiredExpansion(c.coords, c.dimensions, e);
            if (inc < minInc) {
                minInc = inc;
                next = c;
            } else if (inc == minInc) {
                float curArea = 1.0f;
                float thisArea = 1.0f;
                for (int i = 0; i < c.dimensions.length; i++) {
                    curArea *= next.dimensions[i];
                    thisArea *= c.dimensions[i];
                }
                if (thisArea < curArea) {
                    next = c;
                }
            }
        }
        return chooseLeaf(next, e);
    }

    /**
     * Returns the increase in area necessary for the given rectangle to cover the given entry.
     */
    private float getRequiredExpansion(float[] coords, float[] dimensions, Node e) {
        float area = getArea(dimensions);
        float[] deltas = new float[dimensions.length];
        for (int i = 0; i < deltas.length; i++) {
            if (coords[i] + dimensions[i] < e.coords[i] + e.dimensions[i]) {
                deltas[i] = e.coords[i] + e.dimensions[i] - coords[i] - dimensions[i];
            } else if (coords[i] + dimensions[i] > e.coords[i] + e.dimensions[i]) {
                deltas[i] = coords[i] - e.coords[i];
            }
        }
        float expanded = 1.0f;
        for (int i = 0; i < dimensions.length; i++) {
            expanded *= dimensions[i] + deltas[i];
        }
        return (expanded - area);
    }

    private float getArea(float[] dimensions) {
        float area = 1.0f;
        for (int i = 0; i < dimensions.length; i++) {
            area *= dimensions[i];
        }
        return area;
    }

    private boolean isOverlap(float[] scoords, float[] sdimensions, float[] coords, float[] dimensions) {
        final float FUDGE_FACTOR = 1.001f;
        for (int i = 0; i < scoords.length; i++) {
            boolean overlapInThisDimension = false;
            if (scoords[i] == coords[i]) {
                overlapInThisDimension = true;
            } else if (scoords[i] < coords[i]) {
                if (scoords[i] + FUDGE_FACTOR * sdimensions[i] >= coords[i]) {
                    overlapInThisDimension = true;
                }
            } else if (scoords[i] > coords[i]) {
                if (coords[i] + FUDGE_FACTOR * dimensions[i] >= scoords[i]) {
                    overlapInThisDimension = true;
                }
            }
            if (!overlapInThisDimension) {
                return false;
            }
        }
        return true;
    }

    /**
     * 节点
     */
    private class Node {
        /**
         * 高维矩形的一个定点,如左下角
         */
        final float[] coords;

        /**
         * 矩形的长度
         */
        final float[] dimensions;

        /**
         * 孩子节点
         */
        final LinkedList<Node> children;

        /**
         * 是否为叶子节点
         */
        final boolean leaf;

        /**
         * 父亲节点
         */
        Node parent;

        private Node(float[] coords, float[] dimensions, boolean leaf) {
            this.coords = new float[coords.length];
            this.dimensions = new float[dimensions.length];
            System.arraycopy(coords, 0, this.coords, 0, coords.length);
            System.arraycopy(dimensions, 0, this.dimensions, 0, dimensions.length);
            this.leaf = leaf;
            children = new LinkedList<Node>();
        }
    }

    /**
     * 实体,代表一个数据项
     * 注意:不是叶子节点,其父亲才是叶子节点
     */
    private class Entry extends Node {
        /**
         * 数据
         */
        final T entry;

        public Entry(float[] coords, float[] dimensions, T entry) {
            super(coords, dimensions, true);
            this.entry = entry;
        }

        public String toString() {
            return "Entry: " + entry;
        }
    }
}

参考

http://publib.boulder.ibm.com/infocenter/idshelp/v10/index.jsp?topic=/com.ibm.rtree.doc/rtree29.htm

Hosts管理

google被墙,这个工具可以帮我们搞定.

1
http://sourceforge.net/projects/huhamhirehosts/files/

截图:

注意点:

  1. 服务器google code不稳定,使用sourceforge;
  2. Linux下加上sudo权限 sudo python hoststool.py 可以替换本机的hosts,需要自己先备份自己的hosts.

Java Concurrency in Practice 5 : Building Blocks

第五章:构建块(Building Blocks) java类库中并发构建块的集合.

1.同步容器

同步容器包含两部分:

(1).Vector和HashTable;

(2).Collections.synchronizedXxx工厂方法创建的容器;

在迭代器件,对容器加锁会导致其他线程需要访问容器时候,必须等待知道迭代结束,另外doSomething中可能还持有另外一个锁,这就可能造成死锁.

1
2
3
4
5
synchronized (vector) {
        for (Integer item : vector){
            doSomething(item);
        }
    }

在迭代器期间,对容器加锁的一个替代办法是复制容器,但需要考虑器带来的性能开销.

2.并发容器

同步容器通过对容器的所有状态进行串行访问,从而实现了他们的线程安全.

这样的代价就是削弱了并发性,当多个线程公摊竞争容器级别的锁时,吞吐量就会降低.

而并发容器正式为多线程并发访问而设计的,比如ConcurrentHashMap替代HashMap;当多数操作为读取的时候用CopyOnWriteArrayList代替List;ConcurrentLinkedQueue是一个具有优先级顺序的队列(非并发),PriorityQueue的操作不会阻塞,如果队列时空,则从队列中获取元素的操作返回null;BlockingQueue扩展了Queue,增加了可阻塞的插入和获取操作,如果队列为空,一个获取操作会一致阻塞知道队列中存在可用元素,如果队列是满的,插入操作会一致阻塞知道队列中存在可用空间;ConcurrentSkipListMap和ConcurrentSkipListSet用来作为同步的Sorted Map和SortedSet的并发替代并.

ConcurrentHashMap

在HashTable和SynchronizedMap中,获取Map的锁就可以防止任何其他线程访问该Map(独占),ConcurrentHashMap使用了一个更细化的锁机制(桶级别的锁),叫分离锁,它允许更深层次的共享访问,即任意数量的独显乘可以并发访问Map,读者和写着也可以并发访问Map,并写有限数量的写线程可以并发的修改Map,因而为并发刚问带来更高的吞吐量. ConcurrentHashMap增加的一些原子操作: 缺少即插入(key不存在的时候才插入):

1
2
3
4
5
6
public V putIfAbsent(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, true);
    }

相等便删除(只有当key和value匹配的时候才删除):

1
2
3
4
5
6
public boolean remove(Object key, Object value) {
        int hash = hash(key.hashCode());
        if (value == null)
            return false;
        return segmentFor(hash).remove(key, hash, value) != null;
}

相等便替换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//只有当key匹配某一个值才取代
public V replace(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).replace(key, hash, value);
}

//只有当key和oldValue匹配的时候才替换
public boolean replace(K key, V oldValue, V newValue) {
        if (oldValue == null || newValue == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).replace(key, hash, oldValue, newValue);
}

CopyOnWriteArrayList

CopyOnWriteArrayList避免了在迭代器件对容器加锁.显然,每次容器改变是复制需要开销,特别是容器比较大的时候,当对容器迭代操作频率远远高于对容器的修改的频率 时候,使用写时复制容器是一个合理的选择.

BlockingQueue

阻塞队列(BlockingQueue)提供了可阻塞的put和take方法,同时也提供了可定时的offer和poll方法:

1
2
3
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

E poll(long timeout, TimeUnit unit) throws InterruptedException;

Queue的长度可以有限,也可以无限.BlockingQueue的实现包括:LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列;PriorityBlockingQueue是一个按优先级顺序排序的队列,支持自定义Comparator;SynchronousQueue根本上不是一个整整意义的队列,因为它不会为队列元素维护任何存储空间.其中每个 put 必须等待一个 take,反之亦然.

生产者-消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class FileCrawler implements Runnable {
    private final BlockingQueue<File> fileQueue;
    private final File root;

    public FileCrawler(BlockingQueue<File> fileQueue, File root) {
        this.fileQueue = fileQueue;
        this.root = root;
    }

    @Override
    public void run() {
        try {
            crawl(root);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void crawl(File root) throws InterruptedException {
        File[] files = root.listFiles();
        if (files != null && files.length > 0) {
            for (File file : files) {
                if (file.isDirectory())
                    crawl(file);
                else if (alreadyIndexed(file) == false)
                    fileQueue.put(file);
            }
        }
    }

    private boolean alreadyIndexed(File file) {
        //TODO
        return false;
    }
}

public class FileIndexer implements Runnable {

    private final BlockingQueue<File> fileQueue;

    public FileIndexer(BlockingQueue<File> fileQueue) {
        this.fileQueue = fileQueue;
    }

    @Override
    public void run() {
        try {
            while (true)
                index(fileQueue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void index(File file) {
        //TODO : do index
    }
}

public class CrawlAndIndex {
    public static void main(String[] args) {
        final int MAX_QUEUE_SIZE = 1024;
        final int INDEXER_COUNT = 100;

        final File[] roots = getFiles();

        BlockingQueue<File> fileQueue = new LinkedBlockingDeque<File>(MAX_QUEUE_SIZE);

        for (File file : roots)
            new Thread(new FileCrawler(fileQueue, file)).start();

        for(int i=0; i<INDEXER_COUNT; i++)
            new Thread(new FileIndexer(fileQueue)).start();

    }

    private static File[] getFiles() {
        //TODO
        return null;
    }
}

阻塞和可中断的方法

线程可能会因为几种原因被阻塞或者暂停:等待IO操作结束,等待获的一个锁,等待从Thread.sleep()中唤醒,或者等待另外一个线程的计算结果.

当一个线程被阻塞时候,它通常被挂起,并设置乘线程阻塞的某一个状态(BLOCKED,WAITING或者TIMED_WAITING).

对于InterruptedException,通常最明智的策略是把这个InterruptedException传递给你的调用者.

恢复中断:有时还我们不能抛出InterruptedException,比如代码是Runnable的一部分,这是很,我们需要捕获InterruptedException并在当前线程中通过调用interrupt()从中断中恢复.如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TaskRunnable implements Runnable {

    private final BlockingQueue<Task> taskQueue;

    public TaskRunnable(BlockingQueue<Task> taskQueue) {
        this.taskQueue = taskQueue;
    }

    @Override
    public void run() {
        try {
            while (true)
                process(taskQueue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); //恢复中断状态
        }
    }
}

3.Synchronizer

Synchronizer是一个对象,它根据自身状态调节线程的控制流.

阻塞队列可以扮演一个Synchronizer角色,其他类型的Synchronizer包括信号量(semaphore),关卡(barrier)以及闭锁(latch).

所有的Synchronizer都享有类似的结构特征:他们封装状态,这些庄套决定线程执行到某一点是通过还是被迫等待;他们还提供了操控状态的方法,以及高校地等到Synchronizer进入到期望状态的方法.

闭锁

闭锁可以延迟线程的进度直到进程到达终止(termnal)状态.

一个闭锁就像一道大门:在闭锁达到终点状态之前,门一致是关闭的,没有线程能够通过,终点状态到来的时候,门卡了,允许所有都通过.

一旦闭锁到达终点状态,它就不能在改变状态了,所有它会永远保持趟开状态.

闭锁可以用来确保特定活动在其他活动完成之后才发生,如: 1. 确保一个计算不会执行,直到它需要的资源都被初始化完成; 2. 确保一个服务不会开始,直到它依赖的其他服务都已经开始; 3. 瞪大,直到活动的所有部分窦唯继续处理做好准备(如CS对战中所有玩家都准备就绪);

CountDownLatch是一个闭锁实现,允许一个或多个线程等待事件集的发生.包含一个计数器,表示需要等待的事件数,countDown方法对计算器做减操作,表示一个事件已经发生,await方法等待计算机达到0,此时表示所有事件都已经发生.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        task.run();
                    } finally {
                        endGate.countDown();
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        endGate.await();
        long end = System.nanoTime();
        System.out.println("All Job Finish, use time(nano) = " + (end - start));

        return end - start;
    }

    public void doTask() throws InterruptedException{
        timeTasks(10, new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                    System.out.println("Finished.");
                } catch (InterruptedException e) {
                }
            }
        });
    }

FutureTask

FutureTask描述库一个抽象的可携带结果的计算.FutureTask通过Callable实现的,它等价与一个可携带结果的Runnable,并且有上那个状态:等待,运行和完成.一旦FutureTask进入完成状态,则会永远停止在这个状态上.

Future.get行为依赖与任务的状态,如果它已经完成,get可以立刻得到返回结果,否在会阻塞知道任务转入完成状态,然后返回结构或者抛出异常.

Executor框架利用FuntureTask来完成异步任务,并可以用来计算任何潜在的耗时计算,而且可以在真正要计算结构之前就启动他们开始计算.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Preload {
    private final FutureTask<BigDecimal> futureTask = new FutureTask<BigDecimal>(new Callable<BigDecimal>() {
        @Override
        public BigDecimal call() throws Exception {
            return timeConsumingWork();
        }
    });

    private final Thread thread = new Thread(futureTask);

    public void start() {
        thread.start();
    }

    private BigDecimal get(){
        try{
            return futureTask.get();    //获取数据
        }catch(ExecutionException e){
            Throwable cause = e.getCause();
            //处理各种原因的异常
            return null;
        }catch (InterruptedException e){
            return null;
        }
    }

    /**
     * 耗时的计算
     * 
     * @return
     */
    private BigDecimal timeConsumingWork() {
        // TODO
        return null;
    }
}

Callable记录的这些任务,可以抛出受检查或者未受检查的异常,无论执行任务的代码抛出什么,它都被封装为一个ExecutionException,并被Future.get重新抛出.

信号量

计算信号量(Counting semaphore)用来控制能够同时访问某特定资源的活动的数量,或者同时执行某一给定操作的数量.可以用来实现资源池或者给一个容器限定边界.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        sem.acquire();  //获得许可
        boolean added = false;

        try {
            added = set.add(o);
            return added;
        }finally {
            if(!added)
                sem.release();
        }
    }

    public boolean remove(Object o){
        boolean removed = set.remove(o);
        if(removed)
            sem.release();  //将许可放回资源池
        return removed;
    }
}

关卡

关卡(Barrier)类似于闭锁,不同之处在于,所有线程必须同时到达关卡点,才能继续处理.

闭锁等待是事件,关卡等待的是其他线程.

CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.其实现协议类似于一些朋友指定的集合地点:“我们每个人6:00在麦当劳见,到了以后不见不散,之后我们再决定接下来做什么”.

CyclicBarrier允许一个给定数量的成员多次集中在一个关卡点.如果所有线程都到达了关卡点,关卡就被成功的突破,这样所有的线程都被释放,关卡会重置以备下一次使用.

Exchanger是关卡的另外一种形式.

高速缓存

为一个昂贵的函数缓存计算结果.Map中存Future而不是具体的数值,这样避免重复计算.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public interface Computable<A, V> {
    V compute(A arg) throws InterruptedException;
}

public class Memoizer<A, V> implements Computable<A, V> {

    private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();

    private final Computable<A, V> c;

    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public V compute(final A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if(f == null){
                Callable<V> eval = new Callable<V>() {
                    @Override
                    public V call() throws Exception {
                        return c.compute(arg);
                    }
                };

                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(arg, ft);

                if(f == null){
                    f = ft;
                    ft.run();
                }
            }
            try{
                return f.get();
            }catch (CancellationException e){
                cache.remove(arg, f);
            }catch (ExecutionException e){
                //TODO:根据 e.getCause() 处理
            }
        }
    }
}

Java Concurrency in Practice 4 : 组合对象

第四章:组合对象 我们希望通过将线程安全的组件以完全的方式组合成更大的组件或者程序.

1. 设计线程安全的类

设计线程安全类的过程应该包含下面3个基本要素: 1. 确定对象状态是由哪些变量构成的; 2. 确定限制状态变量的不变约束; 3. 制定一个管理并发访问对象状态的策略;

2. 实例限制

即使一个对象不是线程安全的,仍然有许多技术可以让他安全的用于多线程程序. 通过使用实例限制,封装简化了类的线程安全化工作.

将数据封装在对象内部,把对数据的访问限制在对象的方法上,更容易确保线程在访问数据时候总能获得正确的锁.

如下,HashSet是非线程安全的,但是由于mySet是private,不会逸出,因此HashSet被限制在PersonSet中,唯一可以访问mySet的代码路径是addPerson与containPerson,执行他们时都要获得PersonSet的锁.因而确保了PersonSet是线程安全的.

1
2
3
4
5
6
7
8
9
10
11
public class PersonSet {
    private final Set<Person> mySet = new HashSet<Person>();

    public synchronized void addPerson(Person p){
        mySet.add(p);
    }

    public synchronized boolean containPerson(Person p){
        return mySet.contains(p);
    }
}

java类库中有很多这样的例子.ArrayList和HashMap这样的基本容器类是非线程安全的,但是类库提供了包装器工厂方法,如:

1
2
3
Collections.synchronizedList(arrayList);
Collections.synchronizedMap(hashMap);
Collections.synchronizedSet(hashSet);

使得这些非线程安全的类可以安全的用于多线程环境.这些工厂方法的原理是使用Decorator模式,使用一个同步的包装器对象保证这些非线程安全的容器,将相关接口实现为同步方法,并将请求转发打下层容器对象上:

1
2
3
4
5
6
7
8
class SynchronizedCollection<E> implements Collection<E>, Serializable {
    final Collection<E> c;  // Backing Collection
    final Object mutex;     // Object on which to synchronize
    ...
    public boolean add(E e) {
        synchronized(mutex) {return c.add(e);}
    }
    ...

3. 委托线程安全

java中CopyOnWrite系列(CopyOnWriteArrayList,CopyOnWriteArraySet),CopyOnWriteArrayList是ArrayList的线程安全版本,即实现了写时复制的ArrayList版本,写时复制即每次写操作都会触发一个复制(深拷贝)的过程,如CopyOnWriteArrayList的add操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    ...

    public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
    }
}

Collections中的Unmodifiable系列,包含UnmodifiableList,UnmodifiableMap,UnmodifiableSet,UnmodifiableSortedMap,UnmodifiableSortedSet,器特性是不支持对容器内数据的修改,比如包含UnmodifiableList:

1
2
3
4
5
6
7
8
9
static class UnmodifiableList<E> extends UnmodifiableCollection<E>
                      implements List<E> {
        static final long serialVersionUID = -283967356065247728L;
    final List<? extends E> list;
    ...
    public E set(int index, E element) {
        throw new UnsupportedOperationException();
    }
}

4. 向已有的线程安全类添加功能

  1. 可以直接扩展原始类的代码
  2. 通过继承来增加功能
  3. 扩展功能,而不是扩展类本身,将扩展代码置于一个助手类;
  4. 组合

Collections的synchronized系列容器,包括synchronizedList,synchronizedMap,synchronizedSet,synchronizedSortedMap,synchronizedSortedSet,是普通容器的线程安全版本版本,实现原理即每个操作上面都加锁:

1
2
3
4
5
static class SynchronizedCollection<E> implements Collection<E>, Serializable {
    final Collection<E> c;  // Backing Collection
    final Object mutex;     // Object on which to synchronize
    ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static class SynchronizedList<E>
    extends SynchronizedCollection<E>
    implements List<E> {
        static final long serialVersionUID = -7754090372962971524L;

    final List<E> list;

    public E get(int index) {
        synchronized(mutex) {return list.get(index);}
        }
    public E set(int index, E element) {
        synchronized(mutex) {return list.set(index, element);}
    }
}

和Vector类似,但是Vector通过给所有方法级别加上synchronized关键字修饰来实现所以线程安全.

助手类达到扩展功能的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ListHelper<E> {
    public List<E> list = Collections.synchronizedList(new ArrayList<E>());

    public boolean putIfAbsent(E x){
        synchronized (list) {
            boolean absent = !list.contains(x);
            if(absent)
                list.add(x);

            return absent;
        }
    }
}

这种方式的问题是,将加锁的代码分布到对象继承体系中的多个类中,即类的行为和基类实现之间存在耦合.

向已有的线程安全类添加原子操作,更健壮的选择是组合.示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ImprovedList<T> implements List<T> {
    private final List<T> list;

    public ImprovedList(List<T> list){
        this.list = list;
    }

    public synchronized boolean putIfAbsent(T x){
        boolean absent = !list.contains(x);
        if(absent)
            list.add(x);
        return absent;
    }

    @Override
    public synchronized void clear() {
        list.clear();
    }
    //其他list方法的代理
}

Closest Pair

Problem:

Given a set of points in a two dimensional space, you will have to find the distance between the closest two points.

示例:

input

1
2
3
4
5
0 2
6 67
43 71
39 107
189 140

output

1
36.2215

Solutions:

1. Brute force

计算全部n(n-1)/2对点对,找到最近的点对.

时间复杂度:O(n2)

空间复杂度:O(1)

2. Divide and conquer

步骤如下: 1. sort 按照x坐标(y坐标也可以)从小达到排序,时间复杂度O(nlogn); 2. divide 找到中间点q,用q将所有点P所有的点划分为左右两个部分PL和PR; 3. conquer 递归计算PL和PR这两个点集的最近距离,二者中的较小值记为d; 4. combine 取PL和PR中x坐标与点q的x坐标距离小于d的所有点(记为PM,即为一个宽度为2d的竖条带),将PM按照y坐标排序;遍历PM,在y坐标差值小于d的情况下计算距离;

其中1 2 3步骤很明显,步骤4技巧比较多:

  1. 最多6个点满足y坐标差值小于d,就是说我们的遍历PM在O(k)时间内搞定(k为PM点的数目),证明见 参考:http://www.cs.mcgill.ca/~cs251/ClosestPair/proofbox.html
  2. 计算距离,不用计算sqrt((x1-x2)(x1-x2) + (y1-y2)(y1-y2)),取sqrt内的值比较就可以;
  3. 按照y值排序PM,这是combine的性能瓶颈,我们可以在递归时候做merge排序,避免每次combine时候都重新排序的代价,因此combine的代价为O(n);

总的时间代价:

1
T(n) = 2T(n/2) + O(n),即T(n)=O(nlogn)

参考代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import com.google.common.primitives.Doubles;
import java.awt.*;
import java.awt.geom.Point2D;
import java.util.Arrays;
import java.util.Comparator;

public class ClosestPair {

    // 最近的点对及其距离
    private Point2D best1, best2;
    private double bestDistance = Double.POSITIVE_INFINITY;

    //x排序器
    private Comparator<Point2D> X_ORDER = new Comparator<Point2D>() {
        @Override
        public int compare(Point2D o1, Point2D o2) {
            return Doubles.compare(o1.getX(), o2.getX());
        }
    };

    public ClosestPair(Point2D[] points) {
        int N = points.length;
        if (N <= 1) return;

        //按照x排序
        Point2D[] pointsByX = Arrays.copyOf(points, N);
        Arrays.sort(pointsByX, X_ORDER);

        // 检查重合的点
        for (int i = 0; i < N-1; i++) {
            if (pointsByX[i].equals(pointsByX[i+1])) {
                bestDistance = 0.0;
                best1 = pointsByX[i];
                best2 = pointsByX[i+1];
                return ;
            }
        }

        // 用于按照y排序(这里还没排序)
        Point2D[] pointsByY = Arrays.copyOf(pointsByX, N);

        // 辅助数组
        Point2D[] aux = new Point2D[N];

        closest(pointsByX, pointsByY, aux, 0, N-1);
    }

    /**
     * 找pointsByX[lo..hi]中的最近点对
     * @param pointsByX : 按照x坐标排序好的点
     * @param pointsByY
     * @param aux : 辅助数组
     * @param lo : 最小下标
     * @param hi : 最大下标
     * @return
     */
    private double closest(Point2D[] pointsByX, Point2D[] pointsByY, Point2D[] aux, int lo, int hi) {
        if (hi <= lo) return Double.POSITIVE_INFINITY;

        // 中间点
        int mid = (lo + hi) >> 1;
        Point2D median = pointsByX[mid];

        // 递归求解左右子数组的最近点对
        double d = Math.min(closest(pointsByX, pointsByY, aux, lo, mid),
                closest(pointsByX, pointsByY, aux, mid+1, hi));

        // merge pointsByY[lo,mid]和pointsByY[mid+1, hi], 实现按照y坐标排序
        merge(pointsByY, aux, lo, mid, hi);

        // 将按照y排序好的点, 和中间点距离小于d的存在辅助数组中,即为宽度2d的中间条带
        int M = 0;
        for (int i = lo; i <= hi; i++) {
            if (Math.abs(pointsByY[i].getX() - median.getX()) < d)
                aux[M++] = pointsByY[i];
        }

        // 比较中间条带内的点
        for (int i = 0; i < M; i++) {
            // a geometric packing argument shows that this loop iterates at most 7 times
            for (int j = i+1; (j < M) && (aux[j].getY() - aux[i].getY() < d); j++) {
                double distance = aux[i].distance(aux[j]);
                if (distance < d) {
                    d = distance;
                    if (distance < bestDistance) {
                        bestDistance = d;
                        best1 = aux[i];
                        best2 = aux[j];
                    }
                }
            }
        }
        return d;
    }

    /**
     * 利用辅助数组aux[lo .. hi]将a[lo .. mid] 和 a[mid+1 ..hi]合并,
     * 保证字数组a[lo .. mid]和a[mid+1 ..hi]都是有序
     * 排序准则为y坐标,稳定
     * @param a : 待合并数组
     * @param aux : 辅助数组
     * @param lo
     * @param mid
     * @param hi
     */
    private static void merge(Point2D[] a, Point2D[] aux, int lo, int mid, int hi) {
        // 复制到辅助数组
        for (int k = lo; k <= hi; k++) {
            aux[k] = a[k];
        }

        // merge 回 a[]
        int i = lo, j = mid+1;
        for (int k = lo; k <= hi; k++) {
            if      (i > mid)              a[k] = aux[j++];
            else if (j > hi)               a[k] = aux[i++];
            else if (aux[j].getY() < aux[i].getY()) a[k] = aux[j++];
            else                           a[k] = aux[i++];
        }
    }

    public Point2D either() { return best1; }
    public Point2D other()  { return best2; }
    public double distance() { return bestDistance; }

    public static void main(String[] args) {
        int N = 5;
        Point2D[] points = new Point2D[]{
                new Point(0, 2),
                new Point(6, 67),
                new Point(43, 71),
                new Point(39, 107),
                new Point(189, 140)
        };
        ClosestPair closest = new ClosestPair(points);
        System.out.println(closest.distance() + " from " + closest.either() + " to " + closest.other());
    }
}