HBase put过程客户端+服务端初步解析
本文将对HBase采用客户端put的方式,结合源码对整个过程进行解析。
对于服务端的解析,本文并没有说的很详细,只是阐述了整个流程,在后面的一片文章将会详细说明服务端的具体过程
PUT 客户端
- HBase的client写入过程都是先创建put对象,然后将调用HTable的put方法,如下:
Put put = new Put("rowkey".getBytes();
put.addColumn("family".getBytes(),"qualifier".getBytes(),"value".getBytes());
htable.put(put);
调用HTable的put方法后,数据看起来就会被写入到HBase中。当然HTable的put还可以放一个put的list。
- HTable.put
HTable会维护一个buffer,put都会往这个buffer里放,一旦这个buffer中的数量达到一定的值,就会把这个buffer发给服务器,当然,客户端也可以显示调用,强制每一次的刷新都发送出去。HBase的表操作,默认情况下客户端写缓冲区是关闭的,即table.isAutoFlush() = true,
这种情况下,对表的单行操作会实时发送到服务端完成。因此,对于海量数据插入,修改,RPC通信频繁,效率比较低。这种场景下,可以通过激活客户端缓冲区,批量提交操作请求,提高操作效率。
其次客户端在提交请求的时候,会将所有的请求进行一个分组,按照regionServer进行分组。至于如何得到每一个请求所在的regionserver就应该是借助了Zookeeper了。
下面看源码:
// HBable.put
public void put(final Put put) throws IOException {
getBufferedMutator().mutate(put); //调用这个buffer的mutate方法
if (autoFlush) {
flushCommits();
}
}
// BufferedMutatorImpl.mutate方法
// APPEND,DELETE,PUT,INCREMENT都是Mutation的子类
public synchronized void mutate(Mutation m) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
doMutate(m); //调用doMutate
}
// BufferedMutatorImpl.doMutate 省略部分代码
private void doMutate(Mutation m) {
// BufferedMutatorImpl里面维护了一个List<Row> writeAsyncBuffer = new LinkedList<>()去存放
currentWriteBufferSize += m.heapSize();
writeAsyncBuffer.add(m);
while (currentWriteBufferSize > writeBufferSize) {
backgroundFlushCommits(false); //这false表示异步提交 不会等待
}
}
//backgroundFlushCommits方法 省略部分代码
private void backgroundFlushCommits(boolean synchronous) {
try {
if (!synchronous) {
ap.submit(tableName, writeAsyncBuffer, true, null, false);
}
}
//submit
//从那个buffer中进行取数,对于不能提交的数,会保留在这个buffer中
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
boolean needResults) {
do {
//进行遍历
Iterator<? extends Row> it = rows.iterator();
while (it.hasNext()) {
Row r = it.next();
HRegionLocation loc;
try {
// Make sure we get 0-s replica.
RegionLocations locs = connection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
loc = locs.getDefaultRegionLocation();
}
if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
byte[] regionName = loc.getRegionInfo().getRegionName();
// 这个addAction方法
// 传入的 actionsByServer是一个Map<ServerName, MultiAction<Row>> ,一个MultiAction里面包含多个操作
// 这个方法会把ServerName相同的action放到一个MultiAction里面
// 而MultiAction里面又维护了一个map Map<byte[], List<Action<R>>> action
// 这个map是按照每一个region再次进行划分。
// 这个addAction方法就是把每一个操作放到对应的组里
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
it.remove();
}
}
} while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
locationErrors, locationErrorRows, actionsByServer, pool);
}
// 以上方法 将每一个操作进行了分组,得到一个map,之后这个map交给AsyncRequestFutureImpl.sendMultiAction进行处理
//sendMultiAction
private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
// 遍历传入的map
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
ServerName server = e.getKey();
MultiAction<Row> multiAction = e.getValue();
// 这个getNewMultiActionRunnable方法返回的是一堆的Runnable,
// 传入的是一个regionServer 和一个MultiAction
// 在这个方法里面
// 返回的Runnable是SingleServerRequestRunnable,一个runable里面包含了多个action
Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,numAttempt);
// run all the runnables 运行这些runable
for (Runnable runnable : runnables) {
if ((--actionsRemaining == 0) && reuseThread) {
runnable.run();
} else {
try {
pool.submit(runnable);
}
//上面这个方法返回的Runable里面创建了MultiServerCallable,线程调用的应该是这个方法
// 在它的call方法里面,最后应该是调用了
responseProto = getStub().multi(controller, requestProto);
// 这就应该是利用ClientProtos进行远程调用了
//到此 再回到最开始HTable的put方法。
public void put(final Put put) throws IOException {
getBufferedMutator().mutate(put);
if (autoFlush) {
flushCommits();
}
}
//以上所有的分析都是针对getBufferedMutator().mutate(put)而言的,
// mutate方法后有一个if判断是否自动刷新,如果这个参数为true,则每put一条数据,
// 都会调用 flushCommits方法,该方法也调用了backgroundFlushCommits方法,回到上面的代码中。
Server端
当请求发送到server端之后,涉及到的操作就比较复杂了,会涉及到写入WAL,MemStore,以及MemStore刷新等过程。我们就源码对这些过程进行描述。
RSRpcServices.java multi方法:
客户端的调用应该都会到这个方法进行处理,这个方法的参数是一个request,在客户端也提到了,一个请求应该是包含多个action的。遍历这个request的action,按region进行遍历,每一个region的action得到一个RegionAction。 我理解的这里 一个request里面只会有一个regionserver里面的数据。
根据RegionAction得到HRegion,并且遍历这个region的所有action。这里他把一个RegionAction的所有action都放到了一个RowMutations。按道理来说这个RowMutations就应该指的是一行的数据,如果是这样的话 这个Regionaction就不应该是一个region的数据了,而是一行的数据,只有这种逻辑,才符合后面的操作???
以下是源码部分:
//multi方法
public MultiResponse multi(final RpcController rpcc, final MultiRequest request){
for (RegionAction regionAction : request.getRegionActionList()) {
HRegion region;
region = getRegion(regionAction.getRegion());
if (regionAction.hasAtomic() && regionAction.getAtomic()) {// 是否进行原子操作
ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
cellScanner);
}
}
// mutateRows方法
private ClientProtos.RegionLoadStats mutateRows(final HRegion region,
final List<ClientProtos.Action> actions,
final CellScanner cellScanner) {
// RowMutations这个类的解释是 Performs multiple mutations atomically on a single row.
// 这个类里面也是只有一个rowkey
// 确实他表示一行的操作的集合
// 下面的操作 使用第一个action的来构建了这个RowMutations
// 并且所有的action都放到了这个RowMutations里面
RowMutations rm = null;
for (ClientProtos.Action action: actions) {// 遍历所有的action
MutationType type = action.getMutation().getMutateType();
if (rm == null) {
rm = new RowMutations(action.getMutation().getRow().toByteArray());
}
switch (type) {
case PUT:
rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
break;
case DELETE:
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
}
}
region.mutateRow(rm);
return region.getRegionStats();
}
- HRegion.mutateRow(RowMutations rm)方法:
上诉得到的一个RowMutations将交到HReigion的这个方法来进行处理。会构建一个MultiRowMutationProcessor,这里面将包含所有的action。然后这个MultiRowMutationProcessor将交给processRowsWithLocks处理。在构建这个processor的时候同时还需要传入一个rowkey的集合,这个集合代码所有要锁的行。
但是我们这里虽然说是一个rowkey的集合,但是按照前面的说法,一个RowMutations只包含一行的数据,那么这个集合里面也就只有一个元素。
会遍历所有要上锁的行,获取全部的锁。(感觉这里也可以理解哈 集合里面只有一行)
这里还会获取这个region的读锁,是updateLoag的readloag。 意味着这个时候不可读?
获取一个mvccNum。这个mvccNum是根据这个时候,这个region的sequenceID获取的。这个值的获取感觉很奇特,是在sequenceid的基础上加了10亿。按照这个方法的说法是10亿是一个足够大的数,可以保证在这个mvcc写完成之前没有scanner可以达到这个数。
现在,我们前面提到过,这个时候所有的action都是在MultiRowMutationProcessor里面的,把actiob拿出来放到两个地方。一个是List mutations ,一个是WALEdit walEdit = new WALEdit()。前者代表了要写入到Memstore中的,后者代表要写入到WAL中的。在放入到WALEdit中的时候,需要进行判断,并不是所有的action都要写入到WAL中。
写入到memstore
写入到wal
这里插入一个问题,关于是先写入mem还是先写入wal。按照wal的定义,write ahead logging,预写日志,是应该先写入wal的。
在0.94版本之前,Region中的写入顺序是先写WAL再写MemStore,这与WAL的定义也相符。
但在0.94版本中,将这两者的顺序颠倒了,当时颠倒的初衷,是为了使得行锁能够在WAL sync之前先释放,从而可以提升针对单行数据的更新性能。详细问题单,请参考HBASE-4528。
在2.0版本中,这一行为又被改回去了,原因在于修改了行锁机制以后(下面章节将讲到),发现了一些性能下降,而HBASE-4528中的优化却无法再发挥作用,详情请参考HBASE-15158。改动之后的逻辑也更简洁了。
选自https://mp.weixin.qq.com/s?__biz=MzI4Njk3NjU1OQ==&mid=2247483748&idx=1&sn=37b1ce75ef45f7fbb76a7092ad22ce5f&chksm=ebd5fe24dca27732e7deae4abf1409599d6d01d76df4b479ae0b1a05259461d30d04c11402db&scene=21#wechat_redirect
下面我们看源码
// HRegion.mutateRow(RowMutations rm)方法,
public void mutateRow(RowMutations rm) throws IOException {
mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
//rm.getRow()返回的是这个RowMutations的rowkey,也就是虚需要上锁的行
}
public void mutateRowsWithLocks(Collection<Mutation> mutations,
Collection<byte[]> rowsToLock, long nonceGroup, long nonce){
MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);//构建一个MultiRowMutationProcessor,这里面包含了所有的action和这些action对应的需要上锁的行
processRowsWithLocks(proc, -1, nonceGroup, nonce);//把这个processer交给后续继续处理
}
// 进入关键的方法,这里面几乎包含了所有的步骤
public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
long nonceGroup, long nonce){
WALEdit walEdit = new WALEdit();
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
boolean locked;
boolean walSyncSuccessful = false;
List<RowLock> acquiredRowLocks;
List<Mutation> mutations = new ArrayList<Mutation>();
List<Cell> memstoreCells = new ArrayList<Cell>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();//从processer中取出需要上锁的行
long mvccNum = 0;
WALKey walKey = null;
try {
// 2. Acquire the row lock(s)
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
for (byte[] row : rowsToLock) {
// Attempt to lock all involved rows, throw if any lock times out
acquiredRowLocks.add(getRowLock(row));//获取行锁
}
// 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
// Get a mvcc write number
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
try {
// doProcessRowWithTimeout方法这里面会把processor里面的action都取出来,分别放到mutations和walEdit中,代表了要写到memstore和wal中的数据
//取得时候,如果action是put或事delete的话,就放到mutations,但是对于要写到wal的action
// 则要多一步and
// 判断如下:
// for (Mutation m : mutations) {
// for (List<Cell> cells : m.getFamilyCellMap().values()) {
// boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
// for (Cell cell : cells) {
// if (writeToWAL) walEdit.add(cell);
// }
// }
// }
// 可以看到是判断每一个列族Durability,如果列族是SKIP_WAL,
// 那么该列族下的所有cell都不会放放入
doProcessRowWithTimeout(
processor, now, this, mutations, walEdit, timeout);
if (!mutations.isEmpty()) {
// 5. Start mvcc transaction
// 根据mvccnum创建writeEntry
// 传进去的mvccNum应该是一个事物id
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// 6. Call the preBatchMutate hook
processor.preBatchMutate(this, walEdit);
// 7. Apply to memstore
//之前已经把需要写入mem的放入了mutations
for (Mutation m : mutations) {
// Handle any tag based cell features
rewriteCellTags(m.getFamilyCellMap(), m);
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
Cell cell = cellScanner.current();
// 这里为每一个cell设置的seqid是mvccnum
CellUtil.setSequenceId(cell, mvccNum);
// 得到对应HStore
// 每一个Hregion维护一个 protected final Map<byte[], Store> stores
// HStore是store的一个实现,他是一个memstore和一系列的storefile
Store store = getStore(cell);
if (store == null) {
checkFamily(CellUtil.cloneFamily(cell));
// unreachable
}
// 调用store的add方法,这个add方法调用了this.memstore.add(cell)
Pair<Long, Cell> ret = store.add(cell);
addedSize += ret.getFirst();
memstoreCells.add(ret.getSecond());
}
}
long txid = 0;
// 8. Append no sync
// 如果wal不为空,则写入wal
if (!walEdit.isEmpty()) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
// 构建walkey,这个时候walkey里面传入的logSeqNum是WALKey.NO_SEQUENCE_ID,值为-1
// WALKey.NO_SEQUENCE_ID表示这个时候我们不关心这个号
// 在真正写入wal的时候,这个值会被重新赋值
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
processor.getClusterIds(), nonceGroup, nonce);
// 调用wal的append
// 这个append方法会去构建FSWALEntry
// wal的append方法并不是直接就放到我们的log文件里了
// 他有一个缓冲区,append的应该是先放到这个缓冲区里面
// 在写到文件里面
// 返回事物id
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdit, getSequenceId(), true, memstoreCells);
}
// 9. Release region lock
if (locked) {
this.updatesLock.readLock().unlock();
locked = false;
}
// 10. Release row lock(s)
// 释放行锁
releaseRowLocks(acquiredRowLocks);
// 11. Sync edit log
// 这里就是同步我们的log文件
// 前面说了 wal会先写到一个缓冲区里面
// 采用的应该是批量刷新
// 因为这个过程还是应该挺费时间的
// 这一步具体的逻辑将在wal预写进行说明
if (txid != 0) {
syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
}
walSyncSuccessful = true;
// 12. call postBatchMutate hook
processor.postBatchMutate(this);
}
} finally {
// ......
} finally {
closeRegionOperation();
// 这里需要对memstore进行刷新进行判断,具体的刷新逻辑,在memstore flush说明
if (!mutations.isEmpty() &&
isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
requestFlush();
}
}