11package bitflow
22
33import (
4+ "net"
5+ "strconv"
46 "sync"
57 "testing"
68 "time"
@@ -10,6 +12,23 @@ import (
1012 "github.com/stretchr/testify/suite"
1113)
1214
15+ func GetLocalPort () string {
16+ addr , err := net .ResolveTCPAddr ("tcp" , "localhost:0" )
17+ if err != nil {
18+ panic (err )
19+ }
20+ l , err := net .ListenTCP ("tcp" , addr )
21+ if err != nil {
22+ panic (err )
23+ }
24+ defer func () {
25+ if closeErr := l .Close (); closeErr != nil {
26+ panic (closeErr )
27+ }
28+ }()
29+ return strconv .Itoa (l .Addr ().(* net.TCPAddr ).Port )
30+ }
31+
1332type TcpListenerTestSuite struct {
1433 testSuiteWithSamples
1534}
@@ -35,17 +54,18 @@ func (suite *TcpListenerTestSuite) runGroup(sender golib.Task, generator SampleP
3554
3655func (suite * TcpListenerTestSuite ) testListenerSinkAll (m BidiMarshaller ) {
3756 testSink := suite .newFilledTestSink ()
57+ port := GetLocalPort ()
3858
3959 l := & TCPListenerSink {
40- Endpoint : ":7878" ,
60+ Endpoint : ":" + port ,
4161 BufferedSamples : 100 ,
4262 }
4363 l .Writer .ParallelSampleHandler = parallel_handler
4464 l .SetMarshaller (m )
4565
4666 s := & TCPSource {
4767 PrintErrors : true ,
48- RemoteAddrs : []string {"localhost:7878" },
68+ RemoteAddrs : []string {"localhost:" + port },
4969 RetryInterval : time .Second ,
5070 DialTimeout : tcp_dial_timeout ,
5171 }
@@ -69,19 +89,20 @@ func (suite *TcpListenerTestSuite) testListenerSinkAll(m BidiMarshaller) {
6989func (suite * TcpListenerTestSuite ) testListenerSinkIndividual (m Marshaller ) {
7090 for i := range suite .headers {
7191 testSink := suite .newTestSinkFor (i )
92+ port := GetLocalPort ()
7293
7394 // TODO test that a smaller buffer leads to dropped samples
7495
7596 l := & TCPListenerSink {
76- Endpoint : ":7878" ,
97+ Endpoint : ":" + port ,
7798 BufferedSamples : 100 ,
7899 }
79100 l .Writer .ParallelSampleHandler = parallel_handler
80101 l .SetMarshaller (m )
81102
82103 s := & TCPSource {
83104 PrintErrors : true ,
84- RemoteAddrs : []string {"localhost:7878" },
105+ RemoteAddrs : []string {"localhost:" + port },
85106 RetryInterval : tcp_download_retry_interval ,
86107 DialTimeout : tcp_dial_timeout ,
87108 }
@@ -121,14 +142,15 @@ func (suite *TcpListenerTestSuite) TestListenerSinkAllBinary() {
121142
122143func (suite * TcpListenerTestSuite ) testListenerSourceAll (m Marshaller ) {
123144 testSink := suite .newFilledTestSink ()
145+ port := GetLocalPort ()
124146
125- l := NewTcpListenerSource (":7878" )
147+ l := NewTcpListenerSource (":" + port )
126148 l .Reader = SampleReader {
127149 ParallelSampleHandler : parallel_handler ,
128150 }
129151
130152 s := & TCPSink {
131- Endpoint : "localhost:7878" ,
153+ Endpoint : "localhost:" + port ,
132154 DialTimeout : tcp_dial_timeout ,
133155 }
134156 s .Writer .ParallelSampleHandler = parallel_handler
@@ -152,14 +174,15 @@ func (suite *TcpListenerTestSuite) testListenerSourceAll(m Marshaller) {
152174func (suite * TcpListenerTestSuite ) testListenerSourceIndividual (m BidiMarshaller ) {
153175 for i := range suite .headers {
154176 testSink := suite .newTestSinkFor (i )
177+ port := GetLocalPort ()
155178
156- l := NewTcpListenerSource (":7878" )
179+ l := NewTcpListenerSource (":" + port )
157180 l .Reader = SampleReader {
158181 ParallelSampleHandler : parallel_handler ,
159182 }
160183
161184 s := & TCPSink {
162- Endpoint : "localhost:7878" ,
185+ Endpoint : "localhost:" + port ,
163186 DialTimeout : tcp_dial_timeout ,
164187 }
165188 s .Writer .ParallelSampleHandler = parallel_handler
0 commit comments