Skip to content

Commit af0e52f

Browse files
committed
Implementation replacing function
1 parent e13ba4f commit af0e52f

File tree

1 file changed

+46
-15
lines changed

1 file changed

+46
-15
lines changed

lib/fluent/plugin/filter_record_modifier.rb

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,33 @@ class Plugin::RecordModifierFilter < Plugin::Filter
88
desc: <<-DESC
99
Prepare values for filtering in configure phase. Prepared values can be used in <record>.
1010
You can write any ruby code.
11-
DESC
11+
DESC
1212
config_param :char_encoding, :string, default: nil,
1313
desc: <<-DESC
1414
Fluentd including some plugins treats the logs as a BINARY by default to forward.
1515
But an user sometimes processes the logs depends on their requirements,
1616
e.g. handling char encoding correctly.
1717
In more detail, please refer this section:
1818
https://github.com/repeatedly/fluent-plugin-record-modifier#char_encoding.
19-
DESC
19+
DESC
2020
config_param :remove_keys, :string, default: nil,
2121
desc: <<-DESC
2222
The logs include needless record keys in some cases.
2323
You can remove it by using `remove_keys` parameter.
2424
This option is exclusive with `whitelist_keys`.
25-
DESC
25+
DESC
2626

2727
config_param :whitelist_keys, :string, default: nil,
2828
desc: <<-DESC
2929
Specify `whitelist_keys` to remove all unexpected keys and values from events.
3030
Modified events will have only specified keys (if exist in original events).
3131
This option is exclusive with `remove_keys`.
32-
DESC
32+
DESC
33+
34+
config_param :replace, :bool, default: false,
35+
desc: <<-DESC
36+
Boolean flag to enable replace function. Default is false.
37+
DESC
3338

3439
def configure(conf)
3540
super
@@ -51,13 +56,16 @@ def configure(conf)
5156
method(:set_encoding)
5257
end
5358

54-
(class << self; self; end).module_eval do
59+
(
60+
class << self;
61+
self;
62+
end).module_eval do
5563
define_method(:change_encoding, m)
5664
end
5765
end
5866

5967
@has_tag_parts = false
60-
conf.elements.select { |element| element.name == 'record' }.each do |element|
68+
conf.elements.select {|element| element.name == 'record'}.each do |element|
6169
element.each_pair do |k, v|
6270
check_config_placeholders(k, v)
6371
element.has_key?(k) # to suppress unread configuration warning
@@ -66,6 +74,20 @@ def configure(conf)
6674
end
6775
end
6876

77+
@replace_keys = Array.new
78+
if @replace
79+
conf.elements.select {|element| element.name == 'replace'}.each do |element|
80+
expr = if element['expression'][0] == "/" && element['expression'][-1] == "/"
81+
element['expression'][1..-2]
82+
else
83+
element['expression']
84+
end
85+
@replace_keys.push("key" => element['key'],
86+
"expression" => Regexp.new(expr),
87+
"replace" => element['replace'])
88+
end
89+
end
90+
6991
if @remove_keys and @whitelist_keys
7092
raise Fluent::ConfigError, "remove_keys and whitelist_keys are exclusive with each other."
7193
elsif @remove_keys
@@ -81,12 +103,12 @@ def configure(conf)
81103
def filter(tag, time, record)
82104
tag_parts = @has_tag_parts ? tag.split('.') : nil
83105

84-
@map.each_pair { |k, v|
106+
@map.each_pair {|k, v|
85107
record[k] = v.expand(tag, time, record, tag_parts)
86108
}
87109

88110
if @remove_keys
89-
@remove_keys.each { |v|
111+
@remove_keys.each {|v|
90112
record.delete(v)
91113
}
92114
elsif @whitelist_keys
@@ -97,8 +119,14 @@ def filter(tag, time, record)
97119
record = modified
98120
end
99121

122+
if @replace && @replace_keys
123+
@replace_keys.each {|rep|
124+
record[rep['key']] = record[rep['key']].gsub(rep['expression'], rep['replace']) if record.include?(rep['key']) && rep['expression'].match(record[rep['key']])
125+
}
126+
end
127+
100128
record = change_encoding(record) if @char_encoding
101-
record
129+
record
102130
end
103131

104132
private
@@ -107,15 +135,15 @@ def set_encoding(value)
107135
if value.is_a?(String)
108136
value.force_encoding(@from_enc)
109137
elsif value.is_a?(Hash)
110-
value.each_pair { |k, v|
138+
value.each_pair {|k, v|
111139
if v.frozen? && v.is_a?(String)
112140
value[k] = set_encoding(v.dup)
113141
else
114142
set_encoding(v)
115143
end
116144
}
117145
elsif value.is_a?(Array)
118-
value.each { |v| set_encoding(v) }
146+
value.each {|v| set_encoding(v)}
119147
else
120148
value
121149
end
@@ -126,15 +154,15 @@ def convert_encoding(value)
126154
value.force_encoding(@from_enc) if value.encoding == Encoding::BINARY
127155
value.encode!(@to_enc, @from_enc, :invalid => :replace, :undef => :replace)
128156
elsif value.is_a?(Hash)
129-
value.each_pair { |k, v|
157+
value.each_pair {|k, v|
130158
if v.frozen? && v.is_a?(String)
131159
value[k] = convert_encoding(v.dup)
132160
else
133161
convert_encoding(v)
134162
end
135163
}
136164
elsif value.is_a?(Array)
137-
value.each { |v| convert_encoding(v) }
165+
value.each {|v| convert_encoding(v)}
138166
else
139167
value
140168
end
@@ -143,7 +171,7 @@ def convert_encoding(value)
143171
HOSTNAME_PLACEHOLDERS = %W(__HOSTNAME__ ${hostname})
144172

145173
def check_config_placeholders(k, v)
146-
HOSTNAME_PLACEHOLDERS.each { |ph|
174+
HOSTNAME_PLACEHOLDERS.each {|ph|
147175
if v.include?(ph)
148176
raise ConfigError, %!#{ph} placeholder in #{k} is removed. Use "\#{Socket.gethostname}" instead.!
149177
end
@@ -158,7 +186,10 @@ def initialize(param_key, param_value, prepare_value)
158186
# Use class_eval with string instead of define_method for performance.
159187
# It can't share instructions but this is 2x+ faster than define_method in filter case.
160188
# Refer: http://tenderlovemaking.com/2013/03/03/dynamic_method_definitions.html
161-
(class << self; self; end).class_eval <<-EORUBY, __FILE__, __LINE__ + 1
189+
(
190+
class << self;
191+
self;
192+
end).class_eval <<-EORUBY, __FILE__, __LINE__ + 1
162193
def expand(tag, time, record, tag_parts)
163194
#{__str_eval_code__}
164195
end

0 commit comments

Comments
 (0)