learning, progress, future.

skydh


  • 首页

  • 归档

redis分布式锁

发表于 2018-09-03

什么是分布式锁

  一个重要资源被多个jvm进程竞争,会发生数据安全问题,对于分布式系统来说,多个微服务同时竞争一个资源时,就会产生如上问题。对于该问题,我们的方案是用分布式锁来锁住该数据。

使用redis来分布式锁。

  分布式锁的本质是在redis里面占一个坑,当别的进程也要占时,却占不了了,只能等待,或者放弃。
  命令如下setnx(set if not exists)只允许一个客户端占坑,然后del 则是删除这个锁。

问题

  1.由于种种原因,比如程序执行到中间出了bug,导致这个del指令没有被调用,这样就会陷入死锁。

  2.于是我们队这个setnx加了过期时间限制,比如setnx lock true; expire lock 5,使得过5秒自动过期。然后再删除。但是存在问题,当setnx和expire之间服务器突然挂掉了,会导致expire得不到执行,从而继续死锁。如果用redis事务来处理也不行,因为当setnx没有抢到锁时,expire是不该被执行的。而这个redis事务里面没有ifelse判断语句。后续redis2.8版本该作者加入了set指令的扩展参数,是的setnx和expire可以一起执行形成一个原子操作。彻底解决了这个问题。

  3.存在超时问题:当一个进程获取到锁后,由于逻辑执行部分太长,以至于超出了锁的超时限制。
  
  那就出现了问题,因为第二个进程获取到了这个锁,接着第一个线程执行了业务逻辑,于是就释放了这个锁,那么第三个进程就会获取到这个锁。
   
  为了避免这个问题,redis分布式锁,一般不用于较长时间的任务,如果真的出现了,那会很麻烦。为了避免第一个进程删除第二个进程锁的问题。我们可以在加锁前,设置一个随机数,释放锁的时候就进行判断是否需要删除这个锁。这样就保证了自己删自己的锁。但是匹配value和删除可以不是一个原子操作,这就需要lua脚本处理了,因为lua脚本可以保证多个命令是原子操作的。可以将匹配和删除放在一起。

  4.主从集群问题,当主节点挂掉了,从节点变成了主节点,但是从节点没有锁,其他进程就会请求加锁成功。为了解决这个问题,有些开源的library对其做了良好的封装。用户可以拿来就用。比如redlock-py,加锁时,它会向过半节点发送加锁命令,释放锁的时候,则是删除所有节点信息

代码如下

  获取锁。要让其加锁和释放锁的是同一个线程,因此给这个key加了valu.第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作.

  /**
 * 尝试获取分布式锁
 * @param jedis Redis客户端
 * @param lockKey 锁
 * @param requestId 请求标识
 * @param expireTime 超期时间
 * @return 是否获取成功
 */
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

    if (LOCK_SUCCESS.equals(result)) {
        return true;
    }
    return false;

}

  释放锁。先判断value是否一致,看看自己是不是自己加的锁,让自己释放自己的锁。判断和删除要保证原子性,因此要使用lua脚本保证其原子性。

 /**
 * 释放分布式锁
 * @param jedis Redis客户端
 * @param lockKey 锁
 * @param requestId 请求标识
 * @return 是否释放成功
 */
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

    if (RELEASE_SUCCESS.equals(result)) {
        return true;
    }
    return false;

}

栈和队列

发表于 2018-07-11

  手动实现基于java栈和队列。
  栈:

package base_struct.hashmap;
public class DhStack <K>{
private class Data<K>{
    public K k;
    public Data(K k){
        this.k=k;
    }
}
private int size;
private int offet;
private Data<K>[] data;

public DhStack(int a)
{
    data=new Data[a];
    size=a;
    offet=0;    
}
public synchronized void put(K k)
{
    Data<K> temp=new Data<K>(k);
    data[offet]=temp;
    offet++;    
}
public synchronized K get()
{
    offet=offet-1;
    if(offet<0)
    {
        return null;
    }
    K a=data[offet].k;

    return  data[offet].k;
}
}

  对列:

public class DhQueue<K> {
private class Data<K>{
    public K k;
    public Data<K> next;
    public Data(K k){
        this.k=k;
    }
}    
/*
 * 限制队列大小的参数
 */
private int size;
private int offet;
private Data<K> dataLinkedList;
public DhQueue(int a)
{

    size=a;


}
public synchronized void put(K k)
{
    if(offet>=size-1)
    {
        throw new IllegalArgumentException();
    }
    Data<K> data=new Data<K>(k);


    Data<K> temp=dataLinkedList;
    if(dataLinkedList==null)
    {
        dataLinkedList=data;
    }else{
        while (temp.next != null) {

            temp = temp.next;
        }
        temp.next=data;
    }
    offet++;


}
public synchronized K get()
{
    if(offet<=0)
    {
        return null;
    }
    K a=dataLinkedList.k;
    dataLinkedList=dataLinkedList.next;
    offet=offet-1;
    return a;    
}
}

