|
import psycopg2
|
|
|
class Dbconnect():
|
|
def testSelect(self):
|
conn = psycopg2.connect(host='192.168.11.42',port='5115',database='pgDMMSNS', user='postgres',password='simple000')
|
print('connected')
|
#SELECT gid, did, tid, oid, cid, lid, level, symcolor, symweight, symstyle, geom FROM gis3d."fsc-401-c-0";
|
|
cursor = conn.cursor()
|
cursor.execute('SELECT gid, did, tid, oid, geom FROM gis3d."fsc-401-c-0"')
|
rows =cursor.fetchall()
|
for row in rows:
|
print ('id=',row[0], ',gid=',row[1],',tid=',row[2],',oid=',row[3],'\n')
|
cursor.close()
|
conn.close()
|
#select must offline
|
def fetchTableWithConn(self,dbQuery,fnProc=None):
|
arrayRow=[]
|
try:
|
cursor = self.dbConn.cursor()
|
cursor.execute(dbQuery)
|
rows = cursor.fetchall()
|
|
arrayRow=rows
|
#for row in rows:
|
# #print(row)
|
# #print(fnProc)
|
# if(fnProc!=None):
|
# if(fnProc.add !=None):
|
# fnProc.add( (row) )
|
|
except(Exception, psycopg2.Error) as error:
|
print("error connect",error)
|
#conn =None
|
finally:
|
#if(fnProc!=None):
|
# if(fnProc.done !=None):
|
# fnProc.done( (dbQuery) )
|
cursor.close()
|
#read it
|
print('close cursor')
|
for row in arrayRow:
|
if(fnProc!=None):
|
if(fnProc.add != None):
|
fnProc.add ( (row) )
|
lastrw=row
|
if(fnProc !=None):
|
if(fnProc.done != None):
|
fnProc.done({'sql':dbQuery,'row':lastrw} )
|
|
def fetchTable(self,dbConf,dbQuery,fnProc=None):
|
print('try database')
|
#print('sqlis->',dbQuery)
|
|
try:
|
conn = psycopg2.connect(
|
host=dbConf['host'],
|
port=dbConf['port'],
|
database=dbConf['database'],
|
user=dbConf['user'],
|
password=dbConf['password']
|
)
|
cursor = conn.cursor()
|
cursor.execute(dbQuery)
|
rows = cursor.fetchall()
|
lastrw={}
|
for row in rows:
|
#print(row)
|
#print(fnProc)
|
if(fnProc!=None):
|
if(fnProc.add !=None):
|
fnProc.add( (row) )
|
lastrw=row
|
|
except(Exception, psycopg2.Error) as error:
|
print("error connect",error)
|
conn =None
|
finally:
|
if(fnProc!=None):
|
if(fnProc.done !=None):
|
fnProc.done( ({'sql':dbQuery,'row':lastrw}) )
|
|
if( conn != None):
|
cursor.close()
|
conn.close()
|
|
def executeWithConn(self,dbSql,fnProc=None):
|
try:
|
cursor = self.dbConn.cursor()
|
|
cursor.execute(dbSql)
|
#record = cursor.fetchone()
|
|
if(fnProc!=None):
|
if(fnProc.add !=None):
|
fnProc.add ( {} )
|
|
#self.dbConn.commit()
|
|
except(Exception, psycopg2.Error) as error:
|
print("error connect",error,dbSql)
|
finally:
|
if(fnProc!=None):
|
if(fnProc.done !=None):
|
fnProc.done( (dbSql) )
|
cursor.close()
|
|
def execute(self,dbConf,dbSql,fnProc=None):
|
print('try exec-database')
|
#print('sqlis->',dbQuery)
|
|
try:
|
conn = psycopg2.connect(
|
host=dbConf['host'],
|
port=dbConf['port'],
|
database=dbConf['database'],
|
user=dbConf['user'],
|
password=dbConf['password']
|
)
|
|
cursor = conn.cursor()
|
|
cursor.execute(dbSql)
|
#record = cursor.fetchone()
|
|
if(fnProc!=None):
|
if(fnProc.add !=None):
|
fnProc.add ( {} )
|
|
conn.commit()
|
print('commit')
|
except(Exception, psycopg2.Error) as error:
|
print("error connect",error)
|
conn =None
|
finally:
|
if(fnProc!=None):
|
if(fnProc.done !=None):
|
fnProc.done( (dbSql) )
|
if( conn != None):
|
cursor.close()
|
conn.close()
|
|
def tryConn(self, dbConf):
|
print('try database')
|
print(dbConf)
|
print(self)
|
|
try:
|
if (self.dbConn != None):
|
self.destroyConn()
|
|
self.dbConn = psycopg2.connect(
|
host=dbConf['host'],
|
port=dbConf['port'],
|
database=dbConf['database'],
|
user=dbConf['user'],
|
password=dbConf['password'],
|
|
)
|
self.dbConn.set_session(autocommit=True)
|
|
except(Exception, psycopg2.Error) as error:
|
print("error connect",error)
|
self.dbConn=None
|
|
def destroyConn(self):
|
try:
|
if( self.dbConn != None):
|
self.dbConn.close()
|
|
except(Exception, psycopg2.Error) as error:
|
print('close connect')
|
finally:
|
self.dbConn = None
|
|
def __init__(self):
|
self.dbConn=None
|