librdkafka
The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rdkafkacpp.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #pragma once
30 
49 #include <string>
50 #include <list>
51 #include <vector>
52 #include <stdint.h>
53 
54 
/*
 * RD_EXPORT: decoration placed on every public class/function of this header.
 *  - MSVC with LIBRDKAFKA_STATICLIB defined: expands to nothing.
 *  - MSVC with LIBRDKAFKACPP_EXPORTS defined: __declspec(dllexport)
 *    (presumably set only while building the C++ library itself - confirm).
 *  - MSVC otherwise: __declspec(dllimport).
 *  - Any other compiler: expands to nothing.
 * On MSVC any previous definition of RD_EXPORT is discarded first (#undef).
 */
55 #ifdef _MSC_VER
56 #undef RD_EXPORT
57 #ifdef LIBRDKAFKA_STATICLIB
58 #define RD_EXPORT
59 #else
60 #ifdef LIBRDKAFKACPP_EXPORTS
61 #define RD_EXPORT __declspec(dllexport)
62 #else
63 #define RD_EXPORT __declspec(dllimport)
64 #endif
65 #endif
66 #else
67 #define RD_EXPORT
68 #endif
69 
72 namespace RdKafka {
73 
74 
/*
 * Compile-time library version as a 32-bit hex constant.
 * NOTE(review): presumably one byte each for major.minor.revision.pre-release
 * (0x00, 0x0b, 0x00, 0xc9) - confirm against the upstream documentation.
 */
94 #define RD_KAFKA_VERSION 0x000b00c9
95 
/* Returns the library version as an int (presumably the run-time counterpart
 * of RD_KAFKA_VERSION - confirm). */
101 RD_EXPORT
102 int version ();
103 
/* Returns the library version as a string. */
107 RD_EXPORT
108 std::string version_str();
109 
/* Returns a string describing the available debug contexts (exact format is
 * not visible in this listing). */
114 RD_EXPORT
115 std::string get_debug_contexts();
116 
/* Takes a timeout in milliseconds and returns an int.
 * NOTE(review): by its name this waits for previously created objects to be
 * destroyed; the meaning of the return value is not shown here - confirm. */
126 RD_EXPORT
127 int wait_destroyed(int timeout_ms);
128 
129 
/*
 * Error codes used throughout the RdKafka C++ API.
 *
 * Two groups, as marked by the section comments inside the enum:
 *  - ERR__* (double underscore): errors internal to rdkafka. They occupy the
 *    negative range bracketed by the ERR__BEGIN (-200) and ERR__END (-100)
 *    markers.
 *  - ERR_* (single underscore): Kafka broker errors, from ERR_UNKNOWN (-1)
 *    through ERR_NO_ERROR (0) to the positive codes.
 *
 * Every enumerator has an explicit value, so inserting or reordering entries
 * does not silently renumber the others.
 */
152 enum ErrorCode {
153  /* Internal errors to rdkafka: */
155  ERR__BEGIN = -200,
157  ERR__BAD_MSG = -199,
159  ERR__BAD_COMPRESSION = -198,
161  ERR__DESTROY = -197,
163  ERR__FAIL = -196,
165  ERR__TRANSPORT = -195,
167  ERR__CRIT_SYS_RESOURCE = -194,
169  ERR__RESOLVE = -193,
171  ERR__MSG_TIMED_OUT = -192,
174  ERR__PARTITION_EOF = -191,
176  ERR__UNKNOWN_PARTITION = -190,
178  ERR__FS = -189,
180  ERR__UNKNOWN_TOPIC = -188,
182  ERR__ALL_BROKERS_DOWN = -187,
184  ERR__INVALID_ARG = -186,
186  ERR__TIMED_OUT = -185,
188  ERR__QUEUE_FULL = -184,
190  ERR__ISR_INSUFF = -183,
192  ERR__NODE_UPDATE = -182,
194  ERR__SSL = -181,
196  ERR__WAIT_COORD = -180,
198  ERR__UNKNOWN_GROUP = -179,
200  ERR__IN_PROGRESS = -178,
202  ERR__PREV_IN_PROGRESS = -177,
204  ERR__EXISTING_SUBSCRIPTION = -176,
206  ERR__ASSIGN_PARTITIONS = -175,
208  ERR__REVOKE_PARTITIONS = -174,
210  ERR__CONFLICT = -173,
212  ERR__STATE = -172,
214  ERR__UNKNOWN_PROTOCOL = -171,
216  ERR__NOT_IMPLEMENTED = -170,
218  ERR__AUTHENTICATION = -169,
220  ERR__NO_OFFSET = -168,
222  ERR__OUTDATED = -167,
224  ERR__TIMED_OUT_QUEUE = -166,
226  ERR__UNSUPPORTED_FEATURE = -165,
228  ERR__WAIT_CACHE = -164,
230  ERR__INTR = -163,
232  ERR__KEY_SERIALIZATION = -162,
234  ERR__VALUE_SERIALIZATION = -161,
236  ERR__KEY_DESERIALIZATION = -160,
238  ERR__VALUE_DESERIALIZATION = -159,
     /* End marker of the internal range; -158..-101 are currently unused. */
240  ERR__END = -100,
241 
242  /* Kafka broker errors: */
244  ERR_UNKNOWN = -1,
246  ERR_NO_ERROR = 0,
248  ERR_OFFSET_OUT_OF_RANGE = 1,
250  ERR_INVALID_MSG = 2,
252  ERR_UNKNOWN_TOPIC_OR_PART = 3,
254  ERR_INVALID_MSG_SIZE = 4,
256  ERR_LEADER_NOT_AVAILABLE = 5,
258  ERR_NOT_LEADER_FOR_PARTITION = 6,
260  ERR_REQUEST_TIMED_OUT = 7,
262  ERR_BROKER_NOT_AVAILABLE = 8,
264  ERR_REPLICA_NOT_AVAILABLE = 9,
266  ERR_MSG_SIZE_TOO_LARGE = 10,
268  ERR_STALE_CTRL_EPOCH = 11,
270  ERR_OFFSET_METADATA_TOO_LARGE = 12,
272  ERR_NETWORK_EXCEPTION = 13,
274  ERR_GROUP_LOAD_IN_PROGRESS = 14,
276  ERR_GROUP_COORDINATOR_NOT_AVAILABLE = 15,
278  ERR_NOT_COORDINATOR_FOR_GROUP = 16,
280  ERR_TOPIC_EXCEPTION = 17,
282  ERR_RECORD_LIST_TOO_LARGE = 18,
284  ERR_NOT_ENOUGH_REPLICAS = 19,
286  ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
288  ERR_INVALID_REQUIRED_ACKS = 21,
290  ERR_ILLEGAL_GENERATION = 22,
292  ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
294  ERR_INVALID_GROUP_ID = 24,
296  ERR_UNKNOWN_MEMBER_ID = 25,
298  ERR_INVALID_SESSION_TIMEOUT = 26,
300  ERR_REBALANCE_IN_PROGRESS = 27,
302  ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
304  ERR_TOPIC_AUTHORIZATION_FAILED = 29,
306  ERR_GROUP_AUTHORIZATION_FAILED = 30,
308  ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
310  ERR_INVALID_TIMESTAMP = 32,
312  ERR_UNSUPPORTED_SASL_MECHANISM = 33,
314  ERR_ILLEGAL_SASL_STATE = 34,
316  ERR_UNSUPPORTED_VERSION = 35,
318  ERR_TOPIC_ALREADY_EXISTS = 36,
320  ERR_INVALID_PARTITIONS = 37,
322  ERR_INVALID_REPLICATION_FACTOR = 38,
324  ERR_INVALID_REPLICA_ASSIGNMENT = 39,
326  ERR_INVALID_CONFIG = 40,
328  ERR_NOT_CONTROLLER = 41,
330  ERR_INVALID_REQUEST = 42,
332  ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
334  ERR_POLICY_VIOLATION = 44,
336  ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
338  ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
340  ERR_INVALID_PRODUCER_EPOCH = 47,
342  ERR_INVALID_TXN_STATE = 48,
345  ERR_INVALID_PRODUCER_ID_MAPPING = 49,
348  ERR_INVALID_TRANSACTION_TIMEOUT = 50,
351  ERR_CONCURRENT_TRANSACTIONS = 51,
355  ERR_TRANSACTION_COORDINATOR_FENCED = 52,
357  ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
359  ERR_SECURITY_DISABLED = 54,
361  ERR_OPERATION_NOT_ATTEMPTED = 55
362 };
363 
364 
/* Converts an RdKafka::ErrorCode to a std::string (presumably a
 * human-readable description of the error - confirm against upstream docs). */
368 RD_EXPORT
369 std::string err2str(RdKafka::ErrorCode err);
370 
371 
/* The callback interfaces declared next take these types by pointer or
 * reference before the classes themselves are defined further down. */
377 /* Forward declarations */
378 class Producer;
379 class Message;
380 class Queue;
381 class Event;
382 class Topic;
383 class TopicPartition;
384 class Metadata;
385 class KafkaConsumer;
/**
 * @brief Delivery Report callback class.
 *
 * Abstract interface: derive from it and implement dr_cb(). An instance can
 * be registered on a configuration object through the Conf::set() overload
 * that takes a DeliveryReportCb pointer.
 */
417 class RD_EXPORT DeliveryReportCb {
418  public:
     /** Callback hook; receives the Message the report refers to.
      *  NOTE(review): when and from which thread this is invoked is not
      *  visible in this listing - confirm. */
422  virtual void dr_cb (Message &message) = 0;
423 
     /** Virtual destructor so implementations can be deleted via the base. */
424  virtual ~DeliveryReportCb() { }
425 };
426 
427 
/**
 * @brief Partitioner callback class.
 *
 * Abstract interface: derive from it and implement partitioner_cb(). An
 * instance can be registered through the Conf::set() overload that takes a
 * PartitionerCb pointer.
 */
435 class RD_EXPORT PartitionerCb {
436  public:
     /**
      * Partitioner hook.
      * @param topic         Topic handle the message is destined for.
      * @param key           Message key as a std::string pointer.
      * @param partition_cnt Partition count (presumably of @p topic - confirm).
      * @param msg_opaque    Opaque per-message pointer, passed through as-is.
      * @returns an int32_t, presumably the chosen partition - confirm the
      *          valid range and failure value against upstream docs.
      */
453  virtual int32_t partitioner_cb (const Topic *topic,
454  const std::string *key,
455  int32_t partition_cnt,
456  void *msg_opaque) = 0;
457 
458  virtual ~PartitionerCb() { }
459 };
460 
/**
 * @brief Variant partitioner with key pointer.
 *
 * Variant partitioner callback that gets the key as pointer and length
 * instead of as a const std::string *.
 *
 * The class header (listing line 465) was missing from this listing and has
 * been restored; the class name is the one used by the destructor below.
 */
465 class RD_EXPORT PartitionerKeyPointerCb {
466  public:
     /**
      * Partitioner hook.
      * @param topic         Topic handle the message is destined for.
      * @param key           Pointer to the raw key bytes.
      * @param key_len       Length of @p key.
      * @param partition_cnt Partition count (presumably of @p topic - confirm).
      * @param msg_opaque    Opaque per-message pointer, passed through as-is.
      * @returns an int32_t, presumably the chosen partition - confirm.
      */
475  virtual int32_t partitioner_cb (const Topic *topic,
476  const void *key,
477  size_t key_len,
478  int32_t partition_cnt,
479  void *msg_opaque) = 0;
480 
481  virtual ~PartitionerKeyPointerCb() { }
482 };
483 
484 
485 
/**
 * @brief Event callback class.
 *
 * Abstract interface: derive from it and implement event_cb(). An instance
 * can be registered through the Conf::set() overload that takes an EventCb
 * pointer.
 */
494 class RD_EXPORT EventCb {
495  public:
     /** Callback hook; receives the Event object describing what happened. */
501  virtual void event_cb (Event &event) = 0;
502 
503  virtual ~EventCb() { }
504 };
505 
506 
/**
 * @brief Event object class as passed to the EventCb callback.
 *
 * Pure interface: every accessor is a const pure virtual.
 */
510 class RD_EXPORT Event {
511  public:
     /**
      * @brief Event type.
      *
      * The enumerators at listing lines 514-516 were missing from this
      * listing and have been restored from the upstream header. Without them
      * EVENT_THROTTLE would have the value 0 instead of 3.
      */
513  enum Type {
514  EVENT_ERROR,
515  EVENT_STATS,
516  EVENT_LOG,
517  EVENT_THROTTLE
518  };
519 
     /** @brief EVENT_LOG severities (conforms to syslog(3) severities) */
521  enum Severity {
522  EVENT_SEVERITY_EMERG = 0,
523  EVENT_SEVERITY_ALERT = 1,
524  EVENT_SEVERITY_CRITICAL = 2,
525  EVENT_SEVERITY_ERROR = 3,
526  EVENT_SEVERITY_WARNING = 4,
527  EVENT_SEVERITY_NOTICE = 5,
528  EVENT_SEVERITY_INFO = 6,
529  EVENT_SEVERITY_DEBUG = 7
530  };
531 
532  virtual ~Event () { }
533 
534  /*
535  * Event Accessor methods
536  */
537 
     /** Returns the event type (one of Type). */
542  virtual Type type () const = 0;
543 
     /** Returns the error code associated with the event. */
548  virtual ErrorCode err () const = 0;
549 
     /** Returns the severity (one of Severity). */
554  virtual Severity severity () const = 0;
555 
     /** Returns a std::string; "fac" presumably stands for the log facility
      *  - confirm. */
560  virtual std::string fac () const = 0;
561 
     /** Returns the event's string payload (content presumably depends on
      *  type() - confirm). */
570  virtual std::string str () const = 0;
571 
     /** Returns the throttle time as an int (unit not visible here). */
576  virtual int throttle_time () const = 0;
577 
     /** Returns the broker name as a std::string. */
582  virtual std::string broker_name () const = 0;
583 
     /** Returns the broker id as an int. */
588  virtual int broker_id () const = 0;
589 };
590 
591 
592 
/**
 * @brief Consume callback class.
 *
 * Abstract interface: derive from it and implement consume_cb(). Instances
 * are accepted by Consumer::consume_callback() and by the Conf::set()
 * overload that takes a ConsumeCb pointer.
 */
596 class RD_EXPORT ConsumeCb {
597  public:
     /** Callback hook; receives the consumed Message and an opaque pointer
      *  (Consumer::consume_callback() has a matching `opaque` argument). */
605  virtual void consume_cb (Message &message, void *opaque) = 0;
606 
607  virtual ~ConsumeCb() { }
608 };
609 
610 
/**
 * @brief KafkaConsumer: Rebalance callback class
 *
 * Abstract interface: derive from it and implement rebalance_cb(). An
 * instance can be registered through the Conf::set() overload that takes a
 * RebalanceCb pointer.
 */
614 class RD_EXPORT RebalanceCb {
615 public:
     /**
      * Callback hook.
      * @param consumer   The KafkaConsumer the rebalance applies to.
      * @param err        Error code; presumably ERR__ASSIGN_PARTITIONS or
      *                   ERR__REVOKE_PARTITIONS to tell the two cases apart
      *                   - confirm against upstream docs.
      * @param partitions Affected partitions (non-const reference).
      */
665  virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer,
666  RdKafka::ErrorCode err,
667  std::vector<TopicPartition*>&partitions) = 0;
668 
669  virtual ~RebalanceCb() { }
670 };
671 
672 
/**
 * @brief Offset Commit callback class.
 *
 * Abstract interface: derive from it and implement offset_commit_cb().
 * Instances are accepted by the Conf::set() overload that takes an
 * OffsetCommitCb pointer and by the KafkaConsumer::commitSync() overloads
 * that take one.
 */
