Skip to content

Commit 50eb394

Browse files
committed
feat: batch write points
1 parent d1cfea9 commit 50eb394

File tree

3 files changed

+68
-16
lines changed

3 files changed

+68
-16
lines changed

shard.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ shards:
159159

160160
openssl_ext:
161161
git: https://github.com/spider-gazelle/openssl_ext.git
162-
version: 2.8.3
162+
version: 2.8.4
163163

164164
pars:
165165
git: https://github.com/spider-gazelle/pars.git

src/source/publishing/influx_publisher.cr

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,50 @@ module PlaceOS::Source
4747
def initialize(@client : Flux::Client, @bucket : String)
4848
end
4949

50+
@buffer : Array(Flux::Point) = Array(Flux::Point).new(StatusEvents::BATCH_SIZE)
51+
5052
# Write an MQTT event to InfluxDB
5153
#
5254
def publish(message : Publisher::Message)
5355
points = self.class.transform(message)
54-
points.each do |point|
55-
Log.trace { {
56-
measurement: point.measurement,
57-
timestamp: point.timestamp.to_s,
58-
tags: point.tags.to_json,
59-
fields: point.fields.to_json,
60-
} }
61-
client.write(bucket, point)
56+
@buffer.concat points
57+
commit if @buffer.size >= StatusEvents::BATCH_SIZE
58+
end
59+
60+
def commit : Nil
61+
return if @buffer.empty?
62+
points = @buffer.dup
63+
@buffer.clear
64+
65+
# points.each do |point|
66+
# Log.trace { {
67+
# measurement: point.measurement,
68+
# timestamp: point.timestamp.to_s,
69+
# tags: point.tags.to_json,
70+
# fields: point.fields.to_json,
71+
# } }
72+
# client.write(bucket, point)
73+
# end
74+
75+
Log.debug { "writing #{points.size} points" }
76+
77+
begin
78+
client.write(bucket, points)
79+
rescue error
80+
Log.error(exception: error) { "error batch writing points" }
81+
points.each do |point|
82+
client.write(bucket, point) rescue nil
83+
end
84+
end
85+
end
86+
87+
def self.invalid_string?(str : String) : Bool
88+
return true if str.size > 0xFF
89+
str.each_byte do |byte|
90+
# ASCII control chars + DEL
91+
return true if byte < 0x20 || byte == 0x7F
6292
end
93+
false
6394
end
6495

6596
@@building_timezones = {} of String => Time::Location?
@@ -141,6 +172,8 @@ module PlaceOS::Source
141172
case raw = Value.from_json(payload)
142173
in CustomMetrics then parse_custom(raw, fields, tags, data, timestamp)
143174
in FieldTypes
175+
return [] of Flux::Point if raw.is_a?(String) && invalid_string?(raw)
176+
144177
fields[key] = raw
145178
point = Flux::Point.new!(
146179
measurement: data.module_name,
@@ -183,8 +216,10 @@ module PlaceOS::Source
183216
sub_key = sub_key.gsub(/\W/, '_')
184217

185218
if sub_key == "measurement" && value.is_a?(String)
219+
return nil if invalid_string?(value)
186220
measurement = value
187221
else
222+
next if value.is_a?(String) && invalid_string?(value)
188223
local[sub_key] = value
189224
end
190225
end
@@ -213,6 +248,8 @@ module PlaceOS::Source
213248
points = Array(Flux::Point).new(initial_capacity: raw.value.size)
214249
default_measurement = raw.measurement
215250

251+
return [] of Flux::Point if default_measurement.is_a?(String) && invalid_string?(default_measurement)
252+
216253
raw.value.each_with_index do |val, index|
217254
# Skip if an empty point
218255
compacted = val.compact
@@ -231,7 +268,9 @@ module PlaceOS::Source
231268
local_tags = tags.dup
232269
local_tags["pos_uniq"] = index.to_s
233270

234-
points << build_custom_point(measurement, data, fields, local_tags, compacted, override_timestamp || timestamp, ts_map, raw.ts_tag_keys)
271+
if point = build_custom_point(measurement, data, fields, local_tags, compacted, override_timestamp || timestamp, ts_map, raw.ts_tag_keys)
272+
points << point
273+
end
235274
end
236275

237276
points
@@ -250,6 +289,8 @@ module PlaceOS::Source
250289
end
251290
end
252291

292+
return nil if measurement_value.is_a?(String) && invalid_string?(measurement_value)
293+
253294
# convert fields to tags as required
254295
if ts_tag_keys
255296
ts_tag_keys.each do |field|

src/source/publishing/publisher.cr

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ module PlaceOS::Source
1616

1717
abstract def publish(message : Message)
1818

19+
def commit : Nil
20+
end
21+
1922
def start
2023
spawn { consume_messages }
2124
end
@@ -25,12 +28,20 @@ module PlaceOS::Source
2528
end
2629

2730
private def consume_messages
28-
while message = message_queue.receive?
29-
begin
30-
publish(message)
31-
@processed += 1_u64
32-
rescue error
33-
Log.warn(exception: error) { "publishing message: #{message}" }
31+
while !message_queue.closed?
32+
select
33+
when message = message_queue.receive?
34+
if message
35+
begin
36+
publish(message)
37+
@processed += 1_u64
38+
rescue error
39+
Log.warn(exception: error) { "publishing message: #{message}" }
40+
end
41+
end
42+
when timeout(10.seconds)
43+
# commit any buffered messages that have not been published yet
44+
commit
3445
end
3546
end
3647
end

0 commit comments

Comments
 (0)