@@ -442,6 +442,7 @@ pub fn poll(
442
442
.overlapped = [1 ]windows.OVERLAPPED {
443
443
mem .zeroes (windows .OVERLAPPED ),
444
444
} ** enum_fields .len ,
445
+ .small_bufs = undefined ,
445
446
.active = .{
446
447
.count = 0 ,
447
448
.handles_buf = undefined ,
@@ -481,6 +482,7 @@ pub fn Poller(comptime StreamEnum: type) type {
481
482
windows : if (is_windows ) struct {
482
483
first_read_done : bool ,
483
484
overlapped : [enum_fields .len ]windows.OVERLAPPED ,
485
+ small_bufs : [enum_fields .len ][128 ]u8 ,
484
486
active : struct {
485
487
count : math .IntFittingRange (0 , enum_fields .len ),
486
488
handles_buf : [enum_fields .len ]windows.HANDLE ,
@@ -534,24 +536,31 @@ pub fn Poller(comptime StreamEnum: type) type {
534
536
const bump_amt = 512 ;
535
537
536
538
if (! self .windows .first_read_done ) {
537
- // Windows Async IO requires an initial call to ReadFile before waiting on the handle
539
+ var already_read_data = false ;
538
540
for (0.. enum_fields .len ) | i | {
539
541
const handle = self .windows .active .handles_buf [i ];
540
- switch (try windowsAsyncRead (
542
+ switch (try windowsAsyncReadToFifoAndQueueSmallRead (
541
543
handle ,
542
544
& self .windows .overlapped [i ],
543
545
& self .fifos [i ],
546
+ & self .windows .small_bufs [i ],
544
547
bump_amt ,
545
548
)) {
546
- .pending = > {
549
+ .populated , .empty = > | state | {
550
+ if (state == .populated ) already_read_data = true ;
547
551
self .windows .active .handles_buf [self .windows .active .count ] = handle ;
548
552
self .windows .active .stream_map [self .windows .active .count ] = @as (StreamEnum , @enumFromInt (i ));
549
553
self .windows .active .count += 1 ;
550
554
},
551
555
.closed = > {}, // don't add to the wait_objects list
556
+ .closed_populated = > {
557
+ // don't add to the wait_objects list, but we did already get data
558
+ already_read_data = true ;
559
+ },
552
560
}
553
561
}
554
562
self .windows .first_read_done = true ;
563
+ if (already_read_data ) return true ;
555
564
}
556
565
557
566
while (true ) {
@@ -576,32 +585,35 @@ pub fn Poller(comptime StreamEnum: type) type {
576
585
577
586
const active_idx = status - windows .WAIT_OBJECT_0 ;
578
587
579
- const handle = self .windows .active .handles_buf [active_idx ];
580
588
const stream_idx = @intFromEnum (self .windows .active .stream_map [active_idx ]);
581
- var read_bytes : u32 = undefined ;
582
- if (0 == windows .kernel32 .GetOverlappedResult (
583
- handle ,
584
- & self .windows .overlapped [stream_idx ],
585
- & read_bytes ,
586
- 0 ,
587
- )) switch (windows .GetLastError ()) {
588
- .BROKEN_PIPE = > {
589
+ const handle = self .windows .active .handles_buf [active_idx ];
590
+
591
+ const overlapped = & self .windows .overlapped [stream_idx ];
592
+ const stream_fifo = & self .fifos [stream_idx ];
593
+ const small_buf = & self .windows .small_bufs [stream_idx ];
594
+
595
+ const num_bytes_read = switch (try windowsGetReadResult (handle , overlapped , false )) {
596
+ .success = > | n | n ,
597
+ .closed = > {
589
598
self .windows .active .removeAt (active_idx );
590
599
continue ;
591
600
},
592
- else = > | err | return windows . unexpectedError ( err ) ,
601
+ .aborted = > unreachable ,
593
602
};
603
+ try stream_fifo .write (small_buf [0.. num_bytes_read ]);
594
604
595
- self .fifos [stream_idx ].update (read_bytes );
596
-
597
- switch (try windowsAsyncRead (
605
+ switch (try windowsAsyncReadToFifoAndQueueSmallRead (
598
606
handle ,
599
- & self .windows .overlapped [stream_idx ],
600
- & self .fifos [stream_idx ],
607
+ overlapped ,
608
+ stream_fifo ,
609
+ small_buf ,
601
610
bump_amt ,
602
611
)) {
603
- .pending = > {},
604
- .closed = > self .windows .active .removeAt (active_idx ),
612
+ .empty = > {}, // irrelevant, we already got data from the small buffer
613
+ .populated = > {},
614
+ .closed ,
615
+ .closed_populated , // identical, since we already got data from the small buffer
616
+ = > self .windows .active .removeAt (active_idx ),
605
617
}
606
618
return true ;
607
619
}
@@ -654,25 +666,145 @@ pub fn Poller(comptime StreamEnum: type) type {
654
666
};
655
667
}
656
668
657
- fn windowsAsyncRead (
669
+ /// The `ReadFile` docuementation states that `lpNumberOfBytesRead` does not have a meaningful
670
+ /// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For
671
+ /// compatibility, we point it to this dummy variables, which we never otherwise access.
672
+ /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
673
+ var win_dummy_bytes_read : u32 = undefined ;
674
+
675
+ /// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before
676
+ /// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data
677
+ /// is available. `handle` must have no pending asynchronous operation.
678
+ fn windowsAsyncReadToFifoAndQueueSmallRead (
658
679
handle : windows.HANDLE ,
659
680
overlapped : * windows.OVERLAPPED ,
660
681
fifo : * PollFifo ,
682
+ small_buf : * [128 ]u8 ,
661
683
bump_amt : usize ,
662
- ) ! enum { pending , closed } {
684
+ ) ! enum { empty , populated , closed_populated , closed } {
685
+ var read_any_data = false ;
663
686
while (true ) {
664
- const buf = try fifo .writableWithSize (bump_amt );
665
- var read_bytes : u32 = undefined ;
666
- const read_result = windows .kernel32 .ReadFile (handle , buf .ptr , math .cast (u32 , buf .len ) orelse math .maxInt (u32 ), & read_bytes , overlapped );
667
- if (read_result == 0 ) return switch (windows .GetLastError ()) {
668
- .IO_PENDING = > .pending ,
669
- .BROKEN_PIPE = > .closed ,
670
- else = > | err | windows .unexpectedError (err ),
687
+ const fifo_read_pending = while (true ) {
688
+ const buf = try fifo .writableWithSize (bump_amt );
689
+ const buf_len = math .cast (u32 , buf .len ) orelse math .maxInt (u32 );
690
+
691
+ if (0 == windows .kernel32 .ReadFile (
692
+ handle ,
693
+ buf .ptr ,
694
+ buf_len ,
695
+ & win_dummy_bytes_read ,
696
+ overlapped ,
697
+ )) switch (windows .GetLastError ()) {
698
+ .IO_PENDING = > break true ,
699
+ .BROKEN_PIPE = > return if (read_any_data ) .closed_populated else .closed ,
700
+ else = > | err | return windows .unexpectedError (err ),
701
+ };
702
+
703
+ const num_bytes_read = switch (try windowsGetReadResult (handle , overlapped , false )) {
704
+ .success = > | n | n ,
705
+ .closed = > return if (read_any_data ) .closed_populated else .closed ,
706
+ .aborted = > unreachable ,
707
+ };
708
+
709
+ read_any_data = true ;
710
+ fifo .update (num_bytes_read );
711
+
712
+ if (num_bytes_read == buf_len ) {
713
+ // We filled the buffer, so there's probably more data available.
714
+ continue ;
715
+ } else {
716
+ // We didn't fill the buffer, so assume we're out of data.
717
+ // There is no pending read.
718
+ break false ;
719
+ }
671
720
};
672
- fifo .update (read_bytes );
721
+
722
+ if (fifo_read_pending ) cancel_read : {
723
+ // Cancel the pending read into the FIFO.
724
+ _ = windows .kernel32 .CancelIo (handle );
725
+
726
+ // We have to wait for the handle to be signalled, i.e. for the cancellation to complete.
727
+ switch (windows .kernel32 .WaitForSingleObject (handle , windows .INFINITE )) {
728
+ windows .WAIT_OBJECT_0 = > {},
729
+ windows .WAIT_FAILED = > return windows .unexpectedError (windows .GetLastError ()),
730
+ else = > unreachable ,
731
+ }
732
+
733
+ // If it completed before we canceled, make sure to tell the FIFO!
734
+ const num_bytes_read = switch (try windowsGetReadResult (handle , overlapped , true )) {
735
+ .success = > | n | n ,
736
+ .closed = > return if (read_any_data ) .closed_populated else .closed ,
737
+ .aborted = > break :cancel_read ,
738
+ };
739
+ read_any_data = true ;
740
+ fifo .update (num_bytes_read );
741
+ }
742
+
743
+ // Try to queue the 1-byte read.
744
+ if (0 == windows .kernel32 .ReadFile (
745
+ handle ,
746
+ small_buf ,
747
+ small_buf .len ,
748
+ & win_dummy_bytes_read ,
749
+ overlapped ,
750
+ )) switch (windows .GetLastError ()) {
751
+ .IO_PENDING = > {
752
+ // 1-byte read pending as intended
753
+ return if (read_any_data ) .populated else .empty ;
754
+ },
755
+ .BROKEN_PIPE = > return if (read_any_data ) .closed_populated else .closed ,
756
+ else = > | err | return windows .unexpectedError (err ),
757
+ };
758
+
759
+ // We got data back this time. Write it to the FIFO and run the main loop again.
760
+ const num_bytes_read = switch (try windowsGetReadResult (handle , overlapped , false )) {
761
+ .success = > | n | n ,
762
+ .closed = > return if (read_any_data ) .closed_populated else .closed ,
763
+ .aborted = > unreachable ,
764
+ };
765
+ try fifo .write (small_buf [0.. num_bytes_read ]);
766
+ read_any_data = true ;
673
767
}
674
768
}
675
769
770
+ /// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation.
771
+ /// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected).
772
+ ///
773
+ /// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the
774
+ /// operation immediately returns data:
775
+ /// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially
776
+ /// erroneous results."
777
+ /// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...]
778
+ /// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to
779
+ /// get the actual number of bytes read."
780
+ /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
781
+ fn windowsGetReadResult (
782
+ handle : windows.HANDLE ,
783
+ overlapped : * windows.OVERLAPPED ,
784
+ allow_aborted : bool ,
785
+ ) ! union (enum ) {
786
+ success : u32 ,
787
+ closed ,
788
+ aborted ,
789
+ } {
790
+ var num_bytes_read : u32 = undefined ;
791
+ if (0 == windows .kernel32 .GetOverlappedResult (
792
+ handle ,
793
+ overlapped ,
794
+ & num_bytes_read ,
795
+ 0 ,
796
+ )) switch (windows .GetLastError ()) {
797
+ .BROKEN_PIPE = > return .closed ,
798
+ .OPERATION_ABORTED = > | err | if (allow_aborted ) {
799
+ return .aborted ;
800
+ } else {
801
+ return windows .unexpectedError (err );
802
+ },
803
+ else = > | err | return windows .unexpectedError (err ),
804
+ };
805
+ return .{ .success = num_bytes_read };
806
+ }
807
+
676
808
/// Given an enum, returns a struct with fields of that enum, each field
677
809
/// representing an I/O stream for polling.
678
810
pub fn PollFiles (comptime StreamEnum : type ) type {
0 commit comments