Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ private constructor(
}

private fun discoverDeployment(client: DeploymentApi, uri: String) {
val request = RegisterDeploymentRequest(RegisterHttpDeploymentRequest().uri(uri).force(false))
val request =
RegisterDeploymentRequest(RegisterHttpDeploymentRequest().uri(URI.create(uri)).force(false))

val response =
Unreliables.retryUntilSuccess(20, TimeUnit.SECONDS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ class BackwardCompatibilityTest {
// For each deployment, update its URI
for (deployment in deployments.deployments) {
val updateRequest =
UpdateDeploymentRequest(UpdateHttpDeploymentRequest().uri(localEndpointURI.toString()))
UpdateDeploymentRequest(
UpdateHttpDeploymentRequest().uri(URI.create(localEndpointURI.toString())))

try {
adminApi.updateDeployment(deployment.httpDeploymentResponse.id, updateRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ class ForwardCompatibilityTest {
// For each deployment, update its URI
for (deployment in deployments.deployments) {
val updateRequest =
UpdateDeploymentRequest(UpdateHttpDeploymentRequest().uri(localEndpointURI.toString()))
UpdateDeploymentRequest(
UpdateHttpDeploymentRequest().uri(URI.create(localEndpointURI.toString())))

try {
adminApi.updateDeployment(deployment.httpDeploymentResponse.id, updateRequest)
Expand Down
22 changes: 19 additions & 3 deletions src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE
package dev.restate.sdktesting.tests

import dev.restate.admin.api.KafkaClusterApi
import dev.restate.admin.api.SubscriptionApi
import dev.restate.admin.client.ApiClient
import dev.restate.admin.model.CreateKafkaClusterRequest
import dev.restate.admin.model.CreateSubscriptionRequest
import dev.restate.sdktesting.infra.KafkaContainer
import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions
Expand All @@ -22,7 +24,7 @@ import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord

object Kafka {
fun produceMessagesToKafka(port: Int, topic: String, values: List<Pair<String, String>>) {
fun produceMessagesToKafka(port: Int, topic: String, values: List<Pair<String?, String>>) {
val props = Properties()
props["bootstrap.servers"] = "PLAINTEXT://localhost:$port"
props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
Expand All @@ -35,6 +37,20 @@ object Kafka {
producer.close()
}

fun registerKafkaCluster(
adminURI: URI,
) {
val kafkaClustersClient =
KafkaClusterApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port))
kafkaClustersClient.createKafkaCluster(
CreateKafkaClusterRequest()
.name("my-cluster")
.properties(
mapOf(
"bootstrap.servers" to
"PLAINTEXT://kafka:${KafkaContainer.KAFKA_NETWORK_PORT}")))
}

fun createKafkaSubscription(
adminURI: URI,
topic: String,
Expand All @@ -45,8 +61,8 @@ object Kafka {
SubscriptionApi(ApiClient().setHost(adminURI.host).setPort(adminURI.port))
subscriptionsClient.createSubscription(
CreateSubscriptionRequest()
.source("kafka://my-cluster/$topic")
.sink("service://$serviceName/$handlerName")
.source(URI.create("kafka://my-cluster/$topic"))
.sink(URI.create("service://$serviceName/$handlerName"))
.options(mapOf("auto.offset.reset" to "earliest")))
}

Expand Down
149 changes: 149 additions & 0 deletions src/main/kotlin/dev/restate/sdktesting/tests/KafkaDynamicSetupTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate SDK Test suite tool,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-test-suite/blob/main/LICENSE
package dev.restate.sdktesting.tests

import dev.restate.client.Client
import dev.restate.sdk.annotation.Handler
import dev.restate.sdk.annotation.Name
import dev.restate.sdk.annotation.Service
import dev.restate.sdk.annotation.VirtualObject
import dev.restate.sdk.common.StateKey
import dev.restate.sdk.endpoint.Endpoint
import dev.restate.sdk.kotlin.Context
import dev.restate.sdk.kotlin.ObjectContext
import dev.restate.sdk.kotlin.stateKey
import dev.restate.sdktesting.infra.*
import dev.restate.sdktesting.tests.Kafka.createKafkaSubscription
import dev.restate.sdktesting.tests.Kafka.produceMessagesToKafka
import dev.restate.sdktesting.tests.Kafka.registerKafkaCluster
import java.net.URI
import java.util.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.kotlin.await
import org.awaitility.kotlin.withAlias
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension
import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode
import org.junit.jupiter.api.parallel.Isolated

@Tag("only-single-node" /* This test depends on metadata propagation happening immediately */)
@Isolated
class KafkaDynamicSetupTest {

@VirtualObject
@Name("Counter")
class Counter {
companion object {
private val COUNTER_KEY: StateKey<Long> = stateKey("counter")
}

@Handler
suspend fun add(ctx: ObjectContext, value: Long): Long {
val current = ctx.get(COUNTER_KEY) ?: 0L
val newValue = current + value
ctx.set(COUNTER_KEY, newValue)
return newValue
}

@Handler
suspend fun get(ctx: ObjectContext): Long {
return ctx.get(COUNTER_KEY) ?: 0L
}
}

@Service
@Name("EventHandler")
class EventHandler {
@Serializable data class ProxyRequest(val key: String, val value: Long)

@Handler
suspend fun oneWayCall(ctx: Context, request: ProxyRequest) {
KafkaDynamicSetupTestCounterClient.fromContext(ctx, request.key).send().add(request.value)
}
}

companion object {
private const val COUNTER_TOPIC = "counter"
private const val EVENT_HANDLER_TOPIC = "event-handler"

@RegisterExtension
val deployerExt: RestateDeployerExtension = RestateDeployerExtension {
withEndpoint(Endpoint.bind(Counter()).bind(EventHandler()))
withContainer("kafka", KafkaContainer(COUNTER_TOPIC, EVENT_HANDLER_TOPIC))
}

@JvmStatic
@BeforeAll
fun beforeAll(
@InjectAdminURI adminURI: URI,
) {
registerKafkaCluster(adminURI)
}
}

@Test
@Execution(ExecutionMode.CONCURRENT)
fun handleEventInCounterService(
@InjectAdminURI adminURI: URI,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT)
kafkaPort: Int,
@InjectClient ingressClient: Client
) = runTest {
val counter = UUID.randomUUID().toString()

// Register subscription
createKafkaSubscription(adminURI, COUNTER_TOPIC, "Counter", "add")

// Produce message to kafka
produceMessagesToKafka(
kafkaPort, COUNTER_TOPIC, listOf(counter to "1", counter to "2", counter to "3"))

await withAlias
"Updates from Kafka are visible in the counter" untilAsserted
{
assertThat(KafkaDynamicSetupTestCounterClient.fromClient(ingressClient, counter).get())
.isEqualTo(6L)
}
}

@Test
@Execution(ExecutionMode.CONCURRENT)
fun handleEventInEventHandler(
@InjectAdminURI adminURI: URI,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT)
kafkaPort: Int,
@InjectClient ingressClient: Client
) = runTest {
val counter = UUID.randomUUID().toString()

// Register subscription
createKafkaSubscription(adminURI, EVENT_HANDLER_TOPIC, "EventHandler", "oneWayCall")

// Produce message to kafka
produceMessagesToKafka(
kafkaPort,
EVENT_HANDLER_TOPIC,
listOf(
null to Json.encodeToString(EventHandler.ProxyRequest(counter, 1)),
null to Json.encodeToString(EventHandler.ProxyRequest(counter, 2)),
null to Json.encodeToString(EventHandler.ProxyRequest(counter, 3))))

await withAlias
"Updates from Kafka are visible in the counter" untilAsserted
{
assertThat(KafkaDynamicSetupTestCounterClient.fromClient(ingressClient, counter).get())
.isEqualTo(6L)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package dev.restate.sdktesting.tests

import dev.restate.admin.api.InvocationApi
import dev.restate.admin.client.ApiClient
import dev.restate.admin.model.RestartAsNewInvocationDeploymentParameter
import dev.restate.client.Client
import dev.restate.client.kotlin.attachSuspend
import dev.restate.sdk.annotation.Handler
Expand Down Expand Up @@ -92,7 +93,10 @@ class PauseResumeChangingDeploymentTest {
// Resume the paused invocation on the specific endpoint
val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port)
val invocationApi = InvocationApi(adminClient)
retryOnServiceUnavailable { invocationApi.resumeInvocation(invocationId, local.deploymentId) }
retryOnServiceUnavailable {
invocationApi.resumeInvocation(
invocationId, RestartAsNewInvocationDeploymentParameter(local.deploymentId))
}

assertThat(sendResult.attachSuspend().response()).isEqualTo("Success in new version!")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package dev.restate.sdktesting.tests

import dev.restate.admin.api.InvocationApi
import dev.restate.admin.client.ApiClient
import dev.restate.admin.model.RestartAsNewInvocationDeploymentParameter
import dev.restate.client.Client
import dev.restate.client.kotlin.attachSuspend
import dev.restate.sdk.annotation.Handler
Expand Down Expand Up @@ -90,7 +91,10 @@ class PauseResumeTest {
// Resume the paused invocation on the specific endpoint
val adminClient = ApiClient().setHost(adminURI.host).setPort(adminURI.port)
val invocationApi = InvocationApi(adminClient)
retryOnServiceUnavailable { invocationApi.resumeInvocation(invocationId, null) }
retryOnServiceUnavailable {
invocationApi.resumeInvocation(
invocationId, RestartAsNewInvocationDeploymentParameter("keep"))
}

assertThat(sendResult.attachSuspend().response()).isEqualTo("input")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package dev.restate.sdktesting.tests

import dev.restate.admin.api.InvocationApi
import dev.restate.admin.client.ApiClient
import dev.restate.admin.model.RestartAsNewInvocationDeploymentParameter
import dev.restate.client.Client
import dev.restate.client.IngressException
import dev.restate.client.kotlin.*
Expand Down Expand Up @@ -100,7 +101,10 @@ class RestartAsNewInvocationTest {
val invocationApi = InvocationApi(adminClient)
val newInvocationId =
retryOnServiceUnavailable {
invocationApi.restartAsNewInvocation(sendResult.invocationId(), null, null)
invocationApi.restartAsNewInvocation(
sendResult.invocationId(),
null,
RestartAsNewInvocationDeploymentParameter("latest"))
}
.newInvocationId

Expand Down Expand Up @@ -158,7 +162,8 @@ class RestartAsNewInvocationTest {
val invocationApi = InvocationApi(adminClient)
val newInvocationId =
retryOnServiceUnavailable {
invocationApi.restartAsNewInvocation(sendResult.invocationId(), 1, null)
invocationApi.restartAsNewInvocation(
sendResult.invocationId(), 1, RestartAsNewInvocationDeploymentParameter("latest"))
}
.newInvocationId

Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/dev/restate/sdktesting/tests/utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ fun startAndRegisterLocalEndpoint(endpoint: Endpoint, adminURI: URI): LocalEndpo
try {
deploymentApi
.createDeployment(
RegisterDeploymentRequest(RegisterHttpDeploymentRequest().uri(uri).force(false)))
RegisterDeploymentRequest(
RegisterHttpDeploymentRequest().uri(URI.create(uri)).force(false)))
.id
} catch (e: Exception) {
LOG.error("Failed to register new deployment {}: {}", uri, e.message)
Expand Down
Loading
Loading