Hello folks. As many of you already know, there is a very useful behavioral pattern called the observer pattern. It is sometimes also referred to as the producer/consumer pattern.
I've implemented event-based producer/consumer code for the back end of the Arbitrage Hero project.
Basically, there are three elements in this pattern: the observable class, which I call the Producer in my project; the observer itself, which I call the Consumer; and, last but not least, the message that is propagated to the observer instances.
In other words, the observable creates event instances and propagates them to its registered observers. What is this message (event)? Let's look at the code first.
If you look at the code below, you can see three different classes in one Python file. In the __init__ method of a base class we can check whether child classes define a given attribute and whether that attribute is actually a callable method. I've used this technique in my IWatchableService and IWatcherWatchable Service classes, which you can look at here:
So we have three classes: Event, which is the message to be passed; Consumer, the observer; and Producer, the observable.
#Abstract class of Producer
from base.crypto_engine.MessageApi.debug import *
class Event:
    """Message passed from a Producer (observable) to its Consumers (observers).

    An event records who sent it, which market it relates to, what kind of
    operation produced it, the symbol the operation worked on and the
    operation's result data. The class-level constants are the event/command
    type identifiers used throughout the engine.
    """

    # Event / command type identifiers.
    LIMIT_BUY_COMMAND = "LIMIT_BUY"
    LIMIT_SELL_COMMAND = "LIMIT_SELL"
    MARKET_BUY_COMMAND = "MARKET_BUY"
    MARKET_SELL_COMMAND = "MARKET_SELL"
    WITHDRAW_COMMAND = "WITHDRAW_COMMAND"
    CHECK_BALANCE_COMMAND = "CHECK_BALANCE"
    FETCH_MARKET_PRICE = "FETCH_MARKET_PRICE"
    VALIDATE_SYMBOL_BALANCE = "VALIDATE_SYMBOL_BALANCE"
    FETCH_ORDER_BOOK = "FETCH_ORDER_BOOK"
    FETCH_SYMBOL_BALANCE = "FETCH_SYMBOL_BALANCE"
    # Payload used as the event data when an operation ran out of retries.
    RESULT_FAIL = "RESULT_FAIL"
    FETCH_ORDER_BOOK_FROM_DB = "FETCH_ORDER_BOOK_FROM_DB"

    def __init__(self, sender, source_market, event_type: str, operation_symbol: str, data):
        """Create an event.

        Args:
            sender: The object that produced the event.
            source_market: The market the producing operation ran against.
            event_type: One of the type identifiers defined on this class.
            operation_symbol: Symbol the operation worked on (may be None).
            data: Result of the operation, or ``Event.RESULT_FAIL``.
        """
        self.__data = data
        self.__sender = sender
        self.__type = event_type
        self.__source_market = source_market
        self.__operation_symbol = operation_symbol

    def get_type(self):
        """Return the event type identifier."""
        return self.__type

    def get_source_market(self):
        """Return the market the event originates from."""
        return self.__source_market

    def get_sender(self):
        """Return the object that produced the event."""
        return self.__sender

    def get_data(self):
        """Return the payload carried by the event."""
        return self.__data

    def get_symbol(self):
        """Return the symbol the producing operation worked on."""
        return self.__operation_symbol
class Consumer:
    """Observer base class: receives events from the producers it is registered with.

    Subclasses override :meth:`handle_event`; the default implementation only
    prints the received payload.
    """

    def to_string(self):
        """Return a printable identification of this consumer (its type)."""
        return str(type(self))

    def handle_event(self, event: "Event"):
        """Handle an event delivered by a producer.

        The annotation is quoted so this class can be defined without the
        ``Event`` name being bound yet.
        """
        print("Event is RECEIVED in class {:s}".format(str(type(self))))
        print(event.get_data())
class Producer:
    """Observable base class: keeps a list of consumers and notifies them of events."""

    def __init__(self):
        self.__consumer_list = []

    def notify_consumers(self, event: "Event"):
        """Deliver ``event`` to every registered consumer, in registration order.

        Iterates over a snapshot of the list, so a handler that registers or
        removes consumers while being notified does not disturb the delivery
        that is currently in progress.
        """
        for consumer in list(self.__consumer_list):
            consumer.handle_event(event)

    def add_consumer(self, consumer):
        """Register ``consumer`` unless it is already registered.

        A duplicate registration is reported through ``error`` and ignored.
        """
        debug("consumer is adding")
        debug(consumer.to_string())
        if consumer in self.__consumer_list:
            error("Consumer is already added")
        else:
            self.__consumer_list.append(consumer)

    #def operate(self):

    def get_consumer_list(self):
        """Return the internal (live) list of registered consumers."""
        return self.__consumer_list
