Prometheus files

From wiki.electroncash.de
Jump to navigation Jump to search

bitcoin-monitor.py[edit]

# bitcoind-monitor.py
#
# An exporter for Prometheus and Bitcoin Core.
#
# Copyright 2018 Kevin M. Gallagher
# Copyright 2019,2020 Jeff Stein
#
# Published at https://github.com/jvstein/bitcoin-prometheus-exporter
# Licensed under BSD 3-clause (see LICENSE).
#
# Dependency licenses (retrieved 2020-05-31):
#   prometheus_client: Apache 2.0
#   python-bitcoinlib: LGPLv3
#   riprova: MIT

import json
import logging
import time
import os
import signal
import sys
import socket

from datetime import datetime
from functools import lru_cache
from typing import Any
from typing import Dict
from typing import List
from typing import Union
from urllib.parse import quote
from wsgiref.simple_server import make_server

import riprova

from bitcoin.rpc import InWarmupError, Proxy, JSONRPCError
from prometheus_client import make_wsgi_app, Gauge, Counter, Info

logger = logging.getLogger("bitcoin-exporter")


# Create Prometheus metrics to track bitcoind stats.
BITCOIN_INFO = Info('bitcoin', 'The chain the daemon runs on')
BITCOIN_BLOCKS = Gauge("bitcoin_blocks", "Block height")
BITCOIN_DIFFICULTY = Gauge("bitcoin_difficulty", "Difficulty")
BITCOIN_PEERS = Gauge("bitcoin_peers", "Number of peers")
BITCOIN_HASHPS_1 = Gauge(
    "bitcoin_hashps_1", "Estimated network hash rate per second for the last block"
)
BITCOIN_HASHPS = Gauge(
    "bitcoin_hashps", "Estimated network hash rate per second for the last 120 blocks"
)

BITCOIN_WARNINGS = Counter("bitcoin_warnings", "Number of network or blockchain warnings detected")
BITCOIN_UPTIME = Gauge("bitcoin_uptime", "Number of seconds the Bitcoin daemon has been running")

BITCOIN_MEMINFO_USED = Gauge("bitcoin_meminfo_used", "Number of bytes used")
BITCOIN_MEMINFO_FREE = Gauge("bitcoin_meminfo_free", "Number of bytes available")
BITCOIN_MEMINFO_TOTAL = Gauge("bitcoin_meminfo_total", "Number of bytes managed")
BITCOIN_MEMINFO_LOCKED = Gauge("bitcoin_meminfo_locked", "Number of bytes locked")
BITCOIN_MEMINFO_CHUNKS_USED = Gauge("bitcoin_meminfo_chunks_used", "Number of allocated chunks")
BITCOIN_MEMINFO_CHUNKS_FREE = Gauge("bitcoin_meminfo_chunks_free", "Number of unused chunks")

BITCOIN_MEMPOOL_BYTES = Gauge("bitcoin_mempool_bytes", "Size of mempool in bytes")
BITCOIN_MEMPOOL_SIZE = Gauge(
    "bitcoin_mempool_size", "Number of unconfirmed transactions in mempool"
)
BITCOIN_MEMPOOL_USAGE = Gauge("bitcoin_mempool_usage", "Total memory usage for the mempool")

BITCOIN_LATEST_BLOCK_HEIGHT = Gauge(
    "bitcoin_latest_block_height", "Height or index of latest block"
)
BITCOIN_LATEST_BLOCK_SIZE = Gauge("bitcoin_latest_block_size", "Size of latest block in bytes")
BITCOIN_LATEST_BLOCK_TXS = Gauge(
    "bitcoin_latest_block_txs", "Number of transactions in latest block"
)

BITCOIN_TXCOUNT = Gauge("bitcoin_txcount", "Number of TX since the genesis block")

BITCOIN_NUM_CHAINTIPS = Gauge("bitcoin_num_chaintips", "Number of known blockchain branches")

BITCOIN_TOTAL_BYTES_RECV = Gauge("bitcoin_total_bytes_recv", "Total bytes received")
BITCOIN_TOTAL_BYTES_SENT = Gauge("bitcoin_total_bytes_sent", "Total bytes sent")

BITCOIN_LATEST_BLOCK_INPUTS = Gauge(
    "bitcoin_latest_block_inputs", "Number of inputs in transactions of latest block"
)
BITCOIN_LATEST_BLOCK_OUTPUTS = Gauge(
    "bitcoin_latest_block_outputs", "Number of outputs in transactions of latest block"
)
BITCOIN_LATEST_BLOCK_VALUE = Gauge(
    "bitcoin_latest_block_value", "Bitcoin value of all transactions in the latest block"
)

