Merge remote-tracking branch 'pc2/master'

This commit is contained in:
lucic71 2020-05-07 09:32:07 +03:00
commit 5f2f219074
20 changed files with 2487 additions and 0 deletions

39
Makefile Normal file
View File

@ -0,0 +1,39 @@
SERVER=server
SUBSCRIBER=subscriber
C_SOURCES=$(wildcard *.c) # select all c source files
CPP_SOURCES=$(wildcard *.cpp) # select all cpp source files
LIBRARY=nope
INCPATHS=include
LIBPATHS=.
LDFLAGS=
CXXFLAGS=-c -Wall -g3
CXX=g++
# Automatic generation of some important lists
# tell the makefile to generate object files for each source file
OBJECTS=$(C_SOURCES:.c=.o) $(CPP_SOURCES:.cpp=.o)
INCFLAGS=$(foreach TMP,$(INCPATHS),-I$(TMP))
LIBFLAGS=$(foreach TMP,$(LIBPATHS),-L$(TMP))
# Set up the output file names for the different output types
BINARY_SERVER=$(SERVER)
BINARY_SUBSCRIBER=$(SUBSCRIBER)
all: $(C_SOURCES) $(CPP_SOURCES) $(BINARY_SERVER) $(BINARY_SUBSCRIBER)
$(BINARY_SERVER): $(filter-out subscriber.o subscriber_utils.o, $(OBJECTS))
$(CXX) $(LIBFLAGS) $(filter-out subscriber.o subscriber_utils.o, $(OBJECTS)) $(LDFLAGS) -o $@
$(BINARY_SUBSCRIBER): $(filter-out server.o server_utils.o, $(OBJECTS))
$(CXX) $(LIBFLAGS) $(filter-out server.o server_utils.o, $(OBJECTS)) $(LDFLAGS) -o $@
.cpp.o:
$(CXX) $(INCFLAGS) $(CXXFLAGS) -fPIC $< -o $@
distclean: clean
rm -f $(BINARY_SERVER) $(BINARY_SUBSCRIBER)
clean:
rm -f $(OBJECTS)

165
README Normal file
View File

@ -0,0 +1,165 @@
POPESCU LUCIAN IOAN 321CDa
--------------------------
TEMA 2 PROTOCOALE DE COMUNICATIE
--------------------------------
I. File list
------------
include/ Header files containing definitions of functions, macros
and data structures for Topic and Client.
protocol.cpp Sends packets between the server and the subscribers
server.cpp Implements the server side functionality
server_utils.cpp Utils that forward topics, manage clients, etc
subscriber.cpp Implements the client side functionality
subscriber_utils.cpp Utilis that print a topic to stdout
tcp_handler.cpp Accepts connections, establishes connections, etc
udp_handler.cpp Establishes connections and parses datagrams
Makefile Makefile to build the server and subscriber executables
README This file
Notations:
@ - variable
# - function
CAPITAL - macro
II. Protocol
------------
The protocol used for communcation between the server and the subscribers is
build on top of the TCP protocol. It consists of multiple replies and
requests between the two instances.
Firstly the server tries to accept the connection of the subscriber using
the accept(2) function. In this time an IDREQUEST is sent to the subscriber.
The newly received client ID will be used to print a feedback message
about the connection to stdout.
The IDREQUEST consists of a single byte defined in include/protocol.hpp.
When the subscriber receives this byte from the socket, it sends back
at most CLIENTID_MAX_LEN(include/protocol.cpp:33) bytes, containing its client
id.
If the client already exists in the database of the server then a
CONNECTION_REFUSED_BYTE(include/protocol.hpp:21) is sent and the client must
disconnect. This way the server makes sure that there will not be two clients
with the same ID at the same time.
Next, the subscriber will send an un/subscribe request. For a subscribe request
the format is the following: "subscribe $topic $SF". The request is sent
in text-format, using human readable characters, over the socket. When arriving
to the server, it parses the packet and binds the clients with its desired
topic. The same happens for an unsubscribe request.
The last type of communication occurs when a topic is sent over the socket.
The server sends in the beginning a TOPIC_ANNOUNCEMENT_BYTE
(include/protocol.hpp:20) to let the subscriber know that a topic is waiting to
be transmitted. Next, the server sends the following fields from the Topic
structure(include/topic.hpp): ip, port, topic, data_type, value_sz. After
the client receives the value_sz field it will wait for value_sz bytes that
represent the value field from Topic. This happens because all the fields
in Topic have fixed length, except value, which can vary between 1 and
1500 bytes. The subscriber will also check if it received the correct number of
bytes. If not it will recall recv(2) to fill the buffer. However, if the
correct number of bytes is not received correctly in the second try, the
subscriber will probably crash or may produce undefined behavior.
III. Server
-----------
The server consist of an initialization phase and a forwarding phase.
In the initialization phase TCP and UDP connections are established,
also data structures for topic and clients are declared. Next is
the forwarding phase where a select(2) call multiplexes between different
file descriptors(TCP listener, UDP listener, TCP clients, stdin).
If the @tcp_listener file descriptor is on it means that a new subscriber
waits to be accepted using include/tcp_handler.hpp:#accept_connection. This
is also the step where the pending topics are sent to the newly subscribed
client.
IF @clients file descriptors are on it means that a new un/subscribe request
is coming and the server must process it. If the subscriber sends 0 bytes
on the socket then it disconnected and the server must delete its entry
from @clients and unvalidate its entry in @topics_table.
If the @udp_listener file descriptor is on it means that a new topic arrived
on the UDP socket, therefore the server will parse the topic to a Topic
data structure(include/topic.hpp) and forward the topic using
include/server_utils.hpp:#forward_topic. Next if the clients disconnected
during the forwarding then delete their entry from @clients also and
unvalidate their entry in @topics_table.
If STDIN_FILENO is set then the server received a command from stdin.
The only command in exit so the server will break from the forwarding
phase, will kill the connections with the clients and will end the
process.
IV. Subscriber
--------------
The subscriber operates in a similar way with the server. It has an
initialization phase where it declares a global buffer and connection
through the socket and a receiving phase where it waits data either from
server or from stdin
If it receives data from server, using the file descriptor @socket_fd,
it has to check what kind of data it is by checking @announce_byte,
which can have one of the following three values:
IDREQUEST_ANNOUNCEMENT_BYTE, TOPIC_ANNOUNCEMENT_BYTE, CONNECTION_REFUSED_BYTE.
They are defined in include/protocol.hpp.
If it receives data from stdin, then it must disconnect if the command is
exit.
V. Data structures
------------------
For representing @topics_table and @pending_table I chose a std::unordered_map
because it let me bind a topic name(@topics_table) represented as a std::string
with a std::list of clients subscribed to that topic. Same goes for @pending_table,
I bound std::string representing the client id with a std::list of topics that
will be received by it after it reconnects.
For representing the @clients I chose a ClientIO structure, defined in include/
Client.hpp. It is basically the same as Client, also defined in include/Client.hpp,
but without the SF option. The latter is used in @topics_table and helps the server
to keep track of the clients susbcribed to a given topic. I could have used a single
Client structure but for me it would have been more ambiguous, so I chose the
easier way.
VI. Macros
----------
I defined macros for each function that I considered to be error-prone in the
header files associated with the respective functions. I did so because I
wanted the code to be as readable as possible, so I tried not to mix the
main code with the error handling code.
VII. Bottlenecks, known bugs and edge cases
-------------------------------
When the client connects to the server there must exist a delay between the time
the connection is established and the first un/subscribe request is sent. This
happens because the server must send back a clientID request which may overlap
with a subscribe request sent by the subscriber. So if, for example, the user
sends its input through a pipe, the subscriber and server will not handle
correctly the input. (example: cat commands | ./subscriber id ip port)
If the subscriber fills the server with a lot of unuseful subscriptions, the
server has no way to know that it should delete them and will eventually run
out of memory.
Before the server disconencts, all messages will arrive to subscribers, because
in the forwarding phase of the server, the stdin input is handled after all
operations are done(forwarding, receiving from udp), so the subscribers should
expect that all topics are received.
The subscribers and the server will output meaningful messages to stdout when
an invalid operation occurs.
If a client will not be able to receive data through the socket on which it is
connected then it will exit and will display an error message. This happens
because when the data is malformed or cannot be transmitted it means that
something bad happened with the server so the client should not continue to
live.
When a client disconnects during topics forwarding then a new entry for its
client id is created in @pending_topics and the topics will be retransmitted
when it reconnects. If it disconnects even when the pending topics forwarding
is performed, then the topics will be dropped.

29
include/Client.hpp Normal file
View File