So, how do we use these classes in the program? Simple.
from time import sleep
import json
import datetime
import base.crypto_engine.utils.helper as helper
from base.crypto_engine.setting_db.exchange_info import ExchangeInfo
from base.crypto_engine.symbols import *
from base.crypto_engine.utils.config_manager import Config
from base.crypto_engine.engine.event import Consumer, Event
from base.crypto_engine.MessageApi.debug import *
from base.crypto_engine.engine.transaction_engine import Operation, TransactionEngine
class Arbitrage(Consumer):
    """Consumer that drives one arbitrage round between two exchanges.

    The round is a chain of steps. Every step is an ``Operation`` handed to the
    ``TransactionEngine``; when an operation finishes, it notifies this object
    through :meth:`handle_event`, which schedules the next step:

        check balances -> buy -> validate buy -> withdraw
                       -> validate receive -> sell -> report and exit

    NOTE(review): ``datetime.now()`` is called on the bare name ``datetime``
    exactly as the original code did, so that name must be bound to the
    ``datetime.datetime`` class at runtime (presumably through the star import
    of the debug module) -- confirm.
    """

    # Withdrawal (transaction) fees charged by koineks, keyed by currency.
    # TODO fetch it from config file .
    _KOINEKS_TRANSACTION_FEES = {
        "BTC": 0.0007,
        "ETH": 0.005,
        "LTC": 0.01,
        "DASH": 0.002,
        "DOGE": 2,
        "XRP": 1,
        "XLM": 0.3,
        "XEM": 1,
        "BCH": 0.001,
        "BTG": 0.001,
    }

    def __init__(self):
        debug("Arbitrage is initalised")
        self.__transaction_engine = TransactionEngine()
        self.__buying_exchange = None
        self.__selling_exchange = None
        self.__avaiable_balance = 0
        self.__buying_market_first_symbol = None
        self.__buying_market_second_symbol = None
        self.__min_second_symbol_balance = 40
        self.__expected_rate = 0
        self.__bought_amount = 0
        self.__money_to_spend = 0
        self.__log_file_path = ""
        self.__tx_fee = None
        self.__operation_fee = 0
        # Set when a balance-check event arrives; None until then.
        self.__arbitrage_start_time = None
        self.__balance_after_buy = 0
        self.__expected_received_amount = 0
        self.__expected_bought_coins = 0
        self.__symbol_address = None
        self.__send_op_params = None

    # Archived sample, kept for reference: if you would like to implement a
    # limit buy/sell arbitrage one day, take a look at this previously coded
    # snippet.
    #
    # last_ask = self.__buying_exchange.get_last_price() # we set minumum last ask to last_price for buying exchange in op_finder.
    # avarage_market_price = helper.get_twothousandth_higher_price(last_ask)
    # amount_to_buy = second_symbol_balance / avarage_market_price
    # amount_to_buy = amount_to_buy - (amount_to_buy/100) # buy a bit less than we afford.
    # buy_op = Operation.create_limit_buy(amount_to_buy,avarage_market_price,self.__buying_exchange.related_market(),buy_symbol,5,2)
    # #Kraken sample
    # if (self.__buying_exchange.get_market().id == "kraken"):
    #     if (self.__buying_market_first_symbol == ETH):
    #         send_op = Operation.create_kraken_withdraw(amount_to_buy,self.__buying_market_first_symbol,symbol_addres,self.__buying_exchange.related_market(),"cex-eth",20,3) # wait 10 secs between 1 try
    #     elif (self.__buying_market_first_symbol == XRP):
    #         send_op = Operation.create_kraken_withdraw(amount_to_buy, self.__buying_market_first_symbol,
    #                                                    symbol_addres, self.__buying_exchange.related_market(),
    #                                                    "cex-xrp", 20, 3) # wait 10 secs between 1 try
    #     else:
    #         error("Unsupported currency{:s} for kraken ".format(str(self.__buying_market_first_symbol)))
    #         return
    # else:

    def get_transaction_fee(self, market: str, currency: str):
        """Return the withdrawal (transaction) fee for ``currency`` on ``market``.

        Args:
            market: Market name; ``"koineks"`` and ``"btcturk"`` are supported.
            currency: Currency code, e.g. ``"BTC"``.

        Returns:
            The fee as a float (0 for btcturk, for now).

        Raises:
            Exception: If the market, or the currency on koineks, is unsupported.
        """
        if market == "koineks":
            try:
                transaction_fee = self._KOINEKS_TRANSACTION_FEES[currency]
            except (KeyError, TypeError):
                raise Exception(
                    "Unsupported currency name for koineks : {:s}".format(str(currency))) from None
        elif market == "btcturk":
            transaction_fee = 0  # 0 for now.
        else:
            raise Exception("Unsupported Market {:s}".format(str(market)))
        return float(transaction_fee)

    def __report(self, msg):
        """Send ``msg`` to the alarm/feedback channels and write it to the log file.

        Notification failures are logged and do not prevent the file write.
        """
        try:
            android_alarm(msg)
            user_feedback(msg)
        except Exception as e:
            error("Error during android_alarm and user_feedback")
        # NOTE(review): 'wb' truncates the file, so only the most recent report
        # survives -- confirm that appending is not what was intended.
        with open(self.__log_file_path, 'wb') as outputfile:
            outputfile.write(bytearray(json.dumps(msg), 'utf8'))

    def handle_event(self, event: Event):  # WE owerride the parent's method
        """Dispatch an operation result to the handler for its event type.

        Events of any other type are ignored.
        """
        event_type = event.get_type()
        if event_type == Event.MARKET_SELL_COMMAND:
            self.__on_market_sell(event)
        elif event_type == Event.CHECK_BALANCE_COMMAND:
            self.__on_check_balance(event)
        elif event_type == Event.MARKET_BUY_COMMAND or event_type == Event.LIMIT_BUY_COMMAND:
            self.__on_buy(event)
        elif event_type == Event.WITHDRAW_COMMAND:
            self.__on_withdraw(event)
        elif event_type == Event.VALIDATE_SYMBOL_BALANCE:
            self.__on_validate_balance(event)

    def __on_market_sell(self, event: Event):
        """Final step: report the sell result and the whole round, then exit the process."""
        data = event.get_data()
        try:
            self.__report(str(data))
        except Exception:
            try:
                error("MARKET SELL COMMAND Result could not converted to string! it's type {:s}".format(
                    str(type(data))))
            except Exception as e:
                error("During handling exception another exception occured it's :{:s}".format(str(e)))
        # Elapsed wall-clock time since the last balance-check event. Real
        # datetime arithmetic is used so that rounds crossing an hour boundary
        # are reported correctly.
        started_at = self.__arbitrage_start_time
        if started_at is None:
            total_secs = 0
        else:
            total_secs = max(0, int((datetime.now() - started_at).total_seconds()))
        mins, secs = divmod(total_secs, 60)
        arbitrage_report_msg = "Arbitrage is done Invest Money {:f}{:s} from {:s} to {:s} using {:s} Buying Price:{:f} S{:s} spent time {:d}:{:d}".format(
            float(self.__money_to_spend),
            str(self.__buying_market_second_symbol),
            str(self.__buying_exchange.get_market().id),
            str(self.__selling_exchange.get_market().id),
            str(self.__buying_market_second_symbol),
            float(self.__buying_exchange.get_last_ask()),
            str(self.__buying_market_first_symbol),
            int(mins),
            int(secs))
        self.__report(arbitrage_report_msg)
        error("Arbitrage is finished byy")
        exit(31)

    def __on_check_balance(self, event: Event):
        """Record balances; for the buying exchange also compute amounts and schedule the buy."""
        error("Balance check event is recevied")
        self.__arbitrage_start_time = datetime.now()
        data = event.get_data()
        source_market = event.get_source_market()
        free_balance = helper.find_free_balance(data)
        first_symbol = self.__buying_market_first_symbol
        second_symbol = self.__buying_market_second_symbol

        if source_market == self.__selling_exchange.related_market():
            # Selling exchange: only remember how much of the coin we already
            # hold there before the arbitrage operation.
            first_symbol_balance = free_balance[first_symbol]
            self.__selling_exchange.set_first_balance_info(first_symbol_balance)
            debug("In Market:{:s} Current {:s} balance is {:f}".format(
                str(self.__selling_exchange.related_market().get_name()),
                str(first_symbol),
                float(first_symbol_balance)))
            return

        second_symbol_balance = free_balance[second_symbol]
        if second_symbol_balance < self.__money_to_spend:
            error("There is not sufficent amount of money({:f}{:s}) to buy {:s}".format(
                float(second_symbol_balance),
                str(second_symbol),
                str(first_symbol)))
            return

        balance_before_buy = free_balance[first_symbol]
        self.__buying_exchange.set_second_balance_info(second_symbol_balance)
        buy_symbol = first_symbol + '/' + second_symbol
        self.__symbol_address = self.__selling_exchange.get_address()
        avarage_ask = self.__buying_exchange.get_last_ask()

        # TODO: save operation fee to exchange.config and fetch it from there.
        operation_fee = 0.32 / 100
        amount_to_buy = self.__money_to_spend / avarage_ask
        self.__expected_bought_coins = amount_to_buy
        fee = amount_to_buy * operation_fee
        self.__operation_fee = fee
        expected_bought_coins = amount_to_buy - fee
        balance_after_buy = balance_before_buy + expected_bought_coins
        self.__balance_after_buy = balance_after_buy
        error("before buy {:f} bought{:f} fee {:f} after bought {:f}".format(
            float(balance_before_buy),
            float(expected_bought_coins),
            float(fee),
            float(balance_after_buy)))

        fee_key = first_symbol + '/' + second_symbol
        try:
            transaction_fee = self.get_transaction_fee(
                self.__buying_exchange.related_market().get_name(), first_symbol)
        except Exception as e:
            error("transaction fee couldn't found for {:s} , {:f} will be used as default".format(
                str(fee_key), float(0.01)))
            transaction_fee = 0.01
        # tx fee is the transfer fee; it differs from the operation (trading)
        # fee charged by the market.
        self.__expected_received_amount = expected_bought_coins - transaction_fee

        buy_market_name = self.__buying_exchange.related_market().get_name()
        sell_market_name = self.__selling_exchange.related_market().get_name()
        log_file_name = buy_market_name + "_" + sell_market_name + "_" + first_symbol
        send_op_params = {"log_file_name": log_file_name}
        self.__send_op_params = send_op_params
        if ExchangeInfo.tagable_currency(first_symbol):
            send_op_params.update({"tag": self.__selling_exchange.get_tag()})

        # koineks gets a limit order at the last ask (price computed here);
        # every other market gets a plain market buy.
        if buy_market_name == "koineks":
            buy_op = Operation.create_limit_buy(amount_to_buy, avarage_ask,
                                                self.__buying_exchange.related_market(), buy_symbol, 2, 1)
        else:
            buy_op = Operation.create_market_buy(self.__money_to_spend,
                                                 self.__buying_exchange.related_market(), buy_symbol, 2, 1)
        op_list = [buy_op]
        Operation.add_consumers_to_transactions(op_list, [self])
        self.__transaction_engine.create_and_add_transaction(op_list)

    def __on_buy(self, event: Event):
        """Record what was actually bought and schedule validation of the new balance."""
        error("Buy Event is recevied")
        try:
            data = event.get_data()  # bought currency data
            if isinstance(data, dict):
                debug("Buy Operation data: type is dict val: {:s}".format(helper.convert_dict_to_str(data)))
            elif isinstance(data, str):
                debug("Buy operation data: type is str val: {:s}".format(data))
            else:
                debug("Buy operation data type is unknowon ant it's {:s}".format(str(type(data))))

            price = -1
            try:
                price = data['price']
            except Exception:
                debug("No price data for buy operation!")

            try:
                amount = data['amount']
                if amount >= 0:
                    gap = self.__expected_bought_coins - amount
                    if gap > 0:
                        # Bought less than planned (someone may have taken the
                        # cheap coins first): lower the expectations so we do
                        # not try to validate coins we could not buy.
                        self.__balance_after_buy = self.__balance_after_buy - gap
                        self.__expected_received_amount = self.__expected_received_amount - gap
                    self.__bought_amount = amount
                    self.__report("Bought {:f}{:s} price:{:f}".format(
                        float(amount), str(self.__buying_market_first_symbol), float(price)))
            except Exception:
                debug("No amount data for buy operation!")

            validate_buy = Operation.create_validate_symbol_balance(self.__balance_after_buy,
                                                                    self.__buying_market_first_symbol,
                                                                    self.__buying_exchange.related_market(),
                                                                    5, 6)
            validate_buy.add_consumer(self)
            self.__transaction_engine.create_and_add_transaction([validate_buy])
        except Exception as e:
            error("Error during reporting market buy command {:s}".format(str(e)))

    def __on_withdraw(self, event: Event):
        """Check the withdraw result and schedule validation on the selling exchange.

        A result that does not yield a float amount is treated as a fatal
        error: alarms are raised and the process exits.
        """
        debug("Currency Send Event is recevied")
        data = event.get_data()
        debug("result is {:s}".format(str(data)))
        amount = data['amount'] if isinstance(data, dict) else data
        if not isinstance(amount, float):
            try:
                error("Probably we got error , cos withdraw result is {:s}".format(str(data)))
                android_alarm("FATAL ERROR NEED ATTENTION , ARBITRAGE COULDN'T COMPLETE , DO IT YOURSELF : {:s} -> {:s} : {:s} : {:f}".format(
                    str(self.__buying_exchange.related_market().get_name()),
                    str(self.__selling_exchange.related_market().get_name()),
                    str(self.__buying_market_first_symbol),
                    float(self.__expected_received_amount)))
                android_alarm("WITHDRAW_COMMAND RESULT : {:s}".format(str(data)))
            except Exception as e:
                error("WTF MAN!: {:s}".format(str(e)))
            exit(-3131)

        total_second_symbol_balance = (self.__selling_exchange.get_first_symbol_balance_info()
                                       + self.__expected_received_amount)
        validate_receive = Operation.create_validate_symbol_balance(total_second_symbol_balance,
                                                                    self.__buying_market_first_symbol,
                                                                    self.__selling_exchange.related_market(),
                                                                    150, 2)
        validate_receive.add_consumer(self)
        self.__transaction_engine.create_and_add_transaction([validate_receive])
        android_alarm("Sent {:f}{:s} ".format(float(amount), str(self.__buying_market_first_symbol)))

    def __on_validate_balance(self, event: Event):
        """React to a balance validation: withdraw after a validated buy, sell after a validated receive."""
        data = event.get_data()
        error("validate symbol balance data is received")
        print("data is {:s}".format(str(data)))
        source_market = event.get_source_market()
        is_buy_validation = source_market == self.__buying_exchange.related_market()

        if data == "RESULT_FAIL":
            if is_buy_validation:
                error("Couldn't validate bought coins !")
                return
            error("Coins bought and sent but couldn't received in given time!")
            android_alarm("Coins in the air!! {:s}".format(self.__log_file_path))
            # NOTE(review): execution continues and still schedules the sell
            # below, as in the original code -- confirm a `return` is not
            # missing here.

        if is_buy_validation:
            if 'TRUE' not in str(data).upper():
                error("validate balance for is failed")
                return
            # Buy validated: send the coins to the selling exchange.
            send_op = Operation.create_withdraw(self.__bought_amount, self.__buying_market_first_symbol,
                                                self.__symbol_address, self.__buying_exchange.related_market(),
                                                2, 1, self.__send_op_params)
            send_op.add_consumer(self)
            self.__transaction_engine.create_and_add_transaction([send_op])
        elif source_market == self.__selling_exchange.related_market():
            # Receive validated: sell. koineks would need a computed price.
            if source_market.get_name() == "koineks":
                raise Exception("Not implemented yet")
            sell_op = Operation.create_market_sell(self.__expected_received_amount,
                                                   self.__selling_exchange.related_market(),
                                                   self.__selling_exchange.get_symbol(), 10, 1)
            sell_op.add_consumer(self)
            self.__transaction_engine.create_and_add_transaction([sell_op])

    def save_arbitrage_info(self, buying_exchange: ExchangeInfo, selling_exchange: ExchangeInfo, expected_rate: float,
                            money_to_spend: float, log_file_path: str, tx_fee: dict):
        """Store the parameters of the arbitrage round that is about to start.

        Args:
            buying_exchange: Exchange on which the coin is bought.
            selling_exchange: Exchange on which the coin is sold.
            expected_rate: Expected rate of the opportunity (stored only).
            money_to_spend: Amount of the second symbol to invest.
            log_file_path: File that reports are written to.
            tx_fee: Transaction fee table (stored only).
        """
        print("doing arbitrage now")
        self.__buying_exchange = buying_exchange
        self.__selling_exchange = selling_exchange
        self.__expected_rate = expected_rate
        self.__money_to_spend = money_to_spend
        self.__log_file_path = log_file_path
        self.__tx_fee = tx_fee
        self.__buying_market_first_symbol = self.__buying_exchange.get_first_symbol()
        self.__buying_market_second_symbol = self.__buying_exchange.get_second_symbol()
        print("buying market fs: {:s}".format(str(self.__buying_market_first_symbol)))
        print("buying market ss {:s}".format(str(self.__buying_market_second_symbol)))

    def start_arbitrage(self):
        """Queue a balance check on both exchanges and start the transaction engine."""
        if self.__money_to_spend >= 100:
            # NOTE(review): this guard only logs and does not abort, as in the
            # original code -- confirm that is intended.
            error("Invest Money Guard is HEERREEE , money to wish to spend {:f}".format(float(self.__money_to_spend)))
        buying_balance_op = Operation.create_check_balance(self.__buying_exchange.related_market())
        selling_balance_op = Operation.create_check_balance(self.__selling_exchange.related_market())
        buying_balance_op.add_consumer(self)
        selling_balance_op.add_consumer(self)
        operation_list = [buying_balance_op, selling_balance_op]
        debug("arbitrage is started")
        self.__transaction_engine.create_and_add_transaction(operation_list)
        self.__transaction_engine.start_engine()
