librdkafka
The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rdkafka.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2013 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
43 /* @cond NO_DOC */
44 #pragma once
45 
46 #include <stdio.h>
47 #include <inttypes.h>
48 #include <sys/types.h>
49 
50 #ifdef __cplusplus
51 extern "C" {
52 #if 0
53 } /* Restore indent */
54 #endif
55 #endif
56 
57 #ifdef _MSC_VER
58 #include <basetsd.h>
/* Guarded define placed ahead of the Winsock2.h include below.
 * NOTE(review): the macro the Windows SDK headers recognize is
 * WIN32_LEAN_AND_MEAN; WIN32_MEAN_AND_LEAN as spelled here looks like a
 * transposition and would then have no effect on the SDK headers.
 * Confirm intent before renaming: a rename changes which SDK headers get
 * pulled in for every user of this public header. */
59 #ifndef WIN32_MEAN_AND_LEAN
60 #define WIN32_MEAN_AND_LEAN
61 #endif
62 #include <Winsock2.h> /* for sockaddr, .. */
63 typedef SSIZE_T ssize_t;
64 #define RD_UNUSED
65 #define RD_INLINE __inline
66 #define RD_DEPRECATED __declspec(deprecated)
/* Select the symbol visibility decoration for MSVC builds:
 *  - LIBRDKAFKA_STATICLIB defined: RD_EXPORT expands to nothing.
 *  - otherwise, LIBRDKAFKA_EXPORTS defined: __declspec(dllexport).
 *  - otherwise: __declspec(dllimport).
 * Any earlier definition of RD_EXPORT is discarded first. */
67 #undef RD_EXPORT
68 #ifdef LIBRDKAFKA_STATICLIB
69 #define RD_EXPORT
70 #else
71 #ifdef LIBRDKAFKA_EXPORTS
72 #define RD_EXPORT __declspec(dllexport)
73 #else
74 #define RD_EXPORT __declspec(dllimport)
75 #endif
/* Default LIBRDKAFKA_TYPECHECKS to 0 unless the includer already set it.
 * Note this default lives inside the non-static branch only; in the
 * LIBRDKAFKA_STATICLIB branch the macro is left undefined, which a later
 * `#if LIBRDKAFKA_TYPECHECKS` also evaluates as 0. */
76 #ifndef LIBRDKAFKA_TYPECHECKS
77 #define LIBRDKAFKA_TYPECHECKS 0
78 #endif
79 #endif
80 
81 #else
82 #include <sys/socket.h> /* for sockaddr, .. */
83 
84 #define RD_UNUSED __attribute__((unused))
85 #define RD_INLINE inline
86 #define RD_EXPORT
87 #define RD_DEPRECATED __attribute__((deprecated))
88 
89 #ifndef LIBRDKAFKA_TYPECHECKS
90 #define LIBRDKAFKA_TYPECHECKS 1
91 #endif
92 #endif
93 
94 
/* Compile-time argument type checking helpers.
 *
 * When LIBRDKAFKA_TYPECHECKS is non-zero, _LRK_TYPECHECK(RET,TYPE,ARG)
 * is a `({ ... })` statement expression that declares a TYPE variable
 * initialized from ARG inside an `if (0)` block (so the initialization is
 * seen by the compiler but never executed) and then yields RET.
 * _LRK_TYPECHECK2 does the same for two (TYPE, ARG) pairs.
 *
 * When LIBRDKAFKA_TYPECHECKS is zero, both macros reduce to (RET) and the
 * TYPE/ARG parameters are ignored. */
100 #if LIBRDKAFKA_TYPECHECKS
101 #define _LRK_TYPECHECK(RET,TYPE,ARG) \
102  ({ if (0) { TYPE __t RD_UNUSED = (ARG); } RET; })
103 
104 #define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) \
105  ({ \
106  if (0) { \
107  TYPE __t RD_UNUSED = (ARG); \
108  TYPE2 __t2 RD_UNUSED = (ARG2); \
109  } \
110  RET; })
111 #else
112 #define _LRK_TYPECHECK(RET,TYPE,ARG) (RET)
113 #define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) (RET)
114 #endif
115 
116 /* @endcond */
117 
118 
140 #define RD_KAFKA_VERSION 0x000b00c9
141 
150 RD_EXPORT
151 int rd_kafka_version(void);
152 
158 RD_EXPORT
159 const char *rd_kafka_version_str (void);
160 
179 typedef enum rd_kafka_type_t {
183 
184 
195 
196 
197 
204 RD_EXPORT
205 const char *rd_kafka_get_debug_contexts(void);
206 
214 #define RD_KAFKA_DEBUG_CONTEXTS \
215  "all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature"
216 
217 
218 /* @cond NO_DOC */
219 /* Private types to provide ABI compatibility */
220 typedef struct rd_kafka_s rd_kafka_t;
221 typedef struct rd_kafka_topic_s rd_kafka_topic_t;
222 typedef struct rd_kafka_conf_s rd_kafka_conf_t;
223 typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;
224 typedef struct rd_kafka_queue_s rd_kafka_queue_t;
225 /* @endcond */
226 
227 
240 typedef enum {
241  /* Internal errors to rdkafka: */
327 
330 
331  /* Kafka broker errors: */
451 
452  RD_KAFKA_RESP_ERR_END_ALL,
454 
455 
463  const char *name;
464  const char *desc;
465 };
466 
467 
/**
 * @brief Returns the full list of error codes.
 *
 * The list is handed back through the \p errdescs out-pointer (an array of
 * struct rd_kafka_err_desc) and \p cntp.
 * NOTE(review): \p cntp presumably receives the element count - confirm.
 */
471 RD_EXPORT
472 void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
473  size_t *cntp);
474 
475 
476 
477 
483 RD_EXPORT
484 const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
485 
486 
487 
493 RD_EXPORT
494 const char *rd_kafka_err2name (rd_kafka_resp_err_t err);
495 
496 
522 RD_EXPORT
524 
525 
550 RD_EXPORT RD_DEPRECATED
552 
553 
566 RD_EXPORT RD_DEPRECATED
567 int rd_kafka_errno (void);
568 
569 
570 
586 typedef struct rd_kafka_topic_partition_s {
587  char *topic;
588  int32_t partition;
589  int64_t offset;
590  void *metadata;
591  size_t metadata_size;
592  void *opaque;
594  void *_private;
597 
598 
603 RD_EXPORT
605 
606 
611 typedef struct rd_kafka_topic_partition_list_s {
612  int cnt;
613  int size;
616 
617 
632 RD_EXPORT
634 
635 
639 RD_EXPORT
640 void
642 
652 RD_EXPORT
655  const char *topic, int32_t partition);
656 
657 
666 RD_EXPORT
667 void
669  *rktparlist,
670  const char *topic,
671  int32_t start, int32_t stop);
672 
673 
674 
686 RD_EXPORT
687 int
689  const char *topic, int32_t partition);
690 
691 
699 RD_EXPORT
700 int
703  int idx);
704 
705 
713 RD_EXPORT
716 
717 
718 
719 
727 RD_EXPORT
730  const char *topic, int32_t partition, int64_t offset);
731 
732 
733 
739 RD_EXPORT
742  const char *topic, int32_t partition);
743 
744 
752 RD_EXPORT void
754  int (*cmp) (const void *a, const void *b,
755  void *opaque),
756  void *opaque);
757 
758 
776 typedef enum rd_kafka_vtype_t {
787 
788 
797 #define RD_KAFKA_V_END RD_KAFKA_VTYPE_END
798 
/* Expands to two comma-separated arguments: the RD_KAFKA_VTYPE_TOPIC tag
 * (type-checked against const char * via _LRK_TYPECHECK) followed by the
 * topic cast to const char *. The argument is parenthesized so the cast
 * applies to the caller's whole expression, not just its first operand. */
802 #define RD_KAFKA_V_TOPIC(topic) \
803  _LRK_TYPECHECK(RD_KAFKA_VTYPE_TOPIC, const char *, topic), \
804  (const char *)(topic)
805 
/* Expands to two comma-separated arguments: the RD_KAFKA_VTYPE_RKT tag
 * (type-checked against rd_kafka_topic_t * via _LRK_TYPECHECK) followed by
 * the topic handle cast to rd_kafka_topic_t *. The argument is parenthesized
 * so the cast applies to the caller's whole expression. */
808 #define RD_KAFKA_V_RKT(rkt) \
809  _LRK_TYPECHECK(RD_KAFKA_VTYPE_RKT, rd_kafka_topic_t *, rkt), \
810  (rd_kafka_topic_t *)(rkt)
811 
/* Expands to two comma-separated arguments: the RD_KAFKA_VTYPE_PARTITION tag
 * (type-checked against int32_t via _LRK_TYPECHECK) followed by the partition
 * cast to int32_t. The argument is parenthesized so that a compound
 * expression (e.g. `base + off`) is converted to int32_t as a whole instead
 * of only its first operand being cast. */
814 #define RD_KAFKA_V_PARTITION(partition) \
815  _LRK_TYPECHECK(RD_KAFKA_VTYPE_PARTITION, int32_t, partition), \
816  (int32_t)(partition)
817 
/* Expands to three comma-separated arguments: the RD_KAFKA_VTYPE_VALUE tag
 * (VALUE and LEN type-checked against void * and size_t via
 * _LRK_TYPECHECK2), then VALUE cast to void * and LEN cast to size_t.
 * Both arguments are parenthesized: without that, `RD_KAFKA_V_VALUE(buf + 4,
 * n)` would expand to `(void *)buf + 4`, i.e. arithmetic on a void pointer
 * rather than a cast of the intended address. */
820 #define RD_KAFKA_V_VALUE(VALUE,LEN) \
821  _LRK_TYPECHECK2(RD_KAFKA_VTYPE_VALUE, void *, VALUE, size_t, LEN), \
822  (void *)(VALUE), (size_t)(LEN)
823 
/* Expands to three comma-separated arguments: the RD_KAFKA_VTYPE_KEY tag
 * (KEY and LEN type-checked against const void * and size_t via
 * _LRK_TYPECHECK2), then KEY cast to void * and LEN cast to size_t.
 * Both arguments are parenthesized so the casts apply to the caller's whole
 * expressions (e.g. `key + off`) rather than to their first operand only. */
826 #define RD_KAFKA_V_KEY(KEY,LEN) \
827  _LRK_TYPECHECK2(RD_KAFKA_VTYPE_KEY, const void *, KEY, size_t, LEN), \
828  (void *)(KEY), (size_t)(LEN)
829 
/* Expands to two comma-separated arguments: the RD_KAFKA_VTYPE_OPAQUE tag
 * (type-checked against void * via _LRK_TYPECHECK) followed by the opaque
 * pointer cast to void *. The argument is parenthesized so the cast applies
 * to the caller's whole expression. */
832 #define RD_KAFKA_V_OPAQUE(opaque) \
833  _LRK_TYPECHECK(RD_KAFKA_VTYPE_OPAQUE, void *, opaque), \
834  (void *)(opaque)
835 
/* Expands to two comma-separated arguments: the RD_KAFKA_VTYPE_MSGFLAGS tag
 * (type-checked against int via _LRK_TYPECHECK) followed by the flags cast
 * to int. The argument is parenthesized so a combined expression such as
 * `a | b` is converted to int as a whole. */
839 #define RD_KAFKA_V_MSGFLAGS(msgflags) \
840  _LRK_TYPECHECK(RD_KAFKA_VTYPE_MSGFLAGS, int, msgflags), \
841  (int)(msgflags)
842 
/* Expands to two comma-separated arguments: the RD_KAFKA_VTYPE_TIMESTAMP tag
 * (type-checked against int64_t via _LRK_TYPECHECK) followed by the
 * timestamp cast to int64_t. The argument is parenthesized so the cast
 * applies to the caller's whole expression. */
845 #define RD_KAFKA_V_TIMESTAMP(timestamp) \
846  _LRK_TYPECHECK(RD_KAFKA_VTYPE_TIMESTAMP, int64_t, timestamp), \
847  (int64_t)(timestamp)
848 
860 // FIXME: This doesn't show up in docs for some reason
861 // "Compound rd_kafka_message_t is not documented."
862 
876 typedef struct rd_kafka_message_s {
878  rd_kafka_topic_t *rkt;
879  int32_t partition;
880  void *payload;
884  size_t len;
887  void *key;
889  size_t key_len;
891  int64_t offset;
901  void *_private;
906 
907 
911 RD_EXPORT
913 
914 
915 
916 
923 static RD_INLINE const char *
924 RD_UNUSED
926  if (!rkmessage->err)
927  return NULL;
928 
929  if (rkmessage->payload)
930  return (const char *)rkmessage->payload;
931 
932  return rd_kafka_err2str(rkmessage->err);
933 }
934 
935 
936 
948 RD_EXPORT
949 int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
950  rd_kafka_timestamp_type_t *tstype);
951 
952 
953 
960 RD_EXPORT
961 int64_t rd_kafka_message_latency (const rd_kafka_message_t *rkmessage);
962 
963 
979 typedef enum {
984 
985 
1016 RD_EXPORT
1017 rd_kafka_conf_t *rd_kafka_conf_new(void);
1018 
1019 
1023 RD_EXPORT
1024 void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);
1025 
1026 
1033 RD_EXPORT
1034 rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf);
1035 
1036 
/**
 * @brief Same as rd_kafka_conf_dup() but with an array of property name
 *        prefixes to filter out (ignore).
 *
 * \p filter is the prefix array and \p filter_cnt its length.
 */
