How to use Event-Based Observable Pattern with Python (Arbitrage Hero Backend – Part 2)

Hello folks, as many of you already know there is a super behavioral pattern called observer pattern. Some times it’s called producer/consumer pattern also.

So I’ve implemented event based consumer producer code for the back end of Arbitrage Hero Project.

So basically there are 3 elements in this pattern, one of them observable class, which I called producer class in my project. Second Observer itself which I called consumer my class and last and third but not least , the message which is propagated to oberserver instances.

So basically, we propagate event instances to the observer which is created by observable. What is this message(event)? let’s look at the code first.

So if you look at the below code, you can see three different classes in one python file. We can check in __init__ method of classes if child classes have a defined attribute and this attribute is actually a runnable method or not. I’ve used this technique in my IWatchableService and IWatcherWatchable Service class which you can look at here:

So we have three classes named Event which will be our message to be passed, Consumer the observer , and Producer which is the Observable.

#Abstract class of Producer

from base.crypto_engine.MessageApi.debug import *

class Event:
    ''''''
    LIMIT_BUY_COMMAND = "LIMIT_BUY"
    LIMIT_SELL_COMMAND = "LIMIT_SELL"
    MARKET_BUY_COMMAND = "MARKET_BUY"
    MARKET_SELL_COMMAND = "MARKET_SELL"
    WITHDRAW_COMMAND = "WITHDRAW_COMMAND"
    CHECK_BALANCE_COMMAND = "CHECK_BALANCE"
    FETCH_MARKET_PRICE = "FETCH_MARKET_PRICE"
    VALIDATE_SYMBOL_BALANCE = "VALIDATE_SYMBOL_BALANCE"
    FETCH_ORDER_BOOK = "FETCH_ORDER_BOOK"
    FETCH_SYMBOL_BALANCE = "FETCH_SYMBOL_BALANCE"
    RESULT_FAIL = "RESULT_FAIL"
    FETCH_ORDER_BOOK_FROM_DB = "FETCH_ORDER_BOOK_FROM_DB"



    def __init__(self,sender,source_market,event_type:str,operation_symbol:str,data):
        self.__data = data
        self.__sender = sender
        self.__type = event_type
        self.__source_market = source_market
        self.__operation_symbol = operation_symbol

    def get_type(self):
        return self.__type
    def get_source_market(self):
        return self.__source_market
    def get_sender(self):
        return self.__sender
    def get_data(self):
        return self.__data
    def get_symbol(self):
        return self.__operation_symbol

class Consumer:

    def to_string(self):
        return str(type(self))

    def handle_event(self,event:Event):
        print("Event is RECEIVED in class {:s}".format(str(type(self))))
        print(event.get_data())


class Producer:

    def __init__(self):
        self.__consumer_list = []


    def notify_consumers(self,event:Event):
        consumer_list = self.__consumer_list
        for consumer in consumer_list:
            consumer.handle_event(event)


    def add_consumer(self,consumer):
        debug("consumer is adding")
        debug(consumer.to_string())
        if (self.__consumer_list.__contains__(consumer) != True):
            self.__consumer_list.append(consumer)
        else:
            error("Consumer is already added")
    #def operate(self):

    def get_consumer_list(self):
        return self.__consumer_list

So , how we use this classes in the program? Simple..


from time import sleep
import json
import datetime

import base.crypto_engine.utils.helper as helper
from base.crypto_engine.setting_db.exchange_info import ExchangeInfo
from base.crypto_engine.symbols import *
from base.crypto_engine.utils.config_manager import Config
from base.crypto_engine.engine.event import Consumer, Event
from base.crypto_engine.MessageApi.debug import *

from base.crypto_engine.engine.transaction_engine import Operation, TransactionEngine