In the code above, the Arbitrage class is actually a consumer: it creates operations but asks another class, our Producer side (the transaction engine), to perform them. The transaction engine is responsible for executing the operations, and each operation notifies its consumers through the consumer's overridden handle_event method. So let's have a look at our TransactionEngine.py code.
#Author Ozan Ozenoglu ozan.ozenoglu@gmail.com
#File Created at 08.11.2017
'''
Scope of Module:
This module has functions to create/add/work/delete transaction.
'''
from base.crypto_engine.utils import helper as helper
from base.crypto_engine.engine.event import *
from base.crypto_engine.setting_db.request_info import RequestType,RequestInfo
from builtins import float
from random import randint
#due to dependcy of debug to time , impprt time module later than debug module
from base.crypto_engine.MessageApi.debug import *
from time import sleep, time
from base.crypto_engine.symbols import *
import json, requests
# Inclusive bounds of the random transaction ids drawn by Transaction.
MIN_RANDOM_RANGE = 0
MAX_RANDOM_RANGE = 99999999
# Seconds after its creation during which a transaction may still be executed.
# NOTE(review): 600 * 10 is 6000 s (100 minutes), not the 10 minutes the
# original comment mentioned -- confirm the intended lifetime.
TRANSACTION_LIFE = 600 * 10
class Operation(Producer):
    """A single retryable market action that notifies its consumers when done.

    Instances are normally built through the ``create_*`` static factories.
    ``operate`` runs the action (with retries) and publishes an ``Event`` that
    carries the result, or ``Event.RESULT_FAIL`` once the retries are used up.
    """

    def __init__(self, market, operation_type: str, max_try: int, max_wait: int, params=None):
        """Create an operation.

        Args:
            market: Project market wrapper; must provide ``get_market()`` and
                ``get_name()``.
            operation_type: One of the ``Event`` type identifiers.
            max_try: Maximum number of attempts made by ``operate``.
            max_wait: Seconds slept between two attempts.
            params: Extra parameters forwarded to a withdraw call. A fresh
                dict is used when omitted, so instances never share one.
        """
        super().__init__()  # call parent constructor
        self.__amount = 0
        self.__key = ""
        self.__is_completed = False
        self.__address = ""
        self.__limit_price = 0
        self.__operation_type = operation_type
        self.__try_count = 0
        self.__max_try_count = max_try
        self.__wait_time_between_try = max_wait
        self.m_market = market  # class Market instance
        self.__market = market.get_market()  # underlying exchange client
        self.__symbol = None
        self.__params = {} if params is None else params
        debug("Operation {:s} is created".format(operation_type))

    def dump_operation(self):
        """Log type, symbol, amount and market id of this operation."""
        debug("Type:{:s} symbol:{:s} amount:{:f} market{:s}".format(
            str(self.__operation_type), str(self.__symbol), float(self.__amount), str(self.__market.id)))

    def get_type(self):
        """Return the operation (event) type identifier."""
        return self.__operation_type

    def get_completed(self):
        """Return True once the operation has produced a result."""
        return self.__is_completed

    def set_completed(self, val: bool):
        """Mark the operation as completed (or not)."""
        self.__is_completed = val

    @staticmethod
    def add_consumers_to_transactions(operation_list, consumer_list):
        """Register every consumer in ``consumer_list`` on every operation in ``operation_list``."""
        for operation in operation_list:
            for consumer in consumer_list:
                operation.add_consumer(consumer)

    @staticmethod
    def create_fetch_market_price(market, symbol: str, max_try=3, wait=10):
        """Build an operation that fetches the ticker of ``symbol``."""
        operation = Operation(market, Event.FETCH_MARKET_PRICE, max_try, wait)
        operation.__symbol = symbol
        return operation

    @staticmethod
    def create_check_balance(market, max_try=3, wait=60):
        """Build an operation that fetches the account balance."""
        return Operation(market, Event.CHECK_BALANCE_COMMAND, max_try, wait)

    @staticmethod
    def create_market_buy(amount: float, market, symbol: str, max_try=3, wait=60):
        """Build a market buy order operation."""
        operation = Operation(market, Event.MARKET_BUY_COMMAND, max_try, wait)
        operation.__symbol = symbol
        operation.__amount = amount
        return operation

    @staticmethod
    def create_limit_buy(amount: float, price: float, market, symbol: str, max_try=3, wait=60):
        """Build a limit buy order operation at ``price``."""
        operation = Operation(market, Event.LIMIT_BUY_COMMAND, max_try, wait)
        operation.__symbol = symbol
        operation.__amount = amount
        operation.__limit_price = price
        return operation

    @staticmethod
    def create_market_sell(amount: float, market, symbol: str, max_try=3, wait=60):
        """Build a market sell order operation."""
        operation = Operation(market, Event.MARKET_SELL_COMMAND, max_try, wait)
        operation.__symbol = symbol
        operation.__amount = amount
        return operation

    @staticmethod
    def create_limit_sell(amount: float, price: float, market, symbol: str, max_try=3, wait=60):
        """Build a limit sell order operation at ``price``."""
        operation = Operation(market, Event.LIMIT_SELL_COMMAND, max_try, wait)
        operation.__symbol = symbol
        operation.__amount = amount
        operation.__limit_price = price
        return operation

    @staticmethod
    def create_withdraw(amount: float, symbol: str, address: str, market, max_try=3, wait=60, params=None):
        """Build a withdraw operation sending ``amount`` of ``symbol`` to ``address``."""
        operation = Operation(market, Event.WITHDRAW_COMMAND, max_try, wait)
        operation.__amount = amount
        operation.__address = address
        operation.__symbol = symbol
        operation.__params = {} if params is None else params
        return operation

    @staticmethod
    def create_kraken_withdraw(amount: float, symbol: str, address: str, market, key: str, max_try=1, wait=2):
        """Build a withdraw operation that passes ``key`` to the withdraw call."""
        operation = Operation(market, Event.WITHDRAW_COMMAND, max_try, wait)
        operation.__amount = amount
        operation.__address = address
        operation.__symbol = symbol
        operation.__key = key
        return operation

    @staticmethod
    def create_validate_symbol_balance(amount: float, symbol: str, market, max_try=3, wait=60):
        """Build an operation that checks the free balance of ``symbol`` reached ``amount``."""
        operation = Operation(market, Event.VALIDATE_SYMBOL_BALANCE, max_try, wait)
        operation.__amount = amount
        operation.__symbol = symbol
        return operation

    @staticmethod
    def create_fetch_order_books(market, symbol: str, max_try=3, wait=10, online=False):
        """Build an order book fetch operation.

        When ``online`` is false the order book is read from the ob_service
        database instead of the market itself.
        """
        event = Event.FETCH_ORDER_BOOK if online else Event.FETCH_ORDER_BOOK_FROM_DB
        operation = Operation(market, event, max_try, wait)
        operation.__symbol = symbol
        return operation

    def get_orderbook_from_db(self, market_name, symbol):  # symbol should be like BTC/USD
        """Fetch the order book of ``symbol`` on ``market_name`` from the order book service.

        NOTE(review): the request carries a hard-coded private key over plain
        HTTP and has no timeout -- consider moving the key to configuration
        and adding a timeout.
        """
        data = {
            'event': 'get_order_book',
            'data': {"market_name": market_name, "symbol_pair": symbol},  # sample symbol should be "BTC/USD"
            'private_key': 'osman_is_my_girl',
        }
        response = requests.get("http://ozenoglu.com:8000/api_call", data=json.dumps(data))
        return json.loads(response.content.decode())

    def get_result(self):
        """Execute the operation once and return its raw result.

        Returns:
            The value returned by the market call for this operation type.
            For ``VALIDATE_SYMBOL_BALANCE``: True when the free balance has
            reached the expected amount, False otherwise. True for an unknown
            operation type.

        Raises:
            Exception: Whatever the underlying market call raises.
        """
        op_type = self.__operation_type

        if op_type == Event.WITHDRAW_COMMAND:
            debug("Operation {:s} is started address{:s}".format(op_type, str(self.__address)))
            if self.__key != "":
                # if there is a need for key use this
                result = self.__market.withdraw(self.__symbol, self.__amount, self.__address,
                                                {'key': self.__key})
            else:
                result = self.__market.withdraw(self.__symbol, self.__amount, self.__address, self.__params)
            debug("Operation result is {:s} Withdraw Amount{:f}".format(str(result), float(self.__amount)))
            return result

        if op_type == Event.FETCH_ORDER_BOOK_FROM_DB:
            debug("Operation {:s} is started".format(op_type))
            try:
                return self.get_orderbook_from_db(self.m_market.get_name(), self.__symbol)
            except Exception as e:
                error("Error during fetching order book from db ex: {:s}".format(str(e)))
                raise e

        if op_type == Event.FETCH_ORDER_BOOK:
            result = None
            debug("Operation {:s} is started".format(op_type))
            try:
                result = self.__market.fetch_order_book(self.__symbol)
            except Exception as e:
                warn("Exception occured during fetching market price {:s}".format(str(e)))
                try:
                    if self.m_market.get_name() == "koineks":
                        # koineks: push an urgent fetch request, wait, then
                        # try once more.
                        currency = str(self.__symbol).split('/')[0]
                        fetch_request = RequestInfo("koineks", RequestType.FETCH_FOR_HELP_URGENT, currency)
                        fetch_request.set_log_file_name(currency + "_fetch_urgent_request")
                        fetch_request.push_request()
                        sleep(8)  # wait 8 secs before try again.
                        result = self.__market.fetch_order_book(self.__symbol)
                    else:
                        error("Error during fetching {:s} order book from {:s} error:{:s}".format(
                            str(self.__symbol), self.m_market.get_name(), str(e)))
                except Exception as inner:
                    error("Error during handling koineks special operation for fetching order book")
                    raise inner
            return result

        if op_type == Event.LIMIT_SELL_COMMAND:
            debug("Operation {:s} is started".format(op_type))
            result = self.__market.create_limit_sell_order(self.__symbol, self.__amount, self.__limit_price)
            # str() is required: formatting a non-string result with {:s}
            # raises, and the retry in operate() would then place the order
            # again.
            debug("Operation result is {:s}".format(str(result)))
            return result

        if op_type == Event.MARKET_SELL_COMMAND:
            debug("Operation {:s} is started".format(op_type))
            result = self.__market.create_market_sell_order(self.__symbol, self.__amount)
            debug("Operation result is {:s}".format(str(result)))
            return result

        if op_type == Event.LIMIT_BUY_COMMAND:
            debug("Operation {:s} is started".format(op_type))
            result = self.__market.create_limit_buy_order(self.__symbol, self.__amount, self.__limit_price)
            debug("Operation result is {:s}".format(str(result)))
            return result

        if op_type == Event.MARKET_BUY_COMMAND:
            debug("Operation {:s} is started".format(op_type))
            result = self.__market.create_market_buy_order(self.__symbol, self.__amount)
            debug("Operation result is {:s}".format(str(result)))
            return result

        if op_type == Event.CHECK_BALANCE_COMMAND:
            debug("Operation {:s} is started".format(op_type))
            return self.__market.fetch_balance()

        if op_type == Event.FETCH_MARKET_PRICE:
            debug("Operation {:s} is started".format(op_type))
            result = self.__market.fetch_ticker(self.__symbol)
            debug("Operation result is {:s}".format(str(result)))
            return result

        if op_type == Event.VALIDATE_SYMBOL_BALANCE:
            debug("Operation {:s} is started".format(op_type))
            data = self.__market.fetch_balance()
            symbol_fbalance = helper.find_symbol_fbalance(data, self.__symbol)
            debug("Free Symbol {:s} balance is {:f}".format(str(self.__symbol), float(symbol_fbalance)))
            try:
                self.__amount = round(self.__amount, 4)
                self.__amount = helper.get_thousandth_lower_price(self.__amount)
                if symbol_fbalance >= self.__amount:
                    debug("There is enough balance of {:s} balance is:{:f} expecting is:{:f}".format(
                        str(self.__symbol), float(symbol_fbalance), float(self.__amount)))
                    return True
                if self.__try_count < 4:  # maximum 4 times , decreased it!
                    self.__amount = helper.get_thousandth_lower_price(self.__amount)
                debug("There is not enough balance of {:s} balance is:{:f} new expecting is:{:f}".format(
                    str(self.__symbol), float(symbol_fbalance), float(self.__amount)))
                return False
            except Exception as e:
                error("Error during Operation {:s}".format(str(self.__operation_type)))
                dump_trace()
                return False

        return True

    def operate(self):
        """Run the operation with retries and notify the consumers.

        An attempt fails when ``get_result`` raises, returns None, or -- for a
        balance validation -- returns False (the balance has not been reached
        yet). After a failed attempt the method sleeps for the configured wait
        time and tries again, up to the maximum number of tries.

        Returns:
            The operation result on success, True if the operation had already
            been completed, or None when every attempt failed (consumers then
            receive an event carrying ``Event.RESULT_FAIL``).
        """
        debug(" operation {:s} will be try to execute".format(self.__operation_type))
        if self.get_completed():
            error("This is already operated operation ")  # TODO Find a better way not to trigger already finished operation
            return True

        while self.__try_count < self.__max_try_count:
            self.__try_count += 1
            is_exception_occured = False
            result = None
            try:
                result = self.get_result()
            except Exception as e:
                error("Unhalded exception is {:s}".format(str(e)))
                is_exception_occured = True
                self.dump_operation()

            validation_failed = (self.__operation_type == Event.VALIDATE_SYMBOL_BALANCE
                                 and result is False)
            if result is None or is_exception_occured or validation_failed:
                debug("operation is failed ,system will wait before trying again for {:d}".format(self.__try_count))
                sleep(self.__wait_time_between_try)
                continue

            self.set_completed(True)
            consumer_list = self.get_consumer_list()
            debug("count of registered consumers to notified: {:d}".format(len(consumer_list)))
            debug("CONSUMERS")
            debug(consumer_list)
            self.notify_consumers(
                Event(self, self.m_market, self.__operation_type, self.__symbol, result))
            return result

        error("Max try number {:d} is reached operation is aborting..".format(self.__max_try_count))
        self.notify_consumers(
            Event(self, self.m_market, self.__operation_type, self.__symbol, Event.RESULT_FAIL))
        return None
