Solr source code analysis: addDocument


2017 is almost over. I spent a good part of this year digging into Solr-related problems, so it's time for a year-end summary. Next year I may well move on to Elasticsearch.

1. The data flow


  • The add API is sent as an HTTP POST. Taking a SolrJ data import as the example, JavabinLoader is the loader that receives the data. I will skip the SolrDispatchFilter part for now and go straight to what handleRequestBody does.
    ContentStreamHandlerBase.java
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
  SolrParams params = req.getParams();
  UpdateRequestProcessorChain processorChain =
      req.getCore().getUpdateProcessorChain(params);
  UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
  try {
    ContentStreamLoader documentLoader = newLoader(req, processor); // the loader here is JavabinLoader
    Iterable<ContentStream> streams = req.getContentStreams();
    if (streams == null) {
      if (!RequestHandlerUtils.handleCommit(req, processor, params, false)
          && !RequestHandlerUtils.handleRollback(req, processor, params, false)) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
      }
    } else {
      for (ContentStream stream : streams) {
        documentLoader.load(req, rsp, stream, processor); // the actual stream processing
      }
      // Perhaps commit from the parameters
      RequestHandlerUtils.handleCommit(req, processor, params, false);
      RequestHandlerUtils.handleRollback(req, processor, params, false);
    }
  } finally {
    // finish the request
    processor.finish();
  }
}
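For context, here is a minimal SolrJ sketch of the client side that produces such a request. The URL, collection name and field names are illustrative only, not taken from the original post:

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

public class AddDocExample {
  public static void main(String[] args) throws Exception {
    // assumed: a collection named "test" running at http://localhost:8983/solr
    try (SolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr/test").build()) {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "doc-1");
      doc.addField("title", "hello solr");
      client.add(doc);   // sent as a javabin POST, received by JavabinLoader on the server
      client.commit();   // hard commit; more on commits further down
    }
  }
}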

These operations are only pre-processing of the data.

public class JavabinLoader extends ContentStreamLoader {

  @Override
  public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream,
                   UpdateRequestProcessor processor) throws Exception {
    InputStream is = null;
    try {
      is = stream.getStream();
      parseAndLoadDocs(req, rsp, is, processor);
    } finally {
      if (is != null) {
        is.close();
      }
    }
  }

  private void parseAndLoadDocs(final SolrQueryRequest req, SolrQueryResponse rsp, InputStream stream,
                                final UpdateRequestProcessor processor) throws IOException {
    UpdateRequest update = null;
    JavaBinUpdateRequestCodec.StreamingUpdateHandler handler =
        new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
      // Anonymous class that defines how each doc is handled; the update method is what matters.
      private AddUpdateCommand addCmd = null;

      @Override
      public void update(SolrInputDocument document, UpdateRequest updateRequest,
                         Integer commitWithin, Boolean overwrite) {
        // Note: SolrJ may submit many docs in one request, but here they are processed one by one, serially.
        if (document == null) {
          // Perhaps commit from the parameters
          try {
            RequestHandlerUtils.handleCommit(req, processor, updateRequest.getParams(), false);
            RequestHandlerUtils.handleRollback(req, processor, updateRequest.getParams(), false);
          } catch (IOException e) {
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR handling commit/rollback");
          }
          return;
        }
        if (addCmd == null) {
          addCmd = getAddCommand(req, updateRequest.getParams());
        }
        addCmd.solrDoc = document;
        if (commitWithin != null) {
          addCmd.commitWithin = commitWithin;
        }
        if (overwrite != null) {
          addCmd.overwrite = overwrite;
        }
        if (updateRequest.isLastDocInBatch()) {
          // this is a hint to downstream code that indicates we've sent the last doc in a batch
          addCmd.isLastDocInBatch = true;
        }
        try {
          processor.processAdd(addCmd); // hand the constructed addCmd to the processor chain
          addCmd.clear();
        } catch (IOException e) {
          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR adding document " + document, e);
        }
      }
    };
    FastInputStream in = FastInputStream.wrap(stream);
    for (; ; ) {
      try {
        update = new JavaBinUpdateRequestCodec().unmarshal(in, handler); // eventually calls the anonymous handler's update method above
      } catch (EOFException e) {
        break; // this is expected
      }
      if (update.getDeleteByIdMap() != null || update.getDeleteQuery() != null) {
        delete(req, update, processor);
      }
    }
  }
  // ...... remaining code omitted ......
}

  Now look more closely at JavaBinUpdateRequestCodec: it mainly defines how the posted stream is decoded, then calls back into the handler defined above.

public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException {
  final UpdateRequest updateRequest = new UpdateRequest();
  List<List<NamedList>> doclist;
  List<Entry<SolrInputDocument, Map<Object, Object>>> docMap;
  List<String> delById;
  Map<String, Map<String, Object>> delByIdMap;
  List<String> delByQ;
  final NamedList[] namedList = new NamedList[1];
  JavaBinCodec codec = new JavaBinCodec() {
    // Defines how the posted stream is read, iterating over every doc in it.
    // NOTE: this only works because this is an anonymous inner class
    // which will only ever be used on a single stream -- if this class
    // is ever refactored, this will not work.
    private boolean seenOuterMostDocIterator = false;

    @Override
    public NamedList readNamedList(DataInputInputStream dis) throws IOException {
      int sz = readSize(dis);
      NamedList nl = new NamedList();
      if (namedList[0] == null) {
        namedList[0] = nl;
      }
      for (int i = 0; i < sz; i++) {
        String name = (String) readVal(dis);
        Object val = readVal(dis);
        nl.add(name, val);
      }
      return nl;
    }

    @Override
    public List readIterator(DataInputInputStream fis) throws IOException {
      // default behavior for reading any regular Iterator in the stream
      if (seenOuterMostDocIterator) return super.readIterator(fis);
      // special treatment for first outermost Iterator
      // (the list of documents)
      seenOuterMostDocIterator = true;
      return readOuterMostDocIterator(fis);
    }

    private List readOuterMostDocIterator(DataInputInputStream fis) throws IOException {
      NamedList params = (NamedList) namedList[0].get("params");
      updateRequest.setParams(new ModifiableSolrParams(SolrParams.toSolrParams(params)));
      if (handler == null) return super.readIterator(fis);
      Integer commitWithin = null;
      Boolean overwrite = null;
      Object o = null;
      while (true) {
        if (o == null) {
          o = readVal(fis);
        }
        if (o == END_OBJ) {
          break;
        }
        SolrInputDocument sdoc = null;
        if (o instanceof List) {
          sdoc = listToSolrInputDocument((List<NamedList>) o);
        } else if (o instanceof NamedList) {
          UpdateRequest req = new UpdateRequest();
          req.setParams(new ModifiableSolrParams(SolrParams.toSolrParams((NamedList) o)));
          handler.update(null, req, null, null);
        } else if (o instanceof Map.Entry) {
          sdoc = (SolrInputDocument) ((Map.Entry) o).getKey();
          Map p = (Map) ((Map.Entry) o).getValue();
          if (p != null) {
            commitWithin = (Integer) p.get(UpdateRequest.COMMIT_WITHIN);
            overwrite = (Boolean) p.get(UpdateRequest.OVERWRITE);
          }
        } else {
          sdoc = (SolrInputDocument) o;
        }
        // peek at the next object to see if we're at the end
        o = readVal(fis);
        if (o == END_OBJ) {
          // indicate that we've hit the last doc in the batch, used to enable optimizations when doing replication
          updateRequest.lastDocInBatch();
        }
        handler.update(sdoc, updateRequest, commitWithin, overwrite);
        // This is the call back into the anonymous handler's update method mentioned above;
        // "handler" is simply a naming convention you will see all over the code base.
      }
      return Collections.EMPTY_LIST;
    }
  };
  // ...... non-essential code omitted ......
}

2. The ProcessorChain


  • Now for the key concept: the UpdateRequestProcessorChain, which processes the documents. The chain handles one document at a time, and if needed we can plug in a processor of our own, a plugin-style mechanism that also requires a matching change in solrconfig.xml (a minimal sketch of such a custom processor follows right after this list).
    (figure: the default update processor chain)
      When a request arrives, a new processor chain is created to handle the data. By default there are three processors: LogUpdateProcessor, DistributedUpdateProcessor, and RunUpdateProcessor (whose real work is done by DirectUpdateHandler2). Briefly:
      LogUpdateProcessor: plain request logging; it has nothing to do with the transaction log.
      DistributedUpdateProcessor: the distribution logic; flags such as FROMLEADER and TOLEADER determine how the data is forwarded.
      DirectUpdateHandler2: local persistence of the data, i.e. the calls into the Lucene API. This is also where the tlog transaction log is written to guarantee durability; the recovery mechanisms all depend on the tlog/ulog.
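As a side note, a custom processor is just a factory plus a processor that delegates to next. Here is a minimal sketch, under the assumption that it is registered in an updateRequestProcessorChain in solrconfig.xml; the class and field names are my own, purely illustrative:

import java.io.IOException;
import java.util.Date;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;

// Hypothetical example: stamp every incoming doc with an indexing time before it moves on.
public class MyIndexTimeUpdateProcessorFactory extends UpdateRequestProcessorFactory {
  @Override
  public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp,
                                            UpdateRequestProcessor next) {
    return new UpdateRequestProcessor(next) {
      @Override
      public void processAdd(AddUpdateCommand cmd) throws IOException {
        // enrich the document, then hand it to the next processor in the chain
        cmd.getSolrInputDocument().setField("indexed_at_dt", new Date());
        super.processAdd(cmd); // delegates to next.processAdd(cmd) when next != null
      }
    };
  }
}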

LogUpdateProcessor

  For now we only follow the add path. The LogUpdateProcessor flow is straightforward.

@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
  if (logDebug) { log.debug("PRE_UPDATE " + cmd.toString() + " " + req); }

  // call delegate first so we can log things like the version that get set later
  if (next != null) next.processAdd(cmd); // call the next processor, i.e. DistributedUpdateProcessor

  // Add a list of added id's to the response
  if (adds == null) {
    adds = new ArrayList<>();
    toLog.add("add", adds);
  }
  if (adds.size() < maxNumToLog) {
    long version = cmd.getVersion();
    String msg = cmd.getPrintableId();
    if (version != 0) msg = msg + " (" + version + ')';
    adds.add(msg);
  }
  numAdds++;
}

DistributedUpdateProcessor

  This one is much more important; it implements Solr's distribution logic. First, a quick recap of Solr's replica model.

  (figure: the leader forwards the update to its replicas)
  The data first reaches the leader node and is then forwarded to the replicas; the leader has to wait for the replicas to process the add request, and that waiting is implemented in the finish() method.

processAdd

  Two methods matter here. setupRequest computes the nodes the request must be forwarded to: a hash is derived from the uniqueKey, and that hash decides which shard the doc goes to (when a collection is created, each shard is assigned a hash range); the built-in hash algorithm keeps the docs evenly spread across the shards. versionAdd: if this node is the leader, it generates the _version_ value, a timestamp attached to the add; only the leader generates it, and the replicas reuse the value the leader produced.

@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
  assert TestInjection.injectFailUpdateRequests();

  updateCommand = cmd;

  if (zkEnabled) {
    zkCheck();
    nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument()); // the forwarding logic
  } else {
    isLeader = getNonZkLeaderAssumption(req);
  }

  // check if client has requested minimum replication factor information
  int minRf = -1; // disabled by default
  if (replicationTracker != null) {
    minRf = replicationTracker.minRf; // for subsequent requests in the same batch
  } else {
    SolrParams rp = cmd.getReq().getParams();
    String distribUpdate = rp.get(DISTRIB_UPDATE_PARAM);
    // somewhat tricky logic here: we only activate the replication tracker if we're on
    // a leader or this is the top-level request processor
    if (distribUpdate == null || distribUpdate.equals(DistribPhase.TOLEADER.toString())) {
      String minRepFact = rp.get(UpdateRequest.MIN_REPFACT);
      if (minRepFact != null) {
        try {
          minRf = Integer.parseInt(minRepFact);
        } catch (NumberFormatException nfe) {
          minRf = -1;
        }
        if (minRf <= 0)
          throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid value " + minRepFact + " for " + UpdateRequest.MIN_REPFACT +
              "; must be >0 and less than or equal to the collection replication factor.");
      }
      if (minRf > 1) {
        String myShardId = forwardToLeader ? null : cloudDesc.getShardId();
        replicationTracker = new RequestReplicationTracker(myShardId, minRf);
      }
    }
  }

  // TODO: if minRf > 1 and we know the leader is the only active replica, we could fail
  // the request right here but for now I think it is better to just return the status
  // to the client that the minRf wasn't reached and let them handle it

  boolean dropCmd = false;
  if (!forwardToLeader) {
    dropCmd = versionAdd(cmd); // generates the _version_ field; on the leader it ends up calling
                               // super.processAdd(), i.e. DirectUpdateHandler2 handles the doc locally via the Lucene API
  }

  if (dropCmd) {
    // TODO: do we need to add anything to the response?
    return;
  }

  if (zkEnabled && isLeader && !isSubShardLeader) {
    DocCollection coll = zkController.getClusterState().getCollection(collection);
    List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument());
    // the list will actually have only one element for an add request
    if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
      ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
      params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
      params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
          zkController.getBaseUrl(), req.getCore().getName()));
      params.set(DISTRIB_FROM_PARENT, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
      for (Node subShardLeader : subShardLeaders) {
        cmdDistrib.distribAdd(cmd, Collections.singletonList(subShardLeader), params, true);
        // distributes the request to the other replica nodes via the HTTP client; when finish() is
        // called, the thread waits for the responses from all replicas
      }
    }
    final List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument());
    if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
      ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
      params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
      params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
          zkController.getBaseUrl(), req.getCore().getName()));
      params.set(DISTRIB_FROM_COLLECTION, req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName());
      params.set(DISTRIB_FROM_SHARD, req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
      for (Node nodesByRoutingRule : nodesByRoutingRules) {
        cmdDistrib.distribAdd(cmd, Collections.singletonList(nodesByRoutingRule), params, true);
      }
    }
  }

  ModifiableSolrParams params = null;
  if (nodes != null) {
    params = new ModifiableSolrParams(filterParams(req.getParams()));
    params.set(DISTRIB_UPDATE_PARAM,
               (isLeader || isSubShardLeader ?
                DistribPhase.FROMLEADER.toString() :
                DistribPhase.TOLEADER.toString()));
    // sets the forwarding parameter: depending on the hash, the request may need to be forwarded to
    // another leader (TOLEADER), or it is being distributed from the leader (FROMLEADER)
    params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
        zkController.getBaseUrl(), req.getCore().getName()));

    if (replicationTracker != null && minRf > 1)
      params.set(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));

    cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker);
  }
  // ...... non-essential code omitted ......
}

setupRequest(): returns the nodes to forward to

  The forwarding logic is covered in more detail in a separate blog post.

private List<Node> setupRequest(String id, SolrInputDocument doc, String route) {
  List<Node> nodes = null;

  // if we are in zk mode...
  if (zkEnabled) {
    assert TestInjection.injectUpdateRandomPause();

    if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
      isLeader = false;     // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
      forwardToLeader = false;
      return nodes;
    }

    ClusterState cstate = zkController.getClusterState();
    DocCollection coll = cstate.getCollection(collection);
    Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
    // the routing of the doc; the default router is compositeId, implicit is the other option

    if (slice == null) {
      // No slice found.  Most strict routers will have already thrown an exception, so a null return is
      // a signal to use the slice of this core.
      // TODO: what if this core is not in the targeted collection?
      String shardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
      slice = coll.getSlice(shardId);
      if (slice == null) {
        throw new SolrException(ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
      }
    }

    DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));

    if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) {
      if (req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) {
        // locally we think we are leader but the request says it came FROMLEADER
        // that could indicate a problem, let the full logic below figure it out
      } else {
        assert TestInjection.injectFailReplicaRequests();

        isLeader = false;     // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
        forwardToLeader = false;
        return nodes;
      }
    }

    String shardId = slice.getName();

    try {
      // Not equivalent to getLeaderProps, which does retries to find a leader.
      // Replica leader = slice.getLeader();
      Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
          collection, shardId);
      isLeader = leaderReplica.getName().equals(
          req.getCore().getCoreDescriptor().getCloudDescriptor()
              .getCoreNodeName());

      if (!isLeader) {
        isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
        if (isSubShardLeader) {
          String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
          slice = coll.getSlice(myShardId);
          shardId = myShardId;
          leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
          List<ZkCoreNodeProps> myReplicas = zkController.getZkStateReader()
              .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
        }
      }

      doDefensiveChecks(phase);

      // if request is coming from another collection then we want it to be sent to all replicas
      // even if its phase is FROMLEADER
      String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION);

      if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) {
        // we are coming from the leader, just go local - add no urls
        forwardToLeader = false;
      } else if (isLeader || isSubShardLeader) {
        // that means I want to forward onto my replicas...
        // so get the replicas...
        forwardToLeader = false;
        List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
            .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
        if (replicaProps != null) {
          if (nodes == null) {
            nodes = new ArrayList<>(replicaProps.size());
          }
          // check for test param that lets us miss replicas
          String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
          Set<String> skipListSet = null;
          if (skipList != null) {
            skipListSet = new HashSet<>(skipList.length);
            skipListSet.addAll(Arrays.asList(skipList));
            log.info("test.distrib.skip.servers was found and contains:" + skipListSet);
          }

          for (ZkCoreNodeProps props : replicaProps) {
            if (skipList != null) {
              boolean skip = skipListSet.contains(props.getCoreUrl());
              log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:" + skip);
              if (!skip) {
                nodes.add(new StdNode(props, collection, shardId));
              }
            } else {
              nodes.add(new StdNode(props, collection, shardId));
            }
          }
        }
      } else {
        // I need to forward onto the leader...
        nodes = new ArrayList<>(1);
        nodes.add(new RetryNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId));
        forwardToLeader = true;
      }

    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
    }
  }

  return nodes;
}
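To make the hash-range routing idea concrete, here is a simplified, stand-alone sketch. This is not Solr's actual CompositeIdRouter (which hashes the uniqueKey with MurmurHash3); CRC32 is used only to keep the sketch self-contained, and the only assumption borrowed from Solr is that each shard owns a contiguous slice of the 32-bit hash space assigned at collection-creation time:

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class HashRangeRoutingSketch {

  // Map the signed 32-bit hash space onto numShards equally sized ranges and
  // return the index of the shard that owns the hash of this uniqueKey.
  static int shardFor(String uniqueKey, int numShards) {
    CRC32 crc = new CRC32();
    crc.update(uniqueKey.getBytes(StandardCharsets.UTF_8));
    int hash = (int) crc.getValue();                 // squash to 32 bits
    long rangeSize = (1L << 32) / numShards;         // width of each shard's hash range
    long offset = (long) hash - Integer.MIN_VALUE;   // shift into [0, 2^32)
    return (int) Math.min(offset / rangeSize, numShards - 1);
  }

  public static void main(String[] args) {
    for (String id : new String[]{"doc-1", "doc-2", "doc-3"}) {
      System.out.println(id + " -> shard" + (shardFor(id, 4) + 1));
    }
  }
}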

versionAdd

protected boolean versionAdd(AddUpdateCommand cmd) throws IOException {
  BytesRef idBytes = cmd.getIndexedId();

  if (idBytes == null) {
    super.processAdd(cmd);
    return false;
  }

  if (vinfo == null) {
    if (AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) {
      throw new SolrException
        (SolrException.ErrorCode.BAD_REQUEST,
         "Atomic document updates are not supported unless <updateLog/> is configured");
    } else {
      super.processAdd(cmd);
      return false;
    }
  }

  // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
  int bucketHash = Hash.murmurhash3_x86_32(idBytes.bytes, idBytes.offset, idBytes.length, 0);

  long versionOnUpdate = cmd.getVersion();

  if (versionOnUpdate == 0) {
    SolrInputField versionField = cmd.getSolrInputDocument().getField(VersionInfo.VERSION_FIELD);
    if (versionField != null) {
      Object o = versionField.getValue();
      versionOnUpdate = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
    } else {
      // Find the version
      String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
      versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
    }
  }

  boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
  // peerSync belongs to the data-recovery flow; more on that later.
  boolean leaderLogic = isLeader && !isReplayOrPeersync;
  boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null;

  VersionBucket bucket = vinfo.bucket(bucketHash);

  vinfo.lockForUpdate();
  try {
    synchronized (bucket) {
      boolean checkDeleteByQueries = false;

      if (versionsStored) {

        long bucketVersion = bucket.highest;

        if (leaderLogic) {

          if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
            // forwarded from a collection but we are not buffering so strip original version and apply our own
            // see SOLR-5308
            log.info("Removing version field from doc: " + cmd.getPrintableId());
            cmd.solrDoc.remove(VERSION_FIELD);
            versionOnUpdate = 0;
          }

          boolean updated = getUpdatedDocument(cmd, versionOnUpdate);

          // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
          if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
              && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
            // we're not in an active state, and this update isn't from a replay, so buffer it.
            log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId());
            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
            ulog.add(cmd);
            return true;
          }

          if (versionOnUpdate != 0) {
            Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
            long foundVersion = lastVersion == null ? -1 : lastVersion;
            if (versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0) || (versionOnUpdate == 1 && foundVersion > 0)) {
              // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
              // specified it must exist (versionOnUpdate==1) and it does.
            } else {
              throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId()
                  + " expected=" + versionOnUpdate + " actual=" + foundVersion);
            }
          }

          long version = vinfo.getNewClock(); // the leader generates the timestamp, puts it into the cmd, and passes it on to the replicas
          cmd.setVersion(version);
          cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
          bucket.updateHighest(version);
        } else {
          // The leader forwarded us this update.
          cmd.setVersion(versionOnUpdate);

          if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
            // we're not in an active state, and this update isn't from a replay, so buffer it.
            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
            ulog.add(cmd);
            return true;
          }

          // if we aren't the leader, then we need to check that updates were not re-ordered
          if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
            // we're OK... this update has a version higher than anything we've seen
            // in this bucket so far, so we know that no reordering has yet occurred.
            bucket.updateHighest(versionOnUpdate);
          } else {
            // there have been updates higher than the current update.  we need to check
            // the specific version for this id.
            Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
            if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
              // This update is a repeat, or was reordered.  We need to drop this update.
              log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
              return true;
            }

            // also need to re-apply newer deleteByQuery commands
            checkDeleteByQueries = true;
          }
        }
      }

      boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;

      SolrInputDocument clonedDoc = null;
      if (willDistrib) {
        clonedDoc = cmd.solrDoc.deepCopy();
      }

      // The leader adds the doc locally here, before the requests to the other replicas have even been
      // sent, i.e. the leader persists the data first; doLocalAdd simply calls super.processAdd().
      doLocalAdd(cmd);

      if (willDistrib) {
        cmd.solrDoc = clonedDoc;
      }

    } // end synchronized (bucket)
  } finally {
    vinfo.unlockForUpdate();
  }
  return false;
}
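The version checks above are also what powers optimistic concurrency for clients: a _version_ of 1 means "the doc must already exist", a negative value means "it must not exist", and a concrete version value must match the stored one exactly, otherwise Solr answers with a CONFLICT (HTTP 409). A hedged SolrJ sketch of that behaviour; the URL, collection and field values are illustrative only:

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;

public class OptimisticConcurrencyExample {
  public static void main(String[] args) throws Exception {
    try (SolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr/test").build()) {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "doc-1");
      doc.addField("title", "updated title");
      doc.addField("_version_", 1L); // 1 = the document must already exist
      try {
        client.add(doc);             // versionAdd() compares this value against the stored version
        client.commit();
      } catch (SolrServerException | SolrException e) {
        // a mismatch surfaces as the ErrorCode.CONFLICT thrown from versionAdd()
        System.err.println("version conflict: " + e.getMessage());
      }
    }
  }
}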

doFinish()

  The main job here is to wait for the distributed requests sent above to finish and to decide whether any of them must be resent, which is what keeps the replica data in sync with the leader.

cmdDistrib.finish();   // this is the important part
List<Error> errors = cmdDistrib.getErrors();
// TODO - we may need to tell about more than one error...
List<Error> errorsForClient = new ArrayList<>(errors.size());

for (final SolrCmdDistributor.Error error : errors) {

  if (error.req.node instanceof RetryNode) {
    // if it's a forward, any fail is a problem -
    // otherwise we assume things are fine if we got it locally
    // until we start allowing min replication param
    errorsForClient.add(error);
    continue;
  }
  ......

  A look at SolrCmdDistributor.finish() makes it clear what this step does.

public void finish() {
  try {
    assert !finished : "lifecycle sanity check";
    finished = true;

    blockAndDoRetries(); // block the thread and decide whether any retries are needed
  } finally {
    clients.shutdown();
  }
}

DirectUpdateHandler2

  This is where the data is persisted locally, i.e. written into Lucene. The entry point of this stage is the RunUpdateProcessor, but the main logic lives in DirectUpdateHandler2, so let's look at that implementation directly.

addDoc0

private int addDoc0(AddUpdateCommand cmd) throws IOException {
  int rc = -1;
  addCommands.increment();
  addCommandsCumulative.increment();

  // if there is no ID field, don't overwrite
  if (idField == null) {
    cmd.overwrite = false;
  }
  try {
    if (cmd.overwrite) {
      // Check for delete by query commands newer (i.e. reordered). This
      // should always be null on a leader
      List<UpdateLog.DBQ> deletesAfter = null;
      if (ulog != null && cmd.version > 0) {
        deletesAfter = ulog.getDBQNewer(cmd.version);
      }

      if (deletesAfter != null) {
        addAndDelete(cmd, deletesAfter);
      } else {
        doNormalUpdate(cmd); // the interesting path
      }

    } else {
      allowDuplicateUpdate(cmd);
    }

    if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) { // decide whether an auto commit is needed
      if (commitWithinSoftCommit) {
        // Lucene only has one notion of commit, which maps to Solr's hard commit;
        // soft commit is a Solr-level concept introduced for near-real-time search.
        commitTracker.addedDocument(-1);
        softCommitTracker.addedDocument(cmd.commitWithin);
      } else {
        softCommitTracker.addedDocument(-1);
        commitTracker.addedDocument(cmd.commitWithin);
      }
    }

    rc = 1;
  } finally {
    if (rc != 1) {
      numErrors.increment();
      numErrorsCumulative.increment();
    } else {
      numDocsPending.increment();
    }
  }

  return rc;
}
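For completeness, this is how a client typically drives those commit trackers. A hedged SolrJ sketch (URL and names illustrative, as before): the commitWithin value ends up in cmd.commitWithin above, and the explicit commit call lets you choose between a soft and a hard commit.

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

public class CommitWithinExample {
  public static void main(String[] args) throws Exception {
    try (SolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr/test").build()) {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "doc-2");
      client.add(doc, 10_000);          // commitWithin = 10s; feeds the commit trackers above
      client.commit(true, true, true);  // waitFlush, waitSearcher, softCommit=true -> near-real-time visibility
      client.commit();                  // plain hard commit, i.e. a real Lucene commit
    }
  }
}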

doNormalUpdate covers both the Lucene write and the ulog record

private void doNormalUpdate(AddUpdateCommand cmd) throws IOException {
  Term updateTerm;
  Term idTerm = new Term(cmd.isBlock() ? "_root_" : idField.getName(), cmd.getIndexedId());
  boolean del = false;
  if (cmd.updateTerm == null) {
    updateTerm = idTerm;
  } else {
    // this is only used by the dedup update processor
    del = true;
    updateTerm = cmd.updateTerm;
  }

  RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
  try {
    IndexWriter writer = iw.get(); // IndexWriter already belongs to the Lucene layer

    if (cmd.isBlock()) {
      writer.updateDocuments(updateTerm, cmd);
    } else {
      Document luceneDocument = cmd.getLuceneDocument();
      // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
      writer.updateDocument(updateTerm, luceneDocument);
    }
    // SolrCore.verbose("updateDocument",updateTerm,"DONE");

    if (del) { // ensure id remains unique
      BooleanQuery.Builder bq = new BooleanQuery.Builder();
      bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT));
      bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
      writer.deleteDocuments(new DeleteByQueryWrapper(bq.build(), core.getLatestSchema()));
    }

    // Important! The cmd is recorded in the ulog, which works like a transaction log. In the index
    // directory you will find a file with a name like tlog.0000000000000000000 that contains the
    // commands in plain form. After a hard commit the tlog id increases; a soft commit does not
    // create a new file.
    if (ulog != null) ulog.add(cmd);

  } finally {
    iw.decref();
  }
}
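doNormalUpdate ultimately boils down to IndexWriter.updateDocument on a Term built from the uniqueKey. Here is a stripped-down, stand-alone Lucene sketch of that call, outside of Solr; the index path and field names are illustrative:

import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class LuceneUpdateDocumentSketch {
  public static void main(String[] args) throws Exception {
    try (Directory dir = FSDirectory.open(Paths.get("/tmp/lucene-sketch-index"));
         IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
      Document doc = new Document();
      doc.add(new StringField("id", "doc-1", Field.Store.YES));
      doc.add(new TextField("title", "hello lucene", Field.Store.YES));
      // delete-then-add keyed on the uniqueKey term, which is exactly what doNormalUpdate relies on
      writer.updateDocument(new Term("id", "doc-1"), doc);
      writer.commit(); // this is the "hard commit" that Solr exposes
    }
  }
}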

tlog: transaction log

  The transaction log exists mainly to guarantee data integrity. When a node fails, the log allows it to recover data that has not been persisted yet and keeps the leader and its replicas in sync. The main scenarios are:

  replay: in one case, a node adds data but crashes before committing, so nothing has been persisted. On restart the node sees that the tlog file contains no commit record (there is no end-of-file marker), replays the tlog locally, and runs the adds through the processor chain again. In the other case, a replica is in the recovering state: add requests that arrive during recovery are only recorded in the tlog, and replay is invoked once recovery finishes.
  peerSync: used to recover small amounts of data, up to about 100 updates. The node asks the other nodes for the versions of their most recent 100 updates, compares them with its own, and for anything missing it requests the data from the corresponding node and applies it locally.
  replicate: a plain file copy. If the amount of data to recover is too large, the index files are copied directly, which also involves copying the tlog.
  I will cover this part in detail in a later post.