BITCOIN_BAN_CREATED = Gauge(
    "bitcoin_ban_created", "Time the ban was created", labelnames=["address", "reason"]
)
BITCOIN_BANNED_UNTIL = Gauge(
    "bitcoin_banned_until", "Time the ban expires", labelnames=["address", "reason"]
)

BITCOIN_SERVER_VERSION = Gauge("bitcoin_server_version", "The server version")
BITCOIN_PROTOCOL_VERSION = Gauge("bitcoin_protocol_version", "The protocol version of the server")

BITCOIN_SIZE_ON_DISK = Gauge("bitcoin_size_on_disk", "Estimated size of the block and undo files")

BITCOIN_VERIFICATION_PROGRESS = Gauge(
    "bitcoin_verification_progress", "Estimate of verification progress [0..1]"
)

BITCOIN_RPC_ACTIVE = Gauge("bitcoin_rpc_active", "Number of RPC calls being processed")

BITCOIN_DSPROOF_COUNT = Gauge("bitcoin_dsproof_count", "Number of Double Spend Proofs")

EXPORTER_ERRORS = Counter(
    "bitcoin_exporter_errors", "Number of errors encountered by the exporter", labelnames=["type"]
)
PROCESS_TIME = Counter(
    "bitcoin_exporter_process_time", "Time spent processing metrics from bitcoin node"
)


BITCOIN_RPC_SCHEME = os.environ.get("BITCOIN_RPC_SCHEME", "http")
BITCOIN_RPC_HOST = os.environ.get("BITCOIN_RPC_HOST", "localhost")
BITCOIN_RPC_PORT = os.environ.get("BITCOIN_RPC_PORT", "8332")
BITCOIN_RPC_USER = os.environ.get("BITCOIN_RPC_USER")
BITCOIN_RPC_PASSWORD = os.environ.get("BITCOIN_RPC_PASSWORD")
BITCOIN_CONF_PATH = os.environ.get("BITCOIN_CONF_PATH")
METRICS_ADDR = os.environ.get("METRICS_ADDR", "")  # empty = any address
METRICS_PORT = int(os.environ.get("METRICS_PORT", "8334"))
RETRIES = int(os.environ.get("RETRIES", 5))
TIMEOUT = int(os.environ.get("TIMEOUT", 30))
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")


RETRY_EXCEPTIONS = (InWarmupError, ConnectionError, socket.timeout)

RpcResult = Union[Dict[str, Any], List[Any], str, int, float, bool, None]


def on_retry(err: Exception, next_try: float) -> None:
    err_type = type(err)
    exception_name = err_type.__module__ + "." + err_type.__name__
    EXPORTER_ERRORS.labels(**{"type": exception_name}).inc()
    logger.error("Retry after exception %s: %s", exception_name, err)


def error_evaluator(e: Exception) -> bool:
    return isinstance(e, RETRY_EXCEPTIONS)


@lru_cache(maxsize=1)
def rpc_client_factory():
    # Configuration is done in this order of precedence:
    #   - Explicit config file.
    #   - BITCOIN_RPC_USER and BITCOIN_RPC_PASSWORD environment variables.
    #   - Default bitcoin config file (as handled by Proxy.__init__).
    use_conf = (
        (BITCOIN_CONF_PATH is not None)
        or (BITCOIN_RPC_USER is None)
        or (BITCOIN_RPC_PASSWORD is None)
    )

    if use_conf:
        logger.info("Using config file: %s", BITCOIN_CONF_PATH or "<default>")
        return lambda: Proxy(btc_conf_file=BITCOIN_CONF_PATH, timeout=TIMEOUT)
    else:
        host = BITCOIN_RPC_HOST
        host = "{}:{}@{}".format(quote(BITCOIN_RPC_USER), quote(BITCOIN_RPC_PASSWORD), host)
        if BITCOIN_RPC_PORT:
            host = "{}:{}".format(host, BITCOIN_RPC_PORT)
        service_url = "{}://{}".format(BITCOIN_RPC_SCHEME, host)
        logger.info("Using environment configuration")
        return lambda: Proxy(service_url=service_url, timeout=TIMEOUT)


def rpc_client():
    return rpc_client_factory()()


