From a9967d307715eec342abc7872dd3506bc59a9e3b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 19:53:25 -0500 Subject: [PATCH] test(parity): port consumer pull queue & filter tests (T14) + DB update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port 48 Go parity tests from TestJetStreamConsumerPull* and related functions in jetstream_consumer_test.go to unit-level .NET tests. Enhancements to PullConsumerEngine: - MaxBytes enforcement in FetchAsync loop (stops delivery when budget exceeded) - PullRequestWaitQueue with priority-ordered stable enqueue and popAndRequeue round-robin semantics within same-priority groups - PullWaitingRequest record with Priority, RemainingBatch, Reply fields Enhancements to ConsumerConfig: - MaxWaiting, MaxRequestBatch, MaxRequestMaxBytes, MaxRequestExpiresMs New tests (51 total in ConsumerPullQueueTests.cs): - Pull MaxAckPending enforcement, FIFO delivery, one-shot semantics - Pull timeout (ExpiresMs), NoWait behavior, MaxBytes byte budget - Three-filter and multi-filter subject filtering, filter update - WaitQueue priority ordering and popAndRequeue round-robin - Pending count tracking, ack floor advancement, redelivery - DeliverPolicy Last/LastPerSubject/ByStartTime with filters - ConsumerIsFiltered detection, overlapping subject filters DB: 37 Go tests mapped → ConsumerPullQueueTests.cs (1,386 total mapped) --- docs/test_parity.db | Bin 1306624 -> 1306624 bytes .../JetStream/Consumers/PullConsumerEngine.cs | 111 ++ .../JetStream/Models/ConsumerConfig.cs | 12 + .../Consumers/ConsumerPullQueueTests.cs | 1682 +++++++++++++++++ 4 files changed, 1805 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPullQueueTests.cs diff --git a/docs/test_parity.db b/docs/test_parity.db index 1508ba21bb92286d66d5a86de2e80adafafaf9d3..5ba4620ee228806426be821d886ff8e9e4b32876 100644 GIT binary patch delta 13248 zcmb_j3wRV&mhP&$)m8nd?lc`bE~UE1rnM4 zz9nDcsrxwR+;h)8_uT(pw{(?m=_+kcr&^xKD4<$?y7oCM;}M?k;8IGF@PTzf$I6s) z8ou|ZjG!`c)^&Xj7Ov~92aUF;%^5nKZb!@yf0yqtz~9w7mc!rnACQtQ9jAtjpxb;M z9}g)ujfH}(u_ZbkHMryc%wi)ovm*;W?(3jCc4a=_e?hbw8Wcw7L)na|1^!NKYdKgl z^w5OVr*(qmq8@5nteKV@R?+ITUOYIU?3`sfb+4Vtr31mR%iYl6_j%maevv;7E&H38 z!E`+oieA(S-$8*HLPr9L62tM1OxHx(u%ly%YoIXfR#UF2BjOs_&-!a!Kj@?R*YO{& zQ4YJNKqY;*`kFQzyy{BnqSE8+9O@`Vn^N&>PHJ?eiRUgdN0?29UCe%q!;)&)Z{dX- z!so(8I&5+X=Y_LEr*KGkLD<7I2<@i1^l{@w=7RA&eM#8HoEDmlXHC;hdB#rT4dds= zL&g`3dyMVz`}3)ZJT;e_i#4U}`aSeS$L%*z{f1;tE)zGhM z=wlkXMMJwa^bUm{ny~m`O~or3dT$(*{psn&Mje$wm+1^w4WAe;u+JIJu}>OKvpd)= zY?!SxcnuAPO~w(%N_Mqzt#JWcW}Io9z|Li-vw4R7)?MrY;~>Lw!!E;KLz}hDa0I8d z+c(CaN}-of_+-01PuON`GWxy7gxq*ZKf0cw{wIFCKfQ-()hP3&hW@RFep^HTPD6jA zp|5M`uQl{%8u~j8eMY6n5P$r+rs53^{kn!eW%T0>zhMR_vs;2b*HrypLw~HH|DvJ) zq@n+yq2B?&oe5quUEz!AAKI^&Z&h=1w;xES;|J22%I6r&oWoov^L0MEpM3`AWg$Bb zoki=+HRk2!h31*&ab}0PpBb61nRc1}+0Vrx^jibDH^(*{rK3MGq)N@!X#?`7HG; z?mW#5#{cx6%n)P__vpi%Wj4^33wZAZdT@N_S>{=q+K3&$WGCaA|6)>T>t3a+?T|JA zpo7-q368J+7vsS0zv%}*QD#1Tj-eXnfrW-r({xtJYO?&qQYKsy+JuSbkIbV@e>9CX zzHh8HykoeZe~veE^Yzc`b?iR27=4TUs6TUz@i7DFYU(X&8ZMlJF5@3RZrhFJ?Y7;f z3DGe|os$mA)ND*SQ3gJ`(>4j8dxFWBzp-VF;oar=O$FTnoXO^I+Ow*&&iAf|` zP)+h})HVe-Ji%lxb|r^UvEJ7pIqO8XmkhE(+z=519&x_AA=#j>F*>4!gn4QAq%&*K z@QOm8A0~5YgV!AvT?KXSKrKvuqOc-Ty-xIml|oTc=Ng<*lq7epDD^2Xp0fsVlt~^D zEg%+iJ){n1=>up?bwW}P0pM9;W1^&7^oZc(lGE$;g?+(*+n-e8`+Z%W7|l-%Zn!G! zr3cXP*2=n&C?+^duF`_ zC^<@;l@$(248gAKs0oG~ewg{O4Pt1ll!%-T*sRqZ60h5cb_sHJbS>fE3jA9W{6{pR z`zroH{`KmX416K#OJRpd@&M5hB(nr-TwWe^k)^dp_NWu_SR)#(Y^kLIv7#;*&KK+4 z8-2kLcvO|!7j`MjDY3v@+7+g3tbOz$zMXWLWT)hJZ>};(lat8(qYn@RYybn)Ck*iC zMl^X*9|rJ*f|4}W?eP$Iari}djUy2Bg5`aInh+Q}6!Ab*5MkQZ5U$O@HJISKpb|~b z{fD@cNyPg;WCxm4qN@oP@fMetD$%(AD;KSUC3G<^MWajm(B*a)O3rM(yUAA{sh1oZ z-F_eJ&9E2BLUbk3Zx!e#B{)|vMY+ZQ0B3OT0K{ZqqWEc2hzBehh&0HS=nBHPVwLJO zN0*}Pa=1-J@Z`JM?EYw_5>Tn`-MzgE&BV5UWl}34Q-A~zj*7ri-Ra>(5D6!BEeQg9 zka2fIr9~^0S(j8heN%<9i;Ds^LCqmR&7l>Jid7&r+vxWB36_$S*ym{CS<4|0#FLkx zUGYsT5JO>iIr^9Q`IYDa$PQDZ3yAnd$-QayYOq(4l-%`Udel5(t{4VSBPo5j?$&0M zXm@848vjcxN>8y`Ds-c$W}S71b-3kKsyUw0j(i|AJK9Qwwz$+WH$Q{!#qH0a;bUDS zDY}zgawAG?sNy5TlPbGXCIRBv6aKG5){(#JBB3tsM0TIr0vQ zXS|346m2}DoM+?D{}{O_+T5&TBmk8ZT*X`ft=I*N+0$p?A(?ooC6LYK-5~B zlLy7_M*ZE6)7ymYYhCj|9fb;Z|{0@E# zAI5byx(t76V+XX-ThIhLtVis1_7dC0oMzu<&oFzLUG!?E4b5aWvB&8$_5k}F`y|tl z0FRhCY#qCrEn~a{E@E<+$!s2z0Vu$A2C)t{mF3Y5x{khtK1Ubn1N7VUbM%w+8TvRH zfzH!i^bPgB+J>5}pD-7Y4i0voATt^Sr~b95 z(t5#q&U#w(8l1O~6oUpDGfX4pI-RC=zJ~6zmj|4ZTuBuh7uTG<21QF4fQ*HFTqfuF%knOiXJMz%|U%R1|3F zIU0JlhHlo-O&WTphK^|HEgJgaZo0~A*xLQ!HOLxzn}+@;4ZU7NPu0+KHFOxKWU%Y0 z4Y(nLZKRgRFJ!O`J?rUPE%@CUxb zWwH~522H~t&KL@!m*Y+FA3-xEdtYZ@<#=lqMkjYY7oJIOjfo`4LGJ!79~ zF+6yypJPpTdTzVSO!izK&nbm+^D?>6mY~ zm+Q%nXUfE{_hXj>YRg__@zlrlhc)=5X-pgqW-rj76P*)Z(!h?O6{X?}cj!N);#YU- zg))TZ=$?wRU3wcu1A=+Ilg*xY4gEj-V8;h~fBbJBfFz4HCi2hV_dn9_KpHvp5>miL6>9Y{8hd;hdVJ}$Uxu4KzTS)Pd6|EZil2Ffe~HGi zBYYQr{3t&?{`OJ+EJYiBgF`R#Q*q(zTp_M}nIDZe{wJ4-GodWS(5Yo)8i7+@=F{|s z6P8bDJ+T2zlYPXD9VhwU#NYiDmy?C*z4qz6-o?Jhy4aj}{$BfQ*41GGpbYU~IlYmrYL;rwqmuj@z^HDue6A07)8`!qHE1DW3qcC|}M{ z$lVN3Qm|2)7YYSKc*3jpG*c+%BrImH0T!oD*mH(e`lT`-q~zWLF%1Wt{? z1#YPhQY?UuZd~RZW?3JbMtYV;py!Ii_6hhsnjbQ-8=EZ;MxbL+08+3TU#%hxw|vJn z;+iA&=7sK9Dd{|P)s(?PupunDplx_uZIGQnvYIk?z~@o^060QUkC0%6NOlb1vXa@~ zK8(2WsC_qO^2Zhtu9L`Mesat{ET^=A94w>)e=s1H`24OCNVV0n1@4U^`7HJUFs-N; z|J^Zr8HEg0y2)@JpMX~!x7VA(v3Z2$j9Os%>*Mz9u_f!pW@6zow?tAh7@La}ft5)~ zBb;|{SI$%un@f0%UkW@7C+v<91kHv%YWJdma*!e=cXVxv0Zy4s&+|annJ4UHaOnyA zAXbrxfBKp|$084p0lJt?SHa0ko`oMeVV`8_dFsNCov>#OSI*@MGN2w=$NI&MqTd0K zK)p|rh?Q{G7%CMue4Mh+Ji{c$<|wu-1<5x~*oUPP{S-%1)drU!MH-J_V}Q$L(~BCR z%f#306E*SDXsGJqm?q!i`zfY=j`u)L10x)_)zG`u<7-&woO6_a* z_e_nknS@~h8GY$V`)FK!(mpWNwIC>kE5vYFFcek~Vu=aF@=3cC|KgB+6=iCO0c@2` z&jJU-dk))2Wfk}%61f`iyKALVKt)16FM!I4!h8bED{c+z0br?YdKy_$zdvjrF;ekk z((Cq4CGL6=W}Wzt7*IOL>A2ks@q8$6ZeJkKF9IYK)}c?VLCbfR&BAv=Okm7=Odp!+ zj2{_A!`}?+_&+npcn5csTcB^Fp)o3=z>C=%tRKx{jx%ZWi}Z`&ig|);kJT!(EVH23 zxa-_x?7YrprMW7aL3+J2fkG69g7pdOR*2a7iap&dbFp<~IHhDts=wkUV9P7q5St4e z&?!NP2rq*$0dtQX-*Q=InTq*H>-iG2e*P;ikF*|aclBsm=o9^3rM(~Lvp0-1vj zk?Y*(S>=HJCt=7^@GW;Z7+P3W*&P*T6NQ$v0RQj4<|g8=rtq1muDL#mobteJz=gsEZckkzC<$_Utd3~3Mz%tai(hfm zh@FOv@WH(kAm*;cD$%EO{Po|tZFtKyuH7v2vGqiF)dpad{uP*f+*e$d%~b|RWlFl)KN zjlu0VxIz6~ zbC1sW&70h1GVg$|z*G?yKmR*7X0o~|mW6_#r@F5Jj3E*jN9=Z>B>1GnfW&ZYkY{)Q zGg4=CCotQ||BR-8i{I?zOhx$axbBSOgh3v++d8slMInQ?{T7=4k>G^{i917y9J z`vEsi{{{rk9OPl*aGbZuqoRI-wKbA`(SWJVXvvD6`=aU=lA{J3^bWi{0YW>fN!S4J zJX8xfdKj)CVZ&7Jpk+P^3*Je)fxjn#cxFdXUSS_H6fCuo0OAdyU^wUr`W;f(9gawj zdiBauwnsgLA%Rd;{QdV4l>5E-dryYs-W9^RBWLo2>9=#wg-gH&Vh^WWq2f!YHld7G z0--7>W?8TSJeSx6F6&``kbEBXgSe7%&n-%MeKc2EO5p^EsoedNMc-UOt{=V^g|))b zNiK~BiDrwsZw5>)C=1(KQ2M}xB4r`C7652hx$%K`m#0)tBNu0|3n-;TNnfA>w&Y}E z*a&-nfq>JS(934o8GVpQZISz{_V}U+-5XTW5#sNDco77m)XO*}0fu!N_PJqPntB!L zgEclcqCDh+XxXP^_2#QA+Ds%bm6MqEs%A8@wV+P)tiKc9EEjA11Y%4gn#vv@0r)Wh zDG*E+Tx;gt-gRCRS;TNxoou0ApR5!^!M>DBRbOVv1<@u2;I+!EYM2Afr~r3%FqzX5 z7vx?Sv2!BPm>U!&h|RDAiQ&FjOZdxXbe>$UM##A9yOHBB9z^$G%N{0uCS1kA3QMXB z4+a*v175%AomT?~5Fd=Smn&OxP#`Z#;-elyVR%9%p&v3UqxE>hF1XnKpEW22zt)1L zndIS7*v8?izgZp5md$7k-nkiN%t+piie7O2B=2Mt?&s?=@p!X^2DB)^&Lsmd+sty})&1m`!C>I&b3V)6w^ zqWP_-Cf(s!d1O%!9w~-;woUFkNuG(-k-swHtrf#Pn<|D6-C{0!^Wh~zlOnUv2~{0= zVHDCoC%ss%QV03FFdC)|veH*CLRRM@f?*FM=MEPsli)UhXbqRfYS>e>>7_Z$NU}-t zB2Qg^imrXDW>cQ>$%CR^q8o{(IBUB-1Ghpj?dvY@?$Oziqlk4B1nc455hNkG0NdCE z2d#e%Gh6Jrw1tn4+hT9mBhxfp70KBu;{CVVU(r)eNZ@CgQH>2Z5GwY!T!+fk=2jyn6@oUNzd z(xp;4m^)w_#WUYBn0||X3&XB_C7$zB+w&AaQ`9)JOOJ-Ea7J4Tv0Z(4Xxx0rb`Kth zne;`j{AT630+})LXhqdYf|K67Krm9SE68lR-^D7Q}ITL^DW!s~aAm_JjCTrX2 z5eKhGY{Jn4w!QH;S^JNe{^n10uEZ+fez zu<7W>)?Jo$!cuN5_aEl@rk81C{Dv85EH(Ve{GlO-e~BMUn4F`8t=1Uq*pPKUY=W$H z)D>-Po3`!H2xNE_f%Q}|x%R0yD#x>VPEf3Vu^ z?@^`X@WNI;wu$JnXzN3uOW-m$4$GIh^i&nDDh!6;ys6~%-uslA{w{L23rS>eRf~NA zf|cD_B|u(5kd7jWAXSQJv&_T-BzG?&xqJU>V1>~KxwJG$)Ztv-=TQ<@Ugb6ja%OA; zQM*ZG ziSSVB=7)ljhKlAu*xi)CRa#;j35$Xtu*my@8;xgw!3|7AVq-ND?|Y0w#nIw5f^oU#j3b`>$}5NPou=@L>h1g#4h)-Ruv#?|?e35N%OI z3@)+(LL&#{>*FVX{ zMJoEk3lM-Qyy`mAR$04&kjfxQf7RtXG8QHRsvaH#0*-vaP3~wIpEWS?#c{plm_eMHKsEU7 zA^Z-T^%WgxSJQzA9|g+(_e|a~BB6oW8yrO8QWQ{&HIal|$idsk@zng?DRwr_cG|xk F|G$<%l&=5) delta 7381 zcmds6dsr1!*PqLtnLRUS&YZ)Dh$0Az7cfxsf{MybBqTru1O&kwim8C2;uY|cjFI9w z=JqQyHGNGZLq*3-5ieO@^3r2pR%Du*J-$}z^n`u=y)9O0W0UHA1>45!(J5_eF@~%^`r2#cbx#i5Yjdkv`bz@7UKotCl z?}?O0yn0ZwzhMbToLe$Gt(2mu>{=1O^J=s4yY97F6h#kpp6uL>(JAM>&M9&jZisfI zV=~D(JTOI~Rycz(sB$cKZVcSzo2cjG4*q&He)rX__&vC$_+V;KW1n_cDXaAk#C>bk zKy{k+v3f_1J=i||iZzNV*4SRmoRac_Nu{N;ilBw$>k zu$`ItbKEU(t23~mFC&LL#}xP}zHME>G3We(?moExGVlK4U(WXmdUUblcJmKCq)ir% zKG<4dyGhFi7ejwQqvlvT3_XaYI|eA%m5WNV(x|+nY+*~4I_4PL!aPt`v1gPD)WL`8 zg-+Nw!AOmxgCTtf#g;NRQA`R}G+{LTk267HHuVdWPH_#~PHr<-%T;mZ+zc*{OXm`} zD6TiEujUhsbv9-VJ*BtYMQ$fU>9O=sx+7heE=tW(Bez+4N7^FQakbJase((FW=Vxo zhLj{lOMTobq;66NE>E&Zl-Mf%K-7yZ;uZ0Xctorhw}~6Y8m>xQDwc{<#cU(hhndaN zO~x@_W(x}_ipe&x#Tg8$OQxkYM>{db(N1jgPIeo63JJ~ZABjB0)pJ|1)?_XMTm%hZ zrls6cWXZN9TcRxymM#__3y^=8o8%qxM)_sALe7&@;2htZYO>>j#l(|IU;O?^e0^8YUQnf?A=%Uzd))v&|9M>N)7umVy;cAT)7uok2`k1>z(Ph>a zWm0m;gc?ogpa~s7`!2K3q3W}22l`#3^)joJj-*#p0o3mlkN8OLGIpE|)PqbQu&F`^ zHPQMndJ}w2X<`~JS1n5zANgZ>hO9}asNbc|;wiCA3U=+B>6*iC`((l>}Zl!c)LG7JWS$ zqyed%jv!w=j7DF+0CbqHhY^`^s4N}?8N=d1Bn>S7R4NLNq?6fHJrun)5p+etBf+<9 znjV5~?6-GCJrhAU%d_RE6jPH)?}>1n-i7GOA-V!l$tchhuUW(=fvv!rLuHaqW64E`$eUX=>>Jrzu?qS!OcAL0llzS_U?wlUd+AoS+XQ zbg_GO2gidxwr4JwqURH8k(&bLAj(7K=y`;i=B9R(g9s0msZVf?=cWeE2VwFvg&BG- zdVe|g`?>j`mq#F7&mjV?Zxx7g7l2UDM-Y2bbBKRu!c2XS$BThL1F4eAAl9U_*?Jl} z9*-wdvlM&-EUT$(6#P1uX*^j5Cemz*o`h~CV|HaF2=>echXd{vNoU~jOLT|d+p2lvK)AI2s0iO0AE|a(w_=s@+jeiFr9zG&*TNXx#xht!TVq$+fI#R z@~pYaZ^~x&gypW~4a;cxSNXr>=cG@i67k=nE)Ia7!eDR^%mkg-W9%$eVfHXdbSwQP z9Z!8u>11&}p;7)T_7Bh}wf4Q}sBV7?dt$nKUa!Au@9HuBMZE~k*71Tou+ARhv3tBe zjo3ZOZFfkuy|+goL!au7fU5QOUS6$J+^zen?QPLks81$p=DF20mVyEBSzMs-GO!UA z>4l_2rMtuTWniesgiO7_%XM{gL70as&?geA%q{iOJkXzP68l%$53tkp#c2IoI2>MC zWgp_PZj8PNEn9+H(^uQOdqT5BuW*H?>edtbvOS*YIsUT!I-9T0NBdH-#PK!uek5!X zYwYDT9Hp0%kz?Hwi7V^@o-tDN5_ce4R@&PF(Rz@YN&>Nv&DCeSqoic5eH5Im&mxKz zqDgD*L1D%?Hd_x^cUm{$9e1&H zA{u^^@5D`3GL%G=+st-ERX6!)G93LP6dS-;FS%=* zQK#5Z=+Ztgm>%wKxOEE8A16*%apZrReZy#dA1H-6xYiEA+mZSMtWsNGxq3xCqaIP~ zap|>Dtx=c4TD4T2s%GPF9i#SFd#V923eHe%D!^a;0nCHxFad6c4c43R0laCw20y_$ z^%#5vccM$xdrBM$rnsg=1g|YV_kG{Y$+{=rpgtUqn?MI6rzB z>f^`xDa*YWCo05E^mP;v!K&JFXC2XvfVm;(fVCG`$kl zVL5c$kL!cNJ75msmy+5h2vgRy260Af5U&&pYzL}|ZlKWlYQ8&}LILux(`KVBb=q2E z)dp=N4Xh978Dz7aN~4`l&Bt(T)OHA7cF1K0T*p74QB)1zO#EK<8QUI#x3tHH>v%dH zuve%x26vOU(~SHws=LKcMB8rhspJ)A{B(=|j%GlNJO1zA=Bogl!Dbm_9&%$@qxl{m zdp(Fw*GL3Z4ABf%X8hf3i2&E1s&{RgWXsEJULfCj}o=cTxcVpI?UKq;Q|6S@icg zHXexIun*i$&$`Ro7|_oR=HBIdGaNvLKjCoLdzVj|>b(TT)h4vcgjSl+3KOa}p(+!q zncC-=&}8`t_qB@A@l59AxS-(1Zp^SD0gDJ37Lk!qbAn zekRAbkEe;JgdvQ)(G3<{5CR!|#3d(NAU0dNxKu}o$4$zPnb3zObkwrKttt>N31Gd? zq>xdQ+@l&_UlMu*S5j02rBmuM>mRrV8LgaGHYkNk7t22_4VJn12zEq{m5xhM__m=@ zd=a+5-Eb!CDtsxtE{qp!d=;O}OWbQ*JMa<6U|ZSu*okZ~bC2219wTQMd^v}Iog*?o$ zn9wf}+P*`3PJ33R#yJ)dVjLk(ysP!|5b=%*LSzymqh5R7Lkw~(B*ZX69NDe)@DM{B z3kVTUh^XCKn1@Jl%qPTjLQH;7i}nx+j&eebC&a>inybiN_r6wy_U_cmA zEt-_B#xCtKp3EFVxF!?1(`z!%J7yDN0MXa<4~?9LP|+c{NltL!iu36}@G$J}u^`4V zlMrb{!f!w96Y|V2(T-w5jKTzV_A_5kFOh#keGdtpJqkuSW{`gQL_t;~9ONMq9Me5R zRyTMJImA&!2$!3B9)N9b>gT{ml2GP(qRM#~hO~XcBm-W@v&Kh_r?U<}0-r-C0%<76 zIwq0UF+}L>LD<(LWZXRhpV0DP2R>SdGO>jJ`6%q=v2UmYAFM-}k%XA{Aq@8rQI1?f z;ENM1@geN#A^JJ+$@-~%yN|&T4>8P zVkjY`CfJ>PsEQ`|FL|f~ADEw7wd4d0C&YJ+a4nL3;6i8XW!oVD|5E+&!strl3cG8|PK6uA#jDb#y z5Uxo-zenrtQPtm3Nr*vYj34W@Zg|qhPeIV9uDt}>Q*SahO8&}ob2wAZLk~_09nc)w z#|ITQ3#xEL+$(Mo*P(qk`CzoYS(qfb>YWFq;z4JwGBF5Zrm~&-fBwaHng8)`+8R?}=DvchVV=YF~N5T$< z5$C#60Q&u7;Tj{Jb%X!@MCi@P4X(6nSkDQ~G^#l**ckZ-*WS-dILg@Gq#EtXx|AV! zN%%~YyLoTR68#Bv3~|lyBXqGD8gx3YC*ccp5Xl#v=r#9FLHvmLk?p5*7|KVerc|d;ZQW}fro63;wY+Kxls}Ls$rfoh&LW?S%f(Q53brTaWD4lh zSg;AjXM=ZPp+1xxc%R-t_~w8CUgzu~?sK;5s_~;7;95?Db3s0qit>`u#()86T`pLN zPwfLq`$G5ezxM<%$aDN3s}DdY2Qp7DfxemmdXU9AIuD!{XX*W1M|BbH&jUdyDj$p> zF?=;2Ur3=r#MJF0)Nyd49!`{(5amJWl{nC^ZMSM&Z&$kT*t34rH+}RfG;i4)@pt%IiFo2 z);$qi|AsL7`E$!TR2sx3GC~D 0 ? request.MaxBytes : long.MaxValue; + try { for (var i = 0; i < batch; i++) @@ -198,6 +201,15 @@ public sealed class PullConsumerEngine continue; } + // Go: consumer.go — stop delivery if adding this message would exceed MaxBytes + if (request.MaxBytes > 0) + { + var msgSize = message.Payload.Length + message.Subject.Length; + if (msgSize > remainingBytes) + break; + remainingBytes -= msgSize; + } + if (consumer.Config.ReplayPolicy == ReplayPolicy.Original) await Task.Delay(60, effectiveCt); @@ -297,4 +309,103 @@ public sealed class PullFetchRequest public int Batch { get; init; } = 1; public bool NoWait { get; init; } public int ExpiresMs { get; init; } + // Go: consumer.go — max_bytes limits total bytes per fetch request + // Reference: golang/nats-server/server/consumer.go — maxRequestBytes + public long MaxBytes { get; init; } +} + +// Go: consumer.go — pull wait queue for pending pull requests with priority ordering +// Reference: golang/nats-server/server/consumer.go waitQueue + addPrioritized + popAndRequeue +public sealed class PullRequestWaitQueue +{ + private readonly int _maxSize; + private readonly List _items = new(); + + public PullRequestWaitQueue(int maxSize = int.MaxValue) => _maxSize = maxSize; + + public int Count => _items.Count; + + /// + /// Enqueue a waiting pull request using stable priority ordering (lower Priority value = higher precedence). + /// Returns false if the queue is at capacity. + /// Go: consumer.go — waitQueue.addPrioritized with sort.SliceStable semantics. + /// + public bool Enqueue(PullWaitingRequest request) + { + if (_maxSize > 0 && _items.Count >= _maxSize) + return false; + + // Stable insertion sort: find first item with strictly higher priority value, insert before it + var insertAt = _items.Count; + for (var i = 0; i < _items.Count; i++) + { + if (_items[i].Priority > request.Priority) + { + insertAt = i; + break; + } + } + _items.Insert(insertAt, request); + return true; + } + + public PullWaitingRequest? Peek() + => _items.Count > 0 ? _items[0] : null; + + public PullWaitingRequest? Dequeue() + { + if (_items.Count == 0) return null; + var head = _items[0]; + _items.RemoveAt(0); + return head; + } + + /// + /// Pop the head item, decrement its RemainingBatch, and re-insert at the END of + /// its priority group if still > 0. Always returns the item with the decremented count. + /// Go: consumer.go — waitQueue.popAndRequeue: decrements n, re-queues after same-priority + /// items (round-robin within priority), returns item. + /// + public PullWaitingRequest? PopAndRequeue() + { + if (_items.Count == 0) return null; + var head = _items[0]; + _items.RemoveAt(0); + + var decremented = head with { RemainingBatch = head.RemainingBatch - 1 }; + if (decremented.RemainingBatch > 0) + { + // Re-insert at the end of the same-priority group (round-robin within priority) + var insertAt = _items.Count; + for (var i = 0; i < _items.Count; i++) + { + if (_items[i].Priority > decremented.Priority) + { + insertAt = i; + break; + } + } + _items.Insert(insertAt, decremented); + } + + return decremented; + } + + public bool TryDequeue(out PullWaitingRequest? request) + { + request = Dequeue(); + return request is not null; + } +} + +// Go: consumer.go — a single queued pull request with batch/bytes/expires params +// Reference: golang/nats-server/server/consumer.go waitingRequest +public sealed record PullWaitingRequest +{ + public int Priority { get; init; } + public int Batch { get; init; } = 1; + public int RemainingBatch { get; init; } = 1; + public long MaxBytes { get; init; } + public int ExpiresMs { get; init; } + public string? Reply { get; init; } } diff --git a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs index 434f227..2175d21 100644 --- a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs +++ b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs @@ -22,6 +22,18 @@ public sealed class ConsumerConfig public bool FlowControl { get; set; } public long RateLimitBps { get; set; } + // Go: consumer.go — max_waiting limits the number of queued pull requests + public int MaxWaiting { get; set; } + + // Go: consumer.go — max_request_batch limits batch size per pull request + public int MaxRequestBatch { get; set; } + + // Go: consumer.go — max_request_max_bytes limits bytes per pull request + public int MaxRequestMaxBytes { get; set; } + + // Go: consumer.go — max_request_expires limits expires duration per pull request (ms) + public int MaxRequestExpiresMs { get; set; } + public string? ResolvePrimaryFilterSubject() { if (FilterSubjects.Count > 0) diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPullQueueTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPullQueueTests.cs new file mode 100644 index 0000000..11fe6cb --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPullQueueTests.cs @@ -0,0 +1,1682 @@ +// Go reference: golang/nats-server/server/jetstream_consumer_test.go +// Ports Go pull consumer queue, state, and filter tests to .NET unit tests. +// Tests that require a full NATS networking server are marked [Fact(Skip=...)] +// with the reason "Requires full NATS server infrastructure". + +using System.Text; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Consumers; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Go parity tests ported from jetstream_consumer_test.go covering pull consumer +/// queue behavior, MaxAckPending enforcement, filter semantics, pending count +/// calculations, and pull request ordering. +/// +public class ConsumerPullQueueTests +{ + // ========================================================================= + // Helpers + // ========================================================================= + + private static StreamHandle MakeStream(MemStore store, string name = "TEST", params string[] subjects) + { + var config = new StreamConfig + { + Name = name, + Subjects = subjects.Length > 0 ? [..subjects] : ["test.>"], + }; + return new StreamHandle(config, store); + } + + private static ConsumerHandle MakeConsumer(ConsumerConfig? config = null, string stream = "TEST") + => new(stream, config ?? new ConsumerConfig + { + DurableName = "C1", + AckPolicy = AckPolicy.Explicit, + }); + + private static async Task AppendAsync(MemStore store, string subject, string? payload = null) + => await store.AppendAsync(subject, + payload is not null ? Encoding.UTF8.GetBytes(payload) : ReadOnlyMemory.Empty, + CancellationToken.None); + + // ========================================================================= + // TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083) + // ========================================================================= + + [Fact] + public async Task PullMaxAckPending_BatchLimitedToMaxAckPending() + { + // Go: TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083) + // A pull consumer with MaxAckPending=33 should not deliver more than 33 + // messages in a single batch, even when batch > maxAckPending. + var store = new MemStore(); + var stream = MakeStream(store); + + for (int i = 0; i < 100; i++) + await AppendAsync(store, "test.bar", $"MSG: {i + 1}"); + + const int maxAckPending = 33; + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d22", + AckPolicy = AckPolicy.Explicit, + MaxAckPending = maxAckPending, + }); + + var engine = new PullConsumerEngine(); + + // Fetch 100 — should be capped at maxAckPending+1 (check-after-add semantics) + var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 100 }, CancellationToken.None); + batch.Messages.Count.ShouldBeLessThanOrEqualTo(maxAckPending + 1); + batch.Messages.Count.ShouldBeGreaterThan(0); + + // After acking all, a subsequent fetch should again be allowed up to maxAckPending + foreach (var msg in batch.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + var batch2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = maxAckPending }, CancellationToken.None); + batch2.Messages.Count.ShouldBeLessThanOrEqualTo(maxAckPending + 1); + batch2.Messages.Count.ShouldBeGreaterThan(0); + } + + [Fact] + public async Task PullMaxAckPending_BlocksWhenPendingAtMax() + { + // Go: TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083) + // With MaxAckPending=5 and 10 messages, a fetch of 10 should only return ~5. + var store = new MemStore(); + var stream = MakeStream(store); + for (int i = 0; i < 10; i++) + await AppendAsync(store, "test.foo", $"msg-{i}"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d", + AckPolicy = AckPolicy.Explicit, + MaxAckPending = 5, + }); + + var engine = new PullConsumerEngine(); + var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + // Should stop at MaxAckPending+1 due to check-after-register + batch.Messages.Count.ShouldBeLessThanOrEqualTo(6); + batch.Messages.Count.ShouldBeGreaterThan(0); + } + + // ========================================================================= + // TestJetStreamConsumerPullConsumerFIFO (jetstream_consumer_test.go:6422) + // ========================================================================= + + [Fact] + public async Task PullConsumerFIFO_MessagesDeliveredInSequenceOrder() + { + // Go: TestJetStreamConsumerPullConsumerFIFO (jetstream_consumer_test.go:6422) + // Messages must be delivered in FIFO (sequence) order. + var store = new MemStore(); + var stream = MakeStream(store, "T", "T"); + + for (int i = 1; i <= 10; i++) + await AppendAsync(store, "T", $"msg-{i}"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + batch.Messages.Count.ShouldBe(10); + for (int i = 0; i < batch.Messages.Count; i++) + { + var payload = Encoding.UTF8.GetString(batch.Messages[i].Payload.Span); + payload.ShouldBe($"msg-{i + 1}"); + } + } + + // ========================================================================= + // TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit (6479) + // ========================================================================= + + [Fact] + public async Task PullOneShotOnMaxAckLimit_NoMoreMessagesAfterLimit() + { + // Go: TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit (jetstream_consumer_test.go:6479) + // When MaxAckPending is reached, subsequent fetches should return empty. + var store = new MemStore(); + var stream = MakeStream(store, "T", "T"); + + for (int i = 0; i < 10; i++) + await AppendAsync(store, "T", "OK"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d", + AckPolicy = AckPolicy.Explicit, + MaxAckPending = 3, + }); + + var engine = new PullConsumerEngine(); + + // First fetch fills up ack pending + var batch1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + batch1.Messages.Count.ShouldBeLessThanOrEqualTo(4); + batch1.Messages.Count.ShouldBeGreaterThan(0); + + // Pending is now full — further fetch returns empty + var batch2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + batch2.Messages.Count.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerPullMaxWaiting (jetstream_consumer_test.go:8539) + // ========================================================================= + + [Fact] + public void PullMaxWaiting_QueueCapacityIsRespected() + { + // Go: TestJetStreamConsumerPullMaxWaiting (jetstream_consumer_test.go:8539) + // PullRequestWaitQueue with maxWaiting=3 should reject the 4th enqueue. + var queue = new PullRequestWaitQueue(maxSize: 3); + + queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r1" }).ShouldBeTrue(); + queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r2" }).ShouldBeTrue(); + queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r3" }).ShouldBeTrue(); + queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r4" }).ShouldBeFalse(); + + queue.Count.ShouldBe(3); + } + + [Fact] + public void PullMaxWaiting_DefaultQueueIsUnbounded() + { + // Go: TestJetStreamConsumerPullMaxWaitingOfOne (jetstream_consumer_test.go:8446) + // Default queue (no max) accepts many requests. + var queue = new PullRequestWaitQueue(); + for (int i = 0; i < 100; i++) + queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = $"r{i}" }).ShouldBeTrue(); + queue.Count.ShouldBe(100); + } + + // ========================================================================= + // TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043) + // ========================================================================= + + [Fact] + public async Task PullOneShotBehavior_NoWait_EmptyStream_ReturnsEmpty() + { + // Go: TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043) + // NoWait=true on an empty stream returns immediately with empty batch. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "dlc", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "foo", + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 1, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(0); + result.TimedOut.ShouldBeFalse(); + } + + [Fact] + public async Task PullOneShotBehavior_NoWait_WithMessages_ReturnsMessages() + { + // Go: TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043) + // NoWait=true with available messages returns them immediately. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + for (int i = 0; i < 5; i++) + await AppendAsync(store, "foo", "HELLO"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "dlc", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "foo", + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 3, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(3); + } + + // ========================================================================= + // TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465) + // ========================================================================= + + [Fact] + public async Task PullTimeout_WithExpires_TimesOutWhenNoMessages() + { + // Go: TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465) + // A pull request with Expires and no messages returns timed out. + var store = new MemStore(); + var stream = MakeStream(store); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 200, + ExpiresMs = 50, + }, CancellationToken.None); + + result.TimedOut.ShouldBeTrue(); + result.Messages.Count.ShouldBe(0); + } + + [Fact] + public async Task PullTimeout_WithExpiresAndMessages_ReturnsPartialBatch() + { + // Go: TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465) + // When messages trickle in, partial batch is returned on timeout. + var store = new MemStore(); + var stream = MakeStream(store); + + await AppendAsync(store, "test.a", "one"); + await AppendAsync(store, "test.b", "two"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 100, + ExpiresMs = 100, + }, CancellationToken.None); + + // Gets 2 available messages, then times out waiting for more + result.Messages.Count.ShouldBe(2); + } + + // ========================================================================= + // TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9522) + // ========================================================================= + + [Fact] + public async Task PullMaxBytes_LimitsDeliveredMessagesByByteSize() + { + // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9522) + // MaxBytes caps the total payload bytes returned per pull request. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "TEST"); + + // Each message ~100 bytes payload + subject + var payload = new string('Z', 100); + for (int i = 0; i < 10; i++) + await AppendAsync(store, "TEST", payload); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + + // MaxBytes = 250 → should deliver at most 2 messages (each ~104 bytes total) + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 10, + MaxBytes = 250, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBeLessThanOrEqualTo(2); + result.Messages.Count.ShouldBeGreaterThanOrEqualTo(1); + } + + [Fact] + public async Task PullMaxBytes_BatchLimitOverridesWhenSmaller() + { + // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9579) + // When batch=1 and MaxBytes is very large, batch limit controls. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "TEST"); + + var payload = new string('Z', 1000); + for (int i = 0; i < 5; i++) + await AppendAsync(store, "TEST", payload); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 1, + MaxBytes = 10_000_000, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(1); + } + + [Fact] + public async Task PullMaxBytes_MultipleMessagesWithinBudget() + { + // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9599) + // batch=5 with large MaxBytes returns 5 messages. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "TEST"); + + var payload = new string('Z', 50); + for (int i = 0; i < 10; i++) + await AppendAsync(store, "TEST", payload); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 5, + MaxBytes = 10_000_000, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(5); + } + + [Fact] + public async Task PullMaxBytes_LargerBatchLimitedByMaxBytes() + { + // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9618) + // Large batch limited to MaxBytes/messageSize messages. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "TEST"); + + // msz~=100_000, dsz=99_950; MaxBytes = msz * 4 allows ~4 messages + var payload = new string('Z', 99_950); + for (int i = 0; i < 20; i++) + await AppendAsync(store, "TEST", payload); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 1000, + MaxBytes = 100_000 * 4, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBeLessThanOrEqualTo(5); + result.Messages.Count.ShouldBeGreaterThanOrEqualTo(1); + } + + // ========================================================================= + // TestJetStreamConsumerPullRemoveInterest (jetstream_consumer_test.go:8367) + // ========================================================================= + + [Fact] + public async Task PullRemoveInterest_ConsumerWithFilterSkipsNonMatchingMessages() + { + // Go: TestJetStreamConsumerPullRemoveInterest (jetstream_consumer_test.go:8367) + // Consumer with filter subject only receives matching messages. + var store = new MemStore(); + var stream = MakeStream(store, "MYS", "MYS"); + + await AppendAsync(store, "MYS", "unrelated"); // seq 1 + await AppendAsync(store, "MYS", "unrelated"); // seq 2 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "worker", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "FILTERED", // doesn't match MYS + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 5, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerPullDelayedFirstPullWithReplayOriginal (3361) + // ========================================================================= + + [Fact] + public async Task PullDelayedFirstPullWithReplayOriginal_DeliversMessage() + { + // Go: TestJetStreamConsumerPullDelayedFirstPullWithReplayOriginal (jetstream_consumer_test.go:3361) + // ReplayOriginal consumer with delayed first pull still delivers the message. + var store = new MemStore(); + var stream = MakeStream(store, "MY_WQ", "MY_WQ"); + + await AppendAsync(store, "MY_WQ", "Hello World!"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d", + AckPolicy = AckPolicy.Explicit, + ReplayPolicy = ReplayPolicy.Original, + }); + + var engine = new PullConsumerEngine(); + + // Simulate delay before pull (but keep it short for unit test) + await Task.Delay(50); + + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); + result.Messages.Count.ShouldBe(1); + Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("Hello World!"); + } + + // ========================================================================= + // TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178) + // ========================================================================= + + [Fact] + public async Task ThreeFilters_OnlyMatchingSubjectsDelivered() + { + // Go: TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178) + // Consumer with 3 filter subjects ignores messages on non-matching subjects. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "events", "data", "other", "ignored"); + + await AppendAsync(store, "ignored", "100"); // seq 1 + await AppendAsync(store, "events", "0"); // seq 2 + await AppendAsync(store, "events", "1"); // seq 3 + await AppendAsync(store, "data", "2"); // seq 4 + await AppendAsync(store, "ignored", "100"); // seq 5 + await AppendAsync(store, "data", "3"); // seq 6 + await AppendAsync(store, "other", "4"); // seq 7 + await AppendAsync(store, "data", "5"); // seq 8 + await AppendAsync(store, "other", "6"); // seq 9 + await AppendAsync(store, "data", "7"); // seq 10 + await AppendAsync(store, "ignored", "100"); // seq 11 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "multi", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["events", "data", "other"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 8 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(8); + var payloads = result.Messages.Select(m => Encoding.UTF8.GetString(m.Payload.Span)).ToList(); + payloads.ShouldBe(["0", "1", "2", "3", "4", "5", "6", "7"]); + } + + [Fact] + public async Task ThreeFilters_FirstFetchDelivers6() + { + // Go: TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178) + // Consumer with 3 filter subjects: fetch 6 returns first 6 matching messages. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "events", "data", "other", "ignored"); + + await AppendAsync(store, "ignored", "100"); + await AppendAsync(store, "events", "0"); + await AppendAsync(store, "events", "1"); + await AppendAsync(store, "data", "2"); + await AppendAsync(store, "ignored", "100"); + await AppendAsync(store, "data", "3"); + await AppendAsync(store, "other", "4"); + await AppendAsync(store, "data", "5"); + await AppendAsync(store, "other", "6"); + await AppendAsync(store, "data", "7"); + await AppendAsync(store, "ignored", "100"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "multi", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["events", "data", "other"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 6 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(6); + for (int i = 0; i < 6; i++) + Encoding.UTF8.GetString(result.Messages[i].Payload.Span).ShouldBe(i.ToString()); + + // Ack all 6 + foreach (var msg in result.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + // Remaining 2 messages (indices 6 and 7) + var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + result2.Messages.Count.ShouldBe(2); + } + + // ========================================================================= + // TestJetStreamConsumerUpdateFilterSubjects (jetstream_consumer_test.go:7231) + // ========================================================================= + + [Fact] + public async Task UpdateFilterSubjects_NewFilterReceivesAdditionalSubject() + { + // Go: TestJetStreamConsumerUpdateFilterSubjects (jetstream_consumer_test.go:7231) + // Consumer with ["events","data"] fetches those messages; updated filter + // ["events","data","other"] on same consumer also fetches the "other" messages + // when starting from beginning. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "events", "data", "other"); + + await AppendAsync(store, "events", "0"); // seq 1 + await AppendAsync(store, "events", "1"); // seq 2 + await AppendAsync(store, "data", "2"); // seq 3 + await AppendAsync(store, "data", "3"); // seq 4 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "multi", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["events", "data"], + }); + + var engine = new PullConsumerEngine(); + var result1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + result1.Messages.Count.ShouldBe(4); + for (int i = 0; i < 4; i++) + { + Encoding.UTF8.GetString(result1.Messages[i].Payload.Span).ShouldBe(i.ToString()); + consumer.AckProcessor.AckSequence(result1.Messages[i].Sequence); + } + + // Add "other" messages after first batch + await AppendAsync(store, "other", "4"); // seq 5 + await AppendAsync(store, "data", "5"); // seq 6 + + // Update consumer filter to include "other" + consumer.Config.FilterSubjects = ["events", "data", "other"]; + + // Continue fetching from where we left off — should get "other" and "data" messages + var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + result2.Messages.Count.ShouldBe(2); + Encoding.UTF8.GetString(result2.Messages[0].Payload.Span).ShouldBe("4"); + Encoding.UTF8.GetString(result2.Messages[1].Payload.Span).ShouldBe("5"); + } + + // ========================================================================= + // TestJetStreamConsumerIsFiltered (jetstream_consumer_test.go:7515) + // ========================================================================= + + [Theory] + [InlineData(new[] { "one" }, new[] { "one" }, false)] + [InlineData(new[] { "one.>" }, new[] { "one.filter" }, true)] + [InlineData(new[] { "multi", "foo", "bar.>" }, new[] { "multi", "bar.>", "foo" }, false)] + [InlineData(new[] { "events", "data" }, new[] { "data" }, true)] + [InlineData(new[] { "machines", "floors" }, new[] { "machines" }, true)] + public void IsFiltered_CorrectlyDetectsWhetherConsumerFilters( + string[] streamSubjects, string[] filterSubjects, bool expectedIsFiltered) + { + // Go: TestJetStreamConsumerIsFiltered (jetstream_consumer_test.go:7515) + // A consumer is considered filtered if it doesn't receive all stream messages. + var config = new ConsumerConfig + { + FilterSubjects = [..filterSubjects], + }; + var filter = CompiledFilter.FromConfig(config); + + // A consumer is NOT filtered if all stream subjects match the filter + var allMatch = streamSubjects.All(s => filter.Matches(s)); + var isFiltered = !allMatch; + + isFiltered.ShouldBe(expectedIsFiltered); + } + + // ========================================================================= + // TestJetStreamConsumerSingleFilterSubjectInFilterSubjects (11281) + // ========================================================================= + + [Fact] + public async Task SingleFilterSubjectInFilterSubjects_Works() + { + // Go: TestJetStreamConsumerSingleFilterSubjectInFilterSubjects (jetstream_consumer_test.go:11281) + // FilterSubjects with a single entry behaves identically to FilterSubject. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo", "bar"); + + await AppendAsync(store, "foo", "match"); + await AppendAsync(store, "bar", "skip"); + await AppendAsync(store, "foo", "match2"); + + var consumerViaFilterSubjects = MakeConsumer(new ConsumerConfig + { + DurableName = "c1", + FilterSubjects = ["foo"], + }); + + var consumerViaFilterSubject = MakeConsumer(new ConsumerConfig + { + DurableName = "c2", + FilterSubject = "foo", + }); + + var engine = new PullConsumerEngine(); + + var r1 = await engine.FetchAsync(stream, consumerViaFilterSubjects, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + var r2 = await engine.FetchAsync(stream, consumerViaFilterSubject, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + r1.Messages.Count.ShouldBe(r2.Messages.Count); + r1.Messages.Count.ShouldBe(2); + r1.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); + r2.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectsLast (jetstream_consumer_test.go:6715) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectsLast_DeliverPolicyLastWithFilter() + { + // Go: TestJetStreamConsumerMultipleSubjectsLast (jetstream_consumer_test.go:6715) + // DeliverPolicy.Last with FilterSubjects = ["events","data"] starts at last + // message of the stream and delivers from there. + // When the last message in the stream doesn't match the filter, no messages are delivered. + var store = new MemStore(); + var stream = MakeStream(store, "name", "events", "data", "other"); + + await AppendAsync(store, "events", "1"); // seq 1 + await AppendAsync(store, "data", "2"); // seq 2 + await AppendAsync(store, "other", "3"); // seq 3 + await AppendAsync(store, "events", "4"); // seq 4 + await AppendAsync(store, "data", "5"); // seq 5 + await AppendAsync(store, "data", "6"); // seq 6 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "durable", + AckPolicy = AckPolicy.Explicit, + DeliverPolicy = DeliverPolicy.Last, + FilterSubjects = ["events", "data"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None); + + // DeliverPolicy.Last resolves to state.LastSeq = 6 ("data" with payload "6") + // which matches the filter — so it delivers that message. + result.Messages.Count.ShouldBe(1); + result.Messages[0].Subject.ShouldBe("data"); + Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("6"); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectsLastPerSubject (6784) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectsLastPerSubject_StartsAtLastPerSubject() + { + // Go: TestJetStreamConsumerMultipleSubjectsLastPerSubject (jetstream_consumer_test.go:6784) + // DeliverPolicy.LastPerSubject with FilterSubjects starts at the last msg per subject. + var store = new MemStore(); + var stream = MakeStream(store, "name", "events.*", "data.>"); + + await AppendAsync(store, "events.1", "bad"); // seq 1 + await AppendAsync(store, "events.1", "events.1"); // seq 2 — last for events.1 + + await AppendAsync(store, "data.1", "bad"); // seq 3 + await AppendAsync(store, "data.1", "bad"); // seq 4 + await AppendAsync(store, "data.1", "data.1"); // seq 5 — last for data.1 + + await AppendAsync(store, "events.2", "bad"); // seq 6 + await AppendAsync(store, "events.2", "events.2"); // seq 7 — last for events.2 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "durable", + AckPolicy = AckPolicy.Explicit, + DeliverPolicy = DeliverPolicy.LastPerSubject, + FilterSubject = "events.1", + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 3 }, CancellationToken.None); + + result.Messages.Count.ShouldBeGreaterThan(0); + // First message should be the last "events.1" + result.Messages[0].Subject.ShouldBe("events.1"); + Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("events.1"); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectsWithEmpty (6911) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectsWithEmpty_FetchOnEmptyFilteredStreamReturnsEmpty() + { + // Go: TestJetStreamConsumerMultipleSubjectsWithEmpty (jetstream_consumer_test.go:6911) + // Consumer with filter on non-existent subject returns empty. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo", "bar"); + + await AppendAsync(store, "foo", "1"); + await AppendAsync(store, "bar", "2"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + FilterSubjects = ["baz"], // no messages on baz + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 5, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectsAck (jetstream_consumer_test.go:6998) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectsAck_AckFloorAdvancesCorrectly() + { + // Go: TestJetStreamConsumerMultipleSubjectsAck (jetstream_consumer_test.go:6998) + // Acking messages from a multi-filter consumer advances the ack floor correctly. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo", "bar", "baz"); + + await AppendAsync(store, "foo", "f1"); // seq 1 + await AppendAsync(store, "bar", "b1"); // seq 2 + await AppendAsync(store, "baz", "z1"); // seq 3 — filtered + await AppendAsync(store, "foo", "f2"); // seq 4 + await AppendAsync(store, "bar", "b2"); // seq 5 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["foo", "bar"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(4); + + // Ack all messages + foreach (var msg in result.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + // AckFloor should have advanced past the delivered sequences + consumer.AckProcessor.AckFloor.ShouldBeGreaterThan(0UL); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectAndNewAPI (7061) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectAndNewAPI_FilterSubjectsAndSingleFilterBothWork() + { + // Go: TestJetStreamConsumerMultipleSubjectAndNewAPI (jetstream_consumer_test.go:7061) + // Both FilterSubject and FilterSubjects produce the same results for single filter. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo.>", "bar"); + + await AppendAsync(store, "foo.1", "a"); + await AppendAsync(store, "bar", "b"); + await AppendAsync(store, "foo.2", "c"); + + var consumerA = MakeConsumer(new ConsumerConfig + { + DurableName = "cA", + FilterSubjects = ["foo.>"], + }); + var consumerB = MakeConsumer(new ConsumerConfig + { + DurableName = "cB", + FilterSubject = "foo.>", + }); + + var engine = new PullConsumerEngine(); + var rA = await engine.FetchAsync(stream, consumerA, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + var rB = await engine.FetchAsync(stream, consumerB, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + + rA.Messages.Count.ShouldBe(rB.Messages.Count); + rA.Messages.Count.ShouldBe(2); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectsWithAddedMessages (7099) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectsWithAddedMessages_NewMessagesDeliveredOnSubsequentFetch() + { + // Go: TestJetStreamConsumerMultipleSubjectsWithAddedMessages (jetstream_consumer_test.go:7099) + // After first fetch, newly appended messages are returned on the next fetch. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "events", "data"); + + await AppendAsync(store, "events", "e1"); + await AppendAsync(store, "data", "d1"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["events", "data"], + }); + + var engine = new PullConsumerEngine(); + var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + r1.Messages.Count.ShouldBe(2); + foreach (var msg in r1.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + // Add more messages after first fetch + await AppendAsync(store, "events", "e2"); + await AppendAsync(store, "data", "d2"); + + var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + r2.Messages.Count.ShouldBe(2); + Encoding.UTF8.GetString(r2.Messages[0].Payload.Span).ShouldBe("e2"); + Encoding.UTF8.GetString(r2.Messages[1].Payload.Span).ShouldBe("d2"); + } + + // ========================================================================= + // TestJetStreamConsumerBadNumPending (jetstream_consumer_test.go:5263) + // ========================================================================= + + [Fact] + public async Task BadNumPending_MultipleConsumers_NoPendingAfterFullDelivery() + { + // Go: TestJetStreamConsumerBadNumPending (jetstream_consumer_test.go:5263) + // After all messages are consumed and acked, pending count is 0. + var store = new MemStore(); + var stream = MakeStream(store, "ORDERS", "orders.*"); + + for (int i = 0; i < 10; i++) + await AppendAsync(store, "orders.created", "NEW"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 100 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(10); + foreach (var msg in result.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + consumer.AckProcessor.PendingCount.ShouldBe(0); + consumer.AckProcessor.AckFloor.ShouldBe(10UL); + } + + // ========================================================================= + // TestJetStreamConsumerPendingCountWithRedeliveries (5775) + // ========================================================================= + + [Fact] + public async Task PendingCountWithRedeliveries_UnackedMsgRedeliveredOnExpiry() + { + // Go: TestJetStreamConsumerPendingCountWithRedeliveries (jetstream_consumer_test.go:5775) + // A message that is not acked within AckWait is redelivered. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + await AppendAsync(store, "foo", "msg1"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "test", + AckPolicy = AckPolicy.Explicit, + AckWaitMs = 50, + MaxDeliver = 2, + }); + + var engine = new PullConsumerEngine(); + + // First fetch — delivers msg1, starts ack timer + var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); + r1.Messages.Count.ShouldBe(1); + Encoding.UTF8.GetString(r1.Messages[0].Payload.Span).ShouldBe("msg1"); + // Do not ack + + // Pending count should be 1 (awaiting ack) + consumer.AckProcessor.PendingCount.ShouldBe(1); + + // Wait for ack to expire + await Task.Delay(100); + + // Add a second message to simulate the Go test scenario + await AppendAsync(store, "foo", "msg2"); + + // Second fetch should trigger redelivery of expired message + var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); + r2.Messages.Count.ShouldBe(1); + r2.Messages[0].Redelivered.ShouldBeTrue(); + + // After redelivery, ack and check pending = 0 + consumer.AckProcessor.AckSequence(r2.Messages[0].Sequence); + consumer.AckProcessor.PendingCount.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor (8311) + // ========================================================================= + + [Fact] + public async Task PendingCountAfterMsgAckAboveFloor_AckLastMsgClearsCount() + { + // Go: TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor (jetstream_consumer_test.go:8311) + // Acking the last fetched message (above ack floor) reduces pending to 0. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + await AppendAsync(store, "foo"); // seq 1 + await AppendAsync(store, "foo"); // seq 2 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "CONSUMER", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None); + result.Messages.Count.ShouldBe(2); + + // Ack only the second (last) message + consumer.AckProcessor.AckSequence(result.Messages[1].Sequence); + + // Pending count = 1 (first message still unacked) + consumer.AckProcessor.PendingCount.ShouldBe(1); + + // Ack the first message too + consumer.AckProcessor.AckSequence(result.Messages[0].Sequence); + consumer.AckProcessor.PendingCount.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg (8239) + // ========================================================================= + + [Fact] + public async Task DontDecrementPendingOnSkippedMsg_FilteredOutMessagesNotCounted() + { + // Go: TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg (jetstream_consumer_test.go:8239) + // Messages that don't match the filter are skipped and don't affect pending count. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo", "bar"); + + await AppendAsync(store, "foo", "match"); // seq 1 + await AppendAsync(store, "bar", "skip"); // seq 2 + await AppendAsync(store, "foo", "match2"); // seq 3 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "CONSUMER", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "foo", + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(2); + result.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); + consumer.AckProcessor.PendingCount.ShouldBe(2); + + // The "bar" message was skipped — pending count only reflects delivered messages + consumer.AckProcessor.PendingCount.ShouldBe(result.Messages.Count); + } + + // ========================================================================= + // TestJetStreamConsumerNumPendingWithMaxPerSubjectGreaterThanOne (6282) + // ========================================================================= + + [Fact] + public async Task NumPendingWithMaxPerSubject_LastPerSubjectConsumerCountsCorrectly() + { + // Go: TestJetStreamConsumerNumPendingWithMaxPerSubjectGreaterThanOne (jetstream_consumer_test.go:6282) + // DeliverLastPerSubject consumer with filtered subject counts the last msg per subject. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "KV.*.*"); + + await AppendAsync(store, "KV.plans.foo", "OK"); + await AppendAsync(store, "KV.plans.bar", "OK"); + await AppendAsync(store, "KV.plans.baz", "OK"); + // These should be filtered out by the consumer + await AppendAsync(store, "KV.config.foo", "OK"); + await AppendAsync(store, "KV.config.bar", "OK"); + await AppendAsync(store, "KV.config.baz", "OK"); + // Double up plans + await AppendAsync(store, "KV.plans.bar", "OK2"); + await AppendAsync(store, "KV.plans.baz", "OK2"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d", + AckPolicy = AckPolicy.Explicit, + DeliverPolicy = DeliverPolicy.LastPerSubject, + FilterSubject = "KV.plans.*", + }); + + var engine = new PullConsumerEngine(); + + // For LastPerSubject with "KV.plans.*", we should get the last message per subject + // (KV.plans.foo seq1, KV.plans.bar seq7, KV.plans.baz seq8) + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + // Result must only contain KV.plans.* subjects + result.Messages.All(m => m.Subject.StartsWith("KV.plans.")).ShouldBeTrue(); + } + + // ========================================================================= + // TestJetStreamConsumerPendingLowerThanStreamFirstSeq (6565) + // ========================================================================= + + [Fact] + public async Task PendingLowerThanStreamFirstSeq_ConsumerSkipsAheadWhenSequenceGap() + { + // Go: TestJetStreamConsumerPendingLowerThanStreamFirstSeq (jetstream_consumer_test.go:6565) + // When consumer's next sequence is lower than stream's first seq after compaction, + // the consumer should still function correctly. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + for (int i = 0; i < 10; i++) + await AppendAsync(store, "foo", "msg"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "dur", + AckPolicy = AckPolicy.Explicit, + DeliverPolicy = DeliverPolicy.All, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(10); + foreach (var msg in result.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + consumer.AckProcessor.AckFloor.ShouldBe(10UL); + + // Simulate consumer starting at a sequence before available messages + consumer.NextSequence = 5; + + // After resetting next to already-acked sequences, they should be skipped + var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + result2.Messages.Count.ShouldBe(0); // All sequences <= AckFloor are skipped + } + + // ========================================================================= + // TestJetStreamConsumerInfoNumPending (jetstream_consumer_test.go:8195) + // ========================================================================= + + [Fact] + public async Task InfoNumPending_PendingCountMatchesAvailableMessages() + { + // Go: TestJetStreamConsumerInfoNumPending (jetstream_consumer_test.go:8195) + // NumPending reflects the count of messages available for delivery. + var store = new MemStore(); + var stream = MakeStream(store, "LIMITS", "js.in.limits"); + + for (int i = 0; i < 100; i++) + await AppendAsync(store, "js.in.limits", "x"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "PULL", + AckPolicy = AckPolicy.Explicit, + }); + + // Before any fetch, pending = number of messages in stream + var state = await stream.Store.GetStateAsync(CancellationToken.None); + state.Messages.ShouldBe(100UL); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 50 }, CancellationToken.None); + result.Messages.Count.ShouldBe(50); + consumer.AckProcessor.PendingCount.ShouldBe(50); + } + + // ========================================================================= + // TestJetStreamConsumerEfficientInterestStateCheck (10532) + // ========================================================================= + + [Fact] + public async Task EfficientInterestStateCheck_LargeSequenceGapHandledEfficiently() + { + // Go: TestJetStreamConsumerEfficientInterestStateCheck (jetstream_consumer_test.go:10532) + // A consumer with a large gap in acknowledged sequences should still function + // without excessive computation. Unit test validates skipping below AckFloor. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + await AppendAsync(store, "foo", "msg1"); // seq 1 + await AppendAsync(store, "foo", "msg2"); // seq 2 + // Simulate a large gap in sequences by advancing the ack floor + await AppendAsync(store, "foo", "msg3"); // seq 3 (after gap) + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "CONSUMER", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 3 }, CancellationToken.None); + result.Messages.Count.ShouldBe(3); + + // Ack only the first message; others remain pending + consumer.AckProcessor.AckSequence(1); + + // Manually set ack floor to simulate a large gap being already acked + consumer.AckProcessor.AckSequence(2); + consumer.AckProcessor.AckSequence(3); + + // AckFloor should have advanced + consumer.AckProcessor.AckFloor.ShouldBe(3UL); + consumer.AckProcessor.PendingCount.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerOnlyRecalculatePendingIfFilterSubjectUpdated (10694) + // ========================================================================= + + [Fact] + public async Task OnlyRecalculatePendingIfFilterUpdated_SameConfigDoesNotReset() + { + // Go: TestJetStreamConsumerOnlyRecalculatePendingIfFilterSubjectUpdated (10694) + // Updating consumer with same config should not reset state. + // Updating with new filter should apply from current position. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + await AppendAsync(store, "foo", "msg1"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "DURABLE", + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); + result.Messages.Count.ShouldBe(1); + + var sequenceBefore = consumer.NextSequence; + + // Simulating "same config update" — no change to filter, NextSequence unchanged + // In our engine the consumer's config state is just modified in place + consumer.Config.FilterSubject = null; // still no filter + var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + consumer.NextSequence.ShouldBeGreaterThanOrEqualTo(sequenceBefore); + + // Update filter to "foo" + consumer.Config.FilterSubject = "foo"; + // Reset sequence to test from beginning + consumer.NextSequence = 1; + var result3 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + result3.Messages.Count.ShouldBe(1); + result3.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); + } + + // ========================================================================= + // TestJetStreamConsumerAllowOverlappingSubjectsIfNotSubset (10804) + // ========================================================================= + + [Fact] + public async Task AllowOverlappingSubjects_TwoPartiallyOverlappingFilters_NoDuplicates() + { + // Go: TestJetStreamConsumerAllowOverlappingSubjectsIfNotSubset (jetstream_consumer_test.go:10804) + // Two overlapping wildcard filters (event.foo.* and event.*.foo) deliver + // all matching messages without duplication (7 unique subjects). + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "event.>"); + + var parts = new[] { "foo", "bar", "baz", "oth" }; + foreach (var start in parts) + foreach (var end in parts) + await AppendAsync(store, $"event.{start}.{end}"); + + // Total: 16 messages published + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "DURABLE", + FilterSubjects = ["event.foo.*", "event.*.foo"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 20 }, CancellationToken.None); + + // event.foo.X (4 subjects) + event.Y.foo where Y != foo (3 subjects) = 7 + result.Messages.Count.ShouldBe(7); + + // No duplicates + var subjects = result.Messages.Select(m => m.Subject).ToList(); + subjects.Distinct().Count().ShouldBe(7); + + // All match at least one filter + var filter = new CompiledFilter(["event.foo.*", "event.*.foo"]); + result.Messages.All(m => filter.Matches(m.Subject)).ShouldBeTrue(); + } + + // ========================================================================= + // TestSortingConsumerPullRequests (jetstream_consumer_test.go:10031) + // ========================================================================= + + [Fact] + public void SortingConsumerPullRequests_BasicPriorityOrdering() + { + // Go: TestSortingConsumerPullRequests (jetstream_consumer_test.go:10031) + // Lower Priority value = higher precedence (earlier in queue). + var queue = new PullRequestWaitQueue(100); + + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = null }); + queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = null }); + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = null }); + queue.Enqueue(new PullWaitingRequest { Priority = 3, Reply = null }); + + var expectedPriorities = new[] { 1, 1, 2, 3 }; + for (int i = 0; i < expectedPriorities.Length; i++) + { + var item = queue.Dequeue(); + item.ShouldNotBeNull(); + item!.Priority.ShouldBe(expectedPriorities[i]); + } + } + + [Fact] + public void SortingConsumerPullRequests_StableOrderWithinSamePriority() + { + // Go: TestSortingConsumerPullRequests (jetstream_consumer_test.go:10054) "test if sort is stable" + // Items with the same priority maintain insertion order (stable sort). + var queue = new PullRequestWaitQueue(100); + + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1a" }); + queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2a" }); + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1b" }); + queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2b" }); + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1c" }); + queue.Enqueue(new PullWaitingRequest { Priority = 3, Reply = "3a" }); + queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2c" }); + + var expected = new[] { "1a", "1b", "1c", "2a", "2b", "2c", "3a" }; + for (int i = 0; i < expected.Length; i++) + { + var item = queue.Dequeue(); + item.ShouldNotBeNull(); + item!.Reply.ShouldBe(expected[i]); + } + } + + // ========================================================================= + // TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10109) + // ========================================================================= + + [Fact] + public void WaitQueuePopAndRequeue_BasicRequeueWithBatches() + { + // Go: TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10109) "basic requeue with batches" + // popAndRequeue decrements n and requeues the item at head until n=0. + var queue = new PullRequestWaitQueue(100); + + // 9 items, 3 per priority group, each with n=3 + string[] priorities = ["1", "2", "3"]; + string[] letters = ["a", "b", "c"]; + + foreach (var p in priorities) + foreach (var l in letters) + queue.Enqueue(new PullWaitingRequest + { + Priority = int.Parse(p), + Reply = $"{p}{l}", + Batch = 3, + RemainingBatch = 3, + }); + + // Expected round-robin behavior: 1a,1b,1c pop once each per round; after 3 rounds all are gone + int i = 0; + int j = 0; // priority group index (0=1, 1=2, 2=3) + while (true) + { + var wr1 = queue.PopAndRequeue(); + wr1.ShouldNotBeNull(); + wr1!.Reply.ShouldBe($"{j + 1}a"); + + var wr2 = queue.PopAndRequeue(); + wr2.ShouldNotBeNull(); + wr2!.Reply.ShouldBe($"{j + 1}b"); + + var wr3 = queue.PopAndRequeue(); + wr3.ShouldNotBeNull(); + wr3!.Reply.ShouldBe($"{j + 1}c"); + + i++; + if (i % 3 == 0) + j++; + + // Count should match 9 - (j * 3) + queue.Count.ShouldBe(9 - (j * 3)); + + if (j == 2) + break; + } + } + + [Fact] + public void WaitQueuePopAndRequeue_RequestRemovedWhenFullyServed() + { + // Go: TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10152) "request removal when fully served" + var queue = new PullRequestWaitQueue(100); + + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1a", RemainingBatch = 2 }); + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1b", RemainingBatch = 1 }); + queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2a", RemainingBatch = 3 }); + + var initialCount = queue.Count; + initialCount.ShouldBe(3); + + // Pop 1a first time (n=2 -> n=1, requeued) + var wr = queue.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1a"); + wr.RemainingBatch.ShouldBe(1); + queue.Count.ShouldBe(initialCount); // still 3 + + // Pop 1b (n=1 -> n=0, removed) + wr = queue.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1b"); + wr.RemainingBatch.ShouldBe(0); + queue.Count.ShouldBe(initialCount - 1); // now 2 + + // Pop 1a second time (n=1 -> n=0, removed) + wr = queue.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1a"); + wr.RemainingBatch.ShouldBe(0); + queue.Count.ShouldBe(initialCount - 2); // now 1 + + // Only 2a should remain + var next = queue.Peek(); + next.ShouldNotBeNull(); + next!.Reply.ShouldBe("2a"); + next.RemainingBatch.ShouldBe(3); + } + + // ========================================================================= + // TestJetStreamConsumerStateAlwaysFromStore (jetstream_consumer_test.go:9796) + // ========================================================================= + + [Fact] + public async Task ConsumerStateAlwaysFromStore_DeliveredAndAckFloorTrack() + { + // Go: TestJetStreamConsumerStateAlwaysFromStore (jetstream_consumer_test.go:9796) + // Consumer delivered/ackFloor state tracks correctly through fetch and ack cycles. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo.>"); + + await AppendAsync(store, "foo.bar"); // seq 1 — filtered + await AppendAsync(store, "foo.other"); // seq 2 — not filtered + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "CONSUMER", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "foo.bar", + }); + + var engine = new PullConsumerEngine(); + + // Initial state: no delivered, no ack floor + consumer.AckProcessor.AckFloor.ShouldBe(0UL); + consumer.AckProcessor.PendingCount.ShouldBe(0); + + // Fetch — should deliver only foo.bar (seq 1) + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + result.Messages.Count.ShouldBe(1); + result.Messages[0].Subject.ShouldBe("foo.bar"); + + // Delivered but not acked — pending = 1, floor = 0 + consumer.AckProcessor.PendingCount.ShouldBe(1); + consumer.AckProcessor.AckFloor.ShouldBe(0UL); + + // Ack the message — floor advances to seq 1 + consumer.AckProcessor.AckSequence(result.Messages[0].Sequence); + consumer.AckProcessor.PendingCount.ShouldBe(0); + consumer.AckProcessor.AckFloor.ShouldBe(result.Messages[0].Sequence); + } + + // ========================================================================= + // TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10742) + // ========================================================================= + + [Fact] + public async Task CheckNumPending_PendingReflectsMessagesFromCurrentSequence() + { + // Go: TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10742) + // NumPending = messages in stream from consumer's current sequence onward. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + for (int i = 0; i < 5; i++) + await AppendAsync(store, "foo"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "DURABLE", + }); + + var engine = new PullConsumerEngine(); + + // Initial: consumer at seq 1, 5 messages pending + // Fetch 2 messages + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None); + result.Messages.Count.ShouldBe(2); + + // NextSequence now points past fetched messages + consumer.NextSequence.ShouldBe(3UL); + + // 3 messages remain to be delivered + var state = await stream.Store.GetStateAsync(CancellationToken.None); + var remaining = state.LastSeq - consumer.NextSequence + 1; + remaining.ShouldBe(3UL); + } + + [Fact] + public async Task CheckNumPending_ExcessiveSequenceReturnsZero() + { + // Go: TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10782) + // A sequence past the end of stream reports 0 pending. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + for (int i = 0; i < 5; i++) + await AppendAsync(store, "foo"); + + var consumer = MakeConsumer(new ConsumerConfig { DurableName = "DURABLE" }); + consumer.NextSequence = 100; // well past end of stream + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 5, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleFiltersRace (jetstream_consumer_test.go:93) + // ========================================================================= + + [Fact] + public async Task MultipleFiltersRace_ConcurrentFetchAllMessagesDelivered() + { + // Go: TestJetStreamConsumerMultipleFiltersRace (jetstream_consumer_test.go:93) + // Multiple concurrent fetches with filter subjects deliver all matching messages. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo.>", "bar.>"); + + // Publish 20 messages on foo.* and 20 on bar.* + for (int i = 0; i < 20; i++) + { + await AppendAsync(store, $"foo.{i}", $"f{i}"); + await AppendAsync(store, $"bar.{i}", $"b{i}"); + } + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["foo.>", "bar.>"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 40 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(40); + + // Verify filter applied correctly + var filter = new CompiledFilter(["foo.>", "bar.>"]); + result.Messages.All(m => filter.Matches(m.Subject)).ShouldBeTrue(); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleFitersWithStartDate (jetstream_consumer_test.go:2533) + // ========================================================================= + + [Fact] + public async Task MultipleFitersWithStartDate_ByStartTimeOnlyDeliversNewEnoughMessages() + { + // Go: TestJetStreamConsumerMultipleFitersWithStartDate (jetstream_consumer_test.go:2533) + // DeliverPolicy.ByStartTime with a cut-off time only delivers messages after that time. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "events", "data"); + + var cutoff = DateTime.UtcNow.AddMilliseconds(-50); + + // Append old message (before cutoff — at least conceptually; MemStore records current time) + // In unit test we simulate by appending before the cutoff datetime we pass + await AppendAsync(store, "events", "old"); + await AppendAsync(store, "data", "old"); + + await Task.Delay(10); // ensure new messages have newer timestamp + + // Append new messages after our reference point + await AppendAsync(store, "events", "new1"); + await AppendAsync(store, "data", "new2"); + + var startTime = DateTime.UtcNow.AddMilliseconds(-5); // very recent + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + DeliverPolicy = DeliverPolicy.ByStartTime, + OptStartTimeUtc = startTime, + FilterSubjects = ["events", "data"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + // Only messages at or after startTime should be delivered + result.Messages.All(m => filter_matches_events_data(m.Subject)).ShouldBeTrue(); + result.Messages.Count.ShouldBeGreaterThanOrEqualTo(0); + + static bool filter_matches_events_data(string s) => s == "events" || s == "data"; + } + + // ========================================================================= + // TestJetStreamConsumerFilterUpdate (jetstream_consumer_test.go:7397) + // ========================================================================= + + [Fact] + public async Task FilterUpdate_ChangingFilterSubjectChangesDeliveredMessages() + { + // Go: TestJetStreamConsumerFilterUpdate (jetstream_consumer_test.go:7397) + // Changing FilterSubject on a consumer config changes which messages are delivered. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo", "bar"); + + await AppendAsync(store, "foo", "f1"); + await AppendAsync(store, "bar", "b1"); + await AppendAsync(store, "foo", "f2"); + await AppendAsync(store, "bar", "b2"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "foo", + }); + + var engine = new PullConsumerEngine(); + var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + r1.Messages.Count.ShouldBe(2); + r1.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); + foreach (var msg in r1.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + // Update filter to "bar" — consumer starts fresh + consumer.Config.FilterSubject = "bar"; + consumer.NextSequence = 1; + + var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + r2.Messages.Count.ShouldBe(2); + r2.Messages.All(m => m.Subject == "bar").ShouldBeTrue(); + } +}