本文共 29444 字,大约阅读时间需要 98 分钟。
马上2017就要结束了,今年研究了不少solr相关的问题,年末的时候还是做一个总结吧。明年说不定就开始搞ES了。
@Override public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { SolrParams params = req.getParams(); UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessorChain(params); UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp); try { ContentStreamLoader documentLoader = newLoader(req, processor); // 此处的loader就是JavaBinLoader Iterablestreams = req.getContentStreams(); if (streams == null) { if (!RequestHandlerUtils.handleCommit(req, processor, params, false) && !RequestHandlerUtils.handleRollback(req, processor, params, false)) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream"); } } else { for (ContentStream stream : streams) { documentLoader.load(req, rsp, stream, processor); // 具体的数据流处理流程 } // Perhaps commit from the parameters RequestHandlerUtils.handleCommit(req, processor, params, false); RequestHandlerUtils.handleRollback(req, processor, params, false); } } finally { // finish the request processor.finish(); } }
这些操作只算是数据的预处理。
public class JavabinLoader extends ContentStreamLoader { @Override public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream, UpdateRequestProcessor processor) throws Exception { InputStream is = null; try { is = stream.getStream(); parseAndLoadDocs(req, rsp, is, processor); } finally { if(is != null) { is.close(); } } } private void parseAndLoadDocs(final SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream, final UpdateRequestProcessor processor) throws IOException { UpdateRequest update = null; JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() { // 匿名类,定义doc的处理逻辑,关注update方法 private AddUpdateCommand addCmd = null; @Override public void update(SolrInputDocument document, UpdateRequest updateRequest, Integer commitWithin, Boolean overwrite) { // 需要注意的是,solrJ可以提交多个doc,但在这里进行处理的时候,只会一各一个的串行处理。 if (document == null) { // Perhaps commit from the parameters try { RequestHandlerUtils.handleCommit(req, processor, updateRequest.getParams(), false); RequestHandlerUtils.handleRollback(req, processor, updateRequest.getParams(), false); } catch (IOException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR handling commit/rollback"); } return; } if (addCmd == null) { addCmd = getAddCommand(req, updateRequest.getParams()); } addCmd.solrDoc = document; if (commitWithin != null) { addCmd.commitWithin = commitWithin; } if (overwrite != null) { addCmd.overwrite = overwrite; } if (updateRequest.isLastDocInBatch()) { // this is a hint to downstream code that indicates we've sent the last doc in a batch addCmd.isLastDocInBatch = true; } try { processor.processAdd(addCmd); // 使用构造的addCmd传入processor进行添加请求的处理。 addCmd.clear(); } catch (IOException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR adding document " + document, e); } } }; FastInputStream in = FastInputStream.wrap(stream); for (; ; ) { try { update = new JavaBinUpdateRequestCodec().unmarshal(in, handler); // 最终会调用上面匿名类的update方法,进行doc添加操作 } catch (EOFException e) { break; // this is expected } if (update.getDeleteByIdMap() != null || update.getDeleteQuery() != null) { delete(req, update, processor); } } }............}
具体看一下JavaBinUpdateRequestCodec的处理逻辑,主要就是定义数据流的处理方式,然后调用上面定义的handler。
public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException { final UpdateRequest updateRequest = new UpdateRequest(); List
> doclist; List
现在我们只看添加的流程。LogUpdateProcessor的流程比较简单。
@Override public void processAdd(AddUpdateCommand cmd) throws IOException { if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString() + " " + req); } // call delegate first so we can log things like the version that get set later if (next != null) next.processAdd(cmd); // 调用下一个processor,也就是DistributeUpdateProcessor // Add a list of added id's to the response if (adds == null) { adds = new ArrayList<>(); toLog.add("add",adds); } if (adds.size() < maxNumToLog) { long version = cmd.getVersion(); String msg = cmd.getPrintableId(); if (version != 0) msg = msg + " (" + version + ')'; adds.add(msg); } numAdds++; }
这一个就比较重要,牵扯到solr的分发逻辑。首先再次提一下solr的replica逻辑。
数据是先到leader节点,然后再进行转发到replica,leader要等待replica处理添加请求,这个逻辑在finish()方法中实现。其中两个重要的方法,setupRequest计算需要转发的节点,会根据uniqueKey生成一个hash值,根据hash值计算此doc转发到哪一个shard(collection在创建的时候会为每一个shard分配一个hash区间),内部自带的hash算法会保证doc会均分在不同的shard中。;versionAdd如果本节点是leader,就会计算出相应的_version_信息,doc添加的一个时间戳,只有leader节点会生成,其他replica使用的也是leader节点生成的值。
@Override public void processAdd(AddUpdateCommand cmd) throws IOException { assert TestInjection.injectFailUpdateRequests(); updateCommand = cmd; if (zkEnabled) { zkCheck(); nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument()); // 处理转发逻辑 } else { isLeader = getNonZkLeaderAssumption(req); } // check if client has requested minimum replication factor information int minRf = -1; // disabled by default if (replicationTracker != null) { minRf = replicationTracker.minRf; // for subsequent requests in the same batch } else { SolrParams rp = cmd.getReq().getParams(); String distribUpdate = rp.get(DISTRIB_UPDATE_PARAM); // somewhat tricky logic here: we only activate the replication tracker if we're on // a leader or this is the top-level request processor if (distribUpdate == null || distribUpdate.equals(DistribPhase.TOLEADER.toString())) { String minRepFact = rp.get(UpdateRequest.MIN_REPFACT); if (minRepFact != null) { try { minRf = Integer.parseInt(minRepFact); } catch (NumberFormatException nfe) { minRf = -1; } if (minRf <= 0) throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid value "+minRepFact+" for "+UpdateRequest.MIN_REPFACT+ "; must be >0 and less than or equal to the collection replication factor."); } if (minRf > 1) { String myShardId = forwardToLeader ? null : cloudDesc.getShardId(); replicationTracker = new RequestReplicationTracker(myShardId, minRf); } } } // TODO: if minRf > 1 and we know the leader is the only active replica, we could fail // the request right here but for now I think it is better to just return the status // to the client that the minRf wasn't reached and let them handle it boolean dropCmd = false; if (!forwardToLeader) { dropCmd = versionAdd(cmd); // 生成_version_字段,如果是主节点会调用super.processorAdd()也就是调用DirectUpdateHandler将doc进行本地处理,调用lucene接口。 } if (dropCmd) { // TODO: do we need to add anything to the response? return; } if (zkEnabled && isLeader && !isSubShardLeader) { DocCollection coll = zkController.getClusterState().getCollection(collection); ListsubShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument()); // the list will actually have only one element for an add request if (subShardLeaders != null && !subShardLeaders.isEmpty()) { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); params.set(DISTRIB_FROM_PARENT, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); for (Node subShardLeader : subShardLeaders) { cmdDistrib.distribAdd(cmd, Collections.singletonList(subShardLeader), params, true); // 调用http client将请求分发到其他replica节点,在调用finish的时候,线程会等待所有replica的response } } final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument()); if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName()); params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId()); for (Node nodesByRoutingRule : nodesByRoutingRules) { cmdDistrib.distribAdd(cmd, Collections.singletonList(nodesByRoutingRule), params, true); } } } ModifiableSolrParams params = null; if (nodes != null) { params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, (isLeader || isSubShardLeader ? DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString())); // 设置转发请求参数,根据hash值可能请求需要转发到另一个leader处理(TOLEADER),或者从主节点进行转发(FROMLEADER)。 params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); if (replicationTracker != null && minRf > 1) params.set(UpdateRequest.MIN_REPFACT, String.valueOf(minRf)); cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker); }............ }
具体的转发逻辑可以参考博客
private ListsetupRequest(String id, SolrInputDocument doc, String route) { List nodes = null; // if we are in zk mode... if (zkEnabled) { assert TestInjection.injectUpdateRandomPause(); if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) { isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway. forwardToLeader = false; return nodes; } ClusterState cstate = zkController.getClusterState(); DocCollection coll = cstate.getCollection(collection); Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll); // doc分发的路由逻辑,默认使用的是compositeId,还有implicit可以选择。 if (slice == null) { // No slice found. Most strict routers will have already thrown an exception, so a null return is // a signal to use the slice of this core. // TODO: what if this core is not in the targeted collection? String shardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId(); slice = coll.getSlice(shardId); if (slice == null) { throw new SolrException(ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll); } } DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) { if (req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) { // locally we think we are leader but the request says it came FROMLEADER // that could indicate a problem, let the full logic below figure it out } else { assert TestInjection.injectFailReplicaRequests(); isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway. forwardToLeader = false; return nodes; } } String shardId = slice.getName(); try { // Not equivalent to getLeaderProps, which does retries to find a leader. // Replica leader = slice.getLeader(); Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry( collection, shardId); isLeader = leaderReplica.getName().equals( req.getCore().getCoreDescriptor().getCloudDescriptor() .getCoreNodeName()); if (!isLeader) { isSubShardLeader = amISubShardLeader(coll, slice, id, doc); if (isSubShardLeader) { String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId(); slice = coll.getSlice(myShardId); shardId = myShardId; leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, myShardId); List myReplicas = zkController.getZkStateReader() .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN); } } doDefensiveChecks(phase); // if request is coming from another collection then we want it to be sent to all replicas // even if its phase is FROMLEADER String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION); if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) { // we are coming from the leader, just go local - add no urls forwardToLeader = false; } else if (isLeader || isSubShardLeader) { // that means I want to forward onto my replicas... // so get the replicas... forwardToLeader = false; List replicaProps = zkController.getZkStateReader() .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN); if (replicaProps != null) { if (nodes == null) { nodes = new ArrayList<>(replicaProps.size()); } // check for test param that lets us miss replicas String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS); Set skipListSet = null; if (skipList != null) { skipListSet = new HashSet<>(skipList.length); skipListSet.addAll(Arrays.asList(skipList)); log.info("test.distrib.skip.servers was found and contains:" + skipListSet); } for (ZkCoreNodeProps props : replicaProps) { if (skipList != null) { boolean skip = skipListSet.contains(props.getCoreUrl()); log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:" + skip); if (!skip) { nodes.add(new StdNode(props, collection, shardId)); } } else { nodes.add(new StdNode(props, collection, shardId)); } } } } else { // I need to forward onto the leader... nodes = new ArrayList<>(1); nodes.add(new RetryNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId)); forwardToLeader = true; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); } } return nodes; }
protected boolean versionAdd(AddUpdateCommand cmd) throws IOException { BytesRef idBytes = cmd.getIndexedId(); if (idBytes == null) { super.processAdd(cmd); return false; } if (vinfo == null) { if (AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) { throw new SolrException (SolrException.ErrorCode.BAD_REQUEST, "Atomic document updates are not supported unlessis configured"); } else { super.processAdd(cmd); return false; } } // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here) int bucketHash = Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0); long versionOnUpdate = cmd.getVersion(); if (versionOnUpdate == 0) { SolrInputField versionField = cmd.getSolrInputDocument().getField(VersionInfo.VERSION_FIELD); if (versionField != null) { Object o = versionField.getValue(); versionOnUpdate = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString()); } else { // Find the version String versionOnUpdateS = req.getParams().get(VERSION_FIELD); versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS); } } boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; // peerSync是数据恢复流程吗,后续详细写。 boolean leaderLogic = isLeader && !isReplayOrPeersync; boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null; VersionBucket bucket = vinfo.bucket(bucketHash); vinfo.lockForUpdate(); try { synchronized (bucket) { boolean checkDeleteByQueries = false; if (versionsStored) { long bucketVersion = bucket.highest; if (leaderLogic) { if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) { // forwarded from a collection but we are not buffering so strip original version and apply our own // see SOLR-5308 log.info("Removing version field from doc: " + cmd.getPrintableId()); cmd.solrDoc.remove(VERSION_FIELD); versionOnUpdate = 0; } boolean updated = getUpdatedDocument(cmd, versionOnUpdate); // leaders can also be in buffering state during "migrate" API call, see SOLR-5308 if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { // we're not in an active state, and this update isn't from a replay, so buffer it. log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId()); cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); ulog.add(cmd); return true; } if (versionOnUpdate != 0) { Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); long foundVersion = lastVersion == null ? -1 : lastVersion; if ( versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0) || (versionOnUpdate==1 && foundVersion > 0) ) { // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd // specified it must exist (versionOnUpdate==1) and it does. } else { throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate + " actual=" + foundVersion); } } long version = vinfo.getNewClock(); // 主节点会生成时间戳,将时间戳放进cmd中,传给replica cmd.setVersion(version); cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version); bucket.updateHighest(version); } else { // The leader forwarded us this update. cmd.setVersion(versionOnUpdate); if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { // we're not in an active state, and this update isn't from a replay, so buffer it. cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); ulog.add(cmd); return true; } // if we aren't the leader, then we need to check that updates were not re-ordered if (bucketVersion != 0 && bucketVersion < versionOnUpdate) { // we're OK... this update has a version higher than anything we've seen // in this bucket so far, so we know that no reordering has yet occurred. bucket.updateHighest(versionOnUpdate); } else { // there have been updates higher than the current update. we need to check // the specific version for this id. Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId()); if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) { // This update is a repeat, or was reordered. We need to drop this update. log.debug("Dropping add update due to version {}", idBytes.utf8ToString()); return true; } // also need to re-apply newer deleteByQuery commands checkDeleteByQueries = true; } } } boolean willDistrib = isLeader && nodes != null && nodes.size() > 0; SolrInputDocument clonedDoc = null; if (willDistrib) { clonedDoc = cmd.solrDoc.deepCopy(); } // 如果是主节点将会调用本地添加,这个时候分发给其它replica的请求还没有发送,也就是说主节点先进行数据的持久化,doLocalAdd就是调用super.processAdd()。 doLocalAdd(cmd); if (willDistrib) { cmd.solrDoc = clonedDoc; } } // end synchronized (bucket) } finally { vinfo.unlockForUpdate(); } return false; }
主要的逻辑就是等待上面分发的请求执行结束,判断是否需要重新发送请求,保证备节点能的数据与主节点的同步。
cmdDistrib.finish(); // 重点也就是这个地方了。 Listerrors = cmdDistrib.getErrors(); // TODO - we may need to tell about more than one error... List errorsForClient = new ArrayList<>(errors.size()); for (final SolrCmdDistributor.Error error : errors) { if (error.req.node instanceof RetryNode) { // if it's a forward, any fail is a problem - // otherwise we assume things are fine if we got it locally // until we start allowing min replication param errorsForClient.add(error); continue; } ......
SolrCmdDistributor.finish() 看一下代码就明白这一步做了什么。
public void finish() { try { assert ! finished : "lifecycle sanity check"; finished = true; blockAndDoRetries(); // 阻塞线程,判断是否需要重试。 } finally { clients.shutdown(); } }
数据的本地持久化,这一层就是要将数据写入到lucene。这个processor的入口是RunUpdateProcessor,但是主要的处理逻辑在DirectUpdateHandler2,所以直接看一下这里面的实现方式。
private int addDoc0(AddUpdateCommand cmd) throws IOException { int rc = -1; addCommands.increment(); addCommandsCumulative.increment(); // if there is no ID field, don't overwrite if (idField == null) { cmd.overwrite = false; } try { if (cmd.overwrite) { // Check for delete by query commands newer (i.e. reordered). This // should always be null on a leader ListdeletesAfter = null; if (ulog != null && cmd.version > 0) { deletesAfter = ulog.getDBQNewer(cmd.version); } if (deletesAfter != null) { addAndDelete(cmd, deletesAfter); } else { doNormalUpdate(cmd); // 主要关注 } } else { allowDuplicateUpdate(cmd); } if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) { // 判断是否需要自动提交 if (commitWithinSoftCommit) { // lucene只有一个commit概念,对应到solr就是硬提交,让softcommit是solr为了实现近实时搜索,实现的一个概念。 commitTracker.addedDocument(-1); softCommitTracker.addedDocument(cmd.commitWithin); } else { softCommitTracker.addedDocument(-1); commitTracker.addedDocument(cmd.commitWithin); } } rc = 1; } finally { if (rc != 1) { numErrors.increment(); numErrorsCumulative.increment(); } else { numDocsPending.increment(); } } return rc; }
private void doNormalUpdate(AddUpdateCommand cmd) throws IOException { Term updateTerm; Term idTerm = new Term(cmd.isBlock() ? "_root_" : idField.getName(), cmd.getIndexedId()); boolean del = false; if (cmd.updateTerm == null) { updateTerm = idTerm; } else { // this is only used by the dedup update processor del = true; updateTerm = cmd.updateTerm; } RefCountediw = solrCoreState.getIndexWriter(core); try { IndexWriter writer = iw.get(); // indexWrter已经属于lucene的概念 if (cmd.isBlock()) { writer.updateDocuments(updateTerm, cmd); } else { Document luceneDocument = cmd.getLuceneDocument(); // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer); writer.updateDocument(updateTerm, luceneDocument); } // SolrCore.verbose("updateDocument",updateTerm,"DONE"); if (del) { // ensure id remains unique BooleanQuery.Builder bq = new BooleanQuery.Builder(); bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT)); bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST)); writer.deleteDocuments(new DeleteByQueryWrapper(bq.build(), core.getLatestSchema())); } if (ulog != null) ulog.add(cmd); // 重点!!!将cmd记录在ulog中,类似事务日志。在index存储的目录中可以找到一个tlog.00000000000000000类似名字的文件,里面记录了明文的cmd请求。hardCommit之后tlog的id会递增,softCommit不会新创建文件。 } finally { iw.decref(); } }
事务日志主要就是保证数据的完整性。当节点发生异常的时候可以依靠日志恢复还没有持久化的数据,以及做到leader和replica的数据同步。主要涉及的有以下几个场景:
replay场景:一个场景,节点add完数据之后没有做提交,这时数据没有持久化,在这个时候节点挂了。重新启动之后会进行replay操作,因为tlog文件中没有commit记录,在文件结尾没有标记为,这时候节点就会本地重放tlog,调用processorChain进行一个本地的数据添加。另外一个场景,数据在进行恢复的时候,replica是recoverying的状态,如果现在还有数据添加的请求,数据只会记录在tlog中,在恢复结束之后会调用replay。 peerSync:主要用于少量数据的恢复,100条以内。节点想另外的节点发送请求,获取最近得100条数据的version,进行比较之后,如果有缺失的,就会向对应节点发送请求,获取数据,进行本地更新。 replicate:直接进行文件拷贝。如果待回复的数据量太大,直接进行index文件的拷贝,也会涉及到tlog的拷贝。 后续详细介绍这一部分的内容。