src/evsql.c
changeset 24 82cfdb6680d1
parent 23 1dee73ae4ad0
child 25 99a41f48e29b
equal deleted inserted replaced
23:1dee73ae4ad0 24:82cfdb6680d1
   112 }
   112 }
   113 
   113 
   114 /*
   114 /*
   115  * Dequeue the query, execute the callback, and free it.
   115  * Dequeue the query, execute the callback, and free it.
   116  */
   116  */
   117 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *result_info) {
   117 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
   118 
       
   119     // dequeue
   118     // dequeue
   120     TAILQ_REMOVE(&query->evsql->queue, query, entry);
   119     TAILQ_REMOVE(&query->evsql->queue, query, entry);
   121     
   120     
   122     if (result_info) 
   121     if (res) 
   123         // call the callback
   122         // call the callback
   124         query->cb_fn(*result_info, query->cb_arg);
   123         query->cb_fn(res, query->cb_arg);
   125     
   124     
   126     // free
   125     // free
   127     _evsql_query_free(query);
   126     _evsql_query_free(query);
   128 }
   127 }
   129 
   128 
   130 /*
   129 /*
   131  * A query has failed, notify the user and remove it.
   130  * A query has failed, notify the user and remove it.
   132  */
   131  */
   133 static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) {
   132 static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) {
   134     struct evsql_result_info result; ZINIT(result);
   133     struct evsql_result_info res; ZINIT(res);
   135 
   134 
   136     // set up the result_info
   135     // set up the result_info
   137     result.evsql = evsql;
   136     res.evsql = evsql;
   138     result.error = 1;
   137     res.error = 1;
   139 
   138 
   140     // finish it off
   139     // finish it off
   141     _evsql_query_done(query, &result);
   140     _evsql_query_done(query, &res);
   142 }
   141 }
   143 
   142 
   144 /*
   143 /*
   145  * Clear every enqueued query and then free the evsql.
   144  * Clear every enqueued query and then free the evsql.
   146  *
   145  *
   147  * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called.
   146  * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called.
   148  */
   147  */
   149 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *result_info) {
   148 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
   150     struct evsql_query *query;
   149     struct evsql_query *query;
   151     
   150     
   152     // clear the queue
   151     // clear the queue
   153     while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
   152     while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
   154         _evsql_query_done(query, result_info);
   153         _evsql_query_done(query, res);
   155         
   154         
   156         TAILQ_REMOVE(&evsql->queue, query, entry);
   155         TAILQ_REMOVE(&evsql->queue, query, entry);
   157     }
   156     }
   158     
   157     
   159     // do the error callback if required
   158     // do the error callback if required
   160     if (result_info)
   159     if (res)
   161         evsql->error_fn(evsql, evsql->cb_arg);
   160         evsql->error_fn(evsql, evsql->cb_arg);
   162     
   161     
   163     // free
   162     // free
   164     free(evsql);
   163     free(evsql);
   165 }
   164 }
   212 }
   211 }
   213 
   212 
   214 static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) {
   213 static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) {
   215     struct evsql *evsql = arg;
   214     struct evsql *evsql = arg;
   216     struct evsql_query *query;
   215     struct evsql_query *query;
   217     struct evsql_result_info result; ZINIT(result);
   216     struct evsql_result_info res; ZINIT(res);
   218 
   217 
   219     assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
   218     assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
   220     
   219     
   221     // set up the result_info
   220     // set up the result_info
   222     result.evsql = evsql;
   221     res.evsql = evsql;
   223     
   222     
   224     if (query->result.evpq == NULL) {
   223     if (query->result.evpq == NULL) {
   225         // if a query didn't return any results (bug?), warn and fail the query
   224         // if a query didn't return any results (bug?), warn and fail the query
   226         WARNING("[evsql] evpq query didn't return any results");
   225         WARNING("[evsql] evpq query didn't return any results");
   227 
   226 
   228         result.error = 1;
   227         res.error = 1;
       
   228     
       
   229     } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) {
       
   230         // the query failed with some error
       
   231         res.error = 1;
       
   232         res.result.pq = query->result.evpq;
   229 
   233 
   230     } else {
   234     } else {
   231         result.error = 0;
   235         res.error = 0;
   232         result.result.pq = query->result.evpq;
   236         res.result.pq = query->result.evpq;
   233 
   237 
   234     }
   238     }
   235 
   239 
   236     // finish it off
   240     // finish it off
   237     _evsql_query_done(query, &result);
   241     _evsql_query_done(query, &res);
   238 
   242 
   239     // pump the next one
   243     // pump the next one
   240     _evsql_pump(evsql);
   244     _evsql_pump(evsql);
   241 }
   245 }
   242 
   246 
   309  * Returns:
   313  * Returns:
   310  *      <0      connection failure, query not possible
   314  *      <0      connection failure, query not possible
   311  *      0       connection idle, can query immediately
   315  *      0       connection idle, can query immediately
   312  *      1       connection busy, must queue query
   316  *      1       connection busy, must queue query
   313  */
   317  */
   314 static int _evsql_query_idle (struct evsql *evsql) {
   318 static int _evsql_query_busy (struct evsql *evsql) {
   315     switch (evsql->type) {
   319     switch (evsql->type) {
   316         case EVSQL_EVPQ: {
   320         case EVSQL_EVPQ: {
   317             enum evpq_state state = evpq_state(evsql->engine.evpq);
   321             enum evpq_state state = evpq_state(evsql->engine.evpq);
   318             
   322             
   319             switch (state) {
   323             switch (state) {
   357 error:
   361 error:
   358     return NULL;
   362     return NULL;
   359 }
   363 }
   360 
   364 
   361 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_query *query, const char *command) {
   365 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_query *query, const char *command) {
   362     int idle;
   366     int busy;
   363     
   367     
   364     // check state
   368     // check state
   365     if ((idle = _evsql_query_idle(evsql)) < 0)
   369     if ((busy = _evsql_query_busy(evsql)) < 0)
   366         ERROR("connection is not valid");
   370         ERROR("connection is not valid");
   367     
   371     
   368     if (idle) {
   372     if (busy) {
       
   373         // copy the command for later execution
       
   374         if ((query->command = strdup(command)) == NULL)
       
   375             ERROR("strdup");
       
   376 
       
   377     } else {
   369         assert(TAILQ_EMPTY(&evsql->queue));
   378         assert(TAILQ_EMPTY(&evsql->queue));
   370 
   379 
   371         // execute directly
   380         // execute directly
   372         if (_evsql_query_exec(evsql, query, command))
   381         if (_evsql_query_exec(evsql, query, command))
   373             goto error;
   382             goto error;
   374 
   383 
   375     } else {
       
   376         // copy the command for later execution
       
   377         if ((query->command = strdup(command)) == NULL)
       
   378             ERROR("strdup");
       
   379     }
   384     }
   380     
   385     
   381     // store it on the list
   386     // store it on the list
   382     TAILQ_INSERT_TAIL(&evsql->queue, query, entry);
   387     TAILQ_INSERT_TAIL(&evsql->queue, query, entry);
   383 
   388 
   406     _evsql_query_free(query);
   411     _evsql_query_free(query);
   407 
   412 
   408     return NULL;
   413     return NULL;
   409 }
   414 }
   410 
   415 
   411 struct evsql_query *evsql_query_params (struct evsql *evsql, const char *command, struct evsql_query_params params, evsql_query_cb query_fn, void *cb_arg) {
   416 struct evsql_query *evsql_query_params (struct evsql *evsql, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
   412     struct evsql_query *query = NULL;
   417     struct evsql_query *query = NULL;
   413     struct evsql_query_param *param;
   418     const struct evsql_query_param *param;
   414     int idx;
   419     int idx;
   415     
   420     
   416     // alloc new query
   421     // alloc new query
   417     if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL)
   422     if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL)
   418         goto error;
   423         goto error;
   419 
   424 
   420     // count the params
   425     // count the params
   421     for (param = params.list; param->value || param->length; param++) 
   426     for (param = params->list; param->type; param++) 
   422         query->params.count++;
   427         query->params.count++;
   423 
   428 
   424     // allocate the vertical storage for the parameters
   429     // allocate the vertical storage for the parameters
   425     if (0
   430     if (0
   426         
   431         
   430         ||  !(query->params.formats  = calloc(query->params.count, sizeof(int)))
   435         ||  !(query->params.formats  = calloc(query->params.count, sizeof(int)))
   431     )
   436     )
   432         ERROR("calloc");
   437         ERROR("calloc");
   433 
   438 
   434     // transform
   439     // transform
   435     for (param = params.list, idx = 0; param->value || param->length; param++, idx++) {
   440     for (param = params->list, idx = 0; param->type; param++, idx++) {
   436         // `types` stays NULL
   441         // `types` stays NULL
   437         // query->params.types[idx] = 0;
   442         // query->params.types[idx] = 0;
   438         
   443         
   439         // values
   444         // values
   440         query->params.values[idx] = param->value;
   445         query->params.values[idx] = param->data_raw;
   441 
   446 
   442         // lengths (nonzero for NULLs)
   447         // lengths
   443         query->params.lengths[idx] = param->value ? param->length : 0;
   448         query->params.lengths[idx] = param->length;
   444 
   449 
   445         // formats, binary if length is nonzero
   450         // formats, binary if length is nonzero
   446         query->params.formats[idx] = param->value && param->length;
   451         query->params.formats[idx] = param->length ? 1 : 0;
   447     }
   452     }
   448 
   453 
   449     // result format
   454     // result format
   450     query->params.result_format = params.result_binary ? 1 : 0;
   455     switch (params->result_fmt) {
       
   456         case EVSQL_FMT_TEXT:
       
   457             query->params.result_format = 0; break;
       
   458 
       
   459         case EVSQL_FMT_BINARY:
       
   460             query->params.result_format = 1; break;
       
   461 
       
   462         default:
       
   463             FATAL("params.result_fmt: %d", params->result_fmt);
       
   464     }
   451 
   465 
   452     // execute it
   466     // execute it
   453     if (_evsql_query_enqueue(evsql, query, command))
   467     if (_evsql_query_enqueue(evsql, query, command))
   454         goto error;
   468         goto error;
   455 
   469 
   460     _evsql_query_free(query);
   474     _evsql_query_free(query);
   461     
   475     
   462     return NULL;
   476     return NULL;
   463 }
   477 }
   464 
   478 
       
   479 int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) {
       
   480     struct evsql_query_param *p = &params->list[param];
       
   481     
       
   482     assert(p->type == EVSQL_PARAM_STRING);
       
   483 
       
   484     p->data_raw = ptr;
       
   485     p->length = 0;
       
   486 
       
   487     return 0;
       
   488 }
       
   489 
       
   490 int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) {
       
   491     struct evsql_query_param *p = &params->list[param];
       
   492     
       
   493     assert(p->type == EVSQL_PARAM_UINT32);
       
   494 
       
   495     p->data.uint32 = htonl(uval);
       
   496     p->data_raw = (const char *) &p->data.uint32;
       
   497     p->length = sizeof(uval);
       
   498 
       
   499     return 0;
       
   500 }
       
   501 
       
   502 const char *evsql_result_error (const struct evsql_result_info *res) {
       
   503     if (!res->error)
       
   504         return "No error";
       
   505 
       
   506     switch (res->evsql->type) {
       
   507         case EVSQL_EVPQ:
       
   508             if (!res->result.pq)
       
   509                 return "unknown error (no result)";
       
   510             
       
   511             return PQresultErrorMessage(res->result.pq);
       
   512 
       
   513         default:
       
   514             FATAL("res->evsql->type");
       
   515     }
       
   516 
       
   517 }
       
   518 
       
   519 size_t evsql_result_rows (const struct evsql_result_info *res) {
       
   520     switch (res->evsql->type) {
       
   521         case EVSQL_EVPQ:
       
   522             return PQntuples(res->result.pq);
       
   523 
       
   524         default:
       
   525             FATAL("res->evsql->type");
       
   526     }
       
   527 }
       
   528 
       
   529 size_t evsql_result_cols (const struct evsql_result_info *res) {
       
   530     switch (res->evsql->type) {
       
   531         case EVSQL_EVPQ:
       
   532             return PQnfields(res->result.pq);
       
   533 
       
   534         default:
       
   535             FATAL("res->evsql->type");
       
   536     }
       
   537 }
       
   538 
       
   539 int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) {
       
   540     *ptr = NULL;
       
   541 
       
   542     switch (res->evsql->type) {
       
   543         case EVSQL_EVPQ:
       
   544             if (PQgetisnull(res->result.pq, row, col)) {
       
   545                 if (nullok)
       
   546                     return 0;
       
   547                 else
       
   548                     ERROR("[%zu:%zu] field is null", row, col);
       
   549             }
       
   550 
       
   551             if (PQfformat(res->result.pq, col) != 1)
       
   552                 ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col));
       
   553     
       
   554             if (size && PQgetlength(res->result.pq, row, col) != size)
       
   555                 ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col));
       
   556 
       
   557             *ptr = PQgetvalue(res->result.pq, row, col);
       
   558 
       
   559             return 0;
       
   560 
       
   561         default:
       
   562             FATAL("res->evsql->type");
       
   563     }
       
   564 
       
   565 error:
       
   566     return -1;
       
   567 }
       
   568 
       
   /*
    * Fetch a field as a string pointer.
    *
    * Delegates to evsql_result_binary with the length check disabled, so it
    * shares that function's return values and its handling of NULL fields.
    */
   int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) {
       // strings have no fixed size; zero skips the length check
       const size_t any_length = 0;

       return evsql_result_binary(res, row, col, ptr, any_length, nullok);
   }
       
   572 
       
   /*
    * Read a 2-byte binary field as an unsigned 16-bit value.
    *
    * The field is decoded from network byte order as a signed value, and a
    * negative value is rejected as an error.
    *
    *  uval    receives the value; left untouched when the field is NULL (and
    *          nullok is set) or when an error occurs
    *  nullok  if nonzero, a NULL field is accepted and 0 is returned
    *
    * Returns 0 on success, -1 on error when nullok is zero.
    *
    * NOTE(review): with nullok set, every failure on the error path also
    * returns 0 - confirm that callers expect this.
    */
   int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) {
       const char *data;
       uint16_t wire;
       int16_t sval;

       if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
           goto error;

       // an accepted NULL field comes back as success with no data; there is nothing to decode
       if (data == NULL)
           return 0;

       // copy the bytes out rather than dereferencing a cast pointer
       memcpy(&wire, data, sizeof(wire));
       sval = ntohs(wire);

       if (sval < 0)
           ERROR("negative value for unsigned: %d", sval);

       *uval = sval;

       return 0;

   error:
       return nullok ? 0 : -1;
   }
       
   592 
       
   /*
    * Read a 4-byte binary field as an unsigned 32-bit value.
    *
    * The field is decoded from network byte order as a signed value, and a
    * negative value is rejected as an error.
    *
    *  uval    receives the value; left untouched when the field is NULL (and
    *          nullok is set) or when an error occurs
    *  nullok  if nonzero, a NULL field is accepted and 0 is returned
    *
    * Returns 0 on success, -1 on error when nullok is zero.
    *
    * NOTE(review): with nullok set, every failure on the error path also
    * returns 0 - confirm that callers expect this.
    */
   int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) {
       const char *data;
       uint32_t wire;
       int32_t sval;

       if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
           goto error;

       // an accepted NULL field comes back as success with no data; there is nothing to decode
       if (data == NULL)
           return 0;

       // copy the bytes out rather than dereferencing a cast pointer
       memcpy(&wire, data, sizeof(wire));
       sval = ntohl(wire);

       if (sval < 0)
           ERROR("negative value for unsigned: %d", sval);

       *uval = sval;

       return 0;

   error:
       return nullok ? 0 : -1;
   }
       
   612 
       
   /*
    * Read an 8-byte binary field as an unsigned 64-bit value.
    *
    * The field is converted with ntohq and treated as a signed value; a
    * negative value is rejected as an error.
    *
    *  uval    receives the value; left untouched when the field is NULL (and
    *          nullok is set) or when an error occurs
    *  nullok  if nonzero, a NULL field is accepted and 0 is returned
    *
    * Returns 0 on success, -1 on error when nullok is zero.
    *
    * NOTE(review): with nullok set, every failure on the error path also
    * returns 0 - confirm that callers expect this.
    */
   int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) {
       const char *data;
       int64_t wire;
       int64_t sval;

       if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
           goto error;

       // an accepted NULL field comes back as success with no data; there is nothing to decode
       if (data == NULL)
           return 0;

       // copy the bytes out rather than dereferencing a cast pointer
       memcpy(&wire, data, sizeof(wire));
       sval = ntohq(wire);

       // int64_t is not `long` everywhere, so print it via long long
       if (sval < 0)
           ERROR("negative value for unsigned: %lld", (long long) sval);

       *uval = sval;

       return 0;

   error:
       return nullok ? 0 : -1;
   }
       
   632 
       
   633 void evsql_result_free (const struct evsql_result_info *res) {
       
   634     switch (res->evsql->type) {
       
   635         case EVSQL_EVPQ:
       
   636             return PQclear(res->result.pq);
       
   637 
       
   638         default:
       
   639             FATAL("res->evsql->type");
       
   640     }
       
   641 }