676 class RD_EXPORT OffsetCommitCb {
677 public:
     /**
      * Callback hook.
      * @param err     Result of the commit as an ErrorCode.
      * @param offsets Topic+partition entries the commit refers to.
      */
693  virtual void offset_commit_cb(RdKafka::ErrorCode err,
694  std::vector<TopicPartition*>&offsets) = 0;
695 
696  virtual ~OffsetCommitCb() { }
697 };
698 
699 
700 
/**
 * @brief Portability: SocketCb callback class
 *
 * Abstract interface: derive from it and implement socket_cb(). An instance
 * can be registered through the Conf::set() overload that takes a SocketCb
 * pointer.
 */
705 class RD_EXPORT SocketCb {
706  public:
     /** Callback hook. The parameters mirror socket(2): domain, type and
      *  protocol. Returns an int (presumably the new socket descriptor, or a
      *  negative value on failure - confirm). */
720  virtual int socket_cb (int domain, int type, int protocol) = 0;
721 
722  virtual ~SocketCb() { }
723 };
724 
725 
/**
 * @brief Portability: OpenCb callback class
 *
 * Abstract interface: derive from it and implement open_cb(). An instance
 * can be registered through the Conf::set() overload that takes an OpenCb
 * pointer.
 */
730 class RD_EXPORT OpenCb {
731  public:
     /** Callback hook. The parameters mirror open(2): path, flags and mode.
      *  Returns an int (presumably the file descriptor, or a negative value
      *  on failure - confirm). */
743  virtual int open_cb (const std::string &path, int flags, int mode) = 0;
744 
745  virtual ~OpenCb() { }
746 };
747 
748 
769 class RD_EXPORT Conf {
770  public:
774  enum ConfType {
776  CONF_TOPIC
777  };
778 
782  enum ConfResult {
783  CONF_UNKNOWN = -2,
784  CONF_INVALID = -1,
785  CONF_OK = 0
786  };
787 
788 
792  static Conf *create (ConfType type);
793 
794  virtual ~Conf () { }
795 
809  virtual Conf::ConfResult set (const std::string &name,
810  const std::string &value,
811  std::string &errstr) = 0;
812 
814  virtual Conf::ConfResult set (const std::string &name,
815  DeliveryReportCb *dr_cb,
816  std::string &errstr) = 0;
817 
819  virtual Conf::ConfResult set (const std::string &name,
820  EventCb *event_cb,
821  std::string &errstr) = 0;
822 
830  virtual Conf::ConfResult set (const std::string &name,
831  const Conf *topic_conf,
832  std::string &errstr) = 0;
833 
835  virtual Conf::ConfResult set (const std::string &name,
836  PartitionerCb *partitioner_cb,
837  std::string &errstr) = 0;
838 
840  virtual Conf::ConfResult set (const std::string &name,
841  PartitionerKeyPointerCb *partitioner_kp_cb,
842  std::string &errstr) = 0;
843 
845  virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
846  std::string &errstr) = 0;
847 
849  virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
850  std::string &errstr) = 0;
851 
853  virtual Conf::ConfResult set (const std::string &name,
854  RebalanceCb *rebalance_cb,
855  std::string &errstr) = 0;
856 
858  virtual Conf::ConfResult set (const std::string &name,
859  OffsetCommitCb *offset_commit_cb,
860  std::string &errstr) = 0;
861 
873  virtual Conf::ConfResult get(const std::string &name,
874  std::string &value) const = 0;
875 
879  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
880 
884  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
885 
889  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
890 
894  virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
895 
899  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
900 
904  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
905 
909  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
910 
914  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
915 
918  virtual std::list<std::string> *dump () = 0;
919 
921  virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
922  std::string &errstr) = 0;
923 };
924 
/**
 * @brief Base handle, super class for specific clients.
 *
 * KafkaConsumer, Consumer and Producer all derive from it with
 * `public virtual` inheritance. All operations are pure virtual.
 */