1041 RD_EXPORT
1042 rd_kafka_conf_t *rd_kafka_conf_dup_filter (const rd_kafka_conf_t *conf,
1043  size_t filter_cnt,
1044  const char **filter);
1045 
1046 
1047 
1064 RD_EXPORT
1065 rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
1066  const char *name,
1067  const char *value,
1068  char *errstr, size_t errstr_size);
1069 
1070 
1076 RD_EXPORT
1077 void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events);
1078 
1079 
1083 RD_EXPORT
1084 void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf,
1085  void (*dr_cb) (rd_kafka_t *rk,
1086  void *payload, size_t len,
1087  rd_kafka_resp_err_t err,
1088  void *opaque, void *msg_opaque));
1089 
/**
 * @brief Producer: Set delivery report callback in provided \p conf object.
 *
 * The callback receives the client handle, a read-only message object and
 * an opaque pointer.
 */
1104 RD_EXPORT
1105 void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf,
1106  void (*dr_msg_cb) (rd_kafka_t *rk,
1107  const rd_kafka_message_t *
1108  rkmessage,
1109  void *opaque));
1110 
1111 
/**
 * @brief Consumer: Set consume callback for use with
 *        rd_kafka_consumer_poll().
 *
 * The callback receives a message object and an opaque pointer.
 */
1116 RD_EXPORT
1117 void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
1118  void (*consume_cb) (rd_kafka_message_t *
1119  rkmessage,
1120  void *opaque));
1121 
1181 RD_EXPORT
1183  rd_kafka_conf_t *conf,
1184  void (*rebalance_cb) (rd_kafka_t *rk,
1185  rd_kafka_resp_err_t err,
1186  rd_kafka_topic_partition_list_t *partitions,
1187  void *opaque));
1188 
1189 
1190 
1205 RD_EXPORT
1207  rd_kafka_conf_t *conf,
1208  void (*offset_commit_cb) (rd_kafka_t *rk,
1209  rd_kafka_resp_err_t err,
1211  void *opaque));
1212 
1213 
1222 RD_EXPORT
1223 void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf,
1224  void (*error_cb) (rd_kafka_t *rk, int err,
1225  const char *reason,
1226  void *opaque));
1227 
1242 RD_EXPORT
1243 void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
1244  void (*throttle_cb) (
1245  rd_kafka_t *rk,
1246  const char *broker_name,
1247  int32_t broker_id,
1248  int throttle_time_ms,
1249  void *opaque));
1250 
1251 
1268 RD_EXPORT
1269 void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf,
1270  void (*log_cb) (const rd_kafka_t *rk, int level,
1271  const char *fac, const char *buf));
1272 
1273 
1290 RD_EXPORT
1291 void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf,
1292  int (*stats_cb) (rd_kafka_t *rk,
1293  char *json,
1294  size_t json_len,
1295  void *opaque));
1296 
1297 
1298 
1313 RD_EXPORT
1314 void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf,
1315  int (*socket_cb) (int domain, int type,
1316  int protocol,
1317  void *opaque));
1318 
1319 
1320 
1333 RD_EXPORT void
1334 rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf,
1335  int (*connect_cb) (int sockfd,
1336  const struct sockaddr *addr,
1337  int addrlen,
1338  const char *id,
1339  void *opaque));
1340 
1348 RD_EXPORT void
1349 rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf,
1350  int (*closesocket_cb) (int sockfd,
1351  void *opaque));
1352 
1353 
1354 
1355 #ifndef _MSC_VER
1356 
1370 RD_EXPORT
1371 void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf,
1372  int (*open_cb) (const char *pathname,
1373  int flags, mode_t mode,
1374  void *opaque));
1375 #endif
1376 
1380 RD_EXPORT
1381 void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque);
1382 
1386 RD_EXPORT
1387 void *rd_kafka_opaque(const rd_kafka_t *rk);
1388 
1389 
1390 
1396 RD_EXPORT
1397 void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf,
1398  rd_kafka_topic_conf_t *tconf);
1399 
1400 
1401 
1421 RD_EXPORT
1422 rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf,
1423  const char *name,
1424  char *dest, size_t *dest_size);
1425 
1426 
/**
 * @brief Retrieve topic configuration value for property \p name.
 *
 * NOTE(review): \p dest / \p dest_size presumably follow the same
 * buffer-and-size contract as rd_kafka_conf_get(), which has an identical
 * parameter shape - confirm.
 */