@ -0,0 +1,29 @@
#ifndef CLIENT_HPP_
#define CLIENT_HPP_
#include <string>
/*
* Definition of a client struct containg info about the
* ID of the client, the file descriptor associated with this client
* and the store&forward option. It is used for forwarding
*
*/
struct Client {
std::string clientID;
int fd;
bool sf;
};
/*
* Definition of a client struct containing info about the ID of the
* client and the file descriptor assoiated with this client. It is
* used for client-server communication.
*
*/
struct ClientIO {
std::string clientID;
int fd;
};
#endif

23
include/helpers.h Normal file
View File

@ -0,0 +1,23 @@
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
/*
* Macro de verificare a erorilor
* Exemplu:
* int fd = open (file_name , O_RDONLY);
* DIE( fd == -1, "open failed");
*/
#define DIE(assertion, call_description) \
do { \
if (assertion) { \
fprintf(stderr, "(%s, %d): ", \
__FILE__, __LINE__); \
perror(call_description); \
exit(EXIT_FAILURE); \
} \
} while(0)

174
include/protocol.hpp Normal file
View File

@ -0,0 +1,174 @@
#ifndef PROTOCOL_HPP_
#define PROTOCOL_HPP_
#include <string>
#include <unordered_map>
#include <list>
#include <Client.hpp>
#include "topic.hpp"
using namespace std;
/*
* Bytes that notify the subscriber what kind of message it
* is about to receive.
*
*/
#define IDREQUEST_ANNOUNCEMENT_BYTE 0xFF
#define TOPIC_ANNOUNCEMENT_BYTE 1
#define CONNECTION_REFUSED_BYTE 2
#define INVALID_REQUEST_ANNOUNCEMENT_BYTE 3
/*
* Maximum size of data that the protocol can operate with.
*
*/
#define BUFLEN (sizeof (Topic))
/*
* Maximum length of a client id.
*
*/
#define CLIENTID_MAX_LEN 10
/*
* Format of subscribe and unsubscribe requests.
*
*/
#define SUBSCRIBE "subscribe"
#define UNSUBSCRIBE "unsubscribe"
/*
* Format of store and forward options.
*
*/
#define SF_ON "1"
#define SF_OFF "0"
/*
* Maximul length (in bytes) of a un/subscribe request.
* Length of UNSUBSCRIBE + length of SF option + maximum length of a topic
* + 2 spaces.
*
*/
#define REQUEST_MAX_LEN (sizeof(UNSUBSCRIBE) - 1 + \
sizeof(SF_ON) - 1 + TOPIC_MAX_LEN + sizeof(uint8_t) * 2)
/*
* Lengths (in words separated by space) of subscribe and unsubscribe requests.
*
*/
#define SUBSCRIBE_REQUEST_LENGTH 3
#define UNSUBSCRIBE_REQUEST_LENGTH 2
/*
* Makes a request for the client_id to a client described by a file
* descriptor @client. It has also a error handler macro associated.
* If a send or recv call fails then the function will return an
* empty string and a error mesage will be displayed.
*
*/
string request_client_id(int client);
#define REQUEST_CLIENT_ID_ERROR(assertion, call_description) \
do { \
if (assertion) { \
fprintf(stderr, "(%s, %d): ", \
__FILE__, __LINE__); \
perror(call_description); \
return {}; \
} \
} while (0)
/*
* Reply for the above mentioned request. The @client_id is transmited
* via the connection established by @socket.
*
* @return: REPLY_SUCCESSFUL if the subscriber sent bytes on the socket
* REPLY_FAILED if the subscriber failed to send bytes
*
*/
#define REPLY_SUCCESSFUL 0
#define REPLY_FAILED 1
#define REPLY_CLIENT_ID_ERROR(assertion, call_description) \
do { \
if (assertion) { \
fprintf(stderr, "(%s, %d): ", \
__FILE__, __LINE__); \
perror(call_description); \
return REPLY_FAILED; \
} \
} while (0)
int reply_client_id(int socket, char *client_id);
/*
* Reply with a CONNECTION_REFUSED_BYTE from server to let the client
* know that there is already a client with the same clientID.
*
*/
#define REPLY_CONNECTION_REFUSED_ERROR(assertion, call_description) \
do { \
if (assertion) { \
fprintf(stderr, "(%s, %d): ", \
__FILE__, __LINE__); \
perror(call_description); \
} \
} while (0)
void reply_connection_refused(int socket);
/*
* Reply with a INVALID_REQUEST_ANNOUNCEMENT_BYTE from server to let the cient
* know that the un/subscribe request is sent is invalid.
*
*/
#define REPLY_INVALID_REQUEST_ERROR(assertion, call_description) \
do { \
if (assertion) { \
fprintf(stderr, "(%s, %d): ", \
__FILE__, __LINE__); \
perror(call_description); \
} \
} while (0)
void reply_invalid_request(int socket);
/*
* When a subscriber sends a subscribe request, this function will resolve
* the request by adding the client in @topics_table.
*
* @return: CONNECTION_CLOSED_ERROR - connection was closed and the request
* could not be processed
* INVALID_REQUEST_ERROR - the request does not have the falid format
* REQUEST_SUCCESSFUL - the request was successfully
* processed
*
* The error macro is used when the function cannot read data from the
* socket.
*
*/
#define REQUEST_SUCCESSFUL 1
#define CONNECTION_CLOSED_ERROR 0
#define INVALID_REQUEST_ERROR -1
int resolve_subscribe_request(int client,
unordered_map<string, list<Client>>& topics_table);
#define RESOLVE_SUBSCRIBE_REQUEST_ERROR(assertion, call_description) \
do { \
if (assertion) { \
fprintf(stderr, "(%s, %d): ", \
__FILE__, __LINE__); \
perror(call_description); \
return CONNECTION_CLOSED_ERROR; \
} \
} while (0)
#endif

94
include/server_utils.hpp Normal file
View File

@ -0,0 +1,94 @@
#ifndef SERVER_UTILS_HPP_
#define SERVER_UTILS_HPP_
#include <unordered_map>
#include <list>
#include <string>
#include "topic.hpp"
#include "Client.hpp"
#include "helpers.h"
#include "sys/socket.h"
using namespace std;
#define EXIT_COMMAND "exit"
#define UNAVAILABLE_CLIENT -1
/*
* Prints the usage if the number of arguments is not correct.
*
*/
void usage(char *filename);
/*
* Forwards the @topic to all the clients subscribed to it. Also updates
* the @pending_table if there are unavailable clients with the SF option on.
*
* If a client from @topics_table becomes unavailable while the topic is
* transmitted then the entry will be invalidated using UNAVAILABLE_CLIENT.
*
*/
void forward_topic(Topic topic, unordered_map<string, list<Client>>& topics_table,
unordered_map<string, list<Topic>>& pending_table);
/*
* After a client reconnects it must receive the pending packets associated
* with its @client_id. If the client disconnects during this operation, then
* the function will return an error value. Furthermore if the client
* disconnects during the forwarding then all pending packets will be
* dropped.
*
* @return: FORWARD_SUCCESSFUL when the operation finishes successfully
* FORWARD_FAILED the client disconnected during this operation
*
*/
#define FORWARD_SUCCESSFUL 0
#define FORWARD_FAILED 1
int forward_pending_topics(int fd, string client_id,
unordered_map<string, list<Topic>>& pending_table);
/*
* Adds a all @clients to @set for further I/O multiplexing.
* Also update max_fd.
*
*/
void fd_set_add_clients(fd_set& set, int& max_fd, list<ClientIO> clients);
/*
* Close the connection with each client from @clients.
*
*/
void clients_close_connection(list<ClientIO> clients);
/*
* Traverse @topics_table and make the client unavailable by setting its file descriptor
* field to UNAVAILABLE_CLIENT.
*
*/
void unvalidate_client(int client_fd,
unordered_map<string, list<Client>>& topics_table);
/*
* Traverse @topics_table and make the client available by setting its flie descriptor
* field to @new_fd.
*
* @return: the sf option of the validated client. It returns the sf option because there
* is no other palace where this option is stored and the server main function needs
* to know if it has to flush @pending_topics for @client_id.
*
*/
bool validate_client(string client_id, int new_fd,
unordered_map<string, list<Client>>& topics_table);
/*
* After #forward_topic function is called, some clients may be invalidated, so
* their entry must be deleted from @clientsIO.
*
*/
void delete_invalid_clients(list<Client> subscribers, list<ClientIO>& clientsIO);
#endif

View File

@ -0,0 +1,21 @@
#ifndef SUBSCRIBER_UTILS_HPP
#define SUBSCRIBER_UTILS_HPP
#include "topic.hpp"
/*
* Format of exit request.
*
*/
#define EXIT "EXIT"
void usage(char *filename);
/*
* Print to stdout the received topic.
*
*/
void print_topic(Topic *topic);
#endif

