# Source code for pyagar.messages

"""
``pyagar.messages``
===================

Protocol implementation.

"""
# pylint: disable=I0011,C0103
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from enum import Enum
import struct

# ``struct`` format characters used to unpack the primitive values of the
# protocol, grouped by width (signed first, then unsigned).

# 8-bit integers
INT8 = "b"
UINT8 = "B"

# 16-bit integers
INT16 = "h"
UINT16 = "H"

# 32-bit integers
INT32 = "i"
UINT32 = "I"

# 64-bit integers
INT64 = "q"
UINT64 = "Q"

# Floating point numbers (single and double precision)
FLOAT32 = "f"
FLOAT64 = "d"

# Lightweight immutable records produced by the message parsers below.

# One cell of the stage, as unpacked by ``Status``.
Cell = namedtuple("Cell", "id x y size color is_virus name")

# Corners of the visible area and the derived/explicit camera state.
Screen = namedtuple("Screen", "x1 y1 x2 y2")
Camera = namedtuple("Camera", "x y zoom")

# One leaderboard entry.
Player = namedtuple("Player", "id name")

# An eat event: ``eater`` and ``eatee`` are cell identifiers.
Eat = namedtuple("Eat", "eater eatee")

# Identifier of a cell that is no longer present (name spelling is part of
# the public API and therefore kept as is).
Dissapear = namedtuple("Dissapear", "id")

# Identifier carried by the ``PlayerCell`` message.
PlayerID = namedtuple("PlayerID", "id")


class BaseMSG(metaclass=ABCMeta):
    """
    Base class of every protocol message.

    Holds a binary buffer together with a read cursor (``offset``) and
    provides helpers to unpack values from it.  The constructor calls
    :meth:`parse` immediately, so subclasses populate their attributes at
    construction time.

    Values are decoded with an explicit little-endian, unpadded layout
    unless the caller supplies its own ``struct`` byte-order prefix.  The
    previous implementation used the host's native order and alignment,
    which only yields the same result on little-endian machines.
    NOTE(review): little-endian is assumed to be the wire order; confirm
    against the server protocol.

    """

    # Characters ``struct`` accepts as byte order / size / alignment prefix.
    _BYTE_ORDER_PREFIXES = "@=<>!"

    def __init__(self, buf, offset=0):
        """Store ``buf``, start reading at ``offset`` and parse at once."""
        self.buf = buf
        self.offset = offset
        self.parse()

    @staticmethod
    def _wire_format(ctype):
        """Return ``ctype`` as a ``str`` format with an explicit byte order.

        A format that already starts with one of the ``struct`` prefix
        characters is returned untouched; otherwise ``"<"`` is prepended so
        that decoding does not depend on the host architecture.
        """
        if isinstance(ctype, bytes):
            # ``struct`` accepts bytes formats too; normalise to ``str``.
            ctype = ctype.decode("ascii")
        if ctype and ctype[0] in BaseMSG._BYTE_ORDER_PREFIXES:
            return ctype
        return "<" + ctype

    def get(self, ctype):
        """Unpack the given ``ctype`` and update the offset.

        Returns the bare value when the format describes exactly one item,
        otherwise a list with all unpacked items.  Raises ``struct.error``
        when the buffer is too short.
        """
        fmt = self._wire_format(ctype)
        values = struct.unpack_from(fmt, self.buf, offset=self.offset)
        self.offset += struct.calcsize(fmt)
        if len(values) == 1:
            return values[0]
        return list(values)

    def getUint8(self):
        """Unpack an ``UINT8``."""
        return self.get(UINT8)

    def getInt8(self):
        """Unpack an ``INT8``."""
        return self.get(INT8)

    def getUint16(self):
        """Unpack an ``UINT16``."""
        return self.get(UINT16)

    def getInt16(self):
        """Unpack an ``INT16``."""
        return self.get(INT16)

    def getUint32(self):
        """Unpack an ``UINT32``."""
        return self.get(UINT32)

    def getInt32(self):
        """Unpack an ``INT32``."""
        return self.get(INT32)

    def getFloat32(self):
        """Unpack an ``FLOAT32``."""
        return self.get(FLOAT32)

    def getUint64(self):
        """Unpack an ``UINT64``."""
        return self.get(UINT64)

    def getInt64(self):
        """Unpack an ``INT64``."""
        return self.get(INT64)

    def getFloat64(self):
        """Unpack an ``FLOAT64``."""
        return self.get(FLOAT64)

    def string(self, ctype=UINT16):
        """Unpack a null-terminated string.

        Reads code units of type ``ctype`` until a zero unit is found; the
        terminator is consumed but not returned.  Every unit is converted
        with :func:`chr`.  Raises ``struct.error`` if the buffer ends before
        a terminator is seen.
        """
        fmt = self._wire_format(ctype)
        step = struct.calcsize(fmt)
        chars = []
        while True:
            code = struct.unpack_from(fmt, self.buf, offset=self.offset)[0]
            self.offset += step
            if code == 0:
                break
            chars.append(chr(code))
        return "".join(chars)

    @abstractmethod
    def parse(self):
        """This method must be implemented in the message subclass."""
        pass
