Skip to content

Commit 33ad0f6

Browse files
[static] init
Signed-off-by: Julien Tinguely <[email protected]>
1 parent 61d841c commit 33ad0f6

File tree

10 files changed

+292
-37
lines changed

10 files changed

+292
-37
lines changed

apps/app/src/main/scala/org/lfdecentralizedtrust/splice/config/SpliceConfig.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,17 @@ object SpliceConfig {
503503
deriveReader[InitialAnsConfig]
504504
implicit val domainFeesConfigReader: ConfigReader[SynchronizerFeesConfig] =
505505
deriveReader[SynchronizerFeesConfig]
506+
implicit val topologySnapshotConfigHint: FieldCoproductHint[TopologySnapshotConfig] =
507+
new FieldCoproductHint[TopologySnapshotConfig]("type")
508+
implicit val topologySnapshotConfigDirectoryReader
509+
: ConfigReader[TopologySnapshotConfig.Directory] =
510+
deriveReader[TopologySnapshotConfig.Directory]
511+
implicit val topologySnapshotConfigGcpReader: ConfigReader[TopologySnapshotConfig.Gcp] =
512+
deriveReader[TopologySnapshotConfig.Gcp]
513+
implicit val topologySnapshotConfigReader: ConfigReader[TopologySnapshotConfig] =
514+
deriveReader[TopologySnapshotConfig]
515+
implicit val periodicTopologySnapshotConfig: ConfigReader[PeriodicTopologySnapshotConfig] =
516+
deriveReader[PeriodicTopologySnapshotConfig]
506517
implicit val svOnboardingFoundDsoReader: ConfigReader[SvOnboardingConfig.FoundDso] =
507518
deriveReader[SvOnboardingConfig.FoundDso]
508519
implicit val svOnboardingJoinWithKeyReader: ConfigReader[SvOnboardingConfig.JoinWithKey] =
@@ -935,6 +946,15 @@ object SpliceConfig {
935946
deriveWriter[InitialAnsConfig]
936947
implicit val domainFeesConfigWriter: ConfigWriter[SynchronizerFeesConfig] =
937948
deriveWriter[SynchronizerFeesConfig]
949+
implicit val topologySnapshotConfigDirectoryWriter
950+
: ConfigWriter[TopologySnapshotConfig.Directory] =
951+
deriveWriter[TopologySnapshotConfig.Directory]
952+
implicit val topologySnapshotConfigGcpWriter: ConfigWriter[TopologySnapshotConfig.Gcp] =
953+
deriveWriter[TopologySnapshotConfig.Gcp]
954+
implicit val topologySnapshotConfigWriter: ConfigWriter[TopologySnapshotConfig] =
955+
deriveWriter[TopologySnapshotConfig]
956+
implicit val periodicTopologySnapshotConfig: ConfigWriter[PeriodicTopologySnapshotConfig] =
957+
deriveWriter[PeriodicTopologySnapshotConfig]
938958
implicit val svOnboardingFoundDsoWriter: ConfigWriter[SvOnboardingConfig.FoundDso] =
939959
deriveWriter[SvOnboardingConfig.FoundDso]
940960
implicit val svOnboardingJoinWithKeyWriter: ConfigWriter[SvOnboardingConfig.JoinWithKey] =
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.config
5+
6+
import com.digitalasset.canton.config.NonNegativeFiniteDuration
7+
8+
final case class PeriodicTopologySnapshotConfig(
9+
location: TopologySnapshotConfig,
10+
backupInterval: NonNegativeFiniteDuration,
11+
)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.config
5+
6+
import java.nio.file.Path
7+
8+
sealed abstract class TopologySnapshotConfig {
9+
def locationDescription: String
10+
}
11+
12+
object TopologySnapshotConfig {
13+
final case class Directory(
14+
directory: Path
15+
) extends TopologySnapshotConfig {
16+
override val locationDescription = s"directory $directory"
17+
}
18+
19+
final case class Gcp(
20+
bucket: GcpBucketConfig,
21+
prefix: Option[String],
22+
) extends TopologySnapshotConfig {
23+
override val locationDescription = s"GCP bucket ${bucket.bucketName}"
24+
}
25+
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/util/GcpBucket.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,10 @@ class GcpBucket(config: GcpBucketConfig, override val loggerFactory: NamedLogger
6161
val blob = storage.get(blobId)
6262
blob.getContent()
6363
}
64+
65+
def fileExists(fileName: String): Boolean = {
66+
val blobId = BlobId.of(config.bucketName, fileName)
67+
val blob = storage.get(blobId)
68+
blob.exists()
69+
}
6470
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.util
5+
6+
import better.files.File
7+
import com.digitalasset.canton.logging.NamedLoggerFactory
8+
import com.digitalasset.canton.tracing.TraceContext
9+
import io.circe.Decoder
10+
import io.grpc.Status
11+
import org.lfdecentralizedtrust.splice.config.{GcpBucketConfig, TopologySnapshotConfig}
12+
13+
import java.io.FileNotFoundException
14+
import java.nio.charset.StandardCharsets
15+
import java.nio.file.{Path, Paths}
16+
import scala.util.Try
17+
18+
object TopologySnapshot {
19+
20+
/** Blocking function to write a string to a file within the given backup dump location.
21+
*
22+
* Returns the path to the written file as a string.
23+
*/
24+
def write(
25+
config: TopologySnapshotConfig,
26+
filename: Path,
27+
content: String,
28+
loggerFactory: NamedLoggerFactory,
29+
)(implicit tc: TraceContext): Path =
30+
config match {
31+
case TopologySnapshotConfig.Directory(directory) =>
32+
val file = directory.resolve(filename)
33+
writeToPath(file, content).path
34+
case TopologySnapshotConfig.Gcp(bucketConfig, prefix) =>
35+
val gcpBucket = new GcpBucket(bucketConfig, loggerFactory)
36+
val path = prefix.fold(filename)(prefix => Paths.get(prefix).resolve(filename))
37+
gcpBucket.dumpStringToBucket(content, path)
38+
path
39+
}
40+
41+
def writeToPath(path: Path, content: String): File = {
42+
import better.files.File
43+
val file = File(path)
44+
withParentDirectoryFor(path) {
45+
// even though the default is UTF-8 the String implementation of encoding is broken so we need to explicitly set
46+
// StandardCharsets.UTF_8 and not Charset.defaultCharset()
47+
// for more details check #2864
48+
file.write(content)(File.OpenOptions.default, StandardCharsets.UTF_8)
49+
}
50+
file
51+
}
52+
53+
def withParentDirectoryFor(path: Path)(f: => Unit): Unit = {
54+
import better.files.File
55+
val file = File(path)
56+
file.parent.createDirectories()
57+
f
58+
}
59+
60+
def fileExists(
61+
config: TopologySnapshotConfig,
62+
fileName: String,
63+
loggerFactory: NamedLoggerFactory,
64+
): Boolean = {
65+
config match {
66+
case conf: TopologySnapshotConfig.Gcp =>
67+
val gcpBucket = new GcpBucket(conf.bucket, loggerFactory)
68+
gcpBucket.fileExists(fileName)
69+
case _ =>
70+
throw Status.UNIMPLEMENTED
71+
.withDescription("Topology snapshot works only with GCP buckets")
72+
.asRuntimeException()
73+
}
74+
}
75+
76+
def readFromPath[T: Decoder](path: Path): Try[T] = Try {
77+
val dumpFile = better.files.File(path)
78+
if (!dumpFile.exists) {
79+
throw new FileNotFoundException(s"Failed to find dump file at $path")
80+
} else {
81+
val jsonString: String = dumpFile.contentAsString(StandardCharsets.UTF_8)
82+
io.circe.parser.decode[T](jsonString) match {
83+
case Left(error) =>
84+
throw new IllegalArgumentException(
85+
s"Failed to parse dump file at $path: $error"
86+
)
87+
case Right(dump) => dump
88+
}
89+
}
90+
}
91+
92+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.sv.automation
5+
6+
import com.digitalasset.canton.time.Clock
7+
import com.digitalasset.canton.tracing.TraceContext
8+
import io.opentelemetry.api.trace.Tracer
9+
import org.lfdecentralizedtrust.splice.automation.{
10+
PeriodicTaskTrigger,
11+
TaskOutcome,
12+
TaskSuccess,
13+
TriggerContext,
14+
}
15+
import org.lfdecentralizedtrust.splice.config.PeriodicTopologySnapshotConfig
16+
import org.lfdecentralizedtrust.splice.environment.SequencerAdminConnection
17+
import org.lfdecentralizedtrust.splice.util.TopologySnapshot
18+
19+
import java.nio.file.{Path, Paths}
20+
import java.time.{ZoneOffset, ZonedDateTime}
21+
import scala.concurrent.{ExecutionContext, Future, blocking}
22+
23+
class PeriodicTopologySnapshotTrigger(
24+
config: PeriodicTopologySnapshotConfig,
25+
triggerContext: TriggerContext,
26+
sequencerAdminConnection: SequencerAdminConnection,
27+
)(implicit
28+
override val ec: ExecutionContext,
29+
override val tracer: Tracer,
30+
) extends PeriodicTaskTrigger(config.backupInterval, triggerContext) {
31+
32+
override def completeTask(
33+
task: PeriodicTaskTrigger.PeriodicTask
34+
)(implicit traceContext: TraceContext): Future[TaskOutcome] = {
35+
val utcDate = ZonedDateTime.now(ZoneOffset.UTC).toLocalDate.toString
36+
val folderName = s"topology_snapshot_$utcDate"
37+
for {
38+
snapshotMissing <- checkTopologySnapshot(s"$folderName/genesis-state")
39+
res <-
40+
if (snapshotMissing) takeTopologySnapshot(sequencerAdminConnection, folderName, utcDate)
41+
else Future.successful(TaskSuccess("Today's topology snapshot already exists."))
42+
} yield res
43+
}
44+
45+
private def checkTopologySnapshot(fileName: String): Future[Boolean] =
46+
for {
47+
res <- Future {
48+
blocking {
49+
TopologySnapshot.fileExists(config.location, fileName, loggerFactory)
50+
}
51+
}
52+
} yield res
53+
54+
private def takeTopologySnapshot(
55+
sequencerAdminConnection: SequencerAdminConnection,
56+
folderName: String,
57+
utcDate: String,
58+
)(implicit traceContext: TraceContext): Future[TaskSuccess] =
59+
for {
60+
sequencerId <- sequencerAdminConnection.getSequencerId
61+
onboardingState <- sequencerAdminConnection.getOnboardingState(sequencerId)
62+
authorizedStore <- sequencerAdminConnection.exportAuthorizedStoreSnapshot(sequencerId.uid)
63+
_ <- Future {
64+
blocking {
65+
val genesisStateFilename = Paths.get(s"$folderName/genesis-state")
66+
val authorizedFilename = Paths.get(s"$folderName/authorized")
67+
val fileDesc =
68+
s"dumping current topology state into gcp bucket"
69+
logger.debug(s"Attempting to write $fileDesc")
70+
val paths = Seq(
71+
TopologySnapshot.write(
72+
config.location,
73+
genesisStateFilename,
74+
onboardingState.toStringUtf8,
75+
loggerFactory,
76+
),
77+
TopologySnapshot.write(
78+
config.location,
79+
authorizedFilename,
80+
authorizedStore.toStringUtf8,
81+
loggerFactory,
82+
),
83+
)
84+
logger.info(s"Wrote $fileDesc")
85+
paths
86+
}
87+
}
88+
} yield TaskSuccess(s"Took a new topology snapshot for $utcDate")
89+
}

apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvSvAutomationService.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import com.digitalasset.canton.logging.NamedLoggerFactory
2626
import com.digitalasset.canton.resource.Storage
2727
import com.digitalasset.canton.time.Clock
2828
import io.opentelemetry.api.trace.Tracer
29+
import org.lfdecentralizedtrust.splice.config.PeriodicTopologySnapshotConfig
2930
import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority
3031

3132
import scala.concurrent.ExecutionContextExecutor
@@ -42,6 +43,7 @@ class SvSvAutomationService(
4243
participantAdminConnection: ParticipantAdminConnection,
4344
localSynchronizerNode: Option[LocalSynchronizerNode],
4445
retryProvider: RetryProvider,
46+
topologySnapshotConfig: Option[PeriodicTopologySnapshotConfig],
4547
override protected val loggerFactory: NamedLoggerFactory,
4648
)(implicit
4749
ec: ExecutionContextExecutor,
@@ -76,6 +78,20 @@ class SvSvAutomationService(
7678
)
7779
)
7880

81+
topologySnapshotConfig.foreach(config =>
82+
registerTrigger(
83+
new PeriodicTopologySnapshotTrigger(
84+
config,
85+
triggerContext,
86+
localSynchronizerNode
87+
.getOrElse(
88+
sys.error("Cannot take topology snapshot with no localSynchronizerNode")
89+
)
90+
.sequencerAdminConnection,
91+
)
92+
)
93+
)
94+
7995
config.identitiesDump.foreach { backupConfig =>
8096
registerTrigger(
8197
new BackupNodeIdentitiesTrigger(

apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/config/SvAppConfig.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.lfdecentralizedtrust.splice.config.{
2727
GcpBucketConfig,
2828
LedgerApiClientConfig,
2929
ParticipantBootstrapDumpConfig,
30+
PeriodicTopologySnapshotConfig,
3031
PruningConfig,
3132
SpliceBackendConfig,
3233
SpliceInstanceNamesConfig,
@@ -336,6 +337,8 @@ case class SvAppBackendConfig(
336337
// every SV tries to convert markers from any other SV's book of work (in a contention avoiding fashion)
337338
delegatelessAutomationFeaturedAppActivityMarkerCatchupThreshold: Int = 10_000,
338339
delegatelessAutomationExpiredAmuletBatchSize: Int = 100,
340+
// configuration to periodically take topology snapshots
341+
topologySnapshotConfig: Option[PeriodicTopologySnapshotConfig] = None,
339342
bftSequencerConnection: Boolean = true,
340343
// Skip synchronizer initialization and synchronizer config reconciliation.
341344
// Can be safely set to true for an SV that has completed onboarding unless you

0 commit comments

Comments
 (0)