diff --git a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt index ea4abb5be512..1089e939478f 100644 --- a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt +++ b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt @@ -22,6 +22,7 @@ set(SOURCES fake_server.cpp tcp_client_channel.cpp connection_test.cpp + proxy/kgb_proxy.cpp ) ignite_test(${TARGET} SOURCES ${SOURCES} LIBS ignite-test-common ignite3-client msgpack-c ignite-protocol ignite-tuple) \ No newline at end of file diff --git a/modules/platforms/cpp/tests/fake_server/connection_test.cpp b/modules/platforms/cpp/tests/fake_server/connection_test.cpp index d1146b1ae3ec..3e90454ef693 100644 --- a/modules/platforms/cpp/tests/fake_server/connection_test.cpp +++ b/modules/platforms/cpp/tests/fake_server/connection_test.cpp @@ -15,9 +15,10 @@ * limitations under the License. */ -#include "tests/client-test/ignite_runner_suite.h" -#include "ignite/client/ignite_client.h" #include "fake_server.h" +#include "ignite/client/ignite_client.h" +#include "proxy/kgb_proxy.h" +#include "tests/client-test/ignite_runner_suite.h" #include #include @@ -76,3 +77,21 @@ TEST_F(connection_test, request_timeout) { EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code()); } } + +TEST_F(connection_test, using_proxy) { + fake_server fs{50900, get_logger()}; + proxy::kgb_proxy proxy{50800, 50900}; + + fs.start(); + proxy.start(); + + ignite_client_configuration cfg; + cfg.set_logger(get_logger()); + cfg.set_endpoints(get_endpoints()); + + auto cl = ignite_client::start(cfg, 5s); + + auto cluster_nodes = cl.get_cluster_nodes(); + + ASSERT_EQ(1, cluster_nodes.size()); +} diff --git a/modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp b/modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp new file mode 100644 index 000000000000..b162d161a759 --- /dev/null +++ b/modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.cpp @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// + +#include "kgb_proxy.h" + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace ignite::proxy { + +void set_socket_non_blocking(int fd) { + using network::detail::set_non_blocking_mode; + + if (!set_non_blocking_mode(fd, true)) { + throw std::runtime_error("Error making socket non-blocking"); + } +} + +void kgb_proxy::enable_writable_notification(int fd) { // NOLINT(*-make-member-function-const) + epoll_event ep_ev{}; + + ep_ev.data.fd = fd; + ep_ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + + epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev); +} + +void kgb_proxy::disable_writable_notification(int fd) { // NOLINT(*-make-member-function-const) + epoll_event ep_ev{}; + + ep_ev.data.fd = fd; + ep_ev.events = EPOLLIN | EPOLLET; + + epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ep_ev); +} + +void kgb_proxy::do_serve() { + epoll_event events[MAX_EVENTS]; + + bool stopped = false; + while (!stopped) { + int event_cnt = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); + + for (int i = 0; i < event_cnt; ++i) { + int fd = events[i].data.fd; + + if (fd == stop_event_fd) { + uint64_t val; + read(stop_event_fd, &val, sizeof(val)); + stopped = true; + } else if (fd == server_fd) { + process_incoming_connection(); + } else { + process_socket_event(events[i]); + } + } + } +} + +void kgb_proxy::add_event_fd() { + stop_event_fd = eventfd(0, EFD_NONBLOCK); + + epoll_event ev{}; + ev.events = EPOLLIN; + ev.data.fd = stop_event_fd; + + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, stop_event_fd, &ev); +} + +void kgb_proxy::fire_stop_event() { // NOLINT(*-make-member-function-const) + uint64_t one = 1; + write(stop_event_fd, &one, sizeof(one)); +} + +void kgb_proxy::start_server_socket() { + server_fd = socket(AF_INET, SOCK_STREAM, 0); + set_socket_non_blocking(server_fd); + + int opt = 1; + setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + + sockaddr_in addr{}; + + addr.sin_family = AF_INET; + addr.sin_port = htons(in_port); + addr.sin_addr.s_addr = INADDR_ANY; + + bind(server_fd, (sockaddr *) &addr, sizeof(addr)); + listen(server_fd, 16); + + epoll_event ev{}; + + ev.events = EPOLLIN; + ev.data.fd = server_fd; + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev); +} + +void kgb_proxy::add_socket_to_epoll(int fd) { // NOLINT(*-make-member-function-const) + epoll_event ev{}; + ev.events = EPOLLIN | EPOLLET; + ev.data.fd = fd; + + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev); +} + +void kgb_proxy::process_incoming_connection() { + while (true) { + int in_fd; + { + sockaddr_in addr{}; + socklen_t len = sizeof(addr); + + in_fd = accept(server_fd, (sockaddr*) &addr, &len); + + if (in_fd < 0) { + if (errno != EAGAIN) { + std::error_code ec{errno, std::system_category()}; + std::cerr << "Unexpected issue when accepting connection err=" << ec.message() << "\n"; + } + break; + } + + set_socket_non_blocking(in_fd); + + std::cout << "Client connected to proxy fd = " << in_fd << std::endl; + } + + int out_fd; + { + out_fd = socket(AF_INET, SOCK_STREAM, 0); + + set_socket_non_blocking(out_fd); + + if (out_fd < 0) { + std::error_code ec{errno, std::system_category()}; + throw std::runtime_error("Unable to create socket for outbound proxy connection err = " + ec.message()); + } + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(out_port); + + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + + int res = connect(out_fd, (sockaddr*) &addr, sizeof(addr)); + + if (res < 0) { + if (errno != EINPROGRESS) { + std::error_code ec{errno, std::system_category()}; + throw std::runtime_error("Unable to connection to server err = " + ec.message()); + } + } + + std::cout << "Proxy connected to server fd = " << out_fd << std::endl; + } + + std::shared_ptr conn = std::make_shared(in_fd, out_fd); + + connections[in_fd] = conn; + connections[out_fd] = conn; + + add_socket_to_epoll(in_fd); + add_socket_to_epoll(out_fd); + + std::cout << "Socket pair has been created in_fd = " << in_fd << " out_fd = " << out_fd << std::endl; + } +} + +void kgb_proxy::process_socket_event(const epoll_event& ep_ev) { + int fd = ep_ev.data.fd; + + auto it = connections.find(fd); + if (it == connections.end()) { + throw std::runtime_error("Event for unknown socket occurred fd = " + std::to_string(fd)); + } + + auto conn = it->second; + + if (ep_ev.events & EPOLLIN) { + + char buf[BUFF_SIZE]; + + int src = fd; + int dst = src == conn->in_sock ? conn->out_sock : conn->in_sock; + + while (true) { + ssize_t received = recv(src, buf, sizeof(buf), 0); + + if (received < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + + perror("recv"); + break; + + } + + if (received == 0) { + close(src); + close(dst); + + connections.erase(src); + connections.erase(dst); + break; + } + + auto& queue = src == conn->in_sock ? conn->in2out_queue : conn->out2in_queue; + queue.emplace(buf, received); + + enable_writable_notification(dst); + } + } + + if (ep_ev.events & EPOLLOUT) { + int dst = fd; + int src = dst == conn->in_sock ? conn->out_sock : conn->in_sock; + + auto& queue = src == conn->in_sock ? conn->in2out_queue : conn->out2in_queue; + + while (!queue.empty()) { + + const message_chunk& chunk = queue.front(); + ssize_t sent = send(dst, chunk.m_msg, chunk.m_size, 0); + + if (sent <= 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + + perror("send"); + break; + } + + queue.pop(); + } + + disable_writable_notification(dst); + } +} + +kgb_proxy::~kgb_proxy() { + fire_stop_event(); + + m_polling_thread->join(); + + close(stop_event_fd); + close(server_fd); + close(epoll_fd); +} + +void kgb_proxy::start() { + epoll_fd = epoll_create1(0); + + add_event_fd(); + + start_server_socket(); + + std::cout << "Server listening on " << in_port << std::endl; + + m_polling_thread = std::make_unique([this]() { + do_serve(); + }); +} +} // namespace ignite::proxy \ No newline at end of file diff --git a/modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.h b/modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.h new file mode 100644 index 000000000000..596954d924d4 --- /dev/null +++ b/modules/platforms/cpp/tests/fake_server/proxy/kgb_proxy.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// + +#pragma once +#include +#include +#include +#include +#include +#include +#include + +namespace ignite::proxy { + +struct proxy_connection; + +class kgb_proxy { +public: + static constexpr int MAX_EVENTS = 64; + static constexpr int BUFF_SIZE = 4096; + + kgb_proxy(int in_port, int out_port) + : in_port(in_port) + , out_port(out_port) { } + + ~kgb_proxy(); + void start(); + +private: + void enable_writable_notification(int fd); + void disable_writable_notification(int fd); + void do_serve(); + void add_event_fd(); + void fire_stop_event(); + void start_server_socket(); + void add_socket_to_epoll(int fd); + void process_incoming_connection(); + void process_socket_event(const epoll_event &ep_ev); + const int in_port; + const int out_port; + int server_fd{-1}; + int epoll_fd{-1}; + int stop_event_fd{-1}; + std::unique_ptr m_polling_thread{}; + bool connected{false}; + std::unordered_map> connections{}; +}; + +struct message_chunk { + char* m_msg = nullptr; + size_t m_size; + + message_chunk(char *msg, size_t size) + : m_size(size) + { + assert(size <= kgb_proxy::BUFF_SIZE); + + m_msg = new char[size]; + + std::memcpy(m_msg, msg, size); + } + + ~message_chunk() { + delete[] m_msg; + } +}; + + +struct proxy_connection { + int in_sock; + int out_sock; + + proxy_connection(int cl_sock, int srv_sock) + : in_sock(cl_sock) + , out_sock(srv_sock) {} + + std::queue in2out_queue{}; + std::queue out2in_queue{}; +}; + +} // namespace ignite::proxy \ No newline at end of file