class Arbitrage(Consumer):

    def __init__(self):
        debug("Arbitrage is initalised")
        self.__transaction_engine = TransactionEngine()
        self.__buying_exchange = None
        self.__selling_exchange = None
        self.__avaiable_balance = 0
        self.__buying_market_first_symbol = None
        self.__buying_market_second_symbol = None
        self.__min_second_symbol_balance = 40
        self.__expected_rate = 0
        self.__bought_amount = 0
        self.__money_to_spend = 0
        self.__log_file_path = ""
        self.__tx_fee = None
        self.__operation_fee = 0
        self.__arbitrage_start_time = 0

        self.__balance_after_buy = 0
        self.__expected_received_amount = 0
        self.__expected_bought_coins = 0
        self.__symbol_address = None
        self.__send_op_params = None

        '''
        #if  you would like to implement one day , a limit buy/sell arbitrage take a look this previous coded samples.
            last_ask = self.__buying_exchange.get_last_price() # we set minumum last ask to last_price for buying exchange in op_finder.
            avarage_market_price = helper.get_twothousandth_higher_price(last_ask)
            amount_to_buy = second_symbol_balance / avarage_market_price
            amount_to_buy = amount_to_buy - (amount_to_buy/100) # buy a bit less than we afford.



            buy_op = Operation.create_limit_buy(amount_to_buy,avarage_market_price,self.__buying_exchange.related_market(),buy_symbol,5,2)


            #Kraken sample

            if (self.__buying_exchange.get_market().id == "kraken"):
                if (self.__buying_market_first_symbol == ETH):
                    send_op = Operation.create_kraken_withdraw(amount_to_buy,self.__buying_market_first_symbol,symbol_addres,self.__buying_exchange.related_market(),"cex-eth",20,3) # wait 10 secs between 1 try
                elif (self.__buying_market_first_symbol == XRP):
                    send_op = Operation.create_kraken_withdraw(amount_to_buy, self.__buying_market_first_symbol,
                                                               symbol_addres, self.__buying_exchange.related_market(),
                                                               "cex-xrp", 20, 3)  #  wait 10 secs between 1 try
                else:
                    error("Unsupported currency{:s} for kraken ".format(str(self.__buying_market_first_symbol)))
                    return


            else:
        '''

    def get_transaction_fee(self,market:str,currency:str): # TODO fetch it from config file .
        transaction_fee = 0
        if (market == "koineks"):  # set transaction  fee for koineks here.
            if (currency == "BTC"):
                transaction_fee = 0.0007
            elif (currency== "ETH"):
                transaction_fee = 0.005
            elif (currency == "LTC"):
                transaction_fee = 0.01
            elif (currency == "DASH"):
                transaction_fee = 0.002
            elif (currency == "DOGE"):
                transaction_fee = 2
            elif (currency == "XRP"):
                transaction_fee = 1
            elif (currency== "XLM"):
                transaction_fee = 0.3
            elif (currency== "XEM"):
                transaction_fee = 1
            elif (currency== "BCH"):
                transaction_fee = 0.001
            elif (currency== "BTG"):
                transaction_fee = 0.001
            else:
                raise Exception("Unsupported currency name for koineks : {:s}".format(str(currency)))
        elif market == "btcturk":
            transaction_fee =  0 # 0 for now.

        else:
            raise Exception("Unsupported Market {:s}".format(str(market)))

        return float(transaction_fee)




    def __report(self, msg):

        try:
            android_alarm(msg)
            user_feedback(msg)
        except Exception as e:
            error("Error during android_alarm and user_feedback")

        with open(self.__log_file_path, 'wb') as outputfile:
            byte_array = bytearray(json.dumps(msg), 'utf8')
            outputfile.write(byte_array)

    def handle_event(self, event: Event): # WE owerride the parent's method 
        if event.get_type() == Event.MARKET_SELL_COMMAND:
            data = event.get_data()

            data_as_str = ""

            try:
                data_as_str = str(data)
                self.__report(data_as_str)
            except Exception as e:
                try:
                    error("MARKET SELL COMMAND Result could not converted to string! it's type  {:s}".format(
                        str(type(data))))
                except Exception as e:
                    error("During handling exception another exception occured it's :{:s}".format(str(e)))

            started_time = self.__arbitrage_start_time.split(':')
            started_min = float(started_time[1])
            started_sec = float(started_time[2])

            end = str(datetime.now())

            end_time = end.split(':')
            end_min = float(end_time[1])
            end_sec = float(end_time[2])

            total_secs = (end_min - started_min) * 60 + (end_sec - started_sec)

            secs = total_secs % 60

            mins = int(total_secs / 60)

            arbitrage_report_msg = "Arbitrage is done Invest Money {:f}{:s} from {:s} to {:s} using {:s} Buying Price:{:f} S{:s} spent time {:d}:{:d}".format(
                float(self.__money_to_spend),
                str(self.__buying_market_second_symbol),
                str(self.__buying_exchange.get_market().id),
                str(self.__selling_exchange.get_market().id),
                str(self.__buying_market_second_symbol),
                float(self.__buying_exchange.get_last_ask()),
                str(self.__buying_market_first_symbol),
                int(mins),
                int(secs))
            self.__report(arbitrage_report_msg)
            error("Arbitrage is finished byy")
            exit(31)


        elif event.get_type() == Event.CHECK_BALANCE_COMMAND:

            error("Balance check event is recevied")

            self.__arbitrage_start_time = str(datetime.now())

            data = event.get_data()



            source_market = event.get_source_market()

            if source_market == self.__selling_exchange.related_market(): #Selling Market Second Balance Fetch Operation

                free_balance = helper.find_free_balance(data)
                first_symbol_balance = free_balance.__getitem__(self.__buying_market_first_symbol) # how much money we already have before arbitrage operation in selling exchange!

                self.__selling_exchange.set_first_balance_info(first_symbol_balance)
                debug("In Market:{:s} Current {:s} balance is {:f}".format(str(self.__selling_exchange.related_market().get_name()),
                                                                       str(self.__buying_market_first_symbol),
                                                                       float(first_symbol_balance)))
                return









            free_balance = helper.find_free_balance(data)
            second_symbol_balance = free_balance.__getitem__(self.__buying_market_second_symbol)

            if (second_symbol_balance < self.__money_to_spend):
                error(
                    "There is not sufficent amount of money({:f}{:s}) to buy {:s}".format(float(second_symbol_balance),
                                                                                          str(
                                                                                              self.__buying_market_second_symbol),
                                                                                          str(
                                                                                              self.__buying_market_first_symbol)))
                return

            balance_before_buy = free_balance.__getitem__(self.__buying_market_first_symbol)
            self.__buying_exchange.set_second_balance_info(second_symbol_balance)

            buy_symbol = self.__buying_market_first_symbol + '/' + self.__buying_market_second_symbol

            symbol_address = self.__selling_exchange.get_address()
            self.__symbol_address = symbol_address
            avarage_ask = self.__buying_exchange.get_last_ask()

            # operation_fee = self.__buying_exchange.get_fee() #TODO: save operation fee to exchange.config and fetch if from there or fetch somewhere else amk
            operation_fee = 0.32
            operation_fee = operation_fee / 100

            amount_to_buy = self.__money_to_spend / avarage_ask

            expected_bought_coins = amount_to_buy
            self.__expected_bought_coins = expected_bought_coins

            fee = expected_bought_coins * operation_fee

            self.__operation_fee = fee

            expected_bought_coins = expected_bought_coins - fee

            balance_after_buy = balance_before_buy + expected_bought_coins
            self.__balance_after_buy = balance_after_buy

            error("before buy {:f} bought{:f} fee {:f} after bought {:f}".format(float(balance_before_buy),
                                                                                 float(expected_bought_coins),
                                                                                 float(fee),
                                                                                 float(balance_after_buy)))

            # expected_received_amount = expected_bought_coins - self.__tx_fee.__getitem__(self.__buying_market_first_symbol)
            fee_key = self.__buying_market_first_symbol + '/' + self.__buying_market_second_symbol
            transaction_fee = 0
            try:
                transaction_fee = self.get_transaction_fee(self.__buying_exchange.related_market().get_name(),self.__buying_market_first_symbol)
                #transaction_fee = self.__tx_fee.__getitem__(fee_key)
            except Exception as e:
                error("transaction fee couldn't found for {:s} , {:f} will be used as default".format(str(fee_key),
                                                                                                      float(0.01)))
                transaction_fee = 0.01





            expected_received_amount = expected_bought_coins - transaction_fee  # tx_fee is transaction fee , differs from operatinal fee in market
            # buying_fee = self.__tx_fees.__getitem__(self.__buying_market_first_symbol) #Calculate fee to estimate expected receving fund

            self.__expected_received_amount = expected_received_amount

            # buy , validate , send , sell operations to give in a transaction..

            # Operations in buying exchange market in order , BUY -> VALIDATE -> SEND -> VALIDATE

            buy_market_name = self.__buying_exchange.related_market().get_name()
            sell_market_name = self.__selling_exchange.related_market().get_name()

            log_file_name = buy_market_name + "_" + sell_market_name + "_" + self.__buying_market_first_symbol

            send_op_params = {"log_file_name": log_file_name}
            self.__send_op_params = send_op_params

            if ExchangeInfo.tagable_currency(self.__buying_market_first_symbol):
                tag = self.__selling_exchange.get_tag()
                send_op_params.update({"tag": tag})

            #self.__money_to_spend = 0  # don't buy anything mak
            #BURADA KOINEKS ISE PRICE HESAPLA VER!

            if (self.__buying_exchange.related_market().get_name() == "koineks"):
                buy_op = Operation.create_limit_buy(amount_to_buy,avarage_ask,self.__buying_exchange.related_market(),buy_symbol,2,1)
            else:
                buy_op = Operation.create_market_buy(self.__money_to_spend, self.__buying_exchange.related_market(),
                                                 buy_symbol, 2, 1)
            # validate_buy = Operation.create_validate_symbol_balance(balance_after_buy,self.__buying_market_first_symbol,self.__buying_exchange.related_market(),2,1) # wait 1 secs between total 60 tries
            # send_op = Operation.create_withdraw(expected_bought_coins,self.__buying_market_first_symbol,symbol_addres,self.__buying_exchange.related_market(),2,1,send_op_params) # wait 10 secs between 1 try
            # validate_send = Operation.create_validate_symbol_balance(balance_before_buy, self.__buying_market_first_symbol, self.__selling_exchange.related_market(), 2, 1)  # wait 1 secs between total 60 tries

            # validate_receive = Operation.create_validate_symbol_balance(expected_received_amount,self.__buying_market_first_symbol,self.__selling_exchange.related_market(),5,1) # wait 100 min to receive the currency.

            # buy , validate , send , sell operations to give in a transaction..

            op_list = [buy_op]

            consumer_list = []
            consumer_list.append(self)
            Operation.add_consumers_to_transactions(op_list, consumer_list)

            self.__transaction_engine.create_and_add_transaction(op_list)


            return



        elif event.get_type() == Event.MARKET_BUY_COMMAND or event.get_type() == Event.LIMIT_BUY_COMMAND:
            error("Buy Event is recevied")
            try:
                data = event.get_data()  # bought currency data
                if (type(data) == type({})):
                    debug("Buy Operation data: type is dict val: {:s}".format(helper.convert_dict_to_str(data)))
                elif type(data) == type(""):
                    debug("Buy operation data: type is str val: {:s}".format(data))
                else:
                    debug("Buy operation data type is unknowon ant it's {:s}".format(str(type(data))))

                try:
                    price = -1
                    price = data.__getitem__('price')


                except Exception as e:
                    debug("No price data for buy operation!")

                try:
                    amount = -1
                    amount = data.__getitem__('amount')

                    if (amount >= 0 ):
                        gap = self.__expected_bought_coins - amount
                        if (gap > 0):
                            self.__balance_after_buy = self.__balance_after_buy - gap # don't try to validate that you couldn't buy.
                            self.__expected_received_amount = self.__expected_received_amount - gap # if I bought less than I wanted /could someone else stole cheap my coins before me. :/

                        self.__bought_amount = amount
                        self.__report("Bought {:f}{:s} price:{:f}".format(float(amount),  str(self.__buying_market_first_symbol),  float(price)))

                except Exception as e:
                    debug("No amount data for buy operation!")



                validate_buy = Operation.create_validate_symbol_balance(self.__balance_after_buy,
                                                                        self.__buying_market_first_symbol,
                                                                        self.__buying_exchange.related_market(), 5,
                                                                        6)  # wait 1 secs between total 60 tries

                validate_buy.add_consumer(self)
                operation_list = []
                operation_list.append(validate_buy)

                self.__transaction_engine.create_and_add_transaction(operation_list)

            except Exception as e:
                error("Error during reporting market buy command {:s}".format(str(e)))

        elif event.get_type() == Event.WITHDRAW_COMMAND:
            debug("Currency Send Event is recevied")
            debug("result is {:s}".format(str(event.get_data())))
            data = event.get_data()
            if (type(data) == dict):
                amount = data.__getitem__('amount')
            else:
                amount = event.get_data()  # bought currency data

            if (type(amount) is not float):
                try:
                    error("Probably we got error , cos withdraw result is {:s}".format(str(event.get_data())))
                    android_alarm("FATAL ERROR NEED ATTENTION , ARBITRAGE COULDN'T COMPLETE , DO IT YOURSELF : {:s} -> {:s} : {:s} : {:f}".format(str(self.__buying_exchange.related_market().get_name()),
                                                                                                                                                      str(self.__selling_exchange.related_market().get_name()),
                                                                                                                                                      str(self.__buying_market_first_symbol),
                                                                                                                                                      float(self.__expected_received_amount)))
                    android_alarm("WITHDRAW_COMMAND RESULT : {:s}".format(str(data)))
                except Exception as e:
                    error("WTF MAN!: {:s}".format(str(e)))
                    exit(-3131)



            total_second_symbol_balance = self.__selling_exchange.get_first_symbol_balance_info() + self.__expected_received_amount
            validate_receive = Operation.create_validate_symbol_balance(total_second_symbol_balance,
                                                                        self.__buying_market_first_symbol,
                                                                        self.__selling_exchange.related_market(), 150,
                                                                        2)  #  wait 100 min to receive the currency.
            validate_receive.add_consumer(self)
            operation_list = []
            operation_list.append(validate_receive)
            self.__transaction_engine.create_and_add_transaction(operation_list)

            android_alarm("Sent {:f}{:s} ".format(float(amount), str(self.__buying_market_first_symbol)))



        elif event.get_type() == Event.VALIDATE_SYMBOL_BALANCE:
            data = event.get_data()
            error("validate symbol balance data is received")
            print("data is {:s}".format(str(data)))
            source_market = event.get_source_market()

            if (data == "RESULT_FAIL"):
                if source_market == self.__buying_exchange.related_market():  # Validate buy Operation
                    error("Couldn't validate bought coins !")
                    return
                else:
                    error("Coins bought and sent but couldn't received in given time!")
                    android_alarm("Coins in the air!! {:s}".format(self.__log_file_path))
            if source_market == self.__buying_exchange.related_market(): #Validate buy Operation

                if (str(data).upper().__contains__('TRUE') is not True):
                    error("validate balance for is failed")
                    return

                #SEEENNNDDD
                send_op = Operation.create_withdraw(self.__bought_amount, self.__buying_market_first_symbol,
                                                    self.__symbol_address, self.__buying_exchange.related_market(), 2,
                                                    1,
                                                    self.__send_op_params)  #  wait 10 secs between 1 try
                send_op.add_consumer(self)
                operation_list = []
                operation_list.append(send_op)

                self.__transaction_engine.create_and_add_transaction(operation_list)


            elif source_market == self.__selling_exchange.related_market(): #Validate Receive Operation

                #BURADA_KOINEKSSE PRICE_HESAPLA_VER

                if (source_market.get_name() == "koineks"):
                    raise Exception("Not implemented yet")


                sell_op = Operation.create_market_sell(self.__expected_received_amount,
                                                       self.__selling_exchange.related_market(),
                                                       self.__selling_exchange.get_symbol(), 10, 1)

                sell_op.add_consumer(self)

                operation_list = []
                operation_list.append(sell_op)
                self.__transaction_engine.create_and_add_transaction(operation_list)

        return

    def save_arbitrage_info(self, buying_exchange: ExchangeInfo, selling_exchange: ExchangeInfo, expected_rate: float,
                            money_to_spend: float, log_file_path: str, tx_fee: dict):
        print("doing arbitrage now")
        self.__buying_exchange = buying_exchange
        self.__selling_exchange = selling_exchange
        self.__expected_rate = expected_rate
        self.__money_to_spend = money_to_spend
        self.__log_file_path = log_file_path
        self.__tx_fee = tx_fee
        # cex.privatePostGetAddress(cex.extend({'currency':'XRP'}))
        self.__buying_market_first_symbol = self.__buying_exchange.get_first_symbol()
        self.__buying_market_second_symbol = self.__buying_exchange.get_second_symbol()
        print("buying market fs: {:s}".format(str(self.__buying_market_first_symbol)))
        print("buying market ss {:s}".format(str(self.__buying_market_second_symbol)))

    def start_arbitrage(self):
        if (self.__money_to_spend >= 100):
            error("Invest Money Guard is HEERREEE , money to wish to spend {:f}".format(float(self.__money_to_spend)))

        operation_list = []
        get_first_symbol_balance_op = Operation.create_check_balance(self.__buying_exchange.related_market())
        get_second_symbol_balance_op = Operation.create_check_balance(self.__selling_exchange.related_market())

        get_first_symbol_balance_op.add_consumer(self)
        get_second_symbol_balance_op.add_consumer(self)

        operation_list.append(get_first_symbol_balance_op)
        operation_list.append(get_second_symbol_balance_op)
        debug("arbitrage is started")
        self.__transaction_engine.create_and_add_transaction(operation_list)
        self.__transaction_engine.start_engine()