@riprova.retry(
    timeout=TIMEOUT,
    backoff=riprova.ExponentialBackOff(),
    on_retry=on_retry,
    error_evaluator=error_evaluator,
)
def bitcoinrpc(*args) -> RpcResult:
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("RPC call: " + " ".join(str(a) for a in args))

    result = rpc_client().call(*args)

    logger.debug("Result:   %s", result)
    return result


@lru_cache(maxsize=1)
def getblockstats(block_hash: str):
    try:
        block = bitcoinrpc("getblockstats", block_hash, ["total_size", "txs", "height", "ins", "outs", "total_out"])
    except Exception:
        logger.exception("Failed to retrieve block " + block_hash + " statistics from bitcoind.")
        return None
    return block


def refresh_metrics() -> None:
    uptime = int(bitcoinrpc("uptime"))
    meminfo = bitcoinrpc("getmemoryinfo", "stats")["locked"]
    blockchaininfo = bitcoinrpc("getblockchaininfo")
    networkinfo = bitcoinrpc("getnetworkinfo")
    chaintips = len(bitcoinrpc("getchaintips"))
    mempool = bitcoinrpc("getmempoolinfo")
    nettotals = bitcoinrpc("getnettotals")
    rpcinfo = bitcoinrpc("getrpcinfo")
    dsprooflist = bitcoinrpc("getdsprooflist")
    txstats = bitcoinrpc("getchaintxstats")
    latest_blockstats = getblockstats(str(blockchaininfo["bestblockhash"]))
    hashps_120 = float(bitcoinrpc("getnetworkhashps", 120))  # 120 is the default
    hashps_1 = float(bitcoinrpc("getnetworkhashps", 1))

    banned = bitcoinrpc("listbanned")

    BITCOIN_INFO.info({"chain": blockchaininfo["chain"], "coin": "BCH"})
    BITCOIN_UPTIME.set(uptime)
    BITCOIN_BLOCKS.set(blockchaininfo["blocks"])
    BITCOIN_PEERS.set(networkinfo["connections"])
    BITCOIN_DIFFICULTY.set(blockchaininfo["difficulty"])
    BITCOIN_HASHPS.set(hashps_120)
    BITCOIN_HASHPS_1.set(hashps_1)
    BITCOIN_SERVER_VERSION.set(networkinfo["version"])
    BITCOIN_PROTOCOL_VERSION.set(networkinfo["protocolversion"])
    BITCOIN_SIZE_ON_DISK.set(blockchaininfo["size_on_disk"])
    BITCOIN_VERIFICATION_PROGRESS.set(blockchaininfo["verificationprogress"])

    for ban in banned:
        BITCOIN_BAN_CREATED.labels(address=ban["address"], reason=ban.get("ban_reason", "manually added")).set(
            ban["ban_created"]
        )
        BITCOIN_BANNED_UNTIL.labels(address=ban["address"], reason=ban.get("ban_reason", "manually added")).set(
            ban["banned_until"]
        )

    if networkinfo["warnings"]:
        BITCOIN_WARNINGS.inc()

    BITCOIN_TXCOUNT.set(txstats["txcount"])

    BITCOIN_NUM_CHAINTIPS.set(chaintips)

    BITCOIN_MEMINFO_USED.set(meminfo["used"])
    BITCOIN_MEMINFO_FREE.set(meminfo["free"])
    BITCOIN_MEMINFO_TOTAL.set(meminfo["total"])
    BITCOIN_MEMINFO_LOCKED.set(meminfo["locked"])
    BITCOIN_MEMINFO_CHUNKS_USED.set(meminfo["chunks_used"])
    BITCOIN_MEMINFO_CHUNKS_FREE.set(meminfo["chunks_free"])

    BITCOIN_MEMPOOL_BYTES.set(mempool["bytes"])
    BITCOIN_MEMPOOL_SIZE.set(mempool["size"])
    BITCOIN_MEMPOOL_USAGE.set(mempool["usage"])

    BITCOIN_TOTAL_BYTES_RECV.set(nettotals["totalbytesrecv"])
    BITCOIN_TOTAL_BYTES_SENT.set(nettotals["totalbytessent"])

    if latest_blockstats is not None:
        BITCOIN_LATEST_BLOCK_SIZE.set(latest_blockstats["total_size"])
        BITCOIN_LATEST_BLOCK_TXS.set(latest_blockstats["txs"])
        BITCOIN_LATEST_BLOCK_HEIGHT.set(latest_blockstats["height"])
        BITCOIN_LATEST_BLOCK_INPUTS.set(latest_blockstats["ins"])
        BITCOIN_LATEST_BLOCK_OUTPUTS.set(latest_blockstats["outs"])
        BITCOIN_LATEST_BLOCK_VALUE.set(latest_blockstats["total_out"])

    # Subtract one because we don't want to count the "getrpcinfo" call itself
    BITCOIN_RPC_ACTIVE.set(len(rpcinfo["active_commands"]) - 1)

    BITCOIN_DSPROOF_COUNT.set(len(dsprooflist))


