Bulk load into PostgreSQL from log files using Python

Shad

This is a follow-up question. Below is a piece of my Python script that reads a constantly growing log file (text) and inserts data into a PostgreSQL DB. A new log file is generated each day. What I do is commit each line, which causes a huge load and really poor performance (it needs 4 hours to insert 30 minutes of the file's data!). How can I improve this code to insert in bulk instead of line by line? And would that help improve performance and reduce load? I've read about copy_from but couldn't figure out how to use it in this situation.

    import logging
    import psycopg2 as psycopg

    try:
        connectStr = "dbname='postgis20' user='postgres' password='' host='localhost'"
        cx = psycopg.connect(connectStr)
        cu = cx.cursor()
        logging.info("connected to DB")
    except:
        logging.error("could not connect to the database")


    import time

    file = open('textfile.log', 'r')
    while 1:
        where = file.tell()
        line = file.readline()
        if not line:
            time.sleep(1)       # nothing new yet, wait and re-check
            file.seek(where)
        else:
            print line,         # already has newline
            dodecode(line)
            ------------
    def dodecode(fields):
        global cx
        from time import strftime, gmtime
        from calendar import timegm
        import os

        msg = fields.split(',')
        part = eval(msg[2])
        msgnum = int(msg[3:6])
        print "message#:", msgnum
        print fields

        if part == 1:
            if msgnum == 1:
                msg1 = msg_1.decode(bv)
                #print "message1 :", msg1
                Insert(msgnum, time, msg1)
            elif msgnum == 2:
                msg2 = msg_2.decode(bv)
                #print "message2 :", msg2
                Insert(msgnum, time, msg2)
            elif msgnum == 3:
                ....
                ....
                ....
        ----------------
    def Insert(msgnum, time, msg):
        global cx

        try:
            if msgnum in [1, 2, 3]:
                if msg['type'] == 0:
                    cu.execute("INSERT INTO table1 ( messageid, timestamp, userid, position, text ) SELECT "+str(msgnum)+", '"+time+"', "+str(msg['UserID'])+", ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")'), '"+text+"' WHERE NOT EXISTS (SELECT * FROM table1 WHERE timestamp='"+time+"' AND text='"+text+"');")
                    cu.execute("INSERT INTO table2 ( field1, field2, field3, time_stamp, pos ) SELECT "+str(msg['UserID'])+", "+str(int(msg['UserName']))+", "+str(int(msg['UserIO']))+", '"+time+"', ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE NOT EXISTS (SELECT * FROM table2 WHERE field1="+str(msg['UserID'])+");")
                    cu.execute("UPDATE table2 SET field3='"+str(int(msg['UserIO']))+"', time_stamp='"+str(time)+"', pos=ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE field1='"+str(msg['UserID'])+"' AND time_stamp < '"+str(time)+"';")
                elif msg['type'] == 1:
                    cu.execute("INSERT INTO table1 ( messageid, timestamp, userid, position, text ) SELECT "+str(msgnum)+", '"+time+"', "+str(msg['UserID'])+", ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")'), '"+text+"' WHERE NOT EXISTS (SELECT * FROM table1 WHERE timestamp='"+time+"' AND text='"+text+"');")
                    cu.execute("INSERT INTO table2 ( field1, field2, field3, time_stamp, pos ) SELECT "+str(msg['UserID'])+", "+str(int(msg['UserName']))+", "+str(int(msg['UserIO']))+", '"+time+"', ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE NOT EXISTS (SELECT * FROM table2 WHERE field1="+str(msg['UserID'])+");")
                    cu.execute("UPDATE table2 SET field3='"+str(int(msg['UserIO']))+"', time_stamp='"+str(time)+"', pos=ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE field1='"+str(msg['UserID'])+"' AND time_stamp < '"+str(time)+"';")
                elif msg['type'] == 2:
                    ....
                    ....
                    ....
        except Exception, err:
            #print('ERROR: %s\n' % str(err))
            logging.error('ERROR: %s\n' % str(err))
            cx.commit()

        cx.commit()
Jasen

doing multiple rows per transaction, and per query, will make it go faster.

when faced with a similar problem I put multiple rows in the values part of the insert query, but you have complicated insert queries, so you'll likely need a different approach.

I'd suggest creating a temporary table and inserting, say, 10000 rows into it with ordinary multi-row inserts:

insert into temptable values ( /* row1 data */ ), ( /* row2 data */ ), ...

500 rows per insert is a good starting point.
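For example, a minimal sketch of setting up such a staging table (temptable1 is just a placeholder name; it assumes the cu cursor and cx connection from the question, and mirrors table1's columns):

    # sketch: a session-local staging table with the same columns as table1;
    # ON COMMIT PRESERVE ROWS keeps its rows across the commits done per batch
    cu.execute("CREATE TEMPORARY TABLE temptable1 (LIKE table1) ON COMMIT PRESERVE ROWS")
    cx.commit()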

then joining the temp table with the existing data to de-dupe it:

delete from temptable using livetable where /* join condition */;
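Applied to the question's table1, where the NOT EXISTS check was on timestamp and text, that de-dupe might look like this (a sketch, assuming the temptable1 staging table from above):

    # sketch: drop staging rows that already exist in table1, using the same
    # (timestamp, text) uniqueness check as the question's NOT EXISTS clause
    cu.execute("""
        DELETE FROM temptable1
        USING table1
        WHERE temptable1.timestamp = table1.timestamp
          AND temptable1.text = table1.text
    """)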

and de-duping it against itself if that is needed too:

delete from temptable where id not in 
  ( select distinct on ( /* unique columns */) id from temptable);
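For the question's tables there is no id column to key that on; one option (a sketch) is to lean on PostgreSQL's ctid system column instead:

    # sketch: de-dupe the staging table against itself on (timestamp, text),
    # using the physical row id (ctid) since the table has no id column
    cu.execute("""
        DELETE FROM temptable1
        WHERE ctid NOT IN (
            SELECT DISTINCT ON (t.timestamp, t.text) t.ctid FROM temptable1 t
        )
    """)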

then using insert-select to copy the rows from the temporary table into the live table:

insert into livetable ( /* columns */ )
  select /* columns */ from temptable; 
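With the question's column names that could be (a sketch):

    # sketch: move the surviving staging rows into the live table in one statement
    cu.execute("""
        INSERT INTO table1 (messageid, timestamp, userid, position, text)
        SELECT messageid, timestamp, userid, position, text FROM temptable1
    """)
    cx.commit()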

it looks like you might need an update-from too.
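A sketch of what that could look like for the question's table2, assuming a second staging table temptable2 that mirrors it and has already been de-duped to one row per field1:

    # sketch: apply the question's table2 update in bulk, taking values from the
    # staging rows instead of running one UPDATE per log line
    cu.execute("""
        UPDATE table2
        SET field3 = temptable2.field3,
            time_stamp = temptable2.time_stamp,
            pos = temptable2.pos
        FROM temptable2
        WHERE table2.field1 = temptable2.field1
          AND table2.time_stamp < temptable2.time_stamp
    """)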

and finally dropping the temp table and starting again.
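Instead of dropping it, truncating the staging table and reusing it also works and avoids re-creating it for every batch (a sketch):

    # sketch: clear the staging table so the next batch starts empty
    cu.execute("TRUNCATE temptable1")
    cx.commit()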

and since you're writing to two tables, you're going to need to double up all these operations.

I'd do the insert by maintaining a count and a list of values to insert, and then at insert time building the query by repeating the (%s,%s,%s,%s) part as many times as needed, passing the list of values in separately and letting psycopg2 deal with the formatting.
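Here is a minimal sketch of that idea for the question's table1 rows, feeding the staging table in batches of 500. queue_row and flush_rows are hypothetical helpers; cu and cx are the cursor and connection from the question, and text is whatever value the original Insert() used for the text column:

    BATCH_SIZE = 500
    pending = []   # queued (messageid, timestamp, userid, point WKT, text) tuples

    def queue_row(msgnum, time, msg, text):
        # collect one row instead of INSERTing it immediately
        point = "POINT(%f %f)" % (float(msg['longitude']), float(msg['latitude']))
        pending.append((msgnum, time, msg['UserID'], point, text))
        if len(pending) >= BATCH_SIZE:
            flush_rows()

    def flush_rows():
        # one multi-row INSERT into the staging table; psycopg2 fills in the
        # repeated (%s, ...) groups from the flattened parameter list
        if not pending:
            return
        placeholders = ",".join(["(%s, %s, %s, ST_GeomFromText(%s), %s)"] * len(pending))
        params = [v for row in pending for v in row]
        cu.execute("INSERT INTO temptable1 (messageid, timestamp, userid, position, text) "
                   "VALUES " + placeholders, params)
        cx.commit()
        del pending[:]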

I'd expect making those changes could get you a speedup of 5 times or more.
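Since the question mentions copy_from: once the rows are batched like this, the staging table can also be filled with psycopg2's copy_from instead of multi-row INSERTs, which is usually faster still. A sketch, reusing the pending list from above (it assumes the values contain no tabs or newlines, and that the PostGIS geometry column accepts plain WKT input):

    from cStringIO import StringIO

    def flush_rows_with_copy():
        # stream the pending batch into the staging table with COPY
        if not pending:
            return
        buf = StringIO()
        for row in pending:
            buf.write("\t".join(str(v) for v in row) + "\n")   # one line per row
        buf.seek(0)
        cu.copy_from(buf, 'temptable1', sep='\t',
                     columns=('messageid', 'timestamp', 'userid', 'position', 'text'))
        cx.commit()
        del pending[:]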