51
include/tcp_handler.hpp Normal file
View File

@ -0,0 +1,51 @@
#ifndef HANDLER_TCP_HPP_
#define HANDLER_TCP_HPP_
#include <list>
#include <vector>
#include "Client.hpp"
#define DEFAULT_BACKLOG 32
using namespace std;
/*
* Set up a TCP connection by returing a file descriptor
* that points to a socket that listens for new TCP connections.
*
*/
int establish_tcp_connection(int port);
/*
* Helps a client to connect to the server specified by @server_ip
* and @server_port.
*
*/
int connect_to_server(char *server_ip, char *server_port);
/*
* Accept an incomming connection from @listener and insert its file descriptor
* in @clients_fd.
*
* @return; ACCEPT_SUCCESSFUL
* ACCEPT_FAILED
*
*/
#define ACCEPT_SUCCESSFUL 0
#define ACCEPT_FAILED 1
#define ACCEPT_CONNECTION_ERROR(assertion, call_description) \
do { \
if (assertion) { \
fprintf(stderr, "(%s, %d): ", \
__FILE__, __LINE__); \
perror(call_description); \
return ACCEPT_FAILED; \
} \
} while (0)
int accept_connection(int listener, list<ClientIO>& client);
#endif

75
include/topic.hpp Normal file
View File

@ -0,0 +1,75 @@
#ifndef TOPIC_HPP_
#define TOPIC_HPP_
#include <string.h>
#include <cstdint>
/*
* Format of data types.
*
*/
#define INT "INT"
#define SHORT_REAL "SHORT_REAL"
#define FLOAT "FLOAT"
#define STRING "STRING"
/*
* Identifiers for each data type.
*
*/
#define INT_BYTE 0
#define SHORT_REAL_BYTE 1
#define FLOAT_BYTE 2
#define STRING_BYTE 3
/*
* Size of value field in topic for each data type.
*
*/
#define INT_BYTE_VALUE_SIZE 5
#define SHORT_REAL_BYTE_VALUE_SIZE 2
#define FLOAT_BYTE_VALUE_SIZE 6
/*
* Bytes for signedness.
*
*/
#define POSITIVE 0
#define NEGATIVE 1
/*
* Length of fields present in @Topic.
*
*/
#define TOPIC_MAX_LEN 50
#define VALUE_MAX_LEN 1500
/*
* Definition of structure that will be sent over the TCP conenction
* to each subscriber.
*
*/
struct Topic {
uint32_t ip;
uint16_t port;
char topic[TOPIC_MAX_LEN];
uint8_t data_type;
size_t value_sz;
char value[VALUE_MAX_LEN];
/*
* Compare all fields in topic. Use == for primitive data types and memcmp
* for arrays.
*
*/
bool operator == (Topic& t) const {
return ip == t.ip and port == t.port and !memcmp(topic, t.topic, TOPIC_MAX_LEN)
and data_type == t.data_type and value_sz == t.value_sz
and !memcmp(value, t.value, VALUE_MAX_LEN);
}
}__attribute__((packed));
#endif

44
include/udp_handler.hpp Normal file
View File

@ -0,0 +1,44 @@
#ifndef HANDLER_UDP_HPP_
#define HANDLER_UDP_HPP_
#include <string>
#include "topic.hpp"
using namespace std;
/*
* Used for receiving a datagram in #parse_datagram.
*
*/
#define TOPIC 50
#define DATA_TYPE 1
#define CONTENT 1500
#define UDP_DATAGRAM_LEN (TOPIC + DATA_TYPE + CONTENT)
/*
* Set up a UDP connections by returning a file descriptor that points
* to a socket that listens for UDP datagrams.
*
*/
int establish_udp_connection(int port);
/*
* Receive a datagram from @socket and parse to be compatible with the
* Topic format (see include/topic.hpp).
*
*/
#define PARSE_DATAGRAM_ERROR(assertion, call_description) \
do { \
if (assertion) { \
fprintf(stderr, "(%s, %d): ", \
__FILE__, __LINE__); \
perror(call_description); \
return {}; \
} \
} while (0)
Topic parse_datagram(int socket);
#endif

309
protocol.cpp Normal file
View File

@ -0,0 +1,309 @@
#include <cstdlib>
#include <sstream>
#include <vector>
#include <iterator>
#include <algorithm>
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include "protocol.hpp"
#include "helpers.h"
string request_client_id(int client) {
/*
* Announce the client that it receives an id request.
*
*/
uint8_t announce_byte = IDREQUEST_ANNOUNCEMENT_BYTE;
int write_bytes = send(client, &announce_byte, sizeof(announce_byte), 0);
REQUEST_CLIENT_ID_ERROR(write_bytes < 0, "server: request_client_id: write");
/*
* Wait for the reply.
*
*/
char buffer[CLIENTID_MAX_LEN + 1];
memset(buffer, 0x00, CLIENTID_MAX_LEN + 1);
int read_bytes = recv(client, buffer, CLIENTID_MAX_LEN, 0);
REQUEST_CLIENT_ID_ERROR(read_bytes < 0, "server: request_client_id: read");
return {buffer};
}
int reply_client_id(int socket, char *client_id) {
char buffer[CLIENTID_MAX_LEN + 1];
memset(buffer, 0x00, CLIENTID_MAX_LEN + 1);
strcpy(buffer, client_id);
int write_bytes = send(socket, buffer, CLIENTID_MAX_LEN, 0);
REPLY_CLIENT_ID_ERROR(write_bytes < 0, "client: reply_client_id: send:");
return REPLY_SUCCESSFUL;
}
void reply_connection_refused(int socket) {
uint8_t announce_byte = CONNECTION_REFUSED_BYTE;
int write_bytes = send(socket, &announce_byte, sizeof(announce_byte), 0);
REPLY_CONNECTION_REFUSED_ERROR(write_bytes < 0,
"server: reply_connection_refused: send ");
}
void reply_invalid_request(int socket) {
uint8_t announce_byte = INVALID_REQUEST_ANNOUNCEMENT_BYTE;
int write_bytes = send(socket, &announce_byte, sizeof(announce_byte), 0);
REPLY_INVALID_REQUEST_ERROR(write_bytes < 0,
"server: reply_connection_refused: send ");
}
int resolve_subscribe_request(int client,
unordered_map<string, list<Client>>& topics_table) {
char buffer[BUFLEN];
memset(buffer, 0x00, BUFLEN);
/*
* Read the request from client.
*
*/
int read_bytes = recv(client, buffer, REQUEST_MAX_LEN, 0);
RESOLVE_SUBSCRIBE_REQUEST_ERROR(read_bytes < 0,
"server: resolve_subscribe_request: read");
/*
* If 0 bytes were received it means that the client disconnected and
* the server must close the file desciptor associated.
*
*/
if (read_bytes == 0) {
return CONNECTION_CLOSED_ERROR;
}
/*
* Request the id. If #request_client_id returns an empty string
* then the connection closed and #resolve_subscribe_request will
* return CONNECTION_CLOSED_ERROR.
*
*/
string client_id = request_client_id(client);
if (client_id.empty() == true) {
return CONNECTION_CLOSED_ERROR;
}
/*
* Split the request in separate tokens.
*
*/
istringstream request_stream(buffer);
vector<string> request(istream_iterator<string>{request_stream},
istream_iterator<string>{});
/*
* Process the SUBSCRIBE and UNSUBSCRIBE requests.
*
*/
if (request.size() == SUBSCRIBE_REQUEST_LENGTH) {
/*
* Parse the request.
*
*/
string request_type = request.at(0);
string topic = request.at(1);
string sf = request.at(2);
/*
* If topic length is bigger than TOPIC_MAX_LEN then it is an
* invalid requet.
*
*/
if (topic.length() > TOPIC_MAX_LEN) {
cout << "Topic size too big. Aborting operation.." << endl;
reply_invalid_request(client);
return INVALID_REQUEST_ERROR;
}
/*
* Process a subscribe request.
*
*/
if (request_type.compare({SUBSCRIBE}) == 0) {
/*
* Check if the @topic can be found in @topics_table.
* If not insert it, else use the existing one.
*
*/
if (topics_table.find(topic) == topics_table.end()) {
topics_table.insert({topic, {}});
}
auto& topics_table_entry = topics_table.at(topic);
/*
* Check if the client alrady exists in @topics_table_entry.
*
* If it exists and has the same SF option then return with
* REQUEST_SUCCESSFUL and do nothing, else if the SF option
* is different then update the SF for this client at this
* topic and return REQUEST_SUCCESSFUL.
*
*/
for (auto& topics_table_client : topics_table_entry) {
if (topics_table_client.clientID.compare(client_id) == 0
and stoi(sf) == topics_table_client.sf) {
cout << "Client " << client_id << " already subscribed to "
<< topic << ". Skipping this operation,," << endl;
return REQUEST_SUCCESSFUL;
} else if (topics_table_client.clientID.compare(client_id) == 0
and stoi(sf) != topics_table_client.sf) {
/*
* Check SF to be a valid option.
*
*/
if (stoi(sf) != stoi(SF_ON) && stoi(sf) != stoi(SF_OFF)) {
reply_invalid_request(client);
cout << "Invalid subscribe request: SF" << endl;
return INVALID_REQUEST_ERROR;
}
topics_table_client.sf = stoi(sf);
cout << "SF option was updated for client " << client_id
<< " who was subscribed to " << topic << endl;
return REQUEST_SUCCESSFUL;
}
}
/*
* Process store and forward option.
*
*/
if (sf.compare({SF_ON}) == 0) {
topics_table_entry.push_back({client_id, client, 1});
cout << "Client " << client_id << " subscribed successfully to topic "
<< topic << " with SF option: 1" << endl;
return REQUEST_SUCCESSFUL;
} else if (sf.compare({SF_OFF}) == 0) {
topics_table_entry.push_back({client_id, client, 0});
cout << "Client " << client_id << " subscribed successfully to topic "
<< topic << " with SF option: 0" << endl;
return REQUEST_SUCCESSFUL;
} else {
reply_invalid_request(client);
cout << "Invalid subscribe request: SF" << endl;
return INVALID_REQUEST_ERROR;
}
} else {
reply_invalid_request(client);
cout << "Invalid request" << endl;
return INVALID_REQUEST_ERROR;
}
} else if(request.size() == UNSUBSCRIBE_REQUEST_LENGTH) {
/*
* Parse the request.
*
*/
string request_type = request.at(0);
string topic = request.at(1);
/*
* Process an unsubscribe request.
*
*/
if (request_type.compare({UNSUBSCRIBE}) == 0) {
/*
* Check if the @topic can be found in @topics_table.
*
*/
if(topics_table.find(topic) == topics_table.end()) {
cout << "Topic cannot be unsubscribed because it does not exists" << endl;
reply_invalid_request(client);
return INVALID_REQUEST_ERROR;
}
/*
* Find the topic to which the client was subscribed.
*
*/
auto topics_table_entry = topics_table.at(topic);
auto remove_iter = remove_if(topics_table_entry.begin(),
topics_table_entry.end(),
[client] (Client& lst_client) {return lst_client.fd == client;});
/*
* If the client was not subscribed to this topic then skip, else
* delete its corresponding Client entry from the list of clients
* subscribed to this topic.
*
*/
if (remove_iter == topics_table_entry.end()) {
reply_invalid_request(client);
cout << "Cannot remove client from this topic because he was not"
<< " subscribed" << endl;
return INVALID_REQUEST_ERROR;
} else {
topics_table_entry.erase(remove_iter);
cout << "Client " << client_id << " unsubscribed successfully from topic "
<< topic << endl;
return REQUEST_SUCCESSFUL;
}
} else {
reply_invalid_request(client);
cout << "Invalid request" << endl;
return INVALID_REQUEST_ERROR;
}
} else {
reply_invalid_request(client);
cout << "Invalid request" << endl;
return INVALID_REQUEST_ERROR;
}
}