def sigterm_handler(signal, frame) -> None:
    logger.critical("Received SIGTERM. Exiting.")
    sys.exit(0)


def exception_count(e: Exception) -> None:
    err_type = type(e)
    exception_name = err_type.__module__ + "." + err_type.__name__
    EXPORTER_ERRORS.labels(**{"type": exception_name}).inc()


def main():
    # Set up logging to look similar to bitcoin logs (UTC).
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s", datefmt="%Y-%m-%dT%H:%M:%SZ"
    )
    logging.Formatter.converter = time.gmtime
    logger.setLevel(LOG_LEVEL)

    # Handle SIGTERM gracefully.
    signal.signal(signal.SIGTERM, sigterm_handler)

    app = make_wsgi_app()

    last_refresh = None

    def refresh_app(*args, **kwargs):
        nonlocal last_refresh
        process_start = datetime.now()

        if not last_refresh or (process_start - last_refresh).total_seconds() > 5: # Limit updates to every 5 seconds
            # Allow riprova.MaxRetriesExceeded and unknown exceptions to crash the process.
            try:
                refresh_metrics()
            except riprova.exceptions.RetryError as e:
                logger.error("Refresh failed during retry. Cause: " + str(e))
                exception_count(e)
            except JSONRPCError as e:
                logger.debug("Bitcoin RPC error refresh", exc_info=True)
                exception_count(e)
            except json.decoder.JSONDecodeError as e:
                logger.error("RPC call did not return JSON. Bad credentials? " + str(e))
                sys.exit(1)

            duration = datetime.now() - process_start
            PROCESS_TIME.inc(duration.total_seconds())
            logger.info("Refresh took %s seconds", duration)
            last_refresh = process_start

        return app(*args, **kwargs)

    httpd = make_server(METRICS_ADDR, METRICS_PORT, refresh_app)
    httpd.serve_forever()


if __name__ == "__main__":
    main()

fulcrum-monitor.py[edit]

#!/usr/bin/env python3
# Copyright (c) 2021 Axel Gembe <derago@gmail.com>
# Copyright 2018 Kevin M. Gallagher
# Copyright 2019,2020 Jeff Stein
#
# Based on https://github.com/jvstein/bitcoin-prometheus-exporter
# Published at https://github.com/EchterAgo/fulcrum-prometheus-exporter
# Licensed under BSD 3-clause (see LICENSE).

import json
import logging
import time
import os
import signal
import sys
import urllib

from datetime import datetime
from wsgiref.simple_server import make_server

from prometheus_client import make_wsgi_app, Gauge, Counter, Info

logger = logging.getLogger("fulcrum-exporter")


# Create Prometheus metrics to track Fulcrum stats.
BITCOIND_EXTANT_REQUEST_CONTEXTS = Gauge("fulcrum_bitcoind_extant_request_contexts", "Extant bitcoind request contexts")
BITCOIND_REQUEST_CONTEXT_TABLE_SIZE = Gauge(
    "fulcrum_bitcoind_request_context_table_size", "bitcoind request context table size")
BITCOIND_REQUEST_TIMEOUT_COUNT = Gauge("fulcrum_bitcoind_request_timeout_count", "bitcoind request timeout count")
BITCOIND_REQUEST_ZOMBIE_COUNT = Gauge("fulcrum_bitcoind_request_zombie_count", "bitcoind request zombie count")
BITCOIND_RPCCLIENT_COUNT = Gauge("fulcrum_bitcoind_rpcclient_count", "Number of bitcoind RPC clients in existence")


