Fixed bugs, added auto-reconnect feature to socket client
This commit is contained in:
parent
e55717f601
commit
c462ab1be3
|
@ -10,21 +10,26 @@ Socket Client
|
|||
from abots.helpers import eprint, cast, jots, jsto, utc_now_timestamp
|
||||
|
||||
from struct import pack, unpack
|
||||
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
|
||||
from socket import socket, timeout as sock_timeout
|
||||
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
|
||||
from ssl import wrap_socket
|
||||
from threading import Thread, Event
|
||||
from queue import Queue, Empty
|
||||
from time import sleep
|
||||
from random import randint
|
||||
|
||||
class SocketClient(Thread):
|
||||
def __init__(self, host, port, buffer_size=4096, secure=False,
|
||||
timeout=None):
|
||||
timeout=None, daemon=False, reconnects=10):
|
||||
super().__init__()
|
||||
self.setDaemon(daemon)
|
||||
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.buffer_size = buffer_size
|
||||
self.secure = secure
|
||||
self.timeout = timeout
|
||||
self.reconnects = reconnects
|
||||
self.sock = socket(AF_INET, SOCK_STREAM)
|
||||
if self.secure:
|
||||
self.sock = wrap_socket(self.sock, **kwargs)
|
||||
|
@ -35,6 +40,8 @@ class SocketClient(Thread):
|
|||
self.kill_switch = Event()
|
||||
self.ready = Event()
|
||||
self.stopped = Event()
|
||||
self.broken = Event()
|
||||
self.reconnecting = Event()
|
||||
|
||||
self._inbox = Queue()
|
||||
self._events = Queue()
|
||||
|
@ -52,9 +59,9 @@ class SocketClient(Thread):
|
|||
self.sock.settimeout(1)
|
||||
try:
|
||||
self.sock.connect(self.connection)
|
||||
except OSError as e:
|
||||
return e
|
||||
return None
|
||||
except Exception as e:
|
||||
return True, e
|
||||
return False, None
|
||||
|
||||
def _obtain(self, queue, timeout=False):
|
||||
if timeout is False:
|
||||
|
@ -72,6 +79,8 @@ class SocketClient(Thread):
|
|||
def _queue_thread(self, inbox, timeout):
|
||||
while not self.kill_switch.is_set():
|
||||
for message in self._obtain(inbox, timeout):
|
||||
if self.broken.is_set():
|
||||
self.reconnecting.wait()
|
||||
self.send_message(message)
|
||||
|
||||
def _recv_bytes(self, get_bytes, decode=True):
|
||||
|
@ -93,6 +102,8 @@ class SocketClient(Thread):
|
|||
packet = self.sock.recv(bufsize)
|
||||
# The socket can either be broken or no longer open at all
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
if not isinstance(e, sock_timeout):
|
||||
self._attempt_reconnect()
|
||||
return None
|
||||
data = data + packet
|
||||
return data.decode() if decode else data
|
||||
|
@ -121,12 +132,46 @@ class SocketClient(Thread):
|
|||
except OSError:
|
||||
return None
|
||||
|
||||
def _attempt_reconnect(self):
|
||||
if self.kill_switch.is_set():
|
||||
return
|
||||
print("BROKEN!")
|
||||
self.reconnecting.clear()
|
||||
self.broken.set()
|
||||
event = dict()
|
||||
event["name"] = "socket-down"
|
||||
event["data"] = dict()
|
||||
event["data"]["when"] = utc_now_timestamp()
|
||||
self._send_event(event)
|
||||
attempts = 0
|
||||
while attempts <= self.reconnects or not self.kill_switch.is_set():
|
||||
# Need to be run to prevent ConnectionAbortedError
|
||||
self.sock.__init__()
|
||||
err, report = self._prepare()
|
||||
if not err:
|
||||
self.reconnecting.set()
|
||||
self.broken.clear()
|
||||
event = dict()
|
||||
event["name"] = "socket-up"
|
||||
event["data"] = dict()
|
||||
event["data"]["when"] = utc_now_timestamp()
|
||||
self._send_event(event)
|
||||
return
|
||||
# Exponential backoff
|
||||
attempts = attempts + 1
|
||||
max_delay = (2**attempts) - 1
|
||||
delay = randint(0, max_delay)
|
||||
sleep(delay)
|
||||
self.stop()
|
||||
|
||||
def send_message(self, message, *args):
|
||||
packaged = self._package_message(message, *args)
|
||||
try:
|
||||
self.sock.send(packaged)
|
||||
except OSError:
|
||||
self.stop()
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
if not isinstance(e, sock_timeout):
|
||||
self._attempt_reconnect()
|
||||
self._attempt_reconnect()
|
||||
|
||||
def recv(self):
|
||||
return [letter for letter in self._obtain(self._outbox)]
|
||||
|
@ -135,15 +180,17 @@ class SocketClient(Thread):
|
|||
self._inbox.put(message)
|
||||
|
||||
def run(self):
|
||||
err = self._prepare()
|
||||
if err is not None:
|
||||
eprint(err)
|
||||
return err
|
||||
err, report = self._prepare()
|
||||
if err:
|
||||
eprint(report)
|
||||
return report
|
||||
queue_args = self._inbox, self.timeout
|
||||
Thread(target=self._queue_thread, args=queue_args).start()
|
||||
print("Client ready!")
|
||||
self.ready.set()
|
||||
while self.running:
|
||||
if self.broken.is_set():
|
||||
self.reconnecting.wait()
|
||||
message = self._get_message()
|
||||
if message is None:
|
||||
continue
|
||||
|
@ -151,12 +198,12 @@ class SocketClient(Thread):
|
|||
|
||||
def stop(self, done=None):
|
||||
# print("Stopping client!")
|
||||
self.kill_switch.set()
|
||||
event = dict()
|
||||
event["name"] = "closing"
|
||||
event["data"] = dict()
|
||||
event["data"]["when"] = utc_now_timestamp()
|
||||
self._send_event(event)
|
||||
self.kill_switch.set()
|
||||
self.running = False
|
||||
self.sock.close()
|
||||
self.stopped.set()
|
||||
|
|
|
@ -21,8 +21,9 @@ from queue import Queue, Empty
|
|||
|
||||
class SocketServer(Thread):
|
||||
def __init__(self, host, port, listeners=5, buffer_size=4096,
|
||||
secure=False, timeout=None):
|
||||
secure=False, timeout=None, daemon=False):
|
||||
super().__init__()
|
||||
self.setDaemon(daemon)
|
||||
|
||||
# The connection information for server, the clients will use this to
|
||||
# connect to the server
|
||||
|
@ -74,9 +75,7 @@ class SocketServer(Thread):
|
|||
client_host, client_port = address
|
||||
self.sockets.append(sock)
|
||||
|
||||
# Spawn new thread for client
|
||||
client_kill = Event()
|
||||
|
||||
client_uuid = sha256()
|
||||
self.uuids[client_uuid] = dict()
|
||||
self.uuids[client_uuid]["sock"] = sock
|
||||
|
@ -109,8 +108,17 @@ class SocketServer(Thread):
|
|||
# Send message and uuid of sender to outbox queue
|
||||
letter = uuid, message
|
||||
self._outbox.put(letter)
|
||||
self.close_sock(uuid)
|
||||
return
|
||||
|
||||
def _queue_thread(self, inbox, timeout):
|
||||
while not self.kill_switch.is_set():
|
||||
for letter in self._obtain(inbox, timeout):
|
||||
if len(letter) != 2:
|
||||
continue
|
||||
uuid, message = letter
|
||||
if uuid == "cast":
|
||||
self.broadcast_message(uuid, message)
|
||||
else:
|
||||
self.send_message(uuid, message)
|
||||
|
||||
def _obtain(self, queue, timeout=False):
|
||||
if timeout is False:
|
||||
|
@ -125,17 +133,6 @@ class SocketServer(Thread):
|
|||
except Empty:
|
||||
break
|
||||
|
||||
def _queue_thread(self, inbox, timeout):
|
||||
while not self.kill_switch.is_set():
|
||||
for letter in self._obtain(inbox, timeout):
|
||||
if len(letter) != 2:
|
||||
continue
|
||||
uuid, message = letter
|
||||
if uuid == "cast":
|
||||
self.broadcast_message(uuid, message)
|
||||
else:
|
||||
self.send_message(uuid, message)
|
||||
|
||||
# Prepares socket server before starting it
|
||||
def _prepare(self):
|
||||
self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
|
||||
|
@ -248,12 +245,13 @@ class SocketServer(Thread):
|
|||
return err
|
||||
queue_args = self._inbox, self.timeout
|
||||
Thread(target=self._queue_thread, args=queue_args).start()
|
||||
print("Server ready!")
|
||||
# print("Server ready!")
|
||||
self.ready.set()
|
||||
while not self.kill_switch.is_set():
|
||||
try:
|
||||
# Accept new socket client
|
||||
client_sock, client_address = self.sock.accept()
|
||||
# print(client_address)
|
||||
self._new_client(client_sock, client_address)
|
||||
# The socket can either be broken or no longer open at all
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
#!env/bin/python3
|
||||
|
||||
from abots.net import SocketServer, SocketClient
|
||||
from abots.events import ThreadMarshal
|
||||
|
||||
pool_size = 2
|
||||
marshal = ThreadMarshal(2)
|
||||
server = SocketServer("localhost", 10401, timeout=3)
|
||||
client = SocketClient("localhost", 10401, timeout=3)
|
||||
host = "localhost"
|
||||
port = 10401
|
||||
timeout = 3
|
||||
|
||||
marshal.reserve(server.start)
|
||||
server = SocketServer(host, port, timeout=timeout)
|
||||
client = SocketClient(host, port, timeout=timeout)
|
||||
|
||||
server.start()
|
||||
server.ready.wait()
|
||||
marshal.reserve(client.start)
|
||||
client.start()
|
Loading…
Reference in New Issue