Produce, Adapt, Consume… Concurrent Data processing – DEVELOPPARADISE
17/07/2018


Introduction

We have a long way to go, so to set the stage quickly, consider the following pseudo code, which we often encounter:

   List<T> data = CreateData(...) 
   //where T is some known datatype, 
   //     CreateData is some function which returns a collection of instances of T
   
   ProcessList(data)
   //where ProcessList performs required processing on the generated data

In the above example, we call “CreateData” our Data-Producer (or simply Producer) and “ProcessList” our Data-Consumer (or simply Consumer). We then state our goal as follows:

Our Goal:

Given two (2) pieces of code, one that generates data (the producer) and another that processes it (the consumer), and assuming that each data instance can be processed independently, we want to design a generic mechanism that leverages data-parallelism, enables concurrent data processing, hides the associated thread-synchronization intricacies, and offers a simplified API.

With the goal announced, we need to decide on an approach. Looking into the literature, we take inspiration from the long-known Producer-consumer concept (information on its generic case can be found in this Wikipedia article). We would like to adapt this concept to distribute the workload (received from producers) among consumers while keeping all the entities (all producers/consumers) running CONCURRENTLY. During our discussion, we will go step by step through a possible implementation, i.e. one where our producer(s) run independently of, yet in harmony with, our consumer(s). We will see how to create such a concert (from here onward, called a Pipeline) while satisfying the following design requirements:

  • Buffer Size:
    • fixed-sized
    • unbounded
  • Chain characteristics:
    • lossless (every item that is produced is consumed) Vs lossy (produced items are discarded when buffer is full)
    • uninterrupted (once started, pipeline continues until the last item is consumed) Vs interruptible (ability to destroy the pipeline at anytime during its life-cycle)
    • concordant (producer/consumer share exactly the same datatype) Vs discordant (produced items require some sort of data transformation to match the consumable item’s datatype)
    • attached (all producers are known at compile-time and bound to the chain) Vs detached (producers, perhaps ephemeral, appear, possibly in parallel, at run-time): we discuss the implementation of these two cases separately

The original producer-consumer problem considered only the CONCORDANT case (the third (3rd) point among the above characteristics). Nevertheless, thanks to the GoF’s famous Adapter design pattern (see GoF design patterns), we would like to extend the idea, stretching the original philosophy of the pattern a little, to create a data-oriented adapter (from here onwards simply called adapter unless specified otherwise) that lets us build such a pipeline among discordant producers/consumers (provided we can identify an adapter for them). The point of doing so is to maintain separation of concerns and, thus, to obtain a simple pipeline with detached data-transformation logic.

Why ".Pipe"? To understand it, imagine, for a moment, the data flow in its most simplistic view: data originates at the producer and is absorbed by the consumer. Think of two persons, P and C, where P hands over whatever he gets to C. This made me think of UNIX: in a UNIX terminal, when we want to create such a chain of actions (passing data between commands), the famous "|" (a.k.a. pipe) syntax lets us do exactly that (see some examples here); hence the name.

The Why?

When we talk about why we need such an implementation, we need to consider several factors:

  • along with the order of complexity (the big O notation), latency is also an important factor in production-quality code
  • parallel computing has become a norm in the industry and, in several scenarios, can help reduce latencies
  • with the advancement of technology, newer frameworks/libraries/packages provide better tools for concurrent programming, such as better thread-pool management and lighter substitutes for threads (e.g. tasks, fibers, coroutines) that reduce the cycles wasted on thread context switching

Now, coming back to our pipeline, let’s consider a (perhaps trivial) task at hand:

Assume we need to parse a file (let’s say a CSV, for simplicity’s sake) that contains a considerable number of records (i.e. rows). Further assume that we need to store these records in some database, without any additional computation on the data.

Here we observe two (2) distinct and uncorrelated sub-tasks: reading the file (producer) and saving the data in the database (consumer). Now, consider two (2) classic (non-concurrent) implementation approaches:

  1. Read the full file -> make a list of objects -> push the list to db (let’s call it Approach1)
    • the approach looks good but ignores the memory required to hold the list
    • the bigger the list, the higher the latency, as the consumer waits longer to receive the list
    • total latency would be: file_handling_time + db_transaction_time => assuming data transfer time between producer and consumer is negligible
  2. Read a single line -> push the object to db -> repeat until end of file (let’s call it Approach2)
    • improves on memory but performs multiple db transactions that can push latency off the charts => normally, bulk inserts are cheaper
    • at any given point in time, either the producer or the consumer is working
    • total latency would be: n x (single_record_handling_time + db_transaction_time) => where n is the total number of records in the file, assuming data transfer time between producer and consumer is negligible

Most importantly, both approaches ignore the fact that file and database operations are both I/O operations and that, even on a single-core processor, concurrency can be achieved through thread interleaving thanks to non-blocking I/O. It is also possible to design a more balanced approach that pushes fixed-size lists (chunks) to the database instead of single records. However, as the description of our pipeline approach below shows, it remains less attractive in terms of performance.

Assuming that our .Pipe implementation is available, we can design a producer method (reading the file) and a consumer method (making the db transaction), and simply write the above code as producer.Pipe(consumers) (let’s call it Approach3):

  • the producer creates several lists of a pre-defined size while reading the file => several smaller lists (i.e. chunks). The chunk size can be tuned for optimal bulk inserts, let’s say.
  • each consumer takes a list (chunk) and pushes it to the DB => we can spawn many consumers, as each push is independent
  • our .Pipe glue code channels the chunks (lists) from the producer to the consumers => assuming this data transfer time is negligible and the buffer is unbounded
  • total latency would be: file_handling_time + k x chunk_db_transaction_time => where k = 1/c x (total_chunk_count - chunk_pushed_during_file_operation) and c = total_consumer_count (assuming that the degradation in db performance due to parallel pushes is negligible)
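To make the three latency formulas above concrete, here is a small C# sketch that simply evaluates them. The class name LatencyModel and every number fed into it are illustrative assumptions, not measurements: 8,000 records, a 2,000 ms file read (0.25 ms per record), 2 ms per single-row transaction, a 1,600 ms bulk insert of the whole list, and 80 chunks of 100 rows at 20 ms each, 40 of which are pushed while the file is still being read by 4 consumers.

```csharp
// Hypothetical latency model: it only evaluates the formulas from the text.
// All inputs are made-up milliseconds chosen for illustration.
public static class LatencyModel
{
    // Approach1: file_handling_time + db_transaction_time
    public static double Approach1(double fileHandlingMs, double dbTransactionMs)
    {
        return fileHandlingMs + dbTransactionMs;
    }

    // Approach2: n x (single_record_handling_time + db_transaction_time)
    public static double Approach2(int n, double singleRecordMs, double dbTransactionMs)
    {
        return n * (singleRecordMs + dbTransactionMs);
    }

    // Approach3: file_handling_time + k x chunk_db_transaction_time,
    // with k = 1/c x (total_chunk_count - chunk_pushed_during_file_operation)
    public static double Approach3(double fileHandlingMs, double chunkDbTransactionMs,
                                   int totalChunks, int chunksPushedDuringFile, int consumerCount)
    {
        double k = (double)(totalChunks - chunksPushedDuringFile) / consumerCount;
        return fileHandlingMs + k * chunkDbTransactionMs;
    }
}
```

With these made-up inputs, Approach1(2000, 1600) gives 3,600 ms, Approach2(8000, 0.25, 2.0) gives 18,000 ms and Approach3(2000, 20, 80, 40, 4) gives 2,200 ms. Real numbers depend entirely on the file, the database, and how well parallel pushes scale.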

With such an approach, we make the following significant observations:

  1. As a benefit of concurrency between producer and consumer: we are able to consume data (in this case, push it to db) before the producer has finished its work (in this case, reading the file)
  2. As a benefit of concurrency among consumers: we are able to reduce the end-to-end latency (in this case, by a factor of 1/c, where c is the count of consumers)
  3. Thus, theoretically speaking, we can add up to n consumers (where n ~ total_records / chunk_size) to our pipeline to obtain a minimal latency of ~ file_handling_time + chunk_db_transaction_time

In general, as the total number of records increases, we would notice the following (lower is better):

  • memory(Approach2) < memory(Approach3) < memory(Approach1)
  • latency(Approach3) < latency(Approach1) < latency(Approach2)

Thus, it is perhaps safe to say that our concurrent pipeline approach is a balanced one. Hence, the why.

However, before we discuss the implementation, we need to state the following limitations/assumptions:

  • The presence of more than 1 consumer is meant to achieve concurrency benefits. This assumption is important, as our design is different from broadcasting (see here). In our approach, each produced item will be consumed (accepted/treated/processed) by one and only one consumer among all available consumers.
  • Although producers can take different approaches to create an item (e.g. one producer fetching records from a DB, another from a file, yet another receiving web requests etc.), they are obliged to produce items of the same datatype to be part of the pipeline. This assumption is very important, as the pipeline design must remain open to disparate producer channels as long as the produced items are of the same type.
  • Consumers cannot be added or removed from pipeline once it is constructed.
  • Implementation must remain generic, i.e. it should not make any assumption about the behavior of producer/consumer.
  • In both the fixed-size and the unbounded buffer case, the pipeline should, theoretically, support any number of producers and consumers.
  • In interruptible mode, the pipeline will be destroyed once interrupted, and all unprocessed data with it.
  • In attached mode, producers cannot be added or removed from pipeline once it is constructed.
  • In detached mode, pipeline should not make any assumption about the life-cycle or count of producers. It must be open to accept items (pre-defined type) from any producer (ephemeral or long-running) during its life-cycle.
  • In discordant mode, given an adapter, pipeline construction must be possible.

About Implementation

The idea of producer-consumer is actually language neutral and can be implemented in several programming languages. However, to achieve our goal, we opt to implement it in C# .Net Framework 4.6.1, leveraging several TPL features (especially async-await) and the language’s inherent capability to create extension methods. If you are interested in consuming this implementation, note that, depending on your choice of language, you may end up with different usage forms.

During our discussion, we provide a lot of comments along with the C#.Net code snippets and add some amusing images (showing conversations among entities). Even if you feel uncomfortable with .Net syntax, do NOT worry: you will be able to grasp the essentials while reading this article and to implement it in the language of your choice.

There are no Dinosaurs!

During my college days, every time I took the operating system book in my hand, I asked myself: why the dinosaurs? (unfortunately, I cannot find the original cover, but this picture should do for the moment) And I used to cajole myself that the book was not as terrifying as Steven Spielberg’s Jurassic Park. I still wonder, sometimes, whether it was meant to symbolize the operating system as gigantic/fascinating/stupefying as a dinosaur, or just to overwhelm a sophomore. Nonetheless, it is NEITHER the right time NOR the subject of our discussion; whatever the case, during this discussion there are no dinosaurs, and we will try our best to keep things simple.

Creating Interfaces (Contracts)

To begin with, let’s have a look at the following simple picture to understand a few of our design choices and, more importantly, what we are actually trying to build:

[Image 1]

So, based on the above picture, we want:

  1. to standardize the way a producer procures the buffer and adds items to it, in ISOLATION, i.e. unaware of the presence of other producers or consumers.
  2. to standardize the way a consumer retrieves those populated items from the buffer and performs the required processing, in ISOLATION, i.e. unaware of the presence of other consumers or producers.
  3. to have a buffer that can handle those concurrent operations.

To design our solution, we focus on the buffer, as it is going to be the central piece; its implementation is impacted by the requirements of the producer side as well as the consumer side, and we must not forget that all the features need to diffuse into our design. Thus, in order NOT to complicate the discussion by explaining everything in a single silo, we further sub-divide it into several smaller pieces as follows:

1. Our Producer and Buffer

As our solution is producer agnostic, i.e. we do not know exactly how the producer will produce an item (the actual producer implementation), we can only define a generic signature for it. Thus, our producer can be defined as simply as the following delegate:

//NOTE: Some explanations are provided as comments

public delegate Task ProduceAsync<TP>(IProducerBuffer<TP> buffer, CancellationToken token);

//accepts buffer and cancellation token as inputs and returns a Task
//   where TP is the datatype of item produced by producer
//   and IProducerBuffer is an interface to our Buffer implementation
//we add CancellationToken as an input parameter in order to support 
//       interruptible pipeline feature
//In this way, by simply supplying CancellationToken.None to the pipeline 
//       we can create uninterruptible pipeline.

Based on the above delegate signature, we can create the following interface for our producer:

//IDisposable to avail Dispose method to perform resource clean-up
public interface IProducer<TP> : IDisposable
{
    //to perform some pre-processing initialization
    Task InitAsync();

    //actual data generating method
    Task ProduceAsync(IProducerBuffer<TP> buffer, CancellationToken token);
}
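As an illustration of what an end-user implementation of IProducer<TP> might look like, below is a minimal sketch of a line-reading producer for the CSV example. LineProducer and ListBuffer are hypothetical names; ListBuffer is only a single-threaded test double standing in for a real, thread-safe buffer.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public interface IProducerBuffer<T>
{
    void Add(T item, CancellationToken token);
    bool TryAdd(T item, int millisecTimeout, CancellationToken token);
}

public interface IProducer<TP> : IDisposable
{
    Task InitAsync();
    Task ProduceAsync(IProducerBuffer<TP> buffer, CancellationToken token);
}

// Hypothetical producer: pushes every non-empty line of a text source.
public sealed class LineProducer : IProducer<string>
{
    private readonly TextReader _reader;

    public LineProducer(TextReader reader) { _reader = reader; }

    // nothing to prepare in this sketch
    public Task InitAsync() { return Task.CompletedTask; }

    public async Task ProduceAsync(IProducerBuffer<string> buffer, CancellationToken token)
    {
        string line;
        while ((line = await _reader.ReadLineAsync()) != null)
        {
            token.ThrowIfCancellationRequested();          // honour interruptible pipelines
            if (line.Length > 0) buffer.Add(line, token);  // lossless: blocks while full
        }
    }

    public void Dispose() { _reader.Dispose(); }
}

// Test double: unbounded and NOT thread-safe, so only for a single producer.
public sealed class ListBuffer<T> : IProducerBuffer<T>
{
    public readonly List<T> Items = new List<T>();
    public void Add(T item, CancellationToken token) { Items.Add(item); }
    public bool TryAdd(T item, int millisecTimeout, CancellationToken token) { Items.Add(item); return true; }
}
```

The producer only knows the buffer through IProducerBuffer<string>; a lossy variant would call TryAdd instead and decide what to do with rejected lines.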

Now, an actual producer implementation can simply implement the IProducer<TP> interface. Although we have designed how to give our producer access to the buffer, we do not yet know how to populate it. Hence our first requirement on the buffer side: a method for populating it. Let’s look at it:

//NOTE: Some explanations are provided as comments

public interface IProducerBuffer<T>
{
    //adds an item to the buffer
    //it blocks, if buffer is full, until the item can be added
    void Add(T item, CancellationToken token);

    //adds an item to the buffer with the given millisecond timeout
    //returns true if the item was added within the timeout period, else false
    bool TryAdd(T item, int millisecTimeout, CancellationToken token);

    //we add this second method to support our lossy pipeline feature
    //     millisecTimeout=0 means immediately add or discard
    //based on boolean outcome, the actual producer implementation can 
    //  decide the fate of produced yet discarded item

    //we also add CancellationToken to support cancellation based on
    //transient method token (we will see an example when we talk about
    //                        detached pipeline)
}
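One possible way to back IProducerBuffer<T> is .NET's BlockingCollection<T>, whose Add(item, token) and TryAdd(item, millisecondsTimeout, token) overloads map directly onto the two methods above. This is an assumption for illustration, not necessarily how the article's buffer is built, and BlockingProducerBuffer and LossyDemo are hypothetical names. The demo shows the lossy case: with no consumer draining a buffer of capacity 3, a zero timeout discards the extra items.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public interface IProducerBuffer<T>
{
    void Add(T item, CancellationToken token);
    bool TryAdd(T item, int millisecTimeout, CancellationToken token);
}

// Assumed backing store: a bounded capacity gives the fixed-size buffer,
// no capacity (0 here) gives the unbounded one.
public sealed class BlockingProducerBuffer<T> : IProducerBuffer<T>
{
    private readonly BlockingCollection<T> _items;

    public BlockingProducerBuffer(int capacity = 0)
    {
        _items = capacity > 0 ? new BlockingCollection<T>(capacity) : new BlockingCollection<T>();
    }

    // Lossless add: blocks while the buffer is full (or until cancelled).
    public void Add(T item, CancellationToken token) { _items.Add(item, token); }

    // Lossy add: gives up after the timeout (0 = add now or discard).
    public bool TryAdd(T item, int millisecTimeout, CancellationToken token)
    {
        return _items.TryAdd(item, millisecTimeout, token);
    }
}

public static class LossyDemo
{
    // Pushes `count` items with a zero timeout into a buffer nobody drains
    // and returns how many were discarded because the buffer was full.
    public static int CountDiscarded(int capacity, int count)
    {
        var buffer = new BlockingProducerBuffer<int>(capacity);
        int discarded = 0;
        for (int i = 0; i < count; i++)
            if (!buffer.TryAdd(i, 0, CancellationToken.None)) discarded++;
        return discarded;
    }
}
```

For example, LossyDemo.CountDiscarded(3, 5) returns 2, while an unbounded buffer never discards anything; what happens to a discarded item remains the producer's decision.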

So far, we have the producer and its buffer interfaces, and hence the means to add produced items to the buffer. Next, let’s look at the consumer-side requirements.

2. Our Consumer and Buffer

Similar to the producer, our solution is also consumer agnostic (i.e. unaware of the actual consumer implementation); thus, we can define the following consumer interface in a similar way:

//IDisposable to avail Dispose method to perform resource clean-up
public interface IConsumer<TC> : IDisposable
{
    //to perform some pre-processing initialization
    Task InitAsync();

    //actual data consuming method
    Task ConsumeAsync(TC item, CancellationToken token);
}

While deciding on the consumer interface, especially the signature of ConsumeAsync, we had the choice to pass the buffer as a method parameter, as we did for the producer. However, we noticed that such a design:

  1. burdened the consumer implementation with boilerplate code
  2. required delicate implementation to loop over the buffered items
  3. added further complexity for our discordant pipeline feature (to be discussed later)

Thus, we finally decided to hide such complexity within the API and obtained a callable consumer. This way, the concrete consumer implementation can focus on the business logic.

Though no requirement is apparent on the consumer side, we can infer from point (2) above that we need to loop over the items in order to drain the buffer. Thus, we need:

  1. a method to pop-out the item
  2. a boolean indicator to verify that all items are processed

So, we create our ConsumerBuffer interface:

public interface IConsumerBuffer<T>
{
    //true when all items are drained (producers are done producing too!)
    bool Finished { get; }

    //to retrieve an item
    //returns true when item was available within given millisecond timeout
    bool TryGet(int millisecTimeout, CancellationToken token, out T data);
    //we add a millisecond timeout to support a special case of our discordant pipeline feature
    //     anyway, we can always pass millisecTimeout=Timeout.Infinite, i.e. wait infinitely
}
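Assuming a BlockingCollection<T> backing store (our assumption, not necessarily the article's), the consumer shard can be sketched as below: IsCompleted gives Finished (adding completed and nothing left), and TryTake(out item, millisecondsTimeout, token) gives TryGet, which returns false as soon as the collection is completed and empty, even with an infinite timeout. DrainAsync is a hypothetical helper showing the drainage loop that the API hides from consumers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface IConsumerBuffer<T>
{
    bool Finished { get; }
    bool TryGet(int millisecTimeout, CancellationToken token, out T data);
}

// CompleteAdding() plays the role of the "producers are done" signal.
public sealed class BlockingConsumerBuffer<T> : IConsumerBuffer<T>
{
    private readonly BlockingCollection<T> _items = new BlockingCollection<T>();

    public void Add(T item) { _items.Add(item); }
    public void ProducersDone() { _items.CompleteAdding(); }

    public bool Finished { get { return _items.IsCompleted; } }

    // false on timeout, or at once when finished and empty
    public bool TryGet(int millisecTimeout, CancellationToken token, out T data)
    {
        return _items.TryTake(out data, millisecTimeout, token);
    }
}

public static class DrainDemo
{
    // Wait (infinitely) for the next item and hand it to the consumer
    // until the buffer reports that it is drained.
    public static async Task DrainAsync<T>(IConsumerBuffer<T> buffer,
        Func<T, CancellationToken, Task> consumeAsync, CancellationToken token)
    {
        T item;
        while (buffer.TryGet(Timeout.Infinite, token, out item))
            await consumeAsync(item, token);
    }

    public static List<int> Run()
    {
        var buffer = new BlockingConsumerBuffer<int>();
        var seen = new List<int>();
        var producer = Task.Run(() =>
        {
            for (int i = 1; i <= 5; i++) buffer.Add(i);
            buffer.ProducersDone();
        });
        DrainAsync<int>(buffer, (x, t) => { seen.Add(x); return Task.CompletedTask; },
                        CancellationToken.None).Wait();
        producer.Wait();
        return seen;
    }
}
```

Run() returns 1 through 5 in order because a single consumer reads a FIFO queue; with several consumers, the order across consumers is not guaranteed.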

3. Keeping Both Shards

Up to this point, we have been trying to fulfill all the requirements; the following list quickly covers those points:

  • Buffer Size: controlled using a constructor (Ctor) parameter.
  • Losslessness: controlled by the millisecTimeout parameter of the TryAdd method. (Note: Add(item, token) behaves like TryAdd(item, Timeout.Infinite, token))
  • Interruptibility: controlled using CancellationToken
  • Attachability: end-user controlled (we will see use-cases separately)

Now, the only remaining point is Concordance. In fact, in the interfaces defined above, we have intentionally kept TP as the producer type parameter and TC as the consumer type parameter. Although such different symbols (type placeholders) hardly matter in generics, they convey the idea that we will inject IDENTICAL <data-type>s for TP and TC when constructing a concordant pipeline, and different <data-types> for a discordant pipeline. For a quick understanding of such a conflict, we offer the following illustration:

[Image 2]

Now, we see that:

  • Producer can add item only when producer’s datatype <TP> is same as buffer’s datatype <T>
  • Consumer can drain items only when consumer’s datatype <TC> is same as buffer’s datatype <T>
  • our current pipeline can work only in the special case we call a concordant pipeline, i.e. when all three (3) datatypes are the same: <TP> = <TC> = <T>

Thus, the above design will not work for a discordant pipeline. With this in mind, we keep both shards of our *Buffer interface for the moment. Even from the point of view of “abstraction”, it is wise NOT to expose the TryGet method to a producer whose only interest is to fill the buffer.

4. Plugging Adapter

To fulfill our last requirement, we need to revisit Image 2 above, as having different datatypes creates a conflict. But before we talk about how to overcome this limitation using an adapter, let’s visualize what the adapter must do logically, based on the picture below:

[Image 3]

If we consider the provided adapter as a black box, we expect it to output an object of type <TO> when given an object of type <TI>. Thus, as per our requirements, if we pass produced items of type <TP> and convert them into the consumer’s type <TC>, our pipeline should work.

IMPORTANT: In order to remain generic, for the concordant case, when TC = TP = T (thus, TI = TO), we prepare a default IDENTITY adapter that does NOTHING, i.e. it returns the same item it received as input, without making any change to it. The following C# .Net code snippet roughly represents this idea:

public static TI IdentityAdapter<TI>(TI input)
{
    return input;
}

To plug in such an adapter, we have the following choices:

  1. Inject the adapter between the Producer and the Buffer: we design the buffer with type <TC> (shown in image 4)

    [Image 4]

  2. Inject the adapter between two buffers: we introduce a second buffer and inject the adapter between the two, where the first buffer has type <TP> and the second has type <TC> (shown in image 5)

    [Image 5]

  3. Inject the adapter between the buffer and the Consumer: we design the buffer with type <TP> (shown in image 6)

    [Image 6]

Among the given implementation choices, we opt for the third (3rd) option, i.e. injecting the adapter between the buffer and the consumer, because:

  • By injecting adapter between producer and buffer, we complicate the producer implementation:
    • by requiring the producer to make the adapter call
    • by further increasing the risk of faulty implementation for delicate corner-case object transformations (we will see one example of such a transformation)
    • the ProduceAsync method signature would be burdened with a third (3rd) parameter (the adapter instance)
  • By injecting adapter between two buffers, we complicate our Pipeline implementation:
    • we need to maintain two (2) buffers
    • we need to synchronize two (2) buffer loops (buffer drainage)

By injecting the adapter between the buffer and the consumer, we not only need to maintain a single buffer (thus, a single drainage loop), but we also make a transparent call to the adapter at just the right time (before feeding data to the consumer). By doing so, we hide all these intricate implementation details behind our .Pipe call and offer complete separation of concerns among producer, consumer, and adapter, so that all three (3) pieces of code can evolve independently.

5. Vicious cycle of Agnosticism

Up until now, we kept our design both producer and consumer agnostic; however, to keep complexity out of our discussion, we assumed a naive approach to the Adapter. As shown above, we provided an object instance of some given type to our adapter and received back an object instance of a well-defined type. Now that we are close to finalizing our interface design, we would like to get rid of this give-and-take assumption about our adapter. In fact, we want the design to be Adapter agnostic too! Only then can we be sure that we have given the end-user full liberty to achieve the desired end result from such a pipeline without hacking/patching business logic. The end-user can then focus on the actual logic and the associated data model without worrying about the mundane technical plumbing between producer, consumer, and adapter.

To achieve such an Adapter agnostic design, we propose the following interface:

//NOTE: Some explanations are provided as comments 

public interface IDataAdapter<TP, TC>
{
    //accepts the buffer and cancellation token and outs a consumable object
    //   returns true while a consumable could be built, false once the
    //   buffer is drained (and producers are done)
    bool TryGet(IConsumerBuffer<TP> buffer, CancellationToken token, out TC consumable);

    //we notice that we provide buffer containing produced object instances
    //   with IConsumerBuffer interface, thus exposing TryGet method!
    //Actual adapter implementation can then recover produced item (or several items)
    //    to construct an item of type TC
}
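To make the adapter contract concrete, here is a sketch of a discordant adapter for the CSV example: producers emit raw lines (TP = string) while consumers expect a record (TC = Person). Person, CsvPersonAdapter and QueueBuffer are hypothetical names. The sketch also shows why the adapter receives the whole buffer rather than a single item: it may pull several produced items (here, skipping malformed lines) to build one consumable.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public interface IConsumerBuffer<T>
{
    bool Finished { get; }
    bool TryGet(int millisecTimeout, CancellationToken token, out T data);
}

public interface IDataAdapter<TP, TC>
{
    bool TryGet(IConsumerBuffer<TP> buffer, CancellationToken token, out TC consumable);
}

// Hypothetical consumable type.
public sealed class Person
{
    public string Name;
    public int Age;
}

// Discordant adapter sketch: "name,age" lines -> Person.
public sealed class CsvPersonAdapter : IDataAdapter<string, Person>
{
    public bool TryGet(IConsumerBuffer<string> buffer, CancellationToken token, out Person consumable)
    {
        string line;
        while (buffer.TryGet(Timeout.Infinite, token, out line))
        {
            var parts = line.Split(',');
            int age;
            if (parts.Length == 2 && int.TryParse(parts[1], out age))
            {
                consumable = new Person { Name = parts[0].Trim(), Age = age };
                return true;
            }
            // malformed line: skip it and pull the next produced item
        }
        consumable = null;   // buffer drained and producers are done
        return false;
    }
}

// Minimal pre-filled buffer (producers already done) for trying the adapter.
public sealed class QueueBuffer<T> : IConsumerBuffer<T>
{
    private readonly BlockingCollection<T> _items = new BlockingCollection<T>();
    public QueueBuffer(params T[] items)
    {
        foreach (var item in items) _items.Add(item);
        _items.CompleteAdding();
    }
    public bool Finished { get { return _items.IsCompleted; } }
    public bool TryGet(int millisecTimeout, CancellationToken token, out T data)
    {
        return _items.TryTake(out data, millisecTimeout, token);
    }
}
```

Fed with "Ada, 36", "broken line" and "Alan,41", the adapter yields two Person objects and then returns false; neither the producer nor the consumer knows that a transformation took place.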

Now that we have defined all three (3) key parts of the pipeline, given any task (and assuming that our pipeline can be implemented), we can reach an optimal solution by thinking in terms of these sub-components, as shown below:

[Image]

The following list summarizes the above idea:

  • First, we work out an optimal strategy to produce items
  • Second, we settle on an optimal strategy to consume those produced items
  • If an adapter is required, we write it separately; otherwise, we use the IDENTITY adapter
  • We plug all three (3) pieces into the pipeline
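Putting the interfaces together, the following is a minimal sketch of how an attached pipeline could be wired behind an extension method. PipeAsync, PipeBuffer, RangeProducer, SumConsumer and PipeDemo are hypothetical names, BlockingCollection<T> is an assumed backing store, and this is not the article's own implementation; it only shows one way to start one drainage loop per consumer, run the producers, and signal production_finished.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// The contracts defined above, condensed.
public interface IProducerBuffer<T> { void Add(T item, CancellationToken token); bool TryAdd(T item, int millisecTimeout, CancellationToken token); }
public interface IConsumerBuffer<T> { bool Finished { get; } bool TryGet(int millisecTimeout, CancellationToken token, out T data); }
public interface IProducer<TP> : IDisposable { Task InitAsync(); Task ProduceAsync(IProducerBuffer<TP> buffer, CancellationToken token); }
public interface IConsumer<TC> : IDisposable { Task InitAsync(); Task ConsumeAsync(TC item, CancellationToken token); }
public interface IDataAdapter<TP, TC> { bool TryGet(IConsumerBuffer<TP> buffer, CancellationToken token, out TC consumable); }

// One buffer implementing both shards.
public sealed class PipeBuffer<T> : IProducerBuffer<T>, IConsumerBuffer<T>, IDisposable
{
    private readonly BlockingCollection<T> _items;
    public PipeBuffer(int capacity) { _items = capacity > 0 ? new BlockingCollection<T>(capacity) : new BlockingCollection<T>(); }
    public void Add(T item, CancellationToken token) { _items.Add(item, token); }
    public bool TryAdd(T item, int ms, CancellationToken token) { return _items.TryAdd(item, ms, token); }
    public bool Finished { get { return _items.IsCompleted; } }
    public bool TryGet(int ms, CancellationToken token, out T data) { return _items.TryTake(out data, ms, token); }
    public void ProductionFinished() { _items.CompleteAdding(); }
    public void Dispose() { _items.Dispose(); }
}

public sealed class IdentityAdapter<T> : IDataAdapter<T, T>
{
    public bool TryGet(IConsumerBuffer<T> buffer, CancellationToken token, out T consumable)
    { return buffer.TryGet(Timeout.Infinite, token, out consumable); }
}

public static class PipeExtensions
{
    // Hypothetical attached-pipeline entry point (bufferSize 0 = unbounded).
    // Disposing producers/consumers is left to the caller.
    public static async Task PipeAsync<TP, TC>(this IReadOnlyList<IProducer<TP>> producers,
        IReadOnlyList<IConsumer<TC>> consumers, IDataAdapter<TP, TC> adapter,
        CancellationToken token, int bufferSize = 0)
    {
        using (var buffer = new PipeBuffer<TP>(bufferSize))
        {
            await Task.WhenAll(producers.Select(p => p.InitAsync())
                                        .Concat(consumers.Select(c => c.InitAsync())));
            // One drainage loop per consumer; the adapter runs right before consuming.
            // (The blocking TryGet occupies one pool thread per consumer.)
            var consuming = consumers.Select(c => Task.Run(async () =>
            {
                TC item;
                while (adapter.TryGet(buffer, token, out item))
                    await c.ConsumeAsync(item, token);
            })).ToList();
            try
            {
                await Task.WhenAll(producers.Select(p => Task.Run(() => p.ProduceAsync(buffer, token))));
            }
            finally
            {
                buffer.ProductionFinished();     // lets every drainage loop end
                await Task.WhenAll(consuming);   // dispose the buffer only after draining
            }
        }
    }
}

public sealed class RangeProducer : IProducer<int>
{
    private readonly int _from, _to;
    public RangeProducer(int from, int to) { _from = from; _to = to; }
    public Task InitAsync() { return Task.CompletedTask; }
    public Task ProduceAsync(IProducerBuffer<int> buffer, CancellationToken token)
    {
        for (int i = _from; i <= _to; i++) buffer.Add(i, token);
        return Task.CompletedTask;
    }
    public void Dispose() { }
}

public sealed class SumConsumer : IConsumer<int>
{
    public long Total;
    public Task InitAsync() { return Task.CompletedTask; }
    public Task ConsumeAsync(int item, CancellationToken token) { Interlocked.Add(ref Total, item); return Task.CompletedTask; }
    public void Dispose() { }
}

public static class PipeDemo
{
    public static long Run()
    {
        var producers = new List<IProducer<int>> { new RangeProducer(1, 50), new RangeProducer(51, 100) };
        var sums = new[] { new SumConsumer(), new SumConsumer(), new SumConsumer() };
        producers.PipeAsync(new List<IConsumer<int>>(sums), new IdentityAdapter<int>(),
                            CancellationToken.None, bufferSize: 10).Wait();
        return sums.Sum(c => c.Total);
    }
}
```

In the demo, two producers push 1..100 through a buffer of size 10 into three consumers; whichever consumer receives which item, the totals add up to 5050. The adapter is invoked inside each consumer's loop, i.e. between the buffer and the consumer, matching the third placement option chosen above.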

Before Getting Crazy with Code!

Until this point, we have tried to convey our ideas with a lot of drawings but, unfortunately, we are now obliged to introduce code, so below you will see some long code snippets. Do NOT worry, though: we will add some amusing drawings to illustrate the same ideas in a pseudo manner. Nonetheless, keep in mind the pyramidal mind-map given below, which is closely related to our concrete implementation:

[Image]

Implementing Interfaces (Contract Completion)

As our solution is producer/consumer/adapter agnostic, their respective concrete implementations are not our concern; once we have exposed our interfaces, the end-user will implement them for use in the pipeline. However, it would be nice to provide some default Adapters to cover some mundane use cases. Thus, in this section we propose the following implementations:

  1. Adapters:
    • Identity Adapter
    • Awaitable List Adapter
  2. Buffer
  3. Attached pipeline
  4. Detached pipeline

1.a Identity Adapter

[Image]

To start simple, we implement the identity adapter first; as discussed above, it should just return the produced item as is. We achieve this as follows:

//NOTE: Some explanations are provided as comments

//generic adapter satisfying TP = TC = T, buffer type:<T>
public class IdentityAdapter<T> : IDataAdapter<T, T>
{
    public bool TryGet(IConsumerBuffer<T> buffer, CancellationToken token, out T consumable)
    {
        //we just forward the call to the buffer and return its boolean
        //   status and the object as is.

        return buffer.TryGet(Timeout.Infinite, token, out consumable);

        //NOTE: we pass an INFINITE timeout to the buffer, thus:
        //if all buffered items are processed AND producers are done...
        //   the buffer returns false, satisfying the adapter's boolean status;
        //else the buffer returns true and outs an instance of the produced object,
        //   which again fulfils the adapter's behaviour.
    }
}

1.b Awaitable List Adapter

A word before implementation

Sometimes consuming items one at a time leads to a suboptimal solution, and processing them in groups (chunks) is technically more cost-effective. A few such examples are:

  • Database bulk inserts are cheaper
  • Batch processing
  • Object array streaming … so on and so forth…

To handle such use cases, we decided to implement an awaitable list adapter that the end-user can use out of the box. The idea is to recover a List<TC> on each TryGet call on the adapter, as shown below in image 7.

NOTE: From now on, we use the words “chunk” and “list” interchangeably, i.e., unless specified otherwise, a list means a subset (a part) of the whole data.

[Image 7]

As soon as we think about lists, the following design options for the TryGet method come to mind:

  • Should we always return identically sized lists?
  • Should we return variable size list, with some cap on size?
  • Should we return list without any cap on size?

For the first (1st) option, it might NOT be possible to generate identically sized lists (if we have a total of 103 items and fix the list size at 10, the last list will contain ONLY 3 items instead of 10). Yet, we choose to implement it, believing that consumer logic is (and should be!) indifferent to the size of the chunk, and that the whole idea behind consuming chunks (instead of single instances) is to reduce the associated technical latency.
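The 103-item example is simple arithmetic; as a quick check, the helper below (ChunkMath is a hypothetical name) lists the sizes of fixed-size chunks, where only the last one may be partial.

```csharp
using System;
using System.Collections.Generic;

public static class ChunkMath
{
    // Sizes of the lists obtained when `total` items are cut into
    // chunks of `chunkSize`; only the last chunk may be shorter.
    public static List<int> ChunkSizes(int total, int chunkSize)
    {
        var sizes = new List<int>();
        for (int remaining = total; remaining > 0; remaining -= chunkSize)
            sizes.Add(Math.Min(chunkSize, remaining));
        return sizes;
    }
}
```

ChunkSizes(103, 10) yields ten lists of 10 followed by a single list of 3, so a consumer must accept any length from 1 up to the full chunk size.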

The second (2nd) option is a generalized case of the first one, so we will implement it too, but with some assumptions, which we will underline when we describe our implementation details.

We choose to drop the third (3rd) option because it, again, questions the usefulness of spawning multiple consumers. Let’s rethink: if we are able to supply unbounded lists to consumers, then we might supply all available items to a single consumer, irrespective of whether that consumer has the capacity to handle such a list; why, then, spawn other consumers concurrently? Thus, we observe that our design would go astray from our pre-decided goal.

NOTE: Perhaps due to our myopic vision, we dropped the third (3rd) option. Still, do not forget that our pipeline is Adapter agnostic; thus, the end-user can always construct their own version of the Adapter and plug it in.

What’s the BIG idea; ain’t it simply List Adapter?

The short answer is: No, it isn’t!

If you have followed us so far, you might have gotten the impression that this adapter is all about creating a list; then why do we call it an “Awaitable” list adapter? Isn’t it as simple as spawning a loop to produce a list? If you have had similar thoughts, we assure you that there is more to it, for the simple fact that the items we want to iterate over might not be promptly available. To elaborate further, let’s consider the arguments listed below:

  • let’s assume that, the moment the adapter’s TryGet method was called, the buffer was empty and the producers were busy creating object instances; soon there will be some items in the buffer, but for the moment we need to wait (sleep)
  • the actual questions are:
    • how long should our thread sleep?
    • what if the buffer is still empty after the wait, i.e. the producers have not yet populated it? Should we sleep again, and then for how long?
    • even if we came up with a very clever waiting algorithm, what about the case when a producer populates the buffer just after we decide to sleep? (remember, everything is running concurrently, so we have no control over the timing of those events!)
    • should we also design a thread wake-up mechanism?
  • even if we decided not to wait and to return from the TryGet call, we would NOT escape this conundrum; all the above questions would fall back to the caller level (i.e. the code that called TryGet in the first place).
  • another question that comes to mind: what if the user does NOT want to wait too long before consuming the chunk, i.e. what if the user wants to consume the available items without waiting for future items to accumulate? (perhaps his goals are time-sensitive, e.g. writing logs to files, pushing rows to a DB, processing batch items etc.)

One thing is certain: if we want to reduce latency (as part of our goal), we need some kind of notification when items arrive in the buffer while our thread is asleep. A similar suggestion can also be found in the original producer-consumer problem. Of course, we do not want to build such a mechanism inside the adapter, else it would defeat the whole purpose (imagine writing a separate notification mechanism every time the end-user, or we, write an adapter). Nonetheless, from the producer-consumer literature, we already know that the producer is capable of providing such a signal (at the time of adding an item to the buffer). Thus, considering both perspectives, for the moment we assume that the buffer is capable of such notification.

Based on the above discussion, we gain the following insights into buffer behaviour (which we will use when implementing the buffer’s TryGet):

  • If the buffer is empty and an element gets populated within the given timeout period, it shall come out of sleep as soon as possible (without waiting for the whole sleep duration) and out the element (with true as the boolean return value)
  • If the buffer has elements, then, irrespective of the timeout value, it should immediately out an element (with true as the boolean return value)
  • The buffer must be able to capture the production_finished signal; once all the buffered items are consumed, every subsequent TryGet call shall return false (with the out value set to null/the default of the type).

For the moment, we can safely assume that if we pass an INFINITE timeout to the buffer.TryGet method, the buffer will return an item as soon as one is added. This resolves one of our concerns, but we still need to work on preparing both fixed-size and variable-size lists.
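The buffer implemented later in this article wraps BlockingCollection<T>, which already behaves this way. The small sketch below (a hypothetical BufferBehaviourDemo class) checks the two properties we rely on: a TryTake with an infinite timeout returns as soon as another thread adds an item, and once CompleteAdding has been called and the collection is empty, TryTake returns false immediately instead of blocking.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class BufferBehaviourDemo
{
    public static (int value, long elapsedMs, bool takeAfterFinish) Run()
    {
        using var buffer = new BlockingCollection<int>(boundedCapacity: 4);

        // Producer: adds one item after ~100 ms, then signals "production finished".
        var producer = Task.Run(async () =>
        {
            await Task.Delay(100);
            buffer.Add(42);
            buffer.CompleteAdding();
        });

        // Infinite wait: returns as soon as the item is added (roughly 100 ms here).
        var sw = Stopwatch.StartNew();
        buffer.TryTake(out var value, Timeout.Infinite, CancellationToken.None);
        var elapsedMs = sw.ElapsedMilliseconds;

        producer.Wait();

        // Completed AND empty: false right away, even with an infinite timeout.
        var takeAfterFinish = buffer.TryTake(out _, Timeout.Infinite, CancellationToken.None);
        return (value, elapsedMs, takeAfterFinish);
    }
}
```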

Constraints/Assumptions

While implementing the Awaitable List Adapter, we keep the following important points in mind:

  1. We can ALWAYS wait on the buffer with an INFINITE timeout. If it has elements, it promptly returns one; otherwise, it returns one as soon as one becomes available.
  2. No end-user is interested in consuming empty lists, i.e. lists without any item in them. Thus, we only need to supply a list when it has AT LEAST one (1) element in it.
  3. The end-user decides the size of the list, as he is aware of the system's capabilities and his own requirements.
  4. When the end-user runs the pipeline to obtain FIXED-size lists (as shown in Image 8):
    • One is aware that we might need to wait longer to populate the list if some/every buffer.TryGet call ends up waiting for an item. Thus, one is INDIFFERENT to the time taken to prepare such a list.
    • One is more interested in getting a full-sized list, as it is advantageous for his pipeline strategy.
    • One is aware that the last chunk might be partial (as discussed above), and his consumer can handle it (1 <= last_chunk_length <= length_of_full_size_list).
    • Thus, we can say he has an infinite timeout but a preference for the size of the list.

      [Image 8]

  5. When the end-user runs the pipeline to obtain FIXED-DURATION (i.e. variable-sized) lists (as shown in Image 9):
    • One prefers consuming something within a given time limit to waiting longer for a fully populated list. Thus, one is time-bounded.
    • One is aware that every chunk might be of a different size, and his consumer can handle it (1 <= chunk_size <= max_length).
    • One is aware that he might need to wait longer for the first (1st) item of the list if buffer.TryGet ends up waiting for it.
    • Thus, we can say he has preferences for the timeout duration (counted once the first item is received) and for the maximum size of the list.

      [Image 9]

NOTE: We discuss some possible use-cases of these adapters separately, later in the article.

Implementation


Based on our constraints/assumptions, we have two (2) parameters to deal with: 1) the list size and 2) the timeout period. We already know that if timeout = Infinite, we output fixed-size lists; otherwise, we output variable-sized lists. Let's look at the code:

//NOTE: Some explanations are provided as comments

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

//generic adapter satisfying TP = T and TC = List<T>, buffer type: <T>
public class AwaitableListAdapter<T> : IDataAdapter<T, List<T>>
{
    private readonly int _millisecTimeout;
    private readonly int _maxListSize;

    //we ask the user for a timeout value and a list size
    //if millisecTimeout = Timeout.Infinite
    //   we construct fixed-size chunks with size = maxListSize
    //otherwise,
    //   we construct variable-sized chunks with max. size = maxListSize
    public AwaitableListAdapter(int maxListSize, int millisecTimeout)
    {
        _millisecTimeout = millisecTimeout;
        _maxListSize = maxListSize;
    }

    public bool TryGet(IConsumerBuffer<T> buffer, CancellationToken token,
        out List<T> consumable)
    {
        consumable = default(List<T>);

        //NOTE: From our discussion, no user wants to consume a 0-length list,
        //      so we wait with INFINITE timeout, i.e.
        //      if an item is already in the buffer, we promptly receive it,
        //      else we wait until an item is available... so we are good!
        //      (FALSE only means: production is over and the buffer is drained)
        if (!buffer.TryGet(Timeout.Infinite, token, out var value)) return false;

        //init the list WITH the first item
        consumable = new List<T>(_maxListSize) { value };

        //we choose what kind of list is required based on the timeout
        return _millisecTimeout == Timeout.Infinite
            ? TryFillFixedSize(buffer, token, consumable)
            : TryFillFixedDurationChunk(buffer, token, consumable);
    }

    private bool TryFillFixedSize(IConsumerBuffer<T> buffer, CancellationToken token,
        List<T> consumable)
    {
        //we loop until the list is full
        while (consumable.Count < _maxListSize)
        {
            //we always wait for INFINITE time to be sure of getting an item,
            //     except when
            //we are left with no item and production is over!
            //     (we then hand out the partial, last chunk)
            if (buffer.TryGet(Timeout.Infinite, token, out var value))
            {
                consumable.Add(value);
            }
            else break;
        }

        //our list already has at least 1 item, so we return TRUE!
        return true;
    }

    private bool TryFillFixedDurationChunk(IConsumerBuffer<T> buffer, CancellationToken token,
        List<T> consumable)
    {
        var timeRemains = _millisecTimeout;

        //using a stopwatch we measure the time elapsed since the first item
        var sw = Stopwatch.StartNew();

        //we loop as long as
        //     1. the chunk is not full, and
        //     2. we receive an item within the remaining time
        while (consumable.Count < _maxListSize)
        {
            if (!buffer.TryGet(timeRemains, token, out var value)) break;

            consumable.Add(value);

            if (timeRemains != 0)
            {
                //IMPORTANT:
                //we put a lower limit of zero because:
                //   1. of course, we can't wait for a negative time
                //   2. but we want to keep looping even after the given timeout
                //      is over, as long as we can still recover items from the buffer:
                //      with timeout=0, we either promptly receive an item
                //      or the buffer returns FALSE.
                //      this way we are always able to provide a larger chunk
                //      when possible,
                //      hence this IF has no ELSE with break/return,
                //      but the TryGet check above does!
                timeRemains = (int) Math.Max(0, _millisecTimeout - sw.ElapsedMilliseconds);
            }
        }

        //our list already has at least 1 item, so we return TRUE!
        return true;
    }
}
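To see the chunking behaviour without the surrounding interfaces, the sketch below reproduces the same algorithm as a hypothetical TryTakeChunk helper that reads directly from a BlockingCollection<T> (an assumption standing in for IConsumerBuffer<T>): the first item is awaited indefinitely, then the chunk is filled either without any deadline (Timeout.Infinite) or within the remaining time budget, clamped at zero.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public static class ChunkSketch
{
    // Hypothetical helper mirroring AwaitableListAdapter<T>.TryGet, reading from a
    // BlockingCollection<T> in place of IConsumerBuffer<T>.
    public static bool TryTakeChunk<T>(BlockingCollection<T> source, int maxSize,
        int millisecTimeout, CancellationToken token, out List<T> chunk)
    {
        chunk = null;

        // No empty chunks: wait as long as needed for the FIRST item.
        // FALSE here means the source is closed for adding and drained.
        if (!source.TryTake(out var first, Timeout.Infinite, token)) return false;
        chunk = new List<T>(maxSize) { first };

        // The time budget starts once the first item is in hand.
        var sw = Stopwatch.StartNew();
        while (chunk.Count < maxSize)
        {
            // Fixed-size mode (Timeout.Infinite) waits for every item; fixed-duration
            // mode waits only for the remaining budget, clamped at 0 ("take if present").
            var wait = millisecTimeout == Timeout.Infinite
                ? Timeout.Infinite
                : (int)Math.Max(0, millisecTimeout - sw.ElapsedMilliseconds);

            if (!source.TryTake(out var next, wait, token)) break;
            chunk.Add(next);
        }

        return true; // the chunk holds at least one item
    }
}
```

With five items (0 to 4) queued and the collection closed for adding, a size of 3 in fixed-size mode yields [0, 1, 2], then the partial last chunk [3, 4], and a third call returns false.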

2. Implementing Buffer

At this point, we already have the buffer interfaces and the behaviour-based implementation requirements. The snippet below satisfies these requirements:

//NOTE: Some explanations are provided as comments

using System;
using System.Collections.Concurrent;
using System.Threading;

//we implement both interfaces, plus IDisposable so that the buffer
//can be used inside a using block (as PipeImpl and PipelineImpl do)
public class PpcBuffer<T> : IProducerBuffer<T>, IConsumerBuffer<T>, IDisposable
{
    private readonly CancellationToken _token;
    private readonly BlockingCollection<T> _collection;

    public PpcBuffer(int bufferSize, CancellationToken token)
    {
        //we say 0 represents an unbounded buffer
        _collection = bufferSize.Equals(0) ? new BlockingCollection<T>()
                : new BlockingCollection<T>(bufferSize);
        _token = token;
    }

    //IProducerBuffer<T> IMPLEMENTATION >>>>>>>>>

    public void Add(T item, CancellationToken token)
    {
        //Add should wait even if the buffer is FULL, so we
        //simply call TryAdd with INFINITE timeout

        TryAdd(item, Timeout.Infinite, token);
    }

    public bool TryAdd(T item, int millisecTimeout, CancellationToken token)
    {
        //the blocking collection either adds the item within the timeout
        //   or returns false... so our requirement is satisfied.
        //when the timeout is INFINITE, this method either
        //   finishes with the item being added or ends in an exception:
        //         1. OperationCanceledException, when either cancellation token is canceled
        //         2. InvalidOperationException, when the buffer is closed for adding
        //   so again we satisfy our requirements.

        using (var mergeToken = CancellationTokenSource.CreateLinkedTokenSource(token, _token))
        {
           return _collection.TryAdd(item, millisecTimeout, mergeToken.Token);
        }
    }

    //IConsumerBuffer<T> IMPLEMENTATION >>>>>>>>>

    public bool TryGet(int millisecTimeout, CancellationToken token, out T data)
    {
        //we do not create a merged token, as the user should be able to
        //extract queued items once the pipeline is closed for addition.
        //returns FALSE on timeout, or once the buffer is closed AND empty.
        return _collection.TryTake(out data, millisecTimeout, token);
    }

    //TRUE only when the buffer is both closed for adding AND empty
    public bool Finished => _collection.IsCompleted;

    //we implement the CloseForAdding method to support the implementation of
    //  detached mode >>>>>>>

    public void CloseForAdding()
    {
        _collection.CompleteAdding();
    }

    public void Dispose()
    {
        _collection.Dispose();
    }
}
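Because PpcBuffer is a thin wrapper, its semantics are those of BlockingCollection<T>. The hypothetical BoundedBufferDemo below exercises them directly on a buffer of size 2: a TryAdd with a 0 ms timeout on a full buffer is rejected (one possible way to obtain the lossy chain mentioned at the start), adding after CompleteAdding throws InvalidOperationException, and consumers can still drain the queued items before TryTake starts returning false.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BoundedBufferDemo
{
    public static (bool thirdAdded, bool addAfterCloseThrew, int drained, bool takeAfterDrain) Run()
    {
        // bufferSize = 2, as PpcBuffer would create it for a fixed-size buffer
        using var collection = new BlockingCollection<int>(2);

        collection.TryAdd(1, 0, CancellationToken.None);
        collection.TryAdd(2, 0, CancellationToken.None);

        // Buffer is full: with a 0 ms timeout the item is simply rejected (lossy add)
        var thirdAdded = collection.TryAdd(3, 0, CancellationToken.None);

        collection.CompleteAdding(); // what PpcBuffer.CloseForAdding does

        var threw = false;
        try { collection.TryAdd(4, 0, CancellationToken.None); }
        catch (InvalidOperationException) { threw = true; }

        // Consumers can still drain what was queued before closing
        var drained = 0;
        while (collection.TryTake(out _, Timeout.Infinite, CancellationToken.None)) drained++;

        var takeAfterDrain = collection.TryTake(out _, 0, CancellationToken.None);
        return (thirdAdded, threw, drained, takeAfterDrain);
    }
}
```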

With such an implementation, we cover all the requirements discussed above. All that remains is the plumbing of these individual artifacts, which we do separately for the attached and the detached pipeline below.

3. Attached Pipeline

As we have discussed, the attached pipeline mode has the following characteristics:

  • Consumers cannot be added or removed from pipeline once it is constructed.
  • Producers cannot be added or removed from pipeline once it is constructed.
  • The pipeline can be formed both ways: Concordant and Discordant

Raw Implementation

As our interest is to create the form producers.Pipe(consumers), we first need to devise a raw implementation, since fabricating the final form is then just a matter of creating an extension method (which we create separately). Our raw implementation revolves around the following idea:

  1. Run all producers independently as async methods
  2. Run all consumers independently as async methods
  3. Feed adapter-transformed items to consumers
  4. Observe producers as they complete production
  5. Signal the buffer once all producers are done
  6. Dispose producers as they finish their work
  7. Let consumers finish consuming all remaining items
  8. Dispose consumers
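Before looking at the actual code, the eight steps above can be condensed into a simplified, self-contained sketch. It is an approximation under stated assumptions: MiniPipe is a hypothetical name, producers and consumers are plain delegates instead of IProducer<T>/IConsumer<T> (so there is nothing to initialise or dispose, and steps 6 and 8 disappear), the buffer is a raw BlockingCollection<T>, and only a consumer failure cancels the producers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MiniPipe
{
    // Producers and consumers are plain delegates here (an assumption made to keep
    // the sketch self-contained; the article uses IProducer<T>/IConsumer<T>).
    public static async Task RunAsync<T>(
        IReadOnlyList<Func<BlockingCollection<T>, CancellationToken, Task>> producers,
        IReadOnlyList<Func<T, CancellationToken, Task>> consumers,
        int bufferSize, CancellationToken token = default)
    {
        using var buffer = bufferSize == 0
            ? new BlockingCollection<T>()            // 0 = unbounded, as in PpcBuffer
            : new BlockingCollection<T>(bufferSize);
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);

        // Steps 2, 3, 7: each consumer takes items until the buffer is closed AND drained.
        var consuming = consumers.Select(consume => Task.Run(async () =>
        {
            try
            {
                while (buffer.TryTake(out var item, Timeout.Infinite, token))
                    await consume(item, token).ConfigureAwait(false);
            }
            catch
            {
                cts.Cancel(); // a failing consumer releases producers blocked on a full buffer
                throw;
            }
        })).ToArray();

        // Steps 1, 4, 5: run all producers; close the buffer however they finish.
        var producing = Task.WhenAll(producers.Select(produce => Task.Run(() => produce(buffer, cts.Token))));
        try
        {
            await producing.ConfigureAwait(false);
        }
        finally
        {
            buffer.CompleteAdding();
            await Task.WhenAll(consuming).ConfigureAwait(false);
        }
    }
}
```

For example, three producers each adding the numbers 1 to 100, piped through an 8-slot buffer into two summing consumers, give a total of 15150 regardless of how the items are interleaved.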

NOTE: We have used one of our home-made extension methods to spawn and await tasks (for both producers and consumers):

  • Signature: static Task WhenAll(this Func<int, CancellationToken, Task> func, int repeatCount, CancellationToken token = default(CancellationToken))
  • Implementation details: please see here
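Since only the signature is given here, the following is a guess at what such a helper could look like, not the author's implementation: it starts repeatCount copies of func on the thread pool, passing each its index (which RunConsumers/RunProducers use to pick consumers[i] or producers[i]), and returns a single task that completes when all of them have. The class name TaskFuncExtensions is hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskFuncExtensions
{
    // Hypothetical re-creation of the home-made helper (signature from the article,
    // body assumed): run func(0) ... func(repeatCount - 1) concurrently and
    // return one task that completes when all of them complete.
    public static Task WhenAll(this Func<int, CancellationToken, Task> func,
        int repeatCount, CancellationToken token = default(CancellationToken))
    {
        var tasks = Enumerable.Range(0, repeatCount)
            .Select(i => Task.Run(() => func(i, token), token))
            .ToArray();
        return Task.WhenAll(tasks);
    }
}
```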


The following code implements all of the steps listed above:

//NOTE: Some explanations are provided as comments

//We hide the implementation inside INTERNAL class to
//expose it through extension method
internal static class PipeImpl<TP, TC>
{
    public static Task Execute(CancellationToken token, 
        int bufferSize, 
        IDataAdapter<TP, TC> adapter,
        IReadOnlyList<IProducer<TP>> producers, 
        IReadOnlyList<IConsumer<TC>> consumers)
    {
        //instead of using await, we decided to create a new Task
        //so that caller func can await as per its convenience

        return Task.Run(async () =>
        {
            using (var localCts = new CancellationTokenSource())
            {
                using (var combinedCts = CancellationTokenSource
                    .CreateLinkedTokenSource(token, localCts.Token))
                {
                    //creating buffer as per required size
                    using (var ppcBuffer = new PpcBuffer<TP>(bufferSize, 
                                                      combinedCts.Token))
                    {
                        //spawn consumers
                        var rc = RunConsumers(consumers, ppcBuffer, adapter, 
                                              combinedCts.Token, localCts);
                        //spawn producers
                        var rp = RunProducers(producers, ppcBuffer, 
                                              combinedCts.Token, localCts);

                        //wait until all consumers and producers finish
                        await Task.WhenAll(rc, rp).ConfigureAwait(false);
                    }
                }
            }
        });
    }
    
    internal static Task RunConsumers(IReadOnlyList<IConsumer<TC>> consumers,
        IConsumerBuffer<TP> feed, IDataAdapter<TP, TC> adapter,
        CancellationToken token, CancellationTokenSource tokenSrc)
    {
        //the following line spawns all consumers in the list (RunConsumer method),
        //each as a separate task

        return new Func<int, CancellationToken, Task>(async (i, t) =>
                await RunConsumer(consumers[i], feed, adapter, t, tokenSrc)
                                .ConfigureAwait(false))
            .WhenAll(consumers.Count, token);
        //our home-made WhenAll waits on all created tasks
        // (i.e. waits for all consumers to finish)
    }

    private static async Task RunConsumer(IConsumer<TC> parallelConsumer,
        IConsumerBuffer<TP> feed, IDataAdapter<TP, TC> adapter,
        CancellationToken token, CancellationTokenSource tokenSrc)
    {
        try
        {
            //this would dispose the consumer once we have nothing left
            //to consume
            using (parallelConsumer)
            {
                //init consumers
                await parallelConsumer.InitAsync().ConfigureAwait(false);
                token.ThrowIfCancellationRequested();
          
                //we loop as long as the adapter is able to create a consumable
                //   instance
                while (adapter.TryGet(feed, token, out var consumable))
                {
                    //we feed the item to consumer and wait before
                    // supplying another item.          
                    await parallelConsumer.ConsumeAsync(consumable, token)
                                          .ConfigureAwait(false);
                }
            }
        }
        catch
        {
            //in case a consumer ends up in error,
            // we cancel the token so that producers can intercept it
            if (!token.IsCancellationRequested) tokenSrc.Cancel();
            throw;
        }
        }
    }

    private static Task RunProducers(IReadOnlyList<IProducer<TP>> producers,
        PpcBuffer<TP> buffer, CancellationToken token,
        CancellationTokenSource tokenSrc)
    {
        return Task.Run(async () =>
        {
            try
            {
                //the following line spawns all producers in the list (RunProducer method),
                //each as a separate task

                await new Func<int, CancellationToken, Task>(async (i, t) =>
                        await RunProducer(producers[i], buffer, t, tokenSrc)
                                        .ConfigureAwait(false))
                    .WhenAll(producers.Count, token).ConfigureAwait(false);
                //our home-made WhenAll waits on all created tasks
                // (i.e. waits for all producers to finish)
            }
            finally
            {
                //>>>>> IMPORTANT: No matter whether the producers finish normally
                //                 or end up in error,
                //                 we close the buffer
                buffer.CloseForAdding();
            }
        });
    }

    private static async Task RunProducer(IProducer<TP> parallelProducer,
        IProducerBuffer<TP> feed, CancellationToken token,
        CancellationTokenSource tokenSrc)
    {
        try
        {
            //this would dispose the producer once we have nothing left
            //to produce
            using (parallelProducer)
            {
                //initialize the producer
                await parallelProducer.InitAsync().ConfigureAwait(false);
                token.ThrowIfCancellationRequested();
                
                //we provide our buffer to the producer;
                //it is the producer's responsibility to populate it
                //    and to return once there is nothing left
                //    to produce.
                await parallelProducer.ProduceAsync(feed, token)
                                      .ConfigureAwait(false);
            }
        }
        catch
        {
            //in case a producer ends up in error,
            // we cancel the token so that consumers can intercept it
            if (!token.IsCancellationRequested) tokenSrc.Cancel();
            throw;
        }
    }
}
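The cross-cancellation in the two catch blocks is what prevents a deadlock when one side fails: without it, producers blocked on a full buffer would wait forever for consumers that no longer exist. The hypothetical FailurePropagationDemo below isolates that pattern with a raw BlockingCollection<int> of size 1: the producer blocks on the full buffer, the consumer fails and cancels the shared local source (as RunConsumer does), and the linked token releases the blocked Add with an OperationCanceledException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class FailurePropagationDemo
{
    // Returns true if the blocked producer was released by the consumer's failure.
    public static bool Run()
    {
        using var localCts = new CancellationTokenSource();
        using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(
            CancellationToken.None, localCts.Token);
        using var buffer = new BlockingCollection<int>(1); // tiny bounded buffer

        // Producer: the first Add fills the buffer, the second blocks (nobody consumes).
        var producer = Task.Run(() =>
        {
            buffer.Add(1, combinedCts.Token);
            buffer.Add(2, combinedCts.Token);
        });

        // Consumer: fails immediately and, like RunConsumer's catch block,
        // cancels the shared source before rethrowing.
        var consumer = Task.Run(() =>
        {
            try
            {
                FailingConsume();
            }
            catch
            {
                if (!combinedCts.IsCancellationRequested) localCts.Cancel();
                throw;
            }
        });

        try { Task.WaitAll(producer, consumer); }
        catch (AggregateException) { /* both tasks are expected to end in error */ }

        return producer.IsCanceled
            || producer.Exception?.InnerException is OperationCanceledException;
    }

    private static void FailingConsume() => throw new InvalidOperationException("consumer failed");
}
```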
Achieving .Pipe usage form (Syntactic Sugar)

We have all the ingredients to cook our extension methods, and we propose the following four (4) such methods to achieve the different pipelines discussed above:

  1. Concordant Pipeline: Producer type matches consumer type (i.e. <TP> = <TC>). Normally the end-user would need to inject an IDENTITY adapter, but we can do it implicitly inside our method, as shown below:
    //IMPLEMENTATION
    public static Task Pipe<T>(this IReadOnlyList<IProducer<T>> producers,
                                    IReadOnlyList<IConsumer<T>> consumers,
                                    CancellationToken token = default(CancellationToken),
                                    int bufferSize = 256)
    {
        return PipeImpl<T, T>.Execute(token, bufferSize, 
                                      new IdentityAdapter<T>(), 
                                      producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<T>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<T>[]{ consumer1, ..., consumerM };
    //await producers.Pipe(consumers);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
  2. Discordant Pipeline with FIXED SIZE chunks: If the producer type is <T>, then the consumer type is List<T> and the end-user seeks fixed-size chunks. Normally, one would need to inject the Awaitable List adapter, but we can do it implicitly inside our method, as shown below:
    //IMPLEMENTATION
    public static Task Pipe<T>(this IReadOnlyList<IProducer<T>> producers,
                                    IReadOnlyList<IConsumer<List<T>>> consumers,
                                    int listSize,
                                    CancellationToken token = default(CancellationToken),
                                    int bufferSize = 256)
    {
        //timeout is INFINITE, we will get FIXED-size chunks
        return PipeImpl<T, List<T>>.Execute(token, bufferSize, 
                                  new AwaitableListAdapter<T>(listSize, Timeout.Infinite),
                                  producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<T>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<List<T>>[]{ consumer1, ..., consumerM };
    //await producers.Pipe(consumers, some_positive_int_for_chunk_size);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
  3. Discordant Pipeline with FIXED DURATION chunks: If the producer type is <T>, then the consumer type is List<T> and the end-user seeks variable-sized chunks created within a fixed duration. Normally, one would need to inject the Awaitable List adapter, but we can do it implicitly inside our method, as shown below:
    //IMPLEMENTATION
    public static Task Pipe<T>(this IReadOnlyList<IProducer<T>> producers,
                                    IReadOnlyList<IConsumer<List<T>>> consumers,
                                    int listMaxSize,
                                    int millisecondTimeout,
                                    CancellationToken token = default(CancellationToken),
                                    int bufferSize = 256)
    {
        //timeout and MAX list size passed to adapter to avail chunks
        return PipeImpl<T, List<T>>.Execute(token, bufferSize, 
                                  new AwaitableListAdapter<T>(listMaxSize, millisecondTimeout),
                                  producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<T>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<List<T>>[]{ consumer1, ..., consumerM }; 
    //await producers.Pipe(consumers, some_positive_int_for_max_chunk_size,
    //                                some_positive_int_for_timeout);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
  4. Generic Pipeline: Producer type is <TP>, consumer type is <TC>, and an IDataAdapter<TP, TC> implementation is available to the end-user.
    //IMPLEMENTATION
    public static Task Pipe<TP, TC>(this IReadOnlyList<IProducer<TP>> producers,
                                         IReadOnlyList<IConsumer<TC>> consumers,
                                         IDataAdapter<TP, TC> adapter,
                                         CancellationToken token = default(CancellationToken),
                                         int bufferSize = 256)
    {
        //the end-user's adapter is passed through as-is
        return PipeImpl<TP, TC>.Execute(token, bufferSize, 
                                        adapter,
                                        producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<TP>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<TC>[]{ consumer1, ..., consumerM };
    //IDataAdapter<TP, TC> adapter = ...end-user-adapter-creation-call...
    //await producers.Pipe(consumers, adapter);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances

4. Detached Pipeline

The detached pipeline differs a bit, as we do not have producer instances available at pipeline construction time (i.e. while calling PipeImpl<TP,TC>.Execute), as we do in attached mode. In the absence of these producers, we do not have any mechanism to populate our buffer. Also, as we discussed initially, producers for such a pipeline may appear sporadically. Thus, unfortunately, we will NOT be able to achieve our desired producers.Pipe(consumers) usage form; however, we attempt to achieve a similarly simplified usage form based on the following information:

  • Actual producers are unknown and may appear sporadically to inject items into the pipeline
  • Consumers cannot be added or removed from the pipeline once it is constructed
  • The pipeline can be formed both ways: Concordant and Discordant

Raw Implementation

As we do NOT have a single point in code to await, we need a way to keep our pipeline alive for its whole lifetime, so that all produced items (by ephemeral, sporadically appearing producers or by long-running producers) can be added to it. For all practical purposes, we define this duration as: “The time duration starting from the moment when such a pipeline is constructed until the moment when the call to CloseForAdding method is made.”

With these assumptions made and intentions declared, we proceed with the detached pipeline interface as follows:

//we adopt this interface as it nearly mimics
//all operations of the RunProducers method of the
//PipeImpl<TP, TC> static class we used for attached mode,
//i.e.
//    Add and TryAdd methods
//    and the Dispose method
//    (we have nothing to Init).
public interface IPipeline<T> : IProducerBuffer<T>, IDisposable
{
}


Now, with IPipeline, we are able to mimic all the producer-related operations we performed before (in the RunProducers method of the PipeImpl<TP, TC> static class). Let's look at the implementation:

//NOTE: Some explanations are provided as comments

internal sealed class PipelineImpl<TP,TC> : IPipeline<TP>
{
    private readonly CancellationTokenSource _mergedCts;
    private readonly PpcBuffer<TP> _feed;
    private readonly Task _consumerTask;
    private CancellationTokenSource _localCts;

    public PipelineImpl(IReadOnlyList<IConsumer<TC>> consumers, 
                    IDataAdapter<TP, TC> adapter, 
                    CancellationToken token, 
                    int bufferSize)
    {
        _localCts = new CancellationTokenSource();
        _mergedCts = CancellationTokenSource.CreateLinkedTokenSource(token, 
                                                           _localCts.Token);
        _feed = new PpcBuffer<TP>(bufferSize, _mergedCts.Token);

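        //in order to spawn and await on our consumers
        //    we simply call the existing implementation
        //        from the PipeImpl class.
        //consumers get the caller's token (not the merged one), so cancelling
        //    _localCts in Dispose stops additions but still lets consumers drain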
        _consumerTask = PipeImpl<TP, TC>.RunConsumers(consumers, _feed, 
                                                adapter, token, _localCts);
    }

    public void Add(TP item, CancellationToken token)
    {
        TryAdd(item, Timeout.Infinite, token);
    }

    public bool TryAdd(TP item, int millisecTimeout, CancellationToken token)
    {
        //passing the item to buffer
        return _feed.TryAdd(item, millisecTimeout, token);
    }

    public void Dispose()
    {
        if (_localCts == null) return;
        try
        {
            using (_localCts)
            {
                using (_mergedCts)
                {
                    using (_feed)
                    {
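                        //FIRST, we cancel our local token, so that any producer
                        //      blocked in Add/TryAdd (e.g. on a full buffer) is released
                        //      with an OperationCanceledException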
                        _localCts.Cancel();
                        
                        //SECOND, we close the feed for addition
                        _feed.CloseForAdding();
                        
                        //Then, we wait for remaining items to be
                        //      consumed
                        _consumerTask.Wait();
                    }
                }
            }
        }
        finally
        {
            _localCts = null;
        }
    }
}
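Stripped of tokens and adapters, the lifetime management of PipelineImpl boils down to the pattern below. MiniDetachedPipeline<T> is a hypothetical sketch with an identity adapter and Action<T> consumers, and it leaves out the cancellation handling of the original: consumers start in the constructor, Add may be called from any thread, and Dispose closes the feed and waits until the queued items are consumed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, stripped-down analogue of PipelineImpl<TP,TC> (identity adapter).
public sealed class MiniDetachedPipeline<T> : IDisposable
{
    private readonly BlockingCollection<T> _feed;
    private readonly Task _consumers;
    private int _disposed;

    public MiniDetachedPipeline(Action<T>[] consumers, int bufferSize = 256)
    {
        _feed = new BlockingCollection<T>(bufferSize);

        // Consumers are spawned at construction time and live until Dispose.
        _consumers = Task.WhenAll(consumers.Select(consume => Task.Run(() =>
        {
            // TryTake returns false only once the feed is closed AND empty
            while (_feed.TryTake(out var item, Timeout.Infinite)) consume(item);
        })));
    }

    // May be called by any (sporadic or long-running) producer, from any thread.
    public void Add(T item) => _feed.Add(item);

    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) == 1) return; // dispose only once
        _feed.CompleteAdding(); // no more items accepted
        _consumers.Wait();      // wait until the queued items are consumed
        _feed.Dispose();
    }
}
```

For instance, 100 items added from a Parallel.For loop and summed by two consumers give 5050 once Dispose returns, because Dispose only returns after the feed has been drained.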
Instance Management

Contrary to the attached pipeline, where we had a single place in code to await the whole pipeline workflow, in detached mode we do NOT have such a luxury. Thus, end-users need to keep the IPipeline<TP> instance somewhere after construction and explicitly call Dispose on it. This, of course, requires some attention; however, before rejecting this implementation, we should consider the following thoughts:

  • Detached pipeline:
    • does not demand the complete knowledge of all possible producers at construction time
    • accepts items from both sporadic ephemeral producers and long-running producers
    • offers loose coupling between consumers and producers
    • facilitates concurrency
    • offers thread-safety
  • Detached pipeline, by its nature:
    • is useful in cases where consumers outlive producers by far (in most cases, consumers might live for the application's lifetime), for example:
      • Web based (WCF based, ApiController based etc) data processing
      • Background file processing
      • Background Database operations
      • Event based processing
      • Timer based processing
      • Async batch processing, and so on…
    • demands the management of a single instance, which can be conveniently maintained as:
      • a Singleton inside a Dependency Injection container
      • a static field/property etc…
    • can be disposed as needed: when the application shuts down, after closing a network interface, etc…

Achieving .Pipeline usage form (Syntactic Sugar)

Above, we have already created the .Pipe extension methods. In a similar manner, we can obtain the following .Pipeline methods:

  1. Concordant Pipeline: Producer type matches consumer type (i.e. <TP> = <TC>). Normally the end-user would need to inject an IDENTITY adapter, but we can do it implicitly inside our method, as shown below:
    //IMPLEMENTATION
    public static IPipeline<T> Pipeline<T>(this IReadOnlyList<IConsumer<T>> consumers,
                                   CancellationToken token = default(CancellationToken), 
                                   int bufferSize = 256)
    {
        return new PipelineImpl<T, T>(consumers, 
                                  new IdentityAdapter<T>(),
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<T>[]{ consumer1, ..., consumerM };
    //var save_this_instance_somewhere = consumers.Pipeline();
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instance.Dispose();
  2. Discordant Pipeline with FIXED SIZE chunks: If the producer type is <T>, then the consumer type is List<T> and the end-user seeks fixed-size chunks. Normally, one would need to inject the Awaitable List adapter, but we can do it implicitly inside our method, as shown below (IMPORTANT: as producers are sporadic, one might prefer to avoid this adapter altogether, since unnecessary consumer-side delays will be observed if no producer appears for a long time… in detached mode, FIXED DURATION chunks are preferred):
    //IMPLEMENTATION
    public static IPipeline<T> Pipeline<T>(this IReadOnlyList<IConsumer<List<T>>> consumers,
                                   int listSize,
                                   CancellationToken token = default(CancellationToken), 
                                   int bufferSize = 256)
    {
        //consumers receive List<T>, hence PipelineImpl<T, List<T>>
        return new PipelineImpl<T, List<T>>(consumers, 
                                  new AwaitableListAdapter<T>(listSize, Timeout.Infinite),
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<List<T>>[]{ consumer1, ..., consumerM };
    //var save_this_instance_somewhere = consumers.Pipeline(some_positive_list_size);
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instance.Dispose();
  3. Discordant Pipeline with FIXED DURATION chunks: If the producer type is <T>, then the consumer type is List<T> and the end-user seeks variable-sized chunks created within a fixed duration. Normally, one would need to inject the Awaitable List adapter, but we can do it implicitly inside our method, as shown below:
    //IMPLEMENTATION
    public static IPipeline<T> Pipeline<T>(this IReadOnlyList<IConsumer<List<T>>> consumers,
                                   int listMaxSize,
                                   int millisecondTimeout, 
                                   CancellationToken token = default(CancellationToken), 
                                   int bufferSize = 256)
    {
        //consumers receive List<T>, hence PipelineImpl<T, List<T>>
        return new PipelineImpl<T, List<T>>(consumers, 
                                  new AwaitableListAdapter<T>(listMaxSize, millisecondTimeout),
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<List<T>>[]{ consumer1, ..., consumerM };
    //var save_this_instance_somewhere = consumers.Pipeline(some_positive_list_size,
    //                                                      some_positive_timeout);
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instance.Dispose();
  4. Generic Pipeline: the producer type is <TP>, the consumer type is <TC>, and an IDataAdapter<TP, TC> implementation is available to the end-user.
    //Implementation
    public static IPipeline<TP> Pipeline<TP, TC>(this IReadOnlyList<IConsumer<TC>> consumers,
                                   IDataAdapter<TP, TC> adapter,
                                   CancellationToken token = default(CancellationToken), 
                                   int bufferSize = 256)
    {
        return new PipelineImpl<TP, TC>(consumers, 
                                  adapter,
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<TC>[]{ consumer1, ..., consumerM };
    //IDataAdapter<TP, TC> adapter = ...end-user-adapter-creation-call...
    //var save_this_instance_somewhere = consumers.Pipeline(adapter);
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instance.Dispose();
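
To make the difference between the FIXED SIZE chunk (AwaitableListAdapter<T>(listSize, Timeout.Infinite)) and the FIXED DURATION chunk (AwaitableListAdapter<T>(listMaxSize, millisecondTimeout)) more concrete, here is a minimal sketch of the batching rule over a System.Collections.Concurrent.BlockingCollection<T> buffer. It is not the library's AwaitableListAdapter: ChunkSketch and NextChunk are hypothetical names, and starting the clock when the first item of a chunk arrives (so that empty chunks are never emitted) is our own assumption.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper (NOT the library's AwaitableListAdapter<T>): it only
// illustrates how a FIXED SIZE / FIXED DURATION chunk could be cut from a buffer.
public static class ChunkSketch
{
    // Returns up to listMaxSize items. Assumption: the clock starts when the first
    // item of the chunk arrives, so an empty list is never returned.
    // millisecondTimeout == Timeout.Infinite yields a FIXED SIZE chunk.
    // Returns null once the buffer is completed and fully drained.
    // Throws OperationCanceledException if the token is cancelled while waiting.
    public static List<T> NextChunk<T>(BlockingCollection<T> buffer, int listMaxSize,
                                       int millisecondTimeout, CancellationToken token)
    {
        // Block until the first item of the chunk is available.
        if (!buffer.TryTake(out T first, Timeout.Infinite, token))
            return null;

        var chunk = new List<T>(listMaxSize) { first };
        var clock = Stopwatch.StartNew();
        while (chunk.Count < listMaxSize)
        {
            int remaining = Timeout.Infinite;
            if (millisecondTimeout != Timeout.Infinite)
            {
                remaining = millisecondTimeout - (int)clock.ElapsedMilliseconds;
                if (remaining <= 0) break; // duration elapsed: hand over a partial chunk
            }
            // False means the wait timed out or the buffer was completed.
            if (!buffer.TryTake(out T next, remaining, token)) break;
            chunk.Add(next);
        }
        return chunk;
    }
}
```

With Timeout.Infinite, NextChunk keeps waiting until listMaxSize items have arrived; this is why a FIXED SIZE chunk can add latency when items trickle in slowly.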

Commentary

Feature Implementation

So far, we have implemented all of the initially set requirements. Before we close this discussion, let us wrap up our features:

Feature            Implementation
Buffer Size        Through the method parameter (bufferSize); 0 means unbounded
Losslessness       Through the millisecond timeout of Add/TryAdd; Timeout.Infinite means no loss
Interruptibility   Through CancellationToken
Concordance        Through adapters
Attachability      Attached mode: .Pipe extension methods
                   Detached mode: .Pipeline extension methods

We have also noticed:

  • Attached mode can make use of both FIXED SIZE and FIXED DURATION chunk adapters
  • Detached mode should avoid the FIXED SIZE chunk adapter to prevent unintended latencies, since a partially filled chunk is handed to the consumers only once it is full
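
The feature table can be illustrated with a minimal sketch built on System.Collections.Concurrent.BlockingCollection<T>. BufferSketch<T> is a hypothetical class, not the library's PipelineImpl; it only shows how a bufferSize of 0 can mean unbounded, how Add (waiting indefinitely) stays lossless while TryAdd with a finite timeout becomes lossy, and how a CancellationToken interrupts a blocked call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical wrapper (NOT the library's PipelineImpl) mapping the feature
// table onto BlockingCollection<T>.
public sealed class BufferSketch<T> : IDisposable
{
    private readonly BlockingCollection<T> _buffer;

    // Buffer Size: bufferSize == 0 means unbounded, otherwise fixed capacity.
    public BufferSketch(int bufferSize)
    {
        if (bufferSize < 0) throw new ArgumentOutOfRangeException(nameof(bufferSize));
        _buffer = bufferSize == 0
            ? new BlockingCollection<T>()
            : new BlockingCollection<T>(bufferSize);
    }

    public int Count => _buffer.Count;

    // Losslessness: waits for free space as long as it takes.
    // Interruptibility: throws OperationCanceledException once the token is cancelled.
    public void Add(T item, CancellationToken token) => _buffer.Add(item, token);

    // Lossy for a finite timeout: returns false (the item is discarded) when no
    // slot frees up in time. Timeout.Infinite waits like Add.
    public bool TryAdd(T item, int millisecondTimeout, CancellationToken token)
        => _buffer.TryAdd(item, millisecondTimeout, token);

    public void Dispose() => _buffer.Dispose();
}
```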

Original Work (C# .NET) and NuGet Package

NOTE: You can skip this section entirely if you are not interested in consuming our C# .NET library.

In our original work (Source Code Link, NuGet Package Link), we have further extended our implementation, as explained below:

  • The adapter interfaces are also provided as abstract classes, so that the <TP> to <TC> data transformation can be written purely as business logic, without worrying about calls to buffer.TryGet (see AwaitableAdapter and AwaitableListAdapter).

    Thus, instead of implementing the IDataAdapter<TP, TC> interface, one can simply inherit from the AwaitableAdapter<TP, TC> abstract class (when consuming one instance at a time) or the AwaitableListAdapter<TP, TC> abstract class (when consuming data in chunks). Such inheritance remains business-oriented: we write only the data transformation logic inside the Adapt method, without worrying about buffer handling, which further reduces boilerplate code.

  • The .Pipe and .Pipeline extension methods are also available on Action (synchronous delegates) and Func (Task-returning asynchronous delegates), which avoids the need to implement the IProducer and IConsumer interfaces when Init/Dispose methods are not needed (see PipeExts and PipelineExts).
  • Our current implementation does NOT support chaining the way UNIX does, where multiple pipes can be chained, as in the following example:
    ls -l | grep key | less      (3 operations with 2 pipes)
  • Our current implementation supports only void consumers, i.e. consumers cannot have return values.
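
The abstract-class idea can be sketched as follows. AdapterSketch<TP, TC> and ParseIntAdapter are hypothetical names, and the buffer is assumed to be a BlockingCollection<TP>; the library's AwaitableAdapter<TP, TC> may be structured differently. The point is that the base class owns the buffer-reading code once, and a subclass overrides only Adapt with business logic.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical base class in the spirit of AwaitableAdapter<TP, TC>; the
// library's actual members and buffer type may differ.
public abstract class AdapterSketch<TP, TC>
{
    // Buffer plumbing is written once here, so subclasses never touch it.
    // Returns false once the buffer is completed and fully drained.
    public bool TryGet(BlockingCollection<TP> buffer, CancellationToken token, out TC consumable)
    {
        if (buffer.TryTake(out TP produced, Timeout.Infinite, token))
        {
            consumable = Adapt(produced, token);
            return true;
        }
        consumable = default(TC);
        return false;
    }

    // The only member a subclass implements: pure business transformation.
    public abstract TC Adapt(TP produced, CancellationToken token);
}

// Example business adapter: turns raw text lines into integers.
public sealed class ParseIntAdapter : AdapterSketch<string, int>
{
    public override int Adapt(string produced, CancellationToken token)
        => int.Parse(produced.Trim());
}
```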

Usage

IMPORTANT: We suggest using v1.4.0 or higher, as it contains some breaking changes compared to previous versions. This library also contains some other interesting extension methods, which we might cover in future articles on CodeProject. Meanwhile, if you are interested in the usage of those methods, you can find information here.

  • To use the .Pipe implementation, instead of thinking about the whole solution in one go, one should organise one's thoughts as follows:
    • Create Producer(s) (in isolation):
      • either implement the IProducer<TP> interface to populate the buffer, IF Init/Dispose methods are required
      • or construct a lambda:
        • Synchronous lambda signature: Action<IProducerBuffer<TP>, CancellationToken>
        • Async lambda signature: Func<IProducerBuffer<TP>, CancellationToken, Task>
    • Create Consumer(s) (in Isolation):
      • either implement the IConsumer<TC> interface to consume the data, IF Init/Dispose methods are required
      • or construct a lambda:
        • Synchronous lambda signature: Action<TC, CancellationToken>
        • Async lambda signature: Func<TC, CancellationToken, Task>
    • Create an Adapter (if no existing adapter fits the requirement):
      • either inherit from AwaitableAdapter<TP, TC> or AwaitableListAdapter<TP, TC>, if one of them fits the requirement, and implement the abstract method: abstract TC Adapt(TP produced, CancellationToken token)
      • or implement the IDataAdapter<TP, TC> interface
    • Choose one of the existing producers.Pipe(consumers) extension methods and inject values as necessary.
  • To use the .Pipeline implementation, one should organise one's thoughts as follows:
    • Create Consumer(s) and Adapter as explained above for .Pipe usage
    • Maintain the IPipeline<TP> instance as deemed fit, e.g.:
      • Inside Dependency Injector as Singleton
      • As static field/property etc
    • Call Dispose on the instance as deemed fit:
      • Inside App shutdown method
      • After closing network connection
      • After unregistering an event handler, and so on…
    • Use the IPipeline<TP> instance as required:
      • Inside ApiController
      • Inside WCF endpoints
      • Inside EventHandler
      • In batch method calls
      • Inside timer-based callbacks, and so on…
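
The .Pipeline checklist above can be sketched end to end with async lambda consumers (signature Func<TC, CancellationToken, Task>). PipelineSketch<T> and App are hypothetical names, not the library's PipelineImpl or IPipeline<TP>, and no adapter is involved (TP and TC are the same type). The instance is kept in a static field, items are added from anywhere, and Dispose stops intake and waits for the consumers to drain the buffer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal detached pipeline (NOT the library's PipelineImpl):
// M async-lambda consumers share one buffer; callers only Add/TryAdd and Dispose.
public sealed class PipelineSketch<T> : IDisposable
{
    private readonly BlockingCollection<T> _buffer;
    private readonly Task[] _workers;

    public PipelineSketch(IReadOnlyList<Func<T, CancellationToken, Task>> consumers,
                          CancellationToken token = default(CancellationToken),
                          int bufferSize = 256)
    {
        _buffer = bufferSize == 0 ? new BlockingCollection<T>()
                                  : new BlockingCollection<T>(bufferSize);
        // One loop per consumer; each loop ends after CompleteAdding, once the
        // buffered items are drained.
        _workers = consumers.Select(consume => Task.Run(async () =>
        {
            foreach (T item in _buffer.GetConsumingEnumerable(token))
                await consume(item, token);
        })).ToArray();
    }

    public void Add(T item, CancellationToken token) => _buffer.Add(item, token);

    public bool TryAdd(T item, int millisecondTimeout, CancellationToken token)
        => _buffer.TryAdd(item, millisecondTimeout, token);

    // Stop accepting items, let consumers finish buffered work, then release.
    public void Dispose()
    {
        _buffer.CompleteAdding();
        Task.WaitAll(_workers);
        _buffer.Dispose();
    }
}

// "Maintain the instance as a static field": one shared pipeline for the app.
public static class App
{
    public static PipelineSketch<int> Pipeline;
}
```

Because Dispose waits for the consumer loops, items already in the buffer are still processed at shutdown; cancelling the token instead stops the loops early, and Task.WaitAll then surfaces an AggregateException.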

History

This is v1 of the present idea.