Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/.env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ SECURE_COOKIE=false
DATABASE_URL=postgres://postgres:[email protected]:5432/plausible_dev
CLICKHOUSE_DATABASE_URL=http://127.0.0.1:8123/plausible_events_db
CLICKHOUSE_MAX_BUFFER_SIZE_BYTES=1000000
CLICKHOUSE_SESSION_LOCK_ENABLED=true
SECRET_KEY_BASE=/njrhntbycvastyvtk1zycwfm981vpo/0xrvwjjvemdakc/vsvbrevlwsc6u8rcg
TOTP_VAULT_KEY=Q3BD4nddbkVJIPXgHuo5NthGKSIH0yesRfG05J88HIo=
ENVIRONMENT=dev
Expand Down
1 change: 1 addition & 0 deletions config/.env.load
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ SECURE_COOKIE=false
DATABASE_URL=postgres://postgres:[email protected]:5432/plausible_dev
CLICKHOUSE_DATABASE_URL=http://127.0.0.1:8123/plausible_events_db
CLICKHOUSE_MAX_BUFFER_SIZE_BYTES=1000000
CLICKHOUSE_SESSION_LOCK_ENABLED=true
SECRET_KEY_BASE=/njrhntbycvastyvtk1zycwfm981vpo/0xrvwjjvemdakc/vsvbrevlwsc6u8rcg
TOTP_VAULT_KEY=Q3BD4nddbkVJIPXgHuo5NthGKSIH0yesRfG05J88HIo=
ENVIRONMENT=dev
Expand Down
1 change: 1 addition & 0 deletions config/.env.test
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
DATABASE_URL=postgres://postgres:[email protected]:5432/plausible_test
CLICKHOUSE_DATABASE_URL=http://127.0.0.1:8123/plausible_test
CLICKHOUSE_SESSION_LOCK_ENABLED=true
SECRET_KEY_BASE=/njrhntbycvastyvtk1zycwfm981vpo/0xrvwjjvemdakc/vsvbrevlwsc6u8rcg
TOTP_VAULT_KEY=1Jah1HEOnCEnmBE+4/OgbJRraJIppPmYCNbZoFJboZs=
BASE_URL=http://localhost:8000
Expand Down
26 changes: 26 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,26 @@ end
|> get_var_from_path_or_env("CLICKHOUSE_MAX_BUFFER_SIZE_BYTES", "100000")
|> Integer.parse()

ch_session_lock_enabled =
  get_bool_from_path_or_env(config_dir, "CLICKHOUSE_SESSION_LOCK_ENABLED", false)

# The acquire timeout must at least account for the time spent waiting for the
# write buffer flush to get picked up from the queue and processed.
{ch_session_lock_acquire_timeout_ms, ""} =
  Integer.parse(
    get_var_from_path_or_env(config_dir, "CLICKHOUSE_SESSION_LOCK_ACQUIRE_TIMEOUT_MS", "5000")
  )

# Each value must parse as a bare integer; anything else fails the match
# above/below and aborts evaluation of this config file.
{ch_session_lock_timeout_ms, ""} =
  Integer.parse(
    get_var_from_path_or_env(config_dir, "CLICKHOUSE_SESSION_LOCK_TIMEOUT_MS", "3000")
  )

{ch_session_lock_interval_ms, ""} =
  Integer.parse(
    get_var_from_path_or_env(config_dir, "CLICKHOUSE_SESSION_LOCK_INTERVAL_MS", "100")
  )

# Can be generated with `Base.encode64(:crypto.strong_rand_bytes(32))` from
# iex shell or `openssl rand -base64 32` from command line.
totp_vault_key =
Expand Down Expand Up @@ -649,6 +669,12 @@ config :plausible, Plausible.ImportDeletionRepo,
transport_opts: ch_transport_opts,
pool_size: 1

# Session write buffer lock settings, populated from the
# CLICKHOUSE_SESSION_LOCK_* variables parsed earlier in this file.
# All *_ms values are integers.
config :plausible, Plausible.Session.WriteBuffer,
lock_enabled: ch_session_lock_enabled,
lock_acquire_timeout_ms: ch_session_lock_acquire_timeout_ms,
lock_timeout_ms: ch_session_lock_timeout_ms,
lock_interval_ms: ch_session_lock_interval_ms

config :ex_money,
open_exchange_rates_app_id: get_var_from_path_or_env(config_dir, "OPEN_EXCHANGE_RATES_APP_ID"),
retrieve_every: :timer.hours(24)
Expand Down
10 changes: 10 additions & 0 deletions lib/plausible/cache/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ defmodule Plausible.Cache.Adapter do
:exit, _ -> nil
end

