📜  如何在 pythonn 中制作比特币矿工 - Python (1)

📅  最后修改于: 2023-12-03 15:38:24.509000             🧑  作者: Mango

如何在 Python 中制作比特币矿工

欢迎来到本教程,本教程将带您一步步制作一个简单的比特币矿工程序,使用 Python 语言实现。

准备工作

在开始之前,您需要安装以下软件:

  • Python 3.x
  • Bitcoin Core

还需要安装以下 Python 库:

  • python-bitcoinrpc

您可以使用以下命令安装:

pip install python-bitcoinrpc
实现过程
1. 连接到比特币核心客户端

首先,我们需要连接到比特币核心客户端,并获取需要的信息。使用以下代码将连接到比特币核心客户端:

from bitcoinrpc.authproxy import AuthServiceProxy

rpc_user = 'rpc_user'
rpc_password = 'rpc_password'
rpc_port = 8332
rpc_connection = AuthServiceProxy(f"http://{rpc_user}:{rpc_password}@127.0.0.1:{rpc_port}/")
2. 准备工作区域

接下来,我们需要准备工作区域,这些文件将用于存储矿工所需要的数据。使用以下代码:

import os

MINER_DIRECTORY = os.path.expanduser("~/.miner")
if not os.path.exists(MINER_DIRECTORY):
    os.makedirs(MINER_DIRECTORY)

# Blockchain data
BLOCKCHAIN_PATH = os.path.join(MINER_DIRECTORY, "blocks")
if not os.path.exists(BLOCKCHAIN_PATH):
    os.makedirs(BLOCKCHAIN_PATH)

# Mempool data
MEMPOOL_PATH = os.path.join(MINER_DIRECTORY, "mempool")
if not os.path.exists(MEMPOOL_PATH):
    os.makedirs(MEMPOOL_PATH)

# Miner reward
REWARD_PATH = os.path.join(MINER_DIRECTORY, "reward.txt")
3. 下载区块链数据

接下来,我们将下载比特币区块链数据。使用以下代码:

import requests

BLOCKCHAIN_ZIP_URL = "https://bitcoin.org/bin/blockchain/bootstrap.dat.torrent"
BLOCKCHAIN_ZIP_PATH = os.path.join(MINER_DIRECTORY, "bootstrap.dat.torrent")

# Download the blockchain
def download_blockchain():
    if not os.path.isfile(BLOCKCHAIN_ZIP_PATH):
        with open(BLOCKCHAIN_ZIP_PATH, "wb") as f:
            print("Downloading blockchain: 0%")
            response = requests.get(BLOCKCHAIN_ZIP_URL, stream=True)
            total_length = response.headers.get('content-length')
            total_length = int(total_length)
            dl = 0
            for data in response.iter_content(chunk_size=4096):
                dl += len(data)
                f.write(data)
                done = int(50 * dl / total_length)
                print(f"Downloading blockchain: {'=' * done}> {' ' * (50 - done)}| {dl / 1000000:.2f}MB / {total_length / 1000000:.2f}MB", end="\r")
            print("\nBlockchain download complete!")
4. 解压区块链数据

接下来,我们将解压区块链数据。使用以下代码:

import shutil
import zipfile

BLOCKCHAIN_ZIP_PATH = os.path.join(MINER_DIRECTORY, "bootstrap.dat.torrent")

# Unzip the blockchain
def unzip_blockchain():
    BLOCKCHAIN_ZIP_EXTRACT_PATH = os.path.join(MINER_DIRECTORY, "bootstrap.dat")
    if not os.path.isfile(BLOCKCHAIN_ZIP_EXTRACT_PATH):
        print("Unzipping blockchain...")
        with zipfile.ZipFile(BLOCKCHAIN_ZIP_PATH, "r") as zip_ref:
            zip_ref.extractall(MINER_DIRECTORY)
        os.rename(os.path.join(MINER_DIRECTORY, "bootstrap.dat.torrent"), BLOCKCHAIN_ZIP_EXTRACT_PATH)
        print("Blockchain unzipped!")
    else:
        print("Blockchain already unzipped.")
5. 启动矿工

接下来,我们将启动矿工。使用以下代码:

import hashlib

def hash_block_header(version, previous_block_hash, merkle_root, time, bits, nonce):
    '''
    Hash the block header
    '''
    block_header = (version + previous_block_hash + merkle_root + time + bits + nonce).encode()
    
    return hashlib.sha256(hashlib.sha256(block_header).digest()).digest()[::-1].hex()