49
sample_payloads.json Normal file
View File

@ -0,0 +1,49 @@
[{
"description": "topic {a_non_negative_int} - type {INT} - value{10}",
"payload_base64": "YV9ub25fbmVnYXRpdmVfaW50AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAo="
}, {
"description": "topic {a_negative_int} - type {INT} - value{-10}",
"payload_base64": "YV9uZWdhdGl2ZV9pbnQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAo="
}, {
"description": "topic {a_larger_value} - type {INT} - value{1234567890}",
"payload_base64": "YV9sYXJnZXJfdmFsdWUAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEmWAtI="
}, {
"description": "topic {a_large_negative_value} - type {INT} - value{-1234567890}",
"payload_base64": "YV9sYXJnZV9uZWdhdGl2ZV92YWx1ZQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUmWAtI="
}, {
"description": "topic {<50 chars of abc...>} - type {INT} - value{10}",
"payload_base64": "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3gAAAAAAAo="
}, {
"description": "topic {that_is_small_short_real} - type {SHORT_REAL} - value{2.30}",
"payload_base64": "dGhhdF9pc19zbWFsbF9zaG9ydF9yZWFsAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAOY="
}, {
"description": "topic {that_is_big_short_real} - type {SHORT_REAL} - value{655.05}",
"payload_base64": "dGhhdF9pc19iaWdfc2hvcnRfcmVhbAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB/+E="
}, {
"description": "topic {that_is_integer_short_real} - type {SHORT_REAL} - value{17 / 17.00}",
"payload_base64": "dGhhdF9pc19pbnRlZ2VyX3Nob3J0X3JlYWwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBqQ="
}, {
"description": "topic {float_seventeen} - type {FLOAT} - value{17}",
"payload_base64": "ZmxvYXRfc2V2ZW50ZWVuAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAABEA"
}, {
"description": "topic {float_minus_seventeen} - type {FLOAT} - value{-17}",
"payload_base64": "ZmxvYXRfbWludXNfc2V2ZW50ZWVuAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAABEA"
}, {
"description": "topic {a_strange_float} - type {FLOAT} - value{1234.4321}",
"payload_base64": "YV9zdHJhbmdlX2Zsb2F0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAC8XAEE"
}, {
"description": "topic {a_negative_strange_float} - type {FLOAT} - value{-1234.4321}",
"payload_base64": "YV9uZWdhdGl2ZV9zdHJhbmdlX2Zsb2F0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQC8XAEE"
}, {
"description": "topic {a_subunitary_float} - type {FLOAT} - value{0.042}",
"payload_base64": "YV9zdWJ1bml0YXJ5X2Zsb2F0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAACoD"
}, {
"description": "topic {a_negative_subunitary_float} - type {FLOAT} - value{-0.042}",
"payload_base64": "YV9uZWdhdGl2ZV9zdWJ1bml0YXJ5X2Zsb2F0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAACoD"
}, {
"description": "topic {ana_string_announce} - type {STRING} - value{Ana are mere}",
"payload_base64": "YW5hX3N0cmluZ19hbm5vdW5jZQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADQW5hIGFyZSBtZXJl"
}, {
"description": "topic {huge_string} - type {STRING} - value{<1500 chars of \"abc...\">}",
"payload_base64": "aHVnZV9zdHJpbmcAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFy"
}]

236
server.cpp Normal file
View File

