Skip to content

Commit 4c8c37f

Browse files
committed
address review comments
1 parent 537bdad commit 4c8c37f

File tree

6 files changed

+45
-40
lines changed

6 files changed

+45
-40
lines changed

apps/chat2mix/chat2mix.nim

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -590,10 +590,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
590590
error "Couldn't find any service peer"
591591
quit(QuitFailure)
592592

593-
#await mountLegacyLightPush(node)
594593
node.peerManager.addServicePeer(servicePeerInfo, WakuLightpushCodec)
595594
node.peerManager.addServicePeer(servicePeerInfo, WakuPeerExchangeCodec)
596-
#node.peerManager.addServicePeer(servicePeerInfo, WakuRendezVousCodec)
597595

598596
# Start maintaining subscription
599597
asyncSpawn maintainSubscription(

examples/lightpush_mix/lightpush_publisher_mix.nim

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -163,27 +163,27 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.}
163163
ephemeral: true, # tell store nodes to not store it
164164
timestamp: getNowInNanosecondTime(),
165165
) # current timestamp
166-
try:
167-
let res = await node.wakuLightpushClient.publishWithConn(
166+
167+
(
168+
await node.wakuLightpushClient.publishWithConn(
168169
LightpushPubsubTopic, message, conn, dPeerId
169170
)
171+
).isOkOr:
172+
error "failed to publish message via mix", error = error.desc
173+
lp_mix_failed.inc(labelValues = ["publish_error"])
174+
return
175+
176+
lp_mix_success.inc()
177+
notice "published message",
178+
text = text,
179+
timestamp = message.timestamp,
180+
psTopic = LightpushPubsubTopic,
181+
contentTopic = LightpushContentTopic
170182

171-
if res.isOk():
172-
lp_mix_success.inc()
173-
notice "published message",
174-
text = text,
175-
timestamp = message.timestamp,
176-
psTopic = LightpushPubsubTopic,
177-
contentTopic = LightpushContentTopic
178-
else:
179-
error "failed to publish message", error = $res.error
180-
lp_mix_failed.inc(labelValues = ["publish_error"])
181-
except CatchableError as e:
182-
error "exception while publishing message", error = getCurrentExceptionMsg()
183183
if conf.mixDisabled:
184184
await conn.close()
185185
await sleepAsync(conf.msgIntervalMilliseconds)
186-
info "###########Sent all messages via mix"
186+
info "Sent all messages via mix"
187187
quit(0)
188188

189189
when isMainModule:

waku/common/callbacks.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import ../waku_enr/capabilities, ../waku_rendezvous/waku_peer_record
1+
import waku/waku_enr/capabilities, waku/waku_rendezvous/waku_peer_record
22

33
type GetShards* = proc(): seq[uint16] {.closure, gcsafe, raises: [].}
44

waku/waku_mix/protocol.nim

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ type
3636
multiAddr*: string
3737
pubKey*: Curve25519Key
3838

39-
proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
39+
proc filterMixNodes*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
4040
# Note that origin based(discv5) filtering is not done intentionally
4141
# so that more mix nodes can be discovered.
4242
if peer.mixPubKey.isNone():
43-
trace "peer has no mix Pub Key", peer = $peer
43+
trace "remote peer has no mix Pub Key", peer = $peer
4444
return false
4545

4646
if cluster.isSome() and peer.enr.isSome() and
@@ -75,7 +75,7 @@ func getIPv4Multiaddr*(maddrs: seq[MultiAddress]): Option[MultiAddress] =
7575
proc populateMixNodePool*(mix: WakuMix) =
7676
# populate only peers that i) are reachable ii) share cluster iii) support mix
7777
let remotePeers = mix.peerManager.switch.peerStore.peers().filterIt(
78-
mixPoolFilter(some(mix.clusterId), it)
78+
filterMixNodes(some(mix.clusterId), it)
7979
)
8080
var mixNodes = initTable[PeerId, MixPubInfo]()
8181

@@ -87,19 +87,19 @@ proc populateMixNodePool*(mix: WakuMix) =
8787
trace "remote peer info", info = remotePeers[i]
8888

8989
if remotePeers[i].mixPubKey.isNone():
90-
trace "peer has no mix Pub Key", peer = $remotePeers[i]
90+
trace "peer has no mix Pub Key", remotePeerId = $remotePeers[i]
9191
continue
9292

9393
let peerMixPubKey = remotePeers[i].mixPubKey.get()
9494
var peerPubKey: crypto.PublicKey
9595
if not remotePeers[i].peerId.extractPublicKey(peerPubKey):
9696
warn "Failed to extract public key from peerId, skipping node",
97-
peerId = remotePeers[i].peerId
97+
remotePeerId = remotePeers[i].peerId
9898
continue
9999

100100
if peerPubKey.scheme != PKScheme.Secp256k1:
101101
warn "Peer public key is not Secp256k1, skipping node",
102-
peerId = remotePeers[i].peerId, scheme = peerPubKey.scheme
102+
remotePeerId = remotePeers[i].peerId, scheme = peerPubKey.scheme
103103
continue
104104

105105
let mixNodePubInfo = MixPubInfo.init(
@@ -109,7 +109,7 @@ proc populateMixNodePool*(mix: WakuMix) =
109109
peerPubKey.skkey,
110110
)
111111
trace "adding mix node to pool",
112-
peerId = remotePeers[i].peerId, multiAddr = $ipv4addr
112+
remotePeerId = remotePeers[i].peerId, multiAddr = $ipv4addr
113113
mixNodes[remotePeers[i].peerId] = mixNodePubInfo
114114

115115
# set the mix node pool

waku/waku_rendezvous/client.nim

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ import
1313
import metrics except collect
1414

1515
import
16-
../node/peer_manager,
17-
../waku_core/peers,
18-
../waku_core/codecs,
16+
waku/node/peer_manager,
17+
waku/waku_core/peers,
18+
waku/waku_core/codecs,
1919
./common,
2020
./waku_peer_record
2121

@@ -34,6 +34,9 @@ type WakuRendezVousClient* = ref object
3434
# Internal rendezvous instance for making requests
3535
rdv: GenericRendezVous[WakuPeerRecord]
3636

37+
const MaxSimultanesousAdvertisements = 5
38+
const RendezVousLookupInterval = 10.seconds
39+
3740
proc requestAll*(
3841
self: WakuRendezVousClient
3942
): Future[Result[void, string]] {.async: (raises: []).} =
@@ -56,7 +59,8 @@ proc requestAll*(
5659

5760
trace "waku rendezvous client request got peers", count = records.len
5861
for record in records:
59-
rendezvousPeerFoundTotal.inc()
62+
if not self.switch.peerStore.peerExists(record.peerId):
63+
rendezvousPeerFoundTotal.inc()
6064
if record.mixKey.len == 0 or record.peerId == self.switch.peerInfo.peerId:
6165
continue
6266
trace "adding peer from rendezvous",
@@ -77,11 +81,11 @@ proc periodicRequests(self: WakuRendezVousClient) {.async.} =
7781

7882
# infinite loop
7983
while true:
84+
await sleepAsync(self.requestInterval)
85+
8086
(await self.requestAll()).isOkOr:
8187
error "waku rendezvous requests failed", error = error
8288

83-
await sleepAsync(self.requestInterval)
84-
8589
# Exponential backoff
8690

8791
#[ TODO: Reevaluate for mix, maybe be aggresive in the start until a sizeable pool is built and then backoff
@@ -102,7 +106,7 @@ proc new*(
102106
let rdv = GenericRendezVous[WakuPeerRecord](
103107
switch: switch,
104108
rng: rng,
105-
sema: newAsyncSemaphore(5),
109+
sema: newAsyncSemaphore(MaxSimultanesousAdvertisements),
106110
minDuration: rendezvous.MinimumAcceptedDuration,
107111
maxDuration: rendezvous.MaximumDuration,
108112
minTTL: rendezvous.MinimumAcceptedDuration.seconds.uint64,
@@ -119,7 +123,7 @@ proc new*(
119123
switch: switch,
120124
peerManager: peerManager,
121125
clusterId: clusterId,
122-
requestInterval: 10.seconds,
126+
requestInterval: RendezVousLookupInterval,
123127
rdv: rdv,
124128
)
125129

waku/waku_rendezvous/protocol.nim

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,15 @@ type WakuRendezVous* = ref object of GenericRendezVous[WakuPeerRecord]
4545
registrationInterval: timer.Duration
4646
periodicRegistrationFut: Future[void]
4747

48-
# Override discover method to avoid collect macro generic instantiation issues
49-
# TODO figure out if we can use parent generic discover
48+
const MaximumNamespaceLen = 255
49+
5050
method discover*(
5151
self: WakuRendezVous, conn: Connection, d: Discover
5252
) {.async: (raises: [CancelledError, LPStreamError]).} =
53+
# Override discover method to avoid collect macro generic instantiation issues
54+
# TODO figure out if we can use parent generic discover
5355
trace "Received Discover", peerId = conn.peerId, ns = d.ns
54-
if d.ns.isSome() and d.ns.get().len > 255: #MaximumNamespaceLen
56+
if d.ns.isSome() and d.ns.get().len > MaximumNamespaceLen:
5557
await conn.sendDiscoverResponseError(InvalidNamespace)
5658
return
5759

@@ -115,9 +117,10 @@ proc advertise*(
115117
let se = SignedPayload[WakuPeerRecord].init(
116118
self.switch.peerInfo.privateKey, self.getPeerRecord()
117119
).valueOr:
118-
return err("rendezvous advertisement failed: Failed to sign Waku Peer Record")
120+
return
121+
err("rendezvous advertisement failed: Failed to sign Waku Peer Record: " & $error)
119122
let sprBuff = se.encode().valueOr:
120-
return err("rendezvous advertisement failed: Wrong Signed Peer Record")
123+
return err("rendezvous advertisement failed: Wrong Signed Peer Record: " & $error)
121124

122125
# rendezvous.advertise expects already opened connections
123126
# must dial first
@@ -205,7 +208,7 @@ proc new*(
205208
clusterId: uint16,
206209
getShards: GetShards,
207210
getCapabilities: GetCapabilities,
208-
getPeerRecord: GetWakuPeerRecord = nil,
211+
getPeerRecord: GetWakuPeerRecord,
209212
): Result[T, string] {.raises: [].} =
210213
let rng = newRng()
211214
let wrv = T(
@@ -276,7 +279,7 @@ proc start*(self: WakuRendezVous) {.async: (raises: []).} =
276279
await procCall GenericRendezVous[WakuPeerRecord](self).start()
277280
except CancelledError as exc:
278281
error "failed to start GenericRendezVous", cause = exc.msg
279-
282+
return
280283
# start registering forever
281284
self.periodicRegistrationFut = self.periodicRegistration()
282285

0 commit comments

Comments
 (0)