def mine_block(version, previous_block_hash, merkle_root, time, bits, target, miner_address):
    '''
    Mine a new block
    '''
    nonce = 0
    while True:
        block_hash = hash_block_header(version, previous_block_hash, merkle_root, time, bits, nonce)
        
        # Check if we have found a valid block
        if int(block_hash, 16) < target:
            print("Block found!")
            transaction = {
                "inputs": [],
                "outputs": [{
                    "value": 50.0,
                    "address": miner_address
                }]
            }
            reward_path = os.path.join(MINER_DIRECTORY, "reward.txt")
            with open(reward_path, "a") as f:
                f.write(f"{block_hash}\n")
            return block_hash, transaction
        
        # Increment the nonce
        nonce += 1
6. 组装区块头信息

接下来,我们将组装区块头信息。使用以下代码:

import time

def get_last_block_hash():
    '''
    Get the hash of the last block
    '''
    response = rpc_connection.getblockchaininfo()
    return response["bestblockhash"]

def get_merkle_root(transactions):
    '''
    Get the merkle root for a list of transactions
    '''
    hashes = [hashlib.sha256(hashlib.sha256(str(transaction).encode()).digest()).digest()[::-1].hex() for transaction in transactions]
    
    while len(hashes) > 1:
        new_hashes = []
        for i in range(0, len(hashes), 2):
            if i < len(hashes) - 1:
                new_hashes.append(hashlib.sha256(hashlib.sha256((hashes[i] + hashes[i + 1]).encode()).digest()).digest()[::-1].hex())
            else:
                new_hashes.append(hashes[i])
        hashes = new_hashes
    
    return hashes[0]

def get_block_header(miner_address, transactions, bits):
    '''
    Get the block header for a block
    '''
    version = "01000000"
    previous_block_hash = get_last_block_hash()
    merkle_root = get_merkle_root(transactions)
    time = str(int(time.time()))
    bits = bits
    nonce = "00000000"
    block_header = version + previous_block_hash + merkle_root + time + bits + nonce
    
    return block_header
7. 实现矿工逻辑

接下来,我们将实现矿工逻辑。使用以下代码:

import time

def mine(miner_address):
    '''
    Start mining
    '''
    print("Mining...")
    bits = rpc_connection.getblocktemplate()["bits"]
    target = int(bits, 16)
    
    while True:
        # Build a new block
        transactions = []
        
        # Add transactions from the mempool
        mempool = rpc_connection.getrawmempool()
        for transaction_id in mempool:
            transaction = rpc_connection.getrawtransaction(transaction_id, 1)
            transactions.append(transaction)
        
        # Mine a new block
        block_header = get_block_header(miner_address, transactions, bits)
        block_hash, transaction = mine_block(block_header[:8], block_header[8:72], block_header[72:136], block_header[136:200], block_header[200:208], target, miner_address)
        
        # Add the block to the blockchain
        print("Submitting block...")
        rpc_connection.submitblock(block_header + transaction, {})
        
        time.sleep(1)
8. 最终代码

最终代码如下:

# Dependencies
import os
import requests
import hashlib
import zipfile
import time
from bitcoinrpc.authproxy import AuthServiceProxy

# Constants
rpc_user = 'rpc_user'
rpc_password = 'rpc_password'
rpc_port = 8332
MINER_DIRECTORY = os.path.expanduser("~/.miner")
BLOCKCHAIN_PATH = os.path.join(MINER_DIRECTORY, "blocks")
MEMPOOL_PATH = os.path.join(MINER_DIRECTORY, "mempool")
REWARD_PATH = os.path.join(MINER_DIRECTORY, "reward.txt")
BLOCKCHAIN_ZIP_URL = "https://bitcoin.org/bin/blockchain/bootstrap.dat.torrent"
BLOCKCHAIN_ZIP_PATH = os.path.join(MINER_DIRECTORY, "bootstrap.dat.torrent")

# Connect to the Bitcoin Core client
rpc_connection = AuthServiceProxy(f"http://{rpc_user}:{rpc_password}@127.0.0.1:{rpc_port}/")

# Download the blockchain
def download_blockchain():
    if not os.path.isfile(BLOCKCHAIN_ZIP_PATH):
        with open(BLOCKCHAIN_ZIP_PATH, "wb") as f:
            print("Downloading blockchain: 0%")
            response = requests.get(BLOCKCHAIN_ZIP_URL, stream=True)
            total_length = response.headers.get('content-length')
            total_length = int(total_length)
            dl = 0
            for data in response.iter_content(chunk_size=4096):
                dl += len(data)
                f.write(data)
                done = int(50 * dl / total_length)
                print(f"Downloading blockchain: {'=' * done}> {' ' * (50 - done)}| {dl / 1000000:.2f}MB / {total_length / 1000000:.2f}MB", end="\r")
            print("\nBlockchain download complete!")
    
