Streaming Mental Blocks, and some Haskell Streaming Research

Beyond just network reads and writes, there are a lot of internals with unexposed streaming potential:

  • zlib can do streaming compression and decompression (it has been able to all along, it's just never been wired to anything)

  • now that all the encryption is coming from mbedTLS, every hash and cipher is streaming (this would permit computing the SHA256 of a multi-terabyte file without loading it all into memory at once... IF we knew what the interface for that was supposed to be)

  • CALL gets its stdout and stderr as they come along...that could be passed to some kind of streaming handler (interactive PARSE of shell commands to react with input?)

One of the reasons none of this cool stuff is used is that it has been waiting on designs for PORT! and/or "CODEC". Of course those designs won't invent themselves.

Let's just take the concept of calculating a sha256 hash in streaming fashion. You could "stream" via a plain old object interface:

 hasher: make-hasher 'sha256
 hasher/add-data "some data"
 hasher/add-data #{00DECAFBAD00}
 sha256: hasher/finish

(Note: the AES streaming cipher from Saphirion had even fewer frills than this. It didn't take care of building up a buffer for you, so each call assumed you wanted one block of output. Adding one byte at a time gave you different results from sending the bytes cumulatively.)

Defining some one-off objects for these functions doesn't seem appealing. Using one puts you in the position of looping through a file and breaking it into chunks yourself. Having a protocol for the hasher would hopefully let it plug in to something that would drive the process for you once you wired it up.
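(For comparison, that init/add/finish shape is essentially what Haskell's cryptonite library exposes for incremental hashing. A minimal sketch using its Crypto.Hash calls; the chunk list is stand-in data:)

import Crypto.Hash (Digest, SHA256, hashFinalize, hashInit, hashUpdate)
import qualified Data.ByteString as BS

-- Same init/update/finalize shape as the MAKE-HASHER sketch above, using
-- cryptonite's incremental API: feed chunks one at a time, so the whole
-- input never has to be in memory at once.
sha256OfChunks :: [BS.ByteString] -> Digest SHA256
sha256OfChunks = hashFinalize . foldl hashUpdate hashInit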

Are PORT!s the answer?

 hashport: open sha256://
 write hashport "some data"  ; treated as data
 write hashport #{00DECAFBAD00}  ; also treated as data
 write hashport <done>  ; acts as an instruction?
 sha256: read hashport  ; only legal if finalizing instruction received?

Here I've done something whimsical by using a TAG! to indicate when input is done. Files don't have a <done> on them, so you'd have to stick that on somewhere.

Being further whimsical, let's imagine using some kind of universal CONNECT to put things together, and show two hashes being done by the same pipeline:

 hashport: open sha256://
 readhashport: connect [
     open %/some/fileA.txt +> [<done>] +> open %/some/fileB.txt +> [<done>]
     >> hashport
 ] 
 sha256-fileA: read/part readhashport 1
 sha256-fileB: read/part readhashport 1

The weird suggestion here is that +> might join together input sources, and >> might pipe the output of a source (in this case a composite source) to a sink. If you don't like that, maybe it's blocks:

 connect [
     [open %/some/fileA.txt | [<done>] | open %/some/fileB.txt | [<done>]]
     [hashport]
 ]

Anyway I don't know what I'm talking about in particular, but I do know these types of things DO exist.

One of the more documented streaming answers in the Haskell world is something called "Conduit", so I've been looking around at that. (Note: this is a good article for anyone wanting details of alternatives)

Conduit

A couple of slides from a presentation:

What is streaming data?

  • Process a sequence of values of the same type
  • Produce a sequence of values of the same type
  • Don't keep all the data in memory at once
  • Perform some actions in-between
  • Probably more common than you'd think

Notice the wording there of values of the same type. Basically every language's streaming or piping mechanism seems to ask you to put a type on things (e.g. Go's channels, which I have hacked up a simulation of in stackless).

Common Examples

  • Read data from/write data to a file
  • Communicate over a socket
  • Read data from a database
  • Traverse a deep directory structure
  • Implement a job queue
  • Generate large HTTP response bodies
  • Parsing

Conduit sets up the terminology that there is an underlying type ("Pipe"). When you establish a Pipe, you say what its input and output datatypes are.

  • If you only specify an output type, that Pipe is considered a "Source"
  • If you only specify an input type, that Pipe is considered a "Sink"
  • If you specify both, the Pipe is considered a "Conduit"
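To make that concrete, here is a minimal sketch of wiring a source, a conduit, and a sink together with the conduit package's own combinators (runConduit, yieldMany, mapC, sumC, and the .| operator are real; the example itself is mine):

import Conduit  -- the conduit package's kitchen-sink module

-- yieldMany [1..10]  -- only produces output (a stream of Int): a "Source"
-- mapC (* 2)         -- consumes Ints and produces Ints: a "Conduit"
-- sumC               -- only consumes input and returns a result: a "Sink"
doubledSum :: IO Int
doubledSum = runConduit $ yieldMany [1 .. 10 :: Int] .| mapC (* 2) .| sumC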

The Pipe definition in Data.Conduit.Internal.Pipe is complicated to read, but the author tends to be descriptive in comments, so that helps.

Something to notice is that Pipe has a concept of a "result type" distinct from its "output type"...and an "upstream result type". It also mentions that pipes can be defined as having "leftovers", which (if present) have the same type as the input and are automatically provided to the "next Pipe in the monadic chain".

(To see why leftovers might be useful, imagine you are parsing data from a network and you get an HTTP stream with a header and a body, which you are digesting into some output. Your "Source" is a network connection, and let's say your "Sink" is a file. But in between them you have a "Conduit" that is sometimes processing header and sometimes processing body...which may be implemented in two different pieces of code that are chained together. If the source is producing BINARY! blobs, a blob might straddle the line between header and body...so the header parser would want to take just the part it wanted, and pass the remainder on to the body parser in the same conceptual conduit layer. More generally you can think of this as being like a composed parser that passes on remainders as it goes...)
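To get a feel for the mechanism, here is a tiny hand-rolled "peek" in conduit terms. The await and leftover operations are real (and conduit ships a peekC combinator that does exactly this); the function itself is just an illustration:

import Conduit (ConduitT, await, leftover)

-- Look at the next chunk of input without consuming it, by handing it back
-- as a "leftover" so whatever runs next in the pipeline still sees it.
peekChunk :: Monad m => ConduitT i o m (Maybe i)
peekChunk = do
  mx <- await                                 -- ask upstream for the next value
  case mx of
    Nothing -> return Nothing                 -- upstream is finished
    Just x  -> leftover x >> return (Just x)  -- push it back, report what we saw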

The comments say:

A basic intuition is that every Pipe produces a stream of output values (of type o), and eventually indicates that this stream is terminated by sending a result (of type r). On the receiving end of a Pipe, these become the i and u parameters.

It defines some type constructors, which look like function calls or signals that the implementation of a pipe would raise:

  • HaveOutput (Pipe l i o u m r) o - Provide new output to be sent downstream. This constructor has two fields: the next Pipe to be used and the output value.

  • NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r) - Request more input from upstream. The first field takes a new input value and provides a new Pipe. The second takes an upstream result value, which indicates that upstream is producing no more results.

(Translation: this passes two callbacks. If there's more data, the first callback will be invoked with the new single item's worth of input. If there's an error or the upstream is finished, the second callback gets called and is passed that result.)

  • Done r - Processing with this Pipe is complete, providing the final result.

  • PipeM (m (Pipe l i o u m r)) - Require running of a monadic action to get the next Pipe.

(Translation: :man_shrugging: )

  • Leftover (Pipe l i o u m r) l - Return leftover input, which should be provided to future operations.

Modulo the "require running of a monadic action", it seems fairly clear.
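Putting those together, the declaration itself boils down to something like this (paraphrased from the conduit source as I read it; the comments are mine, and older conduit versions also carried a finalizer field in HaveOutput):

data Pipe l i o u m r
  = HaveOutput (Pipe l i o u m r) o        -- push a value downstream, then continue as the given Pipe
  | NeedInput (i -> Pipe l i o u m r)      -- "here is more input" continuation
              (u -> Pipe l i o u m r)      -- "upstream finished, here is its result" continuation
  | Done r                                 -- this Pipe is finished, with result r
  | PipeM (m (Pipe l i o u m r))           -- run an effect in the underlying monad to get the next Pipe
  | Leftover (Pipe l i o u m r) l          -- hand back unconsumed input for whatever runs next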

The Setup-And-Go Model

Something about Conduit and systems like it (boost::asio) is that they generally look to be about setting up connections and then saying "okay, run it". This is a bit akin to setting up a server and then saying WAIT for the rest of the program.

You can see some nice examples of these "flow" chains, like this one from another library called "Streaming":

import System.IO (IOMode (ReadMode), withFile)  -- imports filled in for completeness
import Streaming (takes)                        -- `takes` comes from the streaming package
import qualified Data.ByteString.Streaming.Char8 as Q

-- Send the first n lines to stdout. 
-- Note that this streams even if it hits a 10 terabyte "line"
head n file = withFile file ReadMode $ \h -> 
  Q.stdout
  $ Q.unlines
  $ takes n
  $ Q.lines
  $ Q.fromHandle h

Which I could translate into wishful-thinking pseudocode like:

 h: open/read file
 run-stream [
     (make-stream-fromHandle h)
         |
     :deline/lines  ; e.g. deline/lines "a^/b^/c" => [a b c]
         |
     :(specialize 'take [part: n])  ; take N lines
         |
     :(specialize 'delimit [delimiter: newline])  ; add newlines back in
         |
     :write-stdout
 ]

The guts of those natives have no streaming ability, of course. But let me point something out about the streaming example: Haskell has a built-in UNLINES which acts like DELIMIT with newline; it's a very simple recursive formulation. Empty input is empty output; otherwise it takes the first element, sticks a '\n' onto it, and appends the result of UNLINES on the rest:

unlines [] = []
unlines (l:ls) = l ++ '\n' : unlines ls 

But that is not what this example is using. The Q.unlines is the Streaming library's own implementation. It only works with this particular Streaming library (e.g. not with Conduit or other streaming implementations), and the source is clearly much more complex. The same applies for Q.lines.
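For contrast, here is a sketch from memory of the same chain in Conduit's vocabulary, using combinators from Data.Conduit.Combinators (note one real difference: linesUnboundedAsciiC buffers each whole line, so unlike the streaming-bytestring version it would choke on a 10-terabyte "line"):

import Conduit  -- re-exports Data.Conduit and Data.Conduit.Combinators

-- Send the first n lines of a file to stdout, Conduit-style.
headC :: Int -> FilePath -> IO ()
headC n file = runConduitRes $
     sourceFile file          -- raw bytes from the file
  .| linesUnboundedAsciiC     -- split into lines (each held in memory)
  .| takeC n                  -- only the first n lines
  .| unlinesAsciiC            -- put the newlines back
  .| stdoutC                  -- stream to standard out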

What about random access IO?

C++ is an example where the definitions of the std::istream and std::ostream types account for being able to seek to arbitrary positions. It's up to the implementer of a particular stream behind these interfaces to say whether that's legal or not.

It seems that at least in these examples I've looked at, the streaming abstractions are independent universes. You import file handles or other things into the streams to get them to conform to the interface, which is limited to just the operations given.

How do Streams map to PORT! and/or CODEC?

Codecs are simply objects with three entry points:

  • Ask if BINARY! data contains the magic numbers of a recognized type
  • Translate a BINARY! into a Rebol value
  • Convert a Rebol value into a BINARY!

It seems any such thing would benefit from being able to stream--you may not need to load the entirety of an input into memory at once just to get the item loaded. These would fit into the "conduit" category of pipes.

PORT!s have tried to apply in many contexts of "object you talk to that takes data in and gives data out". It's framed as the way you would talk to a SQL database, as well as to a random-access file or an internet connection. These things share so little in common that the generality of "involves I/O" has been of little assistance and has mostly muddied the waters.

PORT!s have acted as a source, a sink, or both. There are cases of PORT!s incorporating each other for their implementation...e.g. the TLS port instantiates a TCP port and controls it. And the HTTP port instantiates either a TLS or TCP port depending on whether it's http or https. But these embeddings are very manual (and not well understood, at least not by me). They use each other in their implementations but there is no general architecture for such reuse. So nothing is being "wired" together.

When we look at questions like wanting to expose CALL's functionality, CALL wants us to hook it up to one source and two sinks. One PORT! doesn't give us that. Should it be three?
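(As a point of reference, conduit-extra frames that exact shape as one call taking a source for stdin and two sinks for stdout and stderr. A sketch from memory, so treat the details as approximate:)

{-# LANGUAGE OverloadedStrings #-}
import Conduit (sinkLazy, yieldMany)
import Data.Conduit.Process (sourceProcessWithStreams)  -- from conduit-extra
import System.Process (proc)

-- One source feeding the process's stdin, two sinks catching stdout and stderr.
sortedLines :: IO ()
sortedLines = do
  (_exit, out, errs) <- sourceProcessWithStreams
      (proc "sort" [])                   -- the external process to run
      (yieldMany ["b\n", "a\n", "c\n"])  -- source: what goes to its stdin
      sinkLazy                           -- sink: collect its stdout (lazily)
      sinkLazy                           -- sink: collect its stderr (lazily)
  print (out, errs)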

One piece of good news in all these questions is that stackless is offering hope for more experimentation with chains of processing that can retain their memory/state and yield information across the pipe. I've shown one primordial example with channels, and if things solidify a bit more, that may be a good basis for researching whether some of these streaming features can be exposed.


More of a stream of consciousness than a considered response:

Ports

It's really a way of abstracting outside sources so that they can be manipulated by the standard suite of Rebol actors. That you could define how the actors behaved on a per-scheme basis gave you 'intuitive', if not consistent, polymorphic functionality.

I'm still grappling with how this works with regard to the MySQL scheme, where you'd have two ports: one based on the MySQL scheme with a child based on the TCP scheme. The TCP port is given an AWAKE function that directs traffic, and the MySQL scheme has one that interprets and dispatches the resultant data and ultimately controls the TCP port.

You're spot on that no standard exists for how this should work, so each and every implementation is somewhat reinventing the wheel.

I'll ruminate on the following: I'd expect ports based on HTTP and MySQL schemes to behave differently. However, both are candidates for streaming blobs of data—HTTP might give you blobs of text or binary based on the MIME type after having negotiated the header; MySQL might give you column names, and row/column values as they arrived. The nature of the transport is slightly different—certainly the specifics of each protocol, but the concept is more or less the same: header, content, header, content, rinse, repeat. How do you address codifying the commonalities in TCP handling while managing the differences—where would that happen? And then could that be applied to other protocols/schemes: IMAP, SMTP, etc...

Then there's the high-level interface—what do you expect when you READ an HTTP port? I have an FSM-based HTML parser that behaves per the HTML5 standard and is a candidate for how to handle HTML as streamed media, but there's so much that can come down that pipe, and what do you do with the header(s)? How do you represent a query to MySQL, e.g. select mydb [dialected query]? (It questionably maps to the spirit of the SELECT function; however, it demands a complete response and not a streamed one, thus the problem with ports may not be an implementation one but rather a language one.)

It seems to me that a lot happens with the AWAKE function—particularly with ports/sub-ports based on TCP—and its relationship to the WAIT function, all of which needs to be better documented and understood.

I don't know how you could 'wire' it together better, though—a higher-level scheme is always going to be reliant on events fired by its lower-level counterpart, right? There's a fair bit of discussion on how Ports are supposed to work on the Rebol 3 Wiki, however I think a lot of the design work was being figured out as some of the major schemes were being developed, and thus, even if considered somewhat complete, it is largely untested.

It seems to me that channels, pipes, etc. all amount to what ports wanted to be, and thus it's worth evaluating whether they can serve that purpose or whether that is just trying to cram too much into one basket.

I would imagine that it'd be three—two low-level ones to handle incoming content from OUTPUT/ERROR and a higher-level one that provides an interface to those channels. There might even be a fourth if, for example, you want to use a command as the core for a higher-level port (like HTTP based on Curl, or some image formatting based on ImageMagick, etc.).

On Parse

I'm not really sure how Parse should work with ports/streams other than completely rewiring the keywords for that purpose, and it would depend on the port's underlying scheme having the capability to handle them. That would go for low-level (string) parsing on the likes of TCP/File/IO, or higher-level (block) parsing of values thrown back by the likes of HTTP or MySQL, or some combination of the high-level ports masquerading as low-level (like HTTP, or FTP, or TLS, or S3, or whatever else).

Sidenotes

Couple of sidenotes: sha256:// ⇒ hash:type=sha256 — I don't see why it'd have to mimic a resolving URL...

...
update hashport  ; write hashport <done>
sha256: read hashport  ; only legal if finalizing instruction received?

This would seem to be another of the uses of UPDATE as was.

A further thought on HTTP streaming—there's streaming the result of a single request, and there's also using RANGE to process a resource in parts.

Could a streaming PARSE use a RANGE-aware HTTP scheme to dissect a PDF, where you first get the index from the tail of the document and then work back through individual chunks?