1432 RD_EXPORT
1433 rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf,
1434  const char *name,
1435  char *dest, size_t *dest_size);
1436 
1437 
1446 RD_EXPORT
1447 const char **rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp);
1448 
1449 
1458 RD_EXPORT
1459 const char **rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf,
1460  size_t *cntp);
1461 
1466 RD_EXPORT
1467 void rd_kafka_conf_dump_free(const char **arr, size_t cnt);
1468 
1473 RD_EXPORT
1474 void rd_kafka_conf_properties_show(FILE *fp);
1475 
1493 RD_EXPORT
1494 rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void);
1495 
1496 
1500 RD_EXPORT
1501 rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t
1502  *conf);
1503 
1504 
1508 RD_EXPORT
1509 void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf);
1510 
1511 
/**
 * @brief Sets a single rd_kafka_topic_conf_t value by property name.
 *
 * NOTE(review): \p errstr / \p errstr_size presumably receive a
 * human-readable error description on failure - confirm.
 */
1520 RD_EXPORT
1521 rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,
1522  const char *name,
1523  const char *value,
1524  char *errstr, size_t errstr_size);
1525 
1530 RD_EXPORT
1531 void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque);
1532 
1533 
/**
 * @brief Producer: Set partitioner callback in provided topic conf object.
 *
 * The callback is given the topic, the key bytes and their length, the
 * partition count and two opaque pointers, and returns an int32_t.
 * NOTE(review): the return value is presumably the chosen partition -
 * confirm.
 */
1548 RD_EXPORT
1549 void
1550 rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf,
1551  int32_t (*partitioner) (
1552  const rd_kafka_topic_t *rkt,
1553  const void *keydata,
1554  size_t keylen,
1555  int32_t partition_cnt,
1556  void *rkt_opaque,
1557  void *msg_opaque));
1558 
1566 RD_EXPORT
1567 int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt,
1568  int32_t partition);
1569 
1570 
1571 /*******************************************************************
1572  * *
1573  * Partitioners provided by rdkafka *
1574  * *
1575  *******************************************************************/
1576 
1585 RD_EXPORT
1586 int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt,
1587  const void *key, size_t keylen,
1588  int32_t partition_cnt,
1589  void *opaque, void *msg_opaque);
1590 
/**
 * @brief Consistent partitioner.
 *
 * Has the same parameter list and return type as the callback accepted by
 * rd_kafka_topic_conf_set_partitioner_cb().
 */
1599 RD_EXPORT
1600 int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
1601  const void *key, size_t keylen,
1602  int32_t partition_cnt,
1603  void *opaque, void *msg_opaque);
1604 
1615 RD_EXPORT
1616 int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,
1617  const void *key, size_t keylen,
1618  int32_t partition_cnt,
1619  void *opaque, void *msg_opaque);
1620 
1621 
/**
 * @brief Creates a new Kafka handle and starts its operation according to
 *        the specified \p type.
 *
 * NOTE(review): \p errstr / \p errstr_size presumably receive a
 * human-readable error description on failure - confirm.
 */
1662 RD_EXPORT
1663 rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,
1664  char *errstr, size_t errstr_size);
1665 
1666 
1672 RD_EXPORT
1673 void rd_kafka_destroy(rd_kafka_t *rk);
1674 
1675 
1676 
1680 RD_EXPORT
1681 const char *rd_kafka_name(const rd_kafka_t *rk);
1682 
1683 
1687 RD_EXPORT
1688 rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk);
1689 
1690 
/**
 * @brief Returns this client's broker-assigned group member id.
 *
 * NOTE(review): the return type is a non-const char *, which suggests the
 * caller owns the returned string - confirm how it must be released.
 */
1701 RD_EXPORT
1702 char *rd_kafka_memberid (const rd_kafka_t *rk);
1703 
1704 
1705 
1723 RD_EXPORT
1724 char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms);
1725 
1726 
1748 RD_EXPORT
1749 rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic,
1750  rd_kafka_topic_conf_t *conf);
1751 
1752 
1753 
1762 RD_EXPORT
1763 void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt);
1764 
1765 
1769 RD_EXPORT
1770 const char *rd_kafka_topic_name(const rd_kafka_topic_t *rkt);
1771 
1772 
1776 RD_EXPORT
1777 void *rd_kafka_topic_opaque (const rd_kafka_topic_t *rkt);
1778 
1779 
1786 #define RD_KAFKA_PARTITION_UA ((int32_t)-1)
1787 
1788 
1810 RD_EXPORT
1811 int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);
1812 
1813 
1824 RD_EXPORT
1825 void rd_kafka_yield (rd_kafka_t *rk);
1826 
1827 
1828 
1829 
1837 RD_EXPORT rd_kafka_resp_err_t
1838 rd_kafka_pause_partitions (rd_kafka_t *rk,
1839  rd_kafka_topic_partition_list_t *partitions);
1840 
1841 
1842 
/**
 * @brief Resume producing or consumption for the provided list of
 *        partitions.
 */
1850 RD_EXPORT rd_kafka_resp_err_t
1851 rd_kafka_resume_partitions (rd_kafka_t *rk,
1852  rd_kafka_topic_partition_list_t *partitions);
1853 
1854 
1855 
1856 
1865 RD_EXPORT rd_kafka_resp_err_t
1866 rd_kafka_query_watermark_offsets (rd_kafka_t *rk,
1867  const char *topic, int32_t partition,
1868  int64_t *low, int64_t *high, int timeout_ms);
1869 
1870 
1887 RD_EXPORT rd_kafka_resp_err_t
1888 rd_kafka_get_watermark_offsets (rd_kafka_t *rk,
1889  const char *topic, int32_t partition,
1890  int64_t *low, int64_t *high);
1891 
1892 
1893 
1912 RD_EXPORT rd_kafka_resp_err_t
1913 rd_kafka_offsets_for_times (rd_kafka_t *rk,
1915  int timeout_ms);
1916 
1917 
1931 RD_EXPORT
1932 void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr);
1933 
1934 
1958 RD_EXPORT
1959 rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk);
1960 
1964 RD_EXPORT
1965 void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu);
1966 
1967 
1974 RD_EXPORT
1975 rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk);
1976 
1977 
1987 RD_EXPORT
1988 rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk);
1989 
2000 RD_EXPORT
2001 rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,
2002  const char *topic,
2003  int32_t partition);
2004 
/**
 * @brief Forward/re-route queue \p src to \p dst.
 *        If \p dst is NULL the forwarding is removed.
 */
2015 RD_EXPORT
2016 void rd_kafka_queue_forward (rd_kafka_queue_t *src, rd_kafka_queue_t *dst);
2017 
2034 RD_EXPORT
2036  rd_kafka_queue_t *rkqu);
2037 
2038 
2042 RD_EXPORT
2043 size_t rd_kafka_queue_length (rd_kafka_queue_t *rkqu);
2044 
2045 
2061 RD_EXPORT
2062 void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd,
2063  const void *payload, size_t size);
2064 
/* Named negative offset constants. */
2075 #define RD_KAFKA_OFFSET_BEGINNING -2
2077 #define RD_KAFKA_OFFSET_END -1
2079 #define RD_KAFKA_OFFSET_STORED -1000
2081 #define RD_KAFKA_OFFSET_INVALID -1001
2085 #define RD_KAFKA_OFFSET_TAIL_BASE -2000 /* internal: do not use */
2086 
/* Evaluates to RD_KAFKA_OFFSET_TAIL_BASE - CNT, i.e. -2000 - CNT. */
2093 #define RD_KAFKA_OFFSET_TAIL(CNT) (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))
2094 
/**
 * @brief Start consuming messages for topic \p rkt and \p partition at
 *        offset \p offset.
 */
