본문 바로가기
Python

[Python] 실시간 비트코인 거래 데이터를 MariaDB에 저장하기

by teamnova 2024. 2. 5.
728x90

https://stickode.tistory.com/1038

 

저번 시간에 이어서, 오늘은 실시간으로 수신한 비트코인 거래 데이터들을 DB(mariaDB)에 INSERT하겠습니다.

 

MariaDB에는 다음과 같은 테이블을 추가하시면 됩니다.

CREATE TABLE trades (
    type VARCHAR(50),
    code VARCHAR(20),
    timestamp BIGINT,
    trade_date DATE,
    trade_time TIME,
    trade_timestamp BIGINT,
    trade_price DECIMAL(20, 4),
    trade_volume DECIMAL(20, 8),
    ask_bid VARCHAR(10),
    prev_closing_price DECIMAL(20, 8),
    `change` VARCHAR(10),
    change_price DECIMAL(20, 4),
    sequential_id BIGINT,
    stream_type VARCHAR(20)
);

 

 

아래 파일을 복사하신 후 실행하시면 됩니다. 파일명은 main.py입니다.

# pip install websocket-client pymysql
import json
import websocket
import pymysql

# 데이터베이스 연결 설정
db = pymysql.connect(host='localhost', user='root', password='root', db='stickode_231204_upbit', charset='utf8mb4')
cursor = db.cursor()

def on_message(ws, message):
    data = json.loads(message)  # JSON 데이터 파싱

    # 데이터베이스에 데이터 삽입
    sql = '''INSERT INTO trades (type, code, timestamp, trade_date, trade_time, trade_timestamp, trade_price, trade_volume, ask_bid, prev_closing_price, `change`, change_price, sequential_id, stream_type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'''
    values = (data['type'], data['code'], data['timestamp'], data['trade_date'], data['trade_time'], data['trade_timestamp'], data['trade_price'], data['trade_volume'], data['ask_bid'], data['prev_closing_price'], data['change'], data['change_price'], data['sequential_id'], data['stream_type'])

    try:
        cursor.execute(sql, values)
        db.commit()
        print("Insert successful!")  # 데이터 삽입 성공 메시지 출력

    except Exception as e:
        print(f"Database Error: {e}")
        db.rollback()

    # print(data)  # 디코딩된 데이터 출력

def on_connect(ws):
    print("connected!")  # 연결됨 메시지 출력
    # 연결 시 서버에 보낼 메시지 전송
    ws.send('[{"ticket": "test" }, {"type": "trade", "codes": ["KRW-BTC" ] }, {"format": "DEFAULT" } ]')

def on_error(ws, err):
    print(err)  # 에러 발생 시 에러 메시지 출력

def on_close(ws, status_code, msg):
    print("closed!")  # 연결 종료 시 메시지 출력

# 웹소켓 애플리케이션 생성 및 설정
ws_app = websocket.WebSocketApp("wss://api.upbit.com/websocket/v1",
                                on_message=on_message,
                                on_open=on_connect,
                                on_error=on_error,
                                on_close=on_close)
ws_app.run_forever()  # 웹소켓 애플리케이션 실행

# 프로그램 종료 시 데이터베이스 연결 종료
db.close()