937 class RD_EXPORT Handle {
938  public:
939  virtual ~Handle() { }
940 
     /** Returns the name of this handle. */
942  virtual const std::string name () const = 0;
943 
     /** Returns the member id (presumably the consumer group member id -
      *  confirm). */
952  virtual const std::string memberid () const = 0;
953 
954 
     /** Polls the handle with a timeout in milliseconds; returns an int
      *  (presumably the number of events served - confirm). */
977  virtual int poll (int timeout_ms) = 0;
978 
     /** Returns the out-queue length as an int. */
985  virtual int outq_len () = 0;
986 
     /**
      * Requests metadata.
      * @param all_topics Selects all topics when true (per the name).
      * @param only_rkt   Single topic to restrict the request to.
      * @param metadatap  Out-parameter receiving a Metadata pointer.
      * @param timeout_ms Timeout in milliseconds.
      */
1002  virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt,
1003  Metadata **metadatap, int timeout_ms) = 0;
1004 
1005 
     /** Pauses the listed partitions. The vector is taken by non-const
      *  reference (presumably to report per-partition results - confirm). */
1015  virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
1016 
1017 
     /** Resumes the listed partitions; same calling convention as pause(). */
1027  virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
1028 
1029 
     /** Queries low/high watermark offsets for topic+partition into `low`
      *  and `high`, bounded by `timeout_ms`. */
