mosfet/mosfet/protocol/types.py

218 lines
6.4 KiB
Python

from __future__ import division
import struct
from minecraft.networking.types.basic import (
Type, Byte, Short, Integer, Long, Float, Double,
ShortPrefixedByteArray, Boolean, VarInt, TrailingByteArray,
Position, String, UnsignedByte
)
from minecraft.networking.types.utility import Vector
class IntegerPrefixedByteArray(Type):
    """Byte string preceded on the wire by its length, written as an ``Integer``."""

    @staticmethod
    def read(file_object):
        """Read the ``Integer`` length prefix, then return that many bytes.

        The payload is passed through ``struct.unpack`` with an ``"<n>s"``
        format, so ``struct.error`` is raised when the stream hands back a
        different number of bytes than the prefix announced.
        """
        size = Integer.read(file_object)
        raw = file_object.read(size)
        (payload,) = struct.unpack("{}s".format(size), raw)
        return payload

    @staticmethod
    def send(value, socket):
        """Write ``len(value)`` as an ``Integer``, followed by ``value`` itself."""
        Integer.send(len(value), socket)
        socket.send(value)
# NBT tag type ids. They are consecutive integers starting at 0, so they are
# bound in one go; the trailing comment on each name spells out its value.
(
    TAG_End,         # 0
    TAG_Byte,        # 1
    TAG_Short,       # 2
    TAG_Int,         # 3
    TAG_Long,        # 4
    TAG_Float,       # 5
    TAG_Double,      # 6
    TAG_Byte_Array,  # 7
    TAG_String,      # 8
    TAG_List,        # 9
    TAG_Compound,    # 10
    TAG_Int_Array,   # 11
    TAG_Long_Array,  # 12
) = range(13)
class Nbt(Type):
    """Reader for NBT (Named Binary Tag) data; writing is not implemented."""

    @staticmethod
    def read(file_object):
        """Read one named root compound from ``file_object``.

        Returns:
            A dict holding the compound's children, with the root tag's own
            name stored under the extra key ``'_name'``; or ``None`` when the
            first byte read is not ``TAG_Compound`` (only that one byte is
            consumed in that case).
        """
        type_id = Byte.read(file_object)
        if type_id != TAG_Compound:
            # Anything other than a compound header is reported as "no NBT"
            # instead of raising.
            return None
        name = ShortPrefixedByteArray.read(file_object).decode('utf-8')
        root = Nbt.decode_tag(file_object, TAG_Compound)
        root['_name'] = name
        return root

    @staticmethod
    def decode_tag(file_object, type_id):
        """Decode the payload of a single tag whose type id is already known.

        Args:
            file_object: Stream positioned at the start of the tag payload.
            type_id: One of the ``TAG_*`` constants.

        Returns:
            The value read by the matching numeric type for the scalar tags;
            raw ``bytes`` for ``TAG_Byte_Array``; the undecoded
            short-prefixed byte string for ``TAG_String``; a list for
            ``TAG_List``, ``TAG_Int_Array`` and ``TAG_Long_Array``; a dict
            keyed by child name for ``TAG_Compound``.

        Raises:
            Exception: If ``type_id`` is not a payload-carrying tag type
                (this includes ``TAG_End``).
        """
        if type_id == TAG_Byte:
            return Byte.read(file_object)
        if type_id == TAG_Short:
            return Short.read(file_object)
        if type_id == TAG_Int:
            return Integer.read(file_object)
        if type_id == TAG_Long:
            return Long.read(file_object)
        if type_id == TAG_Float:
            return Float.read(file_object)
        if type_id == TAG_Double:
            return Double.read(file_object)
        if type_id == TAG_Byte_Array:
            # A byte array is arbitrary binary data, not text: decoding it as
            # UTF-8 raises UnicodeDecodeError on invalid sequences, so the raw
            # bytes are returned as-is.
            return IntegerPrefixedByteArray.read(file_object)
        if type_id == TAG_String:
            # NOTE(review): returned undecoded, unlike tag names which are
            # decoded as UTF-8. Kept that way so existing callers keep seeing
            # the same type; the NBT format documents strings as Java
            # "modified UTF-8", which a strict UTF-8 decode may reject --
            # confirm before changing.
            return ShortPrefixedByteArray.read(file_object)
        if type_id == TAG_List:
            # Header: element type id, then element count. Elements carry no
            # per-item type byte or name.
            item_type_id = Byte.read(file_object)
            count = Integer.read(file_object)
            return [Nbt.decode_tag(file_object, item_type_id)
                    for _ in range(count)]
        if type_id == TAG_Compound:
            # Children are (type id, name, payload) triples; a TAG_End type id
            # closes the compound.
            children = {}
            child_type_id = Byte.read(file_object)
            while child_type_id != TAG_End:
                child_name = ShortPrefixedByteArray.read(file_object).decode('utf-8')
                children[child_name] = Nbt.decode_tag(file_object, child_type_id)
                child_type_id = Byte.read(file_object)
            return children
        if type_id == TAG_Int_Array:
            count = Integer.read(file_object)
            return [Integer.read(file_object) for _ in range(count)]
        if type_id == TAG_Long_Array:
            count = Integer.read(file_object)
            return [Long.read(file_object) for _ in range(count)]
        raise Exception("Invalid NBT tag type")

    @staticmethod
    def send(value, socket):
        """Not implemented: writes nothing to ``socket`` and returns ``None``."""
        # TODO
        pass