2128 RD_EXPORT
2129 int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
2130  int64_t offset);
2131 
2146 RD_EXPORT
2147 int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition,
2148  int64_t offset, rd_kafka_queue_t *rkqu);
2149 
2163 RD_EXPORT
2164 int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);
2165 
2166 
2167 
2182 RD_EXPORT
2183 rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *rkt,
2184  int32_t partition,
2185  int64_t offset,
2186  int timeout_ms);
2187 
2188 
2213 RD_EXPORT
2214 rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
2215  int timeout_ms);
2216 
2217 
2218 
2244 RD_EXPORT
2245 ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition,
2246  int timeout_ms,
2247  rd_kafka_message_t **rkmessages,
2248  size_t rkmessages_size);
2249 
2250 
2251 
2275 RD_EXPORT
2276 int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition,
2277  int timeout_ms,
2278  void (*consume_cb) (rd_kafka_message_t
2279  *rkmessage,
2280  void *opaque),
2281  void *opaque);
2282 
2283 
2300 RD_EXPORT
2301 rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu,
2302  int timeout_ms);
2303 
2309 RD_EXPORT
2310 ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu,
2311  int timeout_ms,
2312  rd_kafka_message_t **rkmessages,
2313  size_t rkmessages_size);
2314 
2320 RD_EXPORT
2321 int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu,
2322  int timeout_ms,
2323  void (*consume_cb) (rd_kafka_message_t
2324  *rkmessage,
2325  void *opaque),
2326  void *opaque);
2327 
2328 
2354 RD_EXPORT
2355 rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt,
2356  int32_t partition, int64_t offset);
2357 
2358 
2373 RD_EXPORT rd_kafka_resp_err_t
2374 rd_kafka_offsets_store(rd_kafka_t *rk,
2402 RD_EXPORT rd_kafka_resp_err_t
2403 rd_kafka_subscribe (rd_kafka_t *rk,
2404  const rd_kafka_topic_partition_list_t *topics);
2405 
2406 
2410 RD_EXPORT
2411 rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk);
2412 
2413 
2423 RD_EXPORT rd_kafka_resp_err_t
2424 rd_kafka_subscription (rd_kafka_t *rk,
2426 
2427 
2428 
2450 RD_EXPORT
2451 rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);
2452 
2468 RD_EXPORT
2470 
2471 
2472 
2486 RD_EXPORT rd_kafka_resp_err_t
2487 rd_kafka_assign (rd_kafka_t *rk,
2488  const rd_kafka_topic_partition_list_t *partitions);
2489 
2499 RD_EXPORT rd_kafka_resp_err_t
2500 rd_kafka_assignment (rd_kafka_t *rk,
2501  rd_kafka_topic_partition_list_t **partitions);
2502 
2503 
2504 
2505 
2520 RD_EXPORT rd_kafka_resp_err_t
2521 rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets,
2522  int async);
2523 
2524 
2530 RD_EXPORT rd_kafka_resp_err_t
2531 rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
2532  int async);
2533 
2534 
2556 RD_EXPORT rd_kafka_resp_err_t
2557 rd_kafka_commit_queue (rd_kafka_t *rk,
2558  const rd_kafka_topic_partition_list_t *offsets,
2559  rd_kafka_queue_t *rkqu,
2560  void (*cb) (rd_kafka_t *rk,
2561  rd_kafka_resp_err_t err,
2563  void *opaque),
2564  void *opaque);
2565 
2566 
2579 RD_EXPORT rd_kafka_resp_err_t
2580 rd_kafka_committed (rd_kafka_t *rk,
2581  rd_kafka_topic_partition_list_t *partitions,
2582  int timeout_ms);
2583 
2584 
2585 
/**
 * @brief Retrieve current positions (offsets) for topics+partitions.
 *
 * NOTE(review): \p partitions is non-const, so the results are presumably
 * written back into the supplied list - confirm.
 */
2598 RD_EXPORT rd_kafka_resp_err_t
2599 rd_kafka_position (rd_kafka_t *rk,
2600  rd_kafka_topic_partition_list_t *partitions);
2601 
2602 
2618 #define RD_KAFKA_MSG_F_FREE 0x1
2619 #define RD_KAFKA_MSG_F_COPY 0x2
2620 #define RD_KAFKA_MSG_F_BLOCK 0x4
2700 RD_EXPORT
2701 int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
2702  int msgflags,
2703  void *payload, size_t len,
2704  const void *key, size_t keylen,
2705  void *msg_opaque);
2706 
2707 
2718 RD_EXPORT
2719 rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...);
2720 
2721 
2743 RD_EXPORT
2744 int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition,
2745  int msgflags,
2746  rd_kafka_message_t *rkmessages, int message_cnt);
2747 
2748 
2749 
2750 
2762 RD_EXPORT
2763 rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms);
2764 
2765 
2780 typedef struct rd_kafka_metadata_broker {
2781  int32_t id;
2782  char *host;
2783  int port;
2785 
2789 typedef struct rd_kafka_metadata_partition {
2790  int32_t id;
2791  rd_kafka_resp_err_t err;
2792  int32_t leader;
2793  int replica_cnt;
2794  int32_t *replicas;
2795  int isr_cnt;
2796  int32_t *isrs;
2798 
2802 typedef struct rd_kafka_metadata_topic {
2803  char *topic;
2804  int partition_cnt;
2805  struct rd_kafka_metadata_partition *partitions;
2813 typedef struct rd_kafka_metadata {
2814  int broker_cnt;
2815  struct rd_kafka_metadata_broker *brokers;
2817  int topic_cnt;
2818  struct rd_kafka_metadata_topic *topics;
2820  int32_t orig_broker_id;
2821  char *orig_broker_name;
2823 
2824 
2841 RD_EXPORT
2843 rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
2844  rd_kafka_topic_t *only_rkt,
2845  const struct rd_kafka_metadata **metadatap,
2846  int timeout_ms);
2847 
2851 RD_EXPORT
2852 void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata);
2853 
2854 
2875  char *member_id;
2876  char *client_id;
2877  char *client_host;
2878  void *member_metadata;
2880  int member_metadata_size;
2881  void *member_assignment;
2884 };
2885 
2890  struct rd_kafka_metadata_broker broker;
2891  char *group;
2893  char *state;
2895  char *protocol;
2898 };
2899 
2908 };
2927 RD_EXPORT
2929 rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
2930  const struct rd_kafka_group_list **grplistp,
2931  int timeout_ms);
2932 
2936 RD_EXPORT
2937 void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist);
2938 
2939 
2980 RD_EXPORT
2981 int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist);
2982 
2983 
2984 
2985 
2998 RD_EXPORT RD_DEPRECATED
2999 void rd_kafka_set_logger(rd_kafka_t *rk,
3000  void (*func) (const rd_kafka_t *rk, int level,
3001  const char *fac, const char *buf));
3002 
3003 
3011 RD_EXPORT
3012 void rd_kafka_set_log_level(rd_kafka_t *rk, int level);
3013 
3014 
3018 RD_EXPORT
3019 void rd_kafka_log_print(const rd_kafka_t *rk, int level,
3020  const char *fac, const char *buf);
3021 
3022 
3026 RD_EXPORT
3027 void rd_kafka_log_syslog(const rd_kafka_t *rk, int level,
3028  const char *fac, const char *buf);
3029 
3030 
3043 RD_EXPORT
3044 int rd_kafka_outq_len(rd_kafka_t *rk);
3045 
3046 
3047 
3054 RD_EXPORT
3055 void rd_kafka_dump(FILE *fp, rd_kafka_t *rk);
3056 
3057 
3058 
/**
 * @brief Retrieve the current number of threads in use by librdkafka.
 */
3064 RD_EXPORT
3065 int rd_kafka_thread_cnt(void);
3066 
3067 
3076 RD_EXPORT
3077 int rd_kafka_wait_destroyed(int timeout_ms);
3078 
3079 
/**
 * @brief Run librdkafka's built-in unit-tests.
 */
3085 RD_EXPORT
3086 int rd_kafka_unittest (void);
3087 
3088 
3106 RD_EXPORT
3108 
3109 
/* Event type, represented as a plain int. RD_KAFKA_EVENT_NONE is 0 and every
 * other constant below is a distinct single bit.
 * NOTE(review): presumably meant to be OR-ed together for the `events`
 * argument of rd_kafka_conf_set_events() - confirm. */