1038  virtual ErrorCode query_watermark_offsets (const std::string &topic,
1039  int32_t partition,
1040  int64_t *low, int64_t *high,
1041  int timeout_ms) = 0;
1042 
     /** Same out-parameters as query_watermark_offsets() but without a
      *  timeout (presumably served from locally cached values - confirm). */
1060  virtual ErrorCode get_watermark_offsets (const std::string &topic,
1061  int32_t partition,
1062  int64_t *low, int64_t *high) = 0;
1063 
1064 
     /** Offset lookup by time over the given topic+partition list, bounded
      *  by `timeout_ms`. NOTE(review): how timestamps are passed in and
      *  offsets passed out of `offsets` is not visible here - confirm. */
1084  virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
1085  int timeout_ms) = 0;
1086 
1087 
     /** Returns a Queue for the given partition. */
1096  virtual Queue *get_partition_queue (const TopicPartition *partition) = 0;
1097 
     /** Directs logs to the given queue (per the method name). */
1114  virtual ErrorCode set_log_queue (Queue *queue) = 0;
1115 
     /** NOTE(review): semantics not visible in this listing - confirm. */
1127  virtual void yield () = 0;
1128 
     /** Returns the cluster id, waiting at most `timeout_ms`. */
1143  virtual const std::string clusterid (int timeout_ms) = 0;
1144 };
1145 
1146 
/**
 * @brief Topic+Partition.
 *
 * Instances are obtained from the static create() factories. The destructor
 * is pure virtual, so the class can only be used through an implementation.
 */