class Slot(Type):
    """A slot value: a presence flag plus, when present, item id, count and NBT."""

    def __init__(self, present, item_id=None, item_count=None, nbt=None):
        """Store the four fields as given; only ``present`` is required."""
        self.present = present
        self.item_id = item_id
        self.item_count = item_count
        self.nbt = nbt

    def __str__(self):
        """Return the instance attribute dict rendered as a string."""
        return str(self.__dict__)

    def __repr__(self):
        """Return a constructor-like representation.

        Only ``present`` is shown when the slot is not present.
        """
        if self.present:
            # The closing parenthesis was missing from this format string.
            return 'Slot(present={}, item_id={}, item_count={}, nbt={})'.format(
                self.present, self.item_id, self.item_count, self.nbt)
        return 'Slot(present={})'.format(self.present)

    @staticmethod
    def read(file_object):
        """Read a slot from ``file_object``.

        Wire order: ``Boolean`` present flag; then, only when it is true,
        ``VarInt`` item id, ``Byte`` item count and an ``Nbt`` value (which is
        ``None`` when no compound follows).
        """
        present = Boolean.read(file_object)
        if not present:
            return Slot(present, None, None, None)
        item_id = VarInt.read(file_object)
        item_count = Byte.read(file_object)
        nbt = Nbt.read(file_object)
        return Slot(present, item_id, item_count, nbt)

    @staticmethod
    def send(value, socket):
        """Write ``value`` (a ``Slot``) to ``socket``.

        ``value.nbt`` is not transmitted: a single ``0x00`` byte (the value of
        ``TAG_End``) is always written in its place -- presumably because
        ``Nbt.send`` is not implemented yet.
        """
        Boolean.send(value.present, socket)
        if value.present:
            VarInt.send(value.item_id, socket)
            Byte.send(value.item_count, socket)
            Byte.send(0x00, socket)
class Entry(Type):
    """One (index, type, value) record whose value reader is chosen by type id."""

    # Maps the VarInt type id read from the stream to the class used to read
    # the value. Ids missing from this table cannot be decoded.
    types = {
        0: Byte,
        1: VarInt,
        2: Float,
        3: String,
        5: Boolean,
        6: Slot,
        7: Boolean,
        9: Position,
        18: VarInt,
    }

    def __init__(self, index, type, value):
        """Store the entry's index, numeric type id and decoded value."""
        self.index = index
        self.type = type
        self.value = value

    def __str__(self):
        """Return the instance attribute dict rendered as a string."""
        return str(vars(self))

    def __repr__(self):
        """Return a constructor-like representation of the entry."""
        template = 'Entry(index={}, type={}, value={})'
        return template.format(self.index, self.type, self.value)

    @staticmethod
    def read(file_object, context):
        """Read one entry from ``file_object``.

        Returns:
            An ``Entry``, or ``None`` when the index byte is ``0xff`` or when
            the type id has no reader in ``types`` (a ``KeyError`` raised by
            the reader's plain ``read`` is treated the same way).
        """
        index = UnsignedByte.read(file_object)
        if index == 0xff:
            return None

        type_id = VarInt.read(file_object)
        try:
            value = Entry.types[type_id].read(file_object)
        except TypeError:
            # The plain read() raised TypeError: retry through the
            # context-aware entry point.
            value = Entry.types[type_id].read_with_context(file_object, context)
        except KeyError:
            return None
        return Entry(index, type_id, value)
class Trade(Type):
    """A single trade record; attributes are populated by ``read``."""

    # Attribute names in the order ``__repr__`` prints them.
    fields = (
        'input_item_1',
        'output_item',
        'has_second_item',
        'input_item_2',
        'trade_disabled',
        'num_uses',
        'max_num_uses',
        'xp',
        'special_price',
        'price_multiplier',
        'demand',
    )

    def __str__(self):
        """Return the instance attribute dict rendered as a string."""
        return str(vars(self))

    def __repr__(self):
        """Return ``Trade(name=value, ...)`` for every field that is set.

        Fields that were never assigned (``input_item_2`` when there is no
        second item) are left out.
        """
        parts = []
        for field_name in self.fields:
            if hasattr(self, field_name):
                parts.append('%s=%s' % (field_name, getattr(self, field_name)))
        return 'Trade(%s)' % ', '.join(parts)

    @staticmethod
    def read(file_object):
        """Read one trade from ``file_object`` and return it as a ``Trade``.

        ``input_item_2`` is read, and the attribute set, only when the
        ``has_second_item`` flag is true.
        """
        result = Trade()
        result.input_item_1 = Slot.read(file_object)
        result.output_item = Slot.read(file_object)

        second_item_follows = Boolean.read(file_object)
        result.has_second_item = second_item_follows
        if second_item_follows:
            result.input_item_2 = Slot.read(file_object)

        result.trade_disabled = Boolean.read(file_object)
        result.num_uses = Integer.read(file_object)
        result.max_num_uses = Integer.read(file_object)
        result.xp = Integer.read(file_object)
        result.special_price = Integer.read(file_object)
        result.price_multiplier = Float.read(file_object)
        result.demand = Integer.read(file_object)
        return result