CONTROLLER_INFO = Info('fulcrum_controller', 'The chain and coin the controller runs on')
CONTROLLER_HEADER_COUNT = Gauge("fulcrum_controller_header_count", "Number of headers the controller knows about")
CONTROLLER_TX_NUM = Gauge("fulcrum_controller_tx_num", "Number of transactions the controller knows about")
CONTROLLER_UTXO_SET_COUNT = Gauge("fulcrum_controller_utxo_set_count", "Number of outputs in the UTXO set")
CONTROLLER_UTXO_SET_SIZE = Gauge("fulcrum_controller_utxo_set_size_mb", "Size of the UTXO set")
CONTROLLER_ZMQ_NOTIFICATION_COUNT = Gauge("fulcrum_controller_zmq_notification_count", "Number of ZMQ notifications received")
CONTROLLER_TASK_COUNT = Gauge("fulcrum_controller_task_count", "Number of controller tasks")


JEMALLOC_STATS_ACTIVE = Gauge("fulcrum_jemalloc_stats_active", "Jemalloc active bytes")
JEMALLOC_STATS_ALLOCATED = Gauge("fulcrum_jemalloc_stats_allocated", "Jemalloc allocated bytes")
JEMALLOC_STATS_MAPPED = Gauge("fulcrum_jemalloc_stats_mapped", "Jemalloc mapped bytes")
JEMALLOC_STATS_METADATA = Gauge("fulcrum_jemalloc_stats_metadata", "Jemalloc metadata bytes")
JEMALLOC_STATS_RESIDENT = Gauge("fulcrum_jemalloc_stats_resident", "Jemalloc resident bytes")
JEMALLOC_STATS_RETAINED = Gauge("fulcrum_jemalloc_stats_retained", "Jemalloc retained bytes")


MEMORY_USAGE_PHYSICAL_KB = Gauge("fulcrum_memory_usage_physical_kb", "Physical memory usage in kilobytes")
MEMORY_USAGE_VIRTUAL_KB = Gauge("fulcrum_memory_usage_virtual_kb", "Virtual memory usage in kilobytes")


JOB_QUEUE_EXTANT_JOBS = Gauge("fulcrum_job_queue_extant_jobs", "Number of jobs in the job queue")
JOB_QUEUE_EXTANT_JOBS_MAX_LIFETIME = Gauge("fulcrum_job_queue_extant_jobs_max_lifetime",
                                           "Maximum number of jobs in the job queue over the process's life time")
JOB_QUEUE_EXTANT_JOBS_LIMIT = Gauge("fulcrum_job_queue_extant_jobs_limit", "Limit for number of jobs in the job queue")
JOB_QUEUE_COUNT_LIFETIME = Gauge("fulcrum_job_queue_count_lifetime",
                                 "Number of jobs processed by the job queue over the process's life time")
JOB_QUEUE_OVERFLOWS_LIFETIME = Gauge("fulcrum_job_queue_overflows_lifetime",
                                     "Number of job queue overflows over the process's life time")
JOB_QUEUE_THREAD_COUNT_MAX = Gauge("fulcrum_job_queue_thread_count_max", "Maximum number of threads for the job queue")


SERVER_MANAGER_PEER_COUNT = Gauge("fulcrum_server_manager_peer_count", "Peer count", labelnames=["peertype"])
SERVER_MANAGER_SERVER_CLIENT_COUNT = Gauge("fulcrum_server_manager_server_client_count",
                                           "Client count by server type", labelnames=["servertype"])
SERVER_MANAGER_CLIENT_COUNT = Gauge("fulcrum_server_manager_client_count", "Client count")
SERVER_MANAGER_CLIENT_COUNT_MAX_LIFETIME = Gauge(
    "fulcrum_server_manager_client_count_max_lifetime", "Max client count over the process's life time")
SERVER_MANAGER_TOTAL_LIFETIME_CLIENTS = Gauge(
    "fulcrum_server_manager_total_lifetime_clients", "Total number of clients over the process's life time")
SERVER_MANAGER_TRANSACTIONS_SENT_COUNT = Gauge(
    "fulcrum_server_manager_transactions_sent_count", "Number of transactions sent using this server")
SERVER_MANAGER_TRANSACTIONS_SENT_SIZE_BYTES = Gauge(
    "fulcrum_server_manager_transactions_sent_size_bytes", "Size of the transactions sent using this server in bytes")


STORAGE_DB_SHARED_BLOCK_CACHE = Gauge("fulcrum_storage_db_shared_block_cache",
                                      "Storage shared block cache", labelnames=["type"])