1165 class RD_EXPORT TopicPartition {
1166 public:
     /** Factories: create an object for topic+partition, optionally with an
      *  initial offset. */
1173  static TopicPartition *create (const std::string &topic, int partition);
1174  static TopicPartition *create (const std::string &topic, int partition,
1175  int64_t offset);
1176 
1177  virtual ~TopicPartition() = 0;
1178 
     /** Helper operating on a whole vector of TopicPartition pointers
      *  (presumably deletes every element and empties the vector - confirm). */
1183  static void destroy (std::vector<TopicPartition*> &partitions);
1184 
     /** Returns the topic name by const reference. */
1186  virtual const std::string &topic () const = 0;
1187 
     /** Returns the partition number. */
1189  virtual int partition () const = 0;
1190 
     /** Returns the offset. */
1192  virtual int64_t offset () const = 0;
1193 
     /** Sets the offset. */
1195  virtual void set_offset (int64_t offset) = 0;
1196 
     /** Returns the error code attached to this topic+partition. */
1198  virtual ErrorCode err () const = 0;
1199 };
1200 
1201 
1202 
/**
 * @brief Topic handle.
 *
 * Instances are obtained from the static create() factory. The destructor is
 * pure virtual, so the class can only be used through an implementation.
 */
1207 class RD_EXPORT Topic {
1208  public:
     /** @brief Unassigned partition. */
1215  static const int32_t PARTITION_UA;
1216 
     /** @brief Special offsets. (Values are defined outside this header.) */
1218  static const int64_t OFFSET_BEGINNING;
1219  static const int64_t OFFSET_END;
1220  static const int64_t OFFSET_STORED;
1221  static const int64_t OFFSET_INVALID;
     /** Factory: creates a topic handle named `topic_str` on the client
      *  `base`, using `conf`. `errstr` receives an error description
      *  (presumably when the call fails - confirm what is returned then). */
1233  static Topic *create (Handle *base, const std::string &topic_str,
1234  Conf *conf, std::string &errstr);
1235 
1236  virtual ~Topic () = 0;
1237 
1238 
     /** Returns the topic name. */
1240  virtual const std::string name () const = 0;
1241 
     /** Returns whether `partition` is available. */
1247  virtual bool partition_available (int32_t partition) const = 0;
1248 
     /** Stores `offset` for `partition` (per the method name). */
1259  virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
1260 };
1261 
1262 
/**
 * @brief Message timestamp object.
 *
 * Plain value type returned by Message::timestamp(): a timestamp type plus a
 * 64-bit timestamp value.
 *
 * Listing lines 1286-1288 (the enum opener and its first two enumerators)
 * and 1292 (the `type` member) were missing from this listing and have been
 * restored. The enum name, the member and their line numbers are the ones
 * cited in the page's definition index; the two enumerator names are taken
 * from the upstream header.
 */
1284 class RD_EXPORT MessageTimestamp {
1285 public:
1286  enum MessageTimestampType {
1287  MSG_TIMESTAMP_NOT_AVAILABLE,
1288  MSG_TIMESTAMP_CREATE_TIME,
1289  MSG_TIMESTAMP_LOG_APPEND_TIME
1290  };
1291 
      /** Which kind of timestamp `timestamp` holds. */
1292  MessageTimestampType type;
      /** Timestamp value (unit/epoch not visible in this listing). */
1293  int64_t timestamp;
1294 };
1295 
1296 
1297 
/**
 * @brief Message object.
 *
 * Read-only view of a message: every accessor is a const pure virtual. The
 * destructor is pure virtual, so the class can only be used through an
 * implementation.
 */
1309 class RD_EXPORT Message {
1310  public:
      /** Returns the error as a string. */
1318  virtual std::string errstr() const = 0;
1319 
      /** Returns the error as an ErrorCode. */
1321  virtual ErrorCode err () const = 0;
1322 
      /** Returns the Topic object for this message. */
1327  virtual Topic *topic () const = 0;
1328 
      /** Returns the topic name. */
1330  virtual std::string topic_name () const = 0;
1331 
      /** Returns the partition. */
1333  virtual int32_t partition () const = 0;
1334 
      /** Returns a pointer to the payload; its size is given by len(). */
1336  virtual void *payload () const = 0 ;
1337 
      /** Returns the payload length. */
1339  virtual size_t len () const = 0;
1340 
      /** Returns the key as a std::string pointer. */
1342  virtual const std::string *key () const = 0;
1343 
      /** Returns the key as a raw pointer; its size is given by key_len(). */
1345  virtual const void *key_pointer () const = 0 ;
1346 
      /** Returns the key length. */
1348  virtual size_t key_len () const = 0;
1349 
      /** Returns the message offset. */
1351  virtual int64_t offset () const = 0;
1352 
      /** Returns the message timestamp (type + value) by value. */
1354  virtual MessageTimestamp timestamp () const = 0;
1355 
      /** Returns the opaque per-message pointer. */
1357  virtual void *msg_opaque () const = 0;
1358 
1359  virtual ~Message () = 0;
1360 
      /** Returns the latency as an int64_t (unit not visible here). */
1363  virtual int64_t latency () const = 0;
1364 };
1365 
/**
 * @brief Queue interface.
 *
 * Instances are obtained from the static create() factory. The destructor is
 * pure virtual, so the class can only be used through an implementation.
 */
