Skip to content

Commit 0c56e65

Browse files
zhaohehuhupan3793
authored andcommitted
[KYUUBI #7171] Fix empty list engine result when etcd is used as the service registry
### Why are the changes needed? In etcd, keys are stored in a flat namespace where / is just part of the key name. For example, /kyuubi_version_USER_SPARK_SQL/test/default is treated as a complete single key. To retrieve all entries under a similar path, a prefix query (--prefix) is required. In ZooKeeper, data is organized in a hierarchical tree structure where / represents parent-child relationships. ### How was this patch tested? added a UT to verify code. ### Was this patch authored or co-authored using generative AI tooling? No Closes #7171 from zhaohehuhu/dev-0813. Closes #7171 48de708 [zhaohehuhu] fix 8f83a46 [zhaohehuhu] refactor 2d9b322 [zhaohehuhu] refactor 85207c4 [zhaohehuhu] refactor 3961489 [zhaohehuhu] reformat 15cc07b [zhaohehuhu] Fix empty list engine result when etcd is used as the service registry Lead-authored-by: zhaohehuhu <[email protected]> Co-authored-by: zhaohehuhu <[email protected]> Signed-off-by: Cheng Pan <[email protected]>
1 parent 28bba27 commit 0c56e65

File tree

4 files changed

+32
-7
lines changed

4 files changed

+32
-7
lines changed

kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,14 @@ trait DiscoveryClient extends Logging {
6262
def pathExists(path: String): Boolean
6363

6464
/**
65-
* Check if the path non exists.
66-
*/
67-
def pathNonExists(path: String): Boolean
65+
* Checks whether the given path and all its child paths (prefix matches) do not exist.
66+
* The isPrefix is set to true by default for Etcd to retrieve all entries under the given path.
67+
* For other discovery service, it can be set false by default to only check the exact path.
68+
* @param path The path to check
69+
* @param isPrefix whether to check all paths with the given path as prefix
70+
* @return true if the path and all its sub-paths are non-existent; false otherwise
71+
*/
72+
def pathNonExists(path: String, isPrefix: Boolean = false): Boolean
6873

6974
/**
7075
* Delete a path.

kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,10 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
147147
!pathNonExists(path)
148148
}
149149

150-
override def pathNonExists(path: String): Boolean = {
151-
kvClient.get(ByteSequence.from(path.getBytes())).get().getKvs.isEmpty
150+
override def pathNonExists(path: String, isPrefix: Boolean = true): Boolean = {
151+
kvClient.get(
152+
ByteSequence.from(path.getBytes()),
153+
GetOption.newBuilder().isPrefix(isPrefix).build()).get().getKvs.isEmpty
152154
}
153155

154156
override def delete(path: String, deleteChildren: Boolean = false): Unit = {
@@ -311,7 +313,7 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
311313
override def getAndIncrement(path: String, delta: Int = 1): Int = {
312314
val lockPath = s"${path}_tmp_for_lock"
313315
tryWithLock(lockPath, 60 * 1000) {
314-
if (pathNonExists(path)) {
316+
if (pathNonExists(path, isPrefix = false)) {
315317
create(path, "PERSISTENT")
316318
setData(path, String.valueOf(0).getBytes)
317319
}

kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
8686
zkClient.checkExists().forPath(path) != null
8787
}
8888

89-
override def pathNonExists(path: String): Boolean = {
89+
override def pathNonExists(path: String, isPrefix: Boolean): Boolean = {
9090
zkClient.checkExists().forPath(path) == null
9191
}
9292

kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClientSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,22 @@ class EtcdDiscoveryClientSuite extends DiscoveryClientTests {
9292
assert(!discoveryClient.pathExists(path))
9393
}
9494
}
95+
96+
test("etcd test: set, get with path prefix and delete") {
97+
withDiscoveryClient(conf) { discoveryClient =>
98+
val path = "/kyuubi_version_USER_SPARK_SQL/test/default"
99+
val pathPrefix = "/kyuubi_version_USER_SPARK_SQL/test"
100+
// set
101+
discoveryClient.create(path, "PERSISTENT")
102+
assert(discoveryClient.pathExists(path))
103+
assert(!discoveryClient.pathNonExists(pathPrefix))
104+
105+
// get
106+
assert(new String(discoveryClient.getData(path), StandardCharsets.UTF_8) == path)
107+
108+
// delete
109+
discoveryClient.delete(path)
110+
assert(!discoveryClient.pathExists(path))
111+
}
112+
}
95113
}

0 commit comments

Comments
 (0)