class Transaction():  # internal class
    """An ordered list of operations that is executed as one unit.

    A transaction gets a random id on creation and refuses to run once it is
    older than ``TRANSACTION_LIFE`` seconds, because market data may have
    changed in the meantime.
    """

    def __init__(self, operations):
        self.__operation_list = operations
        self.__is_completed = False  # True once every operation has succeeded
        self.__is_error = False  # True if the transaction expired or an operation failed
        self.__error_message = ""  # reason of the error, if any
        self.__transaction_id = randint(MIN_RANDOM_RANGE, MAX_RANDOM_RANGE)  # random integer between the ranges
        self.__created_time = time()
        self.__cretead_time_str = str(datetime.now())
        self.__completed_time = 0
        self.__completed_time_str = ""
        self.__is_experied = False  # True once the transaction has expired
        self.__transaction_life = self.__created_time + TRANSACTION_LIFE  # expiry timestamp
        debug("A new transaction is created with id {:d} at time {:s}".format(
            self.__transaction_id, self.__cretead_time_str))

    def get_operrations_list(self):
        """Return the operations of this transaction."""
        return self.__operation_list

    def get_transaction_id(self):
        """Return the random id of this transaction."""
        return self.__transaction_id

    def operate(self):
        """Run the operations in order.

        Returns:
            True when every operation produced a result; False when the
            transaction had expired or an operation gave up (returned None),
            in which case the remaining operations are not run.
        """
        debug("Transaction with id {:d} is started".format(self.__transaction_id))
        if time() > self.__transaction_life:
            # Too old: do not execute it, market data may have changed.
            self.__is_experied = True
            self.__is_error = True
            self.__error_message = "Expired"
            return False

        for operation in self.__operation_list:
            if operation.operate() is None:
                self.__is_error = True
                self.__error_message = "Operation failed"
                error("Transaction with id {:d} is failed:".format(self.__transaction_id))
                return False

        # Only mark the transaction completed after every operation succeeded.
        self.__is_completed = True
        self.__completed_time_str = str(datetime.now())
        self.__completed_time = time()
        return True
