1. 파이썬파일
python에서 list로 묶어서 전송후 postgresql에서 unnest 풀어서 저장
import pandas as pd
import psycopg2 as pg
#%%
# -*- coding: utf-8 -*-
"""
Created on Mon Jan 13 20:57:30 2020
@author: user
"""
class PostgresDataClass():
def __init__(self,host,database,user,password):
self.host = host
self.user = user
self.database = database
self.password = password
self.type_dict = {'timestamp without time zone':'timestamp[]',
'character varying':'text[]',
'integer':'int4[]',
'date':'date[]',
'real':'real[]',
'bigint':'bigint[]',
'double precision':'double precision[]',
'numeric':'numeric[]',
'text':'text[]'}
def connect(self):
conn = pg.connect(host=self.host,
database=self.database,
user = self.user,
password = self.password)
return conn
def get_cursor(self, conn):
cur = conn.cursor()
return cur
##select return list
def select_list(self,sql,param=()):
with self.connect() as conn:
with self.get_cursor(conn) as cur:
cur.execute(sql,param)
return cur.fetchall()
##select return dataframe
def select_dataframe(self,sql,param=()):
with self.connect() as conn:
with self.get_cursor(conn) as cur:
cur.execute(sql,param)
colnames = [desc[0] for desc in cur.description]
df = pd.DataFrame(cur.fetchall())
df.columns = colnames
return df
##query execute
def execute(self,sql,param=()):
with self.connect() as conn:
with self.get_cursor(conn) as cur :
try :
cur.execute(sql,param)
conn.commit()
except Exception as e:
print('%s %s'%(e.__class.__name__,str(e)))
def set_source_list(self,data):
self.source_list = data
def insert_list(self,data,schema_table_name,size=100000):
it = iter(data)
win =[]
length = len(data)
record_sum = 0
with self.connect() as conn:
with self.get_cursor(conn) as cur:
while True:
try :
for e in range(0,size):
win.append(next(it))
except Exception :
None
if len(win)==0:
break
record_sum += len(win)
self.set_source_list(win)
self.migration(cur,schema_table_name)
win = []
conn.commit()
def migration(self,cur,schema_table_name):
schema, table_name = schema_table_name.split('.')
table_info = self.select_dataframe(""" SELECT column_name, ordinal_position,is_nullable, data_type
FROM information_schema.columns WHERE table_schema = lower(%s)
AND table_name = lower(%s) order by ordinal_position""",(schema,table_name))
index = 0
for column_name in table_info['column_name']:
exec(column_name + '=[x[{index}] for x in self.source_list]'.format(index=index))
index += 1
columns = ','.join(table_info['column_name'])
unnest_columns=[]
for i in range(0,len(table_info)):
unnest_columns.append('unnest(%({})s::{})'.format(table_info['column_name'][i], self.type_dict[table_info['data_type'][i]]))
insert_sql = """INSERT INTO {schema_table_name}({columns}) SELECT {unnest_columns} ON CONFLICT DO NOTHING""".format(schema_table_name =schema_table_name, columns =columns, unnest_columns=','.join(unnest_columns))
params = locals()
cur.execute(insert_sql,params)
2. 가져다 쓰기 (연결부분)
from postLib import PostgressConn
post = PostgressConn('host','database','user','password')
post.insert_list(tuples, 'data')
3. 끝!