Tenncor
client.hpp
Go to the documentation of this file.
1 
9 #include <chrono>
10 
11 #include <grpc/grpc.h>
12 #include <grpcpp/channel.h>
13 #include <grpcpp/client_context.h>
14 
15 #include "jobs/managed_job.hpp"
16 #include "jobs/sequence.hpp"
17 
18 #include "dbg/grpc/tenncor.grpc.pb.h"
19 
20 #ifndef DBG_GRPC_CLIENT_HPP
21 #define DBG_GRPC_CLIENT_HPP
22 
23 namespace dbg
24 {
25 
26 static const size_t max_attempts = 10;
27 
28 static const size_t data_sync_interval = 50;
29 
32 {
33  ClientConfig (void) = default;
34 
35  ClientConfig (std::chrono::duration<int64_t,std::milli> request_duration,
36  std::chrono::duration<int64_t,std::milli> stream_duration) :
37  request_duration_(request_duration), stream_duration_(stream_duration) {}
38 
40  std::chrono::duration<int64_t,std::milli> request_duration_ =
41  std::chrono::milliseconds(250);
42 
44  std::chrono::duration<int64_t,std::milli> stream_duration_ =
45  std::chrono::milliseconds(10000);
46 };
47 
49 struct GraphEmitterClient final
50 {
51  GraphEmitterClient (std::shared_ptr<grpc::ChannelInterface> channel,
52  ClientConfig cfg) :
53  stub_(tenncor::GraphEmitter::NewStub(channel)),
54  cfg_(cfg),
55  connected_(true)
56  {
57  jobs::ManagedJob healthjob(
58  [this](std::future<void> stop_it)
59  {
60  tenncor::Empty empty;
61  do
62  {
63  grpc::ClientContext context;
64  tenncor::CreateGraphResponse response;
65  // set context deadline
66  std::chrono::time_point<std::chrono::system_clock> deadline =
67  std::chrono::system_clock::now() +
68  std::chrono::milliseconds(1000);
69  context.set_deadline(deadline);
70  grpc::Status status =
71  stub_->HealthCheck(&context, empty, &empty);
72  this->connected_ = status.ok();
73 
74  std::this_thread::sleep_for(
75  std::chrono::milliseconds(1000));
76  }
77  while (stop_it.wait_for(std::chrono::milliseconds(1)) ==
78  std::future_status::timeout);
79  });
80  health_checker_ = std::move(healthjob);
81  }
82 
84  void create_graph (tenncor::CreateGraphRequest& request)
85  {
86  // retries sending creation request unless stop_it times out
87  sequential_jobs_.attach_job(
88  [this](std::future<void> dependency, std::future<void> stop_it,
89  tenncor::CreateGraphRequest request)
90  {
91  if (dependency.valid())
92  {
93  dependency.get(); // wait for dependency completion
94  }
95  std::string sid = fmts::to_string(
96  std::this_thread::get_id());
97  for (size_t attempt = 0;
98  stop_it.wait_for(std::chrono::milliseconds(1)) ==
99  std::future_status::timeout && attempt < max_attempts;
100  ++attempt)
101  {
102  grpc::ClientContext context;
103  tenncor::CreateGraphResponse response;
104  // set context deadline
105  std::chrono::time_point<std::chrono::system_clock> deadline =
106  std::chrono::system_clock::now() + cfg_.request_duration_;
107  context.set_deadline(deadline);
108 
109  grpc::Status status = this->stub_->CreateGraph(
110  &context, request, &response);
111  if (status.ok())
112  {
113  auto res_status = response.status();
114  if (tenncor::Status::OK != res_status)
115  {
116  logs::errorf("%s: %s",
117  tenncor::Status_Name(res_status).c_str(),
118  response.message().c_str());
119  }
120  else
121  {
122  logs::infof("%s: CreateGraphRequest success: %s",
123  sid.c_str(), response.message().c_str());
124  return;
125  }
126  }
127  else
128  {
129  logs::errorf(
130  "%s: CreateGraphRequest attempt %d failure: %s",
131  sid.c_str(), attempt,
132  status.error_message().c_str());
133  }
134  std::this_thread::sleep_for(
135  std::chrono::milliseconds(attempt * 1000));
136  }
137  logs::warnf("%s: CreateGraphRequest terminating", sid.c_str());
138  }, std::move(request));
139  }
140 
142  void update_graph (tenncor::UpdateGraphRequest& request)
143  {
144  // retries sending creation request unless stop_it times out
145  sequential_jobs_.attach_job(
146  [this](std::future<void> dependency, std::future<void> stop_it,
147  tenncor::UpdateGraphRequest request)
148  {
149  if (dependency.valid())
150  {
151  dependency.get(); // wait for dependency completion
152  }
153  std::string sid = fmts::to_string(
154  std::this_thread::get_id());
155  for (size_t attempt = 0;
156  stop_it.wait_for(std::chrono::milliseconds(1)) ==
157  std::future_status::timeout && attempt < max_attempts;
158  ++attempt)
159  {
160  grpc::ClientContext context;
161  tenncor::UpdateGraphResponse response;
162  // set context deadline
163  std::chrono::time_point<std::chrono::system_clock> deadline =
164  std::chrono::system_clock::now() + cfg_.request_duration_;
165  context.set_deadline(deadline);
166 
167  grpc::Status status = this->stub_->UpdateGraph(
168  &context, request, &response);
169  if (status.ok())
170  {
171  auto res_status = response.status();
172  if (tenncor::Status::OK != res_status)
173  {
174  logs::errorf("%s: %s",
175  tenncor::Status_Name(res_status).c_str(),
176  response.message().c_str());
177  }
178  else
179  {
180  logs::infof("%s: UpdateGraphRequest success: %s",
181  sid.c_str(), response.message().c_str());
182  return;
183  }
184  }
185  else
186  {
187  logs::errorf(
188  "%s: UpdateGraphRequest attempt %d failure: %s",
189  sid.c_str(), attempt,
190  status.error_message().c_str());
191  }
192  std::this_thread::sleep_for(
193  std::chrono::milliseconds(attempt * 1000));
194  }
195  logs::warnf("%s: UpdateGraphRequest terminating", sid.c_str());
196  }, std::move(request));
197  }
198 
201  std::vector<tenncor::UpdateNodeDataRequest>& requests,
202  size_t update_it)
203  {
204  sequential_jobs_.attach_job(
205  [this](std::future<void> dependency,
206  std::future<void> stop_it,
207  std::vector<tenncor::UpdateNodeDataRequest> requests, size_t update_it)
208  {
209  if (dependency.valid())
210  {
211  dependency.get(); // wait for dependency completion
212  }
213  std::string sid = fmts::to_string(
214  std::this_thread::get_id());
215  tenncor::UpdateNodeDataResponse response;
216  grpc::ClientContext context;
217  // set context deadline
218  std::chrono::time_point<std::chrono::system_clock> deadline =
219  std::chrono::system_clock::now() +
220  std::chrono::milliseconds(cfg_.stream_duration_);
221  context.set_deadline(deadline);
222  std::unique_ptr<grpc::ClientWriterInterface<
223  tenncor::UpdateNodeDataRequest>> writer(
224  stub_->UpdateNodeData(&context, &response));
225 
226  for (auto& request : requests)
227  {
228  if (stop_it.wait_for(std::chrono::milliseconds(1)) !=
229  std::future_status::timeout)
230  {
231  break;
232  }
233  if (false == writer->Write(request))
234  {
235  logs::errorf("failed to write update %d", update_it);
236  break;
237  }
238  }
239  writer->WritesDone();
240 
241  grpc::Status status = writer->Finish();
242  if (status.ok())
243  {
244  auto res_status = response.status();
245  if (tenncor::Status::OK != res_status)
246  {
247  logs::errorf("%s: %s",
248  tenncor::Status_Name(res_status).c_str(),
249  response.message().c_str());
250  }
251  else
252  {
253  return;
254  }
255  }
256  else
257  {
258  logs::errorf(
259  "UpdateNodeData failure: %s",
260  status.error_message().c_str());
261  }
262  logs::warnf("%s: UpdateNodeData terminating", sid.c_str());
263  }, std::move(requests), std::move(update_it));
264  }
265 
267  bool is_connected (void)
268  {
269  return connected_;
270  }
271 
273  void join (void)
274  {
275  sequential_jobs_.join();
276  }
277 
279  void clear (void)
280  {
281  sequential_jobs_.stop();
282  }
283 
284 private:
285  std::unique_ptr<tenncor::GraphEmitter::Stub> stub_;
286 
288 
289  // every request from emitter has dependency on the previous request
290  jobs::Sequence sequential_jobs_;
291 
292  // connection state
293  std::atomic<bool> connected_;
294  jobs::ManagedJob health_checker_;
295 };
296 
297 }
298 
299 #endif // DBG_GRPC_CLIENT_HPP
GraphEmitterClient(std::shared_ptr< grpc::ChannelInterface > channel, ClientConfig cfg)
Definition: client.hpp:51
std::atomic< bool > connected_
Definition: client.hpp:293
Definition: custom_functor.hpp:20
void create_graph(tenncor::CreateGraphRequest &request)
Add job that pass CreateGraphRequest.
Definition: client.hpp:84
ClientConfig(void)=default
std::chrono::duration< int64_t, std::milli > request_duration_
Request timeout.
Definition: client.hpp:40
Configuration wrapper for creating the client.
Definition: client.hpp:31
jobs::ManagedJob health_checker_
Definition: client.hpp:294
std::string to_string(teq::CoordptrT c)
Return brief hashable string representation of coordinate mapper.
void update_graph(tenncor::UpdateGraphRequest &request)
Add job that pass UpdateGraphRequest.
Definition: client.hpp:142
void update_node_data(std::vector< tenncor::UpdateNodeDataRequest > &requests, size_t update_it)
Add job that streams UpdateNodeDataRequest.
Definition: client.hpp:200
ClientConfig cfg_
Definition: client.hpp:287
std::unique_ptr< tenncor::GraphEmitter::Stub > stub_
Definition: client.hpp:285
static const size_t data_sync_interval
Definition: client.hpp:28
void clear(void)
Kill all request jobs.
Definition: client.hpp:279
static const size_t max_attempts
Definition: client.hpp:26
GRPC client that checks for server health and make graph creation and update calls.
Definition: client.hpp:49
bool is_connected(void)
Return true if the client is connected to the server.
Definition: client.hpp:267
std::chrono::duration< int64_t, std::milli > stream_duration_
Stream timeout.
Definition: client.hpp:44
void join(void)
Wait until all request jobs are complete.
Definition: client.hpp:273
ClientConfig(std::chrono::duration< int64_t, std::milli > request_duration, std::chrono::duration< int64_t, std::milli > stream_duration)
Definition: client.hpp:35
jobs::Sequence sequential_jobs_
Definition: client.hpp:290