author | Tero Marttila <terom@fixme.fi> |
Sat, 29 Nov 2008 00:39:47 +0200 | |
branch | new-evsql |
changeset 46 | 75cecfc4603b |
parent 25 | 99a41f48e29b |
permissions | -rw-r--r-- |
12 | 1 |
|
2 |
#include <event2/event.h> |
|
3 |
#include <assert.h> |
|
4 |
#include <stdlib.h> |
|
5 |
||
6 |
#include "evpq.h" |
|
7 |
#include "lib/error.h" |
|
8 |
||
9 |
struct evpq_conn { |
|
10 |
struct event_base *ev_base; |
|
11 |
struct evpq_callback_info user_cb; |
|
21
e5da1d428e3e
new evsql for queueing SQL queries
Tero Marttila <terom@fixme.fi>
parents:
12
diff
changeset
|
12 |
void *user_cb_arg; |
12 | 13 |
|
14 |
PGconn *pg_conn; |
|
15 |
||
16 |
struct event *ev; |
|
17 |
||
18 |
enum evpq_state state; |
|
19 |
}; |
|
20 |
||
21 |
/* |
|
22 |
* This evpq_conn has experienced a GENERAL FAILURE. |
|
23 |
*/ |
|
24 |
static void _evpq_failure (struct evpq_conn *conn) { |
|
25 |
// update state |
|
26 |
conn->state = EVPQ_FAILURE; |
|
27 |
||
28 |
// notify |
|
21
e5da1d428e3e
new evsql for queueing SQL queries
Tero Marttila <terom@fixme.fi>
parents:
12
diff
changeset
|
29 |
conn->user_cb.fn_failure(conn, conn->user_cb_arg); |
12 | 30 |
} |
31 |
||
32 |
/* |
|
33 |
* Initial connect was succesfull |
|
34 |
*/ |
|
35 |
static void _evpq_connect_ok (struct evpq_conn *conn) { |
|
36 |
// update state |
|
37 |
conn->state = EVPQ_CONNECTED; |
|
38 |
||
39 |
// notify |
|
21
e5da1d428e3e
new evsql for queueing SQL queries
Tero Marttila <terom@fixme.fi>
parents:
12
diff
changeset
|
40 |
conn->user_cb.fn_connected(conn, conn->user_cb_arg); |
12 | 41 |
} |
42 |
||
43 |
/*
 * The initial connect failed.
 *
 * There is no connect-specific handling; this is reported through the same
 * path as any other failure.
 */