@doc """
Removes every entry from all ETS tables backing `cache_name`.

The ETS tables are resolved for every name returned by `get_names/1` first,
and only then emptied one by one.
"""
@spec wipe(atom()) :: :ok
def wipe(cache_name) do
  tables = for name <- get_names(cache_name), do: ConCache.ets(name)

  Enum.each(tables, fn ets_table ->
    true = :ets.delete_all_objects(ets_table)
  end)
end

@spec get(atom(), any()) :: any()
def get(cache_name, key) do
full_cache_name = get_name(cache_name, key)
Expand Down
48 changes: 35 additions & 13 deletions lib/plausible/ingestion/write_buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,39 @@ defmodule Plausible.Ingestion.WriteBuffer do
GenServer.call(server, :flush, :infinity)
end

@doc """
Asks the buffer process `server` to flush without waiting for the outcome.

The request is sent as a cast, so `:ok` is returned right away.
"""
def flush_async(server), do: GenServer.cast(server, :flush)

# Builds the initial buffer state from `opts`.
#
# Required options: :name, :insert_sql, :insert_opts, :header.
# Optional options: :buffer, :max_buffer_size, :flush_interval_ms,
# :on_init (called with `opts`, returns a map of extra state) and
# :on_flush (called with the flush result and the state).
@impl true
def init(opts) do
name = Keyword.fetch!(opts, :name)
buffer = opts[:buffer] || []
max_buffer_size = opts[:max_buffer_size] || default_max_buffer_size()
flush_interval_ms = opts[:flush_interval_ms] || default_flush_interval_ms()
# Without an :on_init callback no extra state is added.
on_init = Keyword.get(opts, :on_init, fn _opts -> %{} end)

Process.flag(:trap_exit, true)
# Schedule the first periodic flush.
timer = Process.send_after(self(), :tick, flush_interval_ms)

extra_state = on_init.(opts)

{:ok,
%{
buffer: buffer,
timer: timer,
name: Keyword.fetch!(opts, :name),
insert_sql: Keyword.fetch!(opts, :insert_sql),
insert_opts: Keyword.fetch!(opts, :insert_opts),
header: Keyword.fetch!(opts, :header),
buffer_size: IO.iodata_length(buffer),
max_buffer_size: max_buffer_size,
flush_interval_ms: flush_interval_ms
}}
# Keys returned by on_init are merged last, so they win over the base keys
# on conflict.
Map.merge(
%{
buffer: buffer,
timer: timer,
name: name,
insert_sql: Keyword.fetch!(opts, :insert_sql),
insert_opts: Keyword.fetch!(opts, :insert_opts),
on_flush: Keyword.get(opts, :on_flush, fn _result, state -> state end),
header: Keyword.fetch!(opts, :header),
buffer_size: IO.iodata_length(buffer),
max_buffer_size: max_buffer_size,
flush_interval_ms: flush_interval_ms
},
extra_state
)}
end

@impl true
Expand All @@ -59,6 +71,14 @@ defmodule Plausible.Ingestion.WriteBuffer do
end
end

# Out-of-band flush request (see flush_async/1): flushes right away and
# restarts the periodic :tick timer so the next scheduled flush is a full
# interval from now.
def handle_cast(:flush, state) do
  Process.cancel_timer(state.timer)
  do_flush(state)

  rescheduled = Process.send_after(self(), :tick, state.flush_interval_ms)

  {:noreply, %{state | buffer: [], buffer_size: 0, timer: rescheduled}}
end

@impl true
def handle_info(:tick, state) do
do_flush(state)
Expand Down Expand Up @@ -88,16 +108,18 @@ defmodule Plausible.Ingestion.WriteBuffer do
insert_opts: insert_opts,
insert_sql: insert_sql,
header: header,
name: name
name: name,
on_flush: on_flush
} = state

case buffer do
[] ->
nil
on_flush.(:empty, state)

_not_empty ->
Logger.notice("Flushing #{buffer_size} byte(s) RowBinary from #{name}")
IngestRepo.query!(insert_sql, [header | buffer], insert_opts)
on_flush.(:success, state)
end
end

Expand Down
46 changes: 39 additions & 7 deletions lib/plausible/session/transfer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ defmodule Plausible.Session.Transfer do
end

def init(base_path) do
session_lock_enabled? =
Application.fetch_env!(:plausible, Plausible.Session.WriteBuffer)
|> Keyword.fetch!(:lock_enabled)

