본문 바로가기
Python

[Python] 페이지네이션으로 업비트에서 비트코인의 모든 1분봉 가져오기

by teamnova 2024. 3. 13.
728x90

 

업비트 정책 상 한 번에 가져올 수 있는 1분봉의 최대 개수는 200개입니다. 더 이전의 기록을 가져오고 싶다면 페이지네이션을 활용해야하는데요. 아래는 그 예제입니다.

 

예제에서 사용할 테이블을 생성하는 쿼리는 아래와 같습니다.

CREATE TABLE upbit_bitcoin_candle_minute (
    market VARCHAR(10),
    candle_date_time_utc DATETIME,
    candle_date_time_kst DATETIME,
    opening_price DECIMAL(20, 8),
    high_price DECIMAL(20, 8),
    low_price DECIMAL(20, 8),
    trade_price DECIMAL(20, 8),
    timestamp BIGINT,
    candle_acc_trade_price DECIMAL(20, 8),
    candle_acc_trade_volume DECIMAL(20, 8),
    unit INT
);

 

 

예제에서 사용할 코드는 아래와 같습니다.

# upbit-indexer-bitcoin-candle-minute-history.py
# upbit으로부터, 가장 최근 1분봉부터 200개 단위로 가져와서,
# 가장 오래된 1분봉까지 가져올 때까지 반복해서 데이터를 가져온 후 MariaDB에 저장하는 코드
# 비트코인 데이터만 가져오는 코드
import requests
import json
import time
from datetime import datetime
import pymysql
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


# MariaDB 데이터베이스 연결 설정
def connect_db():
    return pymysql.connect(
        host="localhost", # 데이터베이스 호스트 주소
        database="stickode_240104_upbit", # 데이터베이스 이름
        user="root", # 데이터베이스 사용자
        password="root" # 데이터베이스 비밀번호
    )


# MariaDB에서 가장 최근의 candle_date_time_utc 가져오기
def get_latest_candle_time():
    conn = connect_db()
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT MAX(candle_date_time_utc) FROM upbit_bitcoin_candle_minute")
        latest_time = cursor.fetchone()[0]
        return latest_time
    except Exception as e:
        logging.error(f"최신 캔들 시간 가져오기 오류: {e}")
        return None
    finally:
        cursor.close()
        conn.close()


# 데이터베이스에 데이터 삽입
def insert_data(data):
    conn = connect_db()
    cursor = conn.cursor()
    try:
        for item in data:
            cursor.execute("""
            INSERT INTO upbit_bitcoin_candle_minute (
                market, candle_date_time_utc, candle_date_time_kst, opening_price, high_price, low_price, trade_price, timestamp, candle_acc_trade_price, candle_acc_trade_volume, unit
            ) VALUES (
                %(market)s, %(candle_date_time_utc)s, %(candle_date_time_kst)s, %(opening_price)s, %(high_price)s, %(low_price)s, %(trade_price)s, %(timestamp)s, %(candle_acc_trade_price)s, %(candle_acc_trade_volume)s, %(unit)s
            )
            """, item)
        conn.commit()
        logging.info(f"데이터 삽입 성공: {len(data)} 레코드")
    except Exception as e:
        logging.error(f"데이터베이스 오류: {e}")
    finally:
        cursor.close()
        conn.close()


def fetch_candles(url, headers):
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return json.loads(response.text)
    else:
        logging.error(f"Error fetching data: {response.status_code}")
        return None


def format_date(date_str):
    return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S").strftime("%Y-%m-%dT%H%%3A%M%%3A%S")


headers = {"accept": "application/json"}
base_url = "https://api.upbit.com/v1/candles/minutes/1?market=KRW-BTC&count=200"

# 최근 데이터 시간 가져오기
latest_candle_time = get_latest_candle_time()
logging.info(f"Latest candle time in DB: {latest_candle_time}")

# 최초 URL 설정
next_url = base_url

while True:
    data = fetch_candles(next_url, headers)

    # 데이터가 없거나 오류가 발생한 경우 반복을 중단
    if not data:
        break

    # 데이터베이스에 저장하기 전에 가장 최신 데이터인지 확인
    if latest_candle_time and data[0]['candle_date_time_utc'] <= latest_candle_time:
        logging.info("No new data to fetch")
        break

    insert_data(data)

    # 다음 요청을 위한 URL 업데이트
    oldest_time = data[-1]['candle_date_time_utc']
    next_url = f"{base_url}&to={format_date(oldest_time)}"

    # 잠시 멈춤 (API 제한 등을 고려)
    time.sleep(1)