HBase put过程客户端+服务端初步解析

本文将对HBase采用客户端put的方式,结合源码对整个过程进行解析。

对于服务端的解析,本文并没有说的很详细,只是阐述了整个流程,在后面的一片文章将会详细说明服务端的具体过程

PUT 客户端

  1. HBase的client写入过程都是先创建put对象,然后将调用HTable的put方法,如下:
Put put = new Put("rowkey".getBytes();
put.addColumn("family".getBytes(),"qualifier".getBytes(),"value".getBytes());
htable.put(put);

调用HTable的put方法后,数据看起来就会被写入到HBase中。当然HTable的put还可以放一个put的list。

  1. HTable.put

HTable会维护一个buffer,put都会往这个buffer里放,一旦这个buffer中的数量达到一定的值,就会把这个buffer发给服务器,当然,客户端也可以显示调用,强制每一次的刷新都发送出去。HBase的表操作,默认情况下客户端写缓冲区是关闭的,即table.isAutoFlush() = true,
这种情况下,对表的单行操作会实时发送到服务端完成。因此,对于海量数据插入,修改,RPC通信频繁,效率比较低。这种场景下,可以通过激活客户端缓冲区,批量提交操作请求,提高操作效率。

其次客户端在提交请求的时候,会将所有的请求进行一个分组,按照regionServer进行分组。至于如何得到每一个请求所在的regionserver就应该是借助了Zookeeper了。

下面看源码:

// HBable.put
public void put(final Put put) throws IOException {
   getBufferedMutator().mutate(put);  //调用这个buffer的mutate方法
   if (autoFlush) {
     flushCommits();
   }
 }

// BufferedMutatorImpl.mutate方法
// APPEND,DELETE,PUT,INCREMENT都是Mutation的子类
public synchronized void mutate(Mutation m) throws InterruptedIOException,
     RetriesExhaustedWithDetailsException {
   doMutate(m);  //调用doMutate
 }

// BufferedMutatorImpl.doMutate  省略部分代码
private void doMutate(Mutation m) {
    // BufferedMutatorImpl里面维护了一个List<Row> writeAsyncBuffer = new LinkedList<>()去存放
   currentWriteBufferSize += m.heapSize();
   writeAsyncBuffer.add(m);
   while (currentWriteBufferSize > writeBufferSize) {
     backgroundFlushCommits(false); //这false表示异步提交 不会等待  
   }
 }

//backgroundFlushCommits方法  省略部分代码
 private void backgroundFlushCommits(boolean synchronous) {
   try {
     if (!synchronous) {
       ap.submit(tableName, writeAsyncBuffer, true, null, false);
     }
 }

 //submit
 //从那个buffer中进行取数,对于不能提交的数,会保留在这个buffer中
 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
     List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
     boolean needResults) {
   do {
     //进行遍历
     Iterator<? extends Row> it = rows.iterator();
     while (it.hasNext()) {
       Row r = it.next();
       HRegionLocation loc;
       try {
         // Make sure we get 0-s replica.
         RegionLocations locs = connection.locateRegion(
             tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);

         loc = locs.getDefaultRegionLocation();
       }

       if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
         byte[] regionName = loc.getRegionInfo().getRegionName();

         // 这个addAction方法
         // 传入的 actionsByServer是一个Map<ServerName, MultiAction<Row>> ,一个MultiAction里面包含多个操作
         // 这个方法会把ServerName相同的action放到一个MultiAction里面
         // 而MultiAction里面又维护了一个map Map<byte[], List<Action<R>>> action
         // 这个map是按照每一个region再次进行划分。
         // 这个addAction方法就是把每一个操作放到对应的组里
         addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);

         it.remove();
       }
     }
   } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));

   return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
     locationErrors, locationErrorRows, actionsByServer, pool);
 }
 // 以上方法 将每一个操作进行了分组,得到一个map,之后这个map交给AsyncRequestFutureImpl.sendMultiAction进行处理

     //sendMultiAction
  private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
       int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
         // 遍历传入的map
     for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
       ServerName server = e.getKey();
       MultiAction<Row> multiAction = e.getValue();

       // 这个getNewMultiActionRunnable方法返回的是一堆的Runnable,
       // 传入的是一个regionServer 和一个MultiAction
       // 在这个方法里面 
       // 返回的Runnable是SingleServerRequestRunnable,一个runable里面包含了多个action
       Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,numAttempt);
       // run all the runnables 运行这些runable
       for (Runnable runnable : runnables) {
         if ((--actionsRemaining == 0) && reuseThread) {
           runnable.run();
         } else {
           try {
             pool.submit(runnable);
   }

   //上面这个方法返回的Runable里面创建了MultiServerCallable,线程调用的应该是这个方法
   // 在它的call方法里面,最后应该是调用了
    responseProto = getStub().multi(controller, requestProto);
   // 这就应该是利用ClientProtos进行远程调用了

   //到此 再回到最开始HTable的put方法。 
   public void put(final Put put) throws IOException {
   getBufferedMutator().mutate(put);
   if (autoFlush) {
     flushCommits();
   }
  }

  //以上所有的分析都是针对getBufferedMutator().mutate(put)而言的,
  // mutate方法后有一个if判断是否自动刷新,如果这个参数为true,则每put一条数据,
  // 都会调用 flushCommits方法,该方法也调用了backgroundFlushCommits方法,回到上面的代码中。   

Server端

当请求发送到server端之后,涉及到的操作就比较复杂了,会涉及到写入WAL,MemStore,以及MemStore刷新等过程。我们就源码对这些过程进行描述。

  1. RSRpcServices.java multi方法:
    客户端的调用应该都会到这个方法进行处理,这个方法的参数是一个request,在客户端也提到了,一个请求应该是包含多个action的。

    遍历这个request的action,按region进行遍历,每一个region的action得到一个RegionAction。 我理解的这里 一个request里面只会有一个regionserver里面的数据。

    根据RegionAction得到HRegion,并且遍历这个region的所有action。这里他把一个RegionAction的所有action都放到了一个RowMutations。按道理来说这个RowMutations就应该指的是一行的数据,如果是这样的话 这个Regionaction就不应该是一个region的数据了,而是一行的数据,只有这种逻辑,才符合后面的操作???

    以下是源码部分:

//multi方法
public MultiResponse multi(final RpcController rpcc, final MultiRequest request){
    for (RegionAction regionAction : request.getRegionActionList()) {
      HRegion region;

    region = getRegion(regionAction.getRegion());
    if (regionAction.hasAtomic() && regionAction.getAtomic()) {// 是否进行原子操作
    ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
                cellScanner)}
}

// mutateRows方法
private ClientProtos.RegionLoadStats mutateRows(final HRegion region,
      final List<ClientProtos.Action> actions,
      final CellScanner cellScanner) {
   // RowMutations这个类的解释是 Performs multiple mutations atomically on a single row.
   // 这个类里面也是只有一个rowkey
   // 确实他表示一行的操作的集合
   // 下面的操作 使用第一个action的来构建了这个RowMutations
   // 并且所有的action都放到了这个RowMutations里面
    RowMutations rm = null;
    for (ClientProtos.Action action: actions) {// 遍历所有的action
      MutationType type = action.getMutation().getMutateType();
      if (rm == null) {
        rm = new RowMutations(action.getMutation().getRow().toByteArray());
      }
      switch (type) {
      case PUT:
        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
        break;
      case DELETE:
        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
        break;
      default:
          throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
      }
    }
    region.mutateRow(rm);
    return region.getRegionStats();
  }
  1. HRegion.mutateRow(RowMutations rm)方法:
  • 上诉得到的一个RowMutations将交到HReigion的这个方法来进行处理。会构建一个MultiRowMutationProcessor,这里面将包含所有的action。然后这个MultiRowMutationProcessor将交给processRowsWithLocks处理。在构建这个processor的时候同时还需要传入一个rowkey的集合,这个集合代码所有要锁的行。

  • 但是我们这里虽然说是一个rowkey的集合,但是按照前面的说法,一个RowMutations只包含一行的数据,那么这个集合里面也就只有一个元素。

  • 会遍历所有要上锁的行,获取全部的锁。(感觉这里也可以理解哈 集合里面只有一行)

  • 这里还会获取这个region的读锁,是updateLoag的readloag。 意味着这个时候不可读?

  • 获取一个mvccNum。这个mvccNum是根据这个时候,这个region的sequenceID获取的。这个值的获取感觉很奇特,是在sequenceid的基础上加了10亿。按照这个方法的说法是10亿是一个足够大的数,可以保证在这个mvcc写完成之前没有scanner可以达到这个数。

  • 现在,我们前面提到过,这个时候所有的action都是在MultiRowMutationProcessor里面的,把actiob拿出来放到两个地方。一个是List mutations ,一个是WALEdit walEdit = new WALEdit()。前者代表了要写入到Memstore中的,后者代表要写入到WAL中的。在放入到WALEdit中的时候,需要进行判断,并不是所有的action都要写入到WAL中。

  • 写入到memstore

  • 写入到wal

    这里插入一个问题,关于是先写入mem还是先写入wal。按照wal的定义,write ahead logging,预写日志,是应该先写入wal的。
    在0.94版本之前,Region中的写入顺序是先写WAL再写MemStore,这与WAL的定义也相符。
    但在0.94版本中,将这两者的顺序颠倒了,当时颠倒的初衷,是为了使得行锁能够在WAL sync之前先释放,从而可以提升针对单行数据的更新性能。详细问题单,请参考HBASE-4528。
    在2.0版本中,这一行为又被改回去了,原因在于修改了行锁机制以后(下面章节将讲到),发现了一些性能下降,而HBASE-4528中的优化却无法再发挥作用,详情请参考HBASE-15158。改动之后的逻辑也更简洁了。
    选自https://mp.weixin.qq.com/s?__biz=MzI4Njk3NjU1OQ==&mid=2247483748&idx=1&sn=37b1ce75ef45f7fbb76a7092ad22ce5f&chksm=ebd5fe24dca27732e7deae4abf1409599d6d01d76df4b479ae0b1a05259461d30d04c11402db&scene=21#wechat_redirect

下面我们看源码

// HRegion.mutateRow(RowMutations rm)方法,
public void mutateRow(RowMutations rm) throws IOException {
mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
//rm.getRow()返回的是这个RowMutations的rowkey,也就是虚需要上锁的行
}
public void mutateRowsWithLocks(Collection<Mutation> mutations,
    Collection<byte[]> rowsToLock, long nonceGroup, long nonce){
    MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);//构建一个MultiRowMutationProcessor,这里面包含了所有的action和这些action对应的需要上锁的行
    processRowsWithLocks(proc, -1, nonceGroup, nonce);//把这个processer交给后续继续处理
   }
// 进入关键的方法,这里面几乎包含了所有的步骤
public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
  long nonceGroup, long nonce){

WALEdit walEdit = new WALEdit();
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
boolean locked;
boolean walSyncSuccessful = false;
List<RowLock> acquiredRowLocks;
List<Mutation> mutations = new ArrayList<Mutation>();
List<Cell> memstoreCells = new ArrayList<Cell>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();//从processer中取出需要上锁的行
long mvccNum = 0;
WALKey walKey = null;
try {
  // 2. Acquire the row lock(s)
  acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
  for (byte[] row : rowsToLock) {
    // Attempt to lock all involved rows, throw if any lock times out
    acquiredRowLocks.add(getRowLock(row));//获取行锁
  }
  // 3. Region lock
  lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
  // Get a mvcc write number
  mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);

  try {
    // doProcessRowWithTimeout方法这里面会把processor里面的action都取出来,分别放到mutations和walEdit中,代表了要写到memstore和wal中的数据
    //取得时候,如果action是put或事delete的话,就放到mutations,但是对于要写到wal的action
    // 则要多一步and
    // 判断如下:
    // for (Mutation m : mutations) {
   //    for (List<Cell> cells : m.getFamilyCellMap().values()) {
   //    boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
   //    for (Cell cell : cells) {
   //      if (writeToWAL) walEdit.add(cell);
   //    }
  //   }
  // }
  // 可以看到是判断每一个列族Durability,如果列族是SKIP_WAL,
  // 那么该列族下的所有cell都不会放放入
    doProcessRowWithTimeout(
        processor, now, this, mutations, walEdit, timeout);
    if (!mutations.isEmpty()) {
      // 5. Start mvcc transaction
      // 根据mvccnum创建writeEntry
      // 传进去的mvccNum应该是一个事物id
      writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
      // 6. Call the preBatchMutate hook
      processor.preBatchMutate(this, walEdit);
      // 7. Apply to memstore
      //之前已经把需要写入mem的放入了mutations

      for (Mutation m : mutations) {
        // Handle any tag based cell features
        rewriteCellTags(m.getFamilyCellMap(), m);

        for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
          Cell cell = cellScanner.current();
          // 这里为每一个cell设置的seqid是mvccnum
          CellUtil.setSequenceId(cell, mvccNum);
          // 得到对应HStore
          // 每一个Hregion维护一个 protected final Map<byte[], Store> stores
          // HStore是store的一个实现,他是一个memstore和一系列的storefile
          Store store = getStore(cell);

          if (store == null) {
            checkFamily(CellUtil.cloneFamily(cell));
            // unreachable
          }
          // 调用store的add方法,这个add方法调用了this.memstore.add(cell)
          Pair<Long, Cell> ret = store.add(cell);
          addedSize += ret.getFirst();
          memstoreCells.add(ret.getSecond());
        }
      }

      long txid = 0;
      // 8. Append no sync
      // 如果wal不为空,则写入wal
      if (!walEdit.isEmpty()) {
        // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
        // 构建walkey,这个时候walkey里面传入的logSeqNum是WALKey.NO_SEQUENCE_ID,值为-1
        // WALKey.NO_SEQUENCE_ID表示这个时候我们不关心这个号
        // 在真正写入wal的时候,这个值会被重新赋值
        walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
          processor.getClusterIds(), nonceGroup, nonce);
          // 调用wal的append
          // 这个append方法会去构建FSWALEntry
          // wal的append方法并不是直接就放到我们的log文件里了
          // 他有一个缓冲区,append的应该是先放到这个缓冲区里面
          // 在写到文件里面
          // 返回事物id
        txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
          walKey, walEdit, getSequenceId(), true, memstoreCells);
      }
      // 9. Release region lock
      if (locked) {
        this.updatesLock.readLock().unlock();
        locked = false;
      }

      // 10. Release row lock(s)
      // 释放行锁
      releaseRowLocks(acquiredRowLocks);

      // 11. Sync edit log
      // 这里就是同步我们的log文件
      // 前面说了 wal会先写到一个缓冲区里面
      // 采用的应该是批量刷新
      // 因为这个过程还是应该挺费时间的
      // 这一步具体的逻辑将在wal预写进行说明
      if (txid != 0) {
        syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
      }
      walSyncSuccessful = true;
      // 12. call postBatchMutate hook
      processor.postBatchMutate(this);
    }
  } finally {
    // ......

} finally {
  closeRegionOperation();
  // 这里需要对memstore进行刷新进行判断,具体的刷新逻辑,在memstore flush说明
  if (!mutations.isEmpty() &&
      isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
    requestFlush();
  }
}
10-05 22:02