@ -0,0 +1,236 @@
#include <iostream>
#include <string.h>
#include <unistd.h>
#include "server_utils.hpp"
#include "tcp_handler.hpp"
#include "udp_handler.hpp"
#include "helpers.h"
#include "protocol.hpp"
using namespace std;
int main(int argc, char **argv) {
if (argc < 2) {
usage(argv[0]);
exit(EXIT_SUCCESS);
}
/*
* Initialize the sockets used for TCP and UDP communication.
*
*/
int port = atoi(argv[1]);
DIE(port == 0, "server: atoi");
int tcp_listener = establish_tcp_connection(port);
int udp_listener = establish_udp_connection(port);
/*
* Keep a list of file descriptors associated with each TCP client
* that is connected to server.
*
*/
list<ClientIO> clients;
/*
* Keep a table that will save the clients subscribed to different
* topics.
*
*/
unordered_map<string, list<Client>> topics_table;
/*
* Keep a table that will save for each user the packets to be
* transmitted after a client reconnects and has SF option on.
*
*/
unordered_map<string, list<Topic>> pending_table;
/*
* File descriptors used for I/O multiplexing.
*
*/
fd_set read_fds;
int max_fd;
while (true) {
/*
* Initialize the file descriptors set and call select for I/O multiplexing.
* Listen on udp, tcp accept and stdin. Also listen on the connections
* established with the clients.
*
*/
FD_ZERO(&read_fds);
FD_SET(tcp_listener, &read_fds);
FD_SET(udp_listener, &read_fds);
FD_SET(STDIN_FILENO, &read_fds);
max_fd = max(tcp_listener, udp_listener);
max_fd = max(STDIN_FILENO, max_fd);
fd_set_add_clients(read_fds, max_fd, clients);
/*
* Call select for I/O multiplexing.
*
*/
int read_activity;
read_activity = select(max_fd + 1, &read_fds, NULL, NULL, NULL);
DIE(read_activity < 0, "server: select");
/*
* If something happened on tcp_listener_socket it means that a new connection
* waits to be accepted.
*
*/
if (FD_ISSET(tcp_listener, &read_fds)) {
/*
* If accept_connection was not successful, then skip
* the operations below.
*
*/
int ac_ret = accept_connection(tcp_listener, clients);
if (ac_ret == ACCEPT_SUCCESSFUL) {
/*
* Maybe the client reconnected, so the server must validate its
* entry in topics table. The entry was unvalidated by #unvalidate_client.
* It(unvalidate_client) is called when resolve_subscribe_request
* returns a CONNECTION_CLOSED_ERROR value.
*
*/
string last_connected_id = clients.back().clientID;
int last_connected_fd = clients.back().fd;
bool sf = validate_client(last_connected_id, last_connected_fd, topics_table);
/*
* If the client has the SF option on then send the topics in
* @pending_table assigned to it.
*
*/
if (sf == true) {
int fpt_ret = forward_pending_topics(last_connected_fd,
last_connected_id, pending_table);
/*
* If forwarding failed the remove the client.
*
*/
if (fpt_ret == FORWARD_FAILED) {
cout << "Client removed after pending topics forwarding was not"
<< " successful." << endl;
auto last_connected = --clients.end();
clients.erase(last_connected);
unvalidate_client(last_connected_fd, topics_table);
}
}
}
}
/*
* Check the rest of the file descriptors in read_fds. Here the server
* resolves the subscribe and unsubscribe requests.
*
*/
auto client = clients.begin();
while (client != clients.end()) {
int fd = (*client).fd;
if (FD_ISSET(fd, &read_fds)) {
int rsr_ret = resolve_subscribe_request(fd, topics_table);
/*
* If the client has disconnected then delete its entry
* from @clients. Also make its entry from @topics_table
* unavailable to let the #forward_topic function know that
* it should not send a topic on the file descriptor associated
* with this client.
*
*/
if (rsr_ret == CONNECTION_CLOSED_ERROR) {
cout << "Client " << (*client).clientID <<
" made unavailable until reconnection" << endl;
unvalidate_client((*client).fd, topics_table);
close(fd);
client = clients.erase(client);
continue;
}
}
client++;
}
/*
* If something happened on udp_listener_socket it means that a new topic arrived
* and waits to be forwarded to its subscribers.
*
*/
if (FD_ISSET(udp_listener, &read_fds)) {
Topic topic = parse_datagram(udp_listener);
/*
* If #parse_datagram returns an empty topic it means an error occured and the
* operation will be skipped.
*
*/
Topic empty_topic{};
if (topic == empty_topic) {
continue;
}
forward_topic(topic, topics_table, pending_table);
/*
* Delete invalid clients. These are the clients that disconnected
* during #forward_topic. The condition in if assures that the topic
* for which we delete the invalid clients appears in @topics_table.
*
*/
if (topics_table.find(topic.topic) != topics_table.end()) {
delete_invalid_clients(topics_table.at(topic.topic), clients);
}
}
/*
* If something happened on stdin then the server must be closed.
*
*/
if (FD_ISSET(STDIN_FILENO, &read_fds)) {
char buffer[sizeof(EXIT_COMMAND) + 1];
memset(buffer, 0x00, sizeof(EXIT_COMMAND) + 1);
fgets(buffer, sizeof(EXIT_COMMAND), stdin);
/*
* End the session if the server receives EXIT_COMMAND.
*
*/
if (strncmp(buffer, EXIT_COMMAND, sizeof(EXIT_COMMAND)) == 0) {
cout << "Disconnecting.." << endl;
break;
}
}
}
/*
* Close the connection with the tcp clients.
*
*/
clients_close_connection(clients);
close(tcp_listener);
close(udp_listener);
return EXIT_SUCCESS;
}

283
server_utils.cpp Normal file
View File

@ -0,0 +1,283 @@
#include <iostream>
#include <algorithm>
#include <string.h>
#include <unistd.h>
#include "server_utils.hpp"
#include "protocol.hpp"
/*
* Prototypes of local scope functions.
*
*/
/*
* Adds @topic in @pending_table in the entry associated with @client_id.
* It is used in #forward_topic.
*
*/
void pending_table_add(string client_id, Topic topic,
unordered_map<string, list<Topic>>& pending_table);
/*
* Helper function that send a @topic on a socket associated with @fd.
* It is configured to send a packet using the topic transmission
* protocol.
*
* @return: SEND_SUCCESSFUL - the topic was successfully transmitted
* SEND_FAILED - the connection was closed unexpectedly
*
*/
#define SEND_SUCCESSFUL 0
#define SEND_FAILED 1
int send_packet(int fd, Topic topic);
#define SEND_PACKET_ERROR(assertion, call_description) \
do { \
if (assertion) { \
fprintf(stderr, "(%s, %d): ", \
__FILE__, __LINE__); \
perror(call_description); \
return SEND_FAILED; \
} \
} while (0)
/*
* Implementation of functions defined in protocol.hpp.
*
*/
void usage(char *filename) {
std::cout << "Usage: " << filename << " PORT" << std::endl;
}
void fd_set_add_clients(fd_set& set, int& max_fd, list<ClientIO> clients) {
for (auto client = clients.begin(); client != clients.end(); client++) {
int fd = (*client).fd;
FD_SET(fd, &set);
max_fd = max(max_fd, fd);
}
}
void clients_close_connection(list<ClientIO> clients) {
for (auto client : clients) {
close(client.fd);
}
}
void forward_topic(Topic topic, unordered_map<string, list<Client>>& topics_table,
unordered_map<string, list<Topic>>& pending_table) {
/*
* Search for the topic in topics_table.
*/
if (topics_table.find(topic.topic) == topics_table.end()) {
return;
}
/*
* Send the topic to all the subscribed clients.
*
*/
auto& clients = topics_table.at(topic.topic);
for (auto& client : clients) {
int fd = client.fd;
/*
* If the client is unavailable and has SF option on the add the packet
* in the pending_table. Else if the SF option is not set, then skip.
*
*/
if (fd == UNAVAILABLE_CLIENT && client.sf == true) {
pending_table_add(client.clientID, topic, pending_table);
continue;
} else if (fd == UNAVAILABLE_CLIENT && client.sf == false) {
continue;
}
/*
* If #send_packet failed then unvalidate the client.
*
*/
int send_packet_ret = send_packet(fd, topic);
if (send_packet_ret == SEND_FAILED) {
client.fd = UNAVAILABLE_CLIENT;
}
}
}
int forward_pending_topics(int fd, string client_id,
unordered_map<string, list<Topic>>& pending_table) {
int return_code = FORWARD_SUCCESSFUL;
if (pending_table.find(client_id) == pending_table.end()) {
cout << "Nothing to forward from pending table at the moment" << endl;
return return_code;
}
auto& topics = pending_table.at(client_id);
for (auto& topic : topics) {
/*
* If the topics cannot be sent then an error persists on the
* network and all topics will be dropped.
*
*/
int send_packet_ret = send_packet(fd, topic);
if (send_packet_ret == SEND_FAILED) {
cout << "Error while trying to send pending packets for client "
<< client_id << ". Aborting this operation.." << endl;
return_code = FORWARD_FAILED;
break;
}
}
pending_table.erase(client_id);
return return_code;
}
void unvalidate_client(int client_fd,
unordered_map<string, list<Client>>& topics_table) {
for (auto& t_clients : topics_table) {
for (auto& client : t_clients.second) {
if (client.fd == client_fd) {
client.fd = UNAVAILABLE_CLIENT;
}
}
}
}
bool validate_client(string client_id, int new_fd,
unordered_map<string, list<Client>>& topics_table) {
/*
* Keep a counter that counts how many times a client was validated.
* If the counter not equal to 0 it means that the client reconnected
* and was validated.
*
*/
int count(0);
bool sf(false);
for (auto& t_clients : topics_table) {
for (auto& client : t_clients.second) {
if (client.clientID.compare(client_id) == 0) {
client.fd = new_fd;
count++;
sf = client.sf;
}
}
}
if (count != 0) {
cout << "File descriptor for client " << client_id <<
" was restored." << endl;
}
return sf;
}
void delete_invalid_clients(list<Client> subscribers, list<ClientIO>& clientsIO) {
/*
* Iterate through the subscribers and check if fd field is UNAVAILABLE_CLIENT.
* Find the clientID in clientsIO and delete that entry.
*
*/
for (auto subscriber : subscribers) {
if (subscriber.fd == UNAVAILABLE_CLIENT) {
auto client = find_if(clientsIO.begin(), clientsIO.end(),
[subscriber] (ClientIO& c)
{return c.clientID.compare(subscriber.clientID) == 0; });
if (client != clientsIO.end()) {
clientsIO.erase(client);
}
}
}
}
void pending_table_add(string client_id, Topic topic,
unordered_map<string, list<Topic>>& pending_table) {
/*
* If the @client_id is not present in @pending_table then add the client.
*
*/
if (pending_table.find(client_id) == pending_table.end()) {
pending_table.insert({client_id, {}});
}
/*
* Add @topic to the pending list of @client_id.
*
*/
auto& pending_table_entry = pending_table.at(client_id);
pending_table_entry.push_back(topic);
}
int send_packet(int fd, Topic topic) {
/*
* Announce the subscriber that a topic is coming.
*
*/
uint8_t announce_byte = TOPIC_ANNOUNCEMENT_BYTE;
int write_bytes = send(fd, &announce_byte, sizeof(announce_byte), 0);
SEND_PACKET_ERROR(write_bytes < 0, "server_utils: send_packet: send: ");
/*
* Send all fields from Topic, excluding value because it has variable size and
* must send topic.value_sz bytes.
*
*/
char buffer[sizeof(Topic) - VALUE_MAX_LEN];
memcpy(buffer, (const void *) &topic, sizeof(Topic) - VALUE_MAX_LEN);
write_bytes = send(fd, buffer, sizeof(Topic) - VALUE_MAX_LEN, 0);
SEND_PACKET_ERROR(write_bytes < 0, "server_utils: send_packet: send: ");
/*
* Send the value field.
*
*/
char buffer_value[VALUE_MAX_LEN];
size_t value_sz = topic.value_sz;
memcpy(buffer_value, topic.value, value_sz);
write_bytes = send(fd, buffer_value, value_sz, 0);
SEND_PACKET_ERROR(write_bytes < 0, "server_utils: send_packet: send: ");
return SEND_SUCCESSFUL;
}

