Coverage for melissa/launcher/message.py: 81%

125 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-09-22 10:36 +0000

1#!/usr/bin/python3 

2 

3# Copyright (c) 2021-2022, Institut National de Recherche en Informatique et en Automatique (Inria) 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are met: 

8# 

9# * Redistributions of source code must retain the above copyright notice, this 

10# list of conditions and the following disclaimer. 

11# 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# 

16# * Neither the name of the copyright holder nor the names of its 

17# contributors may be used to endorse or promote products derived from 

18# this software without specific prior written permission. 

19# 

20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 

23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 

24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 

25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 

26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 

27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 

28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 

29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 

30 

31__all__ = [ 

32 "BYTES_PER_INT", "Type", "Message", "InvalidMessage", "Ping", "CommSize", 

33 "GroupSize", "JobSubmission", "JobUpdate", "JobCancellation", "Exit", "deserialize" 

34] 

35 

36import enum 

37 

38from melissa.utility.message import InvalidMessage, Message, bytes2int, int2bytes 

39from melissa.scheduler.job import State 

40 

41BYTES_PER_INT = 4 

42 

43 

44def _int2bytes(x: int) -> bytes: 

45 return int2bytes(x, num_bytes=BYTES_PER_INT) 

46 

47 

48class Type(enum.IntEnum): 

49 PING = 0 

50 JOB_SUBMISSION = 1 

51 JOB_UPDATE = 2 

52 JOB_CANCELLATION = 3 

53 COMM_SIZE = 4 

54 GROUP_SIZE = 5 

55 NO_MONITORING = 6 

56 EXIT = 255 

57 

58 def __str__(self) -> str: 

59 return self.name 

60 

61 

62class Ping(Message): 

63 def _serialize(self) -> bytes: 

64 return _int2bytes(Type.PING) 

65 

66 def __eq__(self, other: object) -> bool: 

67 return isinstance(other, Ping) 

68 

69 

70class Exit(Message): 

71 def __init__(self, status: int) -> None: 

72 assert status >= 0 

73 assert status <= 127 

74 self.status = status 

75 

76 def _serialize(self) -> bytes: 

77 return _int2bytes(Type.EXIT) + _int2bytes(self.status) 

78 

79 def __eq__(self, other: object) -> bool: 

80 if isinstance(other, Exit): 

81 return self.status == other.status 

82 return NotImplemented 

83 

84 

85class JobCancellation(Message): 

86 def __init__(self, job_id: int) -> None: 

87 self.job_id = job_id 

88 

89 def _serialize(self) -> bytes: 

90 return _int2bytes(Type.JOB_CANCELLATION) + _int2bytes(self.job_id) 

91 

92 def __eq__(self, other: object) -> bool: 

93 if isinstance(other, JobCancellation): 

94 return self.job_id == other.job_id 

95 return NotImplemented 

96 

97 

98class JobSubmission(Message): 

99 def __init__(self, initial_id: int, num_jobs: int) -> None: 

100 assert num_jobs > 0 

101 self.initial_id = initial_id 

102 self.num_jobs = num_jobs 

103 

104 def _serialize(self) -> bytes: 

105 return _int2bytes(Type.JOB_SUBMISSION) \ 

106 + _int2bytes(self.initial_id) \ 

107 + _int2bytes(self.num_jobs) 

108 

109 def __eq__(self, other: object) -> bool: 

110 if isinstance(other, JobSubmission): 

111 return self.initial_id == other.initial_id \ 

112 and self.num_jobs == other.num_jobs 

113 return NotImplemented 

114 

115 

116class JobUpdate(Message): 

117 def __init__(self, job_id: int, job_state: State) -> None: 

118 self.job_id = job_id 

119 self.job_state = job_state 

120 

121 def _serialize(self) -> bytes: 

122 return _int2bytes(Type.JOB_UPDATE) \ 

123 + _int2bytes(self.job_id) \ 

124 + _int2bytes(self.job_state) 

125 

126 def __eq__(self, other: object) -> bool: 

127 if isinstance(other, JobUpdate): 

128 return self.job_id == other.job_id \ 

129 and self.job_state == other.job_state 

130 return NotImplemented 

131 

132 

133class CommSize(Message): 

134 def __init__(self, comm_size: int) -> None: 