3125 typedef int rd_kafka_event_type_t;
3126 #define RD_KAFKA_EVENT_NONE 0x0
3127 #define RD_KAFKA_EVENT_DR 0x1
3128 #define RD_KAFKA_EVENT_FETCH 0x2
3129 #define RD_KAFKA_EVENT_LOG 0x4
3130 #define RD_KAFKA_EVENT_ERROR 0x8
3131 #define RD_KAFKA_EVENT_REBALANCE 0x10
3132 #define RD_KAFKA_EVENT_OFFSET_COMMIT 0x20
3133 #define RD_KAFKA_EVENT_STATS 0x40
3136 typedef struct rd_kafka_op_s rd_kafka_event_t;
3137 
3138 
3145 RD_EXPORT
3146 rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev);
3154 RD_EXPORT
3155 const char *rd_kafka_event_name (const rd_kafka_event_t *rkev);
3156 
3157 
3167 RD_EXPORT
3168 void rd_kafka_event_destroy (rd_kafka_event_t *rkev);
3169 
3170 
3186 RD_EXPORT
3187 const rd_kafka_message_t *rd_kafka_event_message_next (rd_kafka_event_t *rkev);
3188 
3189 
3203 RD_EXPORT
3204 size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev,
3205  const rd_kafka_message_t **rkmessages,
3206  size_t size);
3207 
3208 
3216 RD_EXPORT
3217 size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev);
3218 
3219 
3226 RD_EXPORT
3227 rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev);
3228 
3229 
3238 RD_EXPORT
3239 const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev);
3240 
3241 
3242 
3249 RD_EXPORT
3250 void *rd_kafka_event_opaque (rd_kafka_event_t *rkev);
3251 
3252 
/**
 * @brief Extract log message from the event.
 *
 * The facility string, message string and level are handed back through the
 * \p fac, \p str and \p level out-pointers.
 */
3261 RD_EXPORT
3262 int rd_kafka_event_log (rd_kafka_event_t *rkev,
3263  const char **fac, const char **str, int *level);
3264 
3265 
3277 RD_EXPORT
3278 const char *rd_kafka_event_stats (rd_kafka_event_t *rkev);
3279 
3280 
3291 rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev);
3292 
3293 
3303 RD_EXPORT rd_kafka_topic_partition_t *
3304 rd_kafka_event_topic_partition (rd_kafka_event_t *rkev);
3305 
3306 
3314 RD_EXPORT
3315 rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms);
3316 
3325 RD_EXPORT
3326 int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms);
3327 
3328 
3370 typedef rd_kafka_resp_err_t
3371 (rd_kafka_plugin_f_conf_init_t) (rd_kafka_conf_t *conf,
3372  void **plug_opaquep,
3373  char *errstr, size_t errstr_size);
3374 
3454 typedef rd_kafka_conf_res_t
3455 (rd_kafka_interceptor_f_on_conf_set_t) (rd_kafka_conf_t *conf,
3456  const char *name, const char *val,
3457  char *errstr, size_t errstr_size,
3458  void *ic_opaque);
3459 
3460 
/**
 * @brief on_conf_dup() is called from rd_kafka_conf_dup() in the order the
 *        interceptors were added.
 *
 * The callback is given the new and the old configuration objects, the
 * filter array with its count, and the interceptor's opaque pointer.
 */
3477 typedef rd_kafka_resp_err_t
3478 (rd_kafka_interceptor_f_on_conf_dup_t) (rd_kafka_conf_t *new_conf,
3479  const rd_kafka_conf_t *old_conf,
3480  size_t filter_cnt,
3481  const char **filter,
3482  void *ic_opaque);
3483 
3484 
3491 typedef rd_kafka_resp_err_t
3493 
3494 
3512 typedef rd_kafka_resp_err_t
3513 (rd_kafka_interceptor_f_on_new_t) (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
3514  void *ic_opaque,
3515  char *errstr, size_t errstr_size);
3516 
3517 
3525 typedef rd_kafka_resp_err_t
3526 (rd_kafka_interceptor_f_on_destroy_t) (rd_kafka_t *rk, void *ic_opaque);
3528 
3529 
3530 
/**
 * @brief on_send() is called from rd_kafka_produce*() (et al.) prior to the
 *        partitioner being called.
 *
 * The callback is given the client handle, the message and the
 * interceptor's opaque pointer.
 */