233
subscriber.cpp Normal file
View File

@ -0,0 +1,233 @@
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include "tcp_handler.hpp"
#include "subscriber_utils.hpp"
#include "helpers.h"
#include "protocol.hpp"
using namespace std;
int main(int argc, char **argv) {
if (argc < 4) {
usage(argv[0]);
exit(EXIT_SUCCESS);
}
char *client_id = argv[1];
/*
* Check the length of the id.
*
*/
if (strlen(client_id) > CLIENTID_MAX_LEN) {
cout << "Client ID too long, aborting.." << endl;
exit(EXIT_SUCCESS);
}
/*
* Global buffer to save the data received via the connection established
* by @socket_fd defined below.
*
*/
char buffer[BUFLEN];
/*
* Connect to server.
*
*/
int socket_fd = connect_to_server(argv[2], argv[3]);
/*
* Create fd set for I/O multiplexing.
*
*/
fd_set read_fds;
int fd_max;
while (true) {
/*
* Initialize the file descriptors set and call select for I/O multiplexing.
* The subscriber only needs to know about standard input and socket.
*
*/
FD_ZERO(&read_fds);
FD_SET(STDIN_FILENO, &read_fds);
FD_SET(socket_fd, &read_fds);
fd_max = max(STDIN_FILENO, socket_fd);
/*
* Call select for I/O multiplexing.
*
*/
int read_action = select(fd_max + 1, &read_fds, NULL, NULL, NULL);
DIE(read_action < 0, "client: select:");
if (FD_ISSET(socket_fd, &read_fds)) {
memset(buffer, 0x00, BUFLEN);
uint8_t announce_byte;
unsigned read_bytes = recv(socket_fd, &announce_byte, sizeof(announce_byte), 0);
DIE(read_bytes < 0, "subscriber: recv: ");
/*
* Server shut down so clients will also shut down.
*
*/
if (read_bytes == 0) {
cout << "Disconnecting.." << endl;
break;
}
/*
* Server received an id_request so it will respond with
* its id.
*
*/
if (announce_byte == IDREQUEST_ANNOUNCEMENT_BYTE) {
/*
* If the subscriber could not send its client id on the socket
* it means that the server closed unexpectedly and the subscriber
* must end the process.
*
*/
int rci_ret = reply_client_id(socket_fd, client_id);
if (rci_ret == REPLY_FAILED) {
cout << "Client could not send its client id. Disconnecting.."
<< endl;
break;
}
}
/*
* Server received the topic it is subscribed to.
*
*/
else if (announce_byte == TOPIC_ANNOUNCEMENT_BYTE) {
/*
* Read all fields from topic, excluding value field.
*
*/
read_bytes = recv(socket_fd, buffer, sizeof(Topic) - VALUE_MAX_LEN, 0);
DIE(read_bytes < 0, "subscriber: recv: ");
/*
* Create a topic object to store the received bytes.
*
*/
Topic topic;
memset((void *) &topic, 0x00, sizeof(Topic));
memcpy((void *) &topic, buffer, sizeof(Topic));
/*
* If last recv(2) did not receive sizeof(Topic) - VALUE_MAX_LEN bytes
* then the subscriber must still receive bytes.
*
*/
if (read_bytes < sizeof(Topic) - VALUE_MAX_LEN) {
unsigned remaining = sizeof(Topic) - VALUE_MAX_LEN - read_bytes;
/*
* Start receiving remaining bytes from buffer + read_bytes.
*
*/
read_bytes = recv(socket_fd, buffer + read_bytes, remaining, 0);
DIE(read_bytes < 0, "subscriber: recv: ");
memcpy((void *) &topic, buffer, sizeof(Topic));
}
size_t value_sz = topic.value_sz;
/*
* Read the variable-sized field, value.
*
*/
read_bytes = recv(socket_fd, topic.value, value_sz, 0);
DIE(read_bytes < 0, "subscriber: recv: ");
/*
* Same story as above. If value_sz bytes were not received then receive
* the remaining bytes.
*
*/
if (read_bytes != value_sz) {
unsigned remaining = value_sz - read_bytes;
read_bytes = recv(socket_fd, topic.value + read_bytes, remaining, 0);
DIE(read_bytes < 0, "subscriber: recv: ");
if (read_bytes != remaining) {
cout << "Afara 2" << endl;
break;
}
}
/*
* Parse the topic and print its content.
*
*/
print_topic(&topic);
} else if (announce_byte == CONNECTION_REFUSED_BYTE) {
/*
* Notify the client that the clientID already exits.
*
*/
puts("ClientID already existent, retry with another clientID.");
break;
} else if (announce_byte == INVALID_REQUEST_ANNOUNCEMENT_BYTE) {
/*
* Notift the client that the sent request was not a valid one.
*
*/
puts("Invalid request");
continue;
}
} else if (FD_ISSET(STDIN_FILENO, &read_fds)) {
memset(buffer, 0x00, BUFLEN);
fgets(buffer, REQUEST_MAX_LEN, stdin);
/*
* Client wants to end the current session.
*
*/
if (strncmp(buffer, EXIT, strlen(EXIT)) == 0) {
cout << "Disconnecting.." << endl;
break;
}
/*
* Send message to server.
*
*/
int write_bytes = send(socket_fd, buffer, REQUEST_MAX_LEN, 0);
DIE(write_bytes < 0, "client: send");
}
}
/*
* Close the socket and exit.
*
*/
close(socket_fd);
return EXIT_SUCCESS;
}

88
subscriber_utils.cpp Normal file
View File

@ -0,0 +1,88 @@
#include <iostream>
#include <cmath>
#include <iomanip>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "subscriber_utils.hpp"
using namespace std;
void usage(char *filename) {
cout << "Usage: " << filename << " ID_client IP_server PORT_server" << endl;
}
void print_topic(Topic *topic) {
/*
* Cast directly to struct in_addr in inet_ntoa call.
*
*/
cout << inet_ntoa({ .s_addr = topic->ip }) << ":" << topic->port << " ";
cout << topic->topic << " ";
/*
* Switch based on the data type format to be displayed.
*
*/
switch (topic->data_type) {
case (INT_BYTE):
cout << INT << " ";
break;
case (SHORT_REAL_BYTE):
cout << SHORT_REAL << " ";
break;
case (FLOAT_BYTE):
cout << FLOAT << " ";
break;
case (STRING_BYTE):
cout << STRING << " ";
break;
default:
break;
}
/*
* Cast the values appropriately.
*
*/
if (topic->data_type == INT_BYTE) {
uint8_t *sign_byte = (uint8_t *) topic->value;
uint32_t *value = (uint32_t *) (topic->value + sizeof(uint8_t));
int result = htonl(*value);
result = (*sign_byte == NEGATIVE) ? (-1) * (result) : result;
cout << result;
} else if (topic->data_type == SHORT_REAL_BYTE) {
uint16_t *value = (uint16_t *) topic->value;
cout << fixed << setprecision(2) << (float) ntohs(*value) / 100;
} else if (topic->data_type == FLOAT_BYTE) {
uint8_t *sign_byte = (uint8_t *) topic->value;
uint32_t *value = (uint32_t *) (topic->value + sizeof(uint8_t));
uint8_t *power = (uint8_t *) (topic->value + sizeof(uint8_t) + sizeof(uint32_t));
double result = ntohl(*value) * pow(10, -(*power));
result = (*sign_byte == NEGATIVE) ? (-1) * result : result;
cout << fixed << setprecision(4) << (double) result;
} else if (topic->data_type == STRING_BYTE) {
cout << topic->value;
}
cout << endl;
}

