Skip to content

Commit 1e73213

Browse files
chore: Lightpush minor refactor (#3538)
* chore: refactor Lightpush (more DRY) * chore: apply review suggestions Co-authored-by: Ivan FB <[email protected]> --------- Co-authored-by: Ivan FB <[email protected]>
1 parent c0a7deb commit 1e73213

File tree

6 files changed

+77
-115
lines changed

6 files changed

+77
-115
lines changed

examples/lightpush_mix/lightpush_publisher_mix.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.}
163163
ephemeral: true, # tell store nodes to not store it
164164
timestamp: getNowInNanosecondTime(),
165165
) # current timestamp
166-
let res = await node.wakuLightpushClient.publishWithConn(
167-
LightpushPubsubTopic, message, conn, dPeerId
168-
)
166+
167+
let res =
168+
await node.wakuLightpushClient.publish(some(LightpushPubsubTopic), message, conn)
169169

170170
let startTime = getNowInNanosecondTime()
171171

tests/waku_lightpush/test_ratelimit.nim

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ suite "Rate limited push service":
3737

3838
handlerFuture = newFuture[(string, WakuMessage)]()
3939
let requestRes =
40-
await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId)
40+
await client.publish(some(DefaultPubsubTopic), message, serverPeerId)
4141

4242
check await handlerFuture.withTimeout(50.millis)
4343

@@ -66,7 +66,7 @@ suite "Rate limited push service":
6666
var endTime = Moment.now()
6767
var elapsed: Duration = (endTime - startTime)
6868
await sleepAsync(tokenPeriod - elapsed + firstWaitExtend)
69-
firstWaitEXtend = 100.millis
69+
firstWaitExtend = 100.millis
7070

7171
## Cleanup
7272
await allFutures(clientSwitch.stop(), serverSwitch.stop())
@@ -99,7 +99,7 @@ suite "Rate limited push service":
9999
let message = fakeWakuMessage()
100100
handlerFuture = newFuture[(string, WakuMessage)]()
101101
let requestRes =
102-
await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId)
102+
await client.publish(some(DefaultPubsubTopic), message, serverPeerId)
103103
discard await handlerFuture.withTimeout(10.millis)
104104

105105
check:
@@ -114,7 +114,7 @@ suite "Rate limited push service":
114114
let message = fakeWakuMessage()
115115
handlerFuture = newFuture[(string, WakuMessage)]()
116116
let requestRes =
117-
await client.publish(some(DefaultPubsubTopic), message, peer = serverPeerId)
117+
await client.publish(some(DefaultPubsubTopic), message, serverPeerId)
118118
discard await handlerFuture.withTimeout(10.millis)
119119

120120
check:

waku/node/kernel_api/lightpush.nim

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,7 @@ proc lightpushPublishHandler(
210210
"Waku lightpush with mix not available",
211211
)
212212

213-
return await node.wakuLightpushClient.publishWithConn(
214-
pubsubTopic, message, conn, peer.peerId
215-
)
213+
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, conn)
216214
else:
217215
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
218216

waku/waku_lightpush/client.nim

Lines changed: 54 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ logScope:
1717
topics = "waku lightpush client"
1818

1919
type WakuLightPushClient* = ref object
20-
peerManager*: PeerManager
2120
rng*: ref rand.HmacDrbgContext
21+
peerManager*: PeerManager
2222
publishObservers: seq[PublishObserver]
2323