class TransactionEngine:
    """FIFO queue of transactions that are executed one after another.

    After the queue has been drained the engine enters "wait mode": the next
    transaction that is added restarts it automatically. While the engine is
    running, newly added transactions are only queued and are picked up by
    the running loop.
    """

    def __init__(self):
        self.transactions = []
        self.completed_transactions = []
        self.failed_transactions = []
        # Initially not in wait mode: the user is expected to start the engine first.
        self.__is_in_wait_mode = False

    def create_and_add_transaction(self, operations):
        """Wrap ``operations`` in a new transaction and queue it.

        Args:
            operations: A list of operations, or a single ``Operation``.

        Raises:
            Exception: If ``operations`` is neither a list nor an ``Operation``.
        """
        if not isinstance(operations, list):
            if isinstance(operations, Operation):
                operations = [operations]
            else:
                raise Exception("Un supported format for operation list , type {:s}".format(
                    str(type(operations))))
        new_transaction = Transaction(operations)
        self.transactions.append(new_transaction)
        debug("New transaction with id {:d} is created".format(new_transaction.get_transaction_id()))
        if self.__is_in_wait_mode:  # if it's in wait mode start engine.
            self.start_engine()

    def __get_next_transaction(self):
        """Return the first queued transaction, or None when the queue is empty."""
        if self.transactions:
            return self.transactions[0]
        return None

    def __remove_transaction(self, transaction_to_delete: Transaction):
        """Remove ``transaction_to_delete`` from the queue."""
        if self.transactions:
            self.transactions.remove(transaction_to_delete)
            debug("Transaction with id {:d} is deleted".format(transaction_to_delete.get_transaction_id()))
        else:
            error("Tried to remove transaction but it's already empty!")

    def start_engine(self):
        """Execute queued transactions until the queue is empty or one fails.

        Returns:
            False when a transaction failed (the engine stops there); None
            when the queue was drained or when processing was aborted.
        """
        # Leave wait mode while running. Otherwise a consumer that queues a
        # follow-up transaction from its event handler would re-enter this
        # method and process the in-flight transaction a second time. On any
        # exit other than a drained queue the previous mode is restored.
        mode_on_exit = self.__is_in_wait_mode
        self.__is_in_wait_mode = False
        try:
            next_transaction = self.__get_next_transaction()
            while next_transaction is not None:
                operation_list = next_transaction.get_operrations_list()
                size = len(operation_list)
                debug("next transaction id {:s} , operations list len {:d} ".format(
                    str(next_transaction.get_transaction_id()), size))
                for index, operation in enumerate(operation_list):
                    debug("Operation{:d} type {:s}".format(int(index), str(operation.get_type())))
                    operation.dump_operation()

                if next_transaction.operate():
                    try:
                        self.completed_transactions.append(next_transaction)
                        self.__remove_transaction(next_transaction)
                        next_transaction = self.__get_next_transaction()
                    except Exception as e:
                        error("ALL TRANSACTIONS ABORTED , Exception is occured {:s}".format(str(e)))
                        self.transactions.clear()  # abort all transactions
                        return
                else:
                    error("Transaction Engine couldn't finish Transaction with id: {:d}".format(
                        next_transaction.get_transaction_id()))
                    self.failed_transactions.append(next_transaction)
                    self.__remove_transaction(next_transaction)
                    return False

            mode_on_exit = True  # wait for until next transaction is created.
            debug("Transaction engine is in wait mode date: {:s}".format(str(datetime.now())))
        finally:
            self.__is_in_wait_mode = mode_on_exit
#print(okcoinusd().fetch_ticker(BTC_USD))
'''
transaction_engine = TransactionEngine()
market = okcoinusd()
fetch_price_operation = Operation.create_fetch_market_price(market, BTC_USD)
fetch_price_operation.add_consumer(self)
operation_list = []
operation_list.append(fetch_price_operation)
transaction_engine.create_and_add_transaction(operation_list)
transaction_engine.start_engine()
'''
In the code above, there is a method called operate. It executes an Operation instance (created through one of the static factory methods) and then notifies the registered/subscribed consumers.
Any comment and feedbacks are welcome.
Ozan Özenoğlu..