1389 class RD_EXPORT Queue {
1390  public:
      /** Factory: creates a queue bound to the given client handle. */
1394  static Queue *create (Handle *handle);
1395 
      /** Forwards this queue to `dst` (per the method name). */
1406  virtual ErrorCode forward (Queue *dst) = 0;
1407 
1408 
      /** Consumes from the queue, waiting at most `timeout_ms`; returns a
       *  Message pointer (ownership presumably passes to the caller -
       *  confirm). */
1420  virtual Message *consume (int timeout_ms) = 0;
1421 
      /** Polls the queue, waiting at most `timeout_ms`; returns an int. */
1429  virtual int poll (int timeout_ms) = 0;
1430 
1431  virtual ~Queue () = 0;
1432 };
1433 
/**
 * @brief High-level KafkaConsumer (for brokers 0.9 and later)
 *
 * Derives virtually from Handle. Instances are obtained from the static
 * create() factory; all operations are pure virtual and report their result
 * as an ErrorCode unless noted.
 */
1452 class RD_EXPORT KafkaConsumer : public virtual Handle {
1453 public:
      /** Factory: creates a consumer from `conf`. `errstr` receives an error
       *  description (presumably when creation fails - confirm). */
1465  static KafkaConsumer *create (Conf *conf, std::string &errstr);
1466 
1467  virtual ~KafkaConsumer () = 0;
1468 
1469 
      /** Fills `partitions` with the current assignment. */
1472  virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
1473 
      /** Fills `topics` with the current subscription. */
1476  virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
1477 
      /** Subscribes to the given topics. */
1502  virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0;
1503 
      /** Drops the current subscription. */
1505  virtual ErrorCode unsubscribe () = 0;
1506 
      /** Assigns the given partitions. */
1513  virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0;
1514 
      /** Drops the current assignment. */
1518  virtual ErrorCode unassign () = 0;
1519 
      /** Consumes a message, waiting at most `timeout_ms`. */
1544  virtual Message *consume (int timeout_ms) = 0;
1545 
      /* commitSync()/commitAsync() overloads: with no argument, for a single
       * Message, or for an explicit offsets vector; two commitSync()
       * overloads additionally take an OffsetCommitCb. */
1559  virtual ErrorCode commitSync () = 0;
1560 
1566  virtual ErrorCode commitAsync () = 0;
1567 
1575  virtual ErrorCode commitSync (Message *message) = 0;
1576 
1584  virtual ErrorCode commitAsync (Message *message) = 0;
1585 
1591  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
1592 
1598  virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
1599 
1610  virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) = 0;
1611 
1622  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
1623  OffsetCommitCb *offset_commit_cb) = 0;
1624 
1625 
1626 
1627 
      /** Retrieves committed offsets for `partitions`, bounded by
       *  `timeout_ms` (presumably written back into the elements - confirm). */
1636  virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
1637  int timeout_ms) = 0;
1638 
      /** Retrieves the current position for `partitions`. */
1647  virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;
1648 
1649 
      /** Closes the consumer. */
1672  virtual ErrorCode close () = 0;
1673 
1674 
      /** Seeks the given topic+partition (target offset presumably taken
       *  from `partition.offset()` - confirm), bounded by `timeout_ms`. */
1692  virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms) = 0;
1693 
1694 
      /** Stores the given offsets (per the method name). */
1706  virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;
1707 };
1708 
1709 
/**
 * @brief Simple Consumer (legacy)
 *
 * Derives virtually from Handle. Instances are obtained from the static
 * create() factory. Consumption is addressed per Topic + partition, or
 * through a Queue.
 */
1724 class RD_EXPORT Consumer : public virtual Handle {
1725  public:
      /** Factory: creates a consumer from `conf`. `errstr` receives an error
       *  description (presumably when creation fails - confirm). */
1736  static Consumer *create (Conf *conf, std::string &errstr);
1737 
1738  virtual ~Consumer () = 0;
1739 
1740 
      /** Starts consuming topic+partition from `offset`. */
1760  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0;
1761 
      /** As above, additionally taking a Queue (presumably the destination
       *  for the consumed messages - confirm). */
1768  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
1769  Queue *queue) = 0;
1770 
      /** Stops consuming topic+partition. */
1780  virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;
1781 
      /** Seeks topic+partition to `offset`, bounded by `timeout_ms`. */
1796  virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
1797  int timeout_ms) = 0;
1798 
      /** Consumes one message from topic+partition, waiting at most
       *  `timeout_ms`. */
