This notebook is meant to explore how the snapshots could be processed automatically.
# Partial calculations — flags selecting which reward groups this run computes
CALCULATE_STAKERS = True
CALCULATE_HOLDERS_AND_LP = False
# Reward parameters
# Due to floating point calculations, logarithms and divisions, a very small
# error rate is very hard to avoid.
# This variable sets the error tolerance for the calculations, in SDAO.
MAXIMUM_ERROR_SDAO = 0.000000001
# Reward allocations expressed in whole SDAO (converted to the smallest unit below)
TOTAL_STAKING_REWARD = 550000
TOTAL_REWARD = 825000
# Decimals AGI/AGIX
DECIMALS_AGI = 8
CONVERT_TO_FULL_BALANCE_AGI = 10 ** DECIMALS_AGI
# Eligibility threshold in whole AGI, then scaled to the smallest unit (Cogs)
AGI_THRESHOLD = 1000
AGI_THRESHOLD *= CONVERT_TO_FULL_BALANCE_AGI
# Decimals SDAO
DECIMALS_SDAO = 18
CONVERT_TO_FULL_BALANCE_SDAO = 10 ** DECIMALS_SDAO
# Adjust rewards to be full balance (smallest SDAO unit, 18 decimals)
TOTAL_STAKING_REWARD *= CONVERT_TO_FULL_BALANCE_SDAO
TOTAL_REWARD *= CONVERT_TO_FULL_BALANCE_SDAO
For the example pipeline, I'm going to use just a small number of snapshots taken manually, due to the fact that the main focus of this notebook is to create the processing pipeline, not to gather the snapshots.
The first step is to import all the libraries that we're going to use for the data processing and for gathering insights about the dataset with statistical analysis.
# Import libraries
# Pandas for tabular data
import pandas as pd
import numpy as np
from os import walk
import sys
from pprint import pprint
from tqdm import tqdm
# from tqdm.auto import tqdm # for notebooks
# Register tqdm's .progress_apply() on pandas objects, reporting to stdout
tqdm.pandas(file=sys.stdout)
The next step is reading the data directory to see how many snapshots we have for each token (AGI and AGIX).
def get_snapshots(path):
    """Return the names of the files directly inside *path*.

    Uses the first step of os.walk; a missing directory yields an empty list.
    """
    _dirpath, _dirnames, filenames = next(walk(path), (None, None, []))
    return filenames
