Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,6 @@ storage_spec =
"fast_file" ->
{Electric.ShapeCache.PureFileStorage, storage_dir: shape_path}

"crashing_file" ->
num_calls_until_crash =
env!("CRASHING_FILE_ELECTRIC_STORAGE__NUM_CALLS_UNTIL_CRASH", :integer)

{Electric.ShapeCache.CrashingFileStorage,
storage_dir: shape_path, num_calls_until_crash: num_calls_until_crash}

_ ->
raise Dotenvy.Error, message: "storage must be one of: MEMORY, FAST_FILE, LEGACY_FILE"
end
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,6 @@ defmodule Electric.ShapeCache.InMemoryStorage do
@impl Electric.ShapeCache.Storage
def get_all_stored_shape_handles(_opts), do: {:ok, MapSet.new()}

@impl Electric.ShapeCache.Storage
def get_stored_shapes(_opts, _shape_handles), do: %{}

@impl Electric.ShapeCache.Storage
def metadata_backup_dir(_opts), do: nil

@impl Electric.ShapeCache.Storage
def get_total_disk_usage(_opts), do: 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ defmodule Electric.ShapeCache.PureFileStorage do
alias Electric.ShapeCache.PureFileStorage.Snapshot
alias Electric.ShapeCache.PureFileStorage.WriteLoop
alias Electric.ShapeCache.Storage
alias Electric.Shapes.Shape

import LogOffset
import Electric.ShapeCache.PureFileStorage.SharedRecords
Expand Down Expand Up @@ -94,9 +93,6 @@ defmodule Electric.ShapeCache.PureFileStorage do

@clean_cache_keys storage_meta_keys() -- @read_path_keys

# Directory for storing metadata
@metadata_storage_dir ".meta"

def shared_opts(opts) do
stack_id = Keyword.fetch!(opts, :stack_id)
storage_dir = Keyword.get(opts, :storage_dir, "./shapes")
Expand Down Expand Up @@ -205,37 +201,6 @@ defmodule Electric.ShapeCache.PureFileStorage do
|> Enum.into(MapSet.new())}
end

def get_stored_shapes(stack_opts, shape_handles) do
Task.Supervisor.async_stream(
stack_task_supervisor(stack_opts.stack_id),
shape_handles,
fn handle ->
shape_opts = for_shape(handle, stack_opts)

case read_shape_definition(shape_opts) do
{:ok, shape} ->
{handle, {:ok, shape}}

_ ->
Logger.warning(
"Failed to read shape definition for shape #{handle}, removing it from disk"
)

cleanup!(shape_opts)
{handle, {:error, :failed_to_recover_shape}}
end
end,
timeout: :infinity,
ordered: false
)
|> Enum.map(fn {:ok, res} -> res end)
|> Map.new()
end

def metadata_backup_dir(%{base_path: base_path}) do
Path.join([base_path, @metadata_storage_dir, "backups"])
end

def drop_all_ets_entries(stack_id) do
try do
:ets.delete_all_objects(stack_ets(stack_id))
Expand Down Expand Up @@ -523,7 +488,7 @@ defmodule Electric.ShapeCache.PureFileStorage do
def init_writer!(shape_opts, shape_definition) do
table = :ets.new(:in_memory_storage, [:ordered_set, :protected])

{initial_acc, suffix} = initialise_filesystem!(shape_opts, shape_definition)
{initial_acc, suffix} = initialise_filesystem!(shape_opts)

