112 } |
112 } |
113 |
113 |
114 /* |
114 /* |
115 * Dequeue the query, execute the callback, and free it. |
115 * Dequeue the query, execute the callback, and free it. |
116 */ |
116 */ |
117 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *result_info) { |
117 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) { |
118 |
|
119 // dequeue |
118 // dequeue |
120 TAILQ_REMOVE(&query->evsql->queue, query, entry); |
119 TAILQ_REMOVE(&query->evsql->queue, query, entry); |
121 |
120 |
122 if (result_info) |
121 if (res) |
123 // call the callback |
122 // call the callback |
124 query->cb_fn(*result_info, query->cb_arg); |
123 query->cb_fn(res, query->cb_arg); |
125 |
124 |
126 // free |
125 // free |
127 _evsql_query_free(query); |
126 _evsql_query_free(query); |
128 } |
127 } |
129 |
128 |
130 /* |
129 /* |
131 * A query has failed, notify the user and remove it. |
130 * A query has failed, notify the user and remove it. |
132 */ |
131 */ |
133 static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) { |
132 static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) { |
134 struct evsql_result_info result; ZINIT(result); |
133 struct evsql_result_info res; ZINIT(res); |
135 |
134 |
136 // set up the result_info |
135 // set up the result_info |
137 result.evsql = evsql; |
136 res.evsql = evsql; |
138 result.error = 1; |
137 res.error = 1; |
139 |
138 |
140 // finish it off |
139 // finish it off |
141 _evsql_query_done(query, &result); |
140 _evsql_query_done(query, &res); |
142 } |
141 } |
143 |
142 |
144 /* |
143 /* |
145 * Clear every enqueued query and then free the evsql. |
144 * Clear every enqueued query and then free the evsql. |
146 * |
145 * |
147 * If result_info is given, each query will also receive it via their callback, and the error_fn will be called. |
146 * If result_info is given, each query will also receive it via their callback, and the error_fn will be called. |
148 */ |
147 */ |
149 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *result_info) { |
148 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) { |
150 struct evsql_query *query; |
149 struct evsql_query *query; |
151 |
150 |
152 // clear the queue |
151 // clear the queue |
153 while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) { |
152 while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) { |
154 _evsql_query_done(query, result_info); |
153 _evsql_query_done(query, res); |
155 |
154 |
156 TAILQ_REMOVE(&evsql->queue, query, entry); |
155 TAILQ_REMOVE(&evsql->queue, query, entry); |
157 } |
156 } |
158 |
157 |
159 // do the error callback if required |
158 // do the error callback if required |
160 if (result_info) |
159 if (res) |
161 evsql->error_fn(evsql, evsql->cb_arg); |
160 evsql->error_fn(evsql, evsql->cb_arg); |
162 |
161 |
163 // free |
162 // free |
164 free(evsql); |
163 free(evsql); |
165 } |
164 } |
212 } |
211 } |
213 |
212 |
214 static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) { |
213 static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) { |
215 struct evsql *evsql = arg; |
214 struct evsql *evsql = arg; |
216 struct evsql_query *query; |
215 struct evsql_query *query; |
217 struct evsql_result_info result; ZINIT(result); |
216 struct evsql_result_info res; ZINIT(res); |
218 |
217 |
219 assert((query = TAILQ_FIRST(&evsql->queue)) != NULL); |
218 assert((query = TAILQ_FIRST(&evsql->queue)) != NULL); |
220 |
219 |
221 // set up the result_info |
220 // set up the result_info |
222 result.evsql = evsql; |
221 res.evsql = evsql; |
223 |
222 |
224 if (query->result.evpq == NULL) { |
223 if (query->result.evpq == NULL) { |
225 // if a query didn't return any results (bug?), warn and fail the query |
224 // if a query didn't return any results (bug?), warn and fail the query |
226 WARNING("[evsql] evpq query didn't return any results"); |
225 WARNING("[evsql] evpq query didn't return any results"); |
227 |
226 |
228 result.error = 1; |
227 res.error = 1; |
|
228 |
|
229 } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) { |
|
230 // the query failed with some error |
|
231 res.error = 1; |
|
232 res.result.pq = query->result.evpq; |
229 |
233 |
230 } else { |
234 } else { |
231 result.error = 0; |
235 res.error = 0; |
232 result.result.pq = query->result.evpq; |
236 res.result.pq = query->result.evpq; |
233 |
237 |
234 } |
238 } |
235 |
239 |
236 // finish it off |
240 // finish it off |
237 _evsql_query_done(query, &result); |
241 _evsql_query_done(query, &res); |
238 |
242 |
239 // pump the next one |
243 // pump the next one |
240 _evsql_pump(evsql); |
244 _evsql_pump(evsql); |
241 } |
245 } |
242 |
246 |
357 error: |
361 error: |
358 return NULL; |
362 return NULL; |
359 } |
363 } |
360 |
364 |
361 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_query *query, const char *command) { |
365 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_query *query, const char *command) { |
362 int idle; |
366 int busy; |
363 |
367 |
364 // check state |
368 // check state |
365 if ((idle = _evsql_query_idle(evsql)) < 0) |
369 if ((busy = _evsql_query_busy(evsql)) < 0) |
366 ERROR("connection is not valid"); |
370 ERROR("connection is not valid"); |
367 |
371 |
368 if (idle) { |
372 if (busy) { |
|
373 // copy the command for later execution |
|
374 if ((query->command = strdup(command)) == NULL) |
|
375 ERROR("strdup"); |
|
376 |
|
377 } else { |
369 assert(TAILQ_EMPTY(&evsql->queue)); |
378 assert(TAILQ_EMPTY(&evsql->queue)); |
370 |
379 |
371 // execute directly |
380 // execute directly |
372 if (_evsql_query_exec(evsql, query, command)) |
381 if (_evsql_query_exec(evsql, query, command)) |
373 goto error; |
382 goto error; |
374 |
383 |
375 } else { |
|
376 // copy the command for later execution |
|
377 if ((query->command = strdup(command)) == NULL) |
|
378 ERROR("strdup"); |
|
379 } |
384 } |
380 |
385 |
381 // store it on the list |
386 // store it on the list |
382 TAILQ_INSERT_TAIL(&evsql->queue, query, entry); |
387 TAILQ_INSERT_TAIL(&evsql->queue, query, entry); |
383 |
388 |
406 _evsql_query_free(query); |
411 _evsql_query_free(query); |
407 |
412 |
408 return NULL; |
413 return NULL; |
409 } |
414 } |
410 |
415 |
411 struct evsql_query *evsql_query_params (struct evsql *evsql, const char *command, struct evsql_query_params params, evsql_query_cb query_fn, void *cb_arg) { |
416 struct evsql_query *evsql_query_params (struct evsql *evsql, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) { |
412 struct evsql_query *query = NULL; |
417 struct evsql_query *query = NULL; |
413 struct evsql_query_param *param; |
418 const struct evsql_query_param *param; |
414 int idx; |
419 int idx; |
415 |
420 |
416 // alloc new query |
421 // alloc new query |
417 if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL) |
422 if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL) |
418 goto error; |
423 goto error; |
419 |
424 |
420 // count the params |
425 // count the params |
421 for (param = params.list; param->value || param->length; param++) |
426 for (param = params->list; param->type; param++) |
422 query->params.count++; |
427 query->params.count++; |
423 |
428 |
424 // allocate the vertical storage for the parameters |
429 // allocate the vertical storage for the parameters |
425 if (0 |
430 if (0 |
426 |
431 |
430 || !(query->params.formats = calloc(query->params.count, sizeof(int))) |
435 || !(query->params.formats = calloc(query->params.count, sizeof(int))) |
431 ) |
436 ) |
432 ERROR("calloc"); |
437 ERROR("calloc"); |
433 |
438 |
434 // transform |
439 // transform |
435 for (param = params.list, idx = 0; param->value || param->length; param++, idx++) { |
440 for (param = params->list, idx = 0; param->type; param++, idx++) { |
436 // `types` stays NULL |
441 // `types` stays NULL |
437 // query->params.types[idx] = 0; |
442 // query->params.types[idx] = 0; |
438 |
443 |
439 // values |
444 // values |
440 query->params.values[idx] = param->value; |
445 query->params.values[idx] = param->data_raw; |
441 |
446 |
442 // lengths (nonzero for NULLs) |
447 // lengths |
443 query->params.lengths[idx] = param->value ? param->length : 0; |
448 query->params.lengths[idx] = param->length; |
444 |
449 |
445 // formats, binary if length is nonzero |
450 // formats, binary if length is nonzero |
446 query->params.formats[idx] = param->value && param->length; |
451 query->params.formats[idx] = param->length ? 1 : 0; |
447 } |
452 } |
448 |
453 |
449 // result format |
454 // result format |
450 query->params.result_format = params.result_binary ? 1 : 0; |
455 switch (params->result_fmt) { |
|
456 case EVSQL_FMT_TEXT: |
|
457 query->params.result_format = 0; break; |
|
458 |
|
459 case EVSQL_FMT_BINARY: |
|
460 query->params.result_format = 1; break; |
|
461 |
|
462 default: |
|
463 FATAL("params.result_fmt: %d", params->result_fmt); |
|
464 } |
451 |
465 |
452 // execute it |
466 // execute it |
453 if (_evsql_query_enqueue(evsql, query, command)) |
467 if (_evsql_query_enqueue(evsql, query, command)) |
454 goto error; |
468 goto error; |
455 |
469 |
460 _evsql_query_free(query); |
474 _evsql_query_free(query); |
461 |
475 |
462 return NULL; |
476 return NULL; |
463 } |
477 } |
464 |
478 |
|
479 int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) { |
|
480 struct evsql_query_param *p = &params->list[param]; |
|
481 |
|
482 assert(p->type == EVSQL_PARAM_STRING); |
|
483 |
|
484 p->data_raw = ptr; |
|
485 p->length = 0; |
|
486 |
|
487 return 0; |
|
488 } |
|
489 |
|
490 int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) { |
|
491 struct evsql_query_param *p = &params->list[param]; |
|
492 |
|
493 assert(p->type == EVSQL_PARAM_UINT32); |
|
494 |
|
495 p->data.uint32 = htonl(uval); |
|
496 p->data_raw = (const char *) &p->data.uint32; |
|
497 p->length = sizeof(uval); |
|
498 |
|
499 return 0; |
|
500 } |
|
501 |
|
502 const char *evsql_result_error (const struct evsql_result_info *res) { |
|
503 if (!res->error) |
|
504 return "No error"; |
|
505 |
|
506 switch (res->evsql->type) { |
|
507 case EVSQL_EVPQ: |
|
508 if (!res->result.pq) |
|
509 return "unknown error (no result)"; |
|
510 |
|
511 return PQresultErrorMessage(res->result.pq); |
|
512 |
|
513 default: |
|
514 FATAL("res->evsql->type"); |
|
515 } |
|
516 |
|
517 } |
|
518 |
|
519 size_t evsql_result_rows (const struct evsql_result_info *res) { |
|
520 switch (res->evsql->type) { |
|
521 case EVSQL_EVPQ: |
|
522 return PQntuples(res->result.pq); |
|
523 |
|
524 default: |
|
525 FATAL("res->evsql->type"); |
|
526 } |
|
527 } |
|
528 |
|
529 size_t evsql_result_cols (const struct evsql_result_info *res) { |
|
530 switch (res->evsql->type) { |
|
531 case EVSQL_EVPQ: |
|
532 return PQnfields(res->result.pq); |
|
533 |
|
534 default: |
|
535 FATAL("res->evsql->type"); |
|
536 } |
|
537 } |
|
538 |
|
539 int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) { |
|
540 *ptr = NULL; |
|
541 |
|
542 switch (res->evsql->type) { |
|
543 case EVSQL_EVPQ: |
|
544 if (PQgetisnull(res->result.pq, row, col)) { |
|
545 if (nullok) |
|
546 return 0; |
|
547 else |
|
548 ERROR("[%zu:%zu] field is null", row, col); |
|
549 } |
|
550 |
|
551 if (PQfformat(res->result.pq, col) != 1) |
|
552 ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col)); |
|
553 |
|
554 if (size && PQgetlength(res->result.pq, row, col) != size) |
|
555 ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col)); |
|
556 |
|
557 *ptr = PQgetvalue(res->result.pq, row, col); |
|
558 |
|
559 return 0; |
|
560 |
|
561 default: |
|
562 FATAL("res->evsql->type"); |
|
563 } |
|
564 |
|
565 error: |
|
566 return -1; |
|
567 } |
|
568 |
|
569 int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) { |
|
570 return evsql_result_binary(res, row, col, ptr, 0, nullok); |
|
571 } |
|
572 |
|
573 int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) { |
|
574 const char *data; |
|
575 int16_t sval; |
|
576 |
|
577 if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) |
|
578 goto error; |
|
579 |
|
580 sval = ntohs(*((int16_t *) data)); |
|
581 |
|
582 if (sval < 0) |
|
583 ERROR("negative value for unsigned: %d", sval); |
|
584 |
|
585 *uval = sval; |
|
586 |
|
587 return 0; |
|
588 |
|
589 error: |
|
590 return nullok ? 0 : -1; |
|
591 } |
|
592 |
|
593 int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) { |
|
594 const char *data; |
|
595 int32_t sval; |
|
596 |
|
597 if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) |
|
598 goto error; |
|
599 |
|
600 sval = ntohl(*(int32_t *) data); |
|
601 |
|
602 if (sval < 0) |
|
603 ERROR("negative value for unsigned: %d", sval); |
|
604 |
|
605 *uval = sval; |
|
606 |
|
607 return 0; |
|
608 |
|
609 error: |
|
610 return nullok ? 0 : -1; |
|
611 } |
|
612 |
|
613 int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) { |
|
614 const char *data; |
|
615 int64_t sval; |
|
616 |
|
617 if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) |
|
618 goto error; |
|
619 |
|
620 sval = ntohq(*(int64_t *) data); |
|
621 |
|
622 if (sval < 0) |
|
623 ERROR("negative value for unsigned: %ld", sval); |
|
624 |
|
625 *uval = sval; |
|
626 |
|
627 return 0; |
|
628 |
|
629 error: |
|
630 return nullok ? 0 : -1; |
|
631 } |
|
632 |
|
633 void evsql_result_free (const struct evsql_result_info *res) { |
|
634 switch (res->evsql->type) { |
|
635 case EVSQL_EVPQ: |
|
636 return PQclear(res->result.pq); |
|
637 |
|
638 default: |
|
639 FATAL("res->evsql->type"); |
|
640 } |
|
641 } |