/** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ publicintpartition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){ List partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
** * **The range assignor works on a per-topic basis**. For each topic, we lay out the available partitions in numeric order * and the consumers in lexicographic order. We then divide the number of partitions by the total number of * consumers to determine the number of partitions to assign to each consumer. If it does not evenly * divide, then the first few consumers will have one extra partition. * * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. * * The assignment will be: * C0: [t0p0, t0p1, t1p0, t1p1] * C1: [t0p2, t1p2] */
/** * **The round robin assignor lays out all the available partitions and all the available consumers.** It * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts * will be within a delta of exactly one across all consumers.) * * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. * * The assignment will be: * C0: [t0p0, t0p2, t1p1] * C1: [t0p1, t1p0, t1p2] * * When subscriptions differ across consumer instances, the assignment process still considers each * consumer instance in round robin fashion but skips over an instance if it is not subscribed to * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced * assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2, * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, * t2p1, t2p2\. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2. * * Tha assignment will be: * C0: [t0p0] * C1: [t1p0] * C2: [t1p1, t2p0, t2p1, t2p2] */