Skip to content

Commit c620a7d

Browse files
committed
Merge remote-tracking branch 'origin/feature/periodic_ipset_publish'
2 parents c11e7b0 + d6bd7d0 commit c620a7d

14 files changed

+311
-194
lines changed

src/dog_common.erl

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
-include("dog_trainer.hrl").
44

55
-export([
6-
inverse_map_of_lists/1,
6+
concat/2,
77
create_hash/1,
88
eel_test/0,
9+
eq_join/4,
910
format_value/1,
1011
format_var/1,
1112
format_vars/1,
13+
inverse_map_of_lists/1,
1214
list_of_maps_to_map/2,
1315
lmm/2,
1416
merge_lists_in_tuples/1,
@@ -17,10 +19,10 @@
1719
re_filter/2,
1820
rekey_map_of_maps/3,
1921
rkmm/3,
22+
to_binary/1,
2023
to_list/1,
21-
tuple_pairs_to_map_of_lists/1,
2224
to_terraform_name/1,
23-
eq_join/4
25+
tuple_pairs_to_map_of_lists/1
2426
]).
2527

2628
-spec to_list(Item :: iolist() | atom() | tuple() | map() | binary() | integer() | float()) ->
@@ -40,6 +42,23 @@ to_list(Item) when is_integer(Item) ->
4042
to_list(Item) when is_float(Item) ->
4143
float_to_list(Item).
4244

45+
-spec to_binary(Item :: iolist() | atom() | tuple() | map() | binary() | integer() | float()) ->
46+
binary().
47+
to_binary(Item) when is_atom(Item) ->
48+
erlang:atom_to_binary(Item, utf8);
49+
to_binary(Item) when is_list(Item) -> % Assumes Item is a valid iolist or string
50+
erlang:iolist_to_binary(Item);
51+
to_binary(Item) when is_tuple(Item) -> % String representation like "{a,b}"
52+
erlang:iolist_to_binary(io_lib:format("~p", [Item]));
53+
to_binary(Item) when is_map(Item) -> % String representation like "#{a=>b}"
54+
erlang:iolist_to_binary(io_lib:format("~p", [Item]));
55+
to_binary(Item) when is_binary(Item) ->
56+
Item;
57+
to_binary(Item) when is_integer(Item) ->
58+
erlang:integer_to_binary(Item);
59+
to_binary(Item) when is_float(Item) ->
60+
erlang:float_to_binary(Item).
61+
4362
-spec re_filter(List :: [iolist()], Re :: string()) -> [iolist()].
4463
re_filter(List, Re) ->
4564
lists:filter(
@@ -269,3 +288,16 @@ eq_join(Table1Name, Table2Name, Key1, Key2) ->
269288
OneResult
270289
),
271290
JoinedResult.
291+
292+
%% EXTERNAL
293+
294+
concat(Words, string) ->
295+
internal_concat(Words);
296+
concat(Words, binary) ->
297+
list_to_binary(internal_concat(Words)).
298+
299+
%% INTERNAL
300+
301+
internal_concat(Elements) ->
302+
NonBinaryElements = [case Element of _ when is_binary(Element) -> binary_to_list(Element); _ -> Element end || Element <- Elements],
303+
lists:concat(NonBinaryElements).

src/dog_external_agent.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ loop(_RoutingKey, _CType, Payload, State) ->
6464
ExistingExternalId = maps:get(<<"id">>, ExistingExternal),
6565
%TODO: create on link creation, set empty, inactive
6666
dog_external:replace(ExistingExternalId, ExternalEnv),
67-
dog_ipset:update_ipsets(local_env),
67+
?LOG_INFO("dog_ipset_update_agent:queue_update()"),
68+
dog_ipset_update_agent:queue_update(dog_common:concat([<<"external->">>,ExternalEnvName],binary)),
6869
{ack, State}.
6970

7071
-spec set_link_state(NewState :: map()) -> ok | error.