STORAGE_DB_SHARED_WRITE_BUFFER_MANAGER = Gauge("fulcrum_storage_db_shared_write_buffer_manager",
                                               "Storage shared write buffer manager", labelnames=["type"])
STORAGE_DB_STATS_CUR_SIZE_ALL_MEM_TABLES_BYTES = Gauge("fulcrum_storage_db_stats_cur_size_all_mem_tables_bytes",
                                                       "Approximate size of active and unflushed immutable memtables in bytes", labelnames=["database"])
STORAGE_DB_STATS_ESTIMATE_TABLE_READERS_MEM_BYTES = Gauge("fulcrum_storage_db_stats_estimate_table_readers_mem_bytes",
                                                          "Estimated memory used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks) in bytes", labelnames=["database"])
STORAGE_CACHES_LRU_SIZE_BYTES = Gauge("fulcrum_storage_caches_lru_size_bytes",
                                      "LRU Cache size in bytes", labelnames=["cachetype"])
STORAGE_CACHES_LRU_ENTRY_COUNT = Gauge("fulcrum_storage_caches_lru_entry_count",
                                       "LRU Cache entry count", labelnames=["cachetype"])
STORAGE_CACHES_LRU_APPROX_HITS = Gauge("fulcrum_storage_caches_lru_approx_hits",
                                       "LRU Cache approximate hits", labelnames=["cachetype"])
STORAGE_CACHES_LRU_APPROX_MISSES = Gauge("fulcrum_storage_caches_lru_approx_misses",
                                         "LRU Cache approximate misses", labelnames=["cachetype"])
STORAGE_CACHES_MERKLEHEADERS = Gauge("fulcrum_storage_caches_merkleheaders", "Merkleheader cache size", labelnames=["type"])
STORAGE_MERGE_CALLS = Gauge("fulcrum_storage_merge_calls", "Merged storage calls")


SUBSMGR_ACTIVE_SUBS_COUNT = Gauge("fulcrum_subsmgr_active_subs_count", "Number of active client subscriptions")
SUBSMGR_UNIQUE_SCRIPTHASH_SUBS = Gauge("fulcrum_subsmgr_unique_subs_count",
                                       "Number of unique scripthashes subscribed (including zombies)")
SUBSMGR_PENDING_NOTIF_COUNT = Gauge("fulcrum_subsmgr_pending_notif_count", "Number of pending notifications")
SUBSMGR_SUBS_BUCKET_COUNT = Gauge("fulcrum_subsmgr_subs_bucket_count", "Number of subscription buckets")
SUBSMGR_SUBS_CACHE_HITS = Gauge("fulcrum_subsmgr_subs_cache_hits", "Number of subscription cache hits")
SUBSMGR_SUBS_CACHE_MISSES = Gauge("fulcrum_subsmgr_subs_cache_misses", "Number of subscription cache misses")
SUBSMGR_SUBS_LOAD_FACTOR = Gauge("fulcrum_subsmgr_subs_load_factor", "Subscription load factor")


EXPORTER_ERRORS = Counter("fulcrum_exporter_errors",
                          "Number of errors encountered by the exporter", labelnames=["type"])
PROCESS_TIME = Counter("fulcrum_exporter_process_time", "Time spent processing metrics from Fulcrum")


FULCRUM_STATS_URL = os.environ.get("FULCRUM_STATS_URL", "http://127.0.0.1:8080/stats")
METRICS_ADDR = os.environ.get("METRICS_ADDR", "")  # empty = any address
METRICS_PORT = int(os.environ.get("METRICS_PORT", "50039"))
RETRIES = int(os.environ.get("RETRIES", 5))
TIMEOUT = int(os.environ.get("TIMEOUT", 30))
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")