concurrenthashmap

发表于 2018-07-10

简介

  1.7的ConcurrentHashMap最重要的一点就是Segment这个概念,每一个Segment就是一个小的HashMap,可以将这个理解为2级哈希表,一个总的Segment数组,每个数组里面存放一个类似Hashmap的结构,而加锁则是对Segment进行加锁。
Segment继承自ReentrantLock,所以我们可以很方便的对每一个Segment上锁
  1.8的则抛弃其冗余的设计,采用Node + CAS + Synchronized来保证并发安全进行实现,分析下其如何实现的吧。
  点到ConcurrentHashMap的put方法里面,分析下源码:

if (tab == null || (n = tab.length) == 0)
            tab = initTable();

  当数组为空时,初始化数组。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

  初始化部分全部使用了CAS来完成线程安全。

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
           if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
               break;                   // no lock when adding to empty bin
       }

  如果相应位置的Node还未初始化,则通过CAS插入相应的数据
  然后判断其数组不为空,那么就对这个节点加锁,然后插入数据

synchronized (f) {
               if (tabAt(tab, i) == f) {
                   if (fh >= 0) {
                       binCount = 1;
                       for (Node<K,V> e = f;; ++binCount) {
                           K ek;
                           if (e.hash == hash &&
                               ((ek = e.key) == key ||
                                (ek != null && key.equals(ek)))) {
                               oldVal = e.val;
                               if (!onlyIfAbsent)
                                   e.val = value;
                               break;
                           }
                           Node<K,V> pred = e;
                           if ((e = e.next) == null) {
                               pred.next = new Node<K,V>(hash, key,
                                                         value, null);
                               break;
                           }
                       }
                   }
                   else if (f instanceof TreeBin) {
                       Node<K,V> p;
                       binCount = 2;
                       if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                      value)) != null) {
                           oldVal = p.val;
                           if (!onlyIfAbsent)
                               p.val = value;
                       }
                   }
               }
           }

  判断,链表就用链表的方式添加,红黑树就用红黑树的方式添加。

if (binCount != 0) {
               if (binCount >= TREEIFY_THRESHOLD)
                   treeifyBin(tab, i);
               if (oldVal != null)
                   return oldVal;
               break;
           }

如果大于了一个阈值,就将链表转换为数组。
嗯就这样,也就是说,1.8的HashMap和ConcurrentHashMap都是大致一样的数据结构,只是其中是否加锁。

hashmap

发表于 2018-07-10

基本信息

  HashMap可以接受null键值和值,而Hashtable则不能;HashMap是非synchronized;HashMap很快;以及HashMap储存的是键值对。

注意点

  1.当两个对象的hashcode相同会发生什么?
  因为hashcode相同,所以它们的bucket位置相同,‘碰撞’会发生。因为HashMap使用链表存储对象,这个Entry(包含有键值对的Map.Entry对象)会存储在链表中。
  2.如果两个键的hashcode相同,你如何获取值对象?
  找到bucket位置之后,会调用keys.equals()方法去找到链表中正确的节点,最终找到要找的值对象。
  3.如果HashMap的大小超过了负载因子(load factor)定义的容量,怎么办?
  默认的负载因子大小为0.75,也就是说,当一个map填满了75%的bucket时候,和其它集合类(如ArrayList等)一样,将会创建原来HashMap大小的两倍的bucket数组,来重新调整map的大小,并将原来的对象放入新的bucket数组中。这个过程叫作rehashing,因为它调用hash方法找到新的bucket位置。
  4.为什么String, Interger这样的wrapper类适合作为键?
  String, Interger这样的wrapper类作为HashMap的键是再适合不过了,而且String最为常用。因为String是不可变的,也是final的,而且已经重写了equals()和hashCode()方法了。其他的wrapper类也有这个特点。不可变性是必要的,因为为了要计算hashCode(),就要防止键值改变,如果键值在放入时和获取时返回不同的hashcode的话,那么就不能从HashMap中找到你想要的对象。不可变性还有其他的优点如线程安全。如果你可以仅仅通过将某个field声明成final就能保证hashCode是不变的,那么请这么做吧。因为获取对象的时候要用到equals()和hashCode()方法,那么键对象正确的重写这两个方法是非常重要的。如果两个不相等的对象返回不同的hashcode的话,那么碰撞的几率就会小些,这样就能提高HashMap的性能。

简单实现一个hashmap

package base_struct.hashmap;

