Tenncor
session.hpp
Go to the documentation of this file.
1 
10 #include <grpc/grpc.h>
11 #include <grpcpp/create_channel.h>
12 #include <grpcpp/security/credentials.h>
13 
14 #include <boost/uuid/uuid.hpp>
15 #include <boost/uuid/uuid_generators.hpp>
16 #include <boost/uuid/uuid_io.hpp>
17 
18 #include "jobs/scope_guard.hpp"
19 
20 #include "eteq/session.hpp"
21 
22 #include "tag/tag.hpp"
23 
24 #include "dbg/grpc/client.hpp"
25 
26 #ifndef DBG_SESSION_HPP
27 #define DBG_SESSION_HPP
28 
29 namespace dbg
30 {
31 
32 static const std::string tag_str_key = "name";
33 
34 static const std::string tag_node_type = "node_type";
35 
36 static const std::string edge_label_fmt = "parent-child-%d";
37 
39 struct EdgeInfo
40 {
41  size_t parent_;
42  size_t child_;
43  std::string label_;
44 };
45 
47 struct EdgeInfoHash final
48 {
49  size_t operator() (const EdgeInfo& edge) const
50  {
51  std::stringstream ss;
52  ss << edge.parent_ << ","
53  << edge.child_ << ","
54  << edge.label_;
55  return std::hash<std::string>()(ss.str());
56  }
57 };
58 
60 inline bool operator == (const EdgeInfo& lhs, const EdgeInfo& rhs)
61 {
62  EdgeInfoHash hasher;
63  return hasher(lhs) == hasher(rhs);
64 }
65 
67 struct InteractiveSession final : public eteq::iSession
68 {
70  static boost::uuids::random_generator uuid_gen_;
71 
72  InteractiveSession (std::shared_ptr<grpc::ChannelInterface> channel,
73  ClientConfig client_cfg = ClientConfig(),
74  tag::TagRegistry& registry = tag::get_reg()) :
75  registry_(registry),
76  client_(channel, client_cfg)
77  {
78  logs::infof("created session: %s", sess_id_.c_str());
79  }
80 
81  InteractiveSession (std::string host,
82  ClientConfig client_cfg = ClientConfig()) :
83  InteractiveSession(grpc::CreateChannel(host,
84  grpc::InsecureChannelCredentials()), client_cfg) {}
85 
87  void track (teq::TensptrsT roots) override
88  {
89  sess_.track(roots);
90 
91  teq::ParentFinder pfinder;
92  for (auto root : roots)
93  {
94  root->accept(stat_);
95  root->accept(pfinder);
96  }
97 
98  for (auto& assocs : pfinder.parents_)
99  {
100  for (auto& parent_pair : assocs.second)
101  {
102  parents_[assocs.first].emplace(
103  static_cast<teq::iOperableFunc*>(parent_pair.first));
104  }
105  }
106 
107  // setup request
108  tenncor::CreateGraphRequest request;
109  auto payload = request.mutable_payload();
110  payload->set_graph_id(sess_id_);
111  for (auto& statpair : stat_.graphsize_)
112  {
113  auto tens = statpair.first;
114  auto& range = statpair.second;
115  size_t id = node_ids_.size() + 1;
116  if (false == estd::has(node_ids_, tens))
117  {
118  node_ids_.emplace(tens, id);
119  // add to request
120  auto node = payload->add_nodes();
121  node->set_id(id);
122  auto tags = node->mutable_tags();
123  {
124  tenncor::Strings tag_str;
125  tag_str.add_strings(tens->to_string());
126  tags->insert({tag_str_key, tag_str});
127  }
128  {
129  tenncor::Strings type_str;
130  if (0 == range.upper_)
131  {
132  type_str.add_strings("leaf");
133  }
134  else
135  {
136  type_str.add_strings("functor");
137  }
138  tags->insert({tag_node_type, type_str});
139  }
140  {
141  auto inner_tags = registry_.get_tags(tens);
142  std::map<std::string,tenncor::Strings> outer_tags;
143  for (auto& itags : inner_tags)
144  {
145  google::protobuf::RepeatedPtrField<std::string>
146  field(itags.second.begin(), itags.second.end());
147  tenncor::Strings otags;
148  otags.mutable_strings()->Swap(&field);
149  outer_tags.emplace(itags.first, otags);
150  }
151  tags->insert(outer_tags.begin(), outer_tags.end());
152  }
153  auto s = tens->shape();
154  google::protobuf::RepeatedField<uint32_t> shape(
155  s.begin(), s.end());
156  node->mutable_shape()->Swap(&shape);
157  auto location = node->mutable_location();
158  location->set_maxheight(range.upper_);
159  location->set_minheight(range.lower_);
160  }
161  }
162  for (auto& statpair : stat_.graphsize_)
163  {
164  auto tens = statpair.first;
165  auto& range = statpair.second;
166  if (range.upper_ > 0)
167  {
168  auto f = static_cast<teq::iFunctor*>(tens);
169  auto& children = f->get_children();
170  for (size_t i = 0, n = children.size(); i < n; ++i)
171  {
172  auto& child = children[i];
173  auto child_tens = child.get_tensor().get();
174  auto shaper = child.get_shaper();
175  auto coorder = child.get_coorder();
176  std::string label = fmts::sprintf(edge_label_fmt, i);
177  EdgeInfo edgeinfo{
178  node_ids_[f],
179  node_ids_[child_tens],
180  label,
181  };
182  if (false == estd::has(edges_, edgeinfo))
183  {
184  edges_.emplace(edgeinfo);
185  // add to request
186  auto edge = payload->add_edges();
187  edge->set_parent(node_ids_[f]);
188  edge->set_child(node_ids_[child_tens]);
189  edge->set_label(label);
190  if (false == teq::is_identity(shaper.get()))
191  {
192  edge->set_shaper(shaper->to_string());
193  }
194  if (false == teq::is_identity(coorder.get()))
195  {
196  edge->set_coorder(coorder->to_string());
197  }
198  }
199  }
200  }
201  }
202 
203  client_.create_graph(request);
204  }
205 
207  void update (teq::TensSetT ignored = {}) override
208  {
209  jobs::ScopeGuard defer([this]() { ++this->update_it_; });
210 
211  // ignore any node data updates when
212  // not connected or out of sync interval
213  if (false == client_.is_connected() || 0 < update_it_ % data_sync_interval)
214  {
215  sess_.update(ignored);
216  return;
217  }
218 
219  // basic copy over from session::update
220  std::list<teq::iOperableFunc*> reqs;
221  teq::TensSetT acceptable;
222  for (auto& root : sess_.tracked_)
223  {
224  acceptable.emplace(root.get());
225  }
226  // ignored tensors will never populate reqs
227  for (auto rit = sess_.ops_.rbegin(),
228  ret = sess_.ops_.rend();
229  rit != ret; ++rit)
230  {
231  auto& op = *rit;
232  if (estd::has(acceptable, op) &&
233  false == estd::has(ignored, op))
234  {
235  reqs.push_front(op);
236  auto& children = op->get_children();
237  for (auto& child : children)
238  {
239  acceptable.emplace(child.get_tensor().get());
240  }
241  }
242  }
243 
244  std::vector<tenncor::UpdateNodeDataRequest> requests;
245  requests.reserve(sess_.ops_.size());
246 
247  for (auto& statpair : stat_.graphsize_)
248  {
249  if (0 == statpair.second.upper_)
250  {
251  auto leaf = static_cast<teq::iLeaf*>(statpair.first);
252  egen::_GENERATED_DTYPE dtype =
253  (egen::_GENERATED_DTYPE) leaf->type_code();
254  std::vector<float> data;
255  size_t nelems = leaf->shape().n_elems();
256  egen::type_convert(data, leaf->data(), dtype, nelems);
257 
258  tenncor::UpdateNodeDataRequest request;
259  auto payload = request.mutable_payload();
260  payload->set_graph_id(sess_id_);
261  payload->set_node_id(node_ids_[leaf]);
262  google::protobuf::RepeatedField<float> field(
263  data.begin(), data.end());
264  payload->mutable_data()->Swap(&field);
265  requests.push_back(request);
266  }
267  }
268 
269  // ignored nodes and its dependers will never fulfill requirement
270  for (auto& op : reqs)
271  {
272  op->update();
273  egen::_GENERATED_DTYPE dtype =
274  (egen::_GENERATED_DTYPE) op->type_code();
275  std::vector<float> data;
276  size_t nelems = op->shape().n_elems();
277  egen::type_convert(data, op->data(), dtype, nelems);
278  auto& op_parents = parents_[op];
279 
280  // create requests (bulk of the overhead)
281  tenncor::UpdateNodeDataRequest request;
282  auto payload = request.mutable_payload();
283  payload->set_graph_id(sess_id_);
284  payload->set_node_id(node_ids_[op]);
285  google::protobuf::RepeatedField<float> field(
286  data.begin(), data.end());
287  payload->mutable_data()->Swap(&field);
288  requests.push_back(request);
289  }
290 
292  }
293 
296  teq::TensSetT targeted,
297  teq::TensSetT ignored = {}) override
298  {
299  jobs::ScopeGuard defer([this]() { ++this->update_it_; });
300 
301  // ignore any node data updates when
302  // not connected or out of sync interval
303  if (false == client_.is_connected() || 0 < update_it_ % data_sync_interval)
304  {
305  sess_.update_target(targeted, ignored);
306  return;
307  }
308 
309  // basic copy over from session::update_target
310  std::list<teq::iOperableFunc*> reqs;
311  teq::TensSetT acceptable;
312  for (auto& root : targeted)
313  {
314  acceptable.emplace(root);
315  }
316  // ignored tensors will never populate reqs
317  for (auto rit = sess_.ops_.rbegin(), ret = sess_.ops_.rend();
318  rit != ret; ++rit)
319  {
320  auto& op = *rit;
321  if (estd::has(acceptable, op) &&
322  false == estd::has(ignored, op))
323  {
324  reqs.push_front(op);
325  auto& children = op->get_children();
326  for (auto& child : children)
327  {
328  acceptable.emplace(child.get_tensor().get());
329  }
330  }
331  }
332 
333  std::vector<tenncor::UpdateNodeDataRequest> requests;
334  requests.reserve(reqs.size());
335 
336  for (auto& statpair : stat_.graphsize_)
337  {
338  if (0 == statpair.second.upper_)
339  {
340  auto leaf = static_cast<teq::iLeaf*>(statpair.first);
341  egen::_GENERATED_DTYPE dtype =
342  (egen::_GENERATED_DTYPE) leaf->type_code();
343  std::vector<float> data;
344  size_t nelems = leaf->shape().n_elems();
345  egen::type_convert(data, leaf->data(), dtype, nelems);
346 
347  tenncor::UpdateNodeDataRequest request;
348  auto payload = request.mutable_payload();
349  payload->set_graph_id(sess_id_);
350  payload->set_node_id(node_ids_[leaf]);
351  google::protobuf::RepeatedField<float> field(
352  data.begin(), data.end());
353  payload->mutable_data()->Swap(&field);
354  requests.push_back(request);
355  }
356  }
357 
358  // ignored nodes and its dependers will never fulfill requirement
359  for (auto& op : reqs)
360  {
361  op->update();
362  egen::_GENERATED_DTYPE dtype =
363  (egen::_GENERATED_DTYPE) op->type_code();
364  std::vector<float> data;
365  size_t nelems = op->shape().n_elems();
366  egen::type_convert(data, op->data(), dtype, nelems);
367 
368  // create requests (bulk of the overhead)
369  tenncor::UpdateNodeDataRequest request;
370  auto payload = request.mutable_payload();
371  payload->set_graph_id(sess_id_);
372  payload->set_node_id(node_ids_[op]);
373  google::protobuf::RepeatedField<float> field(
374  data.begin(), data.end());
375  payload->mutable_data()->Swap(&field);
376  requests.push_back(request);
377  }
378 
380  }
381 
383  void optimize (const opt::OptCtx& rules)
384  {
385  sess_.optimize(rules);
386 
387  // update graph
388  node_ids_.clear();
389  edges_.clear();
390 
391  stat_.graphsize_.clear();
392  parents_.clear();
393  teq::ParentFinder pfinder;
394  for (auto tr : sess_.tracked_)
395  {
396  tr->accept(stat_);
397  tr->accept(pfinder);
398  }
399 
400  for (auto& assocs : pfinder.parents_)
401  {
402  for (auto& parent_pair : assocs.second)
403  {
404  parents_[assocs.first].emplace(
405  static_cast<teq::iOperableFunc*>(parent_pair.first));
406  }
407  }
408 
409  // setup request
410  tenncor::UpdateGraphRequest request;
411  auto payload = request.mutable_payload();
412  payload->set_graph_id(sess_id_);
413  for (auto& statpair : stat_.graphsize_)
414  {
415  auto tens = statpair.first;
416  auto& range = statpair.second;
417  size_t id = node_ids_.size() + 1;
418  if (false == estd::has(node_ids_, tens))
419  {
420  node_ids_.emplace(tens, id);
421  // add to request
422  auto node = payload->add_nodes();
423  node->set_id(id);
424  auto tags = node->mutable_tags();
425  {
426  tenncor::Strings tag_str;
427  tag_str.add_strings(tens->to_string());
428  tags->insert({tag_str_key, tag_str});
429  }
430  {
431  tenncor::Strings type_str;
432  if (0 == range.upper_)
433  {
434  type_str.add_strings("leaf");
435  }
436  else
437  {
438  type_str.add_strings("functor");
439  }
440  tags->insert({tag_node_type, type_str});
441  }
442  {
443  auto inner_tags = registry_.get_tags(tens);
444  std::map<std::string,tenncor::Strings> outer_tags;
445  for (auto& itags : inner_tags)
446  {
447  google::protobuf::RepeatedPtrField<std::string>
448  field(itags.second.begin(), itags.second.end());
449  tenncor::Strings otags;
450  otags.mutable_strings()->Swap(&field);
451  outer_tags.emplace(itags.first, otags);
452  }
453  tags->insert(outer_tags.begin(), outer_tags.end());
454  }
455  auto s = tens->shape();
456  google::protobuf::RepeatedField<uint32_t> shape(
457  s.begin(), s.end());
458  node->mutable_shape()->Swap(&shape);
459  auto location = node->mutable_location();
460  location->set_maxheight(range.upper_);
461  location->set_minheight(range.lower_);
462  }
463  }
464  for (auto& statpair : stat_.graphsize_)
465  {
466  auto tens = statpair.first;
467  auto& range = statpair.second;
468  if (range.upper_ > 0)
469  {
470  auto f = static_cast<teq::iFunctor*>(tens);
471  auto& children = f->get_children();
472  for (size_t i = 0, n = children.size(); i < n; ++i)
473  {
474  auto& child = children[i];
475  auto child_tens = child.get_tensor().get();
476  auto shaper = child.get_shaper();
477  auto coorder = child.get_coorder();
478  std::string label = fmts::sprintf(edge_label_fmt, i);
479  EdgeInfo edgeinfo{
480  node_ids_[f],
481  node_ids_[child_tens],
482  label,
483  };
484  if (false == estd::has(edges_, edgeinfo))
485  {
486  edges_.emplace(edgeinfo);
487  // add to request
488  auto edge = payload->add_edges();
489  edge->set_parent(node_ids_[f]);
490  edge->set_child(node_ids_[child_tens]);
491  edge->set_label(label);
492  if (false == teq::is_identity(shaper.get()))
493  {
494  edge->set_shaper(shaper->to_string());
495  }
496  if (false == teq::is_identity(coorder.get()))
497  {
498  edge->set_coorder(coorder->to_string());
499  }
500  }
501  }
502  }
503  }
504 
505  client_.update_graph(request);
506  }
507 
509  void join (void)
510  {
511  client_.join();
512  }
513 
516  const std::chrono::time_point<std::chrono::system_clock>& deadline)
517  {
518  std::condition_variable client_done;
519  std::thread timed_killer(
520  [&]()
521  {
522  std::mutex mtx;
523  std::unique_lock<std::mutex> lck(mtx);
524  client_done.wait_until(lck, deadline);
525  this->client_.clear();
526  });
527  client_.join();
528  client_done.notify_one();
529  timed_killer.join();
530  }
531 
533  void stop (void)
534  {
535  client_.clear();
536  }
537 
539  std::string get_session_id (void) const
540  {
541  return sess_id_;
542  }
543 
545  std::unique_ptr<tenncor::GraphEmitter::Stub> stub_;
546 
549 
552 
553 private:
556 
557  std::unordered_map<teq::iTensor*,size_t> node_ids_;
558 
559  std::unordered_set<EdgeInfo,EdgeInfoHash> edges_;
560 
561  size_t update_it_ = 0;
562 
564 
566 
567  std::unordered_map<teq::iTensor*,
568  std::unordered_set<teq::iOperableFunc*>> parents_;
569 };
570 
571 boost::uuids::random_generator InteractiveSession::uuid_gen_;
572 
573 }
574 
575 #endif // DBG_SESSION_HPP
InteractiveSession(std::shared_ptr< grpc::ChannelInterface > channel, ClientConfig client_cfg=ClientConfig(), tag::TagRegistry &registry=tag::get_reg())
Definition: session.hpp:72
std::unordered_set< teq::iTensor * > TensSetT
Hash set of raw tensor pointers.
Definition: itensor.hpp:63
std::unique_ptr< tenncor::GraphEmitter::Stub > stub_
GRPC Client.
Definition: session.hpp:545
void track(teq::TensptrsT roots) override
Implementation of iSession.
Definition: session.hpp:87
std::unordered_set< EdgeInfo, EdgeInfoHash > edges_
Definition: session.hpp:559
std::unordered_map< iTensor *, estd::NumRange< size_t > > graphsize_
Definition: traveler.hpp:105
void update_target(teq::TensSetT targeted, teq::TensSetT ignored={}) override
Implementation of iSession.
Definition: session.hpp:295
virtual const ArgsT & get_children(void) const =0
Return children nodes as a vector of raw pointers.
static boost::uuids::random_generator uuid_gen_
UUID random generator.
Definition: session.hpp:70
Session that makes GRPC client calls.
Definition: session.hpp:67
std::string label_
Definition: session.hpp:43
Interface of iOperation-defined operation node.
Definition: ifunctor.hpp:28
void update(teq::TensSetT ignored={}) override
Implementation of iSession.
Definition: session.hpp:207
Registry for associating tensors to tag collectives.
Definition: tag.hpp:165
static const std::string edge_label_fmt
Definition: session.hpp:36
std::unordered_map< teq::iTensor *, size_t > node_ids_
Definition: session.hpp:557
Definition: custom_functor.hpp:20
void create_graph(tenncor::CreateGraphRequest &request)
Add job that pass CreateGraphRequest.
Definition: client.hpp:84
Definition: session.hpp:47
size_t parent_
Definition: session.hpp:41
std::unordered_map< iTensor *, ParentMapT > parents_
Definition: traveler.hpp:189
void track(teq::TensptrsT roots) override
Implementation of iSession.
Definition: session.hpp:50
void optimize(const opt::OptCtx &rules)
Apply input optimization rules using opt module, then re-track.
Definition: session.hpp:146
virtual size_t type_code(void) const =0
Return data type encoding.
void join(void)
Wait until client completes its request calls.
Definition: session.hpp:509
std::unordered_map< teq::iTensor *, std::unordered_set< teq::iOperableFunc * > > parents_
Definition: session.hpp:568
Configuration wrapper for creating the client.
Definition: client.hpp:31
std::string sess_id_
Definition: session.hpp:554
Session interface that tracks and rapidly updates subgraphs.
Definition: session.hpp:27
std::vector< teq::iOperableFunc * > ops_
Operable functors ordered by height in the tracked graph.
Definition: session.hpp:158
InteractiveSession(std::string host, ClientConfig client_cfg=ClientConfig())
Definition: session.hpp:81
Traveler that for each child tracks the relationship to all parents.
Definition: traveler.hpp:162
teq::TensptrSetT tracked_
Definition: session.hpp:155
Encapsulation of all conversion rules.
Definition: optimize.hpp:23
Traveler that maps each tensor to its subtree&#39;s maximum depth.
Definition: traveler.hpp:57
GraphEmitterClient client_
Definition: session.hpp:563
size_t operator()(const EdgeInfo &edge) const
Definition: session.hpp:49
std::string to_string(teq::CoordptrT c)
Return brief hashable string representation of coordinate mapper.
std::vector< TensptrT > TensptrsT
Vector of tensor smart pointers.
Definition: itensor.hpp:60
void update_graph(tenncor::UpdateGraphRequest &request)
Add job that pass UpdateGraphRequest.
Definition: client.hpp:142
void join_then_stop(const std::chrono::time_point< std::chrono::system_clock > &deadline)
Wait until specified deadline, then terminate all jobs in the client.
Definition: session.hpp:515
void stop(void)
Kill all request jobs.
Definition: session.hpp:533
bool operator==(const EdgeInfo &lhs, const EdgeInfo &rhs)
Graph edge equality.
Definition: session.hpp:60
void update_node_data(std::vector< tenncor::UpdateNodeDataRequest > &requests, size_t update_it)
Add job that streams UpdateNodeDataRequest.
Definition: client.hpp:200
eteq::Session sess_
Session underneath.
Definition: session.hpp:548
static const size_t data_sync_interval
Definition: client.hpp:28
TagRegistry & get_reg(void)
Return reference to global tag registry.
TagRepsT get_tags(const teq::iTensor *tens)
Return all key-labels under the collective associated with tens.
Definition: tag.hpp:184
static const std::string tag_node_type
Definition: session.hpp:34
void clear(void)
Kill all request jobs.
Definition: client.hpp:279
Graph edge hashing.
Definition: session.hpp:47
size_t update_it_
Definition: session.hpp:561
Interface of traversible and differentiable nodes with shape information.
Definition: itensor.hpp:36
GRPC client that checks for server health and make graph creation and update calls.
Definition: client.hpp:49
bool is_connected(void)
Return true if the client is connected to the server.
Definition: client.hpp:267
std::string get_session_id(void) const
Return session id.
Definition: session.hpp:539
void update(teq::TensSetT ignored={}) override
Implementation of iSession.
Definition: session.hpp:82
tag::TagRegistry & registry_
Tag registry.
Definition: session.hpp:551
static const std::string tag_str_key
Definition: session.hpp:32
void join(void)
Wait until all request jobs are complete.
Definition: client.hpp:273
size_t child_
Definition: session.hpp:42
void update_target(teq::TensSetT target, teq::TensSetT ignored={}) override
Implementation of iSession.
Definition: session.hpp:114
Graph edge intermediate representation.
Definition: session.hpp:39
void optimize(const opt::OptCtx &rules)
Apply input optimization rules using opt module, then re-track.
Definition: session.hpp:383
Leaf of the graph commonly representing the variable in an equation.
Definition: ileaf.hpp:19
teq::GraphStat stat_
Definition: session.hpp:565
bool is_identity(iCoordMap *coorder)
Checks if the coord mapper is an identity mapper.