66 "fmt"
77 "net"
88 "strconv"
9+ "sync"
910 "time"
1011
1112 decoder "github.com/cloudflare/goflow/v3/decoders"
@@ -146,7 +147,7 @@ func FlowMessageToJSON(fmsg *flowmessage.FlowMessage) string {
146147 return s
147148}
148149
149- func UDPRoutine (name string , decodeFunc decoder.DecoderFunc , workers int , addr string , port int , sockReuse bool , logger Logger ) error {
150+ func UDPRoutine (name string , decodeFunc decoder.DecoderFunc , workers int , addr string , port int , sockReuse int , logger Logger ) error {
150151 ecb := DefaultErrorCallback {
151152 Logger : logger ,
152153 }
@@ -165,73 +166,92 @@ func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr s
165166 Port : port ,
166167 }
167168
168- var udpconn * net.UDPConn
169- var err error
170-
171- if sockReuse {
172- pconn , err := reuseport . ListenPacket ( "udp" , addrUDP . String () )
173- defer pconn . Close ()
174- if err != nil {
175- return err
176- }
177- var ok bool
178- udpconn , ok = pconn .( * net. UDPConn )
179- if ! ok {
180- return err
169+ udpconnL := make ([] * net.UDPConn , 0 )
170+ if sockReuse > 0 {
171+ for i := 0 ; i < sockReuse ; i ++ {
172+ pconn , err := reuseport . ListenPacket ( "udp" , addrUDP . String ())
173+ defer pconn . Close ( )
174+ if err != nil {
175+ return err
176+ }
177+ udpconn , ok := pconn .( * net. UDPConn )
178+ if ! ok {
179+ return err
180+ }
181+ udpconnL = append ( udpconnL , udpconn )
181182 }
182183 } else {
183- udpconn , err = net .ListenUDP ("udp" , & addrUDP )
184- defer udpconn .Close ()
184+ udpconn , err := net .ListenUDP ("udp" , & addrUDP )
185185 if err != nil {
186186 return err
187187 }
188+ udpconnL = append (udpconnL , udpconn )
188189 }
189190
190- payload := make ([]byte , 9000 )
191+ routine := func (lane int , udpconn * net.UDPConn ) {
192+ payload := make ([]byte , 9000 )
193+ localIP := addrUDP .IP .String ()
194+ if addrUDP .IP == nil {
195+ localIP = ""
196+ }
191197
192- localIP := addrUDP . IP . String ()
193- if addrUDP . IP == nil {
194- localIP = ""
195- }
198+ for {
199+ size , pktAddr , _ := udpconn . ReadFromUDP ( payload )
200+ payloadCut := make ([] byte , size )
201+ copy ( payloadCut , payload [ 0 : size ])
196202
197- for {
198- size , pktAddr , _ := udpconn .ReadFromUDP (payload )
199- payloadCut := make ([]byte , size )
200- copy (payloadCut , payload [0 :size ])
203+ baseMessage := BaseMessage {
204+ Src : pktAddr .IP ,
205+ Port : pktAddr .Port ,
206+ Payload : payloadCut ,
207+ }
208+ processor .ProcessMessage (baseMessage )
201209
202- baseMessage := BaseMessage {
203- Src : pktAddr .IP ,
204- Port : pktAddr .Port ,
205- Payload : payloadCut ,
210+ MetricTrafficBytes .With (
211+ prometheus.Labels {
212+ "remote_ip" : pktAddr .IP .String (),
213+ "remote_port" : strconv .Itoa (pktAddr .Port ),
214+ "local_ip" : localIP ,
215+ "local_port" : strconv .Itoa (addrUDP .Port ),
216+ "type" : name ,
217+ "lane" : strconv .Itoa (lane ),
218+ }).
219+ Add (float64 (size ))
220+ MetricTrafficPackets .With (
221+ prometheus.Labels {
222+ "remote_ip" : pktAddr .IP .String (),
223+ "remote_port" : strconv .Itoa (pktAddr .Port ),
224+ "local_ip" : localIP ,
225+ "local_port" : strconv .Itoa (addrUDP .Port ),
226+ "type" : name ,
227+ "lane" : strconv .Itoa (lane ),
228+ }).
229+ Inc ()
230+ MetricPacketSizeSum .With (
231+ prometheus.Labels {
232+ "remote_ip" : pktAddr .IP .String (),
233+ "remote_port" : strconv .Itoa (pktAddr .Port ),
234+ "local_ip" : localIP ,
235+ "local_port" : strconv .Itoa (addrUDP .Port ),
236+ "type" : name ,
237+ "lane" : strconv .Itoa (lane ),
238+ }).
239+ Observe (float64 (size ))
206240 }
207- processor .ProcessMessage (baseMessage )
208-
209- MetricTrafficBytes .With (
210- prometheus.Labels {
211- "remote_ip" : pktAddr .IP .String (),
212- "remote_port" : strconv .Itoa (pktAddr .Port ),
213- "local_ip" : localIP ,
214- "local_port" : strconv .Itoa (addrUDP .Port ),
215- "type" : name ,
216- }).
217- Add (float64 (size ))
218- MetricTrafficPackets .With (
219- prometheus.Labels {
220- "remote_ip" : pktAddr .IP .String (),
221- "remote_port" : strconv .Itoa (pktAddr .Port ),
222- "local_ip" : localIP ,
223- "local_port" : strconv .Itoa (addrUDP .Port ),
224- "type" : name ,
225- }).
226- Inc ()
227- MetricPacketSizeSum .With (
228- prometheus.Labels {
229- "remote_ip" : pktAddr .IP .String (),
230- "remote_port" : strconv .Itoa (pktAddr .Port ),
231- "local_ip" : localIP ,
232- "local_port" : strconv .Itoa (addrUDP .Port ),
233- "type" : name ,
234- }).
235- Observe (float64 (size ))
236241 }
242+
243+ wg := & sync.WaitGroup {}
244+ for i := range udpconnL {
245+ wg .Add (1 )
246+ lane := i
247+ udpconn := udpconnL [i ]
248+ go func () {
249+ defer wg .Done ()
250+ routine (lane , udpconn )
251+ udpconn .Close ()
252+ }()
253+ }
254+
255+ wg .Wait ()
256+ return nil
237257}
0 commit comments