@@ -204,6 +204,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
204204 "Default size for DirectAgentPool" , false );
205205 protected final ConfigKey <Float > DirectAgentThreadCap = new ConfigKey <Float >("Advanced" , Float .class , "direct.agent.thread.cap" , "1" ,
206206 "Percentage (as a value between 0 and 1) of direct.agent.pool.size to be used as upper thread cap for a single direct agent to process requests" , false );
207+ protected final ConfigKey <Integer > PeerNotFoundRetryAttempts = new ConfigKey <Integer >("Advanced" , Integer .class , "agent.peer.not.found.retry.attempts" , "1" ,
208+ "Number of retry attempts for agent sends when 'Unable to find peer' is encountered" , true );
207209 protected final ConfigKey <Boolean > CheckTxnBeforeSending = new ConfigKey <Boolean >("Developer" , Boolean .class , "check.txn.before.sending.agent.commands" , "false" ,
208210 "This parameter allows developers to enable a check to see if a transaction wraps commands that are sent to the resource. This is not to be enabled on production systems." , true );
209211
@@ -421,6 +423,14 @@ private Command[] checkForCommandsAndTag(final Commands commands) {
421423 return cmds ;
422424 }
423425
426+ private boolean isPeerNotFoundException (final AgentUnavailableException exception ) {
427+ return exception != null && "Unable to find peer" .equals (exception .getMessage ());
428+ }
429+
430+ private int getPeerNotFoundRetryAttempts () {
431+ return PeerNotFoundRetryAttempts .value ();
432+ }
433+
424434 /**
425435 * @param commands
426436 * @param cmds
@@ -452,19 +462,34 @@ public Answer[] send(final Long hostId, final Commands commands, int timeout) th
452462 }
453463
454464 final Command [] cmds = checkForCommandsAndTag (commands );
465+ int retry = 0 ;
466+ final int maxRetries = getPeerNotFoundRetryAttempts ();
467+ while (true ) {
468+ //check what agent is returned.
469+ final AgentAttache agent = getAttache (hostId );
470+ if (agent == null || agent .isClosed ()) {
471+ throw new AgentUnavailableException ("agent not logged into this management server" , hostId );
472+ }
455473
456- //check what agent is returned.
457- final AgentAttache agent = getAttache (hostId );
458- if (agent == null || agent .isClosed ()) {
459- throw new AgentUnavailableException ("agent not logged into this management server" , hostId );
474+ final Request req = new Request (hostId , agent .getName (), _nodeId , cmds , commands .stopOnError (), true );
475+ req .setSequence (agent .getNextSequence ());
476+ try {
477+ final Answer [] answers = agent .send (req , timeout );
478+ notifyAnswersToMonitors (hostId , req .getSequence (), answers );
479+ commands .setAnswers (answers );
480+ return answers ;
481+ } catch (final AgentUnavailableException e ) {
482+ if (isPeerNotFoundException (e ) && retry < maxRetries ) {
483+ retry ++;
484+ if (s_logger .isDebugEnabled ()) {
485+ s_logger .debug (String .format ("Retrying send to host %s due to transient peer mismatch (%d/%d)" , hostId , retry ,
486+ maxRetries ));
487+ }
488+ continue ;
489+ }
490+ throw e ;
491+ }
460492 }
461-
462- final Request req = new Request (hostId , agent .getName (), _nodeId , cmds , commands .stopOnError (), true );
463- req .setSequence (agent .getNextSequence ());
464- final Answer [] answers = agent .send (req , timeout );
465- notifyAnswersToMonitors (hostId , req .getSequence (), answers );
466- commands .setAnswers (answers );
467- return answers ;
468493 }
469494
470495 protected Status investigate (final AgentAttache agent ) {
@@ -502,18 +527,32 @@ protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableExce
502527
503528 @ Override
504529 public long send (final Long hostId , final Commands commands , final Listener listener ) throws AgentUnavailableException {
505- final AgentAttache agent = getAttache (hostId );
506- if (agent .isClosed ()) {
507- throw new AgentUnavailableException ("Agent " + agent .getId () + " is closed" , agent .getId ());
508- }
509-
510530 final Command [] cmds = checkForCommandsAndTag (commands );
531+ int retry = 0 ;
532+ final int maxRetries = getPeerNotFoundRetryAttempts ();
533+ while (true ) {
534+ final AgentAttache agent = getAttache (hostId );
535+ if (agent .isClosed ()) {
536+ throw new AgentUnavailableException ("Agent " + agent .getId () + " is closed" , agent .getId ());
537+ }
511538
512- final Request req = new Request (hostId , agent .getName (), _nodeId , cmds , commands .stopOnError (), true );
513- req .setSequence (agent .getNextSequence ());
514-
515- agent .send (req , listener );
516- return req .getSequence ();
539+ final Request req = new Request (hostId , agent .getName (), _nodeId , cmds , commands .stopOnError (), true );
540+ req .setSequence (agent .getNextSequence ());
541+ try {
542+ agent .send (req , listener );
543+ return req .getSequence ();
544+ } catch (final AgentUnavailableException e ) {
545+ if (isPeerNotFoundException (e ) && retry < maxRetries ) {
546+ retry ++;
547+ if (s_logger .isDebugEnabled ()) {
548+ s_logger .debug (String .format ("Retrying async send to host %s due to transient peer mismatch (%d/%d)" , hostId , retry ,
549+ maxRetries ));
550+ }
551+ continue ;
552+ }
553+ throw e ;
554+ }
555+ }
517556 }
518557
519558 public void removeAgent (final AgentAttache attache , final Status nextState ) {
0 commit comments