From af0a1c4bbfe5d5446bc178749eeb75e4de21bb32 Mon Sep 17 00:00:00 2001 From: lucic71 Date: Mon, 4 May 2020 22:28:53 +0300 Subject: [PATCH] Tema 2 PC --- Makefile | 39 +++++ README | 165 +++++++++++++++++++ include/Client.hpp | 29 ++++ include/helpers.h | 23 +++ include/protocol.hpp | 174 ++++++++++++++++++++ include/server_utils.hpp | 94 +++++++++++ include/subscriber_utils.hpp | 21 +++ include/tcp_handler.hpp | 51 ++++++ include/topic.hpp | 75 +++++++++ include/udp_handler.hpp | 44 +++++ protocol.cpp | 309 +++++++++++++++++++++++++++++++++++ sample_payloads.json | 49 ++++++ server.cpp | 236 ++++++++++++++++++++++++++ server_utils.cpp | 283 ++++++++++++++++++++++++++++++++ subscriber.cpp | 233 ++++++++++++++++++++++++++ subscriber_utils.cpp | 88 ++++++++++ tcp_handler.cpp | 170 +++++++++++++++++++ three_topics_payloads.json | 136 +++++++++++++++ udp_client.py | 130 +++++++++++++++ udp_handler.cpp | 138 ++++++++++++++++ 20 files changed, 2487 insertions(+) create mode 100644 Makefile create mode 100644 README create mode 100644 include/Client.hpp create mode 100644 include/helpers.h create mode 100644 include/protocol.hpp create mode 100644 include/server_utils.hpp create mode 100644 include/subscriber_utils.hpp create mode 100644 include/tcp_handler.hpp create mode 100644 include/topic.hpp create mode 100644 include/udp_handler.hpp create mode 100644 protocol.cpp create mode 100644 sample_payloads.json create mode 100644 server.cpp create mode 100644 server_utils.cpp create mode 100644 subscriber.cpp create mode 100644 subscriber_utils.cpp create mode 100644 tcp_handler.cpp create mode 100644 three_topics_payloads.json create mode 100644 udp_client.py create mode 100644 udp_handler.cpp diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..90e2bff --- /dev/null +++ b/Makefile @@ -0,0 +1,39 @@ +SERVER=server +SUBSCRIBER=subscriber +C_SOURCES=$(wildcard *.c) # select all c source files +CPP_SOURCES=$(wildcard *.cpp) # select all cpp source files +LIBRARY=nope +INCPATHS=include +LIBPATHS=. +LDFLAGS= +CXXFLAGS=-c -Wall -g3 +CXX=g++ + +# Automatic generation of some important lists + +# tell the makefile to generate object files for each source file +OBJECTS=$(C_SOURCES:.c=.o) $(CPP_SOURCES:.cpp=.o) +INCFLAGS=$(foreach TMP,$(INCPATHS),-I$(TMP)) +LIBFLAGS=$(foreach TMP,$(LIBPATHS),-L$(TMP)) + +# Set up the output file names for the different output types +BINARY_SERVER=$(SERVER) +BINARY_SUBSCRIBER=$(SUBSCRIBER) + +all: $(C_SOURCES) $(CPP_SOURCES) $(BINARY_SERVER) $(BINARY_SUBSCRIBER) + +$(BINARY_SERVER): $(filter-out subscriber.o subscriber_utils.o, $(OBJECTS)) + $(CXX) $(LIBFLAGS) $(filter-out subscriber.o subscriber_utils.o, $(OBJECTS)) $(LDFLAGS) -o $@ + +$(BINARY_SUBSCRIBER): $(filter-out server.o server_utils.o, $(OBJECTS)) + $(CXX) $(LIBFLAGS) $(filter-out server.o server_utils.o, $(OBJECTS)) $(LDFLAGS) -o $@ + +.cpp.o: + $(CXX) $(INCFLAGS) $(CXXFLAGS) -fPIC $< -o $@ + +distclean: clean + rm -f $(BINARY_SERVER) $(BINARY_SUBSCRIBER) + +clean: + rm -f $(OBJECTS) + diff --git a/README b/README new file mode 100644 index 0000000..c4cd841 --- /dev/null +++ b/README @@ -0,0 +1,165 @@ +POPESCU LUCIAN IOAN 321CDa +-------------------------- + + TEMA 2 PROTOCOALE DE COMUNICATIE + -------------------------------- + +I. File list +------------ +include/ Header files containing definitions of functions, macros + and data structures for Topic and Client. +protocol.cpp Sends packets between the server and the subscribers +server.cpp Implements the server side functionality +server_utils.cpp Utils that forward topics, manage clients, etc +subscriber.cpp Implements the client side functionality +subscriber_utils.cpp Utilis that print a topic to stdout +tcp_handler.cpp Accepts connections, establishes connections, etc +udp_handler.cpp Establishes connections and parses datagrams +Makefile Makefile to build the server and subscriber executables +README This file + +Notations: + @ - variable + # - function + CAPITAL - macro + +II. Protocol +------------ +The protocol used for communcation between the server and the subscribers is +build on top of the TCP protocol. It consists of multiple replies and +requests between the two instances. + +Firstly the server tries to accept the connection of the subscriber using +the accept(2) function. In this time an IDREQUEST is sent to the subscriber. +The newly received client ID will be used to print a feedback message +about the connection to stdout. +The IDREQUEST consists of a single byte defined in include/protocol.hpp. +When the subscriber receives this byte from the socket, it sends back +at most CLIENTID_MAX_LEN(include/protocol.cpp:33) bytes, containing its client +id. + +If the client already exists in the database of the server then a +CONNECTION_REFUSED_BYTE(include/protocol.hpp:21) is sent and the client must +disconnect. This way the server makes sure that there will not be two clients +with the same ID at the same time. + +Next, the subscriber will send an un/subscribe request. For a subscribe request +the format is the following: "subscribe $topic $SF". The request is sent +in text-format, using human readable characters, over the socket. When arriving +to the server, it parses the packet and binds the clients with its desired +topic. The same happens for an unsubscribe request. + +The last type of communication occurs when a topic is sent over the socket. +The server sends in the beginning a TOPIC_ANNOUNCEMENT_BYTE +(include/protocol.hpp:20) to let the subscriber know that a topic is waiting to +be transmitted. Next, the server sends the following fields from the Topic +structure(include/topic.hpp): ip, port, topic, data_type, value_sz. After +the client receives the value_sz field it will wait for value_sz bytes that +represent the value field from Topic. This happens because all the fields +in Topic have fixed length, except value, which can vary between 1 and +1500 bytes. The subscriber will also check if it received the correct number of +bytes. If not it will recall recv(2) to fill the buffer. However, if the +correct number of bytes is not received correctly in the second try, the +subscriber will probably crash or may produce undefined behavior. + +III. Server +----------- +The server consist of an initialization phase and a forwarding phase. +In the initialization phase TCP and UDP connections are established, +also data structures for topic and clients are declared. Next is +the forwarding phase where a select(2) call multiplexes between different +file descriptors(TCP listener, UDP listener, TCP clients, stdin). + +If the @tcp_listener file descriptor is on it means that a new subscriber +waits to be accepted using include/tcp_handler.hpp:#accept_connection. This +is also the step where the pending topics are sent to the newly subscribed +client. + +IF @clients file descriptors are on it means that a new un/subscribe request +is coming and the server must process it. If the subscriber sends 0 bytes +on the socket then it disconnected and the server must delete its entry +from @clients and unvalidate its entry in @topics_table. + +If the @udp_listener file descriptor is on it means that a new topic arrived +on the UDP socket, therefore the server will parse the topic to a Topic +data structure(include/topic.hpp) and forward the topic using +include/server_utils.hpp:#forward_topic. Next if the clients disconnected +during the forwarding then delete their entry from @clients also and +unvalidate their entry in @topics_table. + +If STDIN_FILENO is set then the server received a command from stdin. +The only command in exit so the server will break from the forwarding +phase, will kill the connections with the clients and will end the +process. + +IV. Subscriber +-------------- +The subscriber operates in a similar way with the server. It has an +initialization phase where it declares a global buffer and connection +through the socket and a receiving phase where it waits data either from +server or from stdin + +If it receives data from server, using the file descriptor @socket_fd, +it has to check what kind of data it is by checking @announce_byte, +which can have one of the following three values: +IDREQUEST_ANNOUNCEMENT_BYTE, TOPIC_ANNOUNCEMENT_BYTE, CONNECTION_REFUSED_BYTE. +They are defined in include/protocol.hpp. + +If it receives data from stdin, then it must disconnect if the command is +exit. + + +V. Data structures +------------------ +For representing @topics_table and @pending_table I chose a std::unordered_map +because it let me bind a topic name(@topics_table) represented as a std::string +with a std::list of clients subscribed to that topic. Same goes for @pending_table, +I bound std::string representing the client id with a std::list of topics that +will be received by it after it reconnects. + +For representing the @clients I chose a ClientIO structure, defined in include/ +Client.hpp. It is basically the same as Client, also defined in include/Client.hpp, +but without the SF option. The latter is used in @topics_table and helps the server +to keep track of the clients susbcribed to a given topic. I could have used a single +Client structure but for me it would have been more ambiguous, so I chose the +easier way. + +VI. Macros +---------- +I defined macros for each function that I considered to be error-prone in the +header files associated with the respective functions. I did so because I +wanted the code to be as readable as possible, so I tried not to mix the +main code with the error handling code. + + +VII. Bottlenecks, known bugs and edge cases +------------------------------- +When the client connects to the server there must exist a delay between the time +the connection is established and the first un/subscribe request is sent. This +happens because the server must send back a clientID request which may overlap +with a subscribe request sent by the subscriber. So if, for example, the user +sends its input through a pipe, the subscriber and server will not handle +correctly the input. (example: cat commands | ./subscriber id ip port) + +If the subscriber fills the server with a lot of unuseful subscriptions, the +server has no way to know that it should delete them and will eventually run +out of memory. + +Before the server disconencts, all messages will arrive to subscribers, because +in the forwarding phase of the server, the stdin input is handled after all +operations are done(forwarding, receiving from udp), so the subscribers should +expect that all topics are received. + +The subscribers and the server will output meaningful messages to stdout when +an invalid operation occurs. + +If a client will not be able to receive data through the socket on which it is +connected then it will exit and will display an error message. This happens +because when the data is malformed or cannot be transmitted it means that +something bad happened with the server so the client should not continue to +live. + +When a client disconnects during topics forwarding then a new entry for its +client id is created in @pending_topics and the topics will be retransmitted +when it reconnects. If it disconnects even when the pending topics forwarding +is performed, then the topics will be dropped. diff --git a/include/Client.hpp b/include/Client.hpp new file mode 100644 index 0000000..2189ba6 --- /dev/null +++ b/include/Client.hpp @@ -0,0 +1,29 @@ +#ifndef CLIENT_HPP_ +#define CLIENT_HPP_ + +#include + +/* + * Definition of a client struct containg info about the + * ID of the client, the file descriptor associated with this client + * and the store&forward option. It is used for forwarding + * + */ +struct Client { + std::string clientID; + int fd; + bool sf; +}; + +/* + * Definition of a client struct containing info about the ID of the + * client and the file descriptor assoiated with this client. It is + * used for client-server communication. + * + */ +struct ClientIO { + std::string clientID; + int fd; +}; + +#endif diff --git a/include/helpers.h b/include/helpers.h new file mode 100644 index 0000000..e6c6d6a --- /dev/null +++ b/include/helpers.h @@ -0,0 +1,23 @@ +#pragma once + +#include +#include +#include + +/* + * Macro de verificare a erorilor + * Exemplu: + * int fd = open (file_name , O_RDONLY); + * DIE( fd == -1, "open failed"); + */ + +#define DIE(assertion, call_description) \ + do { \ + if (assertion) { \ + fprintf(stderr, "(%s, %d): ", \ + __FILE__, __LINE__); \ + perror(call_description); \ + exit(EXIT_FAILURE); \ + } \ + } while(0) + diff --git a/include/protocol.hpp b/include/protocol.hpp new file mode 100644 index 0000000..8eaf6dc --- /dev/null +++ b/include/protocol.hpp @@ -0,0 +1,174 @@ +#ifndef PROTOCOL_HPP_ +#define PROTOCOL_HPP_ + +#include +#include +#include +#include + +#include "topic.hpp" + +using namespace std; + + +/* + * Bytes that notify the subscriber what kind of message it + * is about to receive. + * + */ +#define IDREQUEST_ANNOUNCEMENT_BYTE 0xFF +#define TOPIC_ANNOUNCEMENT_BYTE 1 +#define CONNECTION_REFUSED_BYTE 2 +#define INVALID_REQUEST_ANNOUNCEMENT_BYTE 3 + +/* + * Maximum size of data that the protocol can operate with. + * + */ +#define BUFLEN (sizeof (Topic)) + +/* + * Maximum length of a client id. + * + */ +#define CLIENTID_MAX_LEN 10 + + + +/* + * Format of subscribe and unsubscribe requests. + * + */ +#define SUBSCRIBE "subscribe" +#define UNSUBSCRIBE "unsubscribe" + +/* + * Format of store and forward options. + * + */ +#define SF_ON "1" +#define SF_OFF "0" + +/* + * Maximul length (in bytes) of a un/subscribe request. + * Length of UNSUBSCRIBE + length of SF option + maximum length of a topic + * + 2 spaces. + * + */ +#define REQUEST_MAX_LEN (sizeof(UNSUBSCRIBE) - 1 + \ + sizeof(SF_ON) - 1 + TOPIC_MAX_LEN + sizeof(uint8_t) * 2) + +/* + * Lengths (in words separated by space) of subscribe and unsubscribe requests. + * + */ +#define SUBSCRIBE_REQUEST_LENGTH 3 +#define UNSUBSCRIBE_REQUEST_LENGTH 2 + +/* + * Makes a request for the client_id to a client described by a file + * descriptor @client. It has also a error handler macro associated. + * If a send or recv call fails then the function will return an + * empty string and a error mesage will be displayed. + * + */ +string request_client_id(int client); + +#define REQUEST_CLIENT_ID_ERROR(assertion, call_description) \ + do { \ + if (assertion) { \ + fprintf(stderr, "(%s, %d): ", \ + __FILE__, __LINE__); \ + perror(call_description); \ + return {}; \ + } \ + } while (0) + +/* + * Reply for the above mentioned request. The @client_id is transmited + * via the connection established by @socket. + * + * @return: REPLY_SUCCESSFUL if the subscriber sent bytes on the socket + * REPLY_FAILED if the subscriber failed to send bytes + * + */ +#define REPLY_SUCCESSFUL 0 +#define REPLY_FAILED 1 + +#define REPLY_CLIENT_ID_ERROR(assertion, call_description) \ + do { \ + if (assertion) { \ + fprintf(stderr, "(%s, %d): ", \ + __FILE__, __LINE__); \ + perror(call_description); \ + return REPLY_FAILED; \ + } \ + } while (0) + +int reply_client_id(int socket, char *client_id); + +/* + * Reply with a CONNECTION_REFUSED_BYTE from server to let the client + * know that there is already a client with the same clientID. + * + */ + +#define REPLY_CONNECTION_REFUSED_ERROR(assertion, call_description) \ + do { \ + if (assertion) { \ + fprintf(stderr, "(%s, %d): ", \ + __FILE__, __LINE__); \ + perror(call_description); \ + } \ + } while (0) + +void reply_connection_refused(int socket); + +/* + * Reply with a INVALID_REQUEST_ANNOUNCEMENT_BYTE from server to let the cient + * know that the un/subscribe request is sent is invalid. + * + */ +#define REPLY_INVALID_REQUEST_ERROR(assertion, call_description) \ + do { \ + if (assertion) { \ + fprintf(stderr, "(%s, %d): ", \ + __FILE__, __LINE__); \ + perror(call_description); \ + } \ + } while (0) + +void reply_invalid_request(int socket); + +/* + * When a subscriber sends a subscribe request, this function will resolve + * the request by adding the client in @topics_table. + * + * @return: CONNECTION_CLOSED_ERROR - connection was closed and the request + * could not be processed + * INVALID_REQUEST_ERROR - the request does not have the falid format + * REQUEST_SUCCESSFUL - the request was successfully + * processed + * + * The error macro is used when the function cannot read data from the + * socket. + * + */ +#define REQUEST_SUCCESSFUL 1 +#define CONNECTION_CLOSED_ERROR 0 +#define INVALID_REQUEST_ERROR -1 + +int resolve_subscribe_request(int client, + unordered_map>& topics_table); + +#define RESOLVE_SUBSCRIBE_REQUEST_ERROR(assertion, call_description) \ + do { \ + if (assertion) { \ + fprintf(stderr, "(%s, %d): ", \ + __FILE__, __LINE__); \ + perror(call_description); \ + return CONNECTION_CLOSED_ERROR; \ + } \ + } while (0) + +#endif diff --git a/include/server_utils.hpp b/include/server_utils.hpp new file mode 100644 index 0000000..b5196b8 --- /dev/null +++ b/include/server_utils.hpp @@ -0,0 +1,94 @@ +#ifndef SERVER_UTILS_HPP_ +#define SERVER_UTILS_HPP_ + +#include +#include +#include + +#include "topic.hpp" +#include "Client.hpp" +#include "helpers.h" + +#include "sys/socket.h" + +using namespace std; + +#define EXIT_COMMAND "exit" + +#define UNAVAILABLE_CLIENT -1 + +/* + * Prints the usage if the number of arguments is not correct. + * + */ +void usage(char *filename); + +/* + * Forwards the @topic to all the clients subscribed to it. Also updates + * the @pending_table if there are unavailable clients with the SF option on. + * + * If a client from @topics_table becomes unavailable while the topic is + * transmitted then the entry will be invalidated using UNAVAILABLE_CLIENT. + * + */ +void forward_topic(Topic topic, unordered_map>& topics_table, + unordered_map>& pending_table); + +/* + * After a client reconnects it must receive the pending packets associated + * with its @client_id. If the client disconnects during this operation, then + * the function will return an error value. Furthermore if the client + * disconnects during the forwarding then all pending packets will be + * dropped. + * + * @return: FORWARD_SUCCESSFUL when the operation finishes successfully + * FORWARD_FAILED the client disconnected during this operation + * + */ +#define FORWARD_SUCCESSFUL 0 +#define FORWARD_FAILED 1 + +int forward_pending_topics(int fd, string client_id, + unordered_map>& pending_table); + +/* + * Adds a all @clients to @set for further I/O multiplexing. + * Also update max_fd. + * + */ +void fd_set_add_clients(fd_set& set, int& max_fd, list clients); + +/* + * Close the connection with each client from @clients. + * + */ +void clients_close_connection(list clients); + +/* + * Traverse @topics_table and make the client unavailable by setting its file descriptor + * field to UNAVAILABLE_CLIENT. + * + */ +void unvalidate_client(int client_fd, + unordered_map>& topics_table); + +/* + * Traverse @topics_table and make the client available by setting its flie descriptor + * field to @new_fd. + * + * @return: the sf option of the validated client. It returns the sf option because there + * is no other palace where this option is stored and the server main function needs + * to know if it has to flush @pending_topics for @client_id. + * + */ +bool validate_client(string client_id, int new_fd, + unordered_map>& topics_table); + +/* + * After #forward_topic function is called, some clients may be invalidated, so + * their entry must be deleted from @clientsIO. + * + */ +void delete_invalid_clients(list subscribers, list& clientsIO); + +#endif diff --git a/include/subscriber_utils.hpp b/include/subscriber_utils.hpp new file mode 100644 index 0000000..c0e0202 --- /dev/null +++ b/include/subscriber_utils.hpp @@ -0,0 +1,21 @@ +#ifndef SUBSCRIBER_UTILS_HPP +#define SUBSCRIBER_UTILS_HPP + +#include "topic.hpp" + +/* + * Format of exit request. + * + */ +#define EXIT "EXIT" + +void usage(char *filename); + +/* + * Print to stdout the received topic. + * + */ +void print_topic(Topic *topic); + +#endif + diff --git a/include/tcp_handler.hpp b/include/tcp_handler.hpp new file mode 100644 index 0000000..a2a0301 --- /dev/null +++ b/include/tcp_handler.hpp @@ -0,0 +1,51 @@ +#ifndef HANDLER_TCP_HPP_ +#define HANDLER_TCP_HPP_ + +#include +#include + +#include "Client.hpp" + +#define DEFAULT_BACKLOG 32 + +using namespace std; + +/* + * Set up a TCP connection by returing a file descriptor + * that points to a socket that listens for new TCP connections. + * + */ +int establish_tcp_connection(int port); + +/* + * Helps a client to connect to the server specified by @server_ip + * and @server_port. + * + */ +int connect_to_server(char *server_ip, char *server_port); + +/* + * Accept an incomming connection from @listener and insert its file descriptor + * in @clients_fd. + * + * @return; ACCEPT_SUCCESSFUL + * ACCEPT_FAILED + * + */ + +#define ACCEPT_SUCCESSFUL 0 +#define ACCEPT_FAILED 1 + +#define ACCEPT_CONNECTION_ERROR(assertion, call_description) \ + do { \ + if (assertion) { \ + fprintf(stderr, "(%s, %d): ", \ + __FILE__, __LINE__); \ + perror(call_description); \ + return ACCEPT_FAILED; \ + } \ + } while (0) + +int accept_connection(int listener, list& client); + +#endif diff --git a/include/topic.hpp b/include/topic.hpp new file mode 100644 index 0000000..aaf7025 --- /dev/null +++ b/include/topic.hpp @@ -0,0 +1,75 @@ +#ifndef TOPIC_HPP_ +#define TOPIC_HPP_ + +#include +#include + +/* + * Format of data types. + * + */ +#define INT "INT" +#define SHORT_REAL "SHORT_REAL" +#define FLOAT "FLOAT" +#define STRING "STRING" + +/* + * Identifiers for each data type. + * + */ +#define INT_BYTE 0 +#define SHORT_REAL_BYTE 1 +#define FLOAT_BYTE 2 +#define STRING_BYTE 3 + +/* + * Size of value field in topic for each data type. + * + */ +#define INT_BYTE_VALUE_SIZE 5 +#define SHORT_REAL_BYTE_VALUE_SIZE 2 +#define FLOAT_BYTE_VALUE_SIZE 6 + +/* + * Bytes for signedness. + * + */ +#define POSITIVE 0 +#define NEGATIVE 1 + +/* + * Length of fields present in @Topic. + * + */ +#define TOPIC_MAX_LEN 50 +#define VALUE_MAX_LEN 1500 + +/* + * Definition of structure that will be sent over the TCP conenction + * to each subscriber. + * + */ +struct Topic { + uint32_t ip; + uint16_t port; + + char topic[TOPIC_MAX_LEN]; + uint8_t data_type; + size_t value_sz; + char value[VALUE_MAX_LEN]; + + /* + * Compare all fields in topic. Use == for primitive data types and memcmp + * for arrays. + * + */ + bool operator == (Topic& t) const { + return ip == t.ip and port == t.port and !memcmp(topic, t.topic, TOPIC_MAX_LEN) + and data_type == t.data_type and value_sz == t.value_sz + and !memcmp(value, t.value, VALUE_MAX_LEN); + } + + +}__attribute__((packed)); + +#endif diff --git a/include/udp_handler.hpp b/include/udp_handler.hpp new file mode 100644 index 0000000..a67d6a4 --- /dev/null +++ b/include/udp_handler.hpp @@ -0,0 +1,44 @@ +#ifndef HANDLER_UDP_HPP_ +#define HANDLER_UDP_HPP_ + +#include +#include "topic.hpp" + +using namespace std; + +/* + * Used for receiving a datagram in #parse_datagram. + * + */ +#define TOPIC 50 +#define DATA_TYPE 1 +#define CONTENT 1500 + +#define UDP_DATAGRAM_LEN (TOPIC + DATA_TYPE + CONTENT) + +/* + * Set up a UDP connections by returning a file descriptor that points + * to a socket that listens for UDP datagrams. + * + */ +int establish_udp_connection(int port); + +/* + * Receive a datagram from @socket and parse to be compatible with the + * Topic format (see include/topic.hpp). + * + */ + +#define PARSE_DATAGRAM_ERROR(assertion, call_description) \ + do { \ + if (assertion) { \ + fprintf(stderr, "(%s, %d): ", \ + __FILE__, __LINE__); \ + perror(call_description); \ + return {}; \ + } \ + } while (0) + +Topic parse_datagram(int socket); + +#endif diff --git a/protocol.cpp b/protocol.cpp new file mode 100644 index 0000000..6d202c7 --- /dev/null +++ b/protocol.cpp @@ -0,0 +1,309 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "protocol.hpp" +#include "helpers.h" + +string request_client_id(int client) { + /* + * Announce the client that it receives an id request. + * + */ + uint8_t announce_byte = IDREQUEST_ANNOUNCEMENT_BYTE; + + int write_bytes = send(client, &announce_byte, sizeof(announce_byte), 0); + REQUEST_CLIENT_ID_ERROR(write_bytes < 0, "server: request_client_id: write"); + + /* + * Wait for the reply. + * + */ + char buffer[CLIENTID_MAX_LEN + 1]; + memset(buffer, 0x00, CLIENTID_MAX_LEN + 1); + + int read_bytes = recv(client, buffer, CLIENTID_MAX_LEN, 0); + REQUEST_CLIENT_ID_ERROR(read_bytes < 0, "server: request_client_id: read"); + + return {buffer}; +} + +int reply_client_id(int socket, char *client_id) { + char buffer[CLIENTID_MAX_LEN + 1]; + + memset(buffer, 0x00, CLIENTID_MAX_LEN + 1); + strcpy(buffer, client_id); + + int write_bytes = send(socket, buffer, CLIENTID_MAX_LEN, 0); + REPLY_CLIENT_ID_ERROR(write_bytes < 0, "client: reply_client_id: send:"); + + return REPLY_SUCCESSFUL; +} + +void reply_connection_refused(int socket) { + + uint8_t announce_byte = CONNECTION_REFUSED_BYTE; + int write_bytes = send(socket, &announce_byte, sizeof(announce_byte), 0); + REPLY_CONNECTION_REFUSED_ERROR(write_bytes < 0, + "server: reply_connection_refused: send "); + +} + +void reply_invalid_request(int socket) { + + uint8_t announce_byte = INVALID_REQUEST_ANNOUNCEMENT_BYTE; + int write_bytes = send(socket, &announce_byte, sizeof(announce_byte), 0); + REPLY_INVALID_REQUEST_ERROR(write_bytes < 0, + "server: reply_connection_refused: send "); + +} + +int resolve_subscribe_request(int client, + unordered_map>& topics_table) { + + char buffer[BUFLEN]; + memset(buffer, 0x00, BUFLEN); + + /* + * Read the request from client. + * + */ + int read_bytes = recv(client, buffer, REQUEST_MAX_LEN, 0); + RESOLVE_SUBSCRIBE_REQUEST_ERROR(read_bytes < 0, + "server: resolve_subscribe_request: read"); + + /* + * If 0 bytes were received it means that the client disconnected and + * the server must close the file desciptor associated. + * + */ + if (read_bytes == 0) { + return CONNECTION_CLOSED_ERROR; + } + + /* + * Request the id. If #request_client_id returns an empty string + * then the connection closed and #resolve_subscribe_request will + * return CONNECTION_CLOSED_ERROR. + * + */ + string client_id = request_client_id(client); + if (client_id.empty() == true) { + return CONNECTION_CLOSED_ERROR; + } + + + /* + * Split the request in separate tokens. + * + */ + istringstream request_stream(buffer); + vector request(istream_iterator{request_stream}, + istream_iterator{}); + + /* + * Process the SUBSCRIBE and UNSUBSCRIBE requests. + * + */ + if (request.size() == SUBSCRIBE_REQUEST_LENGTH) { + /* + * Parse the request. + * + */ + string request_type = request.at(0); + string topic = request.at(1); + string sf = request.at(2); + + /* + * If topic length is bigger than TOPIC_MAX_LEN then it is an + * invalid requet. + * + */ + if (topic.length() > TOPIC_MAX_LEN) { + cout << "Topic size too big. Aborting operation.." << endl; + + reply_invalid_request(client); + return INVALID_REQUEST_ERROR; + } + + /* + * Process a subscribe request. + * + */ + if (request_type.compare({SUBSCRIBE}) == 0) { + + /* + * Check if the @topic can be found in @topics_table. + * If not insert it, else use the existing one. + * + */ + if (topics_table.find(topic) == topics_table.end()) { + topics_table.insert({topic, {}}); + } + + auto& topics_table_entry = topics_table.at(topic); + + /* + * Check if the client alrady exists in @topics_table_entry. + * + * If it exists and has the same SF option then return with + * REQUEST_SUCCESSFUL and do nothing, else if the SF option + * is different then update the SF for this client at this + * topic and return REQUEST_SUCCESSFUL. + * + */ + for (auto& topics_table_client : topics_table_entry) { + + if (topics_table_client.clientID.compare(client_id) == 0 + and stoi(sf) == topics_table_client.sf) { + + cout << "Client " << client_id << " already subscribed to " + << topic << ". Skipping this operation,," << endl; + return REQUEST_SUCCESSFUL; + + } else if (topics_table_client.clientID.compare(client_id) == 0 + and stoi(sf) != topics_table_client.sf) { + + /* + * Check SF to be a valid option. + * + */ + if (stoi(sf) != stoi(SF_ON) && stoi(sf) != stoi(SF_OFF)) { + + reply_invalid_request(client); + cout << "Invalid subscribe request: SF" << endl; + return INVALID_REQUEST_ERROR; + + } + + + topics_table_client.sf = stoi(sf); + + cout << "SF option was updated for client " << client_id + << " who was subscribed to " << topic << endl; + return REQUEST_SUCCESSFUL; + } + + } + + /* + * Process store and forward option. + * + */ + if (sf.compare({SF_ON}) == 0) { + + topics_table_entry.push_back({client_id, client, 1}); + + cout << "Client " << client_id << " subscribed successfully to topic " + << topic << " with SF option: 1" << endl; + return REQUEST_SUCCESSFUL; + + } else if (sf.compare({SF_OFF}) == 0) { + + topics_table_entry.push_back({client_id, client, 0}); + + cout << "Client " << client_id << " subscribed successfully to topic " + << topic << " with SF option: 0" << endl; + return REQUEST_SUCCESSFUL; + + } else { + + reply_invalid_request(client); + cout << "Invalid subscribe request: SF" << endl; + return INVALID_REQUEST_ERROR; + + } + + + } else { + + reply_invalid_request(client); + cout << "Invalid request" << endl; + return INVALID_REQUEST_ERROR; + + } + + + } else if(request.size() == UNSUBSCRIBE_REQUEST_LENGTH) { + /* + * Parse the request. + * + */ + string request_type = request.at(0); + string topic = request.at(1); + + /* + * Process an unsubscribe request. + * + */ + if (request_type.compare({UNSUBSCRIBE}) == 0) { + + /* + * Check if the @topic can be found in @topics_table. + * + */ + if(topics_table.find(topic) == topics_table.end()) { + cout << "Topic cannot be unsubscribed because it does not exists" << endl; + + reply_invalid_request(client); + return INVALID_REQUEST_ERROR; + } + + /* + * Find the topic to which the client was subscribed. + * + */ + auto topics_table_entry = topics_table.at(topic); + + auto remove_iter = remove_if(topics_table_entry.begin(), + topics_table_entry.end(), + [client] (Client& lst_client) {return lst_client.fd == client;}); + + /* + * If the client was not subscribed to this topic then skip, else + * delete its corresponding Client entry from the list of clients + * subscribed to this topic. + * + */ + if (remove_iter == topics_table_entry.end()) { + + reply_invalid_request(client); + cout << "Cannot remove client from this topic because he was not" + << " subscribed" << endl; + return INVALID_REQUEST_ERROR; + + } else { + + topics_table_entry.erase(remove_iter); + + cout << "Client " << client_id << " unsubscribed successfully from topic " + << topic << endl; + return REQUEST_SUCCESSFUL; + + } + + } else { + + reply_invalid_request(client); + cout << "Invalid request" << endl; + return INVALID_REQUEST_ERROR; + + } + + } else { + + reply_invalid_request(client); + cout << "Invalid request" << endl; + return INVALID_REQUEST_ERROR; + + } + +} + diff --git a/sample_payloads.json b/sample_payloads.json new file mode 100644 index 0000000..93fffbd --- /dev/null +++ b/sample_payloads.json @@ -0,0 +1,49 @@ +[{ + "description": "topic {a_non_negative_int} - type {INT} - value{10}", + "payload_base64": "YV9ub25fbmVnYXRpdmVfaW50AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAo=" + }, { + "description": "topic {a_negative_int} - type {INT} - value{-10}", + "payload_base64": "YV9uZWdhdGl2ZV9pbnQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAo=" + }, { + "description": "topic {a_larger_value} - type {INT} - value{1234567890}", + "payload_base64": "YV9sYXJnZXJfdmFsdWUAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEmWAtI=" + }, { + "description": "topic {a_large_negative_value} - type {INT} - value{-1234567890}", + "payload_base64": "YV9sYXJnZV9uZWdhdGl2ZV92YWx1ZQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUmWAtI=" + }, { + "description": "topic {<50 chars of abc...>} - type {INT} - value{10}", + "payload_base64": "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3gAAAAAAAo=" + }, { + "description": "topic {that_is_small_short_real} - type {SHORT_REAL} - value{2.30}", + "payload_base64": "dGhhdF9pc19zbWFsbF9zaG9ydF9yZWFsAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAOY=" + }, { + "description": "topic {that_is_big_short_real} - type {SHORT_REAL} - value{655.05}", + "payload_base64": "dGhhdF9pc19iaWdfc2hvcnRfcmVhbAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB/+E=" + }, { + "description": "topic {that_is_integer_short_real} - type {SHORT_REAL} - value{17 / 17.00}", + "payload_base64": "dGhhdF9pc19pbnRlZ2VyX3Nob3J0X3JlYWwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBqQ=" + }, { + "description": "topic {float_seventeen} - type {FLOAT} - value{17}", + "payload_base64": "ZmxvYXRfc2V2ZW50ZWVuAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAABEA" + }, { + "description": "topic {float_minus_seventeen} - type {FLOAT} - value{-17}", + "payload_base64": "ZmxvYXRfbWludXNfc2V2ZW50ZWVuAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAABEA" + }, { + "description": "topic {a_strange_float} - type {FLOAT} - value{1234.4321}", + "payload_base64": "YV9zdHJhbmdlX2Zsb2F0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAC8XAEE" + }, { + "description": "topic {a_negative_strange_float} - type {FLOAT} - value{-1234.4321}", + "payload_base64": "YV9uZWdhdGl2ZV9zdHJhbmdlX2Zsb2F0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQC8XAEE" + }, { + "description": "topic {a_subunitary_float} - type {FLOAT} - value{0.042}", + "payload_base64": "YV9zdWJ1bml0YXJ5X2Zsb2F0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAACoD" + }, { + "description": "topic {a_negative_subunitary_float} - type {FLOAT} - value{-0.042}", + "payload_base64": "YV9uZWdhdGl2ZV9zdWJ1bml0YXJ5X2Zsb2F0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAACoD" + }, { + "description": "topic {ana_string_announce} - type {STRING} - value{Ana are mere}", + "payload_base64": "YW5hX3N0cmluZ19hbm5vdW5jZQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADQW5hIGFyZSBtZXJl" + }, { + "description": "topic {huge_string} - type {STRING} - value{<1500 chars of \"abc...\">}", + "payload_base64": "aHVnZV9zdHJpbmcAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFy" + }] \ No newline at end of file diff --git a/server.cpp b/server.cpp new file mode 100644 index 0000000..3d61c22 --- /dev/null +++ b/server.cpp @@ -0,0 +1,236 @@ +#include + +#include +#include + +#include "server_utils.hpp" +#include "tcp_handler.hpp" +#include "udp_handler.hpp" +#include "helpers.h" +#include "protocol.hpp" + +using namespace std; + +int main(int argc, char **argv) { + if (argc < 2) { + usage(argv[0]); + exit(EXIT_SUCCESS); + } + + /* + * Initialize the sockets used for TCP and UDP communication. + * + */ + int port = atoi(argv[1]); + DIE(port == 0, "server: atoi"); + + int tcp_listener = establish_tcp_connection(port); + int udp_listener = establish_udp_connection(port); + + /* + * Keep a list of file descriptors associated with each TCP client + * that is connected to server. + * + */ + list clients; + + /* + * Keep a table that will save the clients subscribed to different + * topics. + * + */ + unordered_map> topics_table; + + /* + * Keep a table that will save for each user the packets to be + * transmitted after a client reconnects and has SF option on. + * + */ + unordered_map> pending_table; + + /* + * File descriptors used for I/O multiplexing. + * + */ + fd_set read_fds; + int max_fd; + + while (true) { + /* + * Initialize the file descriptors set and call select for I/O multiplexing. + * Listen on udp, tcp accept and stdin. Also listen on the connections + * established with the clients. + * + */ + FD_ZERO(&read_fds); + + FD_SET(tcp_listener, &read_fds); + FD_SET(udp_listener, &read_fds); + FD_SET(STDIN_FILENO, &read_fds); + + max_fd = max(tcp_listener, udp_listener); + max_fd = max(STDIN_FILENO, max_fd); + + fd_set_add_clients(read_fds, max_fd, clients); + + /* + * Call select for I/O multiplexing. + * + */ + int read_activity; + read_activity = select(max_fd + 1, &read_fds, NULL, NULL, NULL); + DIE(read_activity < 0, "server: select"); + + /* + * If something happened on tcp_listener_socket it means that a new connection + * waits to be accepted. + * + */ + if (FD_ISSET(tcp_listener, &read_fds)) { + /* + * If accept_connection was not successful, then skip + * the operations below. + * + */ + int ac_ret = accept_connection(tcp_listener, clients); + + if (ac_ret == ACCEPT_SUCCESSFUL) { + /* + * Maybe the client reconnected, so the server must validate its + * entry in topics table. The entry was unvalidated by #unvalidate_client. + * It(unvalidate_client) is called when resolve_subscribe_request + * returns a CONNECTION_CLOSED_ERROR value. + * + */ + string last_connected_id = clients.back().clientID; + int last_connected_fd = clients.back().fd; + bool sf = validate_client(last_connected_id, last_connected_fd, topics_table); + + /* + * If the client has the SF option on then send the topics in + * @pending_table assigned to it. + * + */ + if (sf == true) { + int fpt_ret = forward_pending_topics(last_connected_fd, + last_connected_id, pending_table); + + /* + * If forwarding failed the remove the client. + * + */ + if (fpt_ret == FORWARD_FAILED) { + + cout << "Client removed after pending topics forwarding was not" + << " successful." << endl; + + auto last_connected = --clients.end(); + clients.erase(last_connected); + + unvalidate_client(last_connected_fd, topics_table); + + } + } + } + + } + + /* + * Check the rest of the file descriptors in read_fds. Here the server + * resolves the subscribe and unsubscribe requests. + * + */ + auto client = clients.begin(); + while (client != clients.end()) { + + int fd = (*client).fd; + if (FD_ISSET(fd, &read_fds)) { + int rsr_ret = resolve_subscribe_request(fd, topics_table); + + /* + * If the client has disconnected then delete its entry + * from @clients. Also make its entry from @topics_table + * unavailable to let the #forward_topic function know that + * it should not send a topic on the file descriptor associated + * with this client. + * + */ + if (rsr_ret == CONNECTION_CLOSED_ERROR) { + cout << "Client " << (*client).clientID << + " made unavailable until reconnection" << endl; + + unvalidate_client((*client).fd, topics_table); + + close(fd); + + client = clients.erase(client); + continue; + } + } + + client++; + + } + + /* + * If something happened on udp_listener_socket it means that a new topic arrived + * and waits to be forwarded to its subscribers. + * + */ + if (FD_ISSET(udp_listener, &read_fds)) { + Topic topic = parse_datagram(udp_listener); + /* + * If #parse_datagram returns an empty topic it means an error occured and the + * operation will be skipped. + * + */ + Topic empty_topic{}; + if (topic == empty_topic) { + continue; + } + + forward_topic(topic, topics_table, pending_table); + + /* + * Delete invalid clients. These are the clients that disconnected + * during #forward_topic. The condition in if assures that the topic + * for which we delete the invalid clients appears in @topics_table. + * + */ + if (topics_table.find(topic.topic) != topics_table.end()) { + delete_invalid_clients(topics_table.at(topic.topic), clients); + } + } + + /* + * If something happened on stdin then the server must be closed. + * + */ + if (FD_ISSET(STDIN_FILENO, &read_fds)) { + char buffer[sizeof(EXIT_COMMAND) + 1]; + memset(buffer, 0x00, sizeof(EXIT_COMMAND) + 1); + + fgets(buffer, sizeof(EXIT_COMMAND), stdin); + + /* + * End the session if the server receives EXIT_COMMAND. + * + */ + if (strncmp(buffer, EXIT_COMMAND, sizeof(EXIT_COMMAND)) == 0) { + cout << "Disconnecting.." << endl; + break; + } + } + + } + + /* + * Close the connection with the tcp clients. + * + */ + clients_close_connection(clients); + close(tcp_listener); + close(udp_listener); + + return EXIT_SUCCESS; +} diff --git a/server_utils.cpp b/server_utils.cpp new file mode 100644 index 0000000..e93ba96 --- /dev/null +++ b/server_utils.cpp @@ -0,0 +1,283 @@ +#include +#include + +#include +#include + +#include "server_utils.hpp" +#include "protocol.hpp" + +/* + * Prototypes of local scope functions. + * + */ + +/* + * Adds @topic in @pending_table in the entry associated with @client_id. + * It is used in #forward_topic. + * + */ +void pending_table_add(string client_id, Topic topic, + unordered_map>& pending_table); + +/* + * Helper function that send a @topic on a socket associated with @fd. + * It is configured to send a packet using the topic transmission + * protocol. + * + * @return: SEND_SUCCESSFUL - the topic was successfully transmitted + * SEND_FAILED - the connection was closed unexpectedly + * + */ + +#define SEND_SUCCESSFUL 0 +#define SEND_FAILED 1 + +int send_packet(int fd, Topic topic); + +#define SEND_PACKET_ERROR(assertion, call_description) \ + do { \ + if (assertion) { \ + fprintf(stderr, "(%s, %d): ", \ + __FILE__, __LINE__); \ + perror(call_description); \ + return SEND_FAILED; \ + } \ + } while (0) + + +/* + * Implementation of functions defined in protocol.hpp. + * + */ + +void usage(char *filename) { + std::cout << "Usage: " << filename << " PORT" << std::endl; +} + +void fd_set_add_clients(fd_set& set, int& max_fd, list clients) { + + for (auto client = clients.begin(); client != clients.end(); client++) { + int fd = (*client).fd; + + FD_SET(fd, &set); + max_fd = max(max_fd, fd); + } + +} + +void clients_close_connection(list clients) { + + for (auto client : clients) { + close(client.fd); + } + +} + +void forward_topic(Topic topic, unordered_map>& topics_table, + unordered_map>& pending_table) { + /* + * Search for the topic in topics_table. + */ + if (topics_table.find(topic.topic) == topics_table.end()) { + return; + } + + /* + * Send the topic to all the subscribed clients. + * + */ + auto& clients = topics_table.at(topic.topic); + for (auto& client : clients) { + int fd = client.fd; + + /* + * If the client is unavailable and has SF option on the add the packet + * in the pending_table. Else if the SF option is not set, then skip. + * + */ + if (fd == UNAVAILABLE_CLIENT && client.sf == true) { + + pending_table_add(client.clientID, topic, pending_table); + continue; + + } else if (fd == UNAVAILABLE_CLIENT && client.sf == false) { + continue; + } + + /* + * If #send_packet failed then unvalidate the client. + * + */ + int send_packet_ret = send_packet(fd, topic); + if (send_packet_ret == SEND_FAILED) { + client.fd = UNAVAILABLE_CLIENT; + } + + } + +} + +int forward_pending_topics(int fd, string client_id, + unordered_map>& pending_table) { + + int return_code = FORWARD_SUCCESSFUL; + + if (pending_table.find(client_id) == pending_table.end()) { + cout << "Nothing to forward from pending table at the moment" << endl; + return return_code; + } + + auto& topics = pending_table.at(client_id); + for (auto& topic : topics) { + + /* + * If the topics cannot be sent then an error persists on the + * network and all topics will be dropped. + * + */ + int send_packet_ret = send_packet(fd, topic); + if (send_packet_ret == SEND_FAILED) { + cout << "Error while trying to send pending packets for client " + << client_id << ". Aborting this operation.." << endl; + + return_code = FORWARD_FAILED; + break; + } + + } + + pending_table.erase(client_id); + return return_code; +} + +void unvalidate_client(int client_fd, + unordered_map>& topics_table) { + + for (auto& t_clients : topics_table) { + for (auto& client : t_clients.second) { + + if (client.fd == client_fd) { + client.fd = UNAVAILABLE_CLIENT; + } + + } + } + +} + +bool validate_client(string client_id, int new_fd, + unordered_map>& topics_table) { + + /* + * Keep a counter that counts how many times a client was validated. + * If the counter not equal to 0 it means that the client reconnected + * and was validated. + * + */ + int count(0); + + bool sf(false); + + for (auto& t_clients : topics_table) { + for (auto& client : t_clients.second) { + + if (client.clientID.compare(client_id) == 0) { + client.fd = new_fd; + + count++; + sf = client.sf; + } + + } + } + + if (count != 0) { + cout << "File descriptor for client " << client_id << + " was restored." << endl; + } + + return sf; +} + +void delete_invalid_clients(list subscribers, list& clientsIO) { + /* + * Iterate through the subscribers and check if fd field is UNAVAILABLE_CLIENT. + * Find the clientID in clientsIO and delete that entry. + * + */ + for (auto subscriber : subscribers) { + + if (subscriber.fd == UNAVAILABLE_CLIENT) { + + auto client = find_if(clientsIO.begin(), clientsIO.end(), + [subscriber] (ClientIO& c) + {return c.clientID.compare(subscriber.clientID) == 0; }); + + if (client != clientsIO.end()) { + clientsIO.erase(client); + } + } + + } + +} + + +void pending_table_add(string client_id, Topic topic, + unordered_map>& pending_table) { + + /* + * If the @client_id is not present in @pending_table then add the client. + * + */ + if (pending_table.find(client_id) == pending_table.end()) { + pending_table.insert({client_id, {}}); + } + + /* + * Add @topic to the pending list of @client_id. + * + */ + auto& pending_table_entry = pending_table.at(client_id); + pending_table_entry.push_back(topic); + +} + + +int send_packet(int fd, Topic topic) { + + /* + * Announce the subscriber that a topic is coming. + * + */ + uint8_t announce_byte = TOPIC_ANNOUNCEMENT_BYTE; + + int write_bytes = send(fd, &announce_byte, sizeof(announce_byte), 0); + SEND_PACKET_ERROR(write_bytes < 0, "server_utils: send_packet: send: "); + + /* + * Send all fields from Topic, excluding value because it has variable size and + * must send topic.value_sz bytes. + * + */ + char buffer[sizeof(Topic) - VALUE_MAX_LEN]; + memcpy(buffer, (const void *) &topic, sizeof(Topic) - VALUE_MAX_LEN); + + write_bytes = send(fd, buffer, sizeof(Topic) - VALUE_MAX_LEN, 0); + SEND_PACKET_ERROR(write_bytes < 0, "server_utils: send_packet: send: "); + + /* + * Send the value field. + * + */ + char buffer_value[VALUE_MAX_LEN]; + + size_t value_sz = topic.value_sz; + memcpy(buffer_value, topic.value, value_sz); + + write_bytes = send(fd, buffer_value, value_sz, 0); + SEND_PACKET_ERROR(write_bytes < 0, "server_utils: send_packet: send: "); + + return SEND_SUCCESSFUL; +} diff --git a/subscriber.cpp b/subscriber.cpp new file mode 100644 index 0000000..330e9d9 --- /dev/null +++ b/subscriber.cpp @@ -0,0 +1,233 @@ +#include + +#include +#include +#include + +#include "tcp_handler.hpp" +#include "subscriber_utils.hpp" +#include "helpers.h" +#include "protocol.hpp" + +using namespace std; + +int main(int argc, char **argv) { + + if (argc < 4) { + usage(argv[0]); + exit(EXIT_SUCCESS); + } + + char *client_id = argv[1]; + + /* + * Check the length of the id. + * + */ + if (strlen(client_id) > CLIENTID_MAX_LEN) { + cout << "Client ID too long, aborting.." << endl; + exit(EXIT_SUCCESS); + } + + /* + * Global buffer to save the data received via the connection established + * by @socket_fd defined below. + * + */ + char buffer[BUFLEN]; + + /* + * Connect to server. + * + */ + int socket_fd = connect_to_server(argv[2], argv[3]); + + /* + * Create fd set for I/O multiplexing. + * + */ + fd_set read_fds; + int fd_max; + + while (true) { + + /* + * Initialize the file descriptors set and call select for I/O multiplexing. + * The subscriber only needs to know about standard input and socket. + * + */ + FD_ZERO(&read_fds); + + FD_SET(STDIN_FILENO, &read_fds); + FD_SET(socket_fd, &read_fds); + + fd_max = max(STDIN_FILENO, socket_fd); + + /* + * Call select for I/O multiplexing. + * + */ + int read_action = select(fd_max + 1, &read_fds, NULL, NULL, NULL); + DIE(read_action < 0, "client: select:"); + + if (FD_ISSET(socket_fd, &read_fds)) { + memset(buffer, 0x00, BUFLEN); + + uint8_t announce_byte; + unsigned read_bytes = recv(socket_fd, &announce_byte, sizeof(announce_byte), 0); + DIE(read_bytes < 0, "subscriber: recv: "); + + /* + * Server shut down so clients will also shut down. + * + */ + if (read_bytes == 0) { + cout << "Disconnecting.." << endl; + break; + } + + /* + * Server received an id_request so it will respond with + * its id. + * + */ + if (announce_byte == IDREQUEST_ANNOUNCEMENT_BYTE) { + /* + * If the subscriber could not send its client id on the socket + * it means that the server closed unexpectedly and the subscriber + * must end the process. + * + */ + int rci_ret = reply_client_id(socket_fd, client_id); + + if (rci_ret == REPLY_FAILED) { + cout << "Client could not send its client id. Disconnecting.." + << endl; + break; + } + } + + /* + * Server received the topic it is subscribed to. + * + */ + else if (announce_byte == TOPIC_ANNOUNCEMENT_BYTE) { + /* + * Read all fields from topic, excluding value field. + * + */ + read_bytes = recv(socket_fd, buffer, sizeof(Topic) - VALUE_MAX_LEN, 0); + DIE(read_bytes < 0, "subscriber: recv: "); + + /* + * Create a topic object to store the received bytes. + * + */ + Topic topic; + + memset((void *) &topic, 0x00, sizeof(Topic)); + memcpy((void *) &topic, buffer, sizeof(Topic)); + + /* + * If last recv(2) did not receive sizeof(Topic) - VALUE_MAX_LEN bytes + * then the subscriber must still receive bytes. + * + */ + if (read_bytes < sizeof(Topic) - VALUE_MAX_LEN) { + unsigned remaining = sizeof(Topic) - VALUE_MAX_LEN - read_bytes; + + /* + * Start receiving remaining bytes from buffer + read_bytes. + * + */ + read_bytes = recv(socket_fd, buffer + read_bytes, remaining, 0); + DIE(read_bytes < 0, "subscriber: recv: "); + + memcpy((void *) &topic, buffer, sizeof(Topic)); + + } + + size_t value_sz = topic.value_sz; + + /* + * Read the variable-sized field, value. + * + */ + + read_bytes = recv(socket_fd, topic.value, value_sz, 0); + DIE(read_bytes < 0, "subscriber: recv: "); + + /* + * Same story as above. If value_sz bytes were not received then receive + * the remaining bytes. + * + */ + if (read_bytes != value_sz) { + unsigned remaining = value_sz - read_bytes; + + read_bytes = recv(socket_fd, topic.value + read_bytes, remaining, 0); + DIE(read_bytes < 0, "subscriber: recv: "); + + if (read_bytes != remaining) { + cout << "Afara 2" << endl; + break; + } + + } + + /* + * Parse the topic and print its content. + * + */ + print_topic(&topic); + + } else if (announce_byte == CONNECTION_REFUSED_BYTE) { + /* + * Notify the client that the clientID already exits. + * + */ + puts("ClientID already existent, retry with another clientID."); + break; + + } else if (announce_byte == INVALID_REQUEST_ANNOUNCEMENT_BYTE) { + /* + * Notift the client that the sent request was not a valid one. + * + */ + puts("Invalid request"); + continue; + } + + + + } else if (FD_ISSET(STDIN_FILENO, &read_fds)) { + memset(buffer, 0x00, BUFLEN); + fgets(buffer, REQUEST_MAX_LEN, stdin); + + /* + * Client wants to end the current session. + * + */ + if (strncmp(buffer, EXIT, strlen(EXIT)) == 0) { + cout << "Disconnecting.." << endl; + break; + } + + /* + * Send message to server. + * + */ + int write_bytes = send(socket_fd, buffer, REQUEST_MAX_LEN, 0); + DIE(write_bytes < 0, "client: send"); + } + + } + + /* + * Close the socket and exit. + * + */ + close(socket_fd); + return EXIT_SUCCESS; + +} diff --git a/subscriber_utils.cpp b/subscriber_utils.cpp new file mode 100644 index 0000000..dca5e64 --- /dev/null +++ b/subscriber_utils.cpp @@ -0,0 +1,88 @@ +#include +#include +#include +#include +#include +#include + +#include "subscriber_utils.hpp" + +using namespace std; + +void usage(char *filename) { + cout << "Usage: " << filename << " ID_client IP_server PORT_server" << endl; +} + +void print_topic(Topic *topic) { + /* + * Cast directly to struct in_addr in inet_ntoa call. + * + */ + cout << inet_ntoa({ .s_addr = topic->ip }) << ":" << topic->port << " "; + cout << topic->topic << " "; + + /* + * Switch based on the data type format to be displayed. + * + */ + switch (topic->data_type) { + case (INT_BYTE): + cout << INT << " "; + break; + + case (SHORT_REAL_BYTE): + cout << SHORT_REAL << " "; + break; + + case (FLOAT_BYTE): + cout << FLOAT << " "; + break; + + case (STRING_BYTE): + cout << STRING << " "; + break; + + default: + break; + } + + /* + * Cast the values appropriately. + * + */ + if (topic->data_type == INT_BYTE) { + + uint8_t *sign_byte = (uint8_t *) topic->value; + uint32_t *value = (uint32_t *) (topic->value + sizeof(uint8_t)); + + int result = htonl(*value); + result = (*sign_byte == NEGATIVE) ? (-1) * (result) : result; + + cout << result; + + } else if (topic->data_type == SHORT_REAL_BYTE) { + + uint16_t *value = (uint16_t *) topic->value; + cout << fixed << setprecision(2) << (float) ntohs(*value) / 100; + + } else if (topic->data_type == FLOAT_BYTE) { + + uint8_t *sign_byte = (uint8_t *) topic->value; + uint32_t *value = (uint32_t *) (topic->value + sizeof(uint8_t)); + + uint8_t *power = (uint8_t *) (topic->value + sizeof(uint8_t) + sizeof(uint32_t)); + + double result = ntohl(*value) * pow(10, -(*power)); + result = (*sign_byte == NEGATIVE) ? (-1) * result : result; + + cout << fixed << setprecision(4) << (double) result; + + + } else if (topic->data_type == STRING_BYTE) { + + cout << topic->value; + + } + + cout << endl; +} diff --git a/tcp_handler.cpp b/tcp_handler.cpp new file mode 100644 index 0000000..5b7a9c8 --- /dev/null +++ b/tcp_handler.cpp @@ -0,0 +1,170 @@ +#include +#include +#include +#include + +#include "tcp_handler.hpp" +#include "helpers.h" +#include "protocol.hpp" + +#include +#include +#include +#include +#include + +using namespace std; + +int establish_tcp_connection(int port) { + /* + * Create listener socket. + * + */ + int listener_socket = socket(AF_INET, SOCK_STREAM, 0); + DIE(listener_socket < 0, "server: tcp: socket"); + + /* + * Define the attributes of the socket. + * + */ + struct sockaddr_in server_addr_tcp; + server_addr_tcp.sin_family = AF_INET; + server_addr_tcp.sin_addr.s_addr = INADDR_ANY; + server_addr_tcp.sin_port = htons(port); + + /* + * Disable Nagle buffering using setsockopt with TCP_NODELAY option. + * SO_REUSEADDR is used because we want to reuse a port shortly after + * it was forecefully closed. + * + */ + int optname = 1; + + int setsockopt_ret; + setsockopt_ret = setsockopt(listener_socket, IPPROTO_TCP, + TCP_NODELAY | SO_REUSEADDR , &optname, sizeof(int)); + DIE(setsockopt_ret < 0, "server: tcp: setsockopt"); + + /* + * Bind the socket. + * + */ + int bind_ret_tcp; + bind_ret_tcp = bind(listener_socket, (struct sockaddr *) &server_addr_tcp, + sizeof(struct sockaddr_in)); + DIE(bind_ret_tcp < 0, "server: tcp: bind"); + + /* + * Listen for incoming connections. + * + */ + int listen_ret; + listen_ret = listen(listener_socket, DEFAULT_BACKLOG); + DIE(listen_ret < 0, "server: tcp: listen"); + + return listener_socket; +} + +int connect_to_server(char *server_ip, char *server_port) { + int sockfd; + struct sockaddr_in server_addr; + + /* + * Convert the port. + * + */ + int port = atoi(server_port); + DIE(port == 0, "client: atoi: invalid port"); + + /* + * Create the socket. + * + */ + sockfd = socket(PF_INET, SOCK_STREAM, 0); + DIE(sockfd < 0, "client: socket"); + + /* + * Fill the info about the server the client want to connect to. + * + */ + server_addr.sin_family = PF_INET; + server_addr.sin_port = htons(port); + + int inet_aton_ret = inet_aton(server_ip, &server_addr.sin_addr); + DIE(inet_aton_ret == 0, "client: inet_aton "); + + /* + * Connect to server. + * + */ + int connect_ret = connect(sockfd, (struct sockaddr *) &server_addr, + sizeof(struct sockaddr)); + DIE(connect_ret < 0, "client: connect"); + + return sockfd; +} + +int accept_connection(int listener, list& clients) { + + struct sockaddr_in incoming_client; + socklen_t incoming_client_len = sizeof(struct sockaddr_in); + + /* + * Accept the connection using accept. + * + */ + int new_socket = accept(listener, (struct sockaddr *) &incoming_client, + &incoming_client_len); + ACCEPT_CONNECTION_ERROR(new_socket < 0, "server: accept"); + + /* + * Disable Nagle buffering using setsockopt with TCP_NODELAY option. + * + */ + int optname = 1; + + int setsockopt_ret; + setsockopt_ret = setsockopt(new_socket, IPPROTO_TCP, + TCP_NODELAY , &optname, sizeof(int)); + DIE(setsockopt_ret < 0, "server: tcp: setsockopt"); + + /* + * Request the name of the client. If the #request_client_id returns + * an empty string then the connection closed and the connection cannot + * be accepted + * + */ + string client_id = request_client_id(new_socket); + if (client_id.empty() == true) { + cout << "Connection closed while trying to request the id of the client" + << endl; + + return ACCEPT_FAILED; + } + + /* + * If the id already exists then reject the connection and send + * a reply about the rejection, else add the new id in the container. + * + */ + auto existent_client = find_if(clients.begin(), clients.end(), + [client_id] (ClientIO& cli) {return cli.clientID.compare(client_id) == 0;}); + + if (existent_client != clients.end()) { + cout << "Client already connected, refusing connection." << endl; + + reply_connection_refused(new_socket); + return ACCEPT_FAILED; + } + + clients.push_back({client_id, new_socket}); + + /* + * Print info about the newly connected client. + * + */ + cout << "New client " << client_id << " connected from " << ntohs(incoming_client.sin_port) + << ":" << inet_ntoa(incoming_client.sin_addr) << endl; + + return ACCEPT_SUCCESSFUL; +} diff --git a/three_topics_payloads.json b/three_topics_payloads.json new file mode 100644 index 0000000..d5c11ba --- /dev/null +++ b/three_topics_payloads.json @@ -0,0 +1,136 @@ +[{ + "description": "topic {topic_a} - type {INT} - value{10}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAo=" + }, { + "description": "topic {topic_a} - type {INT} - value{-10}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAo=" + }, { + "description": "topic {topic_a} - type {INT} - value{1234567890}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEmWAtI=" + }, { + "description": "topic {topic_a} - type {INT} - value{-1234567890}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUmWAtI=" + }, { + "description": "topic {topic_a} - type {SHORT_REAL} - value{2.30}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAOY=" + }, { + "description": "topic {topic_a} - type {SHORT_REAL} - value{655.05}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB/+E=" + }, { + "description": "topic {topic_a} - type {SHORT_REAL} - value{17 / 17.00}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBqQ=" + }, { + "description": "topic {topic_a} - type {FLOAT} - value{17}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAABEA" + }, { + "description": "topic {topic_a} - type {FLOAT} - value{-17}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAABEA" + }, { + "description": "topic {topic_a} - type {FLOAT} - value{1234.4321}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAC8XAEE" + }, { + "description": "topic {topic_a} - type {FLOAT} - value{-1234.4321}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQC8XAEE" + }, { + "description": "topic {topic_a} - type {FLOAT} - value{0.042}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAACoD" + }, { + "description": "topic {topic_a} - type {FLOAT} - value{-0.042}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAACoD" + }, { + "description": "topic {topic_a} - type {STRING} - value{Ana are mere}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADQW5hIGFyZSBtZXJl" + }, { + "description": "topic {topic_a} - type {STRING} - value{<1500 chars of \"abc...\">}", + "payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFy" + }, { + "description": "topic {topic_b} - type {INT} - value{10}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAo=" + }, { + "description": "topic {topic_b} - type {INT} - value{-10}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAo=" + }, { + "description": "topic {topic_b} - type {INT} - value{1234567890}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEmWAtI=" + }, { + "description": "topic {topic_b} - type {INT} - value{-1234567890}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUmWAtI=" + }, { + "description": "topic {topic_b} - type {SHORT_REAL} - value{2.30}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAOY=" + }, { + "description": "topic {topic_b} - type {SHORT_REAL} - value{655.05}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB/+E=" + }, { + "description": "topic {topic_b} - type {SHORT_REAL} - value{17 / 17.00}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBqQ=" + }, { + "description": "topic {topic_b} - type {FLOAT} - value{17}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAABEA" + }, { + "description": "topic {topic_b} - type {FLOAT} - value{-17}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAABEA" + }, { + "description": "topic {topic_b} - type {FLOAT} - value{1234.4321}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAC8XAEE" + }, { + "description": "topic {topic_b} - type {FLOAT} - value{-1234.4321}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQC8XAEE" + }, { + "description": "topic {topic_b} - type {FLOAT} - value{0.042}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAACoD" + }, { + "description": "topic {topic_b} - type {FLOAT} - value{-0.042}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAACoD" + }, { + "description": "topic {topic_b} - type {STRING} - value{Ana are mere}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADQW5hIGFyZSBtZXJl" + }, { + "description": "topic {topic_b} - type {STRING} - value{<1500 chars of \"abc...\">}", + "payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFy" + }, { + "description": "topic {topic_c} - type {INT} - value{10}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAo=" + }, { + "description": "topic {topic_c} - type {INT} - value{-10}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAo=" + }, { + "description": "topic {topic_c} - type {INT} - value{1234567890}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEmWAtI=" + }, { + "description": "topic {topic_c} - type {INT} - value{-1234567890}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUmWAtI=" + }, { + "description": "topic {topic_c} - type {SHORT_REAL} - value{2.30}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAOY=" + }, { + "description": "topic {topic_c} - type {SHORT_REAL} - value{655.05}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB/+E=" + }, { + "description": "topic {topic_c} - type {SHORT_REAL} - value{17 / 17.00}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBqQ=" + }, { + "description": "topic {topic_c} - type {FLOAT} - value{17}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAABEA" + }, { + "description": "topic {topic_c} - type {FLOAT} - value{-17}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAABEA" + }, { + "description": "topic {topic_c} - type {FLOAT} - value{1234.4321}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAC8XAEE" + }, { + "description": "topic {topic_c} - type {FLOAT} - value{-1234.4321}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQC8XAEE" + }, { + "description": "topic {topic_c} - type {FLOAT} - value{0.042}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAACoD" + }, { + "description": "topic {topic_c} - type {FLOAT} - value{-0.042}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAACoD" + }, { + "description": "topic {topic_c} - type {STRING} - value{Ana are mere}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADQW5hIGFyZSBtZXJl" + }, { + "description": "topic {topic_c} - type {STRING} - value{<1500 chars of \"abc...\">}", + "payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFy" + }] diff --git a/udp_client.py b/udp_client.py new file mode 100644 index 0000000..10c7f0c --- /dev/null +++ b/udp_client.py @@ -0,0 +1,130 @@ +__author__ = 'Dorinel Filip' + +import socket +import base64 +import sys +import random +import time +import json +import os +import argparse +from ipaddress import ip_address +from utils.unpriv_port import unprivileged_port_type, get_unprivileged_port_meta +import textwrap + + +def setup_parser(): + def get_mode_help(): + return textwrap.dedent( + '''Specifies the mode used for the load generator as following: +* all_once - send each payload in the list once +* manual - let you choose which message to send next +* random - continuously send random payloads from the list''') + + def read_json_file(parent_parser, arg): + if not os.path.exists(arg): + parent_parser.error('The file "{}" does not exist!'.format(arg)) + else: + with open(arg, 'r') as f: + return json.load(f) # return parsed JSON + + parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter, + description='UDP Client for Communication Protocols (2018-2019) Homework #2') + + inputs = parser.add_argument_group('Input') + inputs.add_argument('--input_file', type=lambda arg: read_json_file(parser, arg), metavar='FILE', + default='sample_payloads.json', + help='JSON file to read payloads from (default: sample_payloads.json)') + + server_address = parser.add_argument_group('Server Address & Port (required)') + server_address.add_argument('server_ip', help='Server IP', type=ip_address) + server_address.add_argument('server_port', help='Server Port', type=unprivileged_port_type) + + source_address = parser.add_argument_group('Source Address') + source_address.add_argument('--source-address', type=ip_address, default='0.0.0.0', + help='IP Address to be bind by UDP client (default: unspecified)') + source_address.add_argument('--source-port', type=unprivileged_port_type, default=0, + metavar=get_unprivileged_port_meta('source-port'), + help='UDP port to be used as source for this client (default: random port)') + + mode = parser.add_argument_group('Workload changing parameters') + mode.add_argument('--mode', default='all_once', type=str, choices=('all_once', 'manual', 'random'), + help=get_mode_help()) + + load = parser.add_argument_group('Load characteristics') + load.add_argument('--count', type=int, + help='Number of packets to be send (only used for when mode is random, default: infinity)') + load.add_argument('--delay', help='Wait time (in ms) between two messages (default: 0)', type=int, default=0) + + return parser + + +def setup_socket(parsed_args): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind((str(parsed_args.source_address), parsed_args.source_port)) + return sock + + +def send_message(sock, message, parsed_args): + to_send = base64.standard_b64decode(message['payload_base64']) + sent = sock.sendto(to_send, (str(parsed_args.server_ip), parsed_args.server_port)) + print('Sent ({}/{} bytes) << {} >>'.format(sent, len(to_send), message['description'])) + time.sleep(parsed_args.delay / 1000) + + +def run_all_once(sock, parsed_args): + for message in parsed_args.input_file: + send_message(sock, message, parsed_args) + + +def run_manual(sock, parsed_args): + header = 'Please chose one of the following' + options = '\n'.join(['{}. {}'.format(i, x['description']) for i, x in enumerate(parsed_args.input_file)]) + input_ask = 'Input a int ({} - {}): '.format(0, len(parsed_args.input_file) - 1) + for_exit_string = 'To end the program, input "exit"' + prompter_string = '\n'.join((header, options, for_exit_string)) + print(prompter_string) + + while True: + choice = input(input_ask) + if choice == 'exit': + return + + choice = int(choice) + if (choice >= 0) and (choice < len(parsed_args.input_file)): + message = parsed_args.input_file[choice] + send_message(sock, message, parsed_args) + else: + print('Maybe try a valid option.') + + +def run_random(sock, parsed_args): + n = parsed_args.count + count = 0 + while (n is None) or (count < n): + message = random.choice(parsed_args.input_file) + send_message(sock, message, parsed_args) + count += 1 + + +def main(): + # Parse arguments + parser = setup_parser() + parsed_args = parser.parse_args(sys.argv[1:]) + sock = setup_socket(parsed_args) + + print('Client source PORT: {}'.format(sock.getsockname()[1])) + + if parsed_args.mode == 'all_once': + print('Running in all_once mode...\n') + run_all_once(sock, parsed_args) + elif parsed_args.mode == 'manual': + print('Running in manual mode...\n') + run_manual(sock, parsed_args) + else: + print('Running in random mode..\n') + run_random(sock, parsed_args) + + +if __name__ == '__main__': + main() diff --git a/udp_handler.cpp b/udp_handler.cpp new file mode 100644 index 0000000..c28d9c3 --- /dev/null +++ b/udp_handler.cpp @@ -0,0 +1,138 @@ +#include + +#include + +#include +#include +#include +#include +#include + +#include "udp_handler.hpp" +#include "helpers.h" + + +int establish_udp_connection(int port) { + + /* + * Create UDP socket. + * + */ + int listener_socket = socket(AF_INET, SOCK_DGRAM, 0); + DIE(listener_socket < 0, "server: udp: socket"); + + /* + * Define the attributes of the socket. + * + */ + struct sockaddr_in server_addr_udp; + server_addr_udp.sin_family = AF_INET; + server_addr_udp.sin_addr.s_addr = INADDR_ANY; + server_addr_udp.sin_port = htons(port); + + /* + * Bind the socket. + * + */ + int bind_ret_udp; + bind_ret_udp = bind(listener_socket, (const struct sockaddr *) &server_addr_udp, + sizeof(struct sockaddr_in)); + DIE(bind_ret_udp < 0, "server: udp: bind"); + + return listener_socket; + +} + +Topic parse_datagram(int socket) { + /* + * Read the datagram coming from @socket in @buffer. + * + */ + char buffer[UDP_DATAGRAM_LEN]; + memset(buffer, 0x00, UDP_DATAGRAM_LEN); + + /* + * Declare a sockaddr_in buffer for receiving information + * about the upd_client such as port and ip. + * + */ + struct sockaddr_in incoming_client; + socklen_t incoming_client_len = sizeof(struct sockaddr_in); + + int read_bytes = recvfrom(socket, buffer, UDP_DATAGRAM_LEN, 0, + (struct sockaddr *) &incoming_client, &incoming_client_len); + PARSE_DATAGRAM_ERROR(read_bytes < 0, "server: add_new_topic: read"); + + /* + * Create a new topic and fill its fields. + * + */ + Topic topic; + memset(&topic, 0x00, sizeof(Topic)); + + /* + * IP + * + */ + topic.ip = incoming_client.sin_addr.s_addr; + + /* + * PORT + * + */ + topic.port = ntohs(incoming_client.sin_port); + + /* + * TOPIC + * + */ + strncpy(topic.topic, buffer, TOPIC); + + /* + * DATA_TYPE + * + */ + topic.data_type = buffer[TOPIC]; + + /* + * VALUE + * + */ + if (topic.data_type == INT_BYTE) { + + topic.value_sz = INT_BYTE_VALUE_SIZE; + memcpy(topic.value, buffer + TOPIC + 1, INT_BYTE_VALUE_SIZE); + + } else if (topic.data_type == SHORT_REAL_BYTE) { + + topic.value_sz = SHORT_REAL_BYTE_VALUE_SIZE; + memcpy(topic.value, buffer + TOPIC + 1, topic.value_sz); + + } else if (topic.data_type == FLOAT_BYTE) { + + topic.value_sz = FLOAT_BYTE_VALUE_SIZE; + memcpy(topic.value, buffer + TOPIC + 1, topic.value_sz); + + } else if (topic.data_type == STRING_BYTE) { + + topic.value_sz = strlen(buffer + TOPIC + 1); + memcpy(topic.value, buffer + TOPIC + 1, topic.value_sz); + + } + + /* + * If data type is not a value that was defined in topic.hpp it means + * that an overflow in topic.topic field occured and the received topic + * is not a valid one. Return emtpy topic in this case. + * + */ + else { + PARSE_DATAGRAM_ERROR(true, "overflow occured when receiving topic name"); + } + + return topic; +} + + + +