/**
     * 基于链表数组的实现
     * @author dh
     * @param <V>
     * @param <K>
     *
*/
public class DhHashMap<K, V> {
private class Entry<K,V>{
    int hash;
    K key;
    V value;
    Entry<K,V> next;
    Entry(int hash, K key, V value, Entry<K, V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
}
private static final int DEFAULT_CAPACITY = 1 << 4;

private Entry<K, V>[] table;

private int capacity;

private int size;
public DhHashMap(int capacity) {
    if (capacity < 0) {
        throw new IllegalArgumentException();
    } else {
        table = new Entry[capacity];
        size = 0;
        this.capacity = capacity;
    }
}
public int size() {
    return size;
}
public boolean isEmpty() {
    return size == 0 ? true : false;}

/**
 * 这个hash算法采用位运算提高了效率,由于采用位运算,因此默认长度最好是2的幂最好
 * @param key
 * @return
 */
private int hash(Object key) {
    return (key == null) ? 0 : key.hashCode()&(capacity-1);
}

/**
 * 按照以前邏輯,默认key一样则只修改value,
 * @param key
 * @param value
 */
public void put(K key, V value) {
    if (key == null) {
        throw new IllegalArgumentException();
    }
    int hash = hash(key);
    Entry<K, V> nEntry = new Entry<K, V>(hash, key, value, null);
    Entry<K, V> entry = table[hash];
    while (entry != null) {
        if (entry.key.equals(key)) {
            entry.value = value;
            return;
        }
        entry = entry.next;
    }
    nEntry.next = table[hash];
    table[hash] = nEntry;
    size++;
}
/**
 * h很简单的逻辑,找到确认的hash值后,一个个遍历
 * @param key
 * @return
 */
public V get(K key) {
    if (key == null) {
        throw new IllegalArgumentException();
    }
    int hash = hash(key);
    Entry<K, V> entry = table[hash];
    while (entry != null) {
        if (entry.key.equals(key)) {
            return entry.value;
        }
        entry = entry.next;
    }
    return null;
}
}

>>>操作符

 这个操作符的作用是将当前整数,转换为2进制后,右移。比如
2>>>1,就是10右移1位剩下了一个1,就是1。
  案例如下:
public static void main(String [] args)
{
System.out.println(4>>> 1);
}
  结果是2.
public static void main(String [] args)
{
System.out.println(4>>> 2);
}
  结果是1

1.8增加了对其优化。

  其原因是hash的的平均分布可能有问题,有可能导致链表过长,从而使得效率变的很低。因此加了一个新的结构红黑树,如果说1.8之前的hashMap是数组+链表,那么现在版本的就是数组+链表+红黑树。当链表长度<8时,默认以前的方式加元素,也就是数组+链表,当链表>8时则将链表转换为红黑树。(红黑树是一个自平衡的二叉查找树,其插入删除时为了保持红黑树的平衡特征,回自旋来保证,可以将其理解为一个具有自平衡的查找2叉树)

hashmap线程不安全

  hashmap没有做加锁处理,举个简单例子,2个线程插入一个数据,其hashcode都是一样的,在极端情况下,可能导致插入少了一个数据。

java 1.8源码分析

  这里只分析下put方法,get方法很简单,自己去看看。突然觉得put方法也很简单。

