coll is a set of C++17 APIs for constructing data-processing pipelines.
Data processing is expressed as a pipeline of operators connected by |.
The implementation of coll is neither iterator-based nor generator-based. Instead, it is based on function nesting (functions nested inside one another like matryoshka dolls)
and relies on compiler optimizations to reduce the overhead of the API,
so that the processing logic ends up close to what native C++ syntax (e.g., for, while, if-else) would produce for the same task.
The relevant optimizations are typically function inlining, return value optimization (RVO), and named return value optimization (NRVO).
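The following is a minimal sketch of what function nesting means. It assumes nothing about coll's internals: iterate_chars, map_then, and lower are hypothetical helpers written only for this illustration. Each operator receives its child's logic as a function and calls it directly, so the whole pipeline becomes one chain of nested calls that an optimizing compiler can typically inline into a plain loop like count_native.

```cpp
#include <cctype>
#include <cstddef>
#include <string>
#include <unordered_map>

// Hypothetical helpers for illustration only; NOT coll's API.

// Source: pushes every character of `s` into its child.
template <typename Child>
void iterate_chars(const std::string& s, Child child) {
  for (char c : s) child(c);
}

// map: returns a new function whose body applies `f`, then calls the child.
template <typename F, typename Child>
auto map_then(F f, Child child) {
  return [f, child](char in) mutable { child(f(in)); };
}

// Lower-casing helper (std::tolower expects a value representable as unsigned char).
inline char lower(char c) {
  return static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
}

// Pipeline written as nested functions: iterate -> map -> count.
std::unordered_map<char, std::size_t> count_nested(const std::string& s) {
  std::unordered_map<char, std::size_t> counts;
  auto count = [&counts](char c) { counts[c]++; };  // sink logic
  iterate_chars(s, map_then(lower, count));         // count is nested in map, map in iterate
  return counts;
}

// The same processing written directly with native syntax.
std::unordered_map<char, std::size_t> count_native(const std::string& s) {
  std::unordered_map<char, std::size_t> counts;
  for (char c : s) counts[lower(c)]++;
  return counts;
}
```

For example, count_nested("HelloWorld") maps 'l' to 3 and 'o' to 2, exactly like count_native("HelloWorld"); the difference lies only in how the logic is written, not in what it does.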
coll is not designed for functional programming (FP typically composes pure functions and avoids shared state, mutable data, and side effects).
coll also provides a parallel operator for parallel pipeline processing, with the help of ZAF.
- A sink operator triggers the construction of an execution object that wraps the logic of all the operators in the pipeline (lazy evaluation).
- The logic of a child operator is typically nested inside the logic of its parent operator.
- Execution starts from the source operator(s), and data are actively pushed from each parent operator to its child operator.
- With function inlining, the logic of the coll operators (i.e., the lambda calls) is expected to be inlined well by the compiler.
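To make the execution model above more concrete, here is a heavily simplified C++17 sketch. All names (from, map_op, sorted, to_vector, Pipeline and the *Stage types) are hypothetical and are not coll's API; the sketch only assumes the behavior described in the list: | composes operators lazily, the sink builds one nested execution object, and the source pushes data downstream. Each stage has a proc (one element) and an end (end of input), mirroring the .proc/.end functions in the operator diagrams of this document.

```cpp
#include <algorithm>
#include <utility>
#include <vector>

// Hypothetical, simplified push-based pipeline; NOT coll's real API.
// Every operator object turns a downstream (child) stage into a new stage.

// Pass-through used before any operator has been attached.
struct Identity {
  template <typename Child>
  Child operator()(Child c) const { return c; }
};

// A lazy pipeline: the input plus a function that nests all operators so far.
template <typename T, typename Wrap>
struct Pipeline {
  std::vector<T> data;
  Wrap wrap;
};

template <typename T>
Pipeline<T, Identity> from(std::vector<T> v) { return {std::move(v), Identity{}}; }

// map stage: transforms each element, then pushes it to the child.
template <typename F, typename Child>
struct MapStage {
  F f;
  Child child;
  template <typename X>
  void proc(X x) { child.proc(f(x)); }
  void end() { child.end(); }
};

template <typename F>
struct MapOp {
  F f;
  template <typename Child>
  auto operator()(Child c) const { return MapStage<F, Child>{f, std::move(c)}; }
};

template <typename F>
MapOp<F> map_op(F f) { return {f}; }

// sorted stage: buffers in proc() and only emits in end().
template <typename T, typename Child>
struct SortStage {
  Child child;
  std::vector<T> buf;
  void proc(T x) { buf.push_back(std::move(x)); }
  void end() {
    std::sort(buf.begin(), buf.end());
    for (auto& x : buf) child.proc(x);
    child.end();
  }
};

template <typename T>
struct SortOp {
  template <typename Child>
  auto operator()(Child c) const { return SortStage<T, Child>{std::move(c), {}}; }
};

template <typename T>
SortOp<T> sorted() { return {}; }

// Sink stage: collects the results into a vector.
template <typename T>
struct VectorSink {
  std::vector<T>* out;
  void proc(T x) { out->push_back(std::move(x)); }
  void end() {}
};

template <typename T>
struct ToVector {};

template <typename T>
ToVector<T> to_vector() { return {}; }

// Attaching an ordinary operator only composes functions; nothing runs yet.
template <typename T, typename Wrap, typename Op>
auto operator|(Pipeline<T, Wrap> p, Op op) {
  auto wrap = [w = p.wrap, op](auto child) { return w(op(std::move(child))); };
  return Pipeline<T, decltype(wrap)>{std::move(p.data), wrap};
}

// Attaching the sink (a more specialized overload) builds the nested execution object and runs it.
template <typename T, typename Wrap, typename U>
std::vector<U> operator|(Pipeline<T, Wrap> p, ToVector<U>) {
  std::vector<U> out;
  auto exec = p.wrap(VectorSink<U>{&out});  // all stages nested into one object
  for (auto& x : p.data) exec.proc(x);      // the source pushes each element downstream
  exec.end();                               // end of input propagates through every stage
  return out;
}
```

For example, from(std::vector<int>{5, 3, 8, 1}) | map_op([](int x) { return x * 10; }) | sorted<int>() does no work by itself; appending | to_vector<int>() runs it and returns {10, 30, 50, 80}. The end() hook is what lets a blocking operator such as sorted hold its buffer until all input has arrived.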
Count the occurrences of each character in "HelloWorld".

Using coll:

```cpp
auto counts = iterate("HelloWorld")
            | map(anony_cc(char(tolower(_))))
            | groupby().count();
```

How the operators are nested (pseudocode):

```
iterate.proc = () {
  for (i : "HelloWorld") {
    map(i)                 // -> map.proc
  }
}
+-- map.proc = (i) {
      r = char(tolower(i));
      groupby.count(r);    // -> groupby.count.proc
    }
+-- groupby.count.proc = (i) {
      counts_map[i]++;
    }
```

Equivalent native C++:

```cpp
auto counts = []() {
  unordered_map<char, size_t> counts_map;
  for (auto i : "HelloWorld") {
    auto c = char(tolower(i));
    counts_map[c]++;
  }
  return counts_map;
}();
```

Generate 10 random numbers, then (1) print them in sorted order and (2) remove duplicates and print them.

Using coll:

```cpp
generate(anony_c(rand() % 100)).times(10)
  | branch([](auto in) {
      in | sort()
         | println();
    })
  | distinct()
  | println();
```

How the operators are nested (pseudocode); each operator has a proc (called per element) and an end (called when the input ends):

```
generate.times.proc = () {
  for (i = 0; i++ < 10;) {
    auto x = rand() % 100
    branch(x)                     // -> branch.proc
  }
}
generate.times.end = () {
  branch.end()
}
+-- branch.proc = (i) {
      /*1st branch*/ sort(i)      // -> sort.proc
      /*2nd branch*/ distinct(i)  // -> distinct.proc
    }
+-- branch.end = () {
      sort.end()
      distinct.end()
    }
    +-- sort.proc = (i) {
          sort_buf.emplace_back(i)
        }
    +-- sort.end = () {
          sort(sort_buf)
          for (i : sort_buf) {
            println(i)            // -> println.proc
          }
          println.end()
        }
    +-- distinct.proc = (i) {
          if (dist_buf.emplace(i))
            println(i)            // -> println.proc
        }
    +-- distinct.end = () {
          println.end()
        }
        +-- println.proc = (i) {
              cout << i;
            }
        +-- println.end = () {}
```

Equivalent native C++:

```cpp
[]() {
  vector<int> sort_buf;
  unordered_set<int> dist_buf;
  for (auto i = 0; i < 10; i++) {
    auto x = rand() % 100;
    sort_buf.emplace_back(x);
    if (dist_buf.emplace(x).second) {
      cout << x << "\n";
    }
  }
  sort(sort_buf.begin(), sort_buf.end());
  for (auto i : sort_buf) {
    cout << i << "\n";
  }
}();
```

Here is the HelloWorld example, which can be found in examples/hello_world.cpp and examples/hello_world_native.cpp.
The coll pipeline is expected, once optimized by the compiler, to be close to the equivalent processing written in native C++ syntax.
What this example does is simple: it selects "hello" and "world" from the input elements, converts each character to upper case, and prints the characters separated by spaces. The example deliberately uses many operators to show how different operators are translated into native C++ syntax.
Using coll:

```cpp
elements("let us", "say", "hello")
  | tail()
  | concat(
      elements("to the", "world", "loudly")
        | init()
    )
  | zip_with_index()
  | filter(anony_ac(_.first % 2 == 1))
  | map(anony_ar(_.second))
  | flatten()
  | map(anony_ac(char(toupper(_))))
  | print("", " ", "!\n");
```

Equivalent native C++:

```cpp
bool printed_head = false;
size_t index = 0;
array<const char*, 3> in1{"let us", "say", "hello"};
bool skipped_head = false;
for (auto& i1 : in1) {                               // elements()
  if (skipped_head) {                                // tail()
    pair<size_t, const char*&> idx_i1{index++, i1};  // zip_with_index()
    if (idx_i1.first % 2 == 1) {                     // filter()
      auto& i1_ref = idx_i1.second;                  // map()
      for (auto c = i1_ref; *c; c++) {               // flatten()
        auto C = char(toupper(*c));                  // map()
        if (printed_head) {                          // print()
          cout << " ";
        } else {
          cout << "";
          printed_head = true;
        }
        cout << C;
      }
    }
  } else {
    skipped_head = true;
  }
}
array<const char*, 3> in2{"to the", "world", "loudly"};
optional<const char*> prev_i2;
for (auto& i2 : in2) {                               // elements()
  if (bool(prev_i2)) {                               // init()
    ...  // concat(): the same zip_with_index() ... print() logic as in the in1 loop
  }
  prev_i2 = i2;
}
if (!printed_head) {                                 // print()
  cout << "";
}
cout << "!\n";
```

Note that the native code above can actually be improved:
- When taking the tail of in1, we can directly iterate from the 1st element (zero based) to the end; there is no need to use skipped_head to check each element of in1.
- When taking the init of in2, we can directly iterate from the beginning to the second-to-last element; there is no need to use prev_i2 to cache the previous element.
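The two improvements can be sketched in native C++ as follows. This is a hand-written illustration, not code generated by coll; hello_world_improved is a hypothetical name, and the output is collected into a std::string instead of being written to cout so that the result is easy to check. It assumes both inputs are non-empty, which holds for this example.

```cpp
#include <array>
#include <cctype>
#include <cstddef>
#include <iterator>
#include <string>

std::string hello_world_improved() {
  std::array<const char*, 3> in1{"let us", "say", "hello"};
  std::array<const char*, 3> in2{"to the", "world", "loudly"};
  std::string out;
  bool printed_head = false;
  std::size_t index = 0;  // shared by both inputs, as zip_with_index() runs after concat()

  // Per-element logic: zip_with_index() | filter() | map() | flatten() | map() | print()
  auto process = [&](const char* s) {
    if (index++ % 2 == 1) {
      for (auto c = s; *c; c++) {
        if (printed_head) out += ' ';
        printed_head = true;
        out += static_cast<char>(std::toupper(static_cast<unsigned char>(*c)));
      }
    }
  };

  // tail() of in1: start from the 1st element (zero based); no skipped_head flag.
  for (auto it = std::next(in1.begin()); it != in1.end(); ++it) process(*it);
  // init() of in2: stop before the last element; no prev_i2 cache.
  for (auto it = in2.begin(); it != std::prev(in2.end()); ++it) process(*it);

  out += "!\n";
  return out;
}
```

The function returns "H E L L O W O R L D!\n", the same text the original native code prints.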
Both improvements could be made in coll by pushing init and tail upward to the source operator at compile time.
This is not done yet, but a similar technique is applied to the reverse operator: instead of reversing the elements in the middle of the processing, it is better to reverse the input directly at the source whenever possible.
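A minimal sketch of why reversing at the source pays off, using plain loops rather than coll operators (reverse_in_middle and reverse_at_source are hypothetical names). It assumes the operators before reverse treat each element independently, like the map (x * 2) here; an operator such as zip_with_index depends on element order, so reverse could not simply be moved above it. The sketch does not reflect the exact conditions under which coll applies this rewrite.

```cpp
#include <vector>

// Reversing in the middle of processing: the reverse step must buffer every
// element it receives and can only emit them after the input has ended.
std::vector<int> reverse_in_middle(const std::vector<int>& input) {
  std::vector<int> buffer;                      // extra storage for all elements
  for (int x : input) buffer.push_back(x * 2);  // map(x * 2), then buffered by reverse
  std::vector<int> out;
  for (auto it = buffer.rbegin(); it != buffer.rend(); ++it) out.push_back(*it);
  return out;
}

// Reversing at the source: the input supports reverse iteration, so the source
// simply iterates backwards and no intermediate buffer is needed.
std::vector<int> reverse_at_source(const std::vector<int>& input) {
  std::vector<int> out;
  for (auto it = input.rbegin(); it != input.rend(); ++it) out.push_back(*it * 2);
  return out;
}
```

Both functions return {6, 4, 2} for the input {1, 2, 3}; the second one avoids the intermediate buffer entirely.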
To do:
- API: window with aggregation;
- Optimization: improve parallel operators:
  - Direct data exchange among consecutive parallel operators
See also:
- Operator List, which lists all the operators in coll.
- Range-v3, the range library for C++14/17/20 and the basis for C++20's std::ranges.
- rangeless, which has a good summary of many existing libraries that provide similar functionality.
- pipes, which provides small components for writing expressive code when working on collections.