2424 * as sending a message, opening a socket connection, or gathering statistics.
2525 */
2626
27+ #include <stdio.h>
28+ #include <string.h>
2729#include "coef.h"
2830#include "common.h"
2931#include "countdown_cond.h"
3032#include "flow.h"
3133#include "histo.h"
34+ #include "logging.h"
3235#include "percentiles.h"
3336#include "print.h"
3437#include "rr.h"
@@ -202,6 +205,15 @@ static void rr_state_init(struct thread *t, int fd,
202205 };
203206
204207 flow_create (& args );
208+
209+ if (t -> opts -> client && t -> opts -> log_rtt && t -> rtt_logs == NULL ) {
210+ t -> rtt_log_capacity = (long )t -> flow_limit *
211+ t -> opts -> logrtt_entries_per_flow ;
212+ if (t -> rtt_log_capacity > 0 ) {
213+ t -> rtt_logs = calloc_or_die (
214+ t -> rtt_log_capacity , sizeof (struct rtt_log ), t -> cb );
215+ }
216+ }
205217}
206218
207219/*
@@ -327,6 +339,19 @@ static bool rr_do_compl(struct flow *f,
327339 struct neper_histo * histo = stat -> histo (stat );
328340 neper_histo_event (histo , elapsed );
329341
342+ if (t -> opts -> client && t -> rtt_logs ) {
343+ long count = flow_rtt_log_count (f );
344+ if (count < t -> opts -> logrtt_entries_per_flow ) {
345+ long offset = flow_id (f ) * t -> opts -> logrtt_entries_per_flow ;
346+ struct rtt_log * log = & t -> rtt_logs [offset + count ];
347+ log -> rtt = elapsed ;
348+ log -> thread_id = t -> index ;
349+ log -> flow_id = flow_id (f );
350+ log -> timestamp = * now ;
351+ flow_increment_rtt_log_count (f );
352+ }
353+ }
354+
330355 if (t -> data_pending ) {
331356 /* data vs time mode, last rr? */
332357 if (!countdown_cond_commit (t -> data_pending )) {
@@ -497,6 +522,54 @@ fn_add(struct neper_stat *stat, void *ptr)
497522 return 0 ;
498523}
499524
525+ static void rr_log_rtt (struct thread * tinfo , struct callbacks * cb ,
526+ int num_transactions )
527+ {
528+ const char * sep = " " ;
529+ const char * ext = strrchr (tinfo [0 ].opts -> log_rtt , '.' );
530+ if (ext && !strcmp (ext , ".csv" ))
531+ sep = "," ;
532+
533+ FILE * rtt_log_file = fopen (tinfo [0 ].opts -> log_rtt , "w" );
534+ if (!rtt_log_file )
535+ PLOG_FATAL (cb , "fopen %s" , tinfo [0 ].opts -> log_rtt );
536+
537+ fprintf (rtt_log_file , "%15s%s%10s%s%10s%s%12s\n" , "timestamp" , sep , "thread_id" , sep , "flow_id" , sep , "rtt" );
538+
539+ long transactions_logged = 0 ;
540+ const struct timespec * start_time = tinfo [0 ].time_start ;
541+
542+ for (int i = 0 ; i < tinfo [0 ].opts -> num_threads ; i ++ ) {
543+ struct thread * t = & tinfo [i ];
544+ if (!t -> rtt_logs )
545+ continue ;
546+ for (int j = 0 ; j < t -> flow_count ; j ++ ) {
547+ struct flow * f = t -> flows [j ];
548+ if (!f )
549+ continue ;
550+ transactions_logged += flow_rtt_log_count (f );
551+ long offset = flow_id (f ) * t -> opts -> logrtt_entries_per_flow ;
552+ for (long k = 0 ; k < flow_rtt_log_count (f ); k ++ ) {
553+ struct rtt_log * log = & t -> rtt_logs [offset + k ];
554+ double elapsed = seconds_between (start_time , & log -> timestamp );
555+ fprintf (rtt_log_file , "%15.9f%s%10d%s%10d%s%12.9f\n" ,
556+ elapsed , sep , log -> thread_id , sep ,
557+ log -> flow_id , sep , log -> rtt );
558+ }
559+ }
560+ free (t -> rtt_logs );
561+ t -> rtt_logs = NULL ;
562+ }
563+ fclose (rtt_log_file );
564+ PRINT (cb , "rtt_transactions_logged" , "%ld" , transactions_logged );
565+
566+ if (transactions_logged < num_transactions ) {
567+ LOG_INFO (cb ,
568+ "rtt_transactions_logged (%ld) < num_transactions (%d)" ,
569+ transactions_logged , num_transactions );
570+ }
571+ }
572+
500573int rr_report_stats (struct thread * tinfo )
501574{
502575 const struct options * opts = tinfo [0 ].opts ;
@@ -511,6 +584,9 @@ int rr_report_stats(struct thread *tinfo)
511584 int num_events = thread_stats_events (tinfo );
512585 PRINT (cb , "num_transactions" , "%d" , num_events );
513586
587+ if (opts -> client && tinfo [0 ].opts -> log_rtt )
588+ rr_log_rtt (tinfo , cb , num_events );
589+
514590 struct neper_histo * sum = neper_histo_new (tinfo , DEFAULT_K_BITS );
515591 for (i = 0 ; i < opts -> num_threads ; i ++ )
516592 tinfo [i ].stats -> sumforeach (tinfo [i ].stats , fn_add , sum );
0 commit comments