"""Minimal IRC bot that joins one channel and keeps posting a message there."""

import re
import socket
import sys
import time
import traceback
from select import select


class Socket:
    """Line-oriented wrapper around a non-blocking TCP client socket."""

    def __init__(self, server):
        """Connect to *server*, an address accepted by ``socket.connect``."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect(server)
        self.sock.setblocking(0)
        # Bytes of a line whose terminating "\n" has not arrived yet.
        self._read_buffer = b""
        # Set once the peer closes the connection or a socket error occurs,
        # so callers can tell "no data yet" apart from "connection is gone".
        self.eof = False

    def read(self, timeout=None):
        """Return the complete lines currently available, as a list of str.

        Waits up to *timeout* seconds (forever when ``None``) for data.
        Returns an empty list when nothing arrived, when the peer closed the
        connection, or when a socket error occurred; the last two cases also
        set ``self.eof``.  Line terminators are removed and undecodable bytes
        are replaced rather than raising.
        """
        try:
            ready = select([self.sock], [], [], timeout)
            if not ready[0]:
                return []
            data = self.sock.recv(4096)
        except BlockingIOError:
            # Spurious readiness on a non-blocking socket: just no data yet.
            return []
        except OSError:
            traceback.print_exc()
            self.eof = True
            return []
        if not data:
            self.eof = True
            return []
        # Everything before the final "\n" is complete; the remainder (which
        # may be empty) is kept for the next call.
        *complete, tail = (self._read_buffer + data).split(b"\n")
        self._read_buffer = tail
        return [
            raw.strip(b"\r").decode("utf-8", errors="replace")
            for raw in complete
        ]

    def send(self, line):
        """Send *line* (anything with ``encode``) in full, UTF-8 encoded."""
        payload = line.encode("utf-8")
        offset = 0
        # socket.send may accept only part of the payload on a non-blocking
        # socket, so wait for writability and continue until all is sent.
        while offset < len(payload):
            select([], [self.sock], [])
            try:
                offset += self.sock.send(payload[offset:])
            except BlockingIOError:
                continue

    def close(self):
        """Shut the connection down and release the socket."""
        try:
            self.sock.shutdown(socket.SHUT_RDWR)
        finally:
            self.sock.close()


# Message-tag value escaping: raw character -> escaped form.
_TAG_ESCAPES = str.maketrans({
    ";": "\\:",
    " ": "\\s",
    "\\": "\\\\",
    "\r": "\\r",
    "\n": "\\n",
})
# Character following a backslash -> the raw character it stands for.
_TAG_UNESCAPES = {":": ";", "s": " ", "\\": "\\", "r": "\r", "n": "\n"}
_TAG_UNESCAPE_RE = re.compile(r"\\(.?)", re.DOTALL)


def unescape(value):
    """Decode an escaped message-tag value.

    Done in a single left-to-right pass so an escaped backslash followed by
    ``s``/``:``/``r``/``n`` is not mistaken for another escape.  A backslash
    before any other character, or at the very end, is dropped.
    """
    return _TAG_UNESCAPE_RE.sub(
        lambda match: _TAG_UNESCAPES.get(match.group(1), match.group(1)),
        value,
    )


def escape(value):
    """Encode *value* for use as a message-tag value.

    Every character is translated exactly once, so the backslashes produced
    for ``;`` or space are not escaped a second time.
    """
    return value.translate(_TAG_ESCAPES)


# Tag value meaning "tag present without a value".
MISSING = None


class IRCLine:
    """One IRC protocol line: optional tags and hostmask, command, params.

    A trailing parameter keeps its leading ":" both when parsed and when
    rendered; callers pass and compare it in that form.

    NOTE(review): the end of ``line`` and the beginning of ``parse_line``
    were damaged in the copy this file was restored from and have been
    rebuilt from the surviving fragments and the IRC message format.
    Confirm against the upstream original.
    """

    def __init__(self, command, *params, tags=None, hostmask=""):
        """Build a line from *command* and its *params*.

        *params* may be given as separate arguments or as one list/tuple.
        *tags* maps tag names to values (``MISSING`` for valueless tags);
        *hostmask* is the message prefix without its leading ":".
        """
        self.command = command
        if len(params) == 1 and isinstance(params[0], (list, tuple)):
            self.params = list(params[0])
        else:
            self.params = list(params)
        # A fresh dict per instance; a shared default would leak tags
        # between unrelated lines.
        self.tags = {} if tags is None else tags
        self.hostmask = hostmask if hostmask else None

    @property
    def line(self):
        """The wire form of this line, terminated by CRLF."""
        pieces = []
        if self.tags:
            rendered = []
            for key, value in self.tags.items():
                if value is MISSING:
                    rendered.append(key)
                else:
                    rendered.append(key + "=" + escape(str(value)))
            pieces.append("@" + ";".join(rendered))
        if self.hostmask:
            pieces.append(":" + self.hostmask)
        pieces.append(self.command)
        pieces.extend(self.params)
        return " ".join(pieces) + "\r\n"

    @classmethod
    def parse_line(cls, line):
        """Parse one received line (without its terminator) into an IRCLine."""
        parts = line.split(" ")
        tags = {}
        if parts[0].startswith("@"):
            for item in parts.pop(0)[1:].split(";"):
                if not item:
                    continue
                key, has_value, value = item.partition("=")
                tags[key] = unescape(value) if has_value else MISSING
        hostmask = ""
        if parts and parts[0].startswith(":"):
            hostmask = parts.pop(0)[1:]
        if not parts:
            parts = [""]
        # The trailing parameter starts at the FIRST ":"-prefixed token after
        # the command and runs to the end of the line, so text that itself
        # contains " :" stays in one piece.
        for index in range(1, len(parts)):
            if parts[index].startswith(":"):
                parts[index:] = [" ".join(parts[index:])]
                break
        return cls(*parts, tags=tags, hostmask=hostmask)

    def encode(self, *args, **kwargs):
        """Encode the wire form, so an IRCLine can be sent like a str."""
        # If we get here, an IRCLine object itself (rather than its .line)
        # was handed to something expecting a string. Just do it.
        return self.line.encode(*args, **kwargs)


channel = "#chaos"
nick = "m"
server = "localhost"
port = 6667
timeout = 10
message = "RIP jmw2020 2020-2020"
privmsg = IRCLine("PRIVMSG", channel, ":" + message).line

# Only senders whose hostmask ends with this may reconfigure the bot.
_ADMIN_SUFFIX = "khuxkm@sudoers.tilde.team"
_SET_MESSAGE = ":setmessage "
_SET_TIMEOUT = ":settimeout "


def _register(sock):
    """Send NICK/USER and wait for the end of the MOTD.

    Returns True once numeric 376 (end of MOTD) or 422 (no MOTD) arrives,
    False if the connection ended first.  PINGs are answered meanwhile.
    """
    sock.send(IRCLine("NICK", nick).line)
    sock.send(IRCLine("USER", nick, "8", "*", ":" + nick).line)
    while not sock.eof:
        for raw in sock.read():
            parsed = IRCLine.parse_line(raw)
            if parsed.command == "PING":
                sock.send(IRCLine("PONG", parsed.params).line)
            elif parsed.command in ("376", "422"):
                return True
    return False


def main():
    """Connect, join the channel and post the message until disconnected."""
    sock = Socket((server, port))
    try:
        if not _register(sock):
            return
        sock.send(IRCLine("JOIN", channel).line)
        interval = timeout
        outgoing = privmsg
        while not sock.eof:
            got_data = False
            for raw in sock.read(interval):
                got_data = True
                parsed = IRCLine.parse_line(raw)
                if parsed.command == "PING":
                    sock.send(IRCLine("PONG", parsed.params).line)
                    continue
                if parsed.command != "PRIVMSG" or not parsed.params:
                    continue
                if not (parsed.hostmask or "").endswith(_ADMIN_SUFFIX):
                    continue
                text = parsed.params[-1]
                if text.startswith(_SET_MESSAGE):
                    outgoing = IRCLine(
                        "PRIVMSG", channel, ":" + text[len(_SET_MESSAGE):]
                    ).line
                elif text.startswith(_SET_TIMEOUT):
                    try:
                        requested = int(text[len(_SET_TIMEOUT):])
                    except ValueError:
                        continue  # not a number: keep the current interval
                    # select() and time.sleep() both reject negative values.
                    if requested >= 0:
                        interval = requested
            if sock.eof:
                break
            # NOTE(review): indentation was lost in the restored copy; the
            # message is assumed to be sent once per polling cycle.
            sock.send(outgoing)
            if not got_data:
                time.sleep(interval)
    finally:
        try:
            sock.close()
        except OSError:
            pass  # already disconnected; nothing left to release


if __name__ == "__main__":
    main()