class Status(BaseMSG):
    """
    The status of the stage: who eats who, which cells are described and
    which ones disappear.

    After parsing the instance exposes ``eat`` (list of ``Eat``),
    ``cells`` (list of ``Cell``) and ``dissapears`` (list of
    ``Dissapear``), plus the raw counters ``num`` and ``balls_on_screen``.

    """

    def parse(self):
        """Unpack eat events, cell records and removed cell ids."""
        self.cells = []
        self.eat = []

        # Eat events: a 16-bit count followed by (eater, eatee) id pairs.
        self.num = self.getUint16()
        for _ in range(self.num):
            eater = self.getUint32()
            eatee = self.getUint32()
            self.eat.append(Eat(eater=eater, eatee=eatee))

        # Cell records follow one another until a zero id ends the list.
        for cell_id in iter(self.getUint32, 0):
            x = self.getInt32()
            y = self.getInt32()
            size = self.getInt16()

            # Three colour bytes packed into a six-digit hex string, the
            # first byte read being the most significant one.
            high = self.getUint8()
            middle = self.getUint8()
            low = self.getUint8()
            color = "%06x" % (high << 16 | middle << 8 | low)

            flags = self.getUint8()
            is_virus = bool(flags & 1)

            # Flags 2, 4 and 8 announce 4, 8 and 16 extra bytes that are
            # skipped; flag 16 is not used here.  Only the first matching
            # flag is honoured.
            # NOTE(review): confirm whether these flags can be combined.
            for mask, skip in ((2, 4), (4, 8), (8, 16)):
                if flags & mask:
                    self.offset += skip
                    break

            # An empty name is stored as ``None``.
            name = self.string() or None
            self.cells.append(
                Cell(cell_id, x, y, size, color, is_virus, name))

        # Trailing list of ids, preceded by its 32-bit length.
        self.balls_on_screen = self.getUint32()
        self.dissapears = []
        self.dissapears.extend(
            Dissapear(self.getUint32())
            for _ in range(self.balls_on_screen))

    def __repr__(self):
        parts = (self.eat, self.cells, self.dissapears)
        return "Eat=%r\nCells=%r\nDissapears=%r\n" % parts
class Leaderboard(BaseMSG):
    """
    The ``Leaderboard``: the top bigger cells in descending order.

    This message is only received in ``FFA`` and ``Experimental`` mode.
    After parsing, ``players`` holds one ``Player`` tuple per entry.

    """

    def parse(self):
        """Unpack the entry count followed by (id, name) pairs."""
        self.players = []
        entries = self.getUint32()
        for _ in range(entries):
            player_id = self.getUint32()
            player_name = self.string()
            self.players.append(Player(id=player_id, name=player_name))

    def __repr__(self):
        return repr(self.players)
class TeamsScore(BaseMSG):
    """
    The ``TeamsScore``: the percent of mass of each team.

    This message is only received in the ``Team`` mode.  After parsing,
    ``numteams`` is the number of teams and ``players`` the list with one
    32-bit float per team.

    """

    def parse(self):
        """Unpack the team count and one float per team."""
        self.numteams = self.getUint32()
        scores = []
        for _ in range(self.numteams):
            scores.append(self.getFloat32())
        self.players = scores

    def __repr__(self):
        return repr(self.players)
