Grouping in pipes

While playing with pipes, I wondered how to group values in a pipe. As an exercise, I tried to build a producer that yields lists of values from a producer that yields single values. This may seem pointless, since the whole point of a streaming library like pipes is to avoid loading all the values into a list, but it’s just an exercise.

The first attempt was a pipe that calls Pipes.await n times and yields the collected values as a list.

import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

source :: Monad m => Producer Int m ()
source = each [1..9]

gather :: Monad m => Int -> Pipe a [a] m r
gather n = forever $ sequence (replicate n await) >>= yield

test :: Int -> IO ()
test n = runEffect $ source >-> gather n >-> P.print

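The problem with this attempt is that it drops the trailing values when fewer than n of them remain. When the upstream producer finishes while gather is still waiting in await, the whole pipeline terminates and the values collected so far are discarded. For example, test 2 never yields 9 at the end of the stream.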

>>> test 2
[1,2]
[3,4]
[5,6]
[7,8]

I’d rather it yield [9] at the end. One solution I came up with was wrapping every value in Just, appending an infinite stream of Nothing to the end, and then keeping only the Just values from each group.

import Control.Monad (forever)
import Data.Maybe (catMaybes)
import Pipes
import qualified Pipes.Prelude as P

source :: Monad m => Producer Int m ()
source = each [1..9]

test :: Int -> IO ()
test n = runEffect $ (source >-> P.map Just >> forever (yield Nothing)) >->
                     (forever $ sequence (replicate n await) >>= yield) >->
                     P.map catMaybes >->
                     P.takeWhile (not . null) >->
                     P.print

This worked pretty well.

Next, since it seemed impossible to implement this as a pipe (a pipe gets no signal from await that upstream has finished), I tried writing a function from Producer a m r to Producer [a] m r.

import Control.Monad (unless)
import Pipes
import qualified Pipes.Prelude as P

source :: Monad m => Producer Int m ()
source = each [1..9]

gather :: Monad m => Int -> Producer a m r -> Producer [a] m r
gather n producer = go n producer []
  where
    go 0 producer l = do
        yield $ reverse l
        go n producer []
    go m producer l = do
        x <- lift $ next producer
        case x of
            Left r -> do
                unless (null l) $
                    yield $ reverse l
                return r
            Right (v, producer') -> go (m - 1) producer' (v:l)

test :: Int -> IO ()
test n = runEffect $ gather n source >-> P.print

gather pulls values from the producer one at a time and yields a list once it has collected the specified number of values or reached the end of the producer.

Then I thought it would be easier with pipes-parse.

import Control.Monad (unless)
import Data.Functor ((<$>))
import Data.Maybe (catMaybes)
import Pipes
import Pipes.Parse
import qualified Pipes.Prelude as P

source :: Monad m => Producer Int m ()
source = each [1..9]

gather :: (Functor m, Monad m) => Int -> Producer a m r -> Producer [a] m ()
gather n producer = do
    (l, producer') <- lift $ runStateT parser producer
    unless (null l) $ do
        yield l
        gather n producer'
  where
    parser = catMaybes <$> sequence (replicate n draw)

test :: Int -> IO ()
test n = runEffect $ gather n source >-> P.print

This is basically the same as the previous attempt, but it uses pipes-parse to retrieve values from the producer. With pipes-parse, we no longer need to step through the producer by hand, but this version can no longer return r and always returns ().
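The return value does not have to be lost, though. The following is only a sketch under one assumption: once the parser draws nothing, the leftover producer is exhausted, so running it to completion with for and discard should hand back r. The name gatherR is hypothetical and not part of pipes-parse.

```haskell
import Data.Maybe (catMaybes)
import Pipes
import Pipes.Parse
import qualified Pipes.Prelude as P

-- Hypothetical variant of gather that keeps the producer's return value.
gatherR :: Monad m => Int -> Producer a m r -> Producer [a] m r
gatherR n producer = do
    (l, producer') <- lift $ runStateT parser producer
    if null l
        -- Nothing was drawn, so producer' is assumed to be exhausted.
        -- Draining it (a no-op in that case) returns its final value.
        then lift $ runEffect (for producer' discard)
        else yield l >> gatherR n producer'
  where
    -- Draw up to n values; draw returns Nothing once the input ends.
    parser = catMaybes <$> sequence (replicate n draw)
```

>>> runEffect $ gatherR 2 (each [1..9] >> return "done") >-> P.print
[1,2]
[3,4]
[5,6]
[7,8]
[9]
"done"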

With pipes-parse, you can also zoom into the sub-producer delimited by Pipes.Parse.splitAt. Once the producer is split, you can get all of its values with Pipes.Parse.drawAll instead of calling Pipes.Parse.draw a fixed number of times, because the split producer yields at most the specified number of values.

import Control.Monad (unless)
import Lens.Family2.State.Strict (zoom)
import Pipes
import Pipes.Parse
import qualified Pipes.Prelude as P
import Prelude hiding (splitAt)

source :: Monad m => Producer Int m ()
source = each [1..9]

gather :: Monad m => Int -> Producer a m r -> Producer [a] m ()
gather n producer = do
    (l, producer') <- lift $ runStateT parser producer
    unless (null l) $ do
        yield l
        gather n producer'
  where
    parser = zoom (splitAt n) drawAll

test :: Int -> IO ()
test n = runEffect $ gather n source >-> P.print
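To see what the zoomed parser does on its own, here is a small sketch that runs it once and then collects whatever is left in the producer. The helper name takeChunk is made up for this illustration.

```haskell
import Lens.Family2.State.Strict (zoom)
import Pipes
import Pipes.Parse
import qualified Pipes.Prelude as P
import Prelude hiding (splitAt)

-- Hypothetical helper: parse the first n values as one chunk,
-- then drain the leftover producer into a second list.
takeChunk :: Monad m => Int -> Producer a m () -> m ([a], [a])
takeChunk n producer = do
    -- drawAll only sees the first n values because of zoom (splitAt n).
    (chunk, rest) <- runStateT (zoom (splitAt n) drawAll) producer
    -- rest is the untouched remainder of the original producer.
    remaining <- P.toListM rest
    return (chunk, remaining)
```

>>> takeChunk 2 (each [1..5])
([1,2],[3,4,5])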

Another approach is to use pipes-group. Once you have a function that retrieves all values from a producer and yields them as a single list, you can apply it to each chunk produced by Pipes.Group.chunksOf using Pipes.Group.maps.

import Lens.Family2 (view)
import Pipes
import Pipes.Group
import qualified Pipes.Prelude as P

source :: Monad m => Producer Int m ()
source = each [1..9]

gather :: Monad m => Int -> Producer a m r -> Producer [a] m r
gather n = concats . maps toListProducer . view (chunksOf n)

toListProducer :: Monad m => Producer a m r -> Producer [a] m r
toListProducer producer = go producer []
  where
    go producer l = do
        x <- lift $ next producer
        case x of
            Left r -> do
                yield $ reverse l
                return r
            Right (v, producer') -> go producer' (v:l)

test :: Int -> IO ()
test n = runEffect $ gather n source >-> P.print

The problem is that there seems to be no elegant way to write toListProducer. Note that we cannot use the following definition, because it always returns (). maps needs a function from a producer to a producer that works for an arbitrary return type r.

toListProducer' :: Monad m => Producer a m () -> Producer [a] m ()
toListProducer' p = lift (P.toListM p) >>= yield
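One way to avoid writing toListProducer by hand might be Pipes.Group.folds, which folds each group into a single value while keeping the overall return value. This is a sketch rather than a definitive answer; the name gatherWithFolds is hypothetical, and the difference-list accumulator is just one possible choice.

```haskell
import Lens.Family2 (view)
import Pipes
import Pipes.Group (chunksOf, folds)
import qualified Pipes.Prelude as P

-- Hypothetical alternative to gather built on folds.
gatherWithFolds :: Monad m => Int -> Producer a m r -> Producer [a] m r
gatherWithFolds n = folds step id ($ []) . view (chunksOf n)
  where
    -- The accumulator is a difference list, so values keep their
    -- original order without a final reverse.
    step acc a = acc . (a :)
```

>>> runEffect $ gatherWithFolds 2 (each [1..9]) >-> P.print
[1,2]
[3,4]
[5,6]
[7,8]
[9]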