static void _evpq_connect_fail (struct evpq_conn *conn) {
    // treat it as a generic failure
    _evpq_failure(conn);
}
|
50 |
||
51 |
/* |
|
52 |
* Receive a result and gives it to the user. If there was no more results, update state and tell the user. |
|
53 |
* |
|
54 |
* Returns zero if we got a result, 1 if there were/are no more results to handle. |
|
55 |
*/ |
|
56 |
static int _evpq_query_result (struct evpq_conn *conn) { |
|
57 |
PGresult *result; |
|
58 |
||
59 |
// get the result |
|
60 |
if ((result = PQgetResult(conn->pg_conn)) == NULL) { |
|
61 |
// no more results, update state |
|
62 |
conn->state = EVPQ_CONNECTED; |
|
63 |
||
64 |
// tell the user the query is done |
|
22
85ba190a9e68
fix evpq/evpq_test compilation
Tero Marttila <terom@fixme.fi>
parents:
21
diff
changeset
|
65 |
conn->user_cb.fn_done(conn, conn->user_cb_arg); |
12 | 66 |
|
67 |
// stop waiting for more results |
|
68 |
return 1; |
|
69 |
||
70 |
} else { |
|
71 |
// got a result, give it to the user |
|
21
e5da1d428e3e
new evsql for queueing SQL queries
Tero Marttila <terom@fixme.fi>
parents:
12
diff
changeset
|
72 |
conn->user_cb.fn_result(conn, result, conn->user_cb_arg); |
12 | 73 |
|
74 |
// great |
|
75 |
return 0; |
|
76 |
} |
|
77 |
} |
|
78 |
||
79 |
/* |
|
80 |
* Schedule a new _evpq_event for this connection. |
|
81 |
*/ |
|
82 |
static int _evpq_schedule (struct evpq_conn *conn, short what, void (*handler)(evutil_socket_t, short, void *)) { |
|
83 |
assert(conn->pg_conn != NULL); |
|
84 |
||
85 |
// ensure we have a valid socket, this should be the case after the PQstatus check... |
|
86 |
if (PQsocket(conn->pg_conn) < 0) |
|
87 |
FATAL("PQsocket gave invalid socket"); |
|
88 |
||
89 |
// reschedule with a new event |
|
90 |
if (conn->ev) { |
|
91 |
event_assign(conn->ev, conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn); |
|
92 |
||
93 |
} else { |
|
94 |
if ((conn->ev = event_new(conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn)) == NULL) |
|
95 |
PERROR("event_new"); |
|
96 |
||
97 |
} |
|
98 |
||
99 |
// add it |
|
100 |
// XXX: timeouts? |
|
101 |
if (event_add(conn->ev, NULL)) |
|
102 |
PERROR("event_add"); |
|
103 |
||
104 |
// success |
|
105 |
return 0; |
|
106 |
||
107 |
error: |
|
108 |
return -1; |
|
109 |
} |
|
110 |
||
111 |
/* |
|
112 |
* Handle events on the PQ socket while connecting |
|
113 |
*/ |
|
114 |
static void _evpq_connect_event (evutil_socket_t fd, short what, void *arg) { |
|
115 |
struct evpq_conn *conn = arg; |
|
116 |
PostgresPollingStatusType poll_status; |
|
117 |
||
118 |
// this is only for connect events |
|
119 |
assert(conn->state == EVPQ_CONNECT); |
|
120 |
||
121 |
// XXX: timeouts? |
|
122 |
||
123 |
// ask PQ what to do |
|
124 |
switch ((poll_status = PQconnectPoll(conn->pg_conn))) { |
|
125 |
case PGRES_POLLING_READING: |
|
126 |
// poll for read |
|
127 |
what = EV_READ; |
|
128 |
||
129 |
// reschedule |
|
130 |
break; |
|
131 |
||
132 |
case PGRES_POLLING_WRITING: |
|
133 |
// poll for write |
|
134 |
what = EV_WRITE; |
|
135 |
||
136 |
// reschedule |
|
137 |
break; |
|
138 |
||
139 |
case PGRES_POLLING_OK: |
|
140 |
// connected |
|
141 |
_evpq_connect_ok(conn); |
|
142 |
||
143 |
// done |
|
144 |
return; |
|
145 |
||
146 |
case PGRES_POLLING_FAILED: |
|
147 |
// faaaaail! |
|
148 |
_evpq_connect_fail(conn); |
|
149 |
||
150 |
// done |
|
151 |
return; |
|
152 |
||
153 |
default: |
|
154 |
FATAL("PQconnectPoll gave a weird value: %d", poll_status); |
|
155 |
} |
|
156 |
||
157 |
// reschedule |
|
158 |
if (_evpq_schedule(conn, what, _evpq_connect_event)) |
|
159 |
goto error; |
|
160 |
||
161 |
// done, wait for the next event |
|
162 |
return; |
|
163 |
||
164 |
error: |
|
165 |
// XXX: reset? |
|
166 |
_evpq_failure(conn); |
|
167 |
} |
|
168 |
||
169 |
/*
 * Handle read events on the PQ socket while a query is in progress.
 *
 * Consumes the available input, then delivers results for as long as libpq
 * is not busy. Once _evpq_query_result reports that the query is done, no
 * further event is scheduled; otherwise this handler is rescheduled for
 * EV_READ. A PQconsumeInput or scheduling failure marks the connection as
 * failed.
 *
 * fd is unused; arg is the struct evpq_conn.
 */