File.mkdir_p!(base_path)

replica =
Expand All @@ -57,7 +61,9 @@ defmodule Plausible.Session.Transfer do
primary =
{TinySock,
base_path: base_path,
handler: fn message -> handle_replica(message, parent, given_counter) end}
handler: fn message ->
handle_replica(message, parent, given_counter, session_lock_enabled?)
end}

alive =
Supervisor.child_spec(
Expand Down Expand Up @@ -101,27 +107,53 @@ defmodule Plausible.Session.Transfer do
}
end

# Handles one request received from a replica over the transfer socket.
#
# Three requests are recognised:
#   * {@cmd_list_cache_names, session_version} - names of the session caches
#     offered for transfer (possibly an empty list)
#   * {@cmd_dump_cache, cache} - the contents of `cache` as a list
#   * @cmd_takeover_done - the replica is finished; bumps `given_counter`
defp handle_replica(request, parent, given_counter) do
defp handle_replica(request, parent, given_counter, session_lock_enabled?) do
Logger.notice(
"Session transfer message received at #{node()}: #{inspect(request, limit: 10)}"
)

case request do
{@cmd_list_cache_names, session_version} ->
if session_version == session_version() and attempted?(parent) do
Cache.Adapter.get_names(:sessions)
else
[]
end
list_cache_names(session_version, parent, session_lock_enabled?)

{@cmd_dump_cache, cache} ->
Cache.Adapter.cache2list(cache)

@cmd_takeover_done ->
if session_lock_enabled? do
# Wipe the cache after transfer is complete to avoid making the transferred cache stale
Cache.Adapter.wipe(:sessions)
# Unblock ingest after transfer is done
Plausible.Session.WriteBuffer.unlock()
end

# Record that one more takeover has completed.
:counters.add(given_counter, 1, 1)
end
end

# Returns the names of the session caches offered to a replica, or `[]` when
# the replica reports a different session version or `attempted?(parent)` does
# not hold.
#
# With the session lock enabled, ingest is blocked first via
# `Plausible.Session.WriteBuffer.lock/0`; when the lock cannot be acquired,
# nothing is offered.
defp list_cache_names(session_version, parent, session_lock_enabled?)
     when is_boolean(session_lock_enabled?) do
  if session_version == session_version() and attempted?(parent) do
    # `or` short-circuits: the lock is only requested when locking is enabled.
    if not session_lock_enabled? or Plausible.Session.WriteBuffer.lock() == :ok do
      Cache.Adapter.get_names(:sessions)
    else
      []
    end
  else
    []
  end
end

defp init_takeover(base_path) do
started = System.monotonic_time()

Expand Down
86 changes: 85 additions & 1 deletion lib/plausible/session/write_buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ defmodule Plausible.Session.WriteBuffer do
name: __MODULE__,
header: unquote(header),
insert_sql: unquote(insert_sql),
insert_opts: unquote(insert_opts)
insert_opts: unquote(insert_opts),
on_init: &Plausible.Session.WriteBuffer.on_init/1,
on_flush: &Plausible.Session.WriteBuffer.on_flush/2
)

Plausible.Ingestion.WriteBuffer.child_spec(opts)
Expand All @@ -40,4 +42,86 @@ defmodule Plausible.Session.WriteBuffer do
# Flushes the session buffer registered under this module's name.
def flush, do: Plausible.Ingestion.WriteBuffer.flush(__MODULE__)

@doc false
# `on_init` callback for `Plausible.Ingestion.WriteBuffer`.
#
# Creates the public named ETS table (named after the buffer) that holds the
# lock state, and returns the extra state merged into the buffer state:
#
#   * `:lock_timeout_ms`  - from `opts[:lock_timeout_ms]`, else app config
#   * `:lock_interval_ms` - from `opts[:lock_interval_ms]`, else app config
def on_init(opts) do
  name = Keyword.fetch!(opts, :name)

  ^name = :ets.new(name, [:named_table, :set, :public])

  # The option used to be read under the misspelled key `:lock_timeouts_ms`,
  # which meant an explicitly passed `:lock_timeout_ms` was silently ignored.
  # The misspelled key is still honoured as a fallback for existing callers.
  lock_timeout_ms =
    opts[:lock_timeout_ms] || opts[:lock_timeouts_ms] || default_lock_timeout_ms()

  %{
    lock_timeout_ms: lock_timeout_ms,
    lock_interval_ms: opts[:lock_interval_ms] || default_lock_interval_ms()
  }
