Skip to content

Commit 21ecebc

Browse files
[ci] make pekko stream to bucket work with gcp service account
Signed-off-by: Julien Tinguely <[email protected]>
1 parent e01064a commit 21ecebc

File tree

1 file changed

+62
-7
lines changed

1 file changed

+62
-7
lines changed

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/SequencerAdminConnection.scala

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,22 @@ import org.apache.pekko.http.scaladsl.model.ContentTypes
4242
import org.apache.pekko.stream.connectors.googlecloud.storage.StorageObject
4343
import org.apache.pekko.stream.connectors.googlecloud.storage.scaladsl.GCStorage
4444
import com.google.protobuf.ByteString
45+
import com.typesafe.config.ConfigFactory
4546
import io.grpc.stub.StreamObserver
47+
import org.apache.pekko.actor.ActorSystem
4648
import org.apache.pekko.stream.Materializer
49+
import org.apache.pekko.stream.connectors.google.auth.Credentials
50+
import org.apache.pekko.stream.connectors.google.{GoogleAttributes, GoogleSettings}
4751
import org.apache.pekko.util.ByteString as PekkoByteString
4852
import org.lfdecentralizedtrust.splice.admin.api.client.GrpcClientMetrics
49-
import org.lfdecentralizedtrust.splice.config.BackupDumpConfig
53+
import org.lfdecentralizedtrust.splice.config.{BackupDumpConfig, GcpCredentialsConfig}
5054
import org.lfdecentralizedtrust.splice.environment.SequencerAdminConnection.TrafficState
5155
import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.TopologyResult
5256
import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.TopologyTransactionType.AuthorizedState
5357

58+
import java.util.{Base64, Collections}
5459
import scala.concurrent.{ExecutionContextExecutor, Future}
60+
import scala.jdk.CollectionConverters.*
5561

5662
/** Connection to the subset of the Canton sequencer admin API that we rely
5763
* on in our own applications.
@@ -133,9 +139,11 @@ class SequencerAdminConnection(
133139
backupDumConfig: BackupDumpConfig,
134140
fileName: String,
135141
)(implicit
142+
tc: TraceContext,
136143
executionSequencerFactory: ExecutionSequencerFactory,
137144
materializer: Materializer,
138145
): Future[StorageObject] = {
146+
implicit val system: ActorSystem = materializer.system
139147

140148
val bucketConfig = backupDumConfig match {
141149
case BackupDumpConfig.Gcp(bucketConfig, _) =>
@@ -145,12 +153,59 @@ class SequencerAdminConnection(
145153
.withDescription("Stream genesis state works only with GCP buckets.")
146154
.asRuntimeException()
147155
}
148-
val sink = GCStorage.resumableUpload(
149-
bucketConfig.bucketName,
150-
fileName,
151-
contentType = ContentTypes.`application/octet-stream`,
152-
chunkSize = 256 * 1024, // Upload it in 256KB chunks
153-
)
156+
157+
val sink = GCStorage
158+
.resumableUpload(
159+
bucketConfig.bucketName,
160+
fileName,
161+
contentType = ContentTypes.`application/octet-stream`,
162+
chunkSize = 256 * 1024, // Upload it in 256KB chunks
163+
)
164+
.withAttributes(
165+
GoogleAttributes.settings(
166+
GoogleSettings().withCredentials(bucketConfig.credentials match {
167+
case cred @ GcpCredentialsConfig.User(_) =>
168+
val x = cred.credentials
169+
Credentials.apply(
170+
ConfigFactory.parseMap(
171+
Map[String, AnyRef](
172+
"provider" -> "user-access",
173+
"user-access" -> Map[String, AnyRef](
174+
"client-id" -> x.getClientId,
175+
"client-secret" -> x.getClientSecret,
176+
"refresh-token" -> x.getRefreshToken,
177+
"project-id" -> bucketConfig.projectId,
178+
).asJava,
179+
).asJava
180+
)
181+
)
182+
case cred @ GcpCredentialsConfig.ServiceAccount(_) =>
183+
val x = cred.credentials
184+
val scopes = if (x.getScopes.isEmpty) {
185+
Collections.singletonList("https://www.googleapis.com/auth/cloud-platform")
186+
} else {
187+
new java.util.ArrayList(x.getScopes)
188+
}
189+
Credentials.apply(
190+
ConfigFactory.parseMap(
191+
Map[String, AnyRef](
192+
"provider" -> "service-account",
193+
"service-account" ->
194+
Map[String, AnyRef](
195+
"private-key" -> Base64.getEncoder.encodeToString(
196+
x.getPrivateKey.getEncoded
197+
),
198+
"client-email" -> x.getClientEmail,
199+
"project-id" -> bucketConfig.projectId,
200+
"scopes" -> scopes,
201+
).asJava,
202+
).asJava
203+
)
204+
)
205+
})
206+
)
207+
)
208+
154209
// the stream observer acts as intermediate receiver
155210
val responseObserver =
156211
new ByteStringStreamObserver[OnboardingStateV2Response](_.onboardingStateForSequencer)

0 commit comments

Comments
 (0)