static void _evpq_query_event (evutil_socket_t fd, short what, void *arg) {
    struct evpq_conn *conn = arg;

    (void) fd;

    // this handler is only valid while querying
    assert(conn->state == EVPQ_QUERY);

    // XXX: PQflush, timeouts
    assert(what == EV_READ);

    // we assume that every query *requires* input before its results are
    // available; this would break otherwise (PQconsumeInput might block?)
    assert(PQisBusy(conn->pg_conn) != 0);

    // read whatever arrived
    if (!PQconsumeInput(conn->pg_conn)) {
        ERROR("PQconsumeInput: %s", PQerrorMessage(conn->pg_conn));
    }

    // deliver results until libpq needs more input
    for (;;) {
        if (PQisBusy(conn->pg_conn) != 0)
            break;

        if (_evpq_query_result(conn) == 1) {
            // the query is complete, no need to wait for anything anymore
            return;
        }
    }

    // still need to wait for a result, so reschedule
    if (_evpq_schedule(conn, EV_READ, _evpq_query_event) != 0)
        goto error;

    // done, wait for the next event
    return;

error:
    // XXX: reset?
    _evpq_failure(conn);
}
|
209 |
||
21
e5da1d428e3e
new evsql for queueing SQL queries
Tero Marttila <terom@fixme.fi>
parents:
12
diff
changeset
|
210 |
struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info, void *cb_arg) { |
12 | 211 |
struct evpq_conn *conn = NULL; |
212 |
||
213 |
// alloc our context |
|
214 |
if ((conn = calloc(1, sizeof(*conn))) == NULL) |
|
215 |
ERROR("calloc"); |
|
216 |
||
217 |
// initial state |
|
218 |
conn->ev_base = ev_base; |
|
219 |
conn->user_cb = cb_info; |
|
21
e5da1d428e3e
new evsql for queueing SQL queries
Tero Marttila <terom@fixme.fi>
parents:
12
diff
changeset
|
220 |
conn->user_cb_arg = cb_arg; |
12 | 221 |
conn->state = EVPQ_INIT; |
222 |
||
223 |
// create our PGconn |
|
224 |
if ((conn->pg_conn = PQconnectStart(conninfo)) == NULL) |
|
225 |
PERROR("PQconnectStart"); |
|
226 |
||
227 |
// check for immediate failure |
|
228 |
if (PQstatus(conn->pg_conn) == CONNECTION_BAD) |
|
229 |
ERROR("PQstatus indicates CONNECTION_BAD after PQconnectStart"); |
|
230 |
||
231 |
// assume PGRES_POLLING_WRITING |
|
232 |
if (_evpq_schedule(conn, EV_WRITE, _evpq_connect_event)) |
|
233 |
goto error; |
|
234 |
||
235 |
// connecting |
|
236 |
conn->state = EVPQ_CONNECT; |
|
237 |
||
238 |
// success, wait for the connection to be established |
|
239 |
return conn; |
|
240 |
||
241 |
error: |
|
25
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
242 |
if (conn) |
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
243 |
evpq_release(conn); |
12 | 244 |
|
245 |
return NULL; |
|
246 |
} |
|
247 |
||
23
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
248 |
static int _evpq_check_query (struct evpq_conn *conn) { |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
249 |
// just check the state |
12 | 250 |
if (conn->state != EVPQ_CONNECTED) |
251 |
ERROR("invalid evpq state: %d", conn->state); |
|
252 |
||
23
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
253 |
// ok |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
254 |
return 0; |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
255 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
256 |
error: |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
257 |
return -1; |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
258 |
} |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
259 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
260 |
static int _evpq_handle_query (struct evpq_conn *conn) { |
12 | 261 |
// update state |
262 |
conn->state = EVPQ_QUERY; |
|
263 |
||
264 |
// XXX: PQflush |
|
265 |
||
266 |
// poll for read |
|
267 |
if (_evpq_schedule(conn, EV_READ, _evpq_query_event)) |
|
268 |
goto error; |
|
269 |
||
270 |
// and then we wait |
|
271 |
return 0; |
|
272 |
||
273 |
error: |
|
274 |
return -1; |
|
275 |
} |
|
276 |
||
23
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
277 |
int evpq_query (struct evpq_conn *conn, const char *command) { |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
278 |
// check state |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
279 |
if (_evpq_check_query(conn)) |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
280 |
goto error; |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
281 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
282 |
// do the query |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
283 |
if (PQsendQuery(conn->pg_conn, command) == 0) |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
284 |
ERROR("PQsendQuery: %s", PQerrorMessage(conn->pg_conn)); |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
285 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
286 |
// handle it |
24 | 287 |
if (_evpq_handle_query(conn)) |
23
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
288 |
goto error; |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
289 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
290 |
// success |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
291 |
return 0; |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
292 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
293 |
error: |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
294 |
return -1; |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
295 |
} |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
296 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
297 |
int evpq_query_params (struct evpq_conn *conn, const char *command, int nParams, const Oid *paramTypes, const char * const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat) { |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
298 |
// check state |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
299 |
if (_evpq_check_query(conn)) |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
300 |
goto error; |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
301 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
302 |
// do the query |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
303 |
if (PQsendQueryParams(conn->pg_conn, command, nParams, paramTypes, paramValues, paramLengths, paramFormats, resultFormat) == 0) |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
304 |
ERROR("PQsendQueryParams: %s", PQerrorMessage(conn->pg_conn)); |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
305 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
306 |
// handle it |
24 | 307 |
if (_evpq_handle_query(conn)) |
23
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
308 |
goto error; |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
309 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
310 |
// success |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
311 |
return 0; |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
312 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
313 |
error: |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
314 |
return -1; |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
315 |
|
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
316 |
} |
1dee73ae4ad0
evsql_query_params compiles...
Tero Marttila <terom@fixme.fi>
parents:
22
diff
changeset
|
317 |
|
25
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
318 |
void evpq_release (struct evpq_conn *conn) { |
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
319 |
if (conn->ev) |
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
320 |
event_free(conn->ev); |
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
321 |
|
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
322 |
if (conn->pg_conn) |
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
323 |
PQfinish(conn->pg_conn); |
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
324 |
|
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
325 |
free(conn); |
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
326 |
} |
99a41f48e29b
evsql transactions, it compiles...
Tero Marttila <terom@fixme.fi>
parents:
24
diff
changeset
|
327 |
|
21
e5da1d428e3e
new evsql for queueing SQL queries
Tero Marttila <terom@fixme.fi>
parents:
12
diff
changeset
|
328 |
enum evpq_state evpq_state (struct evpq_conn *conn) { |
e5da1d428e3e
new evsql for queueing SQL queries
Tero Marttila <terom@fixme.fi>
parents:
12
diff
changeset
|
329 |
return conn->state; |
e5da1d428e3e
new evsql for queueing SQL queries
Tero Marttila <terom@fixme.fi>
parents:
12
diff
changeset
|
330 |
} |
e5da1d428e3e
new evsql for queueing SQL queries
Tero Marttila <terom@fixme.fi>
parents:
12
diff
changeset
|
331 |
|
12 | 332 |
const PGconn *evpq_pgconn (struct evpq_conn *conn) { |
333 |
return conn->pg_conn; |
|
334 |
} |