Skip to content

Commit fa23db7

Browse files
authored
Fix wait_until_synced if you are waiting for your own publish (#28)
* Fix wait_until_synced if you are waiting for your own publish * fix docstring * skip stdlibs in downgrade to fix error
1 parent a3f33e8 commit fa23db7

File tree

4 files changed

+32
-10
lines changed

4 files changed

+32
-10
lines changed

.github/workflows/CI.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ jobs:
103103
- uses: julia-actions/cache@v1
104104

105105
- uses: julia-actions/julia-downgrade-compat@v1
106+
with:
107+
skip: Dates, Random, Test
106108

107109
- uses: julia-actions/julia-buildpkg@v1
108110

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "AWSCRT"
22
uuid = "df31ea59-17a4-4ebd-9d69-4f45266dc2c7"
3-
version = "0.4.1"
3+
version = "0.4.2"
44

55
[deps]
66
CountDownLatches = "621fb831-fdad-4fff-93ac-1af7b7ed19e3"

src/ShadowFramework.jl

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,25 @@ end
232232
wait_until_synced(sf::ShadowFramework)
233233
234234
Blocks until the next time the shadow document is synchronized with the broker.
235+
236+
!!! warning "Warning: Race Condition"
237+
If you are using this function to publish a message and then wait for the following synchronization, you
238+
must use the `do` form of [`wait_until_synced`](@ref) instead which accepts a lambda as the first argument.
239+
Otherwise, your publish will race the synchronization and there is a chance the synchronization will finish before
240+
you begin waiting for it (so you miss the edge and your program hangs).
241+
"""
242+
wait_until_synced(sf::ShadowFramework) = wait_until_synced(() -> nothing, sf)
243+
244+
"""
245+
wait_until_synced(f::Function, sf::ShadowFramework)
246+
247+
Blocks until the next time the shadow document is synchronized with the broker.
248+
If you want to wait for a synchronization after a publication that you make, then you must make that publication inside
249+
the lambda `f`.
235250
"""
236-
function wait_until_synced(sf::ShadowFramework)
251+
function wait_until_synced(f::Function, sf::ShadowFramework)
237252
sf._sync_latch = CountDownLatch(1)
253+
f()
238254
await(sf._sync_latch)
239255
return nothing
240256
end

test/shadow_framework_integ_test.jl

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,9 @@ shadow_types = parallel ? [:named] : [:named, :unnamed]
193193
try
194194
fetch(publish(sc, "/delete", "", AWS_MQTT_QOS_AT_LEAST_ONCE)[1]) # ensure the shadow is deleted just in case of a prior broken test
195195

196-
fetch(subscribe(sf)[1]) # subscribe and trigger the initial update, which will fail because there is no shadow
197-
wait_until_synced(sf)
196+
wait_until_synced(sf) do
197+
fetch(subscribe(sf)[1]) # subscribe and trigger the initial update, which will fail because there is no shadow
198+
end
198199
sleep(3) # we need to make sure the local shadow won't get modified. no better way than to just wait a bit in case something modifies it.
199200
@test collect(keys(doc)) == ["version", "foo"] # we should have the version and the initial foo key we set
200201
@test doc["foo"] == 1 # should be unchanged from our initial state
@@ -241,8 +242,9 @@ shadow_types = parallel ? [:named] : [:named, :unnamed]
241242

242243
@info "subscribing in band shadow"
243244
values_post_update = []
244-
fetch(subscribe(sf)[1]) # subscribe and trigger the initial update
245-
wait_until_synced(sf)
245+
wait_until_synced(sf) do
246+
fetch(subscribe(sf)[1]) # subscribe and trigger the initial update
247+
end
246248
wait_for(() -> length(values_post_update) >= 1) # wait for the update to finish since it requires multiple messages
247249
# The initial update should have pulled in that desired state
248250
@test doc["foo"] == 2
@@ -425,10 +427,11 @@ end
425427

426428
try
427429
@info "subscribing"
428-
fetch(subscribe(sf)[1])
429430
# wait for the first publish to finish, otherwise we will race it with our next update, which could arrive
430431
# first and break this test
431-
wait_until_synced(sf)
432+
wait_until_synced(sf) do
433+
fetch(subscribe(sf)[1])
434+
end
432435

433436
# publish a /update. this should be accepted. the local shadow should be updated.
434437
# an /update should be published with the new reported state.
@@ -516,10 +519,11 @@ end
516519

517520
try
518521
@info "subscribing"
519-
fetch(subscribe(sf)[1])
520522
# wait for the first publish to finish, otherwise we will race it with our next update, which could arrive
521523
# first and break this test
522-
wait_until_synced(sf)
524+
wait_until_synced(sf) do
525+
fetch(subscribe(sf)[1])
526+
end
523527

524528
# publish an /update which adds bar=1. this should be rejected because bar is not present in the struct.
525529
# the local shadow should not be updated. an /update should not be published.

0 commit comments

Comments
 (0)