135 self.comm_size = comm_size 

136 

137 def _serialize(self) -> bytes: 

138 return _int2bytes(Type.COMM_SIZE) \ 

139 + _int2bytes(self.comm_size) 

140 

141 def __eq__(self, other: object) -> bool: 

142 if isinstance(other, CommSize): 

143 return self.comm_size == other.comm_size 

144 return NotImplemented 

145 

146 

147class GroupSize(Message): 

148 def __init__(self, group_size: int) -> None: 

149 self.group_size = group_size 

150 

151 def _serialize(self) -> bytes: 

152 return _int2bytes(Type.GROUP_SIZE) \ 

153 + _int2bytes(self.group_size) 

154 

155 def __eq__(self, other: object) -> bool: 

156 if isinstance(other, GroupSize): 

157 return self.group_size == other.group_size 

158 return NotImplemented 

159 

160 

161class StopTimeoutMonitoring(Message): 

162 def _serialize(self) -> bytes: 

163 return _int2bytes(Type.NO_MONITORING) 

164 

165 def __eq__(self, other: object) -> bool: 

166 return isinstance(other, StopTimeoutMonitoring) 

167 

168 

169def deserialize(bs: bytes) -> Message: 

170 if len(bs) < BYTES_PER_INT: 

171 return InvalidMessage( 

172 bs, "expected a packet of size at least {:d} bytes, got {:d} bytes". 

173 format(BYTES_PER_INT, len(bs)) 

174 ) 

175 

176 if len(bs) % BYTES_PER_INT != 0: 

177 return InvalidMessage( 

178 bs, 

179 "expected integers but packet size {:d} bytes is not divisible by {:d}" 

180 .format(len(bs), BYTES_PER_INT) 

181 ) 

182 

183 def b2i(index: int) -> int: 

184 first = index * BYTES_PER_INT 

185 last = (index + 1) * BYTES_PER_INT 

186 return bytes2int(bs[first:last]) 

187 

188 msg_type = b2i(0) 

189 # yapf: disable 

190 expected_num_bytes = ( 

191 1 * BYTES_PER_INT if msg_type == Type.PING else 

192 3 * BYTES_PER_INT if msg_type == Type.JOB_SUBMISSION else 

193 3 * BYTES_PER_INT if msg_type == Type.JOB_UPDATE else 

194 2 * BYTES_PER_INT if msg_type == Type.JOB_CANCELLATION else 

195 2 * BYTES_PER_INT if msg_type == Type.COMM_SIZE else 

196 2 * BYTES_PER_INT if msg_type == Type.GROUP_SIZE else 

197 1 * BYTES_PER_INT if msg_type == Type.NO_MONITORING else None 

198 ) 

199 # yapf: enable 

200 

201 if expected_num_bytes is not None and len(bs) != expected_num_bytes: 

202 return InvalidMessage( 

203 bs, "expected {:s} message of length {:d} bytes, got {:d}".format( 

204 str(Type(msg_type)), expected_num_bytes, len(bs) 

205 ) 

206 ) 

207 

208 if msg_type == Type.PING: 

209 return Ping() 

210 

211 if msg_type == Type.JOB_SUBMISSION: 

212 initial_job_id = b2i(1) 

213 num_jobs = b2i(2) 

214 return JobSubmission(initial_job_id, num_jobs) 

215 

216 if msg_type == Type.JOB_UPDATE: 

217 job_id = b2i(1) 

218 job_state = b2i(2) 

219 return JobUpdate(job_id, State(job_state)) 

220 

221 if msg_type == Type.JOB_CANCELLATION: 

222 job_id = b2i(1) 

223 return JobCancellation(job_id) 

224 

225 if msg_type == Type.COMM_SIZE: 

226 comm_size = b2i(1) 

227 return CommSize(comm_size) 

228 

229 if msg_type == Type.GROUP_SIZE: 

230 group_size = b2i(1) 

231 return GroupSize(group_size) 

232 

233 if msg_type == Type.NO_MONITORING: 

234 return StopTimeoutMonitoring() 

235 

236 if msg_type == Type.EXIT: 

237 status = b2i(1) 

238 return Exit(status) 

239 

240 return InvalidMessage(bs, "unknown message type {:d}".format(msg_type))