11 #include <grpc/grpc.h> 12 #include <grpcpp/channel.h> 13 #include <grpcpp/client_context.h> 15 #include "jobs/managed_job.hpp" 16 #include "jobs/sequence.hpp" 18 #include "dbg/grpc/tenncor.grpc.pb.h" 20 #ifndef DBG_GRPC_CLIENT_HPP 21 #define DBG_GRPC_CLIENT_HPP 35 ClientConfig (std::chrono::duration<int64_t,std::milli> request_duration,
36 std::chrono::duration<int64_t,std::milli> stream_duration) :
41 std::chrono::milliseconds(250);
45 std::chrono::milliseconds(10000);
53 stub_(tenncor::GraphEmitter::NewStub(channel)),
57 jobs::ManagedJob healthjob(
58 [
this](std::future<void> stop_it)
63 grpc::ClientContext context;
64 tenncor::CreateGraphResponse response;
66 std::chrono::time_point<std::chrono::system_clock> deadline =
67 std::chrono::system_clock::now() +
68 std::chrono::milliseconds(1000);
69 context.set_deadline(deadline);
71 stub_->HealthCheck(&context, empty, &empty);
74 std::this_thread::sleep_for(
75 std::chrono::milliseconds(1000));
77 while (stop_it.wait_for(std::chrono::milliseconds(1)) ==
78 std::future_status::timeout);
88 [
this](std::future<void> dependency, std::future<void> stop_it,
89 tenncor::CreateGraphRequest request)
91 if (dependency.valid())
96 std::this_thread::get_id());
97 for (
size_t attempt = 0;
98 stop_it.wait_for(std::chrono::milliseconds(1)) ==
102 grpc::ClientContext context;
103 tenncor::CreateGraphResponse response;
105 std::chrono::time_point<std::chrono::system_clock> deadline =
107 context.set_deadline(deadline);
109 grpc::Status status = this->
stub_->CreateGraph(
110 &context, request, &response);
113 auto res_status = response.status();
114 if (tenncor::Status::OK != res_status)
116 logs::errorf(
"%s: %s",
117 tenncor::Status_Name(res_status).c_str(),
118 response.message().c_str());
122 logs::infof(
"%s: CreateGraphRequest success: %s",
123 sid.c_str(), response.message().c_str());
130 "%s: CreateGraphRequest attempt %d failure: %s",
131 sid.c_str(), attempt,
132 status.error_message().c_str());
134 std::this_thread::sleep_for(
135 std::chrono::milliseconds(attempt * 1000));
137 logs::warnf(
"%s: CreateGraphRequest terminating", sid.c_str());
138 }, std::move(request));
146 [
this](std::future<void> dependency, std::future<void> stop_it,
147 tenncor::UpdateGraphRequest request)
149 if (dependency.valid())
154 std::this_thread::get_id());
155 for (
size_t attempt = 0;
156 stop_it.wait_for(std::chrono::milliseconds(1)) ==
160 grpc::ClientContext context;
161 tenncor::UpdateGraphResponse response;
163 std::chrono::time_point<std::chrono::system_clock> deadline =
165 context.set_deadline(deadline);
167 grpc::Status status = this->
stub_->UpdateGraph(
168 &context, request, &response);
171 auto res_status = response.status();
172 if (tenncor::Status::OK != res_status)
174 logs::errorf(
"%s: %s",
175 tenncor::Status_Name(res_status).c_str(),
176 response.message().c_str());
180 logs::infof(
"%s: UpdateGraphRequest success: %s",
181 sid.c_str(), response.message().c_str());
188 "%s: UpdateGraphRequest attempt %d failure: %s",
189 sid.c_str(), attempt,
190 status.error_message().c_str());
192 std::this_thread::sleep_for(
193 std::chrono::milliseconds(attempt * 1000));
195 logs::warnf(
"%s: UpdateGraphRequest terminating", sid.c_str());
196 }, std::move(request));
201 std::vector<tenncor::UpdateNodeDataRequest>& requests,
205 [
this](std::future<void> dependency,
206 std::future<void> stop_it,
207 std::vector<tenncor::UpdateNodeDataRequest> requests,
size_t update_it)
209 if (dependency.valid())
214 std::this_thread::get_id());
215 tenncor::UpdateNodeDataResponse response;
216 grpc::ClientContext context;
218 std::chrono::time_point<std::chrono::system_clock> deadline =
219 std::chrono::system_clock::now() +
221 context.set_deadline(deadline);
222 std::unique_ptr<grpc::ClientWriterInterface<
223 tenncor::UpdateNodeDataRequest>> writer(
224 stub_->UpdateNodeData(&context, &response));
226 for (
auto& request : requests)
228 if (stop_it.wait_for(std::chrono::milliseconds(1)) !=
229 std::future_status::timeout)
233 if (
false == writer->Write(request))
235 logs::errorf(
"failed to write update %d", update_it);
239 writer->WritesDone();
241 grpc::Status status = writer->Finish();
244 auto res_status = response.status();
245 if (tenncor::Status::OK != res_status)
247 logs::errorf(
"%s: %s",
248 tenncor::Status_Name(res_status).c_str(),
249 response.message().c_str());
259 "UpdateNodeData failure: %s",
260 status.error_message().c_str());
262 logs::warnf(
"%s: UpdateNodeData terminating", sid.c_str());
263 }, std::move(requests), std::move(update_it));
285 std::unique_ptr<tenncor::GraphEmitter::Stub>
stub_;
299 #endif // DBG_GRPC_CLIENT_HPP GraphEmitterClient(std::shared_ptr< grpc::ChannelInterface > channel, ClientConfig cfg)
Definition: client.hpp:51
std::atomic< bool > connected_
Definition: client.hpp:293
Definition: custom_functor.hpp:20
void create_graph(tenncor::CreateGraphRequest &request)
Add a job that passes a CreateGraphRequest.
Definition: client.hpp:84
ClientConfig(void)=default
std::chrono::duration< int64_t, std::milli > request_duration_
Request timeout.
Definition: client.hpp:40
Configuration wrapper for creating the client.
Definition: client.hpp:31
jobs::ManagedJob health_checker_
Definition: client.hpp:294
std::string to_string(teq::CoordptrT c)
Return a brief, hashable string representation of the coordinate mapper.
void update_graph(tenncor::UpdateGraphRequest &request)
Add a job that passes an UpdateGraphRequest.
Definition: client.hpp:142
void update_node_data(std::vector< tenncor::UpdateNodeDataRequest > &requests, size_t update_it)
Add a job that streams UpdateNodeDataRequests.
Definition: client.hpp:200
ClientConfig cfg_
Definition: client.hpp:287
std::unique_ptr< tenncor::GraphEmitter::Stub > stub_
Definition: client.hpp:285
static const size_t data_sync_interval
Definition: client.hpp:28
void clear(void)
Kill all request jobs.
Definition: client.hpp:279
static const size_t max_attempts
Definition: client.hpp:26
gRPC client that checks server health and makes graph creation and update calls.
Definition: client.hpp:49
bool is_connected(void)
Return true if the client is connected to the server.
Definition: client.hpp:267
std::chrono::duration< int64_t, std::milli > stream_duration_
Stream timeout.
Definition: client.hpp:44
void join(void)
Wait until all request jobs are complete.
Definition: client.hpp:273
ClientConfig(std::chrono::duration< int64_t, std::milli > request_duration, std::chrono::duration< int64_t, std::milli > stream_duration)
Definition: client.hpp:35
jobs::Sequence sequential_jobs_
Definition: client.hpp:290