src/dog_external_update_agent.erl

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
-module(dog_external_update_agent).
2+
-behaviour(gen_server).
3+
4+
%% ------------------------------------------------------------------
5+
%% Record and Type Definitions
6+
%% ------------------------------------------------------------------
7+
8+
-include("dog_trainer.hrl").
9+
-include_lib("kernel/include/logger.hrl").
10+
%% ------------------------------------------------------------------
11+
%% API Function Exports
12+
%% ------------------------------------------------------------------
13+
14+
-export([
15+
periodic_publish/0,
16+
publish_to_external/1,
17+
publish_to_outbound_exchange/2,
18+
publish_to_outbound_exchanges/1,
19+
queue_length/0,
20+
queue_update/1,
21+
start_link/0
22+
]).
23+
24+
%% ------------------------------------------------------------------
25+
%% gen_server Function Exports
26+
%% ------------------------------------------------------------------
27+
28+
-export([
29+
init/1,
30+
handle_call/3,
31+
handle_cast/2,
32+
handle_info/2,
33+
terminate/2,
34+
code_change/3
35+
]).
36+
37+
%% ------------------------------------------------------------------
38+
%% test Function Exports
39+
%% ------------------------------------------------------------------
40+
%-export([do_periodic_publish/1]).
41+
42+
%% ------------------------------------------------------------------
43+
%% API Function Definitions
44+
%% ------------------------------------------------------------------
45+
46+
-spec start_link() ->
47+
{ok, Pid :: pid()} | ignore | {error, {already_started, Pid :: pid()} | term()}.
48+
start_link() ->
49+
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
50+
51+
-spec periodic_publish() -> OldServer :: ok.
52+
periodic_publish() ->
53+
?LOG_INFO("function"),
54+
gen_server:call(?MODULE, periodic_publish).
55+
56+
-spec queue_update(Ipsets :: list) -> ok.
57+
queue_update(Ipsets) ->
58+
imetrics:add(external_queue_add),
59+
gen_server:cast(?MODULE, {add_to_queue, [Ipsets]}).
60+
61+
queue_length() ->
62+
gen_server:call(?MODULE, queue_length).
63+
%% ------------------------------------------------------------------
64+
%% gen_server Function Definitions
65+
%% ------------------------------------------------------------------
66+
67+
%%----------------------------------------------------------------------
68+
%% Func: init/1
69+
%% Returns: {ok, State} |
70+
%% {ok, State, Timeout} |
71+
%% ignore |
72+
%% {stop, Reason}
73+
%%----------------------------------------------------------------------
74+
75+
-spec init(_) -> {'ok', []}.
76+
init(_Args) ->
77+
%CurrentIpset = dog_ipset:read_current_ipset(),
78+
%{ok, CurrentIpsetHashes} = dog_ipset:get_hashes(),
79+
%dog_ipset:create(CurrentIpsetHashes),
80+
PeriodicPublishInterval = application:get_env(
81+
dog_trainer, external_ipset_periodic_publish_interval_seconds, 5
82+
),
83+
_PublishTimer = erlang:send_after(PeriodicPublishInterval * 1000, self(), periodic_publish),
84+
State = ordsets:new(),
85+
{ok, State}.
86+
87+
%%----------------------------------------------------------------------
88+
%% Func: handle_call/3
89+
%% Returns: {reply, Reply, State} |
90+
%% {reply, Reply, State, Timeout} |
91+
%% {noreply, State} |
92+
%% {noreply, State, Timeout} |
93+
%% {stop, Reason, Reply, State} | (terminate/2 is called)
94+
%% {stop, Reason, State} (terminate/2 is called)
95+
%%----------------------------------------------------------------------
96+
-spec handle_call(term(), {pid(), term()}, State :: ips_state()) -> {reply, ok, any()}.
97+
handle_call(queue_length, _from, State) ->
98+
QueueLength = length(State),
99+
{reply, QueueLength, State};
100+
handle_call(_Request, _From, State) ->
101+
{reply, ok, State}.
102+
103+
%%----------------------------------------------------------------------
104+
%% Func: handle_cast/2
105+
%% Returns: {noreply, State} |
106+
%% {noreply, State, Timeout} |
107+
%% {stop, Reason, State} (terminate/2 is called)
108+
%%----------------------------------------------------------------------
109+
-spec handle_cast(_, _) -> {'noreply', _}.
110+
handle_cast(stop, State) ->
111+
{stop, normal, State};
112+
handle_cast({add_to_queue, Ipsets}, State) ->
113+
NewState = Ipsets,
114+
{noreply, NewState};
115+
handle_cast(Msg, State) ->
116+
?LOG_ERROR("unknown_message: Msg: ~p, State: ~p", [Msg, State]),
117+
{noreply, State}.
118+
119+
%%----------------------------------------------------------------------
120+
%% Func: handle_info/2
121+
%% Returns: {noreply, State} |
122+
%% {noreply, State, Timeout} |
123+
%% {stop, Reason, State} (terminate/2 is called)
124+
%%----------------------------------------------------------------------
125+
% TODO: be more specific about Info in spec
126+
-spec handle_info(_, _) -> {'noreply', _}.
127+
handle_info(periodic_publish, State) ->
128+
{ok, NewState} = do_periodic_publish(State),
129+
PeriodicPublishInterval = application:get_env(
130+
dog_trainer, external_ipset_periodic_publish_interval_seconds, 5
131+
),
132+
erlang:send_after(PeriodicPublishInterval * 1000, self(), periodic_publish),
133+
{noreply, NewState};
134+
handle_info(Info, State) ->
135+
?LOG_ERROR("unknown_message: Info: ~p, State: ~p", [Info, State]),
136+
{noreply, State}.
137+
138+
%%----------------------------------------------------------------------
139+
%% Func: terminate/2
140+
%% Purpose: Shutdown the server
141+
%% Returns: any (ignored by gen_server)
142+
%%----------------------------------------------------------------------
143+
-spec terminate(_, ips_state()) -> {close}.
144+
terminate(Reason, State) ->
145+
?LOG_INFO("terminate: Reason: ~p, State: ~p", [Reason, State]),
146+
{close}.
147+
148+
-spec code_change(_, State :: ips_state(), _) -> {ok, State :: ips_state()}.
149+
code_change(_OldVsn, State, _Extra) ->
150+
{ok, State}.
151+
152+
%% ------------------------------------------------------------------
153+
%% Internal Function Definitions
154+
%% ------------------------------------------------------------------
155+
-spec do_periodic_publish(_) -> OldServers :: {ok, list()}.
156+
do_periodic_publish(State) ->
157+
?LOG_INFO("do_periodic_publish"),
158+
case State of
159+
[] ->
160+
{ok, []};
161+
_ ->
162+
?LOG_INFO("ipset queue: ~p", [State]),
163+
?LOG_INFO("length of ipset queue: ~p", [length(State)]),
164+
LatestInternalIpsets = lists:last(State),
165+
publish_to_external(LatestInternalIpsets),
166+
dog_ipset:update_ipsets(),
167+
{ok, []}
168+
end.
169+
170+
-spec publish_to_outbound_exchanges(IpsetExternalMap :: map()) -> any().
171+
publish_to_outbound_exchanges(IpsetExternalMap) ->
172+
{ok, ExternalEnvs} = dog_link:get_all_active_outbound(),
173+
IdsByGroup = dog_group:get_all_internal_ec2_security_group_ids(),
174+
%dog_common:merge_maps_of_lists([IdsByGroupMap,AllActiveUnionEc2Sgs]).
175+
lists:foreach(
176+
fun(Env) ->
177+
EnvName = maps:get(<<"name">>, Env),
178+
ExternalMap = maps:put(<<"ec2">>, IdsByGroup, IpsetExternalMap),
179+
%ExternalMap = maps:put(<<"ec2">>,jsx:encode(#{}),IpsetExternalMap),
180+
?LOG_DEBUG("ExternalMap: ~p~n", [ExternalMap]),
181+
publish_to_outbound_exchange(EnvName, ExternalMap)
182+
end,
183+
ExternalEnvs
184+
).
185+
186+
-spec publish_to_outbound_exchange(TargetEnvName :: binary(), IpsetExternalMap :: map()) -> any().
187+
publish_to_outbound_exchange(TargetEnvName, IpsetExternalMap) ->
188+
?LOG_INFO("IpsetExternalMap: ~p", [IpsetExternalMap]),
189+
{ok, LocalEnvName} = application:get_env(dog_trainer, env),
190+
UserData = #{
191+
ipsets => jsx:encode(IpsetExternalMap),
192+
name => LocalEnvName
193+
},
194+
Count = 1,
195+
Pid = erlang:self(),
196+
Message = term_to_binary([
197+
{count, Count},
198+
{local_time, calendar:local_time()},
199+
{pid, Pid},
200+
{user_data, UserData}
201+
]),
202+
RoutingKey = binary:list_to_bin(LocalEnvName),
203+
BrokerConfigName = list_to_atom(binary:bin_to_list(TargetEnvName)),
204+
%thumper:start_link(BrokerConfigName),
205+
?LOG_INFO("~p, ~p, ~p, ~p", [BrokerConfigName, Message, <<"inbound">>, RoutingKey]),
206+
%Response = thumper:publish_to(BrokerConfigName, Message, <<"inbound">>, RoutingKey),
207+
PublisherName = erlang:binary_to_atom(<<TargetEnvName/binary, <<"_publisher">>/binary>>),
208+
Response = turtle:publish(
209+
PublisherName,
210+
<<"inbound">>,
211+
RoutingKey,
212+
<<"text/json">>,
213+
Message,
214+
#{delivery_mode => persistent}
215+
),
216+
imetrics:add(ipset_outbound_publish),
217+
Response.
218+
219+
-spec publish_to_external(InternalIpsetsMap :: map()) -> any().
220+
publish_to_external(InternalIpsetsMap) ->
221+
?LOG_INFO("publishing to external"),
222+
publish_to_outbound_exchanges(InternalIpsetsMap).

src/dog_group_watcher.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,13 @@ handle_query_result(Result, State) ->
135135
{ok, _} = dog_group:set_hash4_ipsets(GroupName, Hash4Ipsets),
136136
{ok, _} = dog_group:set_hash6_ipsets(GroupName, Hash6Ipsets),
137137
{ok, _} = dog_group:set_hash4_iptables(GroupName, Hash4Iptables),
138-
{ok, _} = dog_group:set_hash6_iptables(GroupName, Hash6Iptables)
138+
{ok, _} = dog_group:set_hash6_iptables(GroupName, Hash6Iptables),
139+
?LOG_INFO("dog_ipset_update_agent:queue_update()"),
140+
dog_ipset_update_agent:queue_update(dog_common:concat([<<"group-">>,GroupName],binary))
139141
end,
140142
Result
141143
)
142144
end,
143-
dog_ipset:force_update_ipsets(),
144145
{noreply, [Result | State]}.
145146

