-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathraydium_python.py
More file actions
103 lines (62 loc) · 3.14 KB
/
Copy pathraydium_python.py
File metadata and controls
103 lines (62 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from solana.rpc.api import Client
from solana.rpc.commitment import Confirmed
from solders.pubkey import Pubkey #type: ignore
from solders.keypair import Keypair #type: ignore
from solders.signature import Signature #type: ignore
# File Imports
from .pool_keys_and_id.get_pool_keys import get_pool_keys
from .buy_sell.raydium_swap import buy, sell
from .token_balance.balance import get_token_balance
from .transfer.transact import transfer_token
from .utility.verify_txn import verify_transaction
from .utility.constants import WSOL
from .config import *
class Raydium:
    """Thin facade over the project's Raydium balance, transfer and swap helpers.

    An instance builds one RPC ``Client`` and one payer ``Keypair`` and passes
    them to the module-level helper functions on every call, so callers only
    deal with plain strings and numbers.
    """

    # Forwarded unchanged to the transfer, buy and sell helpers on every call.
    # NOTE(review): units are not visible in this file - confirm against the helpers.
    GAS_LIMIT = 200000
    GAS_PRICE = 25000

    def __init__(self, wallet_key: str, rpc_url: str = HTTP_ENDPOINT) -> None:
        """Create the RPC client and load the payer keypair.

        Args:
            wallet_key: Base58-encoded keypair string for the paying wallet.
            rpc_url: RPC endpoint URL; defaults to ``HTTP_ENDPOINT`` from config.
        """
        self.client = Client(rpc_url)
        self.payer = Keypair.from_base58_string(wallet_key)
        self.my_wallet = self.payer.pubkey()

    # To fetch Wallet balance
    def token_balance(self, token: str = str(WSOL)) -> tuple:
        """Return the payer's balance for ``token`` (defaults to ``WSOL``).

        The result is whatever ``get_token_balance`` returns;
        ``sell_swap_percent`` reads element ``1`` of it as the amount held.
        """
        mint = Pubkey.from_string(token)
        return get_token_balance(self.client, mint, self.payer)

    # To verify a transaction
    def verify_txn(self, signature: str) -> bool:
        """Return the result of ``verify_transaction`` for ``signature``."""
        # TODO: This method is slow, could be enhanced
        return verify_transaction(self.client, signature)

    # To Transfer Token to another wallet
    def transfer_tokens(self, token: str, reciver_wallet: str, amount: float) -> Signature:
        """Send ``amount`` of ``token`` from the payer to ``reciver_wallet``.

        Both ``token`` and ``reciver_wallet`` are address strings that are
        parsed into ``Pubkey`` objects before being handed to ``transfer_token``.
        The parameter spelling is kept as-is because callers may pass it by
        keyword.
        """
        mint = Pubkey.from_string(token)
        destination = Pubkey.from_string(reciver_wallet)
        return transfer_token(
            self.client, mint, destination, amount, self.payer, self.GAS_LIMIT, self.GAS_PRICE
        )

    # Pool keys
    def get_pool_data(self, amm_id: str) -> dict:
        """Fetch the pool keys for the AMM identified by ``amm_id``."""
        return get_pool_keys(self.client, Pubkey.from_string(amm_id))

    def _resolve_pool_keys(self, amm_id: str, pool_keys=None):
        """Return ``pool_keys`` when truthy, otherwise look them up by ``amm_id``."""
        return pool_keys if pool_keys else self.get_pool_data(amm_id)

    # Buy SWAP TXN
    def buy_swap(self, amm_id: str, amount: float, pool_keys=None) -> Signature:
        """Submit a buy swap for the pool's ``base_mint``.

        Args:
            amm_id: AMM address string, used only when ``pool_keys`` is not given.
            amount: Amount forwarded to ``buy``.
            pool_keys: Optional pre-fetched pool keys, to skip the lookup.

        Returns:
            The value returned by ``buy``, or ``None`` when no pool keys could
            be resolved.
        """
        pool_keys = self._resolve_pool_keys(amm_id, pool_keys)
        if not pool_keys:
            # Explicit failure result instead of reaching an unassigned local.
            return None
        return buy(
            self.client, pool_keys['base_mint'], self.payer, amount, pool_keys,
            self.GAS_LIMIT, self.GAS_PRICE,
        )

    # Sell SWAP TXN
    def sell_swap(self, amm_id: str, amount: float, pool_keys=None) -> Signature:
        """Submit a sell swap for the pool's ``base_mint``.

        Args:
            amm_id: AMM address string, used only when ``pool_keys`` is not given.
            amount: Amount forwarded to ``sell``.
            pool_keys: Optional pre-fetched pool keys, to skip the lookup.

        Returns:
            The value returned by ``sell``, or ``None`` when no pool keys could
            be resolved.
        """
        pool_keys = self._resolve_pool_keys(amm_id, pool_keys)
        if not pool_keys:
            return None
        return sell(
            self.client, pool_keys['base_mint'], self.payer, amount, pool_keys,
            self.GAS_LIMIT, self.GAS_PRICE,
        )

    # Sell SWAP TXN using total balance's percent
    def sell_swap_percent(self, amm_id: str, percent: float, pool_keys=None) -> Signature:
        """Sell ``percent`` percent of the payer's ``base_mint`` balance.

        Args:
            amm_id: AMM address string, used only when ``pool_keys`` is not given.
            percent: Share of the current balance to sell, expressed as 0-100.
            pool_keys: Optional pre-fetched pool keys, to skip the lookup.

        Returns:
            The value returned by ``sell``, or ``None`` when the pool keys or
            the wallet balance could not be obtained.
        """
        pool_keys = self._resolve_pool_keys(amm_id, pool_keys)
        if not pool_keys:
            return None

        wallet_balance = self.token_balance(str(pool_keys['base_mint']))
        if not wallet_balance:
            return None

        # Element 1 of the balance tuple is the quantity the percentage applies to.
        amount = wallet_balance[1] * (percent / 100)
        return sell(
            self.client, pool_keys['base_mint'], self.payer, amount, pool_keys,
            self.GAS_LIMIT, self.GAS_PRICE,
        )