# # Copyright (c) 2014-2015 Christian Jaeger, copying@christianjaeger.ch # # This is free software, offered under either the same terms as perl 5 # or the terms of the Artistic License version 2 or the terms of the # MIT License (Expat version). See the file COPYING.md that came # bundled with this file. # =head1 NAME FP::IOStream =head1 SYNOPSIS use FP::IOStream ':all'; # xdirectory_items, xdirectory_paths use FP::Stream; # stream_map use FP::List ':all'; # first my $paths= stream_map sub { my ($item)= @_; "$base/$item" }, xdirectory_items $base; # which is the same as: my $paths= xdirectory_paths $base; my $firstpath= first $paths; # ... =head1 DESCRIPTION Lazy IO (well, input), by reading items lazily as stream items. (It's arguable whether that is a good idea; Haskell uses different approaches nowadays. But it's still a nice way to do things if you're careful.) =head1 NOTE This is alpha software! Read the package README. =cut package FP::IOStream; @ISA="Exporter"; require Exporter; @EXPORT=qw(); @EXPORT_OK=qw(maybeIO_to_stream fh_to_stream perhaps_directory_items perhaps_directory_paths xdirectory_items xdirectory_paths xfile_lines xfile_lines0 xfile_lines0chop xfile_lines_chomp fh_to_lines fh_to_chunks timestream xstream_print xstream_to_file xfile_replace_lines ); %EXPORT_TAGS=(all=>[@EXPORT,@EXPORT_OK]); use strict; use warnings; use warnings FATAL => 'uninitialized'; use FP::Lazy; use Chj::xopendir qw(perhaps_opendir); use FP::List ':all'; use FP::Stream qw(stream_map weaken Weakened); use FP::PureArray qw(array_to_purearray); use FP::Array_sort; use FP::Ops 'the_method'; use Carp; use Chj::singlequote ":all"; use Chj::xopen qw( xopen_read xopen_write xopen_append xopen_update possibly_fh_to_fh glob_to_fh ); use Chj::xtmpfile qw(xtmpfile); # XX use this for the definitions further below instead of re-coding # it each time? sub maybeIO_to_stream { my ($maybeIO, $maybe_close)=@_; my $next; $next= sub { my $next=$next; lazy { if (defined (my $v= &$maybeIO())) { cons ($v, &$next) } else { if (defined $maybe_close) { &$maybe_close() } null } } }; &{Weakened $next} } sub _perhaps_opendir_stream ($) { my ($path)=@_; if (my ($d)= perhaps_opendir $path) { my $next; $next= sub { my $next=$next; lazy { if (defined (my $item= $d->xnread)) { cons $item, &$next } else { $d->xclose; null } } }; &{Weakened $next} } else { () } } sub _perhaps_opendir_stream_sorted ($$) { my ($path,$cmp)=@_; if (my ($d)= perhaps_opendir $path) { my $items= array_sort [$d->xnread], $cmp; $d->xclose; array_to_purearray $items } else { () } } sub perhaps_directory_items ($;$) { my ($path,$maybe_cmp)=@_; if ($maybe_cmp) { _perhaps_opendir_stream_sorted $path,$maybe_cmp; } else { _perhaps_opendir_stream $path; } } sub perhaps_directory_paths ($;$) { my ($base,$maybe_cmp)=@_; $base.= "/" unless $base=~ /\/\z/; if (my ($s)= perhaps_directory_items $base,$maybe_cmp) { $s->map(sub { my ($item)= @_; "$base$item" }) } else { () } } sub xdirectory_items ($;$) { my ($path,$maybe_cmp)=@_; if (my ($s)= perhaps_directory_items ($path, $maybe_cmp)) { $s } else { croak "xdirectory_items(".singlequote_many(@_)."): $!"; } } sub xdirectory_paths ($;$) { my ($path,$maybe_cmp)=@_; if (my ($s)= perhaps_directory_paths ($path, $maybe_cmp)) { $s } else { croak "xdirectory_paths(".singlequote_many(@_)."): $!"; } } sub fh_to_stream ($$$) { my ($fh, $read, $close)=@_; my $next; $next= sub { my $next=$next; lazy { if (defined (my $item= &$read($fh))) { cons $item, &$next } else { &$close ($fh); null } } }; &{Weakened $next} } # And (all?, no, can't proxy 'xopen' for both in and out) some of the # Chj::xopen functions: sub make_open_stream { my ($open,$read,$maybe_close)=@_; my $close= $maybe_close // the_method ("xclose"); sub ($) { fh_to_stream(scalar &$open(@_), $read, $close) } } sub xfile_lines ($); *xfile_lines= make_open_stream(\&xopen_read, the_method ("xreadline")); sub xfile_lines0 ($); *xfile_lines0= make_open_stream(\&xopen_read, the_method ("xreadline0")); sub xfile_lines0chop ($); *xfile_lines0chop= make_open_stream(\&xopen_read, the_method ("xreadline0chop")); sub xfile_lines_chomp ($); *xfile_lines_chomp= make_open_stream(\&xopen_read, the_method ("xreadline_chomp")); # Clojure calls this line-seq # (http://clojure.github.io/clojure/clojure.core-api.html#clojure.core/line-seq) sub fh_to_lines ($) { my ($fh)=@_; fh_to_stream (possibly_fh_to_fh($fh), the_method ("xreadline"), the_method ("xclose")) } # read filehandle in chunks, although the chunk size, even of the # chunks before the last one, is only guaranteed to be non-zero, not # bufiz (since only xsysreadcompletely would guarantee to fill size, # but would die on mid-chunk EOF) sub fh_to_chunks ($$) { my ($fh,$bufsiz)= @_; fh_to_stream (possibly_fh_to_fh($fh), sub { my $buf; my $n= $fh->xsysread($buf, $bufsiz); $n == 0 ? undef : $buf }, the_method("xclose")); } # A stream of floating-point unix timestamps representing the time # when each cell is being forced. Optional argument in seconds # (floating point) to sleep before returning the next element. sub timestream (;$) { my ($maybe_sleep)=@_; require Time::HiRes; my $lp; $lp= sub { lazy { Time::HiRes::sleep ($maybe_sleep) if $maybe_sleep; cons (Time::HiRes::time (), &$lp()) } }; Weakened ($lp)->(); } sub xstream_print ($;$) { @_==2 or @_==1 or die "wrong number of arguments"; my ($s,$maybe_fh)=@_; my $fh= $maybe_fh // glob_to_fh *STDOUT; weaken $_[0]; $s->for_each (sub { print $fh $_[0] or die "xstream_print: writing to $fh: $!"; }); } sub xstream_to_file ($$;$) { @_==2 or @_==3 or die "wrong number of arguments"; my ($s,$path,$maybe_mode)=@_; my $out= xtmpfile $path; weaken $_[0]; xstream_print ($s,$out); $out->xclose; $out->xputback ($maybe_mode); } # read and write back a file, passing its lines as a stream to the # given function; written to temp file that's renamed into place upon # successful completion. sub xfile_replace_lines ($$) { my ($path,$fn)=@_; xstream_to_file &$fn(xfile_lines $path), $path; } 1