146147
handle_query_done(State) ->

src/dog_host_interfaces_watcher.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ handle_query_result(Result, State) ->
9999
pass;
100100
_ ->
101101
imetrics:add_m(watcher, host_config_update),
102-
dog_ipset_update_agent:queue_update()
102+
?LOG_INFO("dog_ipset_update_agent:queue_update()"),
103+
dog_ipset_update_agent:queue_update(<<"dog_host_interface_watcher">>)
103104
end,
104105
{noreply, [Result | State]}.
105106

src/dog_ips.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ loop(_RoutingKey, _CType, Payload, State) ->
7272
Hostkey = maps:get(<<"hostkey">>, Config),
7373
?LOG_DEBUG(#{hostname => Hostname, hostkey => Hostkey}),
7474
dog_config:update_host_keepalive(Hostkey),
75+
UpdateSource = dog_common:concat([<<"host_group->">>,GroupName],binary),
7576
case dog_host:get_by_hostkey(Hostkey) of
7677
{ok, HostExists} ->
7778
%HostId = maps:get(<<"id">>, HostExists),
@@ -89,12 +90,12 @@ loop(_RoutingKey, _CType, Payload, State) ->
8990
force ->
9091
?LOG_INFO("got force: ~p", [Hostkey]),
9192
dog_host:update_by_hostkey(Hostkey, Config),
92-
dog_ipset_update_agent:queue_force(),
93+
dog_ipset_update_agent:queue_update(UpdateSource), %ignoring force
9394
dog_iptables:update_group_iptables(GroupName, <<"group">>);
9495
update ->
9596
?LOG_INFO("got update: ~p", [Hostkey]),
9697
dog_host:update_by_hostkey(Hostkey, Config),
97-
dog_ipset_update_agent:queue_update(),
98+
dog_ipset_update_agent:queue_update(UpdateSource),
9899
dog_iptables:update_group_iptables(GroupName, <<"group">>);
99100
keepalive ->
100101
?LOG_INFO("got keepalive: ~p", [Hostkey]),
@@ -160,12 +161,12 @@ subscriber_callback(_DeliveryTag, _RoutingKey, Payload) ->
160161
force ->
161162
?LOG_INFO("got force: ~p", [Hostkey]),
162163
dog_host:update_by_hostkey(Hostkey, Config),
163-
dog_ipset_update_agent:queue_force(),
164+
dog_ipset_update_agent:queue_update(Hostkey), %ignoring force
164165
dog_iptables:update_group_iptables(GroupName, <<"group">>);
165166
update ->
166167
?LOG_INFO("got update: ~p", [Hostkey]),
167168
dog_host:update_by_hostkey(Hostkey, Config),
168-
dog_ipset_update_agent:queue_update(),
169+
dog_ipset_update_agent:queue_update(Hostkey),
169170
dog_iptables:update_group_iptables(GroupName, <<"group">>);
170171
keepalive ->
171172
?LOG_INFO("got keepalive: ~p", [Hostkey]),

0 commit comments

Comments
 (0)