2424
proc new*(
@@ -29,33 +29,31 @@ proc new*(
2929
proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
3030
wl.publishObservers.add(obs)
3131

32-
proc sendPushRequest(
33-
wl: WakuLightPushClient,
34-
req: LightPushRequest,
35-
peer: PeerId | RemotePeerInfo,
36-
conn: Option[Connection] = none(Connection),
37-
): Future[WakuLightPushResult] {.async.} =
38-
let connection = conn.valueOr:
39-
(await wl.peerManager.dialPeer(peer, WakuLightPushCodec)).valueOr:
40-
waku_lightpush_v3_errors.inc(labelValues = [dialFailure])
41-
return lighpushErrorResult(
42-
LightPushErrorCode.NO_PEERS_TO_RELAY,
43-
dialFailure & ": " & $peer & " is not accessible",
44-
)
32+
proc ensureTimestampSet(message: var WakuMessage) =
33+
if message.timestamp == 0:
34+
message.timestamp = getNowInNanosecondTime()
4535

46-
defer:
47-
await connection.closeWithEOF()
36+
## Short log string for peer identifiers (overloads for convenience)
37+
func shortPeerId(peer: PeerId): string =
38+
shortLog(peer)
39+
40+
func shortPeerId(peer: RemotePeerInfo): string =
41+
shortLog(peer.peerId)
42+
43+
proc sendPushRequestToConn(
44+
wl: WakuLightPushClient, request: LightPushRequest, conn: Connection
45+
): Future[WakuLightPushResult] {.async.} =
4846
try:
49-
await connection.writeLP(req.encode().buffer)
50-
except CatchableError:
51-
error "failed to send push request", error = getCurrentExceptionMsg()
47+
await conn.writeLp(request.encode().buffer)
48+
except LPStreamRemoteClosedError:
49+
error "Failed to write request to peer", error = getCurrentExceptionMsg()
5250
return lightpushResultInternalError(
53-
"failed to send push request: " & getCurrentExceptionMsg()
51+
"Failed to write request to peer: " & getCurrentExceptionMsg()
5452
)
5553

5654
var buffer: seq[byte]
5755
try:
58-
buffer = await connection.readLp(DefaultMaxRpcSize.int)
56+
buffer = await conn.readLp(DefaultMaxRpcSize.int)
5957
except LPStreamRemoteClosedError:
6058
error "Failed to read response from peer", error = getCurrentExceptionMsg()
6159
return lightpushResultInternalError(
@@ -66,10 +64,12 @@ proc sendPushRequest(
6664
waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure])
6765
return lightpushResultInternalError(decodeRpcFailure)
6866

69-
if response.requestId != req.requestId and
70-
response.statusCode != LightPushErrorCode.TOO_MANY_REQUESTS:
67+
let requestIdMismatch = response.requestId != request.requestId
68+
let tooManyRequests = response.statusCode == LightPushErrorCode.TOO_MANY_REQUESTS
69+
if requestIdMismatch and (not tooManyRequests):
70+
# response with TOO_MANY_REQUESTS error code has no requestId by design
7171
error "response failure, requestId mismatch",
72-
requestId = req.requestId, responseRequestId = response.requestId
72+
requestId = request.requestId, responseRequestId = response.requestId
7373
return lightpushResultInternalError("response failure, requestId mismatch")
7474

7575
return toPushResult(response)
@@ -78,88 +78,49 @@ proc publish*(
7878
wl: WakuLightPushClient,
7979
pubSubTopic: Option[PubsubTopic] = none(PubsubTopic),
8080
wakuMessage: WakuMessage,
81-
peer: PeerId | RemotePeerInfo,
81+
dest: Connection | PeerId | RemotePeerInfo,
8282
): Future[WakuLightPushResult] {.async, gcsafe.} =
83+
let conn =
84+
when dest is Connection:
85+
dest
86+
else:
87+
(await wl.peerManager.dialPeer(dest, WakuLightPushCodec)).valueOr:
88+
waku_lightpush_v3_errors.inc(labelValues = [dialFailure])
89+
return lighpushErrorResult(
90+
LightPushErrorCode.NO_PEERS_TO_RELAY,
91+
"Peer is not accessible: " & dialFailure & " - " & $dest,
92+
)
93+
94+
defer:
95+
await conn.closeWithEOF()
96+
8397
var message = wakuMessage
84-
if message.timestamp == 0:
85-
message.timestamp = getNowInNanosecondTime()
98+
ensureTimestampSet(message)
99+
100+
let msgHash = computeMessageHash(pubSubTopic.get(""), message).to0xHex()
101+
info "publish",
102+
myPeerId = wl.peerManager.switch.peerInfo.peerId,
103+
peerId = shortPeerId(conn.peerId),
104+
msgHash = msgHash,
105+
sentTime = getNowInNanosecondTime()
86106

87-
when peer is PeerId:
88-
info "publish",
89-
peerId = shortLog(peer),
90-
msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex
91-
else:
92-
info "publish",
93-
peerId = shortLog(peer.peerId),
94-
msg_hash = computeMessageHash(pubsubTopic.get(""), message).to0xHex
95-
96-
let pushRequest = LightpushRequest(
97-
requestId: generateRequestId(wl.rng), pubSubTopic: pubSubTopic, message: message
107+
let request = LightpushRequest(
108+
requestId: generateRequestId(wl.rng), pubsubTopic: pubSubTopic, message: message
98109
)
99-
let publishedCount = ?await wl.sendPushRequest(pushRequest, peer)
110+
let relayPeerCount = ?await wl.sendPushRequestToConn(request, conn)
100111

101112
for obs in wl.publishObservers:
102113
obs.onMessagePublished(pubSubTopic.get(""), message)
103114

104-
return lightpushSuccessResult(publishedCount)
115+
return lightpushSuccessResult(relayPeerCount)
105116

106117
proc publishToAny*(
107-
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, wakuMessage: WakuMessage
118+
wl: WakuLightPushClient, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage
108119
): Future[WakuLightPushResult] {.async, gcsafe.} =
109-
## This proc is similar to the publish one but in this case
110-
## we don't specify a particular peer and instead we get it from peer manager
111-
112-
var message = wakuMessage
113-
if message.timestamp == 0:
114-
message.timestamp = getNowInNanosecondTime()
115-
120+
# Like publish, but selects a peer automatically from the peer manager
116121
let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
117122
# TODO: check if it is matches the situation - shall we distinguish client side missing peers from server side?
118123
return lighpushErrorResult(
119124
LightPushErrorCode.NO_PEERS_TO_RELAY, "no suitable remote peers"
120125
)
121-
122-
info "publishToAny",
123-
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
124-
peer_id = peer.peerId,
125-
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex,
126-
sentTime = getNowInNanosecondTime()
127-
128-
let pushRequest = LightpushRequest(
129-
requestId: generateRequestId(wl.rng),
130-
pubSubTopic: some(pubSubTopic),
131-
message: message,
132-
)
133-
let publishedCount = ?await wl.sendPushRequest(pushRequest, peer)
134-
135-
for obs in wl.publishObservers:
136-
obs.onMessagePublished(pubSubTopic, message)
137-
138-
return lightpushSuccessResult(publishedCount)
139-
140-
proc publishWithConn*(
141-
wl: WakuLightPushClient,
142-
pubSubTopic: PubsubTopic,
143-
message: WakuMessage,
144-
conn: Connection,
145-
destPeer: PeerId,
146-
): Future[WakuLightPushResult] {.async, gcsafe.} =
147-
info "publishWithConn",
148-
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
149-
peer_id = destPeer,
150-
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex,
151-
sentTime = getNowInNanosecondTime()
152-
153-
let pushRequest = LightpushRequest(
154-
requestId: generateRequestId(wl.rng),
155-
pubSubTopic: some(pubSubTopic),
156-
message: message,
157-
)
158-
#TODO: figure out how to not pass destPeer as this is just a hack
159-
let publishedCount =
160-
?await wl.sendPushRequest(pushRequest, destPeer, conn = some(conn))
161-
162-
for obs in wl.publishObservers:
163-
obs.onMessagePublished(pubSubTopic, message)
164-
165-
return lightpushSuccessResult(publishedCount)
126+
return await wl.publish(some(pubsubTopic), wakuMessage, peer)

waku/waku_lightpush/common.nim

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,15 @@ func isSuccess*(response: LightPushResponse): bool =
3535

3636
func toPushResult*(response: LightPushResponse): WakuLightPushResult =
3737
if isSuccess(response):
38-
return ok(response.relayPeerCount.get(0))
38+
let relayPeerCount = response.relayPeerCount.get(0)
39+
return (
40+
if (relayPeerCount == 0):
41+
# Consider publishing to zero peers an error even if the service node
42+
# sent us a "successful" response with zero peers
43+
err((LightPushErrorCode.NO_PEERS_TO_RELAY, response.statusDesc))
44+
else:
45+
ok(relayPeerCount)
46+
)
3947
else:
4048
return err((response.statusCode, response.statusDesc))
4149

@@ -51,11 +59,6 @@ func lightpushResultBadRequest*(msg: string): WakuLightPushResult =
5159
func lightpushResultServiceUnavailable*(msg: string): WakuLightPushResult =
5260
return err((LightPushErrorCode.SERVICE_NOT_AVAILABLE, some(msg)))
5361

54-
func lighpushErrorResult*(
55-
statusCode: LightpushStatusCode, desc: Option[string]
56-
): WakuLightPushResult =
57-
return err((statusCode, desc))
58-
5962
func lighpushErrorResult*(
6063
statusCode: LightpushStatusCode, desc: string
6164
): WakuLightPushResult =

waku/waku_lightpush/protocol.nim

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ proc handleRequest(
7878
proc handleRequest*(
7979
wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
8080
): Future[LightPushResponse] {.async.} =
81-
let pushRequest = LightPushRequest.decode(buffer).valueOr:
81+
let request = LightPushRequest.decode(buffer).valueOr:
8282
let desc = decodeRpcFailure & ": " & $error
83-
error "failed to push message", error = desc
83+
error "failed to decode Lightpush request", error = desc
8484
let errorCode = LightPushErrorCode.BAD_REQUEST
8585
waku_lightpush_v3_errors.inc(labelValues = [$errorCode])
8686
return LightPushResponse(
@@ -89,16 +89,16 @@ proc handleRequest*(
8989
statusDesc: some(desc),
9090
)
9191

92-
let relayPeerCount = (await handleRequest(wl, peerId, pushRequest)).valueOr:
92+
let relayPeerCount = (await wl.handleRequest(peerId, request)).valueOr:
9393
let desc = error.desc
9494
waku_lightpush_v3_errors.inc(labelValues = [$error.code])
9595
error "failed to push message", error = desc
9696
return LightPushResponse(
97-
requestId: pushRequest.requestId, statusCode: error.code, statusDesc: desc
97+
requestId: request.requestId, statusCode: error.code, statusDesc: desc
9898
)
9999

100100
return LightPushResponse(
101-
requestId: pushRequest.requestId,
101+
requestId: request.requestId,
102102
statusCode: LightPushSuccessCode.SUCCESS,
103103
statusDesc: none[string](),
104104
relayPeerCount: some(relayPeerCount),
@@ -123,7 +123,7 @@ proc initProtocolHandler(wl: WakuLightPush) =
123123
)
124124

125125
try:
126-
rpc = await handleRequest(wl, conn.peerId, buffer)
126+
rpc = await wl.handleRequest(conn.peerId, buffer)
127127
except CatchableError:
128128
error "lightpush failed handleRequest", error = getCurrentExceptionMsg()
129129
do:

0 commit comments

Comments
 (0)