pygbot/irc.py

83 lines
2.7 KiB
Python

import ssl
import sys
import time
import socket
class IRC:
def __init__(self, sock=None):
if sock is None:
self.sock = socket.socket(
socket.AF_INET, socket.SOCK_STREAM)
else:
self.sock = sock
def connect(self, host, port, nick, realname, username, password=None, tls=False):
ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
if tls == True:
self.sock = ctx.wrap_socket(self.sock)
time.sleep(1)
self.sock.connect((host, port))
time.sleep(5)
self.sock.send(bytes(f"NICK {nick}\n", "UTF-8"))
time.sleep(1)
self.sock.send(bytes(("USER "+ nick + " " + username + " " + nick + " :" + realname +"\n"), "UTF-8"))
time.sleep(1)
def rawsend(self, msg):
totalsent = 0
while totalsent < len(msg):
sent = self.sock.send(msg[totalsent:])
if sent <= 0:
raise RuntimeError("socket connection broken")
totalsent = totalsent + sent
def send(self, receiver, msg):
self.rawsend(bytes(f"PRIVMSG {receiver} :{msg}\n", "UTF-8"))
def recv(self, msglen=2048):
chunks = []
bytes_recd = 0
while bytes_recd < msglen:
chunk = self.sock.recv(msglen - bytes_recd)
if chunk == b'':
raise RuntimeError("socket connection broken")
chunks.append(chunk)
bytes_recd = bytes_recd + len(chunk)
return b''.join(chunks)
def join(self, channel):
self.rawsend(bytes(f"JOIN {channel}\n", "UTF-8"))
def kick(self, channel, nick):
self.rawsend(bytes(f"KICK {channel} {nick}\n", "UTF-8"))
def setnick(self, nick):
self.rawsend(bytes(f"NICK {nick}\n", "UTF-8"))
def command_parser(self, line):
args = line.split()
if len(args) > 1 and args[0] == "PING":
self.rawsend(bytes('PONG ' + args[1] + '\r\n', "UTF-8"))
return ''
if len(args) > 2 and args[1] == "INVITE":
self.join(args[3][1:])
return ''
if len(args) > 2 and args[1] == "PRIVMSG":
return args
else:
return ''
def get_response(self, msglen=1):
resp = self.recv(msglen)
try:
resp = resp.decode("UTF-8")
except:
return ''
return resp
def shutdown(self, sig=None, frame=None, msg=None):
self.rawsend(b"QUIT Caught SIGINT\n") if msg == None else self.rawsend(bytes(f"QUIT bye\n", "UTF-8"))
self.sock.shutdown(2) #SHUT_RDWR
self.sock.close()
sys.exit(0)