12
12
import psycopg2
13
13
import json
14
14
import decimal
15
+ import re
15
16
16
17
from elasticsearch import Elasticsearch
17
18
from elasticsearch import helpers
@@ -61,7 +62,7 @@ def __init__(self, PostGISConnection, ESConnection, view, sqlquerystring):
61
62
self ._pgConnection = PostGISConnection
62
63
self ._esConnection = ESConnection
63
64
self ._view = view
64
- self ._sqlquerystring = sqlquerystring
65
+ self ._sqlquerystring = re . sub ( r'\s{2,}' , ' ' , sqlquerystring )
65
66
self ._auth = get_config_params ('config.ini' )
66
67
67
68
def pgConnection (self ):
@@ -79,12 +80,8 @@ def auth(self):
79
80
def sqlquerystring (self ):
80
81
return self ._sqlquerystring
81
82
82
- def getGeoJson (self , sqlquerystring , pgConnection ):
83
- cur = pgConnection .pgConnection ().cursor ()
84
- cur .execute (sqlquerystring )
85
- rows = cur .fetchall ()
83
+ def getGeoJson (self , rows , columns ):
86
84
if rows :
87
- columns = [name [0 ] for name in cur .description ]
88
85
geomIndex = columns .index ('st_asgeojson' )
89
86
feature_collection = {'type' : 'FeatureCollection' ,
90
87
'features' : []}
@@ -101,6 +98,7 @@ def getGeoJson(self, sqlquerystring, pgConnection):
101
98
value = row [index ]
102
99
feature ['properties' ][column ] = value
103
100
feature_collection ['features' ].append (feature )
101
+
104
102
geojsonobject = json .dumps (feature_collection ,
105
103
indent = 2 ,
106
104
default = decimal_default )
@@ -142,33 +140,41 @@ def postgis2es(self):
142
140
sqlquerystring = self .sqlquerystring ().format (
143
141
** {'limit' : self .LIMIT ,
144
142
'offset' : self .OFFSET })
145
- geojsonobject = self .getGeoJson (sqlquerystring , self .pgConnection ())
146
- while geojsonobject is not None :
147
-
148
- print (sqlquerystring )
149
- self .populateElasticSearchIndex (self .esConnection (),
150
- geojsonobject ,
151
- self .auth (),
152
- self .view ())
153
- self .OFFSET += self .LIMIT
154
-
155
- sqlquerystring = self .sqlquerystring ().format (
156
- ** {'limit' : self .LIMIT ,
157
- 'offset' : self .OFFSET })
158
- geojsonobject = self .getGeoJson (sqlquerystring ,
159
- self .pgConnection ())
143
+
144
+ # Remove LIMIT and OFFSET until we decide to change all caller scripts
145
+ sqlquerystring = re .sub (r'\s+LIMIT.*' , '' , sqlquerystring )
146
+
147
+ print (sqlquerystring )
148
+
149
+ with self .pgConnection ().pgConnection () as conn :
150
+ with conn .cursor (name = 'postgis2es_cursor' ) as cur :
151
+ cur .itersize = self .LIMIT
152
+ cur .execute (sqlquerystring )
153
+ rows = cur .fetchmany (self .LIMIT )
154
+ columns = [name [0 ] for name in cur .description ]
155
+
156
+ count = 0
157
+ while rows :
158
+ count = count + 1
159
+ print ("Rows %d-%d, %s = %s" %
160
+ (self .LIMIT * (count - 1 ) + 1 , self .LIMIT * count ,
161
+ columns [0 ], rows [0 ][0 ]))
162
+
163
+ geojsonobject = self .getGeoJson (rows , columns )
164
+ # print("populateElasticsearchIndex()")
165
+ self .populateElasticSearchIndex (self .esConnection (),
166
+ geojsonobject ,
167
+ self .auth (),
168
+ self .view ())
169
+ rows = cur .fetchmany (self .LIMIT )
160
170
161
171
return
162
172
163
173
164
174
class PostGISPointDataset (PostGISdataset ):
165
175
166
- def getGeoJson (self , sqlquerystring , pgConnection ):
167
- cur = pgConnection .pgConnection ().cursor ()
168
- cur .execute (sqlquerystring )
169
- rows = cur .fetchall ()
176
+ def getGeoJson (self , rows , columns ):
170
177
if rows :
171
- columns = [name [0 ] for name in cur .description ]
172
178
geomIndex = columns .index ('st_asgeojson' )
173
179
feature_collection = {'type' : 'FeatureCollection' ,
174
180
'features' : []}
@@ -187,6 +193,7 @@ def getGeoJson(self, sqlquerystring, pgConnection):
187
193
value = row [index ]
188
194
feature ['properties' ][column ] = value
189
195
feature_collection ['features' ].append (feature )
196
+
190
197
geojsonobject = json .dumps (feature_collection ,
191
198
indent = 2 ,
192
199
default = decimal_default )
@@ -197,12 +204,8 @@ def getGeoJson(self, sqlquerystring, pgConnection):
197
204
198
205
class PostGISTable (PostGISdataset ):
199
206
200
- def getGeoJson (self , sqlquerystring , pgConnection ):
201
- cur = pgConnection .pgConnection ().cursor ()
202
- cur .execute (sqlquerystring )
203
- rows = cur .fetchall ()
207
+ def getGeoJson (self , rows , columns ):
204
208
if rows :
205
- columns = [name [0 ] for name in cur .description ]
206
209
# geomIndex = columns.index('st_asgeojson')
207
210
feature_collection = {'type' : 'FeatureCollection' ,
208
211
'features' : []}
@@ -221,6 +224,7 @@ def getGeoJson(self, sqlquerystring, pgConnection):
221
224
value = row [index ]
222
225
feature ['properties' ][column ] = value
223
226
feature_collection ['features' ].append (feature )
227
+
224
228
geojsonobject = json .dumps (feature_collection ,
225
229
indent = 2 ,
226
230
default = decimal_default )
0 commit comments