3551 typedef rd_kafka_resp_err_t
3552 (rd_kafka_interceptor_f_on_send_t) (rd_kafka_t *rk,
3553  rd_kafka_message_t *rkmessage,
3554  void *ic_opaque);
3555 
3578 typedef rd_kafka_resp_err_t
3580  rd_kafka_message_t *rkmessage,
3581  void *ic_opaque);
3582 
3583 
3600 typedef rd_kafka_resp_err_t
3601 (rd_kafka_interceptor_f_on_consume_t) (rd_kafka_t *rk,
3602  rd_kafka_message_t *rkmessage,
3603  void *ic_opaque);
3604 
3625 typedef rd_kafka_resp_err_t
3627  rd_kafka_t *rk,
3628  const rd_kafka_topic_partition_list_t *offsets,
3629  rd_kafka_resp_err_t err, void *ic_opaque);
3630 
3631 
3632 
3645 RD_EXPORT rd_kafka_resp_err_t
3647  rd_kafka_conf_t *conf, const char *ic_name,
3649  void *ic_opaque);
3650 
3651 
3664 RD_EXPORT rd_kafka_resp_err_t
3666  rd_kafka_conf_t *conf, const char *ic_name,
3668  void *ic_opaque);
3669 
3683 RD_EXPORT rd_kafka_resp_err_t
3685  rd_kafka_conf_t *conf, const char *ic_name,
3687  void *ic_opaque);
3688 
3689 
3711 RD_EXPORT rd_kafka_resp_err_t
3713  rd_kafka_conf_t *conf, const char *ic_name,
3715  void *ic_opaque);
3716 
3717 
3718 
3731 RD_EXPORT rd_kafka_resp_err_t
3733  rd_kafka_t *rk, const char *ic_name,
3735  void *ic_opaque);
3736 
3737 
3750 RD_EXPORT rd_kafka_resp_err_t
3752  rd_kafka_t *rk, const char *ic_name,
3754  void *ic_opaque);
3755 
3768 RD_EXPORT rd_kafka_resp_err_t
3770  rd_kafka_t *rk, const char *ic_name,
3771  rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement,
3772  void *ic_opaque);
3773 
3774 
3787 RD_EXPORT rd_kafka_resp_err_t
3789  rd_kafka_t *rk, const char *ic_name,
3791  void *ic_opaque);
3792 
3793 
3806 RD_EXPORT rd_kafka_resp_err_t
3808  rd_kafka_t *rk, const char *ic_name,
3810  void *ic_opaque);
3811 
3812 
3813 
3814 
3818 #ifdef __cplusplus
3819 }
3820 #endif
void * _private
Definition: rdkafka.h:594
rd_kafka_resp_err_t
Error codes.
Definition: rdkafka.h:240
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_send_t)(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque)
on_send() is called from rd_kafka_produce*() (et al.) prior to the partitioner being called...
Definition: rdkafka.h:3566
rd_kafka_topic_t * rkt
Definition: rdkafka.h:878
RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_conf_dup(rd_kafka_conf_t *conf, const char *ic_name, rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup, void *ic_opaque)
Append an on_conf_dup() interceptor.
Definition: rdkafka.h:335
RD_EXPORT int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset)
Start consuming messages for topic rkt and partition at offset offset which may either be an absolute...
Definition: rdkafka.h:302
rd_kafka_resp_err_t err
Definition: rdkafka.h:877
rd_kafka_conf_res_t
Configuration result type.
Definition: rdkafka.h:979
int member_cnt
Definition: rdkafka.h:2911
RD_EXPORT char * rd_kafka_memberid(const rd_kafka_t *rk)
Returns this client's broker-assigned group member id.
RD_EXPORT void rd_kafka_queue_forward(rd_kafka_queue_t *src, rd_kafka_queue_t *dst)
Forward/re-route queue src to dst. If dst is NULL the forwarding is removed.
int cnt
Definition: rdkafka.h:612
Definition: rdkafka.h:320
RD_EXPORT int rd_kafka_thread_cnt(void)
Retrieve the current number of threads in use by librdkafka.
RD_EXPORT void rd_kafka_conf_set_consume_cb(rd_kafka_conf_t *conf, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque))
Consumer: Set consume callback for use with rd_kafka_consumer_poll()
rd_kafka_topic_partition_t * elems
Definition: rdkafka.h:614
RD_EXPORT rd_kafka_resp_err_t rd_kafka_subscription(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **topics)
Returns the current topic subscription.
RD_EXPORT int32_t rd_kafka_msg_partitioner_consistent(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Consistent partitioner.
RD_EXPORT void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf, void(*dr_msg_cb)(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque))
Producer: Set delivery report callback in provided conf object.
RD_EXPORT const char * rd_kafka_event_name(const rd_kafka_event_t *rkev)
Definition: rdkafka.h:785
RD_EXPORT void rd_kafka_topic_partition_list_destroy(rd_kafka_topic_partition_list_t *rkparlist)
Free all resources used by the list and the list itself.
RD_EXPORT int rd_kafka_unittest(void)
Run librdkafka's built-in unit-tests.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_resume_partitions(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Resume producing or consumption for the provided list of partitions.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
Sets a single rd_kafka_topic_conf_t value by property name.
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_copy(const rd_kafka_topic_partition_list_t *src)
Make a copy of an existing list.
RD_EXPORT void rd_kafka_topic_conf_set_partitioner_cb(rd_kafka_topic_conf_t *topic_conf, int32_t(*partitioner)(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque))
Producer: Set partitioner callback in provided topic conf object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_poll_set_consumer(rd_kafka_t *rk)
Redirect the main (rd_kafka_poll()) queue to the KafkaConsumer's queue (rd_kafka_consumer_poll()).
RD_EXPORT void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage)
Frees resources for rkmessage and hands ownership back to rdkafka.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_topic_conf_get(const rd_kafka_topic_conf_t *conf, const char *name, char *dest, size_t *dest_size)
Retrieve topic configuration value for property name.
Definition: rdkafka.h:276
Definition: rdkafka.h:405
Definition: rdkafka.h:778
RD_EXPORT rd_kafka_resp_err_t rd_kafka_position(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Retrieve current positions (offsets) for topics+partitions.
Definition: rdkafka.h:290
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_consumer(rd_kafka_t *rk)
char * state
Definition: rdkafka.h:2907
size_t key_len
Definition: rdkafka.h:889
RD_EXPORT rd_kafka_resp_err_t rd_kafka_offsets_store(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *offsets)
Store offsets for one or more partitions.
Definition: rdkafka.h:270
RD_EXPORT rd_kafka_event_type_t rd_kafka_event_type(const rd_kafka_event_t *rkev)
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_dup_filter(const rd_kafka_conf_t *conf, size_t filter_cnt, const char **filter)
Same as rd_kafka_conf_dup() but with an array of property name prefixes to filter out (ignore) when c...
Definition: rdkafka.h:981
int member_assignment_size
Definition: rdkafka.h:2897
RD_EXPORT rd_kafka_t * rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)
Creates a new Kafka handle and starts its operation according to the specified type (RD_KAFKA_CONSUME...
RD_EXPORT void rd_kafka_get_err_descs(const struct rd_kafka_err_desc **errdescs, size_t *cntp)
Returns the full list of error codes.
Group information.
Definition: rdkafka.h:2903
char * group
Definition: rdkafka.h:2905
Partition information.
Definition: rdkafka.h:2803
Definition: rdkafka.h:259
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_commit(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_commit_t *on_commit, void *ic_opaque)
Append an on_commit() interceptor.
Definition: rdkafka.h:431
RD_EXPORT int rd_kafka_event_log(rd_kafka_event_t *rkev, const char **fac, const char **str, int *level)
Extract log message from the event.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_offsets_for_times(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *offsets, int timeout_ms)
Look up the offsets for the given partitions by timestamp.
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_conf_dup_t)(rd_kafka_conf_t *new_conf, const rd_kafka_conf_t *old_conf, size_t filter_cnt, const char **filter, void *ic_opaque)
on_conf_dup() is called from rd_kafka_conf_dup() in the order the interceptors were added and is used...
Definition: rdkafka.h:3492
RD_EXPORT const char * rd_kafka_version_str(void)
Returns the librdkafka version as string.
RD_EXPORT void * rd_kafka_opaque(const rd_kafka_t *rk)
Retrieves the opaque pointer previously set with rd_kafka_conf_set_opaque()
char * client_id
Definition: rdkafka.h:2890
const char * name
Definition: rdkafka.h:463
Definition: rdkafka.h:266
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_new(int size)
Create a new list/vector Topic+Partition container.
char * client_host
Definition: rdkafka.h:2891
struct rd_kafka_group_info * groups
Definition: rdkafka.h:2920
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit_message(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async)
Commit message's offset on broker for the message's partition.
RD_EXPORT ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size)
Consume up to rkmessages_size from topic rkt and partition putting a pointer to each message in the a...
Definition: rdkafka.h:783
RD_EXPORT void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque)
Sets the application's opaque pointer that will be passed to all topic callbacks as the rkt_opaque ar...
Definition: rdkafka.h:286
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_new_t)(rd_kafka_t *rk, const rd_kafka_conf_t *conf, void *ic_opaque, char *errstr, size_t errstr_size)
on_new() is called from rd_kafka_new() prior to returning the newly created client instance to the app...
Definition: rdkafka.h:3527
RD_EXPORT void rd_kafka_log_print(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
Builtin (default) log sink: print to stderr.
char * protocol_type
Definition: rdkafka.h:2908
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_acknowledgement_t)(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque)
on_acknowledgement() is called to inform interceptors that a message was successfully delivered or per...
Definition: rdkafka.h:3593
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_commit_t)(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, rd_kafka_resp_err_t err, void *ic_opaque)
on_commit() is called on completed or failed offset commit. It is called from internal librdkafka thr...
Definition: rdkafka.h:3640
Definition: rdkafka.h:294
rd_kafka_resp_err_t( rd_kafka_plugin_f_conf_init_t)(rd_kafka_conf_t *conf, void **plug_opaquep, char *errstr, size_t errstr_size)
Plugin's configuration initializer method called each time the library is referenced from configurati...
Definition: rdkafka.h:3385
RD_EXPORT int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu, int timeout_ms, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque), void *opaque)
Consume multiple messages from queue with callback.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
Sets a configuration property.
RD_EXPORT int rd_kafka_outq_len(rd_kafka_t *rk)
Returns the current out queue length.
int group_cnt
Definition: rdkafka.h:2921
Definition: rdkafka.h:312
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_consume(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_consume_t *on_consume, void *ic_opaque)
Append an on_consume() interceptor.
rd_kafka_vtype_t
Var-arg tag types.
Definition: rdkafka.h:776
RD_EXPORT const rd_kafka_message_t * rd_kafka_event_message_next(rd_kafka_event_t *rkev)
RD_EXPORT rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset)
Store offset offset for topic rkt partition partition.
RD_EXPORT void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf, void(*error_cb)(rd_kafka_t *rk, int err, const char *reason, void *opaque))
Set error callback in provided conf object.
rd_kafka_resp_err_t err
Definition: rdkafka.h:2906
size_t len
Definition: rdkafka.h:884
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async)
Commit offsets on broker for the provided list of partitions.
RD_EXPORT void rd_kafka_conf_set_default_topic_conf(rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf)
Definition: rdkafka.h:383
RD_EXPORT rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk)
Returns Kafka handle type.
RD_EXPORT int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque)
Produce and send a single message to broker.
rd_kafka_conf_res_t( rd_kafka_interceptor_f_on_conf_set_t)(rd_kafka_conf_t *conf, const char *name, const char *val, char *errstr, size_t errstr_size, void *ic_opaque)
on_conf_set() is called from rd_kafka_*_conf_set() in the order the interceptors were added...
Definition: rdkafka.h:3469
Definition: rdkafka.h:280
RD_EXPORT const char * rd_kafka_event_error_string(rd_kafka_event_t *rkev)
Definition: rdkafka.h:385
Definition: rdkafka.h:423
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_event_topic_partition(rd_kafka_event_t *rkev)
struct rd_kafka_metadata_broker broker
Definition: rdkafka.h:2904
RD_EXPORT void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu)
RD_EXPORT size_t rd_kafka_queue_length(rd_kafka_queue_t *rkqu)
Definition: rdkafka.h:298
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_acknowledgement(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement, void *ic_opaque)
Append an on_acknowledgement() interceptor.
rd_kafka_resp_err_t code
Definition: rdkafka.h:462
RD_EXPORT void rd_kafka_destroy(rd_kafka_t *rk)
Destroy Kafka handle.
RD_EXPORT void rd_kafka_set_log_level(rd_kafka_t *rk, int level)
Specifies the maximum logging level produced by internal kafka logging and debugging.
RD_EXPORT int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque), void *opaque)
Consumes messages from topic rkt and partition, calling the provided callback for each consumed mess...
Definition: rdkafka.h:784
RD_EXPORT rd_kafka_resp_err_t rd_kafka_metadata(rd_kafka_t *rk, int all_topics, rd_kafka_topic_t *only_rkt, const struct rd_kafka_metadata **metadatap, int timeout_ms)
Request Metadata from broker.
RD_EXPORT int rd_kafka_wait_destroyed(int timeout_ms)
Wait for all rd_kafka_t objects to be destroyed.
Definition: rdkafka.h:369
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit_queue(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, rd_kafka_queue_t *rkqu, void(*cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque), void *opaque)
Commit offsets on broker for the provided list of partitions.
Definition: rdkafka.h:357
int64_t offset
Definition: rdkafka.h:891
Definition: rdkafka.h:361
RD_EXPORT int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist)
Adds one or more brokers to the kafka handle's list of initial bootstrap brokers. ...
RD_EXPORT RD_DEPRECATED int rd_kafka_errno(void)
Returns the thread-local system errno.
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_destroy_t)(rd_kafka_t *rk, void *ic_opaque)
on_destroy() is called from rd_kafka_destroy() or (rd_kafka_new() if rd_kafka_new() fails during init...
Definition: rdkafka.h:3540
Definition: rdkafka.h:343
Definition: rdkafka.h:255
Definition: rdkafka.h:333
RD_EXPORT void rd_kafka_log_syslog(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
Builtin log sink: print to syslog.
Definition: rdkafka.h:243
Group member information.
Definition: rdkafka.h:2888
void * key
Definition: rdkafka.h:887
RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_get(const rd_kafka_conf_t *conf, const char *name, char *dest, size_t *dest_size)
Retrieve configuration value for property name.
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_topic_conf_new(void)
Create topic configuration object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_get_watermark_offsets(rd_kafka_t *rk, const char *topic, int32_t partition, int64_t *low, int64_t *high)
Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
RD_EXPORT void rd_kafka_dump(FILE *fp, rd_kafka_t *rk)
Dumps rdkafka's internal state for handle rk to stream fp.
Definition: rdkafka.h:780
Definition: rdkafka.h:268
RD_EXPORT void rd_kafka_conf_set_closesocket_cb(rd_kafka_conf_t *conf, int(*closesocket_cb)(int sockfd, void *opaque))
Set close socket callback.
A growable list of Topic+Partitions.
Definition: rdkafka.h:611
int rd_kafka_event_type_t
Event types.
Definition: rdkafka.h:3139
Topic information.
Definition: rdkafka.h:2816
RD_EXPORT void rd_kafka_conf_set_throttle_cb(rd_kafka_conf_t *conf, void(*throttle_cb)(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque))
Set throttle callback.
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_main(rd_kafka_t *rk)
RD_EXPORT void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf, void(*log_cb)(const rd_kafka_t *rk, int level, const char *fac, const char *buf))
Set logger callback.
Definition: rdkafka.h:257
RD_EXPORT rd_kafka_resp_err_t rd_kafka_list_groups(rd_kafka_t *rk, const char *group, const struct rd_kafka_group_list **grplistp, int timeout_ms)
List and describe client groups in cluster.
Definition: rdkafka.h:779
int32_t partition
Definition: rdkafka.h:588
Definition: rdkafka.h:253
Definition: rdkafka.h:282
Definition: rdkafka.h:251
void * opaque
Definition: rdkafka.h:592
const char * desc
Definition: rdkafka.h:464
RD_EXPORT rd_kafka_resp_err_t rd_kafka_assign(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions)
Atomic assignment of partitions to consume.
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_dup(const rd_kafka_conf_t *conf)
Creates a copy/duplicate of configuration object conf.
RD_EXPORT int64_t rd_kafka_message_timestamp(const rd_kafka_message_t *rkmessage, rd_kafka_timestamp_type_t *tstype)
Returns the message timestamp for a consumed message.
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_partition(rd_kafka_t *rk, const char *topic, int32_t partition)
RD_EXPORT int rd_kafka_topic_partition_list_del_by_idx(rd_kafka_topic_partition_list_t *rktparlist, int idx)
Delete partition from list by elems[] index.
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_conf_destroy_t)(void *ic_opaque)
on_conf_destroy() is called from rd_kafka_*_conf_destroy() in the order the interceptors were added...
Definition: rdkafka.h:3506
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_send(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_send_t *on_send, void *ic_opaque)
Append an on_send() interceptor.
Definition: rdkafka.h:355
Definition: rdkafka.h:318
RD_EXPORT void rd_kafka_conf_set_connect_cb(rd_kafka_conf_t *conf, int(*connect_cb)(int sockfd, const struct sockaddr *addr, int addrlen, const char *id, void *opaque))
Set connect callback.
RD_EXPORT void rd_kafka_mem_free(rd_kafka_t *rk, void *ptr)
Free pointer returned by librdkafka.
static RD_INLINE const char *RD_UNUSED rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage)
Returns the error string for an errored rd_kafka_message_t or NULL if there was no error...
Definition: rdkafka.h:925
RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_new(rd_kafka_conf_t *conf, const char *ic_name, rd_kafka_interceptor_f_on_new_t *on_new, void *ic_opaque)
Append an on_new() interceptor.
RD_EXPORT char * rd_kafka_clusterid(rd_kafka_t *rk, int timeout_ms)
Returns the ClusterId as reported in broker metadata.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_pause_partitions(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Pause producing or consumption for the provided list of partitions.
RD_EXPORT void rd_kafka_group_list_destroy(const struct rd_kafka_group_list *grplist)
Release list memory.
Definition: rdkafka.h:415
RD_EXPORT void rd_kafka_event_destroy(rd_kafka_event_t *rkev)
Destroy an event.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_committed(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions, int timeout_ms)
Retrieve committed offsets for topics+partitions.
Definition: rdkafka.h:193
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_new(void)
Create configuration object.
RD_EXPORT rd_kafka_event_t * rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms)
Poll a queue for an event for max timeout_ms.
RD_EXPORT int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition)
Stop consuming messages for topic rkt and partition, purging all messages currently in the local queu...
RD_EXPORT const char * rd_kafka_err2name(rd_kafka_resp_err_t err)
Returns the error code name (enum name).
RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_conf_destroy(rd_kafka_conf_t *conf, const char *ic_name, rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy, void *ic_opaque)
Append an on_conf_destroy() interceptor.
RD_EXPORT int rd_kafka_version(void)
Returns the librdkafka version as integer.
Definition: rdkafka.h:403
void * member_assignment
Definition: rdkafka.h:2895
RD_EXPORT int rd_kafka_topic_partition_list_del(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Delete partition from list.
int size
Definition: rdkafka.h:613
RD_EXPORT rd_kafka_resp_err_t rd_kafka_subscribe(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)
Subscribe to topic set using balanced consumer groups.
RD_EXPORT void rd_kafka_topic_partition_destroy(rd_kafka_topic_partition_t *rktpar)
Destroy a rd_kafka_topic_partition_t.
RD_EXPORT void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque)
Sets the application's opaque pointer that will be passed to callbacks.
Definition: rdkafka.h:982
RD_EXPORT const char ** rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp)
Dump the configuration properties and values of conf to an array with "key", "value" pairs...
RD_EXPORT rd_kafka_resp_err_t rd_kafka_event_error(rd_kafka_event_t *rkev)
Definition: rdkafka.h:310
char * topic
Definition: rdkafka.h:587
RD_EXPORT const char * rd_kafka_get_debug_contexts(void)
Retrieve supported debug contexts for use with the "debug" configuration property. (runtime)
Definition: rdkafka.h:316
RD_EXPORT void rd_kafka_conf_set_offset_commit_cb(rd_kafka_conf_t *conf, void(*offset_commit_cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque))
Consumer: Set offset commit callback for use with consumer groups.
RD_EXPORT void rd_kafka_conf_set_rebalance_cb(rd_kafka_conf_t *conf, void(*rebalance_cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque))
Consumer: Set rebalance callback for use with coordinated consumer group balancing.
Definition: rdkafka.h:777
RD_EXPORT int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms)
Poll a queue for events served through callbacks for max timeout_ms.
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_consume_t)(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque)
on_consume() is called just prior to passing the message to the application in rd_kafka_consumer_poll...
Definition: rdkafka.h:3615
RD_EXPORT rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms)
Wait until all outstanding produce requests, et al., are completed. This should typically be done prio...
RD_EXPORT int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms)
Polls the provided kafka handle for events.
RD_EXPORT int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt, int32_t partition)
Check if partition is available (has a leader broker).
Definition: rdkafka.h:180
RD_EXPORT int64_t rd_kafka_message_latency(const rd_kafka_message_t *rkmessage)
Returns the latency for a produced message measured from the produce() call.
Definition: rdkafka.h:419
Definition: rdkafka.h:373
RD_EXPORT void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf, int(*socket_cb)(int domain, int type, int protocol, void *opaque))
Set socket callback.
Definition: rdkafka.h:181
Definition: rdkafka.h:192
Definition: rdkafka.h:278
RD_EXPORT void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events)
Enable event sourcing. events is a bitmask of RD_KAFKA_EVENT_* of events to enable for consumption by...
RD_EXPORT void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf, void(*dr_cb)(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, void *opaque, void *msg_opaque))
Definition: rdkafka.h:274
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t *conf)
Creates a copy/duplicate of topic configuration object conf.
Definition: rdkafka.h:337
RD_EXPORT RD_DEPRECATED void rd_kafka_set_logger(rd_kafka_t *rk, void(*func)(const rd_kafka_t *rk, int level, const char *fac, const char *buf))
Set logger function.
Definition: rdkafka.h:249
Metadata container.
Definition: rdkafka.h:2827
Definition: rdkafka.h:308
RD_EXPORT rd_kafka_resp_err_t rd_kafka_unsubscribe(rd_kafka_t *rk)
Unsubscribe from the current subscription set.
RD_EXPORT void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata)
Release metadata memory.
RD_EXPORT int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, rd_kafka_message_t *rkmessages, int message_cnt)
Produce multiple messages.
RD_EXPORT void rd_kafka_conf_set_open_cb(rd_kafka_conf_t *conf, int(*open_cb)(const char *pathname, int flags, mode_t mode, void *opaque))
Set open callback.
Definition: rdkafka.h:300
RD_EXPORT rd_kafka_resp_err_t rd_kafka_producev(rd_kafka_t *rk,...)
Produce and send a single message to broker.
Definition: rdkafka.h:417
RD_EXPORT rd_kafka_resp_err_t rd_kafka_seek(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, int timeout_ms)
Seek consumer for topic+partition to offset which is either an absolute or logical offset...
RD_EXPORT void rd_kafka_topic_partition_list_add_range(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t start, int32_t stop)
Add range of partitions from start to stop inclusive.
RD_EXPORT size_t rd_kafka_event_message_count(rd_kafka_event_t *rkev)
Definition: rdkafka.h:980
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Add topic+partition to list.
Definition: rdkafka.h:272
Definition: rdkafka.h:191
Definition: rdkafka.h:296
RD_EXPORT ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size)
Consume batch of messages from queue.
rd_kafka_timestamp_type_t
Definition: rdkafka.h:190
RD_EXPORT void * rd_kafka_topic_opaque(const rd_kafka_topic_t *rkt)
Get the rkt_opaque pointer that was set in the topic configuration.
RD_EXPORT void * rd_kafka_event_opaque(rd_kafka_event_t *rkev)
RD_EXPORT const char * rd_kafka_name(const rd_kafka_t *rk)
Returns Kafka handle name.
A Kafka message as returned by the rd_kafka_consume*() family of functions as well as provided to the...
Definition: rdkafka.h:876
Definition: rdkafka.h:284
RD_EXPORT void rd_kafka_queue_io_event_enable(rd_kafka_queue_t *rkqu, int fd, const void *payload, size_t size)
Enable IO event triggering for queue.
RD_EXPORT void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt)
Lose application's topic handle refcount as previously created with rd_kafka_topic_new().
int32_t partition
Definition: rdkafka.h:879
Definition: rdkafka.h:329
RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_conf_set(rd_kafka_conf_t *conf, const char *ic_name, rd_kafka_interceptor_f_on_conf_set_t *on_conf_set, void *ic_opaque)
Append an on_conf_set() interceptor.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk, const char *topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
RD_EXPORT rd_kafka_message_t * rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms)
Consume a single message from topic rkt and partition.
Definition: rdkafka.h:288
RD_EXPORT rd_kafka_message_t * rd_kafka_consumer_poll(rd_kafka_t *rk, int timeout_ms)
Poll the consumer for messages or events.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_assignment(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **partitions)
Returns the current partition assignment.
Definition: rdkafka.h:264
char * member_id
Definition: rdkafka.h:2889
List of groups.
Definition: rdkafka.h:2919
Definition: rdkafka.h:448
Definition: rdkafka.h:782
RD_EXPORT const char * rd_kafka_err2str(rd_kafka_resp_err_t err)
Returns a human readable representation of a kafka error.
void * metadata
Definition: rdkafka.h:590
RD_EXPORT void rd_kafka_topic_partition_list_sort(rd_kafka_topic_partition_list_t *rktparlist, int(*cmp)(const void *a, const void *b, void *opaque), void *opaque)
Sort list using comparator cmp.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_destroy(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_destroy_t *on_destroy, void *ic_opaque)
Append an on_destroy() interceptor.
int64_t offset
Definition: rdkafka.h:589
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_event_topic_partition_list(rd_kafka_event_t *rkev)
RD_EXPORT RD_DEPRECATED rd_kafka_resp_err_t rd_kafka_errno2err(int errnox)
Converts the system errno value errnox to a rd_kafka_resp_err_t error code upon failure from the foll...
Definition: rdkafka.h:247
RD_EXPORT void rd_kafka_conf_destroy(rd_kafka_conf_t *conf)
Destroys a conf object.
Definition: rdkafka.h:306
Broker information.
Definition: rdkafka.h:2794
Error code value, name and description. Typically for use with language bindings to automatically exp...
Definition: rdkafka.h:461
Definition: rdkafka.h:399
Definition: rdkafka.h:379
rd_kafka_resp_err_t err
Definition: rdkafka.h:593
Definition: rdkafka.h:304
Topic+Partition place holder.
Definition: rdkafka.h:586
Definition: rdkafka.h:245
struct rd_kafka_group_member_info * members
Definition: rdkafka.h:2910
size_t metadata_size
Definition: rdkafka.h:591
RD_EXPORT const char ** rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf, size_t *cntp)
Dump the topic configuration properties and values of conf to an array with "key", "value" pairs.
int member_metadata_size
Definition: rdkafka.h:2894
RD_EXPORT size_t rd_kafka_event_message_array(rd_kafka_event_t *rkev, const rd_kafka_message_t **rkmessages, size_t size)
Extracts size message(s) from the event into the pre-allocated array rkmessages.
Definition: rdkafka.h:781
rd_kafka_type_t
rd_kafka_t handle type.
Definition: rdkafka.h:179
RD_EXPORT int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Random partitioner.
RD_EXPORT void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf, int(*stats_cb)(rd_kafka_t *rk, char *json, size_t json_len, void *opaque))
Set statistics callback in provided conf object.
Definition: rdkafka.h:349
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_find(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Find element by topic and partition.
RD_EXPORT void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf)
Destroys a topic conf object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition, int64_t offset)
Set offset to offset for topic and partition.
Definition: rdkafka.h:409
void * payload
Definition: rdkafka.h:880
RD_EXPORT void rd_kafka_yield(rd_kafka_t *rk)
Cancels the current callback dispatcher (rd_kafka_poll(), rd_kafka_consume_callback(), etc).
Definition: rdkafka.h:339
RD_EXPORT const char * rd_kafka_event_stats(rd_kafka_event_t *rkev)
Extract stats from the event.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_set_log_queue(rd_kafka_t *rk, rd_kafka_queue_t *rkqu)
Forward librdkafka logs (and debug) to the specified queue for serving with one of the ...
void * member_metadata
Definition: rdkafka.h:2892
RD_EXPORT rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk)
Close down the KafkaConsumer.
RD_EXPORT void rd_kafka_conf_properties_show(FILE *fp)
Prints a table to fp of all supported configuration properties, their default values as well as a des...
RD_EXPORT void rd_kafka_conf_dump_free(const char **arr, size_t cnt)
Frees a configuration dump returned from rd_kafka_conf_dump() or rd_kafka_topic_conf_dump().
void * _private
Definition: rdkafka.h:901
Definition: rdkafka.h:262
RD_EXPORT int32_t rd_kafka_msg_partitioner_consistent_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Consistent-Random partitioner.
RD_EXPORT const char * rd_kafka_topic_name(const rd_kafka_topic_t *rkt)
Returns the topic name.
char * protocol
Definition: rdkafka.h:2909
RD_EXPORT rd_kafka_message_t * rd_kafka_consume_queue(rd_kafka_queue_t *rkqu, int timeout_ms)
Consume from queue.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_last_error(void)
Returns the last error code generated by a legacy API call in the current thread. ...
RD_EXPORT int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, rd_kafka_queue_t *rkqu)
Same as rd_kafka_consume_start() but re-routes incoming messages to the provided queue rkqu (which mu...
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_new(rd_kafka_t *rk)
Create a new message queue.
RD_EXPORT rd_kafka_topic_t * rd_kafka_topic_new(rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
Creates a new topic handle for topic named topic.