# Collect the snapshot CSV file names for each holder token
agi_snapshots_files = get_snapshots('../data/holders/agi')
agix_snapshots_files = get_snapshots('../data/holders/agix')
pprint(agi_snapshots_files)
pprint(agix_snapshots_files)
Load snapshots for liquidity providers and stakers.
In this example, the snapshots are the same as the AGIX holders, with some accounts removed manually in the first snapshot, to focus on developing the calculations first, leaving getting the data from the database or CSV files for later on.
# Collect the snapshot CSV file names for liquidity providers and stakers
lp_snapshots_files = get_snapshots('../data/lp')
stakers_snapshots_files = get_snapshots('../data/stakers')
pprint(stakers_snapshots_files)
Read the snapshots' contents using pandas
and convert balances to unsigned long numbers (the token's smallest unit).
# Expects a CSV generated from Etherscan
def read_etherscan_csv(folder, file):
    """Read an Etherscan holders CSV and return it with balances scaled to the
    smallest AGI unit, sorted with the largest holders first."""
    # Read csv file
    data_frame = pd.read_csv('../data/%s/%s' % (folder, file))
    # NOTE(review): astype(int) truncates any fractional token amount BEFORE
    # scaling to 8 decimals — confirm the source column holds whole tokens
    data_frame = data_frame.astype({'Balance': int })
    # Scale whole-token balances to the smallest unit (10^8)
    data_frame['Balance'] = data_frame['Balance'].apply(lambda x: int(x * CONVERT_TO_FULL_BALANCE_AGI))
    # Sort accounts by holding amount, larger holders at the top
    data_frame = data_frame.sort_values('Balance', ascending=False)
    return data_frame
# Expects a file with two columns, "address" and "balance". Balance is in the smallest unit, Cogs (8 decimals)
def read_simple_csv(folder, file):
    """Read an address/balance CSV, sorted descending by balance with
    balances coerced to plain integers."""
    path = '../data/%s/%s' % (folder, file)
    frame = pd.read_csv(path)
    # Largest balances first
    frame = frame.sort_values('balance', ascending=False)
    # Force plain Python ints (truncates any float representation)
    frame['balance'] = frame['balance'].apply(int)
    return frame
# Load every snapshot file into a DataFrame, one list per category
agi_snapshots_raw = [read_etherscan_csv("holders", "agi/" + file) for file in agi_snapshots_files]
agix_snapshots_raw = [read_etherscan_csv("holders", "agix/" + file) for file in agix_snapshots_files]
lp_snapshots_raw = [read_etherscan_csv("lp", file) for file in lp_snapshots_files]
stakers_snapshots_raw = [read_simple_csv("stakers", file) for file in stakers_snapshots_files]
The snapshots are now loaded as pandas DataFrames.
Let's see the structure of a single snapshot and a single row, to get a better idea of the dataset.
# Inspect the column layout and the first (largest-balance) row of one snapshot
print(agi_snapshots_raw[0].columns)
print(agi_snapshots_raw[0].iloc[0])
Let's remove the PendingBalanceUpdate
column and rename the other two, to clean the dataset and make it more practical.
def clear_snapshot(snapshot):
    """Drop the PendingBalanceUpdate column, normalise the remaining columns
    to 'address'/'balance', and reset the row index."""
    normalised = (
        snapshot
        .drop(columns=['PendingBalanceUpdate'])
        .rename(columns={"HolderAddress": "address", "Balance": "balance"})
    )
    return normalised.reset_index(drop=True)
# Normalise every snapshot; the stakers files already use address/balance
# column names, so those only need their index reset
agi_snapshots = [clear_snapshot(snapshot) for snapshot in agi_snapshots_raw]
agix_snapshots = [clear_snapshot(snapshot) for snapshot in agix_snapshots_raw]
lp_snapshots = [clear_snapshot(snapshot) for snapshot in lp_snapshots_raw]
stakers_snapshots = [snapshot.reset_index(drop=True) for snapshot in stakers_snapshots_raw]
print(agi_snapshots[0].columns)
# Address and balance from the account with the largest holding, one of the Binance wallets
print(agi_snapshots[0].iloc[0])
With the snapshot data ready, we can start to calculate the eligible addresses.
At this point, the snapshots can be filtered by the set of addresses that have registered in the airdrop portal for a given month.
There's an initial snapshot that delimits how many addresses are eligible for the airdrop.
In my case it's the snapshot of the frozen AGI balances, but in the airdrop it would be the snapshot from 17th of April 2021, at 23:59 UTC+0.
Let's create a subset based on the addresses from the first snapshot that hold at least 1,000 AGI.
# Report how many snapshots were loaded per category
print("AGI Snapshots: %s" % len(agi_snapshots))
print("AGIX Snapshots: %s" % len(agix_snapshots))
print("LP Snapshots: %s" % len(lp_snapshots))
print("Stakers Snapshots: %s" % len(stakers_snapshots))
print()
# Get the first snapshot and use it as the starting point for the calculations
def get_initial(initial_snapshot, category):
    """Return the rows of the first snapshot whose balance meets AGI_THRESHOLD,
    printing total and eligible counts for *category*."""
    eligible = initial_snapshot[initial_snapshot['balance'] >= AGI_THRESHOLD]
    print('Total Addresses (%s): %s' % (category, len(initial_snapshot.index)))
    print('Eligible Addresses (%s): %s' % (category, len(eligible.index)))
    print()
    return eligible
# Build the initial eligible sets from the first snapshot of each category
eligible_addresses_holders = get_initial(agi_snapshots[0], 'holders')
eligible_addresses_lp = get_initial(lp_snapshots[0], 'LP')
eligible_addresses_stakers = get_initial(stakers_snapshots[0], 'stakers')
# display() is provided by the notebook (IPython) environment
display(stakers_snapshots[0])
print()
# Print the address with the smallest eligible balance
print(eligible_addresses_holders.iloc[-1])
We can see that from the initial ~26k addresses, only 16689 pass the threshold to be eligible.
Now, it's a matter of iterating through the remaining snapshots using this initial set of accounts, and checking if the accounts are still eligible, removing the ones that are below the threshold.
First, let's merge all snapshots (AGI and AGIX) into a single array and discard the first one, as that one is already processed.
# Merge snapshots — AGI and AGIX snapshots are processed the same way
holders_snapshots = agi_snapshots + agix_snapshots
Now, we iterate over the snapshots, filtering the initial set of eligible accounts.
def filter_addresses(initial_df, snapshot_df):
    """Intersect the current eligible set with a snapshot, keeping the lower balance.

    Returns a copy of initial_df restricted to addresses that also appear in
    snapshot_df; each kept row's balance becomes the minimum of its current
    balance and the snapshot's balance for that address.
    """
    # Calculate intersection of eligible addresses between existing set and snapshot set.
    # A set gives O(1) membership tests (the previous list scan was O(n) per row).
    snapshot_addresses = set(snapshot_df['address'])
    # Map address -> snapshot balance in one pass; setdefault keeps the FIRST
    # occurrence, matching the original .loc[...].iloc[0] lookup on duplicates.
    snapshot_balances = {}
    for address, balance in zip(snapshot_df['address'], snapshot_df['balance']):
        snapshot_balances.setdefault(address, balance)
    # Filter addresses based on whether they're contained in the snapshot set or not
    filtered_df = initial_df[initial_df['address'].isin(snapshot_addresses)].copy()
    # Set balance amount to the lowest of the two values (initial value and snapshot value),
    # to only take into account the lower balance
    def filter_lowest_balance(row):
        return min(row['balance'], snapshot_balances[row['address']])
    filtered_df['balance'] = filtered_df.progress_apply(filter_lowest_balance, axis=1)
    return filtered_df
def get_eligible(initial_df, snapshots, category):
    """Walk every snapshot, repeatedly intersecting the eligible set with the
    snapshot's above-threshold addresses, and return the surviving rows."""
    print()
    print('Initial Eligible Addresses (%s): %s' % (category, len(initial_df.index)))
    print()
    eligible_df = initial_df
    for snapshot_index, snapshot in enumerate(snapshots):
        print('Snapshot #%s' % snapshot_index)
        # Only addresses at or above the threshold in this snapshot survive
        above_threshold = snapshot[snapshot['balance'] >= AGI_THRESHOLD]
        print('Eligible Addresses from snapshot: %s addresses' % len(above_threshold.index))
        eligible_df = filter_addresses(eligible_df, above_threshold)
        print('Eligible Addresses: %s' % len(eligible_df.index))
        print()
    print('Total Eligible Addresses (%s): %s' % (category, len(eligible_df.index)))
    return eligible_df
# Run only the partial calculations enabled by the flags at the top of the notebook
if CALCULATE_HOLDERS_AND_LP:
    eligible_addresses_holders = get_eligible(eligible_addresses_holders, holders_snapshots, 'holders')
    eligible_addresses_lp = get_eligible(eligible_addresses_lp, lp_snapshots, 'LP')
if CALCULATE_STAKERS:
    eligible_addresses_stakers = get_eligible(eligible_addresses_stakers, stakers_snapshots, 'stakers')
A single address could have multiple rewards. To account for this, we'll merge all the eligible addresses into a single list, removing duplicates, and add an extra column for each kind of reward.
That way, we'll be able to show all the reward types in the airdrop portal.
datasets_dict = {
    "holder": eligible_addresses_holders,
    "lp": eligible_addresses_lp,
    "staker": eligible_addresses_stakers
}
# Merge all eligible addresses into one frame; an address may appear in several
# categories, so deduplicate, and drop the per-category balance column
addresses_df = pd.concat(list(datasets_dict.values())).drop_duplicates('address').drop('balance', axis=1)
addresses_df = addresses_df.reset_index(drop=True)
print('Total addresses participating in the airdrop: %s' % len(addresses_df.index))
# Append rewards to the provided column, matching addresses between both sets
def append_column_by_address(addresses_df, rewards_df, column, new_column_name=None):
    """Return a copy of addresses_df with *column* from rewards_df appended,
    matched by address.

    Addresses with no matching reward row get 0. Raises if a looked-up address
    appears more than once in rewards_df.
    """
    # Precompute per-address counts and values once (O(n)) instead of scanning
    # rewards_df for every address, which made the whole append O(n^2).
    address_counts = rewards_df['address'].value_counts()
    value_by_address = dict(zip(rewards_df['address'], rewards_df[column]))

    def get_row_value_by_address(address):
        total_matching_rows = int(address_counts.get(address, 0))
        if total_matching_rows == 1:
            return value_by_address[address]
        elif total_matching_rows == 0:
            return 0
        else:
            raise Exception('Error appending column to final file', 'addresses are duplicated')

    result_df = addresses_df.copy()
    if new_column_name is None:
        new_column_name = column
    print()
    print('Appending "%s" column to final file' % new_column_name)
    result_df.insert(len(result_df.columns), new_column_name, addresses_df.progress_apply(lambda x: get_row_value_by_address(x['address']), axis=1).astype(np.longdouble))
    print()
    return result_df
There are two kinds of rewards for stakers: half of the staking reward is split equally among all eligible stakers, and the other half is split proportionally to each user's staked amount.
# -- Staker rewards: half of TOTAL_STAKING_REWARD is split equally per user,
#    the other half proportionally to each user's share of the total stake --
print('Total Eligible Stakers: %s' % len(eligible_addresses_stakers.index))
rewards_stakers_df = eligible_addresses_stakers.copy()
# Rewards per user: an equal share of half the staking reward
half_staking_reward = TOTAL_STAKING_REWARD / 2
reward_per_user = half_staking_reward / len(eligible_addresses_stakers)
# Convert from the smallest unit back to whole SDAO
adjusted_reward_per_user = reward_per_user / CONVERT_TO_FULL_BALANCE_SDAO
print('Staking reward per user: %s' % adjusted_reward_per_user)
rewards_stakers_df.insert(len(rewards_stakers_df.columns), 'staker_reward_per_user', adjusted_reward_per_user)
# Rewards per stake: proportional to the user's fraction of the total stake
total_stake = eligible_addresses_stakers['balance'].sum()
rewards_stakers_df['staker_reward_per_stake'] = rewards_stakers_df.apply(lambda x: half_staking_reward * np.double(x['balance']) / np.double(total_stake), axis=1)
rewards_stakers_df['staker_reward_per_stake'] /= CONVERT_TO_FULL_BALANCE_SDAO
display(rewards_stakers_df)
# Sanity check: the distributed totals must match the allocation within tolerance
adjusted_half_staker_reward = (half_staking_reward / CONVERT_TO_FULL_BALANCE_SDAO)
calculated_staker_reward_per_user = np.sum(list(rewards_stakers_df['staker_reward_per_user']))
calculated_staker_reward_per_stake = np.sum(list(rewards_stakers_df['staker_reward_per_stake']))
print()
print('Allocated reward (stakers, per user): %s' % adjusted_half_staker_reward)
print('Calculated reward (stakers, per user): %s' % calculated_staker_reward_per_user)
print()
print('Allocated reward (stakers, per stake): %s' % adjusted_half_staker_reward)
print('Calculated reward (stakers, per stake): %s' % calculated_staker_reward_per_stake)
print()
print('Allocated reward (stakers, total): %s' % (TOTAL_STAKING_REWARD / CONVERT_TO_FULL_BALANCE_SDAO))
print('Calculated reward (stakers, total): %s' % (calculated_staker_reward_per_user + calculated_staker_reward_per_stake))
print()
per_user_error = abs(calculated_staker_reward_per_user - adjusted_half_staker_reward) > MAXIMUM_ERROR_SDAO
per_stake_error = abs(calculated_staker_reward_per_stake - adjusted_half_staker_reward) > MAXIMUM_ERROR_SDAO
total_error = abs((calculated_staker_reward_per_user + calculated_staker_reward_per_stake) - (adjusted_half_staker_reward * 2)) > MAXIMUM_ERROR_SDAO
if per_user_error or per_stake_error or total_error:
    raise Exception('Error calculating rewards (stakers)', 'final reward sum does not match allocated reward')
# Convert the staked balance back to whole AGI for the final report
rewards_stakers_df['balance'] /= CONVERT_TO_FULL_BALANCE_AGI
if CALCULATE_STAKERS:
    addresses_df = append_column_by_address(addresses_df, rewards_stakers_df, 'staker_reward_per_user')
    addresses_df = append_column_by_address(addresses_df, rewards_stakers_df, 'staker_reward_per_stake')
    addresses_df['staker_reward'] = addresses_df['staker_reward_per_user'] + addresses_df['staker_reward_per_stake']
    addresses_df = append_column_by_address(addresses_df, rewards_stakers_df, 'balance', 'used_staker_balance')
display(addresses_df)
Knowing the eligibility of the addresses, we can calculate the rewards for each user using the following formula.
Reward = total_reward * log10(1+user_balance) / SUM(log10(1+user_balance))
# Define SUM(log10(1+user_balance)) as a constant variable
holder_balances = list(eligible_addresses_holders['balance'])
lp_balances = list(eligible_addresses_lp['balance'])
# Log-scale the balances so rewards grow sub-linearly with holdings;
# holders come first, then LPs, in one shared index space
balances_log10 = [np.log10(1 + (balance)) for balance in (holder_balances + lp_balances)]
sum_balances_log10 = np.sum(balances_log10)
# Define the function that calculates the reward for each user
def calculate_reward(total_reward, user_balance_index):
    """Reward = total_reward * log10(1+balance) / SUM(log10(1+balance)),
    returned in whole SDAO."""
    user_balance_log10 = balances_log10[user_balance_index]
    # Calculate reward and convert to final balance
    return (total_reward * user_balance_log10 / sum_balances_log10) / CONVERT_TO_FULL_BALANCE_SDAO
# Calculate rewards and add the SDAO value as a column to the DataFrame
# (LP balances start at offset len(holder_balances) in the shared log index space)
holder_rewards = [calculate_reward(TOTAL_REWARD, index) for index, balance in enumerate(holder_balances)]
lp_rewards = [calculate_reward(TOTAL_REWARD, len(holder_balances) + index) for index, balance in enumerate(lp_balances)]
holder_rewards_df = eligible_addresses_holders.copy()
lp_rewards_df = eligible_addresses_lp.copy()
holder_rewards_df.insert(0, 'reward', holder_rewards)
# Convert balances back to whole AGI for reporting
holder_rewards_df['balance'] /= CONVERT_TO_FULL_BALANCE_AGI
lp_rewards_df.insert(0, 'reward', lp_rewards)
lp_rewards_df['balance'] /= CONVERT_TO_FULL_BALANCE_AGI
# Verify that the total amount of allocated reward matches the expected value
calculated_reward = np.sum(list(holder_rewards_df['reward'])) + np.sum(list(lp_rewards_df['reward']))
adjusted_total_reward = (TOTAL_REWARD / CONVERT_TO_FULL_BALANCE_SDAO)
print('Allocated reward (holders and LP): %s' % adjusted_total_reward)
print('Calculated reward (holders and LP): %s' % calculated_reward)
if abs(calculated_reward - adjusted_total_reward) > MAXIMUM_ERROR_SDAO:
    raise Exception('Error calculating rewards', 'final reward sum does not match allocated reward')
# Add rewards to final data frame
if CALCULATE_HOLDERS_AND_LP:
    addresses_df = append_column_by_address(addresses_df, holder_rewards_df, 'reward', 'holder_reward')
    addresses_df = append_column_by_address(addresses_df, holder_rewards_df, 'balance', 'used_holder_balance')
    addresses_df = append_column_by_address(addresses_df, lp_rewards_df, 'reward', 'lp_reward')
    addresses_df = append_column_by_address(addresses_df, lp_rewards_df, 'balance', 'used_lp_balance')
# Sum the per-category rewards into the final total for each address
addresses_df['total_reward'] = 0
if CALCULATE_HOLDERS_AND_LP:
    addresses_df['total_reward'] = addresses_df['holder_reward'] + addresses_df['lp_reward']
if CALCULATE_STAKERS:
    addresses_df['total_reward'] += addresses_df['staker_reward_per_user'] + addresses_df['staker_reward_per_stake']
# Final sanity check: the sum of all per-address rewards must match the
# allocation of the enabled categories within MAXIMUM_ERROR_SDAO
total_calculated_reward = (addresses_df['total_reward']).sum()
expected_reward = 0
if CALCULATE_HOLDERS_AND_LP:
    expected_reward = TOTAL_REWARD
if CALCULATE_STAKERS:
    expected_reward += TOTAL_STAKING_REWARD
expected_reward /= CONVERT_TO_FULL_BALANCE_SDAO
if abs(total_calculated_reward - float(expected_reward)) > MAXIMUM_ERROR_SDAO:
    print('Total rounding error: %s SDAO' % '{:.18f}'.format(abs(total_calculated_reward - float(expected_reward))))
    raise Exception('Error calculating rewards', 'final reward sum does not match allocated reward')
# Sort addresses by total reward (descending) and recalculate indexes
addresses_df = addresses_df.sort_values('total_reward', ascending=False)
addresses_df = addresses_df.reset_index(drop=True)
print()
print('Allocated reward (stakers, holders and LP): %s' % expected_reward)
print('Calculated reward (stakers, holders and LP): %s' % total_calculated_reward)
print()
print()
print('Final rewards')
display(addresses_df)
# Persist the final rewards table
addresses_df.to_csv('../rewards/rewards-stakers.csv')
The missing step would be to sum all the unclaimed amounts from previous airdrops, for this example that's not possible with the data at hand though.