def refresh_metrics() -> None:
    with urllib.request.urlopen(FULCRUM_STATS_URL) as stats_json:
        stats = json.load(stats_json)

    daemon = stats["Bitcoin Daemon"]
    BITCOIND_EXTANT_REQUEST_CONTEXTS.set(daemon["extant request contexts"])
    BITCOIND_REQUEST_CONTEXT_TABLE_SIZE.set(daemon["request context table size"])
    BITCOIND_REQUEST_TIMEOUT_COUNT.set(daemon["request timeout count"])
    BITCOIND_REQUEST_ZOMBIE_COUNT.set(daemon["request zombie count"])
    BITCOIND_RPCCLIENT_COUNT.set(len(daemon["rpc clients"]))

    ctrl = stats["Controller"]
    CONTROLLER_INFO.info({"chain": ctrl["Chain"], "coin": ctrl["Coin"]})
    CONTROLLER_HEADER_COUNT.set(ctrl["Header count"])
    CONTROLLER_TX_NUM.set(ctrl["TxNum"])
    CONTROLLER_UTXO_SET_COUNT.set(ctrl["UTXO set"])
    CONTROLLER_UTXO_SET_SIZE.set(float(ctrl["UTXO set bytes"].split()[0]))
    CONTROLLER_ZMQ_NOTIFICATION_COUNT.set(sum(value["notifications"]
                                              for key, value in ctrl["ZMQ Notifiers (active)"].items()))
    CONTROLLER_TASK_COUNT.set(len(ctrl["tasks"]))

    if "Jemalloc" in stats:
        jas = stats["Jemalloc"]["stats"]
        JEMALLOC_STATS_ACTIVE.set(jas["active"])
        JEMALLOC_STATS_ALLOCATED.set(jas["allocated"])
        JEMALLOC_STATS_MAPPED.set(jas["mapped"])
        JEMALLOC_STATS_METADATA.set(jas["metadata"])
        JEMALLOC_STATS_RESIDENT.set(jas["resident"])
        JEMALLOC_STATS_RETAINED.set(jas["retained"])

    MEMORY_USAGE_PHYSICAL_KB.set(stats["Memory Usage"]["physical kB"])
    MEMORY_USAGE_VIRTUAL_KB.set(stats["Memory Usage"]["virtual kB"])

    jobq = stats["Misc"]["Job Queue (Thread Pool)"]
    JOB_QUEUE_EXTANT_JOBS.set(jobq["extant jobs"])
    JOB_QUEUE_EXTANT_JOBS_MAX_LIFETIME.set(jobq["extant jobs (max lifetime)"])
    JOB_QUEUE_EXTANT_JOBS_LIMIT.set(jobq["extant limit"])
    JOB_QUEUE_COUNT_LIFETIME.set(jobq["job count (lifetime)"])
    JOB_QUEUE_OVERFLOWS_LIFETIME.set(jobq["job queue overflows (lifetime)"])
    JOB_QUEUE_THREAD_COUNT_MAX.set(jobq["thread count (max)"])

    srvm = stats["Server Manager"]

    for peertype in ["bad", "failed", "peers", "queued"]:
        SERVER_MANAGER_PEER_COUNT.labels(peertype).set(len(srvm["PeerMgr"][peertype]))

    for servertype in ["AdminSrv", "SslSrv", "TcpSrv", "WsSrv", "WssSrv"]:
        servers = [value for key, value in srvm["Servers"].items() if key.startswith(servertype)]
        SERVER_MANAGER_SERVER_CLIENT_COUNT.labels(servertype).set(sum(s["numClients"] for s in servers))

    SERVER_MANAGER_CLIENT_COUNT.set(srvm["number of clients"])
    SERVER_MANAGER_CLIENT_COUNT_MAX_LIFETIME.set(srvm["number of clients (max lifetime)"])
    SERVER_MANAGER_TOTAL_LIFETIME_CLIENTS.set(srvm["number of clients (total lifetime connections)"])
    SERVER_MANAGER_TRANSACTIONS_SENT_COUNT.set(srvm["transactions sent"])
    SERVER_MANAGER_TRANSACTIONS_SENT_SIZE_BYTES.set(srvm["transactions sent (bytes)"])

    stor = stats["Storage"]
    STORAGE_DB_SHARED_BLOCK_CACHE.labels("capacity").set(stor["DB Shared Block Cache"]["capacity"])
    STORAGE_DB_SHARED_BLOCK_CACHE.labels("usage").set(stor["DB Shared Block Cache"]["usage"])
    STORAGE_DB_SHARED_WRITE_BUFFER_MANAGER.labels("buffersize").set(
        stor["DB Shared Write Buffer Manager"]["buffer size"])
    STORAGE_DB_SHARED_WRITE_BUFFER_MANAGER.labels("memoryusage").set(
        stor["DB Shared Write Buffer Manager"]["memory usage"])

    for database in ["blkinfo", "meta", "scripthash_history", "scripthash_unspent", "undo", "utxoset"]:
        STORAGE_DB_STATS_CUR_SIZE_ALL_MEM_TABLES_BYTES.labels(database).set(
            stor["DB Stats"][database]["rocksdb.cur-size-all-mem-tables"])
        STORAGE_DB_STATS_ESTIMATE_TABLE_READERS_MEM_BYTES.labels(database).set(
            stor["DB Stats"][database]["rocksdb.estimate-table-readers-mem"])

    STORAGE_CACHES_LRU_SIZE_BYTES.labels("blockheight2txhashes").set(
        stor["caches"]["LRU Cache: Block Height -> TxHashes"]["Size bytes"])
    STORAGE_CACHES_LRU_ENTRY_COUNT.labels("blockheight2txhashes").set(
        stor["caches"]["LRU Cache: Block Height -> TxHashes"]["nBlocks"])
    STORAGE_CACHES_LRU_APPROX_HITS.labels("blockheight2txhashes").set(
        stor["caches"]["LRU Cache: Block Height -> TxHashes"]["~hits"])
    STORAGE_CACHES_LRU_APPROX_MISSES.labels("blockheight2txhashes").set(
        stor["caches"]["LRU Cache: Block Height -> TxHashes"]["~misses"])

    STORAGE_CACHES_LRU_SIZE_BYTES.labels("txnum2txhash").set(stor["caches"]["LRU Cache: TxNum -> TxHash"]["Size bytes"])
    STORAGE_CACHES_LRU_ENTRY_COUNT.labels("txnum2txhash").set(stor["caches"]["LRU Cache: TxNum -> TxHash"]["nItems"])
    STORAGE_CACHES_LRU_APPROX_HITS.labels("txnum2txhash").set(stor["caches"]["LRU Cache: TxNum -> TxHash"]["~hits"])
    STORAGE_CACHES_LRU_APPROX_MISSES.labels("txnum2txhash").set(stor["caches"]["LRU Cache: TxNum -> TxHash"]["~misses"])

    STORAGE_CACHES_MERKLEHEADERS.labels("count").set(stor["caches"]["merkleHeaders_Size"])
    STORAGE_CACHES_MERKLEHEADERS.labels("bytes").set(stor["caches"]["merkleHeaders_SizeBytes"])

    STORAGE_MERGE_CALLS.set(stor["merge calls"])

    subm = stats["SubsMgr"]
    SUBSMGR_ACTIVE_SUBS_COUNT.set(subm["Num. active client subscriptions"])
    SUBSMGR_UNIQUE_SCRIPTHASH_SUBS.set(subm["Num. unique scripthashes subscribed (including zombies)"])
    SUBSMGR_PENDING_NOTIF_COUNT.set(len(subm["pendingNotifications"]))
    SUBSMGR_SUBS_BUCKET_COUNT.set(subm["subscriptions bucket count"])
    SUBSMGR_SUBS_CACHE_HITS.set(subm["subscriptions cache hits"])
    SUBSMGR_SUBS_CACHE_MISSES.set(subm["subscriptions cache misses"])
    SUBSMGR_SUBS_LOAD_FACTOR.set(subm["subscriptions load factor"])