register_with_stack(
shape_opts,
Expand Down Expand Up @@ -569,7 +534,7 @@ defmodule Electric.ShapeCache.PureFileStorage do
writer_state(state, writer_acc: WriteLoop.flush_and_close_all(acc, state))
end

defp initialise_filesystem!(%__MODULE__{} = opts, shape_definition) do
def initialise_filesystem!(%__MODULE__{} = opts) do
on_disk_version = read_metadata!(opts, :version)
new? = is_nil(on_disk_version)

Expand All @@ -585,7 +550,6 @@ defmodule Electric.ShapeCache.PureFileStorage do

if initialize? do
create_directories!(opts)
write_shape_definition!(opts, shape_definition)
end

suffix =
Expand Down Expand Up @@ -747,24 +711,6 @@ defmodule Electric.ShapeCache.PureFileStorage do
:ets.update_element(stack_ets, handle, updates)
end

defp write_shape_definition!(%__MODULE__{} = opts, shape_definition) do
write!(
shape_metadata_path(opts, "shape_definition.json"),
Jason.encode!(shape_definition),
[:raw]
)
end

defp read_shape_definition(%__MODULE__{} = opts) do
path = shape_metadata_path(opts, "shape_definition.json")

with {:ok, contents} <- File.open(path, [:read, :raw, :read_ahead], &IO.binread(&1, :eof)),
{:ok, decoded} <- Jason.decode(if(is_binary(contents), do: contents, else: "")),
{:ok, rebuilt} <- Shape.from_json_safe(decoded) do
{:ok, rebuilt}
end
end

defp last_snapshot_chunk(%__MODULE__{} = opts),
do: read_cached_metadata(opts, :last_snapshot_chunk)

Expand Down
17 changes: 0 additions & 17 deletions packages/sync-service/lib/electric/shape_cache/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,6 @@ defmodule Electric.ShapeCache.Storage do
@callback get_all_stored_shape_handles(compiled_opts()) ::
{:ok, MapSet.t(shape_handle())} | {:error, term()}

@doc "Retrieve stored shapes for given shape handles"
@callback get_stored_shapes(compiled_opts(), Enumerable.t(shape_handle())) ::
%{shape_handle() => {:ok, shape_def :: Shape.t()} | {:error, term()}}

@doc "Get the directory where metadata backups are stored."
@callback metadata_backup_dir(compiled_opts()) :: String.t() | nil

@doc "Get the total disk usage for all shapes"
@callback get_total_disk_usage(compiled_opts()) :: non_neg_integer()

Expand Down Expand Up @@ -260,16 +253,6 @@ defmodule Electric.ShapeCache.Storage do
mod.get_all_stored_shape_handles(opts)
end

@impl __MODULE__
def get_stored_shapes({mod, opts}, shape_handles) do
mod.get_stored_shapes(opts, shape_handles)
end

@impl __MODULE__
def metadata_backup_dir({mod, opts}) do
mod.metadata_backup_dir(opts)
end

@impl __MODULE__
def get_total_disk_usage({mod, opts}) do
mod.get_total_disk_usage(opts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1187,46 +1187,6 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do
end
end

describe "#{module_name}.get_stored_shapes/1" do
@describetag skip_initialise: true
setup :start_storage

test "retrieves no shapes if no shapes persisted", %{storage_base: storage_base} do
assert %{} = Storage.get_stored_shapes(storage_base, [])
end

test "retrieves stored shapes", %{storage: opts, storage_base: storage_base} do
_writer = Storage.init_writer!(opts, @shape)
invalid_handle = "invalid-handle"

assert %{@shape_handle => {:ok, parsed}, ^invalid_handle => {:error, _}} =
Storage.get_stored_shapes(storage_base, [@shape_handle, invalid_handle])

assert @shape == parsed
end

test "restores shape snapshot started flag", %{storage: opts, storage_base: storage_base} do
_writer = Storage.init_writer!(opts, @shape)
:ok = Storage.mark_snapshot_as_started(opts)

assert %{@shape_handle => {:ok, parsed}} =
Storage.get_stored_shapes(storage_base, [@shape_handle])

assert @shape == parsed
end
end

describe "#{module_name}.metadata_backup_dir/1" do
@describetag skip_initialise: true
setup :start_storage

test "returns metadata backup directory", %{storage_base: storage_base} do
assert dir = Storage.metadata_backup_dir(storage_base)
assert is_binary(dir)
assert Path.type(dir) == :absolute
end
end

describe "#{module_name}.cleanup!/1" do
setup :start_storage

Expand Down Expand Up @@ -1299,7 +1259,6 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do

Storage.cleanup_all!(storage_base)
assert Storage.get_total_disk_usage(storage_base) == 0
refute File.dir?(Storage.metadata_backup_dir(storage_base))
end

test "should handle entire base directory already missing", %{storage_base: storage_base} do
Expand Down
12 changes: 0 additions & 12 deletions packages/sync-service/test/support/test_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,6 @@ defmodule Support.TestStorage do
Storage.get_all_stored_shape_handles(storage)
end

@impl Electric.ShapeCache.Storage
def get_stored_shapes({parent, _init, storage}, shape_handles) do
send(parent, {__MODULE__, :get_stored_shapes, shape_handles})
Storage.get_stored_shapes(storage, shape_handles)
end

@impl Electric.ShapeCache.Storage
def metadata_backup_dir({parent, _init, storage}) do
send(parent, {__MODULE__, :metadata_backup_dir})
Storage.metadata_backup_dir(storage)
end

@impl Electric.ShapeCache.Storage
def get_total_disk_usage({parent, _init, storage}) do
send(parent, {__MODULE__, :get_total_disk_usage})
Expand Down