1816  virtual Message *consume (Topic *topic, int32_t partition,
1817  int timeout_ms) = 0;
1818 
      /** Consumes one message from `queue`, waiting at most `timeout_ms`. */
1840  virtual Message *consume (Queue *queue, int timeout_ms) = 0;
1841 
      /** Consumes from topic+partition, handing messages to `consume_cb`
       *  together with `opaque`; returns an int (presumably the number of
       *  messages processed - confirm). */
1861  virtual int consume_callback (Topic *topic, int32_t partition,
1862  int timeout_ms,
1863  ConsumeCb *consume_cb,
1864  void *opaque) = 0;
1865 
      /** Queue-based variant of consume_callback(). */
1872  virtual int consume_callback (Queue *queue, int timeout_ms,
1873  RdKafka::ConsumeCb *consume_cb,
1874  void *opaque) = 0;
1875 
      /** Maps `offset` to an int64_t (presumably an offset counted from the
       *  end of the partition, for use with start() - confirm). */
1885  static int64_t OffsetTail(int64_t offset);
1886 };
1887 
/**
 * @brief Producer.
 *
 * Derives virtually from Handle. Instances are obtained from the static
 * create() factory.
 */
1901 class RD_EXPORT Producer : public virtual Handle {
1902  public:
      /** Factory: creates a producer from `conf`. `errstr` receives an error
       *  description (presumably when creation fails - confirm). */
1913  static Producer *create (Conf *conf, std::string &errstr);
1914 
1915 
1916  virtual ~Producer () = 0;
1917 
      /**
       * Message flags. The values are distinct bits (0x1, 0x2, 0x4), so they
       * can be OR-ed; presumably passed as the `msgflags` argument of
       * produce() - confirm.
       *
       * The un-prefixed MSG_FREE / MSG_COPY aliases are only declared when
       * no MSG_COPY macro is already defined (sys/msg.h defines one, per the
       * comment below), which is why the separating comma sits inside the
       * #ifndef.
       */
1923  enum {
1924  RK_MSG_FREE = 0x1,
1926  RK_MSG_COPY = 0x2,
1930  RK_MSG_BLOCK = 0x4
1947  /* For backwards compatibility: */
1948 #ifndef MSG_COPY /* defined in sys/msg.h */
1949  ,
1952  MSG_FREE = RK_MSG_FREE,
1953  MSG_COPY = RK_MSG_COPY
1954 #endif
1955 
1956  };
1957 
      /** Produces a message to topic+partition; key given as std::string
       *  pointer. `msg_opaque` is an opaque per-message pointer. */
2012  virtual ErrorCode produce (Topic *topic, int32_t partition,
2013  int msgflags,
2014  void *payload, size_t len,
2015  const std::string *key,
2016  void *msg_opaque) = 0;
2017 
      /** Variant taking the key as pointer + length. */
2022  virtual ErrorCode produce (Topic *topic, int32_t partition,
2023  int msgflags,
2024  void *payload, size_t len,
2025  const void *key, size_t key_len,
2026  void *msg_opaque) = 0;
2027 
      /** Variant taking the topic by name (passed by value) and an explicit
       *  timestamp. */
2034  virtual ErrorCode produce (const std::string topic_name, int32_t partition,
2035  int msgflags,
2036  void *payload, size_t len,
2037  const void *key, size_t key_len,
2038  int64_t timestamp,
2039  void *msg_opaque) = 0;
2040 
2041 
      /** Variant taking payload and key as std::vector<char> pointers; there
       *  is no msgflags argument in this overload. */
2046  virtual ErrorCode produce (Topic *topic, int32_t partition,
2047  const std::vector<char> *payload,
2048  const std::vector<char> *key,
2049  void *msg_opaque) = 0;
2050 
2051 
      /** Flushes, waiting at most `timeout_ms`. */
2063  virtual ErrorCode flush (int timeout_ms) = 0;
2064 };
2065 
/**
 * @brief Metadata: Broker information.
 *
 * The class header (listing line 2079) was missing from this listing and has
 * been restored; the class name is the one used by the destructor below.
 * Like the Metadata container class, it is declared without RD_EXPORT.
 */
2079 class BrokerMetadata {
2080  public:
      /** Returns the broker id. */
2082  virtual int32_t id() const = 0;
2083 
      /** Returns the broker host. */
2085  virtual const std::string host() const = 0;
2086 
      /** Returns the broker port. */
2088  virtual int port() const = 0;
2089 
2090  virtual ~BrokerMetadata() = 0;
2091 };
2092 
2093 
2094 
/**
 * @brief Metadata: Partition information.
 *
 * The class header (listing line 2098) was missing from this listing and has
 * been restored; the class name is the one used by the destructor below.
 * Like the Metadata container class, it is declared without RD_EXPORT.
 */
2098 class PartitionMetadata {
2099  public:
      /** @brief Replicas. */
2101  typedef std::vector<int32_t> ReplicasVector;
      /** @brief ISRs (In-Sync-Replicas) */
2103  typedef std::vector<int32_t> ISRSVector;
2104 
      /** @brief Replicas iterator. */
2106  typedef ReplicasVector::const_iterator ReplicasIterator;
      /** @brief ISRs iterator. */
2108  typedef ISRSVector::const_iterator ISRSIterator;
2109 
2110 
      /** Returns the partition id. */
2112  virtual int32_t id() const = 0;
2113 
      /** Returns the error code reported for this partition. */
2115  virtual ErrorCode err() const = 0;
2116 
      /** Returns the leader as an int32_t (presumably a broker id - confirm). */
2118  virtual int32_t leader() const = 0;
2119 
      /** Returns the replica list (same element type as ReplicasVector). */
2121  virtual const std::vector<int32_t> *replicas() const = 0;
2122 
      /** Returns the in-sync replica list (same element type as ISRSVector). */
2126  virtual const std::vector<int32_t> *isrs() const = 0;
2127 
2128  virtual ~PartitionMetadata() = 0;
2129 };
2130 
2131 
2132 
/**
 * @brief Metadata: Topic information.
 *
 * The class header (listing line 2136) was missing from this listing and has
 * been restored; the class name is the one used by the destructor below.
 * Like the Metadata container class, it is declared without RD_EXPORT.
 */
