@@ -47,6 +47,7 @@ import {
4747 assertIsError ,
4848 assertRejects ,
4949 assertThrows ,
50+ fail ,
5051} from "jsr:@std/assert" ;
5152
5253import { JetStreamClientImpl , JetStreamManagerImpl } from "../src/jsclient.ts" ;
@@ -309,6 +310,127 @@ Deno.test("jetstream - publish require last sequence by subject", async () => {
309310 await cleanup ( ns , nc ) ;
310311} ) ;
311312
313+ Deno . test ( "jetstream - last subject sequence subject" , async ( ) => {
314+ // https://github.com/nats-io/nats-server/blob/47382b1ee49a0dec1c1d8785d54790b39b7a3289/server/jetstream_test.go#L9095
315+ const { ns, nc } = await setup ( jetstreamServerConf ( { } ) ) ;
316+ const jsm = await jetstreamManager ( nc ) ;
317+ await jsm . streams . add ( {
318+ name : "A" ,
319+ subjects : [ `a.>` ] ,
320+ max_msgs_per_subject : 1 ,
321+ } ) ;
322+
323+ const js = jsm . jetstream ( ) ;
324+
325+ const r : Record < string , number > = { } ;
326+
327+ async function pub ( subj : string , data : string ) {
328+ return await js . publish ( subj , data )
329+ . then ( ( pa ) => {
330+ r [ subj ] = pa . seq ;
331+ const chunks = subj . split ( "." ) ;
332+ chunks [ 2 ] = "*" ;
333+ r [ chunks . join ( "." ) ] = pa . seq ;
334+ } ) ;
335+ }
336+
337+ await Promise . all ( [
338+ pub ( "a.1.foo" , "1:1" ) ,
339+ pub ( "a.1.bar" , "1:2" ) ,
340+ pub ( "a.2.foo" , "2:1" ) ,
341+ pub ( "a.3.bar" , "3:1" ) ,
342+ pub ( "a.1.baz" , "1:3" ) ,
343+ pub ( "a.1.bar" , "1:4" ) ,
344+ pub ( "a.2.baz" , "2:2" ) ,
345+ ] ) . then ( ( ) => {
346+ console . table ( r ) ;
347+ } ) ;
348+
349+ async function pc ( subj : string , filter : string , seq : number , ok : boolean ) {
350+ await js . publish ( subj , "data" , {
351+ expect : { lastSubjectSequence : seq , lastSubjectSequenceSubject : filter } ,
352+ } ) . then ( ( _ ) => {
353+ if ( ! ok ) {
354+ fail ( "should have not succeeded" ) ;
355+ }
356+ } )
357+ . catch ( ( err ) => {
358+ if ( ok ) {
359+ fail ( err ) ;
360+ }
361+ } ) ;
362+ }
363+
364+ // ┌─────────┬────────┐
365+ // │ (idx) │ Values │
366+ // ├─────────┼────────┤
367+ // │ a.1.foo │ 1 │
368+ // │ a.1.* │ 6 │
369+ // │ a.1.bar │ 6 │
370+ // │ a.2.foo │ 3 │
371+ // │ a.2.* │ 7 │
372+ // │ a.3.bar │ 4 │
373+ // │ a.3.* │ 4 │
374+ // │ a.1.baz │ 5 │
375+ // │ a.2.baz │ 7 │
376+ // └─────────┴────────┘
377+ await pc ( "a.1.foo" , "a.1.*" , 0 , false ) ;
378+ await pc ( "a.1.bar" , "a.1.*" , 0 , false ) ;
379+ await pc ( "a.1.xxx" , "a.1.*" , 0 , false ) ;
380+ await pc ( "a.1.foo" , "a.1.*" , 1 , false ) ;
381+ await pc ( "a.1.bar" , "a.1.*" , 1 , false ) ;
382+ await pc ( "a.1.xxx" , "a.1.*" , 1 , false ) ;
383+ await pc ( "a.2.foo" , "a.2.*" , 1 , false ) ;
384+ await pc ( "a.2.bar" , "a.2.*" , 1 , false ) ;
385+ await pc ( "a.2.xxx" , "a.2.*" , 1 , false ) ;
386+ await pc ( "a.1.bar" , "a.1.*" , 3 , false ) ;
387+ await pc ( "a.1.bar" , "a.1.*" , 4 , false ) ;
388+ await pc ( "a.1.bar" , "a.1.*" , 5 , false ) ;
389+ // this inserts seq 8 because a.1.* is at seq 6 (a.2.baz is at 7)
390+ await pc ( "a.1.bar" , "a.1.*" , 6 , true ) ;
391+ // ┌─────────┬────────┐
392+ // │ (idx) │ Values │
393+ // ├─────────┼────────┤
394+ // │ a.1.foo │ 1 │
395+ // │ a.1.* │ 8 │
396+ // │ a.1.bar │ 8 │
397+ // │ a.2.foo │ 3 │
398+ // │ a.2.* │ 7 │
399+ // │ a.3.bar │ 4 │
400+ // │ a.3.* │ 4 │
401+ // │ a.1.baz │ 5 │
402+ // │ a.2.baz │ 7 │
403+ // └─────────┴────────┘
404+ await pc ( "a.1.baz" , "a.1.*" , 2 , false ) ;
405+ await pc ( "a.1.bar" , "a.1.*" , 7 , false ) ;
406+
407+ // this inserts seq 9, because a.1.* is at 8
408+ await pc ( "a.1.xxx" , "a.1.*" , 8 , true ) ;
409+ // ┌─────────┬────────┐
410+ // │ (idx) │ Values │
411+ // ├─────────┼────────┤
412+ // │ a.1.foo │ 1 │
413+ // │ a.1.* │ 9 │
414+ // │ a.1.bar │ 8 │
415+ // │ a.2.foo │ 3 │
416+ // │ a.2.* │ 7 │
417+ // │ a.3.bar │ 4 │
418+ // │ a.3.* │ 4 │
419+ // │ a.1.baz │ 5 │
420+ // │ a.2.baz │ 7 │
421+ // │ a.1.xxx │ 9 │
422+ // └─────────┴────────┘
423+ // and so forth...
424+ await pc ( "a.2.foo" , "a.2.*" , 2 , false ) ;
425+ await pc ( "a.2.foo" , "a.2.*" , 7 , true ) ;
426+ await pc ( "a.xxx" , "a.*" , 0 , true ) ;
427+ await pc ( "a.xxx" , "a.*.*" , 0 , false ) ;
428+ await pc ( "a.3.xxx" , "a.3.*" , 4 , true ) ;
429+ await pc ( "a.3.xyz" , "a.3.*" , 12 , true ) ;
430+
431+ await cleanup ( ns , nc ) ;
432+ } ) ;
433+
312434Deno . test ( "jetstream - ephemeral options" , async ( ) => {
313435 const { ns, nc } = await setup ( jetstreamServerConf ( { } ) ) ;
314436 const { stream } = await initStream ( nc ) ;
0 commit comments