Skip to content

Commit

Permalink
Merge pull request #40 from stefanDeveloper/update-documentation
Browse files Browse the repository at this point in the history
Add docstrings and fix closing of LogServer
  • Loading branch information
stefanDeveloper authored Oct 7, 2024
2 parents 36d133f + 4893eec commit b285b1f
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 17 deletions.
97 changes: 82 additions & 15 deletions src/logserver/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@


class LogServer:
"""
Server for receiving, storing and sending single log lines. Opens a port for receiving messages, listens for log
lines via Kafka and reads newly added lines from an input file. To retrieve a log line from the server,
other modules can connect to its outgoing/sending port. The server will then send its oldest log line as a response.
"""

def __init__(self) -> None:
logger.debug("Initializing LogServer...")
self.host = None
Expand All @@ -45,7 +51,11 @@ def __init__(self) -> None:
self.data_queue = queue.Queue()
self.kafka_consume_handler = KafkaConsumeHandler(topic=LISTEN_ON_TOPIC)

async def open(self):
async def open(self) -> None:
"""
Opens both ports for sending and receiving and starts reading from the input file as well as listening for
messages via Kafka. Can be stopped via a ``KeyboardInterrupt``.
"""
logger.debug("Opening LogServer sockets...")
logger.debug(f"Creating the sending socket on port {self.port_out}...")
send_server = await asyncio.start_server(
Expand All @@ -70,13 +80,25 @@ async def open(self):
)
except KeyboardInterrupt:
logger.debug("Stop serving...")
finally:
send_server.close()
receive_server.close()
await asyncio.gather(
send_server.wait_closed(), receive_server.wait_closed()
)
logger.debug("Both sockets closed.")

send_server.close()
receive_server.close()
await asyncio.gather(send_server.wait_closed(), receive_server.wait_closed())
logger.debug("Both sockets closed.")
async def handle_connection(self, reader, writer, sending: bool) -> None:
"""
Handles new incoming connection attempts. If the maximum number of possible connections is not yet reached, the
connection is approved and the log line is sent or received, depending on the calling method. If the number is
reached, a warning message will be printed and no connection gets established.
async def handle_connection(self, reader, writer, sending: bool):
Args:
reader: Responsible for reading incoming data
writer: Responsible for writing outgoing data
sending (bool): Sending if True, receiving otherwise
"""
logger.debug(f"Handling connection with {sending=}...")
if self.number_of_connections < MAX_NUMBER_OF_CONNECTIONS:
logger.debug(
Expand Down Expand Up @@ -112,7 +134,11 @@ async def handle_connection(self, reader, writer, sending: bool):
writer.close()
await writer.wait_closed()

async def handle_kafka_inputs(self):
async def handle_kafka_inputs(self) -> None:
"""
Starts a loop to continuously listen on the configured Kafka topic. If a message is consumed, it is added
to the data queue.
"""
loop = asyncio.get_running_loop()

while True:
Expand All @@ -122,7 +148,14 @@ async def handle_kafka_inputs(self):
logger.info(f"Received message via Kafka:\n{value}")
self.data_queue.put(value)

async def async_follow(self, file: str = READ_FROM_FILE):
async def async_follow(self, file: str = READ_FROM_FILE) -> None:
"""
Continuously checks for new lines at the end of the input file. If one or multiple new lines are found, any
empty lines are removed and the remaining lines added to the data queue.
Args:
file (str): File to be read as string
"""
async with aiofiles.open(file, mode="r") as file:
# jump to end of file
await file.seek(0, 2)
Expand All @@ -142,16 +175,37 @@ async def async_follow(self, file: str = READ_FROM_FILE):
logger.info(f"Extracted message from file:\n{cleaned_line}")
self.data_queue.put(cleaned_line)

async def handle_send_logline(self, reader, writer) -> None:
    """
    Delegates to :meth:`handle_connection` in sending mode.

    Args:
        reader: Responsible for reading incoming data
        writer: Responsible for writing outgoing data
    """
    logger.debug("Calling handle_connection with sending=True...")
    await self.handle_connection(reader, writer, sending=True)

async def handle_receive_logline(self, reader, writer) -> None:
    """
    Delegates to :meth:`handle_connection` in receiving mode.

    Args:
        reader: Responsible for reading incoming data
        writer: Responsible for writing outgoing data
    """
    logger.debug("Calling handle_connection with sending=False...")
    await self.handle_connection(reader, writer, sending=False)

@staticmethod
async def send_logline(writer, logline):
async def send_logline(writer, logline) -> None:
"""
Sends the given log line encoded as UTF-8 to the connected component.
Args:
writer: Responsible for writing outgoing data
logline: Logline to be sent
"""
if logline:
logger.debug(f"Sending {logline=}...")
writer.write(logline.encode("utf-8"))
Expand All @@ -161,7 +215,13 @@ async def send_logline(writer, logline):

logger.debug("No logline available")

async def receive_logline(self, reader):
async def receive_logline(self, reader) -> None:
"""
Receives a log line encoded as UTF-8 from the connected component and adds it to the data queue.
Args:
reader: Responsible for reading incoming data
"""
while True:
data = await reader.read(1024)
if not data:
Expand All @@ -171,16 +231,23 @@ async def receive_logline(self, reader):
self.data_queue.put(received_message)

def get_next_logline(self) -> str | None:
    """
    Pops and returns the oldest log line waiting in the data queue.

    Returns:
        The oldest queued log line, or ``None`` when the queue is empty.
    """
    logger.debug("Getting next available logline...")
    if self.data_queue.empty():
        return None
    logger.debug("Returning logline...")
    return self.data_queue.get()

# TODO: Add a close method


def main():
def main() -> None:
"""
Creates the :class:`LogServer` instance and starts it.
"""
logger.info("Starting LogServer...")
server_instance = LogServer()
logger.debug("LogServer started. Opening sockets...")
Expand Down
23 changes: 21 additions & 2 deletions src/prefilter/prefilter.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ def __init__(self):
self.kafka_consume_handler = KafkaConsumeHandler(topic="Prefilter")
logger.debug("Initialized Prefilter.")

def get_and_fill_data(self):
def get_and_fill_data(self) -> None:
"""
Clears data already stored and consumes new data. Unpacks the data and checks if it is empty. If that is the
case, an info message is shown, otherwise the data is stored internally, including timestamps.
"""
logger.debug("Checking for existing data...")
if self.unfiltered_data:
logger.warning("Overwriting existing data by new message...")
Expand Down Expand Up @@ -84,6 +88,9 @@ def filter_by_error(self) -> None:
logger.info("Data successfully filtered.")

def send_filtered_data(self):
"""
Sends the filtered data if available via the :class:`KafkaProduceHandler`.
"""
if not self.unfiltered_data:
logger.debug("No unfiltered or filtered data is available.")
return
Expand Down Expand Up @@ -124,7 +131,19 @@ def clear_data(self):
logger.debug("Cleared data.")


def main(one_iteration: bool = False):
def main(one_iteration: bool = False) -> None:
"""
Runs the main loop by
1. Retrieving new data,
2. Filtering the data and
3. Sending the filtered data if not empty.
Stops by a ``KeyboardInterrupt``, any internal data is lost.
Args:
one_iteration (bool): Only one iteration is done if True (for testing purposes). False by default.
"""
logger.info("Starting Prefilter...")
prefilter = Prefilter()
logger.info(f"Prefilter started.")
Expand Down

0 comments on commit b285b1f

Please sign in to comment.