2136 class TopicMetadata {
2137  public:
      /** @brief Partitions. */
2139  typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
      /** @brief Partitions iterator. */
2141  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
2142 
      /** Returns the topic name. */
2144  virtual const std::string topic() const = 0;
2145 
      /** Returns the partition list. */
2147  virtual const PartitionMetadataVector *partitions() const = 0;
2148 
      /** Returns the error code reported for this topic. */
2150  virtual ErrorCode err() const = 0;
2151 
2152  virtual ~TopicMetadata() = 0;
2153 };
2154 
2155 
/**
 * @brief Metadata container.
 *
 * Handle::metadata() hands one of these back through its `metadatap`
 * out-parameter. The destructor is pure virtual, so the class can only be
 * used through an implementation.
 */
2159 class Metadata {
2160  public:
      /** @brief Brokers. */
2162  typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
      /** @brief Topics. */
2164  typedef std::vector<const TopicMetadata*> TopicMetadataVector;
2165 
      /** @brief Brokers iterator. */
2167  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
      /** @brief Topics iterator. */
2169  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
2170 
2171 
      /** @brief Broker list. */
2173  virtual const BrokerMetadataVector *brokers() const = 0;
2174 
      /** @brief Topic list. */
2176  virtual const TopicMetadataVector *topics() const = 0;
2177 
      /** @brief Broker (id) originating this metadata. */
2179  virtual int32_t orig_broker_id() const = 0;
2180 
      /** @brief Broker (name) originating this metadata. */
2182  virtual const std::string orig_broker_name() const = 0;
2183 
2184  virtual ~Metadata() = 0;
2185 };
2186 
2189 }
2190 
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *...
virtual ErrorCode err() const =0
virtual const std::string orig_broker_name() const =0
Broker (name) originating this metadata.
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition: rdkafkacpp.h:2108
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:1452
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:1218
ConfType
Configuration object type.
Definition: rdkafkacpp.h:774
Partitioner callback class.
Definition: rdkafkacpp.h:435
virtual int port() const =0
Type
Event type.
Definition: rdkafkacpp.h:513
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
virtual const std::string host() const =0
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition: rdkafkacpp.h:2139
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:1215
Message object.
Definition: rdkafkacpp.h:1309
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:521
Event callback class.
Definition: rdkafkacpp.h:494
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition: rdkafkacpp.h:2106
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:614
std::vector< int32_t > ReplicasVector
Replicas.
Definition: rdkafkacpp.h:2101
virtual const std::string topic() const =0
int64_t timestamp
Definition: rdkafkacpp.h:1293
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:465
virtual int32_t id() const =0
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition: rdkafkacpp.h:2167
Definition: rdkafkacpp.h:516
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:1221
virtual const PartitionMetadataVector * partitions() const =0
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
Topic handle.
Definition: rdkafkacpp.h:1207
Metadata: Partition information.
Definition: rdkafkacpp.h:2098
Producer.
Definition: rdkafkacpp.h:1901
Metadata: Topic information.
Definition: rdkafkacpp.h:2136
Definition: rdkafkacpp.h:775
Definition: rdkafkacpp.h:514
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition: rdkafkacpp.h:2162
ConfResult
RdKafka::Conf::Set() result code.
Definition: rdkafkacpp.h:782
Queue interface.
Definition: rdkafkacpp.h:1389
MessageTimestampType type
Definition: rdkafkacpp.h:1292
Message timestamp object.
Definition: rdkafkacpp.h:1284
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition: rdkafkacpp.h:2103
virtual const TopicMetadataVector * topics() const =0
Topic list.
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:1219
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition: rdkafkacpp.h:2141
Portability: OpenCb callback class
Definition: rdkafkacpp.h:730
Metadata: Broker information.
Definition: rdkafkacpp.h:2079
Definition: rdkafkacpp.h:515
virtual int32_t leader() const =0
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:1220
Configuration interface.
Definition: rdkafkacpp.h:769
Offset Commit callback class.
Definition: rdkafkacpp.h:676
virtual const std::vector< int32_t > * replicas() const =0
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:937
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition: rdkafkacpp.h:2164
Topic+Partition.
Definition: rdkafkacpp.h:1165
Consume callback class.
Definition: rdkafkacpp.h:596
virtual const std::vector< int32_t > * isrs() const =0
Simple Consumer (legacy)
Definition: rdkafkacpp.h:1724
MessageTimestampType
Definition: rdkafkacpp.h:1286
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition: rdkafkacpp.h:2169
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:510
Metadata container.
Definition: rdkafkacpp.h:2159
virtual ErrorCode err() const =0
Portability: SocketCb callback class
Definition: rdkafkacpp.h:705
Delivery Report callback class.
Definition: rdkafkacpp.h:417
virtual int32_t id() const =0