In the above code, we have Arbitrage class which is a actully consumer. Because we create operations in this class but asks this operations to be performed by another class which will be our Producer class. (Transaction Engine). Transaction engine is the class which is responsible for operating tasks and notify its consumer’s via consumer’s overrided handle_event method. So let’s have a look at our TransactionEngine.py code.

#Author Ozan Ozenoglu ozan.ozenoglu@gmail.com
#File Created at 08.11.2017

'''
Scope of Module:
This module has functions to create/add/work/delete transaction.
'''

from base.crypto_engine.utils import helper as helper
from base.crypto_engine.engine.event import *
from base.crypto_engine.setting_db.request_info import RequestType,RequestInfo
from builtins import float
from random import randint
#due to dependcy of debug to time , impprt time module later than debug module
from base.crypto_engine.MessageApi.debug import *
from time import sleep, time
from base.crypto_engine.symbols import *

import json, requests

MIN_RANDOM_RANGE = 0
MAX_RANDOM_RANGE = 99999999
TRANSACTION_LIFE = 600 * 10 #10min min


class Operation(Producer):

    def dump_operation(self):
        debug("Type:{:s} symbol:{:s} amount:{:f} market{:s}".format(str(self.__operation_type),str(self.__symbol),float(self.__amount),str(self.__market.id)))

    def __init__(self,market , operation_type:str,max_try:int,max_wait:int,params={}):
        super().__init__() # call parent constructor

        self.__amount = 0
        self.__key = ""
        self.__is_completed = False
        self.__address = ""
        self.__limit_price = 0
        self.__operation_type = operation_type
        self.__try_count = 0
        self.__max_try_count = max_try
        self.__wait_time_between_try = max_wait
        self.m_market = market # class Market instance
        self.__market = market.get_market()
        self.__symbol = None
        self.__params = params
        debug("Operation {:s} is created".format(operation_type))
    def get_type(self):
        return self.__operation_type

    def get_completed(self):
        return self.__is_completed
    def set_completed(self,val:bool):
        self.__is_completed = val


    @staticmethod
    def add_consumers_to_transactions(operation_list,consumer_list):
        for operation in operation_list:
            for consumer in consumer_list:
                operation.add_consumer(consumer)


    @staticmethod
    def create_fetch_market_price(market,symbol:str,max_try=3,wait=10):
        operation = Operation(market,Event.FETCH_MARKET_PRICE,max_try,wait)
        operation.__symbol = symbol
        return operation
    @staticmethod
    def create_check_balance(market ,max_try=3,wait=60):
        operation = Operation(market,Event.CHECK_BALANCE_COMMAND,max_try,wait)
        return operation
    @staticmethod
    def create_market_buy(amount:float,market ,symbol:str,max_try=3,wait=60):
        operation = Operation(market,Event.MARKET_BUY_COMMAND,max_try,wait)
        operation.__symbol = symbol
        operation.__amount = amount
        return operation
    @staticmethod
    def create_limit_buy(amount:float,price:float,market,symbol:str,max_try=3,wait=60):
        operation = Operation(market,Event.LIMIT_BUY_COMMAND,max_try,wait)
        operation.__symbol = symbol
        operation.__amount = amount
        operation.__limit_price = price
        return operation
    @staticmethod
    def create_market_sell(amount:float,market ,symbol:str,max_try=3,wait=60):
        operation = Operation(market,Event.MARKET_SELL_COMMAND,max_try,wait)
        operation.__symbol = symbol
        operation.__amount = amount
        return operation
    @staticmethod
    def create_limit_sell(amount:float,price:float,market ,symbol:str,max_try=3,wait=60):
        operation = Operation(market,Event.LIMIT_SELL_COMMAND,max_try,wait)
        operation.__symbol = symbol
        operation.__amount = amount
        operation.__limit_price = price
        return operation
    @staticmethod
    def create_withdraw(amount:float,symbol:str,address:str,market ,max_try=3,wait=60,params={}):
        operation = Operation(market,Event.WITHDRAW_COMMAND,max_try,wait)
        operation.__amount = amount
        operation.__address = address
        operation.__symbol = symbol
        operation.__params = params
        return operation

    @staticmethod
    def create_kraken_withdraw(amount:float,symbol:str,address:str,market ,key:str,max_try=1,wait=2):
        operation = Operation(market,Event.WITHDRAW_COMMAND,max_try,wait)
        operation.__amount = amount
        operation.__address = address
        operation.__symbol = symbol
        operation.__key = key
        return operation

    @staticmethod
    def create_validate_symbol_balance(amount:float,symbol:str,market ,max_try=3,wait=60):
        operation = Operation(market,Event.VALIDATE_SYMBOL_BALANCE,max_try,wait)
        operation.__amount = amount
        operation.__symbol = symbol
        return operation

    @staticmethod
    def create_fetch_order_books(market ,symbol:str,max_try=3,wait=10,online=False): #if online is false then check from ob_service database instead of directly market itself.
        event = Event.FETCH_ORDER_BOOK if online else Event.FETCH_ORDER_BOOK_FROM_DB
        operation = Operation(market, event, max_try, wait)
        operation.__symbol = symbol
        return operation
    
    def get_orderbook_from_db(self, market_name,symbol): #symbol should be like BTC/USD
        data = {'event': 'get_order_book', 'data': None, 'private_key': 'osman_is_my_girl'}
        data.update({'data': {"market_name":market_name, "symbol_pair":symbol} }) #sample symbol should be "BTC/USD"
        str_data = json.dumps(data)
        ret = requests.get("http://ozenoglu.com:8000/api_call", data=str_data)
        ret =  json.loads(ret.content.decode())
        return ret

    def get_result(self):

        if self.__operation_type == Event.WITHDRAW_COMMAND:
            debug("Operation {:s} is started address{:s}".format(self.__operation_type,str(self.__address)))
            if self.__key != "":
                result = self.__market.withdraw(self.__symbol,self.__amount,self.__address,{'key':self.__key}) #if there is a need for key use this
            else:
                result = self.__market.withdraw(self.__symbol,self.__amount,self.__address,self.__params)

            debug("Operation result is {:s} Withdraw Amount{:f}".format(str(result),float(self.__amount)))
            return result
        
        if self.__operation_type == Event.FETCH_ORDER_BOOK_FROM_DB:
            result = None
            debug("Operation {:s} is started".format(self.__operation_type))
            try:
                result = self.get_orderbook_from_db(self.m_market.get_name(),self.__symbol)                
            except Exception as e:
                error("Error during fetching order book from db ex: {:s}".format(str(e)))
                raise e
            return result
                
        if self.__operation_type == Event.FETCH_ORDER_BOOK:
            result = None
            debug("Operation {:s} is started".format(self.__operation_type))
            try:
                result = self.__market.fetch_order_book(self.__symbol)
            except Exception as e :
                warn("Exception occured during fetching market price {:s}".format(str(e)))
                try:
                    if self.m_market.get_name() == "koineks":
                        currency = str(self.__symbol).split('/')[0]
                        fetch_request = RequestInfo("koineks", RequestType.FETCH_FOR_HELP_URGENT, currency)
                        fetch_request.set_log_file_name(currency + "_fetch_urgent_request")
                        fetch_request.push_request()
                        sleep(8)  # wait 8 secs before try again.
                        result = self.__market.fetch_order_book(self.__symbol)
                    else:
                        error("Error during fetching {:s} order book from {:s} error:{:s}".format(str(self.__symbol),self.m_market.get_name(),str(e)))
                except Exception as e:
                    error("Error during handling koineks special operation for fetching order book")
                    raise e
            #debug("Operation result is {:s}".format(result))
            return result


        if self.__operation_type == Event.LIMIT_SELL_COMMAND:
            debug("Operation {:s} is started".format(self.__operation_type))
            result = self.__market.create_limit_sell_order(self.__symbol,self.__amount,self.__limit_price)
            debug("Operation result is {:s}".format(result))
            return result
        if self.__operation_type == Event.MARKET_SELL_COMMAND:
            debug("Operation {:s} is started".format(self.__operation_type))
            result = self.__market.create_market_sell_order(self.__symbol,self.__amount)
            debug("Operation result is {:s}".format(str(result)))
            return result
        if self.__operation_type == Event.LIMIT_BUY_COMMAND:
            debug("Operation {:s} is started".format(self.__operation_type))
            result = self.__market.create_limit_buy_order(self.__symbol,self.__amount,self.__limit_price)
            debug("Operation result is {:s}".format(str(result)))
            return result
        if self.__operation_type == Event.MARKET_BUY_COMMAND:
            debug("Operation {:s} is started".format(self.__operation_type))
            result = self.__market.create_market_buy_order(self.__symbol,self.__amount)

            debug("Operation result is {:s}".format(str(result)))

            return result
        if self.__operation_type == Event.CHECK_BALANCE_COMMAND:
            debug("Operation {:s} is started".format(self.__operation_type))
            result = self.__market.fetch_balance()
            return result
        if self.__operation_type == Event.FETCH_MARKET_PRICE:
            debug("Operation {:s} is started".format(self.__operation_type))
            result = self.__market.fetch_ticker(self.__symbol)
            debug("Operation result is {:s}".format(str(result)))
            return result

        if self.__operation_type == Event.VALIDATE_SYMBOL_BALANCE:
            debug("Operation {:s} is started".format(self.__operation_type))

            data = self.__market.fetch_balance()
            symbol_fbalance = helper.find_symbol_fbalance(data,self.__symbol)
            debug("Free Symbol {:s} balance is {:f}".format(str(self.__symbol),float(symbol_fbalance)))
            try:
                self.__amount = round(self.__amount,4)
                self.__amount = helper.get_thousandth_lower_price(self.__amount)
                if symbol_fbalance >= self.__amount:
                    debug("There is  enough balance of {:s} balance is:{:f} expecting is:{:f}".format(str(self.__symbol),float(symbol_fbalance), float(self.__amount)))
                    return True
                else:
                    if (self.__try_count < 4) : # maximum 4 times  , decreased it!
                        self.__amount = helper.get_thousandth_lower_price(self.__amount)
                    debug("There is not enough balance of {:s} balance is:{:f} new expecting is:{:f}".format(str(self.__symbol),float(symbol_fbalance),float(self.__amount)))
                    return False
            except Exception as e:
                error("Error during Operation {:s}".format(str(self.__operation_type)))
                dump_trace()
                return False
        return True

    def operate(self):
        debug(" operation {:s} will be try to execute".format(self.__operation_type))
        if self.get_completed()== True:
            error("This is already operated operation ") #TODO Find a better way not to trigger already finished operation
            return True

        while(self.__try_count < self.__max_try_count ):
            self.__try_count += 1
            is_exception_occured = False
            result = None
            try:
                result = self.get_result()

            except Exception as e:
                error("Unhalded exception is {:s}".format(str(e)))
                is_exception_occured = True
                self.dump_operation()


            if ( result == None or is_exception_occured  ):

                debug("operation is failed ,system will wait before trying again for {:d}".format(self.__try_count))
                sleep(self.__wait_time_between_try)

            else:
                self.set_completed(True)
                consumer_list = self.get_consumer_list()
                debug ("count of registered consumers to notified: {:d}".format(len(consumer_list)))
                debug("CONSUMERS")
                debug(consumer_list)

                event = Event(self,self.m_market, self.__operation_type,self.__symbol, result)
                self.notify_consumers(event)

                del self
                return result
        error("Max try number {:d} is reached operation is aborting..".format(self.__max_try_count))
        event = Event(self, self.m_market, self.__operation_type, self.__symbol, Event.RESULT_FAIL)
        self.notify_consumers(event)
        return None