end

@doc false
# `on_flush` callback for `Plausible.Ingestion.WriteBuffer`.
#
# When a process has requested the lock (its pid is stored in the ETS table),
# it is notified with `{:locked, name}` and this call then blocks in
# `lock_loop/4` until the lock is released or expires. The state is always
# returned unchanged.
def on_flush(_result, state) do
  table = state.name

  with [state: %{locker: locker}] when is_pid(locker) <- :ets.lookup(table, :state) do
    send(locker, {:locked, table})
    locked_at = System.monotonic_time()
    lock_loop(table, locked_at, state.lock_timeout_ms, state.lock_interval_ms)
  end

  state
end

@doc """
Requests the session write buffer lock on behalf of the calling process.

Registers the caller as the locker, triggers an asynchronous flush and waits
for the `{:locked, name}` message sent once that flush has run. `timeout` is
the maximum wait in milliseconds; when `nil`, the configured
`:lock_acquire_timeout_ms` is used.

Returns `:ok` once locked, or `{:error, :timeout}` otherwise.
"""
@spec lock(timeout() | nil) :: :ok | {:error, :timeout}
def lock(timeout \\ nil) do
  caller = self()
  acquire_timeout = timeout || default_lock_acquire_timeout_ms()

  true = :ets.insert(__MODULE__, {:state, %{locker: caller}})
  Plausible.Ingestion.WriteBuffer.flush_async(__MODULE__)

  receive do
    {:locked, __MODULE__} -> :ok
  after
    acquire_timeout -> {:error, :timeout}
  end
end

@doc """
Releases the session write buffer lock by clearing the registered locker.

Always returns `:ok`.
"""
@spec unlock() :: :ok
def unlock do
  true = :ets.insert(__MODULE__, {:state, %{locker: nil}})
  :ok
end

# Keeps the calling (buffer) process blocked while the lock is held.
#
#   * `name`          - ETS table holding the lock state
#   * `start`         - `System.monotonic_time/0` reading (native units) taken
#                       when the lock was granted
#   * `lock_timeout`  - maximum time to hold the lock, in milliseconds
#   * `lock_interval` - polling interval, in milliseconds
#
# Returns as soon as the locker is cleared from the ETS table. If the timeout
# elapses first, the session cache is wiped and the lock is released.
defp lock_loop(name, start, lock_timeout, lock_interval) do
  # `start` and `System.monotonic_time/0` are in native time units, while the
  # timeout is in milliseconds; convert before comparing. Comparing the raw
  # difference made the lock expire after roughly one polling interval.
  elapsed_ms =
    System.convert_time_unit(System.monotonic_time() - start, :native, :millisecond)

  if elapsed_ms <= lock_timeout do
    Process.sleep(lock_interval)

    case :ets.lookup(name, :state) do
      [state: %{locker: pid}] when is_pid(pid) ->
        lock_loop(name, start, lock_timeout, lock_interval)

      _ ->
        :pass
    end
  else
    # Wipe the cache before unlocking to prevent stale session in case
    # transfer actually occurs, either partially or completely
    Plausible.Cache.Adapter.wipe(:sessions)
    unlock()
  end
end

# Defaults for the lock timings, read from this module's application config.
defp default_lock_acquire_timeout_ms, do: lock_setting!(:lock_acquire_timeout_ms)

defp default_lock_timeout_ms, do: lock_setting!(:lock_timeout_ms)

defp default_lock_interval_ms, do: lock_setting!(:lock_interval_ms)

# Fetches `key` from the `:plausible` config for this module; raises when the
# key is missing.
defp lock_setting!(key) do
  :plausible
  |> Application.get_env(__MODULE__)
  |> Keyword.fetch!(key)
end
end
9 changes: 8 additions & 1 deletion test/plausible/session/transfer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ defmodule Plausible.Session.TransferTest do

Enum.each(1..250, fn _ -> process_event(old, build(:event, name: "pageview")) end)

:ok = :peer.call(old, Plausible.Session.WriteBuffer, :flush, [])
:ok = :peer.call(old, Plausible.Event.WriteBuffer, :flush, [])

old_sessions_sorted = all_sessions_sorted(old)

new = start_another_plausible(tmp_dir)

await_transfer(new)

assert all_sessions_sorted(new) == all_sessions_sorted(old)
assert all_sessions_sorted(old) == []
assert all_sessions_sorted(new) == old_sessions_sorted
end

defp start_another_plausible(tmp_dir) do
Expand Down
Loading