session.hpp
#include <atomic>

#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>

#include "eteq/session.hpp"

#include "ccur/partition.hpp"

#ifndef CCUR_SESS_HPP
#define CCUR_SESS_HPP

namespace ccur
{

/// Vector of pairs of operable functors and their number of unique non-leaf arguments
using SessReqsT = std::vector<std::pair<teq::iOperableFunc*,size_t>>;

/// Same as SessReqsT except as a list
using LSessReqsT = std::list<std::pair<teq::iOperableFunc*,size_t>>;

/// Map operable functors to an atomic count of fulfilled (already updated or ignored) arguments
using AtomicFulfilMapT = std::unordered_map<
    teq::iOperableFunc*,std::atomic<long>>;

/// Multithreaded iSession implementation that partitions the tracked graph
/// into groups and updates each group in its own thread
struct Session final : public eteq::iSession
{
    Session (size_t nthreads = 2, OpWeightT weights = OpWeightT()) :
        nthreads_(nthreads), weights_(weights) {}

    /// Implementation of iSession
    void track (teq::TensptrsT roots) override
    {
        tracked_.insert(roots.begin(), roots.end());

        teq::GraphStat stat;
        for (auto& trac : tracked_)
        {
            trac->accept(stat);
        }
        teq::ParentFinder pfinder;
        for (teq::TensptrT& root : roots)
        {
            root->accept(pfinder);
        }

        // split the tracked graph into nthreads_ groups of functors,
        // weighting operations according to weights_
        teq::TensptrsT trackvecs(tracked_.begin(), tracked_.end());
        PartGroupsT groups = k_partition(trackvecs, nthreads_, weights_);
        requirements_.clear();
        for (auto& group : groups)
        {
            SessReqsT reqs;
            reqs.reserve(group.size());
            for (teq::iFunctor* func : group)
            {
                auto& args = func->get_children();
                teq::TensSetT unique_children;
                for (const teq::FuncArg& arg : args)
                {
                    auto tens = arg.get_tensor().get();
                    if (0 < stat.graphsize_[tens].upper_) // ignore leaves
                    {
                        unique_children.emplace(tens);
                    }
                }
                // the functor must wait on this many unique non-leaf arguments
                reqs.push_back({
                    static_cast<teq::iOperableFunc*>(func),
                    unique_children.size()
                });
            }
            requirements_.push_back(reqs);
        }

        for (auto& assocs : pfinder.parents_)
        {
            for (auto& parent_pair : assocs.second)
            {
                parents_[assocs.first].emplace(
                    static_cast<teq::iOperableFunc*>(parent_pair.first));
            }
        }

        ops_.clear();
        for (auto& tpair : stat.graphsize_)
        {
            if (tpair.second.upper_ > 0)
            {
                ops_.emplace(static_cast<teq::iOperableFunc*>(tpair.first));
            }
        }
    }

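    // Note (explanatory comment, not in the upstream source): after track(),
    // requirements_ holds one SessReqsT per thread, pairing each operable
    // functor with the number of unique non-leaf arguments it must wait on;
    // parents_ maps every tensor to its operable parents, and ops_ holds every
    // non-leaf node. update() below walks each thread's list and fires a
    // functor once its fulfilment counter reaches that requirement.
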
    /// Implementation of iSession
    void update (teq::TensSetT ignored = {}) override
    {
        size_t nthreads = requirements_.size();
        std::vector<LSessReqsT> indep_requirements(nthreads);
        for (size_t i = 0; i < nthreads; ++i)
        {
            auto& reqs = requirements_[i];
            auto& indep_reqs = indep_requirements[i];
            teq::TensSetT acceptable;
            for (auto& root : tracked_)
            {
                acceptable.emplace(root.get());
            }
            // ignored tensors will never populate reqs
            for (auto rit = reqs.rbegin(), ret = reqs.rend();
                rit != ret; ++rit)
            {
                auto& op = rit->first;
                if (estd::has(acceptable, op) &&
                    false == estd::has(ignored, op))
                {
                    indep_reqs.push_front({op, rit->second});
                    auto& children = op->get_children();
                    for (auto& child : children)
                    {
                        acceptable.emplace(child.get_tensor().get());
                    }
                }
            }
        }

        // one fulfilment counter per operable functor
        AtomicFulfilMapT fulfilments;
        for (auto op : ops_)
        {
            fulfilments.emplace(op, 0);
        }

        // ignored tensors count as already fulfilled for their parents
        for (auto ig : ignored)
        {
            std::unordered_set<teq::iOperableFunc*> op_parents;
            if (estd::get(op_parents, parents_, ig))
            {
                for (auto& op_parent : op_parents)
                {
                    ++fulfilments.at(op_parent);
                }
            }
        }

        // for each req in requirements distribute to thread
        boost::asio::thread_pool pool(nthreads);
        for (auto& reqs : indep_requirements)
        {
            // add thread
            boost::asio::post(pool,
                [this, &reqs, &fulfilments]()
                {
                    for (auto& op : reqs)
                    {
                        // fulfilled and not ignored
                        auto& ff = fulfilments.at(op.first);
                        if (ff++ == op.second)
                        {
                            op.first->update();
                            std::unordered_set<teq::iOperableFunc*> op_parents;
                            if (estd::get(op_parents,
                                this->parents_, op.first))
                            {
                                for (auto& op_parent : op_parents)
                                {
                                    ++fulfilments.at(op_parent);
                                }
                            }
                            ++ff;
                        }
                        --ff;
                    }
                });
        }
        pool.join();
    }

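    // Note (explanatory comment, not in the upstream source): update_target()
    // mirrors update(), but seeds the acceptable set from the caller-supplied
    // target tensors instead of every tracked root, so only functors that the
    // targets actually depend on are scheduled.
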
    /// Implementation of iSession
    void update_target (teq::TensSetT target,
        teq::TensSetT ignored = {}) override
    {
        size_t nthreads = requirements_.size();
        std::vector<LSessReqsT> indep_requirements(nthreads);
        for (size_t i = 0; i < nthreads; ++i)
        {
            auto& reqs = requirements_[i];
            auto& indep_reqs = indep_requirements[i];
            teq::TensSetT acceptable;
            for (auto& root : target)
            {
                acceptable.emplace(root);
            }
            // ignored tensors will never populate reqs
            for (auto rit = reqs.rbegin(), ret = reqs.rend();
                rit != ret; ++rit)
            {
                auto& op = rit->first;
                if (estd::has(acceptable, op) &&
                    false == estd::has(ignored, op))
                {
                    indep_reqs.push_front({op, rit->second});
                    auto& children = op->get_children();
                    for (auto& child : children)
                    {
                        acceptable.emplace(child.get_tensor().get());
                    }
                }
            }
        }

        AtomicFulfilMapT fulfilments;
        for (auto op : ops_)
        {
            fulfilments.emplace(op, 0);
        }

        for (auto ig : ignored)
        {
            std::unordered_set<teq::iOperableFunc*> op_parents;
            if (estd::get(op_parents, parents_, ig))
            {
                for (auto& op_parent : op_parents)
                {
                    ++fulfilments.at(op_parent);
                }
            }
        }

        // for each req in requirements distribute to thread
        boost::asio::thread_pool pool(nthreads);
        for (auto& reqs : indep_requirements)
        {
            // make thread
            boost::asio::post(pool,
                [this, &reqs, &fulfilments]()
                {
                    for (auto& op : reqs)
                    {
                        // is relevant to target, is fulfilled and not ignored
                        auto& ff = fulfilments.at(op.first);
                        if (ff++ == op.second)
                        {
                            op.first->update();
                            std::unordered_set<teq::iOperableFunc*> op_parents;
                            if (estd::get(op_parents,
                                this->parents_, op.first))
                            {
                                for (auto& op_parent : op_parents)
                                {
                                    ++fulfilments.at(op_parent);
                                }
                            }
                            ++ff;
                        }
                        --ff;
                    }
                });
        }
        pool.join();
    }

    /// Apply input optimization rules using opt module, then re-track
    void optimize (const opt::OptCtx& rules)
    {
        teq::TensptrsT tracked(tracked_.begin(), tracked_.end());
        opt::optimize(tracked, rules);
        parents_.clear();
        track(tracked);
    }

    /// Set of all tensor roots tracked by this session
    teq::TensptrSetT tracked_;

    /// Map of tensor to the set of the tensor's parents
    std::unordered_map<teq::iTensor*,
        std::unordered_set<teq::iOperableFunc*>> parents_;

    /// Requirement pairs (functor, unique non-leaf argument count) grouped per thread
    std::vector<SessReqsT> requirements_;

private:
    size_t nthreads_;

    OpWeightT weights_;

    /// All operable (non-leaf) functors in the tracked graph
    std::unordered_set<teq::iOperableFunc*> ops_;
};

}

#endif // CCUR_SESS_HPP
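
A minimal usage sketch (not part of this header): it assumes the rest of Tenncor is available and that a graph has already been built elsewhere with eteq, so the calls producing `roots` are elided; the function name `run` is hypothetical and only the Session members declared above are used.

#include "ccur/session.hpp"

void run (teq::TensptrsT roots) // roots: outputs of an already-built graph (assumed)
{
    ccur::OpWeightT weights;        // optional opcode -> relative cost map
    ccur::Session sess(4, weights); // partition work across 4 threads

    sess.track(roots);              // group the graph and record requirements
    sess.update();                  // evaluate every tracked operable functor
    sess.update_target(             // evaluate only what the first root needs
        teq::TensSetT{roots[0].get()});
}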