Skip to content

Commit 3943c6b

Browse files
[ci] make pekko stream to bucket work with gcp service account
sdf Signed-off-by: Julien Tinguely <[email protected]>
1 parent e01064a commit 3943c6b

File tree

1 file changed

+61
-7
lines changed

1 file changed

+61
-7
lines changed

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/SequencerAdminConnection.scala

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,22 @@ import org.apache.pekko.http.scaladsl.model.ContentTypes
4242
import org.apache.pekko.stream.connectors.googlecloud.storage.StorageObject
4343
import org.apache.pekko.stream.connectors.googlecloud.storage.scaladsl.GCStorage
4444
import com.google.protobuf.ByteString
45+
import com.typesafe.config.ConfigFactory
4546
import io.grpc.stub.StreamObserver
47+
import org.apache.pekko.actor.ActorSystem
4648
import org.apache.pekko.stream.Materializer
49+
import org.apache.pekko.stream.connectors.google.auth.Credentials
50+
import org.apache.pekko.stream.connectors.google.{GoogleAttributes, GoogleSettings}
4751
import org.apache.pekko.util.ByteString as PekkoByteString
4852
import org.lfdecentralizedtrust.splice.admin.api.client.GrpcClientMetrics
49-
import org.lfdecentralizedtrust.splice.config.BackupDumpConfig
53+
import org.lfdecentralizedtrust.splice.config.{BackupDumpConfig, GcpCredentialsConfig}
5054
import org.lfdecentralizedtrust.splice.environment.SequencerAdminConnection.TrafficState
5155
import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.TopologyResult
5256
import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.TopologyTransactionType.AuthorizedState
5357

58+
import java.util.{Base64, Collections}
5459
import scala.concurrent.{ExecutionContextExecutor, Future}
60+
import scala.jdk.CollectionConverters.*
5561

5662
/** Connection to the subset of the Canton sequencer admin API that we rely
5763
* on in our own applications.
@@ -136,6 +142,7 @@ class SequencerAdminConnection(
136142
executionSequencerFactory: ExecutionSequencerFactory,
137143
materializer: Materializer,
138144
): Future[StorageObject] = {
145+
implicit val system: ActorSystem = materializer.system
139146

140147
val bucketConfig = backupDumConfig match {
141148
case BackupDumpConfig.Gcp(bucketConfig, _) =>
@@ -145,12 +152,59 @@ class SequencerAdminConnection(
145152
.withDescription("Stream genesis state works only with GCP buckets.")
146153
.asRuntimeException()
147154
}
148-
val sink = GCStorage.resumableUpload(
149-
bucketConfig.bucketName,
150-
fileName,
151-
contentType = ContentTypes.`application/octet-stream`,
152-
chunkSize = 256 * 1024, // Upload it in 256KB chunks
153-
)
155+
156+
val sink = GCStorage
157+
.resumableUpload(
158+
bucketConfig.bucketName,
159+
fileName,
160+
contentType = ContentTypes.`application/octet-stream`,
161+
chunkSize = 256 * 1024, // Upload it in 256KB chunks
162+
)
163+
.withAttributes(
164+
GoogleAttributes.settings(
165+
GoogleSettings().withCredentials(bucketConfig.credentials match {
166+
case cred @ GcpCredentialsConfig.User(_) =>
167+
val x = cred.credentials
168+
Credentials.apply(
169+
ConfigFactory.parseMap(
170+
Map[String, AnyRef](
171+
"provider" -> "user-access",
172+
"user-access" -> Map[String, AnyRef](
173+
"client-id" -> x.getClientId,
174+
"client-secret" -> x.getClientSecret,
175+
"refresh-token" -> x.getRefreshToken,
176+
"project-id" -> bucketConfig.projectId,
177+
).asJava,
178+
).asJava
179+
)
180+
)
181+
case cred @ GcpCredentialsConfig.ServiceAccount(_) =>
182+
val x = cred.credentials
183+
val scopes = if (x.getScopes.isEmpty) {
184+
Collections.singletonList("https://www.googleapis.com/auth/cloud-platform")
185+
} else {
186+
new java.util.ArrayList(x.getScopes)
187+
}
188+
Credentials.apply(
189+
ConfigFactory.parseMap(
190+
Map[String, AnyRef](
191+
"provider" -> "service-account",
192+
"service-account" ->
193+
Map[String, AnyRef](
194+
"private-key" -> Base64.getEncoder.encodeToString(
195+
x.getPrivateKey.getEncoded
196+
),
197+
"client-email" -> x.getClientEmail,
198+
"project-id" -> bucketConfig.projectId,
199+
"scopes" -> scopes,
200+
).asJava,
201+
).asJava
202+
)
203+
)
204+
})
205+
)
206+
)
207+
154208
// the stream observer acts as intermediate receiver
155209
val responseObserver =
156210
new ByteStringStreamObserver[OnboardingStateV2Response](_.onboardingStateForSequencer)

0 commit comments

Comments
 (0)