class ScreenAndCamera(BaseMSG):
    """
    Screen and camera position, all in one.

    This message is the first message in the stream.  ``screen`` holds
    the four unpacked coordinates; ``camera`` is placed at the centre of
    that rectangle with a zoom of ``1``.

    """

    def parse(self):
        """Unpack four 64-bit floats and derive the camera from them."""
        x1 = self.getFloat64()
        y1 = self.getFloat64()
        x2 = self.getFloat64()
        y2 = self.getFloat64()
        self.screen = Screen(x1=x1, y1=y1, x2=x2, y2=y2)

        # The camera looks at the midpoint of the screen rectangle.
        center_x = (self.screen.x2 + self.screen.x1) / 2
        center_y = (self.screen.y2 + self.screen.y1) / 2
        self.camera = Camera(x=center_x, y=center_y, zoom=1)

    def __repr__(self):
        return "%s %s" % (self.screen, self.camera)
class CameraPosition(BaseMSG):
    """
    Change in the camera position and/or the zoom.

    Only received in ``Spectate`` mode.  After parsing, ``camera`` holds
    the unpacked ``Camera`` tuple.

    """

    def parse(self):
        """Unpack three 32-bit floats: x, y and zoom, in that order."""
        x = self.getFloat32()
        y = self.getFloat32()
        zoom = self.getFloat32()
        self.camera = Camera(x=x, y=y, zoom=zoom)

    def __repr__(self):
        return repr(self.camera)
class PlayerCell(BaseMSG):
    """
    ID of the player.

    After parsing, ``cell`` holds a ``PlayerID`` tuple.

    """

    def parse(self):
        """Unpack a single unsigned 32-bit identifier."""
        identifier = self.getUint32()
        self.cell = PlayerID(identifier)

    def __repr__(self):
        return repr(self.cell)
class ResetSomething(BaseMSG):
    """
    Message without payload.

    Per the original note, this message is present in the code but never
    seen.

    """

    def parse(self):
        """Nothing to unpack; the buffer and offset are left untouched."""
        return None
class SetQARA(BaseMSG):
    """
    Message carrying two signed 16-bit values.

    Per the original note, this message is present in the code but never
    seen.

    """

    def parse(self):
        """Unpack ``ca`` and ``da`` (in that order) and set ``sa``."""
        first = self.getInt16()
        second = self.getInt16()
        self.ca = first
        self.da = second
        self.sa = True
class MSGType(Enum):
    """
    Identifier of each known message.

    The member name doubles as the name of the module-level class that
    parses the message; see :attr:`cls`.

    """

    Status = 16
    CameraPosition = 17
    ResetSomething = 20
    SetQARA = 21
    PlayerCell = 32
    Leaderboard = 49
    TeamsScore = 50
    ScreenAndCamera = 64

    @property
    def cls(self):
        """Return the parser class of this member.

        The class is looked up by member name in this module's globals;
        ``None`` is returned when no such name exists.
        """
        module_namespace = globals()
        return module_namespace.get(self.name)
class MSG(BaseMSG):
    """
    Entry point for any incoming message.

    Identifies the specific message type and delegates the rest of the
    buffer to the proper parser.  After parsing, ``msgtype`` holds the
    numeric identifier and ``data`` the parsed message, or ``None`` when
    the identifier is not a member of ``MSGType``.

    """

    def parse(self):
        """Unpack the message identifier and instantiate the parser."""
        identifier = self.get("B")
        if identifier == 240:
            # A leading 240 is followed by five bytes that are skipped
            # before the real identifier is read.
            self.offset += 5
            identifier = self.get("B")
        self.msgtype = identifier

        try:
            parser_cls = MSGType(self.msgtype).cls
        except ValueError:
            # Unknown identifier: there is no parser to run.
            self.data = None
            return

        # The parser continues reading right after the identifier.
        self.data = parser_cls(self.buf, self.offset)

    def __repr__(self):
        if self.data is None:
            return repr(self.msgtype)
        return repr(self.data)