Skip to content

Commit 79f1f08

Browse files
committed
chore: refactor Lightpush to avoid code duplication
1 parent 70609c7 commit 79f1f08

File tree

3 files changed

+28
-81
lines changed

3 files changed

+28
-81
lines changed

tests/waku_lightpush/test_ratelimit.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ suite "Rate limited push service":
3737

3838
handlerFuture = newFuture[(string, WakuMessage)]()
3939
let requestRes =
40-
await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId)
40+
await client.publish(some(DefaultPubsubTopic), message, serverPeerId)
4141

4242
check await handlerFuture.withTimeout(50.millis)
4343

@@ -99,7 +99,7 @@ suite "Rate limited push service":
9999
let message = fakeWakuMessage()
100100
handlerFuture = newFuture[(string, WakuMessage)]()
101101
let requestRes =
102-
await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId)
102+
await client.publish(some(DefaultPubsubTopic), message, serverPeerId)
103103
discard await handlerFuture.withTimeout(10.millis)
104104

105105
check:
@@ -114,7 +114,7 @@ suite "Rate limited push service":
114114
let message = fakeWakuMessage()
115115
handlerFuture = newFuture[(string, WakuMessage)]()
116116
let requestRes =
117-
await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId)
117+
await client.publish(some(DefaultPubsubTopic), message, serverPeerId)
118118
discard await handlerFuture.withTimeout(10.millis)
119119

120120
check:

waku/node/waku_node.nim

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1247,9 +1247,7 @@ proc lightpushPublishHandler(
12471247
"Waku lightpush with mix not available",
12481248
)
12491249

1250-
return await node.wakuLightpushClient.publishWithConn(
1251-
pubsubTopic, message, conn, peer.peerId
1252-
)
1250+
return await node.wakuLightpushClient.publish(pubsubTopic, message, conn)
12531251
else:
12541252
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
12551253

waku/waku_lightpush/client.nim

Lines changed: 24 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,7 @@ func shortPeerId(peer: PeerId): string =
4040
func shortPeerId(peer: RemotePeerInfo): string =
4141
shortLog(peer.peerId)
4242

43-
proc getConnection(
44-
wl: WakuLightPushClient, peer: PeerId | RemotePeerInfo
45-
): Future[Result[Connection, string]] {.async.} =
46-
let dialResult = await wl.peerManager.dialPeer(peer, WakuLightPushCodec)
47-
if dialResult.isNone():
48-
waku_lightpush_v3_errors.inc(labelValues = [dialFailure])
49-
return err(dialFailure & ": " & $peer & " is not accessible")
50-
51-
return ok(dialResult.get())
52-
53-
proc sendPushRequestToConn(
43+
proc sendPushRequest(
5444
wl: WakuLightPushClient, request: LightPushRequest, conn: Connection
5545
): Future[WakuLightPushResult] {.async.} =
5646
try:
@@ -85,30 +75,41 @@ proc sendPushRequestToConn(
8575

8676
return toPushResult(response)
8777

88-
proc sendPushRequest(
89-
wl: WakuLightPushClient, request: LightPushRequest, peer: PeerId | RemotePeerInfo
90-
): Future[WakuLightPushResult] {.async.} =
91-
let conn = (await wl.getConnection(peer)).valueOr:
92-
return lighpushErrorResult(LightPushErrorCode.NO_PEERS_TO_RELAY, error)
93-
94-
await wl.sendPushRequestToConn(request, conn)
95-
9678
proc publish*(
9779
wl: WakuLightPushClient,
9880
pubsubTopic: Option[PubsubTopic] = none(PubsubTopic),
9981
wakuMessage: WakuMessage,
100-
peer: PeerId | RemotePeerInfo,
82+
dest: Connection | PeerId | RemotePeerInfo,
10183
): Future[WakuLightPushResult] {.async, gcsafe.} =
84+
let conn =
85+
when dest is Connection:
86+
dest
87+
else:
88+
(await wl.peerManager.dialPeer(dest, WakuLightPushCodec)).valueOr:
89+
waku_lightpush_v3_errors.inc(labelValues = [dialFailure])
90+
return lighpushErrorResult(
91+
LightPushErrorCode.NO_PEERS_TO_RELAY,
92+
dialFailure & ": " & $dest & " is not accessible",
93+
)
94+
10295
var message = wakuMessage
10396
ensureTimestampSet(message)
10497

10598
let msgHash = computeMessageHash(pubsubTopic.get(""), message).to0xHex
106-
info "publish", peerId = shortPeerId(peer), msg_hash = msgHash
99+
info "publish",
100+
myPeerId = wl.peerManager.switch.peerInfo.peerId,
101+
peerId =
102+
when dest is Connection:
103+
"unknown (using connection directly)"
104+
else:
105+
shortPeerId(dest),
106+
msgHash = msgHash,
107+
sentTime = getNowInNanosecondTime()
107108

108109
let request = LightpushRequest(
109110
requestId: generateRequestId(wl.rng), pubsubTopic: pubsubTopic, message: message
110111
)
111-
let publishedPeerCount = ?await wl.sendPushRequest(request, peer)
112+
let publishedPeerCount = ?await wl.sendPushRequest(request, conn)
112113

113114
for obs in wl.publishObservers:
114115
obs.onMessagePublished(pubsubTopic.get(""), message)
@@ -119,61 +120,9 @@ proc publishToAny*(
119120
wl: WakuLightPushClient, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage
120121
): Future[WakuLightPushResult] {.async, gcsafe.} =
121122
# Like publish, but selects a peer automatically from the peer manager
122-
123-
var message = wakuMessage
124-
ensureTimestampSet(message)
125-
126123
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
127124
# TODO: check if it is matches the situation - shall we distinguish client side missing peers from server side?
128125
return lighpushErrorResult(
129126
LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers"
130127
)
131-
132-
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex
133-
info "publishToAny",
134-
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
135-
peer_id = peer.peerId,
136-
msg_hash = msgHash,
137-
sentTime = getNowInNanosecondTime()
138-
139-
let request = LightpushRequest(
140-
requestId: generateRequestId(wl.rng),
141-
pubsubTopic: some(pubsubTopic),
142-
message: message,
143-
)
144-
let publishedPeerCount = ?await wl.sendPushRequest(request, peer)
145-
146-
for obs in wl.publishObservers:
147-
obs.onMessagePublished(pubsubTopic, message)
148-
149-
return lightpushSuccessResult(publishedPeerCount)
150-
151-
proc publishWithConn*(
152-
wl: WakuLightPushClient,
153-
pubsubTopic: PubsubTopic,
154-
wakuMessage: WakuMessage,
155-
conn: Connection,
156-
destPeer: PeerId,
157-
): Future[WakuLightPushResult] {.async, gcsafe.} =
158-
var message = wakuMessage
159-
ensureTimestampSet(message)
160-
161-
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex
162-
info "publishWithConn",
163-
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
164-
peer_id = destPeer,
165-
msg_hash = msgHash,
166-
sentTime = getNowInNanosecondTime()
167-
168-
let request = LightpushRequest(
169-
requestId: generateRequestId(wl.rng),
170-
pubsubTopic: some(pubsubTopic),
171-
message: message,
172-
)
173-
174-
let publishedPeerCount = ?await wl.sendPushRequestToConn(request, conn)
175-
176-
for obs in wl.publishObservers:
177-
obs.onMessagePublished(pubsubTopic, message)
178-
179-
return lightpushSuccessResult(publishedPeerCount)
128+
return await wl.publish(some(pubsubTopic), wakuMessage, peer)

0 commit comments

Comments
 (0)