def sigterm_handler(signal, frame) -> None:
    logger.critical("Received SIGTERM. Exiting.")
    sys.exit(0)


def exception_count(e: Exception) -> None:
    err_type = type(e)
    exception_name = err_type.__module__ + "." + err_type.__name__
    EXPORTER_ERRORS.labels(**{"type": exception_name}).inc()


def main():
    # Set up logging to look similar to bitcoin logs (UTC).
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s", datefmt="%Y-%m-%dT%H:%M:%SZ"
    )
    logging.Formatter.converter = time.gmtime
    logger.setLevel(LOG_LEVEL)

    # Handle SIGTERM gracefully.
    signal.signal(signal.SIGTERM, sigterm_handler)

    app = make_wsgi_app()

    last_refresh = None

    def refresh_app(*args, **kwargs):
        nonlocal last_refresh
        process_start = datetime.now()

        if not last_refresh or (process_start - last_refresh).total_seconds() > 1: # Limit updates to every 1 seconds
            try:
                refresh_metrics()
            except Exception as e:
                logger.debug("Refresh failed", exc_info=True)
                exception_count(e)

            duration = datetime.now() - process_start
            PROCESS_TIME.inc(duration.total_seconds())
            logger.info("Refresh took %s seconds", duration)
            last_refresh = process_start

        return app(*args, **kwargs)

    httpd = make_server(METRICS_ADDR, METRICS_PORT, refresh_app)
    httpd.serve_forever()


if __name__ == "__main__":
    main()