170
tcp_handler.cpp Normal file
View File

@ -0,0 +1,170 @@
#include <iostream>
#include <string>
#include <vector>
#include <algorithm>
#include "tcp_handler.hpp"
#include "helpers.h"
#include "protocol.hpp"
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
using namespace std;
int establish_tcp_connection(int port) {
/*
* Create listener socket.
*
*/
int listener_socket = socket(AF_INET, SOCK_STREAM, 0);
DIE(listener_socket < 0, "server: tcp: socket");
/*
* Define the attributes of the socket.
*
*/
struct sockaddr_in server_addr_tcp;
server_addr_tcp.sin_family = AF_INET;
server_addr_tcp.sin_addr.s_addr = INADDR_ANY;
server_addr_tcp.sin_port = htons(port);
/*
* Disable Nagle buffering using setsockopt with TCP_NODELAY option.
* SO_REUSEADDR is used because we want to reuse a port shortly after
* it was forecefully closed.
*
*/
int optname = 1;
int setsockopt_ret;
setsockopt_ret = setsockopt(listener_socket, IPPROTO_TCP,
TCP_NODELAY | SO_REUSEADDR , &optname, sizeof(int));
DIE(setsockopt_ret < 0, "server: tcp: setsockopt");
/*
* Bind the socket.
*
*/
int bind_ret_tcp;
bind_ret_tcp = bind(listener_socket, (struct sockaddr *) &server_addr_tcp,
sizeof(struct sockaddr_in));
DIE(bind_ret_tcp < 0, "server: tcp: bind");
/*
* Listen for incoming connections.
*
*/
int listen_ret;
listen_ret = listen(listener_socket, DEFAULT_BACKLOG);
DIE(listen_ret < 0, "server: tcp: listen");
return listener_socket;
}
int connect_to_server(char *server_ip, char *server_port) {
int sockfd;
struct sockaddr_in server_addr;
/*
* Convert the port.
*
*/
int port = atoi(server_port);
DIE(port == 0, "client: atoi: invalid port");
/*
* Create the socket.
*
*/
sockfd = socket(PF_INET, SOCK_STREAM, 0);
DIE(sockfd < 0, "client: socket");
/*
* Fill the info about the server the client want to connect to.
*
*/
server_addr.sin_family = PF_INET;
server_addr.sin_port = htons(port);
int inet_aton_ret = inet_aton(server_ip, &server_addr.sin_addr);
DIE(inet_aton_ret == 0, "client: inet_aton ");
/*
* Connect to server.
*
*/
int connect_ret = connect(sockfd, (struct sockaddr *) &server_addr,
sizeof(struct sockaddr));
DIE(connect_ret < 0, "client: connect");
return sockfd;
}
int accept_connection(int listener, list<ClientIO>& clients) {
struct sockaddr_in incoming_client;
socklen_t incoming_client_len = sizeof(struct sockaddr_in);
/*
* Accept the connection using accept.
*
*/
int new_socket = accept(listener, (struct sockaddr *) &incoming_client,
&incoming_client_len);
ACCEPT_CONNECTION_ERROR(new_socket < 0, "server: accept");
/*
* Disable Nagle buffering using setsockopt with TCP_NODELAY option.
*
*/
int optname = 1;
int setsockopt_ret;
setsockopt_ret = setsockopt(new_socket, IPPROTO_TCP,
TCP_NODELAY , &optname, sizeof(int));
DIE(setsockopt_ret < 0, "server: tcp: setsockopt");
/*
* Request the name of the client. If the #request_client_id returns
* an empty string then the connection closed and the connection cannot
* be accepted
*
*/
string client_id = request_client_id(new_socket);
if (client_id.empty() == true) {
cout << "Connection closed while trying to request the id of the client"
<< endl;
return ACCEPT_FAILED;
}
/*
* If the id already exists then reject the connection and send
* a reply about the rejection, else add the new id in the container.
*
*/
auto existent_client = find_if(clients.begin(), clients.end(),
[client_id] (ClientIO& cli) {return cli.clientID.compare(client_id) == 0;});
if (existent_client != clients.end()) {
cout << "Client already connected, refusing connection." << endl;
reply_connection_refused(new_socket);
return ACCEPT_FAILED;
}
clients.push_back({client_id, new_socket});
/*
* Print info about the newly connected client.
*
*/
cout << "New client " << client_id << " connected from " << ntohs(incoming_client.sin_port)
<< ":" << inet_ntoa(incoming_client.sin_addr) << endl;
return ACCEPT_SUCCESSFUL;
}

136
three_topics_payloads.json Normal file
View File

@ -0,0 +1,136 @@
[{
"description": "topic {topic_a} - type {INT} - value{10}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAo="
}, {
"description": "topic {topic_a} - type {INT} - value{-10}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAo="
}, {
"description": "topic {topic_a} - type {INT} - value{1234567890}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEmWAtI="
}, {
"description": "topic {topic_a} - type {INT} - value{-1234567890}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUmWAtI="
}, {
"description": "topic {topic_a} - type {SHORT_REAL} - value{2.30}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAOY="
}, {
"description": "topic {topic_a} - type {SHORT_REAL} - value{655.05}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB/+E="
}, {
"description": "topic {topic_a} - type {SHORT_REAL} - value{17 / 17.00}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBqQ="
}, {
"description": "topic {topic_a} - type {FLOAT} - value{17}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAABEA"
}, {
"description": "topic {topic_a} - type {FLOAT} - value{-17}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAABEA"
}, {
"description": "topic {topic_a} - type {FLOAT} - value{1234.4321}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAC8XAEE"
}, {
"description": "topic {topic_a} - type {FLOAT} - value{-1234.4321}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQC8XAEE"
}, {
"description": "topic {topic_a} - type {FLOAT} - value{0.042}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAACoD"
}, {
"description": "topic {topic_a} - type {FLOAT} - value{-0.042}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAACoD"
}, {
"description": "topic {topic_a} - type {STRING} - value{Ana are mere}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADQW5hIGFyZSBtZXJl"
}, {
"description": "topic {topic_a} - type {STRING} - value{<1500 chars of \"abc...\">}",
"payload_base64": "dG9waWNfYQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFy"
}, {
"description": "topic {topic_b} - type {INT} - value{10}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAo="
}, {
"description": "topic {topic_b} - type {INT} - value{-10}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAo="
}, {
"description": "topic {topic_b} - type {INT} - value{1234567890}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEmWAtI="
}, {
"description": "topic {topic_b} - type {INT} - value{-1234567890}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUmWAtI="
}, {
"description": "topic {topic_b} - type {SHORT_REAL} - value{2.30}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAOY="
}, {
"description": "topic {topic_b} - type {SHORT_REAL} - value{655.05}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB/+E="
}, {
"description": "topic {topic_b} - type {SHORT_REAL} - value{17 / 17.00}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBqQ="
}, {
"description": "topic {topic_b} - type {FLOAT} - value{17}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAABEA"
}, {
"description": "topic {topic_b} - type {FLOAT} - value{-17}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAABEA"
}, {
"description": "topic {topic_b} - type {FLOAT} - value{1234.4321}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAC8XAEE"
}, {
"description": "topic {topic_b} - type {FLOAT} - value{-1234.4321}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQC8XAEE"
}, {
"description": "topic {topic_b} - type {FLOAT} - value{0.042}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAACoD"
}, {
"description": "topic {topic_b} - type {FLOAT} - value{-0.042}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAACoD"
}, {
"description": "topic {topic_b} - type {STRING} - value{Ana are mere}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADQW5hIGFyZSBtZXJl"
}, {
"description": "topic {topic_b} - type {STRING} - value{<1500 chars of \"abc...\">}",
"payload_base64": "dG9waWNfYgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFy"
}, {
"description": "topic {topic_c} - type {INT} - value{10}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAo="
}, {
"description": "topic {topic_c} - type {INT} - value{-10}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAo="
}, {
"description": "topic {topic_c} - type {INT} - value{1234567890}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEmWAtI="
}, {
"description": "topic {topic_c} - type {INT} - value{-1234567890}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUmWAtI="
}, {
"description": "topic {topic_c} - type {SHORT_REAL} - value{2.30}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAOY="
}, {
"description": "topic {topic_c} - type {SHORT_REAL} - value{655.05}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB/+E="
}, {
"description": "topic {topic_c} - type {SHORT_REAL} - value{17 / 17.00}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBqQ="
}, {
"description": "topic {topic_c} - type {FLOAT} - value{17}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAABEA"
}, {
"description": "topic {topic_c} - type {FLOAT} - value{-17}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAABEA"
}, {
"description": "topic {topic_c} - type {FLOAT} - value{1234.4321}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAC8XAEE"
}, {
"description": "topic {topic_c} - type {FLOAT} - value{-1234.4321}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQC8XAEE"
}, {
"description": "topic {topic_c} - type {FLOAT} - value{0.042}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAACoD"
}, {
"description": "topic {topic_c} - type {FLOAT} - value{-0.042}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAQAAACoD"
}, {
"description": "topic {topic_c} - type {STRING} - value{Ana are mere}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADQW5hIGFyZSBtZXJl"
}, {
"description": "topic {topic_c} - type {STRING} - value{<1500 chars of \"abc...\">}",
"payload_base64": "dG9waWNfYwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADYWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXphYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5emFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6YWJjZGVmZ2hpamtsbW5vcHFy"
}]

