There are some code samples from the backend. Note that all code is not shared. Since project is still a private project and on going project..
Base Classes / Interfaces
IWatchableService.py
import threading
from api.base.crypto_engine.MessageApi.debug import *
'''
#This is a Interface-Abstraction class hybrid class.
#Service classes should inherit from this class and must implement start and init functions
int start function , service class Must implement it's routine function.
in init function service class Must implement it's constructor that Must return it self.
So with IWatchAbleService you may implement your servce without taking care of the situations that your service has stopped. Watchable service will re-start your service
'''
class IWatchableService:
INFINITE = -1 # every time when thread deads , re-start it.
def __init__(self,name:str,max_restart_count:int = -1):
self.__instance_obj = self
self.__start_op = getattr(self,"start",None)
self.__instance_constructor = getattr(self,"init",None)
if self.__instance_constructor == None or callable(self.__instance_constructor) != True:
raise NotImplemented("Child of WatchableServiec must implement init function! And implement necesarry setup there cause we will have to create a new instance with new thread!")
if self.__start_op == None or callable(self.__start_op) != True:
raise NotImplemented("Child of WatchableServiec must implement start function!")
self.__name = name
self.__restarted_count = 0
self.__max_restart_count = max_restart_count
def _run(self): # if -1 , re-start dead thread for forever.
while self.__restarted_count < self.__max_restart_count or self.__max_restart_count < 0:
if (self.__restarted_count != 0):
new_obj = self.__instance_constructor()
self.__instance_obj = new_obj
self.__current_thread = threading.Thread(target = self.__instance_obj.start , name = self.__name)
self.__restarted_count += 1
self.__current_thread.start()
self.__current_thread.join()
def start_service(self):
service_starter_thread_name = self.__name + "_starter_thread"
run_thread = threading.Thread(target=self._run, name = service_starter_thread_name)
run_thread.start()
user_feedback("{:s} is started".format(str(self.__name)),False,True)
return run_thread
IWatchableWatcherService.py
from third_party.watchdog.observers import Observer
from api.base.crypto_engine.setting_db.opportunity_info import OpportunityInfo
from base.services.i_watchable_service import IWatchableService
from base.crypto_engine.MessageApi.debug import *
from base.crypto_engine.utils import helper
from base.crypto_engine import symbols
from pathlib import Path
import time
from api.third_party.watchdog.events import PatternMatchingEventHandler
def trycatch(method):
def catched(*args, **kw):
result = None
try:
result = method(*args, **kw)
except Exception as e:
error("Error: {:s} -> {:s}".format(str(method.__name__), str(e)))
return result
return catched
class FileEventHandler(PatternMatchingEventHandler):
'''
def _ignore_directories(self,arg):
print("noluyor amk")
'''
def __init__(self,patterns,onModified:bool , onCreated:bool,threadName: str):
patterns = patterns
self._onModifiedEnabled = onModified
self._onCreatedEnabled = onCreated
self.thread_name = threadName + "_FileEventHandler"
self.__related_service_instance = None
super().__init__(patterns,ignore_directories=False,case_sensitive=False)
def save_service(self,service_instance):
self.__related_service_instance = service_instance
self.lock_required = service_instance.get_lock_required()
if self.lock_required:
debug("Observer service instance required locking folder!")
else:
debug("Observer service instance NOT required locking folder!")
def get_service_instance(self):
return self.__related_service_instance
def process(self, event):
helper.set_current_thread_name(self.thread_name)
debug("FileEventHandler processing for {:s} started ".format(str(event.src_path)))
try:
file_name = event.src_path.split("/")[-1]
file_folder = event.src_path.split(file_name)[0]
folder_locked = None
if self.lock_required:
folder_locked = helper.try_lock_folder(file_folder)
if folder_locked:
debug("{:s} is locked ".format(str(file_folder)))
else:
folder_locked = True
if (folder_locked):
debug("Folder{:s} locked for parsing {:s}".format(str(file_folder),str(event.src_path)))
service_instance = self.get_service_instance()
new_requests = service_instance.parser(event.src_path)
if new_requests is not None:
service_instance = self.get_service_instance()
service_instance.add_new_requests(new_requests)
else:
error("Parse Error no new request is parsed? ")
#os.remove(event.src_path)
#user_feedback("{:s} proessed and removed".format(str(event.src_path)))
else:
raise Exception("Could not lock folder {:s}".format(str(file_folder)))
except Exception as e:
error("Table Service : File Event Handler: Error during parsing: {:s}".format(str(e)))
finally:
try:
if self.lock_required and folder_locked:
ret = helper.release_folder_lock_if_any(file_folder)
if ret:
debug("We release folder lock of {:s}".format(str(file_folder)))
except Exception as e:
error("Error during releasing lock for folder {:s} Error:{:s}".format(str(file_folder),str(e)))
def on_modified(self, event):
if self._onModifiedEnabled and event.is_directory is not True and event.src_path.endswith(".lock") is not True:
self.process(event)
def on_created(self, event):
if self._onCreatedEnabled and event.is_directory is not True and event.src_path.endswith(".lock") is not True:
self.process(event)
class IWatchableWatcherService(IWatchableService):
def __init__(self, patterns:list, path_to_watch:str, action_interval:int=1, onModified:bool = True, onCreated:bool = True,folderLockRequired:bool=False):
if (patterns == None or len(patterns) == 0):
raise Exception("File Extensions of file must be given")
if (path_to_watch == None or len(path_to_watch) == 0):
raise Exception("PATH must be given that will be watched")
self.parser = getattr(self, "parse_file", None)
if self.parser == None or callable(self.parser) != True:
raise NotImplemented("Watcher must implement parse function!")
self.__action = getattr(self, "action",None)
if self.__action == None or callable(self.__action) != True:
raise NotImplemented("Action func must be implemented that process requests!")
self._class_name = self.__class__.__name__
self.action_interval = action_interval
self.__requests = []
self.__requests.append(self._class_name)
self.patterns = patterns
self.folder_lock_required = folderLockRequired
self.path_to_watch = path_to_watch
self._onModifiedEnabled = onModified
self._onCreatedEnabled = onCreated
super().__init__(self._class_name ,-1)
def get_lock_required(self):
return self.folder_lock_required
def init(self):
self.__init__(self.patterns, self.path_to_watch)
return self
def get_requests(self):
return self.__requests
def get_target_path(self):
return self.path_to_watch
@trycatch
def add_new_requests(self, new_requests):
all_requests = self.get_requests()
all_requests.append(new_requests)
def process_all_requests(self): # requests ordered in order to their priority!
all_request = self.get_requests()
if(len(all_request) < 1):
raise Exception("There must be the name of the service as a mark for request list")
try:
debug("{:d} request are ready to be processed".format(int(len(all_request))))
for request in all_request:
if ( type(request) == type("string")):
continue
if (request == None):
error("WTF? request can't be NONE?")
else:
try:
self.__action(request)
except Exception as e:
error("Unhandled Exception caught during execution __action function.. ErrMsg: {:s}".format(str(e)))
all_request.remove(request)
#all_request.clear() #TODO: Be sure remove func is worked..
debug("Request are cleared")
except Exception as e:
error("Error during processing requests E: {:s}".format(str(e)))
def check_path(self, path):
try:
if not os.path.exists(path):
parent_folder = str(Path(path).parent)
self.check_path(parent_folder)
error("[paths] {:s} doesn't exist: creating...".format(path))
os.mkdir(path)
return path
except Exception as e:
error("Error during checking path {:s} Error:{:s}".format(str(path),str(e)) )
def start(self):
observer = Observer()
file_event_handler = FileEventHandler(self.patterns, onModified=self._onModifiedEnabled, onCreated=self._onCreatedEnabled, threadName=self._class_name)
file_event_handler.save_service(self)
target_path = self.get_target_path()
self.check_path(target_path)
observer.schedule(file_event_handler, path=target_path)
observer.start()
try:
while True:
self.process_all_requests()
time.sleep(self.action_interval)
except KeyboardInterrupt:
error("Table Service Observer stopped by KeyboardInterrupt ")
observer.stop()
except Exception as e:
error("something you could not catch is fired dude err: {:s}".format(str(e)))
observer.stop()
error("Table Service Observer is stopped")
RequestHandlerService.py
import time
import shutil
import os
import json
import threading
import inspect
from base.crypto_engine.request_handler.koineks_handler import KoineksRequestHandler
from base.services.i_watchable_service import IWatchableService
from base.crypto_engine.utils import helper
from base.crypto_engine.MessageApi.debug import *
from base.crypto_engine import symbols
from base.crypto_engine.setting_db.request_info import RequestType,RequestInfo
from third_party.watchdog.observers import Observer
from third_party.watchdog.events import PatternMatchingEventHandler
def trycatch(method):
def catched(*args, **kw):
result = None
try:
result = method(*args, **kw)
except Exception as e:
error("Error: {:s} -> {:s}".format(str(method.__name__), str(e)))
return result
return catched
class FileEventHandler(PatternMatchingEventHandler):
patterns = ["*.request"]
service_instance = None
def save_service(self,service_instance):
FileEventHandler.service_instance = service_instance
def process(self, event):
"""
event.event_type
'modified' | 'created' | 'moved' | 'deleted'
event.is_directory
True | False
event.src_path
path/to/observed/file
"""
# the file will be processed there
user_feedback("event type: {:s}".format(str(event.event_type)))
if (str(event.event_type) == "modified"):
request_folder = self.service_instance.get_request_folder_path()
try:
debug("folder lock granted!")
new_requests = FileEventHandler.service_instance.parse_requests(event.src_path)
FileEventHandler.service_instance.add_new_requests(new_requests)
except Exception as e:
error("Error during parsing: {:s}".format(str(e)))
finally:
try:
os.remove(event.src_path)
except Exception as e:
error("Error during removing file {:s}".format(str(e)))
def on_modified(self, event):
debug("ON MODIFIED!")
self.process(event)
def on_created(self, event):
pass
'''
This Service is general service that handles request and pass them to request_handler
This service constructor takes only one essential argument which is request_handler_class. Request_Handler_Service
create a request_handler instance from this class and use **kwargs if constructor of request_handler needs any parameter to start up.
After creating request_handler instance , RequestHandlerService starts listening the request_folder that is created on constructor of RequestHandlerService
for any new coming request file. Then catch these new file , parse them and add to queue to pass them to the request_handler_instance.
Briefly!
RequestHandlerService is main_service that catches requests , parses them and passes them to the request_handler_instance.
Request_handler instance is the responsible to taking action. So request_handler is actually doing the requested action.
like buy,sell , withdraw coins and etc. or whatsoever.
'''
class RequestHandlerService(IWatchableService):
MAX_CYCLE = 1000
MAX_FAIL_COUNT = 5
ALL_REQUESTS_FOLDER = symbols.MAIN_REQUEST_FOLDER
REQUEST_HANDLER_SERVICE_DIR = symbols.REQUEST_HANDLER_SERVICE_DIR
def __init__(self,request_handler_class,**kwargs):
self.__request_handler_class = request_handler_class
self.__kwargs = kwargs
self.__requests = {}
request_type_members = inspect.getmembers(RequestType)
for request_type in request_type_members:
type_name = str(request_type[1])
if(type_name.__contains__("REQUEST")):
self.__requests.update({type_name:[]})
healthy_fnc = getattr(request_handler_class,"is_healthy",None) # check if RequestHandler class implement is_healthy func!
if (healthy_fnc == None or callable(healthy_fnc) != True):
raise Exception("Request Handler must implement is_healthy function!")
handle_request_fnc = getattr(request_handler_class,"handle_request",None) # check if RequestHandler class implement handle_request func!
if (handle_request_fnc == None or callable(handle_request_fnc) != True):
raise Exception("Request Handler must implement handle_request function!")
try:
handler_signature = inspect.signature(handle_request_fnc) #Check handle_request signature if valid.
request_paramater_signature = str(handler_signature.parameters['request'])
if request_paramater_signature.split(".")[-1] != "RequestInfo":
raise NotImplemented("handle_request function must take a parameter in RequestInfo type!")
except Exception as e:
raise Exception("handle_request function should have a paramater as request in RequestInfo type!")
self.init_handler(request_handler_class,kwargs)
super().__init__(self._service_name,-1)
def init(self):
self.__init__(self.__request_handler_class,**self.__kwargs)
return self
def init_handler(self,handler_class,kwargs):
try:
self._handler_name = str(handler_class).split('.')[-1].split("RequestHandler")[0].lower()
self._handler_main_requests_folder_path = RequestHandlerService.REQUEST_HANDLER_SERVICE_DIR + self._handler_name + symbols.REQUEST_HANDLER_TYPE_NAME + "/"
except Exception as e:
error("Fatal Error: Wrong Name Given to Class!. Handler Class name notain is %sRequestHandler")
exit(-1)
user_feedback("%s handler instance will be initialised!" % str((self._handler_name)))
if os.path.exists(RequestHandlerService.REQUEST_HANDLER_SERVICE_DIR) is not True:
user_feedback("{:s} folder is initialised for the first time".format(str(RequestHandlerService.ALL_REQUESTS_FOLDER)))
os.makedirs(RequestHandlerService.REQUEST_HANDLER_SERVICE_DIR)
if (os.path.exists(self._handler_main_requests_folder_path)) is not True:
os.makedirs(self._handler_main_requests_folder_path)
user_feedback("{:s} folder is initialised for the first time".format(str(self._handler_main_requests_folder_path)))
created_folder = self.init_request_folder(self._handler_main_requests_folder_path)
self._service_name = created_folder + "_service"
self.__listening_request_folder_path = self._handler_main_requests_folder_path + created_folder
self.__handler_instance = handler_class(kwargs["username"], kwargs["password"], kwargs["driver_name"], kwargs["headless"])
debug("Handler is initialised")
@staticmethod
def get_main_request_folder():
return RequestHandlerService.ALL_REQUESTS_FOLDER
def get_handler(self):
return self.__handler_instance
def get_requests(self):
return self.__requests
def is_handler_healthy(self):
return self.__handler_instance.is_healthy()
def get_request_folder_path(self):
return self.__listening_request_folder_path
def is_handler_busy(self):
request_dir = self.get_request_folder_path()
busy_file_path = request_dir + "/is_busy.info"
try:
with open(busy_file_path , "r") as busy_file:
is_busy = busy_file.readline()
if (str(is_busy).upper() == "TRUE"):
return True
else:
return False
except Exception as e:
return True #? bu ne amk
def set_handler_busy(self,state:bool):
if (type(state) is not bool) :
raise TypeError("set_helper_busy parameter is bool not {:s}".format(str(type(state))))
request_dir = self.get_request_folder_path()
busy_file_path = request_dir + "/is_busy.info"
try:
with open(busy_file_path , "w") as busy_file:
if (state):
busy_file.write("True")
else:
busy_file.write("False")
except Exception as e:
return True #? bu ne amk
def update_handler_cycle(self, cycle: int):
work_file = self.get_request_folder_path() + "/work_cycle.info"
try:
with open(work_file, "w+") as work_file:
work_file.write(str(cycle))
except Exception as e:
error("Error during updating work cycle {:s}".format(str(e)))
def set_handler_health_status(self,state:bool):
if (type(state) is not bool) :
raise TypeError("set_helper_health_status parameter is bool not {:s}".format(str(type(state))))
self.__handler_instance.set_healthy_status(state)
my_request_folder_path = self.get_request_folder_path()
if (os.path.exists(my_request_folder_path) is not True):
try:
os.makedirs(my_request_folder_path)
except Exception as e:
error("Error during creating file dir {:s} error: {:s}".format(str(my_request_folder_path), str(e)))
raise e
with open(my_request_folder_path + "/is_healthy.info","w") as health_info:
if (state):
health_info.write("True")
else:
self.__handler_instance.de_init()
health_info.write("False")
if state is not True:
pass
#KoineksService.destructor(1,2)
@trycatch
def init_request_folder(self,main_request_folder): #TODO:Bu fonksiyonu tekrar yaz. Silin request2 gibi folderlar varken boş slot olarak, gidip request7 alıyor request6 var diye..
other_request_folders = []
for file in os.listdir(main_request_folder):
if os.path.isdir(main_request_folder + file):
other_request_folders.append(file)
index = 0
folder_name = self._handler_name + "_" + str(index + 1) + "_handler"
self.__request_folder_name = folder_name
while (other_request_folders.__contains__(folder_name) is True): #if somehow old request folder forgotten to be deleted!
index += 1
folder_name = self._handler_name + "_" + str(index + 1) + "_handler"
my_request_folder_path = main_request_folder + folder_name
os.makedirs( my_request_folder_path)
return folder_name
@trycatch
def add_new_requests(self, new_requests):
all_requests = self.get_requests()
request_index = len(new_requests) - 1
while request_index >= 0:
request_to_add = new_requests[request_index]
request_type = request_to_add.get_type()
requests_of_type = all_requests.__getitem__(request_type)
requests_of_type.append(request_to_add)
all_requests.update({request_type: requests_of_type})
request_index = request_index - 1
return 0
@trycatch
def parse_requests(self, request_file):
with(open(request_file, "r")) as request_file:
request_list = []
data = json.load(request_file)
keys = data.keys()
for key in keys:
val = data.__getitem__(key)
request = RequestInfo.json_to_instance(val)
request_list.append(request)
return request_list
def eliminate_duplications(self, request_list):
debug("process_request_list_call list lenght {:d}".format(int(len(request_list))))
current_request = request_list[0]
if (current_request == None):
error("Error , no current request !")
return
type = current_request.get_type()
request_list_size = len(request_list)
user_feedback("Total {:d} {:s} request we have for now ".format(request_list_size, str(type)))
if (str(type).__contains__("FETCH")):
debug("will be elimination in the list")
currency_list = []
currency = current_request.get_symbol()
eliminated_request_list = []
while (current_request != None):
if (currency_list.__contains__(currency) is not True):
eliminated_request_list.append(current_request)
currency_list.append(currency)
else:
debug("{:s} fetch request eliminated!".format(str(currency)))
if (len(request_list) > 0):
current_request = request_list.pop()
currency = current_request.get_symbol()
else:
break
debug("After elimination we have {:d}".format(int(len(eliminated_request_list))))
else:
user_feedback("No elimination for type {:s}".format(str(type)))
eliminated_request_list = request_list # do not eliminate other request type
debug("After elimination we have {:d}".format(int(len(eliminated_request_list))))
return eliminated_request_list
@trycatch
def start_requests(self,eliminated_request_list:tuple):
request_handler = self.get_handler()
try_limit = RequestHandlerService.MAX_FAIL_COUNT
for current_request in eliminated_request_list:
while try_limit > 0:
try:
try_limit = try_limit - 1
self.set_handler_busy(True)
request_handler.handle_request(current_request)
try_limit = 0 # do not try cos no exception is occured?
except Exception as e:
error(str("Error{:d} in request_handler: [{:s}]".format(int(RequestHandlerService.MAX_FAIL_COUNT - try_limit),str(e))))
if (try_limit == 0):
error("Request Handler {:s} closing itself due to too much error!".format(str(request_handler.get_name())))
request_handler.de_init()
self.report_request_caused_crash(current_request)
finally:
self.set_handler_busy(False)
def report_request_caused_crash(self,request:RequestInfo):
pass #TODO: let manager know that this request couldn't be handled and let it decide what to do.
def process_request_type(self,request_type:str):
all_requests = self.get_requests()
requests = all_requests.__getitem__(request_type)
if (len(requests) > 0):
all_requests.update({request_type: []})
debug("handle {:s} request".format(str(request_type)))
eliminated_list = self.eliminate_duplications(requests)
if len(eliminated_list)> 0:
self.start_requests(eliminated_list)
def process_all_requests(self): # requests ordered in order to their priority!
all_request_types = self.get_requests().keys() #TODO: By this we don't take care of priority of requests. So solve this!
for type in all_request_types:
self.process_request_type(type)
def observer_stop(self,observer):
try:
while(observer.event_queue.unfinished_tasks != 0):
time.sleep(1)
observer.unschedule_all()
observer.stop()
observer.join(2)
user_feedback("Observer is stopped")
except Exception as e:
error("Error during stoping observer {:s}".format(str(e)))
def start(self):
cycle = 0
observer = Observer()
file_event_handler = FileEventHandler()
file_event_handler.save_service(self)
request_dir = self.get_request_folder_path()
user_feedback("HELPER STARTED ON : {:s}".format(str(request_dir)))
observer.schedule(file_event_handler, path=request_dir)
observer.start()
self.set_handler_health_status(True)
self.set_handler_busy(False)
try:
while self.is_handler_healthy() == True:
cycle = cycle + 1
self.update_handler_cycle(cycle)
if (cycle > RequestHandlerService.MAX_CYCLE):
user_feedback("Helper closing itself.")
self.set_handler_health_status(False)
if self.is_handler_busy() == True:
debug("Helper is busy!")
time.sleep(1)
continue
else:
user_feedback("waiting cycle {:d}!".format(int(cycle)))
self.process_all_requests()
time.sleep(1)
except KeyboardInterrupt:
self.observer_stop(observer)
shutil.rmtree(self.get_request_folder_path())
self.observer_stop(observer)
shutil.rmtree(self.get_request_folder_path())
'''
handler_service = RequestHandlerService(KoineksRequestHandler,username="koineks.ozan@gmail.com",password="Ozandoruk1989",headless=True,driver_name="firefox")
handler_service.start()
'''
TableService.py
import json
import time
import os
import shutil
import pygal
from pygal.style import Style
from api.base.crypto_engine.setting_db.opportunity_info import OpportunityInfo
from base.services.IWatchableWatcherService import IWatchableWatcherService
from base.crypto_engine.MessageApi.debug import *
def trycatch(method):
def catched(*args, **kw):
result = None
try:
result = method(*args, **kw)
except Exception as e:
error("Error: {:s} -> {:s}".format(str(method.__name__), str(e)))
return result
return catched
class TableService(IWatchableWatcherService):
def __init__(self, path_to_watch: str, patterns: list):
self.__max_point_count = symbols.MAX_TABLE_POINT_COUNT
self.__new_table_dir = symbols.NEW_TABLE_DIR
self.__old_table_dir = symbols.OLD_TABLE_DIR
self._patterns = patterns
self._path_to_watch = path_to_watch
super().__init__(self._patterns, path_to_watch, action_interval=10, onModified=True, onCreated=False, folderLockRequired=True)
def init(self):
self.__init__(self._path_to_watch, self._patterns)
return self
def get_max_point_count(self):
return self.__max_point_count
def get_old_table_dir(self):
return self.__old_table_dir
def get_new_table_dir(self):
return self.__new_table_dir
@trycatch
def parse_file(self, request_file_path:str):
request = None
with(open(request_file_path, "r")) as request_file:
data = json.load(request_file)
keys = data.keys()
for key in keys:
val = data.__getitem__(key)
request = OpportunityInfo.json_to_instance(val)
break
try:
os.remove(request_file_path)
except Exception as e:
error("Error during removing file after parsing completed!{:s}".format(str(e)))
return request
def determine_name(self,op_request:OpportunityInfo):
buy_market = op_request.get_buy_market()
sell_market = op_request.get_sell_market()
currency = op_request.get_currency()
name = buy_market + "_" + sell_market + "_" + currency
return name
def get_json_data_path_from_svg_file(self,svg_file:str):
return svg_file.split(".svg")[0] + ".json"
def find_max_point_count(self,json_path:str): # find longest line point count in svg file
data = self.load_data_file(json_path)
keys = data.keys()
max_point_count = 0
for key in keys:
val = data.__getitem__(key)
points_count = len(val)
if (points_count > max_point_count):
max_point_count = points_count
return max_point_count
def get_op_path_dir(self,op_request:OpportunityInfo):
new_tables_dir = self.get_new_table_dir()
creator = op_request.get_creator()
path_dir = new_tables_dir + creator
return path_dir
def add_new_info(self,op_request:OpportunityInfo): # find which file to be added
path_dir = self.get_op_path_dir(op_request)
self.create_dir_if_not_exists(path_dir)
all_svg_files = self.get_all_svg_files_in_dir(path_dir)
if all_svg_files != None and len(all_svg_files) > 0:
for file in all_svg_files: #clear filled files , and mark them as old(move them under to the old files folder..
json_file_path = self.get_json_data_path_from_svg_file(file)
max_point_count = self.find_max_point_count(json_file_path)
if (max_point_count >= self.get_max_point_count()): #TODO:Make more moduler with seperated func..
data = self.load_data_file(json_file_path)
keys = data.keys()
for key in keys:
val = data.__getitem__(key)
val = val[1:]
data.update({key:val})
with open(json_file_path,"w") as json_file:
data_to_write = json.dumps(data)
json_file.write(data_to_write)
name = self.determine_name(op_request)
left_svg_files = self.get_all_svg_files_in_dir(path_dir)
if left_svg_files != None and len(left_svg_files)>0:
matched_svg = [f for f in left_svg_files if f.__contains__(name)]
else:
matched_svg = []
if(len(matched_svg) > 0):
self.add_new_info_to_svg_file(matched_svg[0],op_request)
else:
new_svg_file = self.create_new_svg_file(op_request)[0] # returns also json_data_path in second index..
self.add_new_info_to_svg_file(new_svg_file,op_request)
def create_new_svg_file(self,op_request:OpportunityInfo):#Create fresh new svg file , because the old one is copleted max count number that a line can have..
name = self.determine_name(op_request)
now = time.strftime('%d_%m_%Y_%H:%M:%S')
svg_file_name = name+"_"+now + ".svg"
user_feedback("New svg file created : {:s}".format(str(svg_file_name)))
data_file_name = self.get_json_data_path_from_svg_file(svg_file_name)
market_pair = self.get_market_pair_name(op_request)
path_dir = self.get_op_path_dir(op_request)
data_file_path = os.path.join(path_dir,data_file_name)
svg_file_path = os.path.join(path_dir,svg_file_name)
with open(data_file_path,"w") as json_data:
first_empty_data = json.dumps({market_pair:[]})
json_data.write(first_empty_data)
return [svg_file_path,data_file_path]
def load_data_file(self,json_file_path:str):
with open(json_file_path,"r") as data_file:
data = json.load(data_file)
return data
def get_market_pair_name(self,op_info:OpportunityInfo):
return "(" + op_info.get_currency() + ")" + op_info.get_buy_market() + "-->" + op_info.get_sell_market()
def add_new_info_to_svg_file(self,file_path:str,op_request:OpportunityInfo):#Add new info the last fresh svg file
json_file = self.get_json_data_path_from_svg_file(file_path)
market_pair_name = self.get_market_pair_name(op_request)
if(os.path.exists(json_file)):
market_pair_datas = self.load_data_file(json_file)
else:
raise Exception("{:s} Data file not exists?",str(json_file))
percent = op_request.get_percent()
market_pair_points = market_pair_datas.pop(market_pair_name,[])
market_pair_points.append(percent)
new_market_pair_data_element = {market_pair_name:market_pair_points}
market_pair_datas.update(new_market_pair_data_element)
with open(json_file,"w") as json_file:
data_to_write = json.dumps(market_pair_datas)
json_file.write(data_to_write)
self.paint_svg(file_path)
return
def paint_svg(self,svg_file_path:str):
title = svg_file_path
debug("Arbitrage Table Drawing")
json_file = self.get_json_data_path_from_svg_file(svg_file_path)
json_data = self.load_data_file(json_file)
dark_gray = "#212121"
gold = "#FFD700"
soft_green = "#009688"
white = "#FFFFFF"
custom_style = Style(
background=dark_gray,
plot_background= dark_gray,
foreground= soft_green, # numbers on the y axis
foreground_strong= white, #x-y axis
foreground_subtle=soft_green, #cizgiler(lines) x axis lines..
opacity='.6',
label_font_size = 30,
opacity_hover='.9',
transition='400ms ease-in',
colors=('#E853A0', '#E8537A', '#E95355', '#E87653', '#E89B53')
)
line_chart = pygal.Line(style=custom_style ,show_legend=False, height=460)
#line_chart.title = title
is_labels_created = False
keys = json_data.keys()
for key in keys:
datas = json_data.__getitem__(key)
#datas = data.split(' ')
line_cube = []
data_count = 0
for each_data in datas:
each_data = float("{:0.1f}".format(float(each_data)))
line_cube.append(float(each_data))
data_count += 1
if (is_labels_created == False):
is_labels_created = True
line_chart.x_labels = map(str, range(0, data_count))
line_chart.add(key, line_cube)
svg_string = line_chart.render()
byte_array = bytearray(svg_string)
with open(svg_file_path, 'wb') as outputfile:
outputfile.write(byte_array)
@trycatch
def create_dir_if_not_exists(self,path:str):
if (os.path.exists(path) is not True):
try:
os.makedirs(path)
except Exception as e:
error("Error during creating file dir {:s} error: {:s}".format(str(path), str(e)))
@trycatch
def get_all_svg_files_in_dir(self,dir_path:str):
svg_files = []
files = os.listdir(dir_path)
for file in files:
file_path = os.path.join(dir_path,file)
if file_path.endswith(".svg"):
svg_files.append(file_path)
return svg_files
def action(self,request):
self.add_new_info(request)
LoadBalancer.py
import os,signal
import threading
import time
import subprocess
import json
from api.third_party.watchdog.observers import Observer
from api.third_party.watchdog.events import PatternMatchingEventHandler
from api.base.crypto_engine.setting_db.request_info import RequestInfo,RequestType
from api.base.crypto_engine.MessageApi.debug import *
from api.base.services.i_watchable_service import IWatchableService
from api.base.crypto_engine import symbols
def trycatch(method):
def catched(*args, **kw):
result = None
try:
result = method(*args, **kw)
except Exception as e:
error("Error: {:s} -> {:s}".format(str(method.__name__), str(e)))
return result
return catched
class FileEventHandler(PatternMatchingEventHandler):
patterns = ["*.request"]
service_instance = None
def save_service(self,service_instance):
FileEventHandler.service_instance = service_instance
def process(self, event):
debug("event type: {:s}".format(str(event.event_type)))
if (str(event.event_type) == "modified"):
try:
FileEventHandler.service_instance.save_new_request(event.src_path)
except Exception as e:
error("Error during load balancing: {:s}".format(str(e)))
def on_modified(self, event):
info("ON MODIFIED!")
self.process(event)
def on_created(self, event):
pass
class HandlerInstance:
busy_file = "is_busy.info"
def __init__(self,request_folder:str,busy:bool,type:str):
self.__type = type
self.__is_busy = busy
self.__request_folder_name = ""
self.set_folder(request_folder)
self.__busy_file = request_folder + "/is_busy.info"
def get_type(self):
return self.__type
def get_folder(self):
return self.__folder
def get_request_folder_name(self):
return self.__request_folder_name
def set_folder(self,folder:str):
self.__folder = folder
arr = folder.split('/')
last_len = len(arr)
name = arr[last_len-1]
self.__request_folder_name = name
def is_busy(self):
try:
with open(self.__busy_file , "r") as busy_file:
is_busy = busy_file.readline()
if (str(is_busy).upper() == "TRUE"):
return True
else:
return False
except Exception as e:
return True #? bu ne amk
def register_request(self,path):
debug("Register Request for {:s}".format(str(path)))
if os.path.isfile(path) is True:
from_path = path
to_path = self.get_folder()
user_feedback("New Request REGISTERED to {:s}".format(str(self.__request_folder_name)))
try:
with open(path , "r") as request_file:
request_info = request_file.read()
with open(self.get_folder() + "/new.request", "w") as new_request:
new_request.write(request_info)
except Exception as e:
error("Error during registering new request!")
finally:
try:
if os.path.exists(path) is True:
os.remove(path)
else:
error("Error during request registration , request file is lost ? !:P ")
except Exception as e:
error("Error during request registration {:s}".format(str(e)))
else:
error("Error {:s} is not exists".format(str(path)))
class LoadBalancer(IWatchableService):
REQUEST_DIR = symbols.MAIN_REQUEST_FOLDER
REQUEST_HANDLER_SERVICE_DIR = symbols.REQUEST_HANDLER_SERVICE_DIR
HANDLERS_STATUS_FILE = REQUEST_DIR + "open_handlers.info"
requests = [] #path:str #MUST
requests_lock = threading.RLock()
handlers = {} # {path:handler_instance} #MUST
handler_lock = threading.RLock() # any process related to handlers
def __init__(self):
super().__init__("LoadBalancer",-1)
self.request_handlers = {}
self.__is_service_healthy = True
def init(self):
self.__init__()
return self
#MUST
@staticmethod
def is_any_handler_healthy(handler_name:str):
avaiable_handler_count = LoadBalancer.get_avaiable_handler_count(handler_name)
if (avaiable_handler_count > 0) :
return True
else:
return False
#MUST
def update_handler_count_file(self,ie:dict): #open handlers count
try:
with open(LoadBalancer.HANDLERS_STATUS_FILE ,"w") as handler_info_file:
handler_info_file.write(str(json.dumps(ie)))
except Exception as e:
error("Error during updating handler count {:s}".format(str(e)))
#MUST
@staticmethod
def get_avaiable_handler_count(looking_handler_name:str): #open handlers count
try:
with open(LoadBalancer.HANDLERS_STATUS_FILE ,"r") as handler_info_file:
de = json.load(handler_info_file)
handler_names = de.keys()
for handler_name in handler_names:
if str(handler_name).__contains__(looking_handler_name):
looking_handler_name = handler_name
handler_entity_count = de.pop(handler_name,None)
if (handler_entity_count) == None:
return 0
else:
return int(handler_entity_count)
except Exception as e:
error("Error during getting handler count {:s}".format(str(e)))
return 0
#MUST
def save_new_request(self,path:str): #this is called from another thread , so you should rlcok for shared requets lock!
with self.requests_lock:
self.requests.append(path)
self.process_new_requests()
#MUST
def parse_requests(self,request_file):
with(open(request_file,"r")) as request_file:
request_list = []
data = json.load(request_file)
keys = data.keys()
for key in keys:
val = data.__getitem__(key)
request = RequestInfo.json_to_instance(val)
request_list.append( request)
return request_list
#MUST
def destroy_request(self,request_path:str):
if (os.path.isfile(request_path)):
debug("{:s} is eliminated which is already listed".format(str(request_path)))
else:
error("Request {:s} does not exists".format(str(request_path)))
#MUST
def process_new_requests(self): #This function eliminates same request and order them in order to their priority!
urgent_fetch_requests = {}
fetch_requests = {}
get_all_balance = []
important_request = []
if (len(self.requests) == 0):
debug("No Requests to balance yet..")
return
try:
current_request_file = self.requests.pop()
while(current_request_file != None):
requests = self.parse_requests(current_request_file)
if (len(requests) > 0):
request = requests[0]
type = request.get_type()
if type == RequestType.FETCH_FOR_HELP_URGENT:
symbol = request.get_symbol()
if urgent_fetch_requests.keys().__contains__(symbol) is not True:
urgent_fetch_requests.update({symbol:current_request_file})
#debug("Urgent {:s} Fetch Added First Time".format(str(symbol)))
else:
self.destroy_request(current_request_file)
elif type== RequestType.FETCH_FOR_HELP:
symbol = request.get_symbol()
if (urgent_fetch_requests.keys().__contains__(symbol) is not True and fetch_requests.keys().__contains__(symbol) is not True):
fetch_requests.update({symbol:current_request_file})
#debug("Normal {:s} Fetch Added First Time".format(str(symbol)))
else:
self.destroy_request(current_request_file)
elif type == RequestType.GET_ALL_BALANCE:#TODO: investigate this block , seems to me buggy.
if (len(get_all_balance) > 0):
self.destroy_request(current_request_file)
else:
get_all_balance.append(current_request_file)
#debug("Get All Balance Added First Time")
else: # don't eliminate these request , they must already be a unique calls. like buy,sell,withdraw!
user_feedback("{:s} request will not be processed and eliminated!".format(str(type)))
important_request.append(current_request_file)
if (len(self.requests) > 0):
current_request_file = self.requests.pop()
else:
current_request_file = None
if (fetch_requests.__len__() > 0): # lowest priority request
symbols = fetch_requests.keys()
for symbol in symbols:
request_path = fetch_requests.__getitem__(symbol)
self.requests.append(request_path)
if (urgent_fetch_requests.__len__() > 0): # higher priority requests
symbols = urgent_fetch_requests.keys()
for symbol in symbols:
request_path = urgent_fetch_requests.__getitem__(symbol)
self.requests.append(request_path)
if len(get_all_balance) > 0: # higher priority than urgent_fetchs
self.requests.append(get_all_balance[0])
if (len(important_request) > 0): # these requests has to be balanced immediately! These will be probably for arbitrage operations like buy,sell,withdraw!
self.requests.extend(important_request)
except Exception as e:
error("Error during processing requests! {:s}".format(str(e)))
#MUST
@trycatch
def delete_existing_handler(self,path):
dead_handler = self.handlers.pop(path)
request_folder_name = dead_handler.get_request_folder_name()
user_feedback("{:s} is a dead handler and deleted !".format(str(request_folder_name)))
#MUST
def save_new_handler(self,path:str,type:str):
handler_instance = HandlerInstance(path, False, type)
self.handlers.update({path:handler_instance})
#MUST
def check_if_healthy(self,handler_path):
try:
handler_path = handler_path + "/" if str(handler_path).endswith("/") is not True else handler_path
healthy_file_path = handler_path + symbols.HANDLER_HEALTHY_FILE_NAME
if os.path.isfile(healthy_file_path) is True:
with open(healthy_file_path , "r") as healthy_file:
status = healthy_file.readline()
if status.lower() == "true":
return True
else:
return False
except Exception as e:
error("Error during checking health status of handler {:s}".format(str(handler_path)))
return False
#MUST
def is_any_new_handler_avaiable(self):
with LoadBalancer.handler_lock:
for file in os.listdir(LoadBalancer.REQUEST_HANDLER_SERVICE_DIR): # Main Handler Directory , Koineks,Btcturk Main Dirs are here.
path = LoadBalancer.REQUEST_HANDLER_SERVICE_DIR + file #Sub Handler Directory , could be one of Btcturk,Koineks,Vebit etc..
if os.path.isdir(path):
type = file
handlers_of_type = self.request_handlers.pop(type,None)
handlers_of_type = handlers_of_type if handlers_of_type != None else []
for handler_dirs in os.listdir(path):
handler_path = path + "/" + handler_dirs
is_healthy = self.check_if_healthy(handler_path)
if is_healthy: # if handler is working
if handlers_of_type.__contains__(handler_path) is not True: #check if we added it before
handlers_of_type.append(handler_path)
self.save_new_handler(handler_path,type)
else: #if not working
if handlers_of_type.__contains__(handler_path) is True: #check if a health working handler died? if so , delete it from working hnadler list.
handlers_of_type.remove(handler_path)
self.delete_existing_handler(handler_path)
self.request_handlers.update({type:handlers_of_type})
avaiable_handlers_count = len(handlers_of_type) if handlers_of_type != None else 0
information_element = {type: avaiable_handlers_count}
self.update_handler_count_file(information_element)
#MUST
def find_request_handler_type(self,request_file):
with(open(request_file, "r")) as request_file:
content = json.load(request_file)
request = RequestInfo.json_to_instance(content)
handler_name = request.get_handler_name()
return handler_name
#MUST
def balance_load(self):
if len(self.requests) > 0:
user_feedback("Request Count {:d}".format(int(len(self.requests))))
with LoadBalancer.handler_lock:
handlers = self.handlers.keys()
current_requests = self.requests.copy()
self.requests.clear() #clear saved
while(len(current_requests) != 0): #while we have request in request queue.
request_path = current_requests.pop()
request_handler_type = self.find_request_handler_type(request_path)
handler_found = False
#we don't use handler lock here because we want to let other thread be able to update for newly closed handler , so we can avoid from trying to use them ;)
for handler_path in handlers: #iterate over all handlers and share requests among avaiable handlers
try:
handler = self.handlers.__getitem__(handler_path)
except Exception as e:
error("Error getting handler , handler may be closed! {:s}".format(str(e)))
return # let service to call balance_load again so we can work with updated handlers information..
if (handler.get_type().__contains__(request_handler_type) is not True):
continue
if (handler.is_busy() == False):
try:
handler_found = True
handler.register_request(request_path)
user_feedback("Reamining Request Count {:d}".format(int(len(current_requests))))
time.sleep(2) # wait for handler fot setting itself as Busy #TODO find a better way instead of wait 2 secs MAL
break
except Exception as e:
error("Error during register_request {:s}".format(str(e)))
else:
debug("{:s} is busy".format(str(handler.get_request_folder_name())))
if (handler_found == False):
debug("No Free handler found! Remaining Request: {:d}".format(int(len(current_requests))))
time.sleep(2) # let handlers done their work!
def set_service_healthy(self,state:bool):
self.__is_service_healthy = state
def is_service_healthy(self):
return self.__is_service_healthy
def start(self):
observer = Observer()
handler = FileEventHandler()
handler.save_service(self)
observer.schedule(FileEventHandler(), path = LoadBalancer.REQUEST_DIR)
observer.start()
try:
while True :
try:
#self.process_new_requests() it's caled when new request is saved automatically now.
self.is_any_new_handler_avaiable()
self.balance_load()
time.sleep(1)
if (self.is_service_healthy() != True):
user_feedback("destructor initialized , Good bye from load balancer")
observer.stop()
observer.join()
break
except Exception as e:
error("FATAL ERROR during loader_service {:s}".format(str(e)))
except KeyboardInterrupt:
observer.stop()
return
Manager.py
import os,sys,shutil,signal,time
import threading,subprocess
print("init_services of services module")
module_path = os.path.dirname(os.path.abspath(__file__))
module_path = module_path.split('api')[0]
print("{:s} is added to sys paths".format(str(module_path)))
sys.path.append(module_path)
from api.server import test
from api.server.new_server import ApiService
from api.base.crypto_engine.MessageApi.debug import *
from api.base.crypto_engine import symbols
from api.base.services.load_balancer import LoadBalancer
from base.crypto_engine.utils.market import Markets
#from api.base.services.ob_service import OrderBookService
from api.base.crypto_engine.utils.config_manager import Config
ApiServiceCheckedTmsp = None
ManagerHealth = False
class Manager():
started_service = {}
MANAGER_FILE_PATH = os.path.abspath(__file__)
@staticmethod
def destructor(a,b):
services = Manager.started_service.keys()
for service in services:
user_feedback("TODO: {:s} service should be closed!".format(str(service)))
exit(0)
def __init__(self, health_monitor_path:str, debug_mode_enabled:bool=False):
try:
self._debug_mode_enabled = debug_mode_enabled
self._health_monitor_path = health_monitor_path
current_t = threading.currentThread()
threading.Thread.setName(current_t, "Manager")
except Exception as e:
print("error during setting thread name : {:s}".format(str(e)))
user_feedback("Manager started with health path {:s}".format(str(self._health_monitor_path)))
signal.signal(signal.SIGTERM, self.destructor)
signal.signal(signal.SIGINT, self.destructor)
self.app_info = Config.get_manager_config()
log_to_file = self.app_info.get_log_to_file_enabled()
symbols.LOG_TO_FILE_ENABLED = log_to_file
Debug.set_debug_level_file("manager.config")
debug_level = self.app_info.get_manager_debug_level()
try:
Debug.set_debug_level(debug_level)
except Exception as e:
error("Wrong debug level given error is {:s}".format(str(e)))
Debug.set_debug_level("user_feedback") #default
Manager.started_service = {}
Config.set_config_folder(symbols.MARKET_AND_EXCHANGE_CONFIG_DIR)
def init(self):
self.__init__()
return self
@staticmethod
def start_process(port,pk):
server_process = subprocess.Popen(["python3", Manager.MANAGER_FILE_PATH, str(port), pk])
user_feedback("New Server is started with pid {:d}".format(int(server_process.pid)))
return server_process
def write_health_info(self,info:str):
try:
with open(self._health_monitor_path,"w") as manager_health_file:
manager_health_file.write(info)
except Exception as e:
error("Exception during writing manager health info {:s}".format(str(e)))
def check_api_monitor_health(self):
global ApiServiceCheckedTmsp
global ManagerHealth
healthy = True
while True:
debug("checking manager health status..")
time.sleep(5)
now = time.time()
if ManagerHealth == False :
error("ManagerHealth is set to False. Re-starting")
healthy = False
if ManagerHealth == False or (ApiServiceCheckedTmsp != None and now - ApiServiceCheckedTmsp > 300): #more than 60 sec
error("Manager is not responding.. Manager should be re-started....")
healthy = False
if healthy is not True:
self.write_health_info("death")
Manager.destructor(None,None)
else:
print("Manager is alive")
def clear_folder(self,path:str):
try:
if (os.path.exists(path) is True):
shutil.rmtree(path)
os.makedirs(path)
user_feedback("{:s} is cleared".format(str(path)))
except Exception as e:
error("Error during clearing folder {:s}".format(str(e)))
def clear_garbages(self):
self.clear_folder(symbols.MAIN_REQUEST_FOLDER)
self.clear_folder(symbols.SIMULATOR_REQUEST_DIR)
self.clear_folder(symbols.TABLE_REQUEST_DIR)
self.clear_folder(symbols.REQUEST_HANDLER_SERVICE_DIR)
self.clear_folder(symbols.NEW_TABLE_DIR)
self.clear_folder(symbols.MAIN_REQUEST_FOLDER)
self.clear_folder(symbols.OPPORTUNITY_DIR)
self.clear_folder(symbols.HTTP_SERVICE_REQUEST_DIR)
user_feedback("ALL GARBAGES ARE CLEARED")
def find_activated_markets(self, arbitrage_file_names):
try:
activated_markets = []
for enabled_arbitrage_name in arbitrage_file_names:
active_arbitrage = Config.get_arbitrage_from_file(enabled_arbitrage_name)
arbitrage_keys = active_arbitrage.keys()
for key in arbitrage_keys:
first_market = str(key).split("/")[0]
second_market = str(key).split("/")[1]
if(activated_markets.__contains__(first_market) != True):
activated_markets.append(first_market)
if (activated_markets.__contains__(second_market) != True):
activated_markets.append(second_market)
return activated_markets
except Exception as e:
error("Error during finding activated markets {:s}".format(str(e)))
raise e
def start(self):
global ApiServiceCheckedTmsp
global ManagerHealth
ManagerHealth = True
app_info = self.app_info
self.clear_garbages()
self.write_health_info("alive")
markets = Config.get_markets_from_config()
exchanges = Config.get_exchange_from_config()
statistic_service_enabled = app_info.get_statistis_service_enabled()
api_service_enabled = app_info.get_api_service_enabled()
op_finder_enabled = app_info.get_op_finder_enabled()
#ob_service_enabled = app_info.get_ob_service_enabled() #TODO: implement ittrue
arbitrage_file_count = app_info.get_arbitrage_file_count()
koineks_handler_count = app_info.get_koineks_handler_count()
btcturk_handler_count = app_info.get_btcturk_handler_count()
if (arbitrage_file_count <= 0 and op_finder_enabled):
raise Exception("No given Arbitrage Config file name")
arbitrage_file_names = app_info.get_arbitrage_files() if op_finder_enabled else []
koineks_cryptos_to_watch = []
atleast_one_arbitrage = False
for arbitrage_file in arbitrage_file_names:
arbitrages = Config.get_arbitrage_from_file(arbitrage_file)
arbitrage_keys = arbitrages.keys()
for arbitrage_name in arbitrage_keys:
arbitrage = arbitrages.__getitem__(arbitrage_name)
if arbitrage.get_active() == False:
debug("Arbitrage {:s} is disabled".format(str(arbitrage_name)))
continue # skip if the arbitrage is not activated
atleast_one_arbitrage = True
buy_market = arbitrage.get_market_low().get_name()
sell_market = arbitrage.get_market_high().get_name()
symbol = str(arbitrage.get_symbol()).lower()
if (str(buy_market).lower() == "koineks" or str(sell_market).lower() == "koineks"):
if (koineks_cryptos_to_watch.__contains__(symbol) is not True):
koineks_cryptos_to_watch.append(symbol) # add these symbols for koineks_ob handler!
if (atleast_one_arbitrage is not True and op_finder_enabled):
raise Exception(
"No arbitrage is enabled in given arbitrage(s) file{:s}, why I'm supposed to run then for op_finder?".format(
str(arbitrage_file_names)))
koineks_acc = app_info.get_koineks_acc()
koineks_pass = app_info.get_koineks_password()
simulator_enabled = app_info.get_simulator_enabled()
table_service_enabled = app_info.get_table_service_enabled()
koineks_ob_service_enabled = app_info.get_koineks_ob_enabled()
ob_fresh_guy_enabled = app_info.get_ob_fresh_guy_enabled()
arbitrage_enabled = app_info.get_arbitrage_enabled()
if (op_finder_enabled is not True):
if arbitrage_enabled:
raise Exception("I can't do arbitrage without op_finder enabled!")
if simulator_enabled:
raise Exception("Why I need to run simulator without op_finder ?")
if (koineks_ob_service_enabled is False):
symbols.LOCAL_ORDERBOOK_SUPPORT = False
else:
symbols.LOCAL_ORDERBOOK_SUPPORT = True
#START API_SERVICE
if api_service_enabled:
threaded_mode_open = True if self._debug_mode_enabled else False
port = app_info.get_api_service_port()
pk = app_info.get_api_service_pk()
current_server_pid = ApiService.start_process(port, pk, threaded_mode_open)
#server_process = subprocess.Popen(["python3.6",api_service_path,str(port),pk])
#api_service = ApiService(port, pk) #TODO: fetch limited time data from config
#service_thread = api_service.start_service()
#Manager.started_service.update({service_thread.getName():service_thread})
#START ORDER BOOK FETCHER SERVICES
time.sleep(3) # wait for api service get up!
if op_finder_enabled: #if op_finder_enabled we will need order book fetcher services..
from api.base.services.ob_service import OrderBookService
activated_markets = self.find_activated_markets(arbitrage_file_names)
for market in activated_markets:
ob_service = OrderBookService(market,8)
ob_service_thread = ob_service.start_service()
Manager.started_service.update({ob_service_thread.getName():ob_service_thread})
#START LoadBalaner Service
if koineks_handler_count + btcturk_handler_count > 0:
load_balancer = LoadBalancer()
service_thread = load_balancer.start_service()
Manager.started_service.update({service_thread.getName():service_thread})
if (simulator_enabled):
# START SIMULATOR_SERVICE
import api.base.services.simulator_service
simulator_service = api.base.services.simulator_service.SimulatorService()
service_thread = simulator_service.start_service()
Manager.started_service.update({service_thread.getName():service_thread})
if (table_service_enabled):
# START TABLE_SERVICE
import api.base.services.table_service
table_service = api.base.services.table_service.TableService(symbols.TABLE_REQUEST_DIR, ["*.request"])
service_thread = table_service.start_service()
Manager.started_service.update({service_thread.getName():service_thread})
if (table_service_enabled):#TODO: Make it statistic_table_service_enabled..(ofc with needed changes)
# START STATISTIC_TABLE_SERVICE
import api.base.services.statistic_table_service
statistic_table_service = api.base.services.statistic_table_service.StatisticTableService(symbols.STATISTIC_TABLE_REQUEST_DIR, ["*.request"])
service_thread = statistic_table_service.start_service()
Manager.started_service.update({service_thread.getName():service_thread})
if (koineks_ob_service_enabled):
# START KOINEKS OB FETCHER
is_ob_headless = app_info.get_koineks_ob_headless()
ob_driver_name = app_info.get_koineks_ob_driver_name()
min_cycle_time = app_info.get_koineks_ob_min_cycle_time()
import api.base.services.order_book_fetcher_service
from api.base.crypto_engine.browser.koineks_browser import KoineksBrowser
started_services = api.base.services.order_book_fetcher_service.OrderBookFetcherService.start_fetcher_sub_services(
koineks_cryptos_to_watch,
KoineksBrowser, browser_driver=ob_driver_name, headless= is_ob_headless,
# firefox,driver in headless mode(true) ,
min_cycle_time=min_cycle_time, acc=koineks_acc, password=koineks_pass)
Manager.started_service.update(started_services)
services_ready = api.base.services.order_book_fetcher_service.OrderBookFetcherService.is_all_services_are_ready()
while (services_ready is not True):
user_feedback("ob_fetcher_services are not read yet!")
time.sleep(10)
services_ready = api.base.services.order_book_fetcher_service.OrderBookFetcherService.is_all_services_are_ready()
if (koineks_handler_count > 0):# which means koineks_handler is enabled
# START KOINEKS HANDLER
from api.base.services.request_handler_service import RequestHandlerService
from api.base.crypto_engine.request_handler.koineks_handler import KoineksRequestHandler
started_handler_count = 0
driver_name = app_info.get_koineks_handler_driver_name()
is_headless = app_info.get_koineks_handler_headless()
while (started_handler_count < koineks_handler_count):
handler_service = RequestHandlerService(KoineksRequestHandler, username=koineks_acc,
password=koineks_pass, headless=is_headless,
driver_name=driver_name)
service_thread = handler_service.start_service() # Starts I_watchable_service with infinitve loop
Manager.started_service.update({service_thread.getName():service_thread})
started_handler_count += 1
# START OB(KOINEKS) FRESHER GUARDIAN
if (ob_fresh_guy_enabled): # if there is no handler , fresh guarding can't do anything
if (started_handler_count < 1 ):
raise Exception("So ob_fresh_guy will just find old values but not be able to update them cos no handler is avaiable to handle request from fresh guy?")
import api.base.services.ob_fresh_guard_service
fresh_guard_service = api.base.services.ob_fresh_guard_service.OB_Fresh_Guard_Service("koineks",
koineks_cryptos_to_watch)
service_thread = fresh_guard_service.start_service()
Manager.started_service.update({service_thread.getName():service_thread})
if op_finder_enabled:
# START OP_FINDER_SERVICE
import api.base.services.op_finder_service
for name in arbitrage_file_names:
op_finder_service = api.base.services.op_finder_service.OPFinderService(name, markets, exchanges)
service_thread = op_finder_service.start_service()
Manager.started_service.update({service_thread.getName(): service_thread})
#START STATISTIC SERVICE
if statistic_service_enabled is True:
import api.base.services.StatisticService
time.sleep(20) ## ALOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO #TODO KALDIR AMK BUNU BİR ŞEKİLDE ?:
StatisticService = api.base.services.StatisticService.StatisticService(symbols.STATISTIC_REQUEST_DIR, ["*.request"])
service_thread = StatisticService.start_service()
Manager.started_service.update({service_thread.getName():service_thread})
started_services = Manager.started_service.keys()
#START MANAGER HEALTH MONITOR SERVICE
service_thread = threading.Thread(target = self.check_api_monitor_health, name = "ManagerMonitor")
service_thread.start()
Manager.started_service.update({service_thread.getName():service_thread})
for service in started_services:
user_feedback("{:s} is started".format(str(service)))
print("{:s} is started".format(str(service)))
if (self._debug_mode_enabled):
user_feedback("### Manager is started in debug mode!! ##")
else:
user_feedback("### Manager is started normally !! ###")
if api_service_enabled:
restarted_count = 0
first_api_start_time = time.time()
last_api_restart_time = first_api_start_time
while True:
time.sleep(30)
penetration_result = ApiService.do_penetration() #returns False in case of fail or passed count in case of success as return val.
ApiServiceCheckedTmsp = time.time()
last_api_age = (time.time() - last_api_restart_time) / 3600 #3600 converts secs to hour
if (penetration_result is False):
user_feedback("Http Server{:d} is not responding,re-start it.".format(int(current_server_pid)),False,True)
try:
current_server_pid = ApiService.do_fixing(port, pk, current_server_pid)
last_api_restart_time = time.time()
restarted_count += 1
now = time.time()
avg = ((now - first_api_start_time) / restarted_count) / 3600 # 360 converts secs to hour.
user_feedback("Api Stability Info: Avg:{:0.2f} Hour Restarted Count:{:d} Last Api Age {:0.2f} Hour"
.format(float(avg), restarted_count, last_api_age))
except Exception as e:
error("Manager handled exception during Server Fixing. Lets start from BEGIN?")
ManagerHealth = False
else:
user_feedback("Api is working for {:0.2f} hour and has passed penetration test at phase {:d}".
format(float(last_api_age),int(penetration_result)),truncated_free=False, print_time=False)
if __name__ == "__main__":
try:
current_t = threading.currentThread()
threading.Thread.setName(current_t, "Manager")
except Exception as e:
print("error during setting thread name : {:s}".format(str(e)))
if(len(sys.argv) < 3):
error("Only %d parameter is given expected is 3(self, manager_health_path, debug_mode_is_enabled)" % len(sys.argv))
exit(-1)
try:
debug_mode = False
if str(sys.argv[2]) == "True":
debug_mode = True
error("##### Debug Mode Enabled! ####")
else:
error("#### Debug Mode is Disabled ####")
health_file_path = str(sys.argv[1])
print("ManagerMonitorHealth Path is {:s}".format(str(health_file_path)))
manager = Manager(health_file_path, debug_mode)
manager.start()
except Exception as e:
error("WTF on Manager Main Start: {:s}".format(str(e)))
print("WTF on Manager Main Start: {:s}".format(str(e)))
#Starter bash script..
'''
#!/bin/sh
PID=$$
home=$(pwd)
script=$home"/start.sh"
direct=""
started=0
manager_file=$home"/api/base/services/manager.py"
while true;
do
log_file=$home"/log.txt"
manager_health=$home"/manager_health.txt"
direct=">> "$log_file" 2>&1"
command="python3.5 "$manager_file" "$manager_health" "$direct
echo $command
echo "before start wait 5 sec"
sleep 5
eval "($command) &"
started=$((started + 1))
manager_pid=$!
echo "manager pid is "$manager_pid
ps aux|grep python3.5
while true;
do
echo "Reading Manager Health"
sleep 5
line=$(cat $manager_health)
echo "Fucking Line is "$line
if [ "death" = "$line" ]; then
echo "String are death"
./kill_debug.sh
sleep 2
break
else
echo "Strings are alive"
fi
done
done
'''
1 thought on “Arbitrage Hero Project Back End Part 1”