Coverage for melissa/launcher/message.py: 81%
125 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-10 22:25 +0100
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-10 22:25 +0100
1#!/usr/bin/python3
3# Copyright (c) 2021-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8#
9# * Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15#
16# * Neither the name of the copyright holder nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31__all__ = [
32 "BYTES_PER_INT", "Type", "Message", "InvalidMessage", "Ping", "CommSize",
33 "GroupSize", "JobSubmission", "JobUpdate", "JobCancellation", "Exit", "deserialize"
34]
36import enum
38from melissa.utility.message import InvalidMessage, Message, bytes2int, int2bytes
39from melissa.scheduler.job import State
41BYTES_PER_INT = 4
44def _int2bytes(x: int) -> bytes:
45 return int2bytes(x, num_bytes=BYTES_PER_INT)
48class Type(enum.IntEnum):
49 PING = 0
50 JOB_SUBMISSION = 1
51 JOB_UPDATE = 2
52 JOB_CANCELLATION = 3
53 COMM_SIZE = 4
54 GROUP_SIZE = 5
55 NO_MONITORING = 6
56 EXIT = 255
58 def __str__(self) -> str:
59 return self.name
62class Ping(Message):
63 def _serialize(self) -> bytes:
64 return _int2bytes(Type.PING)
66 def __eq__(self, other: object) -> bool:
67 return isinstance(other, Ping)
70class Exit(Message):
71 def __init__(self, status: int) -> None:
72 assert status >= 0
73 assert status <= 127
74 self.status = status
76 def _serialize(self) -> bytes:
77 return _int2bytes(Type.EXIT) + _int2bytes(self.status)
79 def __eq__(self, other: object) -> bool:
80 if isinstance(other, Exit):
81 return self.status == other.status
82 return NotImplemented
85class JobCancellation(Message):
86 def __init__(self, job_id: int) -> None:
87 self.job_id = job_id
89 def _serialize(self) -> bytes:
90 return _int2bytes(Type.JOB_CANCELLATION) + _int2bytes(self.job_id)
92 def __eq__(self, other: object) -> bool:
93 if isinstance(other, JobCancellation):
94 return self.job_id == other.job_id
95 return NotImplemented
98class JobSubmission(Message):
99 def __init__(self, initial_id: int, num_jobs: int) -> None:
100 assert num_jobs > 0
101 self.initial_id = initial_id
102 self.num_jobs = num_jobs
104 def _serialize(self) -> bytes:
105 return _int2bytes(Type.JOB_SUBMISSION) \
106 + _int2bytes(self.initial_id) \
107 + _int2bytes(self.num_jobs)
109 def __eq__(self, other: object) -> bool:
110 if isinstance(other, JobSubmission):
111 return self.initial_id == other.initial_id \
112 and self.num_jobs == other.num_jobs
113 return NotImplemented
116class JobUpdate(Message):
117 def __init__(self, job_id: int, job_state: State) -> None:
118 self.job_id = job_id
119 self.job_state = job_state
121 def _serialize(self) -> bytes:
122 return _int2bytes(Type.JOB_UPDATE) \
123 + _int2bytes(self.job_id) \
124 + _int2bytes(self.job_state)
126 def __eq__(self, other: object) -> bool:
127 if isinstance(other, JobUpdate):
128 return self.job_id == other.job_id \
129 and self.job_state == other.job_state
130 return NotImplemented
133class CommSize(Message):
134 def __init__(self, comm_size: int) -> None:
135 self.comm_size = comm_size
137 def _serialize(self) -> bytes:
138 return _int2bytes(Type.COMM_SIZE) \
139 + _int2bytes(self.comm_size)
141 def __eq__(self, other: object) -> bool:
142 if isinstance(other, CommSize):
143 return self.comm_size == other.comm_size
144 return NotImplemented
147class GroupSize(Message):
148 def __init__(self, group_size: int) -> None:
149 self.group_size = group_size
151 def _serialize(self) -> bytes:
152 return _int2bytes(Type.GROUP_SIZE) \
153 + _int2bytes(self.group_size)
155 def __eq__(self, other: object) -> bool:
156 if isinstance(other, GroupSize):
157 return self.group_size == other.group_size
158 return NotImplemented
161class StopTimeoutMonitoring(Message):
162 def _serialize(self) -> bytes:
163 return _int2bytes(Type.NO_MONITORING)
165 def __eq__(self, other: object) -> bool:
166 return isinstance(other, StopTimeoutMonitoring)
169def deserialize(bs: bytes) -> Message:
170 if len(bs) < BYTES_PER_INT:
171 return InvalidMessage(
172 bs, "expected a packet of size at least {:d} bytes, got {:d} bytes".
173 format(BYTES_PER_INT, len(bs))
174 )
176 if len(bs) % BYTES_PER_INT != 0:
177 return InvalidMessage(
178 bs,
179 "expected integers but packet size {:d} bytes is not divisible by {:d}"
180 .format(len(bs), BYTES_PER_INT)
181 )
183 def b2i(index: int) -> int:
184 first = index * BYTES_PER_INT
185 last = (index + 1) * BYTES_PER_INT
186 return bytes2int(bs[first:last])
188 msg_type = b2i(0)
189 # yapf: disable
190 expected_num_bytes = (
191 1 * BYTES_PER_INT if msg_type == Type.PING else
192 3 * BYTES_PER_INT if msg_type == Type.JOB_SUBMISSION else
193 3 * BYTES_PER_INT if msg_type == Type.JOB_UPDATE else
194 2 * BYTES_PER_INT if msg_type == Type.JOB_CANCELLATION else
195 2 * BYTES_PER_INT if msg_type == Type.COMM_SIZE else
196 2 * BYTES_PER_INT if msg_type == Type.GROUP_SIZE else
197 1 * BYTES_PER_INT if msg_type == Type.NO_MONITORING else None
198 )
199 # yapf: enable
201 if expected_num_bytes is not None and len(bs) != expected_num_bytes:
202 return InvalidMessage(
203 bs, "expected {:s} message of length {:d} bytes, got {:d}".format(
204 str(Type(msg_type)), expected_num_bytes, len(bs)
205 )
206 )
208 if msg_type == Type.PING:
209 return Ping()
211 if msg_type == Type.JOB_SUBMISSION:
212 initial_job_id = b2i(1)
213 num_jobs = b2i(2)
214 return JobSubmission(initial_job_id, num_jobs)
216 if msg_type == Type.JOB_UPDATE:
217 job_id = b2i(1)
218 job_state = b2i(2)
219 return JobUpdate(job_id, State(job_state))
221 if msg_type == Type.JOB_CANCELLATION:
222 job_id = b2i(1)
223 return JobCancellation(job_id)
225 if msg_type == Type.COMM_SIZE:
226 comm_size = b2i(1)
227 return CommSize(comm_size)
229 if msg_type == Type.GROUP_SIZE:
230 group_size = b2i(1)
231 return GroupSize(group_size)
233 if msg_type == Type.NO_MONITORING:
234 return StopTimeoutMonitoring()
236 if msg_type == Type.EXIT:
237 status = b2i(1)
238 return Exit(status)
240 return InvalidMessage(bs, "unknown message type {:d}".format(msg_type))