130
udp_client.py Normal file
View File

@ -0,0 +1,130 @@
__author__ = 'Dorinel Filip'
import socket
import base64
import sys
import random
import time
import json
import os
import argparse
from ipaddress import ip_address
from utils.unpriv_port import unprivileged_port_type, get_unprivileged_port_meta
import textwrap
def setup_parser():
def get_mode_help():
return textwrap.dedent(
'''Specifies the mode used for the load generator as following:
* all_once - send each payload in the list once
* manual - let you choose which message to send next
* random - continuously send random payloads from the list''')
def read_json_file(parent_parser, arg):
if not os.path.exists(arg):
parent_parser.error('The file "{}" does not exist!'.format(arg))
else:
with open(arg, 'r') as f:
return json.load(f) # return parsed JSON
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
description='UDP Client for Communication Protocols (2018-2019) Homework #2')
inputs = parser.add_argument_group('Input')
inputs.add_argument('--input_file', type=lambda arg: read_json_file(parser, arg), metavar='FILE',
default='sample_payloads.json',
help='JSON file to read payloads from (default: sample_payloads.json)')
server_address = parser.add_argument_group('Server Address & Port (required)')
server_address.add_argument('server_ip', help='Server IP', type=ip_address)
server_address.add_argument('server_port', help='Server Port', type=unprivileged_port_type)
source_address = parser.add_argument_group('Source Address')
source_address.add_argument('--source-address', type=ip_address, default='0.0.0.0',
help='IP Address to be bind by UDP client (default: unspecified)')
source_address.add_argument('--source-port', type=unprivileged_port_type, default=0,
metavar=get_unprivileged_port_meta('source-port'),
help='UDP port to be used as source for this client (default: random port)')
mode = parser.add_argument_group('Workload changing parameters')
mode.add_argument('--mode', default='all_once', type=str, choices=('all_once', 'manual', 'random'),
help=get_mode_help())
load = parser.add_argument_group('Load characteristics')
load.add_argument('--count', type=int,
help='Number of packets to be send (only used for when mode is random, default: infinity)')
load.add_argument('--delay', help='Wait time (in ms) between two messages (default: 0)', type=int, default=0)
return parser
def setup_socket(parsed_args):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((str(parsed_args.source_address), parsed_args.source_port))
return sock
def send_message(sock, message, parsed_args):
to_send = base64.standard_b64decode(message['payload_base64'])
sent = sock.sendto(to_send, (str(parsed_args.server_ip), parsed_args.server_port))
print('Sent ({}/{} bytes) << {} >>'.format(sent, len(to_send), message['description']))
time.sleep(parsed_args.delay / 1000)
def run_all_once(sock, parsed_args):
for message in parsed_args.input_file:
send_message(sock, message, parsed_args)
def run_manual(sock, parsed_args):
header = 'Please chose one of the following'
options = '\n'.join(['{}. {}'.format(i, x['description']) for i, x in enumerate(parsed_args.input_file)])
input_ask = 'Input a int ({} - {}): '.format(0, len(parsed_args.input_file) - 1)
for_exit_string = 'To end the program, input "exit"'
prompter_string = '\n'.join((header, options, for_exit_string))
print(prompter_string)
while True:
choice = input(input_ask)
if choice == 'exit':
return
choice = int(choice)
if (choice >= 0) and (choice < len(parsed_args.input_file)):
message = parsed_args.input_file[choice]
send_message(sock, message, parsed_args)
else:
print('Maybe try a valid option.')
def run_random(sock, parsed_args):
n = parsed_args.count
count = 0
while (n is None) or (count < n):
message = random.choice(parsed_args.input_file)
send_message(sock, message, parsed_args)
count += 1
def main():
# Parse arguments
parser = setup_parser()
parsed_args = parser.parse_args(sys.argv[1:])
sock = setup_socket(parsed_args)
print('Client source PORT: {}'.format(sock.getsockname()[1]))
if parsed_args.mode == 'all_once':
print('Running in all_once mode...\n')
run_all_once(sock, parsed_args)
elif parsed_args.mode == 'manual':
print('Running in manual mode...\n')
run_manual(sock, parsed_args)
else:
print('Running in random mode..\n')
run_random(sock, parsed_args)
if __name__ == '__main__':
main()

138
udp_handler.cpp Normal file
View File

@ -0,0 +1,138 @@
#include <iostream>
#include <string.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include "udp_handler.hpp"
#include "helpers.h"
int establish_udp_connection(int port) {
/*
* Create UDP socket.
*
*/
int listener_socket = socket(AF_INET, SOCK_DGRAM, 0);
DIE(listener_socket < 0, "server: udp: socket");
/*
* Define the attributes of the socket.
*
*/
struct sockaddr_in server_addr_udp;
server_addr_udp.sin_family = AF_INET;
server_addr_udp.sin_addr.s_addr = INADDR_ANY;
server_addr_udp.sin_port = htons(port);
/*
* Bind the socket.
*
*/
int bind_ret_udp;
bind_ret_udp = bind(listener_socket, (const struct sockaddr *) &server_addr_udp,
sizeof(struct sockaddr_in));
DIE(bind_ret_udp < 0, "server: udp: bind");
return listener_socket;
}
Topic parse_datagram(int socket) {
/*
* Read the datagram coming from @socket in @buffer.
*
*/
char buffer[UDP_DATAGRAM_LEN];
memset(buffer, 0x00, UDP_DATAGRAM_LEN);
/*
* Declare a sockaddr_in buffer for receiving information
* about the upd_client such as port and ip.
*
*/
struct sockaddr_in incoming_client;
socklen_t incoming_client_len = sizeof(struct sockaddr_in);
int read_bytes = recvfrom(socket, buffer, UDP_DATAGRAM_LEN, 0,
(struct sockaddr *) &incoming_client, &incoming_client_len);
PARSE_DATAGRAM_ERROR(read_bytes < 0, "server: add_new_topic: read");
/*
* Create a new topic and fill its fields.
*
*/
Topic topic;
memset(&topic, 0x00, sizeof(Topic));
/*
* IP
*
*/
topic.ip = incoming_client.sin_addr.s_addr;
/*
* PORT
*
*/
topic.port = ntohs(incoming_client.sin_port);
/*
* TOPIC
*
*/
strncpy(topic.topic, buffer, TOPIC);
/*
* DATA_TYPE
*
*/
topic.data_type = buffer[TOPIC];
/*
* VALUE
*
*/
if (topic.data_type == INT_BYTE) {
topic.value_sz = INT_BYTE_VALUE_SIZE;
memcpy(topic.value, buffer + TOPIC + 1, INT_BYTE_VALUE_SIZE);
} else if (topic.data_type == SHORT_REAL_BYTE) {
topic.value_sz = SHORT_REAL_BYTE_VALUE_SIZE;
memcpy(topic.value, buffer + TOPIC + 1, topic.value_sz);
} else if (topic.data_type == FLOAT_BYTE) {
topic.value_sz = FLOAT_BYTE_VALUE_SIZE;
memcpy(topic.value, buffer + TOPIC + 1, topic.value_sz);
} else if (topic.data_type == STRING_BYTE) {
topic.value_sz = strlen(buffer + TOPIC + 1);
memcpy(topic.value, buffer + TOPIC + 1, topic.value_sz);
}
/*
* If data type is not a value that was defined in topic.hpp it means
* that an overflow in topic.topic field occured and the received topic
* is not a valid one. Return emtpy topic in this case.
*
*/
else {
PARSE_DATAGRAM_ERROR(true, "overflow occured when receiving topic name");
}
return topic;
}