Bread_Editor/ipc.py

83 lines
2.2 KiB
Python
Raw Normal View History

#!/usr/bin/python3
import sys
import socket
import json
import threading
# Loopback address used both by the listening socket and by connecting clients.
HOST = "127.0.0.1"
# TCP port the IPC server binds to and clients connect to.
PORT = 58592
class IPC:
def __init__(self, app):
self.app = app
self.first_instance = not self.already_running()
self.server_thread = threading.Thread(target=self.server)
self.server_thread.daemon = True
if self.first_instance:
self.server_thread.start()
else:
self.send_open_file_message(sys.argv[1:])
def already_running(self):
try:
with socket.create_connection((HOST, PORT), timeout=1):
return True
except (ConnectionRefusedError, OSError):
return False
def server(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
server_socket.bind((HOST, PORT))
server_socket.listen(1)
while True:
conn, addr = server_socket.accept()
with conn:
data = conn.recv(1024)
if data:
message = data.decode('utf-8')
self.evaluate_message(message)
def send(self, message):
try:
with socket.create_connection((HOST, PORT)) as client_socket:
client_socket.sendall(message.encode('utf-8'))
except Exception as error:
print("IPC-error:", error)
def evaluate_message(self, raw_message):
data_dict = json.loads(raw_message)
message = Message(data_dict["type"], data_dict["content"])
match message.type:
case "open_files":
self.app.open_files_queue += message.content
def send_open_file_message(self, filenames):
message = Message("open_files", filenames)
self.send(message.get_message_data())
class Message:
    """A typed IPC message that can be serialised to JSON."""

    def __init__(self, message_type: str, content):
        """Store the message type and its payload."""
        self.type = message_type
        self.content = content

    def get_message_data(self):
        """Return the instance attributes as a JSON string.

        Attributes whose value is callable are left out of the output.
        """
        serialisable = {
            name: value
            for name, value in vars(self).items()
            if not callable(value)  # filter out functions
        }
        return json.dumps(serialisable)