This example shows how to do a basic analysis of wallet inflows. Here we are building an entirely open-source analyzer: you can see exactly how the numbers are calculated, change the rules to your liking, or add new ones.
For this demo set the address of interest here:
Note that this file also specifies which DashArgos queries (Looks) the analysis runs on top of. You can set up new queries and point your code at them if you wish.
This file specifies our definition of "is this inflow source a problem?":
Now we can look at the main function. It grabs all the inflows, looks up the categories associated with the inflow wallets, computes the total and flagged inflow amounts and transfer counts per token, and then (lightly) formats the output.
A range of helper functions which wrap the underlying API are here:
import looker_sdk
import pandas as pd

from looker.wallet_eval.config import (
    ADDRESS,
    TOKENS_SYMBOL,
    TRANSACTIONS_FROM_ADDRESS,
    TRANSACTIONS_SUM_OF_TRANSFER_AMOUNTS,
)
from looker.wallet_eval.helpers import (
    grab_wallet_categories,
    lookup_address_categories,
    lookup_inflows,
)
from looker.wallet_eval.is_suspicious import is_suspicious_address


def compute_total_flows(inflows):
    """Compute total flows by token and number of flows by token.

    Args:
        inflows: pandas DataFrame with a TOKENS_SYMBOL column and a
            TRANSACTIONS_SUM_OF_TRANSFER_AMOUNTS column.

    Returns:
        A ``(total, num)`` tuple of dicts keyed by token symbol: ``total``
        holds the summed transfer amount of the rows for that token and
        ``num`` holds the number of rows for that token. Only tokens that
        occur in ``inflows`` appear as keys.
    """
    total = {}
    num = {}
    for token in inflows[TOKENS_SYMBOL].unique():
        # Select this token's amounts once and reuse them for sum and count.
        amounts = inflows.loc[
            inflows[TOKENS_SYMBOL] == token,
            TRANSACTIONS_SUM_OF_TRANSFER_AMOUNTS,
        ]
        total[token] = sum(amounts)
        num[token] = len(amounts)
    return total, num


def _fraction(part, whole):
    """Return ``part / whole``, or NaN when ``whole`` is zero (undefined)."""
    return part / whole if whole else float("nan")


def main():
    """Work out what fraction of the inflows comes from flagged addresses.

    Fetches the inflows for ADDRESS, looks up the categories of every
    sending address, flags senders with ``is_suspicious_address`` and prints
    a per-token table of total vs. flagged amounts and transfer counts.
    """
    # Looker API 4.0 client configured from the ini file; the path is
    # relative to the current working directory.
    looker = looker_sdk.init40("../looker.ini")
    inflows = lookup_inflows(looker, ADDRESS)

    # grab inflow source addresses and the tokens they sent
    inflow_addresses = inflows[TRANSACTIONS_FROM_ADDRESS].unique()
    tokens = inflows[TOKENS_SYMBOL].unique()

    # now get categories for each inflow wallet
    category_result = lookup_address_categories(looker, inflow_addresses)
    categories = grab_wallet_categories(inflow_addresses, category_result)

    # we are now in a position to determine which inflows are suspicious,
    # sanctioned or otherwise
    sus_addresses = [
        address
        for address in inflow_addresses
        if is_suspicious_address(address, inflows, categories)
    ]

    total_amounts, total_xfers = compute_total_flows(inflows)
    flagged_inflows = inflows[
        inflows[TRANSACTIONS_FROM_ADDRESS].isin(sus_addresses)
    ]
    sus_amounts, sus_xfers = compute_total_flows(flagged_inflows)

    header = [
        "token",
        "total inflow",
        "flagged inflow",
        "fraction flagged",
        "# inflows",
        "# flagged inflows",
        "fraction inflows flagged",
    ]
    data = []
    for token in tokens:
        total_amt = total_amounts[token]
        total_num = total_xfers[token]
        # The flagged dicts only contain tokens that have at least one
        # flagged inflow, so a token without any must default to zero
        # instead of raising KeyError.
        sus_amt = sus_amounts.get(token, 0)
        sus_num = sus_xfers.get(token, 0)
        data.append([
            token,
            total_amt,
            sus_amt,
            _fraction(sus_amt, total_amt),
            total_num,
            sus_num,
            _fraction(sus_num, total_num),
        ])

    out_dataframe = pd.DataFrame(data, columns=header)
    print(str(out_dataframe))


main()
# which address to check
ADDRESS = "0x346eF244464679b031750f70D750B3FA65165443"

# which Looks to use for the underlying information
WALLET_LOOK_ID = 722
INFLOWS_LOOK_ID = 724

# column labels of the inflows result
TOKENS_SYMBOL = "Tokens Symbol"
TRANSACTIONS_FROM_ADDRESS = "Transactions From Address"
TRANSACTIONS_SUM_OF_TRANSFER_AMOUNTS = "Transactions Sum of Transfer Amounts"
FROM_WALLET_LABELS = "From Wallet Labels"

# presumably column labels of the wallet-category result -- confirm against
# the helpers that consume them
WALLETS_ADDRESS = "Wallets Address"
WALLETS_CATEGORIES = "Wallets Categories"

# separator in returned lists
JOINER = " ;; "
# Not referenced by the rules below; the import is retained so the names this
# module exposes are unchanged.
from config import TRANSACTIONS_FROM_ADDRESS, FROM_WALLET_LABELS  # noqa: F401


def is_suspicious_address(address, inflows, categories):
    """Decide whether an inflow source address counts as suspicious.

    Rules, in order of precedence:
      1. Any of the tags 'blacklisted', 'ofac', 'terrorists' or
         'darknet market' makes the address suspicious.
      2. Otherwise a 'cex' tag makes it not suspicious.
      3. Otherwise a 'suspicious' tag makes it suspicious.
      4. Anything else is not suspicious.

    Args:
        address: the inflow source address to evaluate.
        inflows: the inflows table; accepted for interface compatibility but
            not consulted by the current rules.
        categories: mapping from address to that address's categories. Each
            tag is checked with ``in``, so the value may be any container
            (or string) supporting membership tests.

    Returns:
        True if the address is considered suspicious, False otherwise.

    Raises:
        KeyError: if ``address`` is not a key of ``categories``.
    """
    this_categories = categories[address]

    # Hard flags always win. Every tag is tested against this address's own
    # categories, not against the whole address -> categories mapping.
    hard_flags = ('blacklisted', 'ofac', 'terrorists', 'darknet market')
    if any(flag in this_categories for flag in hard_flags):
        return True

    # exchanges are not suspicious without one of the above tags
    if 'cex' in this_categories:
        return False

    # this catches non-exchanges which don't have a specific suspicious tag
    return 'suspicious' in this_categories