# Unzip the blockchain
def unzip_blockchain():
    BLOCKCHAIN_ZIP_EXTRACT_PATH = os.path.join(MINER_DIRECTORY, "bootstrap.dat")
    if not os.path.isfile(BLOCKCHAIN_ZIP_EXTRACT_PATH):
        print("Unzipping blockchain...")
        with zipfile.ZipFile(BLOCKCHAIN_ZIP_PATH, "r") as zip_ref:
            zip_ref.extractall(MINER_DIRECTORY)
        os.rename(os.path.join(MINER_DIRECTORY, "bootstrap.dat.torrent"), BLOCKCHAIN_ZIP_EXTRACT_PATH)
        print("Blockchain unzipped!")
    else:
        print("Blockchain already unzipped.")

# Prepare the workspace
if not os.path.exists(MINER_DIRECTORY):
    os.makedirs(MINER_DIRECTORY)
if not os.path.exists(BLOCKCHAIN_PATH):
    os.makedirs(BLOCKCHAIN_PATH)
if not os.path.exists(MEMPOOL_PATH):
    os.makedirs(MEMPOOL_PATH)

# Download and unzip the blockchain
download_blockchain()
unzip_blockchain()

def hash_block_header(version, previous_block_hash, merkle_root, time, bits, nonce):
    '''
    Hash the block header
    '''
    block_header = (version + previous_block_hash + merkle_root + time + bits + nonce).encode()
    
    return hashlib.sha256(hashlib.sha256(block_header).digest()).digest()[::-1].hex()

def mine_block(version, previous_block_hash, merkle_root, time, bits, target, miner_address):
    '''
    Mine a new block
    '''
    nonce = 0
    while True:
        block_hash = hash_block_header(version, previous_block_hash, merkle_root, time, bits, nonce)
        
        # Check if we have found a valid block
        if int(block_hash, 16) < target:
            print("Block found!")
            transaction = {
                "inputs": [],
                "outputs": [{
                    "value": 50.0,
                    "address": miner_address
                }]
            }
            reward_path = os.path.join(MINER_DIRECTORY, "reward.txt")
            with open(reward_path, "a") as f:
                f.write(f"{block_hash}\n")
            return block_hash, transaction
        
        # Increment the nonce
        nonce += 1

def get_last_block_hash():
    '''
    Get the hash of the last block
    '''
    response = rpc_connection.getblockchaininfo()
    return response["bestblockhash"]

def get_merkle_root(transactions):
    '''
    Get the merkle root for a list of transactions
    '''
    hashes = [hashlib.sha256(hashlib.sha256(str(transaction).encode()).digest()).digest()[::-1].hex() for transaction in transactions]
    
    while len(hashes) > 1:
        new_hashes = []
        for i in range(0, len(hashes), 2):
            if i < len(hashes) - 1:
                new_hashes.append(hashlib.sha256(hashlib.sha256((hashes[i] + hashes[i + 1]).encode()).digest()).digest()[::-1].hex())
            else:
                new_hashes.append(hashes[i])
        hashes = new_hashes
    
    return hashes[0]

def get_block_header(miner_address, transactions, bits):
    '''
    Get the block header for a block
    '''
    version = "01000000"
    previous_block_hash = get_last_block_hash()
    merkle_root = get_merkle_root(transactions)
    time = str(int(time.time()))
    bits = bits
    nonce = "00000000"
    block_header = version + previous_block_hash + merkle_root + time + bits + nonce
    
    return block_header

def mine(miner_address):
    '''
    Start mining
    '''
    print("Mining...")
    bits = rpc_connection.getblocktemplate()["bits"]
    target = int(bits, 16)
    
    while True:
        # Build a new block
        transactions = []
        
        # Add transactions from the mempool
        mempool = rpc_connection.getrawmempool()
        for transaction_id in mempool:
            transaction = rpc_connection.getrawtransaction(transaction_id, 1)
            transactions.append(transaction)
        
        # Mine a new block
        block_header = get_block_header(miner_address, transactions, bits)
        block_hash, transaction = mine_block(block_header[:8], block_header[8:72], block_header[72:136], block_header[136:200], block_header[200:208], target, miner_address)
        
        # Add the block to the blockchain
        print("Submitting block...")
        rpc_connection.submitblock(block_header + transaction, {})
        
        time.sleep(1)

if __name__ == "__main__":
    miner_address = rpc_connection.getnewaddress()
    mine(miner_address)

运行代码即可启动矿工,开始挖矿。