Skip to content

Commit eb111e3

Browse files
committed
AQL support for SHORTEST, LONGEST, LIKELY and UNLIKELY for prioritizing results
1 parent b963672 commit eb111e3

File tree

2 files changed

+157
-66
lines changed

2 files changed

+157
-66
lines changed

modules/aql/aqlquery.go

Lines changed: 34 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"errors"
55
"sync"
66

7-
deque "github.com/edwingeng/deque/v2"
87
"github.com/lkarlslund/adalanche/modules/engine"
98
"github.com/lkarlslund/adalanche/modules/graph"
109
"github.com/lkarlslund/adalanche/modules/query"
@@ -17,13 +16,12 @@ type AQLquery struct {
1716
sourceCache []*engine.IndexedGraph
1817
Next []EdgeSearcher // count is n-1
1918
Mode QueryMode
20-
Shortest bool
19+
Traversal Priority
2120
OverAllProbability engine.Probability
2221
}
2322

2423
func (aqlq AQLquery) Resolve(opts ResolverOptions) (*graph.Graph[*engine.Node, engine.EdgeBitmap], error) {
2524
if aqlq.Mode == Walk {
26-
// Check we don't have endless filtering potential
2725
for _, nf := range aqlq.Next {
2826
if nf.MaxIterations == 0 {
2927
return nil, errors.New("can't resolve Walk query without edge iteration limit")
@@ -32,7 +30,6 @@ func (aqlq AQLquery) Resolve(opts ResolverOptions) (*graph.Graph[*engine.Node, e
3230
}
3331
pb := ui.ProgressBar("Preparing AQL query sources", int64(len(aqlq.Sources)*2))
3432

35-
// Prepare all the potentialnodes by filtering them and saving them in potentialnodes[n]
3633
aqlq.sourceCache = make([]*engine.IndexedGraph, len(aqlq.Sources))
3734
for i, q := range aqlq.Sources {
3835
aqlq.sourceCache[i] = q.Populate(aqlq.datasource)
@@ -47,7 +44,6 @@ func (aqlq AQLquery) Resolve(opts ResolverOptions) (*graph.Graph[*engine.Node, e
4744
}
4845
pb.Add(1)
4946
pb.Finish()
50-
// nodes := make([]*engine.Objects, len(aqlq.Sources))
5147
result := graph.NewGraph[*engine.Node, engine.EdgeBitmap]()
5248

5349
if len(aqlq.Sources) == 1 {
@@ -63,7 +59,6 @@ func (aqlq AQLquery) Resolve(opts ResolverOptions) (*graph.Graph[*engine.Node, e
6359

6460
var resultlock sync.Mutex
6561
nodeindex := 0
66-
// Iterate over all starting nodes
6762
pb = ui.ProgressBar("Searching from start nodes", int64(len(aqlq.sourceCache)))
6863
aqlq.sourceCache[nodeindex].IterateParallel(func(o *engine.Node) bool {
6964
pb.Add(1)
@@ -79,18 +74,13 @@ func (aqlq AQLquery) Resolve(opts ResolverOptions) (*graph.Graph[*engine.Node, e
7974
return &result, nil
8075
}
8176

82-
var (
83-
directionsIn = []engine.EdgeDirection{engine.In}
84-
directionsOut = []engine.EdgeDirection{engine.Out}
85-
directionsAny = []engine.EdgeDirection{engine.In, engine.Out}
86-
)
87-
88-
var pWPPool sync.Pool
89-
90-
func init() {
91-
pWPPool.New = func() any {
92-
return probableWorkingPath{}
93-
}
77+
type searchState struct {
78+
currentObject *engine.Node
79+
workingGraph probableWorkingPath
80+
currentOverAllProbability float64
81+
currentSearchIndex byte // index into Next and sourceCache patterns
82+
currentDepth byte // depth in current edge searcher
83+
currentTotalDepth byte // total depth in all edge searchers (for total depth limiting)
9484
}
9585

9686
type pathItem struct {
@@ -159,7 +149,6 @@ func (pWP *probableWorkingPath) CommitToGraph(ao *engine.IndexedGraph, g graph.G
159149
continue
160150
}
161151
if pathItem.direction == engine.Out {
162-
// lookup edge from start object to current object
163152
bitmap, found := ao.GetEdge(lastNode, pathItem.target)
164153
if !found {
165154
ui.Error().Msgf("Graph has no outgoing edge from %v to %v!?", lastNode, pathItem.target)
@@ -180,27 +169,16 @@ func (aqlq AQLquery) resolveEdgesFrom(
180169
opts ResolverOptions,
181170
startObject *engine.Node,
182171
) graph.Graph[*engine.Node, engine.EdgeBitmap] {
183-
184172
committedGraph := graph.NewGraph[*engine.Node, engine.EdgeBitmap]()
173+
maxSearchIndex := byte(len(aqlq.Next) - 1)
185174

186-
type searchState struct {
187-
currentObject *engine.Node
188-
workingGraph probableWorkingPath
189-
currentOverAllProbability float64
190-
currentSearchIndex byte // index into Next and sourceCache patterns
191-
currentDepth byte // depth in current edge searcher
192-
currentTotalDepth byte // total depth in all edge searchers (for total depth limiting)
193-
}
194-
195-
// Initialize the search queue with the starting object and search index
196175
var initialWorkingGraph probableWorkingPath
197176
initialWorkingGraph.Add(startObject, engine.Any, 0)
198-
// FIXME initialWorkingGraph.SetNodeData(startObject, "reference", aqlq.Sources[0].Reference)
199-
200-
maxSearchIndex := byte(len(aqlq.Next) - 1)
201177

202-
queue := deque.NewDeque[searchState]()
203-
queue.PushBack(searchState{
178+
queue := PriorityQueue{
179+
p: aqlq.Traversal,
180+
}
181+
queue.Push(searchState{
204182
currentObject: startObject,
205183
currentSearchIndex: 0,
206184
workingGraph: initialWorkingGraph,
@@ -211,8 +189,7 @@ func (aqlq AQLquery) resolveEdgesFrom(
211189

212190
var processed int
213191
var currentState searchState
214-
for !queue.IsEmpty() {
215-
// Check if we've reached the node limit
192+
for queue.Len() > 0 {
216193
if opts.NodeLimit > 0 && committedGraph.Order() >= opts.NodeLimit {
217194
break
218195
}
@@ -222,20 +199,14 @@ func (aqlq AQLquery) resolveEdgesFrom(
222199
}
223200
processed++
224201

225-
if aqlq.Shortest {
226-
// Pop from front for BFS (standard, shortest results)
227-
currentState = queue.PopFront()
228-
} else {
229-
// Pop from end for DFS
230-
currentState = queue.PopBack()
231-
}
202+
currentState = queue.Pop()
203+
232204
nextDepth := currentState.currentDepth + 1
233205
nextTotalDepth := currentState.currentTotalDepth + 1
234206

235207
thisEdgeSearcher := aqlq.Next[currentState.currentSearchIndex]
236208
nextTargets := aqlq.sourceCache[currentState.currentSearchIndex+1]
237209

238-
// If edge has node requirements, use that, otherwise default to the next final node requirement
239210
nextEdgeTargets := thisEdgeSearcher.pathNodeRequirementCache
240211
if nextEdgeTargets == nil {
241212
nextEdgeTargets = nextTargets
@@ -251,11 +222,9 @@ func (aqlq AQLquery) resolveEdgesFrom(
251222
directions = directionsAny
252223
}
253224

254-
// Optionally skip this edge searcher if MinIterations == 0
255225
if thisEdgeSearcher.MinIterations == 0 && currentState.currentDepth == 0 {
256-
// We can skip this one!
257226
if currentState.currentSearchIndex < byte(maxSearchIndex) {
258-
queue.PushBack(searchState{
227+
queue.Push(searchState{
259228
currentObject: currentState.currentObject,
260229
currentSearchIndex: currentState.currentSearchIndex + 1,
261230
workingGraph: currentState.workingGraph,
@@ -264,7 +233,6 @@ func (aqlq AQLquery) resolveEdgesFrom(
264233
currentOverAllProbability: currentState.currentOverAllProbability,
265234
})
266235
} else {
267-
// The last edge searcher is not required, so add this as a match
268236
currentState.workingGraph.CommitToGraph(aqlq.datasource, committedGraph, aqlq.Sources)
269237
}
270238
}
@@ -275,7 +243,6 @@ func (aqlq AQLquery) resolveEdgesFrom(
275243
return false
276244
}
277245

278-
// Check homomorphism requirements
279246
switch aqlq.Mode {
280247
case Trail:
281248
if direction == engine.Out {
@@ -299,7 +266,6 @@ func (aqlq AQLquery) resolveEdgesFrom(
299266
}
300267
}
301268

302-
// Check if the edge is a match
303269
matchedEdges := thisEdgeSearcher.FilterEdges.Bitmap.Intersect(eb)
304270
if thisEdgeSearcher.FilterEdges.Comparator != query.CompareInvalid {
305271
if !query.Comparator[int64](thisEdgeSearcher.FilterEdges.Comparator).Compare(int64(matchedEdges.Count()), thisEdgeSearcher.FilterEdges.Count) {
@@ -318,12 +284,10 @@ func (aqlq AQLquery) resolveEdgesFrom(
318284
edgeProbability = matchedEdges.MaxProbability(nextObject, currentState.currentObject)
319285
}
320286

321-
// Edge probability filtering
322287
if thisEdgeSearcher.ProbabilityComparator != query.CompareInvalid && !query.Comparator[engine.Probability](thisEdgeSearcher.ProbabilityComparator).Compare(edgeProbability, thisEdgeSearcher.ProbabilityValue) {
323288
return true
324289
}
325290

326-
// Options based edge probability filtering
327291
if opts.MinEdgeProbability > 0 && edgeProbability < opts.MinEdgeProbability {
328292
return true
329293
}
@@ -332,28 +296,19 @@ func (aqlq AQLquery) resolveEdgesFrom(
332296
if nextOverAllProbability < float64(aqlq.OverAllProbability) {
333297
return true
334298
}
335-
nextOverAllProbability = nextOverAllProbability / 100 // make it 0-1 float
299+
nextOverAllProbability = nextOverAllProbability / 100
336300

337-
// addedge := matchedEdges
338-
// if thisEdgeSearcher.FilterEdges.NoTrimEdges {
339-
// addedge = eb
340-
// }
341-
342-
// Next node is a match
343301
if nextDepth >= byte(thisEdgeSearcher.MinIterations) &&
344302
(nextTargets == nil || nextTargets.Contains(nextObject)) {
345303
newWorkingGraph := currentState.workingGraph.Clone()
346304
newWorkingGraph.Add(nextObject, direction, byte(currentState.currentSearchIndex+1))
347305

348306
atLastIndex := currentState.currentSearchIndex == byte(maxSearchIndex)
349307
if atLastIndex {
350-
// We've reached the end of the current search index, so let's merge the working graph into the committed graph - it's a complete match
351308
newWorkingGraph.CommitToGraph(aqlq.datasource, committedGraph, aqlq.Sources)
352309
}
353-
// Reached max depth for this edge searcher, cannot continue
354310
if currentState.currentSearchIndex < maxSearchIndex && nextTotalDepth <= byte(opts.MaxDepth) {
355-
// queue next search index
356-
queue.PushBack(searchState{
311+
queue.Push(searchState{
357312
currentObject: nextObject,
358313
currentSearchIndex: currentState.currentSearchIndex + 1,
359314
workingGraph: newWorkingGraph,
@@ -365,11 +320,10 @@ func (aqlq AQLquery) resolveEdgesFrom(
365320
}
366321
if nextDepth < byte(thisEdgeSearcher.MaxIterations) && nextTotalDepth <= byte(opts.MaxDepth) &&
367322
(nextEdgeTargets == nil || nextEdgeTargets.Contains(nextObject)) {
368-
// More edges and nodes along the same searcher (deeper search)
369323
newWorkingGraph := currentState.workingGraph.Clone()
370324
newWorkingGraph.Add(nextObject, direction, 255)
371325

372-
queue.PushBack(searchState{
326+
queue.Push(searchState{
373327
currentObject: nextObject,
374328
currentSearchIndex: currentState.currentSearchIndex,
375329
workingGraph: newWorkingGraph,
@@ -387,3 +341,17 @@ func (aqlq AQLquery) resolveEdgesFrom(
387341

388342
return committedGraph
389343
}
344+
345+
var (
346+
directionsIn = []engine.EdgeDirection{engine.In}
347+
directionsOut = []engine.EdgeDirection{engine.Out}
348+
directionsAny = []engine.EdgeDirection{engine.In, engine.Out}
349+
)
350+
351+
var pWPPool sync.Pool
352+
353+
func init() {
354+
pWPPool.New = func() any {
355+
return probableWorkingPath{}
356+
}
357+
}

modules/aql/priorityqueue.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package aql
2+
3+
type Priority int
4+
5+
const (
6+
ShortestFirst Priority = iota
7+
ProbableShortest
8+
LongestFirst
9+
UnlikelyLongest
10+
)
11+
12+
type PriorityQueue struct {
13+
p Priority
14+
items []searchState
15+
}
16+
17+
func (pq *PriorityQueue) Len() int { return len(pq.items) }
18+
19+
func (pq *PriorityQueue) Less(i, j int) bool {
20+
switch pq.p {
21+
case ProbableShortest:
22+
if pq.items[i].currentOverAllProbability > pq.items[j].currentOverAllProbability {
23+
return true
24+
}
25+
if pq.items[i].currentOverAllProbability < pq.items[j].currentOverAllProbability {
26+
return false
27+
}
28+
fallthrough
29+
case ShortestFirst:
30+
if pq.items[i].currentTotalDepth < pq.items[j].currentTotalDepth {
31+
return true
32+
}
33+
if pq.items[i].currentTotalDepth > pq.items[j].currentTotalDepth {
34+
return false
35+
}
36+
case UnlikelyLongest:
37+
if pq.items[i].currentOverAllProbability < pq.items[j].currentOverAllProbability {
38+
return true
39+
}
40+
if pq.items[i].currentOverAllProbability > pq.items[j].currentOverAllProbability {
41+
return false
42+
}
43+
fallthrough
44+
case LongestFirst:
45+
if pq.items[i].currentTotalDepth > pq.items[j].currentTotalDepth {
46+
return true
47+
}
48+
if pq.items[i].currentTotalDepth < pq.items[j].currentTotalDepth {
49+
return false
50+
}
51+
}
52+
// ensure stability
53+
return pq.items[i].currentObject.ID() < pq.items[j].currentObject.ID()
54+
}
55+
56+
func (pq *PriorityQueue) Swap(i, j int) {
57+
pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
58+
}
59+
60+
func (pq *PriorityQueue) Push(x searchState) {
61+
if len(pq.items) == cap(pq.items) {
62+
newCap := cap(pq.items) * 2
63+
if newCap == 0 {
64+
newCap = 1
65+
}
66+
newItems := make([]searchState, len(pq.items), newCap)
67+
copy(newItems, pq.items)
68+
pq.items = newItems
69+
}
70+
pq.items = append(pq.items, x)
71+
pq.siftUp(len(pq.items) - 1)
72+
}
73+
74+
func (pq *PriorityQueue) Pop() searchState {
75+
if len(pq.items) == 0 {
76+
panic("pop from empty priority queue")
77+
}
78+
item := pq.items[0]
79+
pq.items[0] = pq.items[len(pq.items)-1]
80+
pq.items = pq.items[:len(pq.items)-1]
81+
pq.siftDown(0)
82+
83+
if len(pq.items) <= cap(pq.items)/4 {
84+
newCap := cap(pq.items) / 2
85+
if newCap < 1 {
86+
newCap = 1
87+
}
88+
newItems := make([]searchState, len(pq.items), newCap)
89+
copy(newItems, pq.items)
90+
pq.items = newItems
91+
}
92+
return item
93+
}
94+
95+
func (pq *PriorityQueue) siftUp(i int) {
96+
for {
97+
parent := (i - 1) / 2
98+
if parent == i || !pq.Less(i, parent) {
99+
break
100+
}
101+
pq.Swap(i, parent)
102+
i = parent
103+
}
104+
}
105+
106+
func (pq *PriorityQueue) siftDown(i int) {
107+
for {
108+
left := 2*i + 1
109+
right := 2*i + 2
110+
largest := i
111+
if left < len(pq.items) && pq.Less(left, largest) {
112+
largest = left
113+
}
114+
if right < len(pq.items) && pq.Less(right, largest) {
115+
largest = right
116+
}
117+
if largest == i {
118+
break
119+
}
120+
pq.Swap(i, largest)
121+
i = largest
122+
}
123+
}

0 commit comments

Comments
 (0)