class Transaction(): #internal class
    def __init__(self, operations):
        self.__operation_list = operations
        self.__is_completed = False # if the transaction is completed make it True
        self.__is_error = False # if there is an error during transaction make it True
        self.__error_message = "" # if there is an error during tansaction store the error message here.
        self.__transaction_id = randint(MIN_RANDOM_RANGE,MAX_RANDOM_RANGE) # create a random integer betwen renges.
        self.__created_time = time()
        self.__cretead_time_str = str(datetime.now())
        self.__completed_time = 0
        self.__completed_time_str = ""
        self.__is_experied = False # Make it true if it's expired.
        self.__transaction_life = self.__created_time + TRANSACTION_LIFE
        debug("A new transaction is created with id {:d} at time {:s}".format(self.__transaction_id,self.__cretead_time_str))


    def get_operrations_list(self):
        return self.__operation_list

    def get_transaction_id(self):
        return self.__transaction_id

    def operate(self):

        debug("Transaction with id {:d} is started".format(self.__transaction_id))

        current_time = time()
        if (current_time  > self.__transaction_life): # if the transaction is too old , do not execute it. Market data may be changed.
            self.__is_experied = True
            self.__is_error = True
            self.__error_message = "Expired"
            return False
        else:
            self.__is_completed = True
            self.__completed_time_str = str (datetime.now())
            self.__completed_time = time()
            for operation in self.__operation_list:
                if (operation.operate() == None):
                    error("Transaction with id {:d} is failed:".format(self.__transaction_id))
                    return False

            return True


