Coverage for melissa/launcher/processor.py: 50%
22 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-07-09 14:19 +0200
« prev ^ index » next coverage.py v7.6.12, created at 2025-07-09 14:19 +0200
1#!/usr/bin/python3
3# Copyright (c) 2021-2022, Institut National de Recherche en Informatique et en Automatique (Inria)
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are met:
8#
9# * Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15#
16# * Neither the name of the copyright holder nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31from typing import List
33from melissa.utility import time
35from . import action, event
36from . import state_machine as sm
37from .monitoring import RestHttpServer
40class Processor:
42 _state: sm.State
44 def execute(self, ev: event.Event) -> List[action.Action]:
45 raise NotImplementedError("Processor.execute()")
48class DefaultProcessor(Processor):
49 def __init__(
50 self, cfg: sm.Configuration, initial_state: sm.State,
51 rest_server: RestHttpServer
52 ) -> None:
53 self._config = cfg
54 self._state = initial_state
55 self._rest_server = rest_server
57 def execute(self, ev: event.Event) -> List[action.Action]:
58 if isinstance(ev, event.HttpRequest_):
59 self._rest_server.handle_request()
60 return []
62 new_state, actions = sm.transition(
63 self._config, self._state, time.monotonic(), ev
64 )
65 self._state = new_state
66 self._rest_server.notify(ev, new_state, actions)
67 return actions