  final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
               boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null);
    else {
        Node<K,V> e; K k;
        if (p.hash == hash &&
            ((k = p.key) == key || (key != null && key.equals(k))))
            e = p;
        else if (p instanceof TreeNode)
            e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        else {
            for (int binCount = 0; ; ++binCount) {
                if ((e = p.next) == null) {
                    p.next = newNode(hash, key, value, null);
                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                        treeifyBin(tab, hash);
                    break;
                }
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    break;
                p = e;
            }
        }
        if (e != null) { // existing mapping for key
            V oldValue = e.value;
            if (!onlyIfAbsent || oldValue == null)
                e.value = value;
            afterNodeAccess(e);
            return oldValue;
        }
    }
    ++modCount;
    if (++size > threshold)
        resize();
    afterNodeInsertion(evict);
    return null;
}

  这里我直接说下流程:

  • 先判断table是否为空,为空,则创建table。
  • 在判断key所在table的位置是否为null,空的话直接创建链表node。否则进入else。

  • else里面先判断这个链表node的key是否和要put的key一样,一样则覆盖

  • 在判断是否是树节点,是的话,插入树节点的方式插入数据。
  • 否则,遍历链表,要么找到一样的key覆盖,要么创建新的节点,如果是创建节点,那么再判断这个链表长度是否>=8,是的话则将链表转换为红黑树。
  • 再然后,覆盖的已经返回了,创建新节点的,走下一步,hashmap的size加一。在判断是否大于限定值,大于的话则是则扩张table。
  • 好了 over,,,
 /** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

  这个是ConcurrentHashMap的put源码。

  • 先根据key计算出位置,然后这个位置为null则cas看看可否插入值成功。
  • 判断是否需要扩容
  • 加锁写数据,和hashmap类似

CountDownLatch 用法

发表于 2018-07-09
package test;
import java.util.concurrent.CountDownLatch;
public class Test {
 public static void main(String[] args) {   
     final CountDownLatch latch = new CountDownLatch(2);
     new Thread(){
         public void run() {
             try {
                 System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                Thread.sleep(3000);
                System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
         };
     }.start();

     new Thread(){
         public void run() {
             try {
                 System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                 Thread.sleep(3000);
                 System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                 latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
         };
     }.start();

     try {
         System.out.println("等待2个子线程执行完毕...");
        latch.await();
        System.out.println("2个子线程已经执行完毕");
        System.out.println("继续执行主线程");
            } catch (InterruptedException e) {
        e.printStackTrace();
        }
     }
}

  对于这个类来说,这就是个计数器,设置了一个值,这个值不为0则调用其await()方法时,其线程将持续等待。这个功能的作用就是在一个方法里面可以异步的启动一个线程让异步从而加快运行。

spring mvc 流程

发表于 2018-07-02

什么是spring mvc

  Spring MVC本质上还是一个Servlet,他封装了一套通用的方案,使得我们开发时只需要写核心的业务逻辑,不需要写Servlet。而Servlet也是对底层Socket的封装,使得我们开发变得简单。

一个简单mvc 流程

  • 首先浏览器向服务器发送一个请求。
  • 服务器接受到连接创建一个socket线程和浏览器建立连接,紧接着创建request,reponse对象,然后交接给Servlet容器。
  • 打断下,预先知识,DispatcherServlet类继承实现 FrameworkServlet抽象类, FrameworkServlet抽象类继承HttpServlet抽象类。HttpServlet抽象类最终实现Servlet接口,因此最顶级的接口是Servlet接口。
  • 下面说的方法本质上都是DispatcherServlet类的,然后进入service方法( FrameworkServlet抽象类实现的),然后调用父类的service方法(HttpServlet),在这个方法里面从HttpServletRequest对象里面获取到http请求方式,比如get,post等,在这些方法里面。统一调用了processRequest方法(FrameworkServlet抽象类实现的)在这个方法里面,LocaleContext,ServletRequestAttributes这2个从HttpServletRequest对象取出来,放到其上下文里面去。
  • 然后进入doService方法,在这个方法里面设置了诸如上下文,等信息到request对象里面,然后请求传递走到doDispatch方法里面了。
  • 先调用checkMultipart方法判断是否上传请求。然后调用getHandler方法来获取handle,进入RequestMappingHandlerMapping.getHandler方法来获取handle.

Elasticsearch入门

发表于 2018-06-25

Elasticsearch 和 solr

  Elasticsearch更加精简,大多高级功能是依靠插件完成的。而solr很全面,包含了大部分的高级功能,可以从下载大小就可以看出来。
  2者都是基于Lucene这个基于java的搜索类库。
  根据文档(未测试)Elasticsearch的实时搜索效率明显高于solr。
  由于小且精简,因此入门简单。

Elasticsearch 安装

  本次安装基于Windows10。
  官网下载:https://www.elastic.co/downloads/elasticsearch。
  下载的zip包,下载解压,然后运行bin\elasticsearch.bat这个Windows批处理文件。
使用下面命令启动es,你可以运行多次这个命令,只要node.name不一样,cluster.name一样,那么就是一个集群。

elasticsearch.bat -E node.name=node2 -E cluster.name=dh -E path.data=node2  -d

  然后打开浏览器输入 http://localhost:9200/。看看是否返回了有效信息。
  安装node.js,grunt(网上搜索)。
  下载head插件。https://github.com/mobz/elasticsearch-head
  修改Elasticsearch的配置文件config/elasticsearch.yml
  修改一下ES的监听地址,这样别的机器也可以访问
  network.host: 0.0.0.0
  增加新的参数,这样head插件可以访问es
  http.cors.enabled: true
  http.cors.allow-origin: “*”
  然后在下载的head插件的解压文件夹的根路径下,运行,npm install 安装其所依赖的各种插件。
  最后启动 grunt server。访问http://localhost:9100/
  安装完毕。

ps:kabana,logstash配合使用时版本必须一致。

健康颜色

绿色就是在这个集群上,可以分配所有的主分片和副本分片
黄色,意味着只能分配主分片
红色,意味着主分片也无法分配

面向文档(Nosql)

  Elasticsearch 是 面向文档 的,意味着它是以文档来进行存储数据的。Elasticsearch 不仅存储文档,而且能索引每个文档的内容使之可以被检索。在 Elasticsearch 中,你是对文档进行索引、检索、排序和过滤–而不是对行列数据(关系数据库)。这是一种完全不同的思考数据的方式,也是 Elasticsearch 能支持复杂全文检索的原因。

索引

  在Elasticsearch里面,索引有很多个意思。
  1.名词:一个索引就是类似于关系数据库的一个数据库,是一个存储关系型文档的地方。
  2.动词:索引一个文档就是存储一个文档到索引(名词)中以便于它可以被检索和查询到,类似于关系数据库的insert关键字。
  3.倒排索引:如果说关系数据库的索引的数据结构是B+树,那么Elasticsearch采用了一个叫倒排索引的结构达到相同的目的。

创建和基本查询

  打开Elasticsearch的head插件,然后连接Elasticsearch。在复合查询中,输入url http://localhost:9200/megacorp/employee/3/,提交方式选择put,内容:{}(json)。即可创建一个索引文档。其中 http://localhost:9200 是Elasticsearch的地址。而megacorp上上面提到的名词索引。employee是类型。3为该数据的特定id。
  插入成功后,可以GET /megacorp/employee/3 查询出你插入的数据。
  往里面多插入几条数据,以便于学习测试。
   /megacorp/employee/_search 这个就是查出这个索引这个类型的所有数据。
  在head这个插件的复合查询里面,有2个行,第一行就是输入上面的url,下面的行,加入查询参数:?q=last_name:Smith
  后面有更加复杂的ELS查询:
  http://localhost:9200/
  megacorp/employee/_search/
  post
  {
“query” : {
“bool”: {
“must”: {
“match” : {
“last_name” : “smith”
}
},
“filter”: {
“range” : {
“age” : { “gt” : 30 }
}
}
}
}
}
  也可以用上面的方式(match)实现全文检索。结果会按照匹配度按顺序排下来。但是想要不全文检索,具体查询,那么就用这个(match_phrase)

文档的更新

在 Elasticsearch 中文档是 不可改变 的,不能修改它们。 相反,如果想要更新现有的文档,需要 重建索引 或者进行替换,在内部,Elasticsearch 已将旧文档标记为已删除,并增加一个全新的文档。 尽管你不能再对旧版本的文档进行访问,但它并不会立即消失。当继续索引更多的数据,Elasticsearch 会在后台清理这些已删除文档。删除逻辑也是。这点和mysql的删除很类似,都是逻辑删除!但是意义不一样~~~

冲突处理

  当多个端同时处理一个文档的值的时候,可能出现值丢失的情况。因此我们需要加锁,就像对mysql的处理一样。有2个方式,第一个就是悲观加锁,要获取这个数据,那么你要先获得锁,但是效率存在问题,第二个就是用乐观锁,类似java原子类的CAS的思想,我假设你没有变化。有变化,我就重新获取值进行处理,而我们的文档存在一个叫version的字段,可以用来判断。我们可以通过api来指定version来确认。

PUT /website/blog/1?version=1 
{
  "title": "My first blog entry",
  "text":  "Starting to get the hang of this..."
}

  当然大部分情况下,我们使用mysql作为主要的数据存储,而Elasticsearch主要作为检索用的,如果多个进程进行数据同步,就会出现那个问题。我们在数据库里面就有了版本号,那么我们也可以用这个版本号作为判断条件,是否更新。

请求合并

  es可以将多个请求合并成一个,避免单独处理每个请求花费的网络延时和开销。 如果你需要从 Elasticsearch 检索很多文档,那么使用 multi-get 或者 mget API 来将这些检索请求放在一个请求中,将比逐个文档请求更快地检索到全部文档。

_mget
{
       "docs" : [
  {
     "_index" : "website",
     "_type" :  "blog",
     "_id" :    2
  },
  {
     "_index" : "website",
     "_type" :  "pageviews",
     "_id" :    1,
     "_source": "views"
  }
       ]
}

分布式存储

  我们知道我们一个索引对应多个分片,每个分片又有多个副本,那么我们索引一个文档时,文档会被存储到主分片里面,但是应该存到具体哪个分片里面呢?是有个hash算法,根据id计算出一个值,然后对分片数取模,即可。

  以下是在主副分片和任何副本分片上面 成功新建,索引和删除文档所需要的步骤顺序:
  1.客户端向 Node 1 发送新建、索引或者删除请求。
  2.节点使用文档的 _id 确定文档属于分片 0 。请求会被转发到 Node 3,因为分片 0 的主分片目前被分配在Node 3 上。
  3.Node 3 在主分片上面执行请求。如果成功了,它将请求并行转发到 Node 1 和 Node 2 的副本分片上。一旦所有的副本分片都报告成功, Node 3 将向协调节点报告成功,协调节点向客户端报告成功。

  以下是从主分片或者副本分片检索文档的步骤顺序:

  1、客户端向 Node 1 发送获取请求。

  2、节点使用文档的 _id 来确定文档属于分片 0 。分片 0 的副本分片存在于所有的三个节点上。 在这种情况下,它将请求转发到 Node2 。

  3、Node 2 将文档返回给 Node 1 ,然后将文档返回给客户端。

  在处理读取请求时,协调结点在每次请求的时候都会通过轮询所有的副本分片来达到负载均衡。

  在文档被检索时,已经被索引的文档可能已经存在于主分片上但是还没有复制到副本分片。 在这种情况下,副本分片可能会报告文档不存在,但是主分片可能成功返回文档。 一旦索引请求成功返回给用户,文档在主分片和副本分片都是可用的。

  以下是部分更新一个文档的步骤:

  1.客户端向 Node 1 发送更新请求。
  2.它将请求转发到主分片所在的 Node 3 。
  3.Node 3 从主分片检索文档,修改 _source 字段中的 JSON ,并且尝试重新索引主分片的文档。 如果文档已经被另一个进程修改,它会重试步骤 3 ,超过 retry_on_conflict 次后放弃。
  4.如果 Node 3 成功地更新文档,它将新版本的文档并行转发到 Node 1 和 Node 2 上的副本分片,重新建立索引。 一旦所有副本分片都返回成功, Node 3 向协调节点也返回成功,协调节点向客户端返回成功。

集群

  Elasticsearch可以垂直扩展(购买性能更好的机器),也可以水平扩展(扩展多个节点),一个运行中的Elasticsearch实例就是一个节点,具有多个相同cluster.name配置的节点组成。其共同承担数据和负载的压力。
  集群有个叫集群健康的值,3个状态。
  green:所有主分片和副分片都正常。
  yellow:所有主分片正常,但是副分片有的不正常。
  red:存在主分片不正常。
  我们上面提到的名词索引实际上指向一个或者多个分片。一个分片是底层的工作单元,保存了全部数据的一部分,Elasticsearch利用分片将数据分发到集群各处的,分片是数据的容器,文档保存在分片中。分片分配到集群的各个节点里面。
  一个分片可以是主分片和副本分片,索引内的任何一个文档都是属于主分片的,所以主分片决定了索引能保存的最大数据量。副本分片是主分片的copy,为了保障数据安全的。
  水平扩容后,分片后自动重新分配的。
  一个Elasticsearch实例就是一个节点。一个索引默认五个主分片,五个副分片。
  当你只有一个节点的时候,创建一个索引的集群健康值一定是黄色。因为副本分片都是 unassigned —— 它们都没有被分配到任何节点。 在同一个节点上既保存原始数据又保存副本是没有意义的,因为一旦失去了那个节点,我们也将丢失该节点上的所有副本数据。当你再启动一个名字一样的节点的时候,集群健康值就是绿色了。
  然后我们在启动第三个节点,发现一个索引的10个分片均匀分配到3个节点里面了。
因此我们发现Elasticsearch的动态扩展能力极强。
  写操作我们只能通过主分片进行,读操作我们可以从主副分片都可以。当我们创建的节点吵过10个后,我们可以将副本重新设置,来获取更大的并发量。

java 操作 Elasticsearch

  代码如下:
  maven依赖:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.3.0</version>
</dependency>

  代码:

import java.net.InetAddress;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
public class TestEsClient {
    public static void main(String[] args) {
        try {
            Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
            // 创建client
            TransportClient client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
            // 搜索数据
            GetResponse response = client.prepareGet("megacorp", "employee", "1").execute().actionGet();
            // 输出结果
            System.out.println(response.getSourceAsString());
            // 关闭client
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  结果输出:

{"first_name":"John","last_name":"Smith","age":25,"about":"I love to go rock climbing","interests":["sports","music"]}

小结

  大致这样,具体要用的时候细看吧。ps:关于java代码操作Elasticsearch,要切记,maven版本和Elasticsearch版本不能差距太大哦,不然是连不上的

BigDecimal精度问题

发表于 2018-06-19

介绍

  1.商业计算使用BigDecimal。
  2.使用参数为String的构造函数。
  3.BigDecimal都是不可变的,每一步的运算时,都会产生一个新的对象。所以在做加减乘除后千万要保存操作后的值。

案例

  代码1:

public class Test001 {
    public static void main(String args[]) {
        BigDecimal a = new BigDecimal(1.5);
        BigDecimal a1 = new BigDecimal(329.530);
        System.out.println(a.multiply(a1).setScale(2, BigDecimal.ROUND_HALF_UP));
    }
}

  输出:

494.29

  代码2:

public class Test001 {
    public static void main(String args[]) {
        BigDecimal a = new BigDecimal("1.5");
      BigDecimal a1 = new BigDecimal("329.530");
        System.out.println(a.multiply(a1).setScale(2, BigDecimal.ROUND_HALF_UP));
    }
}

  输出:

494.30

计算器输出结果:

489.30

  原因解析:
  JDK的描述:参数为dubbo的构造方法的结果具有一定的不可预知性,认为java在写入new BigDecimal(0.1)中这个0.1不是标准的0.1可能是一个无限趋近于0.1的一个小数,虽然表面上等于他。

源码

public BigDecimal multiply(BigDecimal multiplicand) {
    int productScale = checkScale((long) scale + multiplicand.scale);
    if (this.intCompact != INFLATED) {
        if ((multiplicand.intCompact != INFLATED)) {
            return multiply(this.intCompact, multiplicand.intCompact, productScale);
        } else {
            return multiply(this.intCompact, multiplicand.intVal, productScale);
        }
    } else {
        if ((multiplicand.intCompact != INFLATED)) {
            return multiply(multiplicand.intCompact, this.intVal, productScale);
        } else {
            return multiply(this.intVal, multiplicand.intVal, productScale);
        }
    }
}

  在这个地方就是判断是不是字符串的,这个this.intCompact 是获取到参数的整数值,如果获取到时一大串数字,那就是dubbo参数传进来的,这里进行判断,从而获取到不同的值。进入到不同的方法进行运算。其实运算原理大致说下,小数转换为整数计算,最后除以10的n次方即可。

ControllerAdvice用法解析

发表于 2018-06-19

简介

  通过@ControllerAdvice注解可以将对于控制器的全局配置放在同一个位置。
  注解了@Controller的类的方法可以使用@ExceptionHandler、@InitBinder、@ModelAttribute注解到方法上。
  @ControllerAdvice注解将作用在所有注解了@RequestMapping的控制器的方法上
  @ExceptionHandler:用于全局处理控制器里的异常。

用法

@ControllerAdvice
@ResponseBody
public class BusinessExceptionHandler {
    @ExceptionHandler(value = Exception.class)
    public JsonBackData exceptionHandler(HttpServletRequest request, Exception e) {
        e.printStackTrace();
        JsonBackData back = new JsonBackData();
        if (e instanceof BusinessException) {
            BusinessException ex = (BusinessException) e;
            back.setSuccess(false);
            back.setBackMsg(ex.getMessage());
        }
        return back;
    }

  加了这个全局配置的Bean后,以前我的代码是这样的:

@RequestMapping(value = "queryDetail")
@ResponseBody
public JsonBackData queryDetail(@RequestParam String id) {
    JsonBackData back = new JsonBackData();
    try {
        OpenSourceThrottleAdjustVO vo = openSourceThrottlePlanQueryService.findById(id);
        back.setBackData(vo);
        back.setSuccess(true);
        back.setBackMsg("查询详细信息成功");
    } catch (BusinessException e) {
        back.setSuccess(false);
        back.setBackMsg("查询详细信息失败:" + e.getMessage());
    }
    return back;
}

 多了很多无关信息。几乎每个类都要try catch一下,代码极度冗余。
但是加了上面的全局处理控制器的异常处理后。代码就变成了下面的了。

@RequestMapping(value = "queryDetail")
@ResponseBody
public JsonBackData queryDetail(@RequestParam String id) {
    JsonBackData back = new JsonBackData();
    OpenSourceThrottleAdjustVO vo = openSourceThrottlePlanQueryService.findById(id);
    back.setBackData(vo);
    back.setSuccess(true);
    back.setBackMsg("查询详细信息成功");
    return back;
}

dubbo+zookeeper

发表于 2018-05-21

案例

  先搭建一个dubbo+zookeeper的demo,然后根据demo来细说。
  第一步:从dubbo官网下载dubbo-master.zip文件,从zookeeper官网下载zookeeper.tar.gz文件。
  第二步:解压zookeeper.tar.gz文件,在解压\zookeeper-3.4.10\bin路径下,点击zkServer.cmd(本案例是直接在windows下运行),成功启动,发现其绑定了端口号2181。
  第三步:解压dubbo-master.zip文件,然后在用eclipse导入其项目空间。发现dubbo的注入地址默认配置的是zookeeper://127.0.0.1:2181这个地址,不用修改,然后将其放到tomcat下启动。(且记先启动zookeeper在启动dubbo,不然dubbo启动连接不到zookeeper,启动失败的。)
  第四步:打开http://localhost:8088/dubbo-admin-2.5.8/,然后输入账号密码,登录到dubbo管理界面。(确切地址是多少,看你的配置)。这样4步,dubbo管理平台就已经搭建好了。
  第五步:在写一个消费者和生产者来同步dubbo来进行通信。先创建一个生产者。这里不详细说了,大部分都是maven工程的创建修改,直接给上代码,供大家测试调试生产者。
  第六步:写一个消费者来消费生产者的服务。这里也是直接提供代码让大家来进行调试。消费者。
  第七步:输入http://localhost:8089/spring-mvc/hello/dh1,发现调用已经成功,同时发现dubbo管理界面出现了一个生成者,一个消费者。

什么是zookeeper  

  Zookeeper是Hadoop的一个子项目,它是分布式系统中的协调系统,可提供的服务有:配置服务,名字服务,分布式同步,组服务等。

zookeeper配置文件

  我们可以通过修改Zookeeper的配置文件来修改Zookeeper的启动信息。
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper-data/
clientPort=2181
  tickTime:指定了ZooKeeper的基本时间单位(以毫秒为单位);
  initLimit:指定了启动zookeeper时,zookeeper实例中的随从实例同步到领导实例的初始化连接时间限制,超出时间限制则连接失败(以tickTime为时间单位);
  syncLimit:指定了zookeeper正常运行时,主从节点之间同步数据的时间限制,若超过这个时间限制,那么随从实例将会被丢弃;
  dataDir:zookeeper存放数据的目录;
  clientPort:用于连接客户端的端口。

zookeeper存储模型

  Zookeeper的存储结构采用的是结构化存储,很像数据结构当中的树,也很像文件系统的目录。树是由节点所组成,Zookeeper的数据存储也同样是基于节点,这里称之为znode。但是,不同于树的节点,Znode的引用方式是路径引用,类似于文件路径:/a/b。
  Znode包含4个信息。
  data:Znode存储的数据信息。
  ACL:记录Znode的访问权限,哪些ip那些人可以访问本节点。
  stat:包含Znode的各种元数据,比如事务id,大小,时间戳等。
  child:当前节点的子节点引用。

zookeeper的基本操作

  create:创建节点。
  delete:删除节点。
  exists:判断是否存在节点。
  getData:获取一个节点的数据。
  setData:设置一个节点的数据。
  getChildren:获取节点下所有的子节点。
  其中exist,getData.getChildren 都是读操作。客户端在请求读操作时可以设置是否watch(该效果是当Znode发生改变时,如增删改时,请求watch的客户端将会接收到异步通知,也就是说当一个节点被watch后,就会在对应的哈希表里面插入被watch的Znode路径以及watch列表)。

zookeeper的一致性

  Zookeeper身为分布式系统的协调服务,如果自身挂了怎么办呢?为了防止单机挂掉,ZooKeeper维护了一个集群。这个集群是一主多从结构。更新数据时,首先更新到主节点(这里的节点指的是服务器节点,不是Znode),再同步到从节点。为了保证主从节点数据一致性,zookeeper采用了ZAB协议。
  ZAB协议定义了3种节点状态。
  Looking:选举状态。
  Following:从节点所处的状态。
  Leading:Leader节点(主节点)所处的状态。
  最大ZXID的概念:
  最大ZXID也就是节点本地的最新事务编号,包含epoch和计数两部分。epoch是纪元的意思,相当于Raft算法选主时候的term。
  从节点挂了没有任何影响。如果主节点挂了,那么集群就会进行崩溃恢复。分为3个阶段。
  第一阶段,所有节点变成Looking状态。这些节点会各自向其他节点发起投票,投票信息包含自身服务器ID和最新事务ID.(ZXID)。接下来,节点会用自身的ZXID和从其他节点接受到的ZXID比较,如果别的节点比自己大,那就重新投票,投给接受到节点最大的ZXID的节点的票。
  每次投完票后,服务器都会统计投票数量,判断是否存在半数以上的投票,如果存在这样的节点,那么这个节点将会成为准Leader节点,状态也会变成Leading,其他节点就会从Looking转换为Following。
  第二阶段:发现阶段,用于在从节点中发现最新的ZXID和事务日志,因为可能存在Leader的ZIXD不一定是最新的,有可能意外情况,导致最新的ZXID不是最新的。
  所以这个阶段,Leader集思广益,接收所有Follower发过来的epoch值,Leader从中选出最大的值,基于此值加一,生成最新的epoch分给各个Follower.各个Follower收到全新的epoch后,返回ACK给Leader,带上各自最大的ZXID和历史事务日志,Leader选出最大的ZXID,并更新自身历史日志。
  第三阶段:同步阶段,将Leader刚接收到的最新历史事务日志,同步给集群所有的Follower。只有半数Follower同步成功,这个准Leader才能正式成为leader.

zookeeper的数据写入

  
  1.客户端发出写出请求给任意的Follower。
  2.Follower把写入数据的请求转发给Leader。
  3.Leader采用二段提交的方式,先发送Propose广播给Follower。
  4.Follower接受到Propose消息,写入日志成功后,返回ACk消息给Leader。
  5.Leader接到半数以上ACK消息,返回成功给客户端,并且广播Commit请求给Follower。

1…789…13

skydh

skydh

126 日志
© 2020 skydh
本站访客数:
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.3