class TransactionEngine:
    def __init__(self):
        self.transactions = []
        self.completed_transactions = []
        self.failed_transactions = []
        self.__is_in_wait_mode = False # initial of transaction engine wait mode is false , user is expected to start first.

    def create_and_add_transaction(self, operations):
        if (type(operations) is not type ([])):
            if (type(operations) == type(Operation()) ):
                my_operations = []
                my_operations.append(operations)
                operations = my_operations
            else:
                raise Exception("Un supported format for operation list , type {:s}".format(str(type(operations))))


        new_transaction = Transaction(operations)
        self.transactions.append(new_transaction)
        debug("New transaction with id {:d} is created".format(new_transaction.get_transaction_id()))
        if (self.__is_in_wait_mode == True): # if it's in wait mode start engine.
            self.start_engine()

    def __get_next_transaction(self):
        if (len(self.transactions) > 0): # bugfix: out of index exception
            return self.transactions[0] # return first transaction in the list
        else:
            return None

    def __remove_transaction(self,transaction_to_delete:Transaction):
        if (len(self.transactions) >= 1):
            self.transactions.remove(transaction_to_delete)
            debug("Transaction with id {:d} is deleted".format(transaction_to_delete.get_transaction_id()))
        else:
            error("Tried to remove transaction but it's already empty!")

    def start_engine(self):

        next_transaction = self.__get_next_transaction()
        while (next_transaction is not None):
            operation_list = next_transaction.get_operrations_list()
            size = len(operation_list)
            debug("next transaction id {:s} , operations list len {:d} ".format(str(next_transaction.get_transaction_id()),size))
            if size >= 1:
                index = 0
                for operation in operation_list:
                    type = operation.get_type()
                    debug("Operation{:d} type {:s}".format(int(index),str(type)))
                    operation.dump_operation()
                    index = index + 1
            result = next_transaction.operate()
            if (result == True):
                try:
                    self.completed_transactions.append(next_transaction)
                    self.__remove_transaction(next_transaction)
                    next_transaction = self.__get_next_transaction()
                except Exception as e:
                    error("ALL TRANSACTIONS ABORTED , Exception is occured {:s}".format(str(e)))
                    self.transactions.clear() # abort all transactions
                    return
            else:
                error("Transaction Engine couldn't finish Transaction with id: {:d}".format(next_transaction.get_transaction_id()))
                self.failed_transactions.append(next_transaction)
                self.__remove_transaction(next_transaction)
                next_transaction = None
                return False
        self.__is_in_wait_mode = True # wait for until next transaction is created.
        debug("Transaction engine is in wait mode date: {:s}".format(str(datetime.now())))









#print(okcoinusd().fetch_ticker(BTC_USD))

'''
transaction_engine = TransactionEngine()
market  = okcoinusd()
fetch_price_operation = Operation.create_fetch_market_price(market, BTC_USD)
fetch_price_operation.add_consumer(self)
operation_list = []
operation_list.append(fetch_price_operation)
transaction_engine.create_and_add_transaction(operation_list)
transaction_engine.start_engine()
'''

In the above code, we have a method called operete. This method is operating the statically operated Operation instances and then perform a notify job for registered/Subscribed consumers..

Any comment and feedbacks are welcome.

Ozan Özenoğlu..

Leave a Reply

Your email address will not be published. Required fields are marked *