02 | 日志(上):日志究竟是如何加载日志段的?

你好,我是胡夕。今天我来讲讲Kafka源码的日志(Log)对象。

上节课,我们学习了日志段部分的源码,你可以认为,日志是日志段的容器,里面定义了很多管理日志段的操作。坦率地说,如果看Kafka源码却不看Log,就跟你买了这门课却不知道作者是谁一样。在我看来,Log对象是Kafka源码(特别是Broker端)最核心的部分,没有之一。

它到底有多重要呢?我和你分享一个例子,你先感受下。我最近正在修复一个Kafka的Bug(KAFKA-9157):在某些情况下,Kafka的Compaction操作会产生很多空的日志段文件。如果要避免这些空日志段文件被创建出来,就必须搞懂创建日志段文件的原理,而这些代码恰恰就在Log源码中。

既然Log源码要管理日志段对象,那么它就必须先把所有日志段对象加载到内存里面。这个过程是怎么实现的呢?今天,我就带你学习下日志加载日志段的过程。

首先,我们来看下Log对象的源码结构。

Log源码结构

Log源码位于Kafka core工程的log源码包下,文件名是Log.scala。总体上,该文件定义了10个类和对象,如下图所示:

那么,这10个类和对象都是做什么的呢?我先给你简单介绍一下,你可以对它们有个大致的了解。

不过,在介绍之前,我先提一句,图中括号里的C表示Class,O表示Object。还记得我在上节课提到过的伴生对象吗?没错,同时定义同名的Class和Object,就属于Scala中的伴生对象用法。

我们先来看伴生对象,也就是LogAppendInfo、Log和RollParams。

1.LogAppendInfo

  • LogAppendInfo(C):保存了一组待写入消息的各种元数据信息。比如,这组消息中第一条消息的位移值是多少、最后一条消息的位移值是多少;再比如,这组消息中最大的消息时间戳又是多少。总之,这里面的数据非常丰富(下节课我再具体说说)。
  • LogAppendInfo(O): 可以理解为其对应伴生类的工厂方法类,里面定义了一些工厂方法,用于创建特定的LogAppendInfo实例。

2.Log

  • Log(C): Log源码中最核心的代码。这里我先卖个关子,一会儿细聊。
  • Log(O):同理,Log伴生类的工厂方法,定义了很多常量以及一些辅助方法。

3.RollParams

  • RollParams(C):定义用于控制日志段是否切分(Roll)的数据结构。
  • RollParams(O):同理,RollParams伴生类的工厂方法。

除了这3组伴生对象之外,还有4类源码。

  • LogMetricNames:定义了Log对象的监控指标。
  • LogOffsetSnapshot:封装分区所有位移元数据的容器类。
  • LogReadInfo:封装读取日志返回的数据及其元数据。
  • CompletedTxn:记录已完成事务的元数据,主要用于构建事务索引。

Log Class & Object

下面,我会按照这些类和对象的重要程度,对它们一一进行拆解。首先,咱们先说说Log类及其伴生对象。

考虑到伴生对象多用于保存静态变量和静态方法(比如静态工厂方法等),因此我们先看伴生对象(即Log Object)的实现。毕竟,柿子先找软的捏!

object Log {
  val LogFileSuffix = ".log"
  val IndexFileSuffix = ".index"
  val TimeIndexFileSuffix = ".timeindex"
  val ProducerSnapshotFileSuffix = ".snapshot"
  val TxnIndexFileSuffix = ".txnindex"
  val DeletedFileSuffix = ".deleted"
  val CleanedFileSuffix = ".cleaned"
  val SwapFileSuffix = ".swap"
  val CleanShutdownFile = ".kafka_cleanshutdown"
  val DeleteDirSuffix = "-delete"
  val FutureDirSuffix = "-future"
……
}

这是Log Object定义的所有常量。如果有面试官问你Kafka中定义了多少种文件类型,你可以自豪地把这些说出来。耳熟能详的.log、.index、.timeindex和.txnindex我就不解释了,我们来了解下其他几种文件类型。

  • .snapshot是Kafka为幂等型或事务型Producer所做的快照文件。鉴于我们现在还处于阅读源码的初级阶段,事务或幂等部分的源码我就不详细展开讲了。
  • .deleted是删除日志段操作创建的文件。目前删除日志段文件是异步操作,Broker端把日志段文件从.log后缀修改为.deleted后缀。如果你看到一大堆.deleted后缀的文件名,别慌,这是Kafka在执行日志段文件删除。
  • .cleaned和.swap都是Compaction操作的产物,等我们讲到Cleaner的时候再说。
  • -delete则是应用于文件夹的。当你删除一个主题的时候,主题的分区文件夹会被加上这个后缀。
  • -future是用于变更主题分区文件夹地址的,属于比较高阶的用法。

总之,记住这些常量吧。记住它们的主要作用是,以后不要被面试官唬住!开玩笑,其实这些常量最重要的地方就在于,它们能够让你了解Kafka定义的各种文件类型。

Log Object还定义了超多的工具类方法。由于它们都很简单,这里我只给出一个方法的源码,我们一起读一下。

def filenamePrefixFromOffset(offset: Long): String = {
    val nf = NumberFormat.getInstance()
    nf.setMinimumIntegerDigits(20)
    nf.setMaximumFractionDigits(0)
    nf.setGroupingUsed(false)
    nf.format(offset)
  }

这个方法的作用是通过给定的位移值计算出对应的日志段文件名。Kafka日志文件固定是20位的长度,filenamePrefixFromOffset方法就是用前面补0的方式,把给定位移值扩充成一个固定20位长度的字符串。

举个例子,我们给定一个位移值是12345,那么Broker端磁盘上对应的日志段文件名就应该是00000000000000012345.log。怎么样,很简单吧?其他的工具类方法也很简单,我就不一一展开说了。

下面我们来看Log源码部分的重头戏:Log类。这是一个2000多行的大类。放眼整个Kafka源码,像Log这么大的类也不多见,足见它的重要程度。我们先来看这个类的定义:

class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          val time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
……
}

看着好像有很多属性,但其实,你只需要记住两个属性的作用就够了:dir和logStartOffset。dir就是这个日志所在的文件夹路径,也就是主题分区的路径。而logStartOffset,表示日志的当前最早位移。dir和logStartOffset都是volatile var类型,表示它们的值是变动的,而且可能被多个线程更新。

你可能听过日志的当前末端位移,也就是Log End Offset(LEO),它是表示日志下一条待插入消息的位移值,而这个Log Start Offset是跟它相反的,它表示日志当前对外可见的最早一条消息的位移值。我用一张图来标识它们的区别:

图中绿色的位移值3是日志的Log Start Offset,而位移值15表示LEO。另外,位移值8是高水位值,它是区分已提交消息和未提交消息的分水岭。

有意思的是,Log End Offset可以简称为LEO,但Log Start Offset却不能简称为LSO。因为在Kafka中,LSO特指Log Stable Offset,属于Kafka事务的概念。这个课程中不会涉及LSO,你只需要知道Log Start Offset不等于LSO即可。

Log类的其他属性你暂时不用理会,因为它们要么是很明显的工具类属性,比如timer和scheduler,要么是高阶用法才会用到的高级属性,比如producerStateManager和logDirFailureChannel。工具类的代码大多是做辅助用的,跳过它们也不妨碍我们理解Kafka的核心功能;而高阶功能代码设计复杂,学习成本高,性价比不高。

其实,除了Log类签名定义的这些属性之外,Log类还定义了一些很重要的属性,比如下面这段代码:

    @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
    @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
    private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
    @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None

第一个属性nextOffsetMetadata,它封装了下一条待插入消息的位移值,你基本上可以把这个属性和LEO等同起来。

第二个属性highWatermarkMetadata,是分区日志高水位值。关于高水位的概念,我们在《Kafka核心技术与实战》这个课程中做过详细解释,你可以看一下这篇文章(下节课我还会再具体给你介绍下)。

第三个属性segments,我认为这是Log类中最重要的属性。它保存了分区日志下所有的日志段信息,只不过是用Map的数据结构来保存的。Map的Key值是日志段的起始位移值,Value则是日志段对象本身。Kafka源码使用ConcurrentNavigableMap数据结构来保存日志段对象,就可以很轻松地利用该类提供的线程安全和各种支持排序的方法,来管理所有日志段对象。

第四个属性是Leader Epoch Cache对象。Leader Epoch是社区于0.11.0.0版本引入源码中的,主要是用来判断出现Failure时是否执行日志截断操作(Truncation)。之前靠高水位来判断的机制,可能会造成副本间数据不一致的情形。这里的Leader Epoch Cache是一个缓存类数据,里面保存了分区Leader的Epoch值与对应位移值的映射关系,我建议你查看下LeaderEpochFileCache类,深入地了解下它的实现原理。

掌握了这些基本属性之后,我们看下Log类的初始化逻辑:

 locally {
        val startMs = time.milliseconds
    
    
        // create the log directory if it doesn't exist
        Files.createDirectories(dir.toPath)
    
    
        initializeLeaderEpochCache()
    
    
        val nextOffset = loadSegments()
    
    
        /* Calculate the offset of the next message */
        nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
    
    
        leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
    
    
        logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
    
    
        // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
        leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
    
    
        // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
        // from scratch.
        if (!producerStateManager.isEmpty)
          throw new IllegalStateException("Producer state must be empty during log initialization")
        loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
    
    
        info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
          s"log end offset $logEndOffset in ${time.milliseconds() - startMs} 

在详细解释这段初始化代码之前,我使用一张图来说明它到底做了什么:

这里我们重点说说第三步,即加载日志段的实现逻辑,以下是loadSegments的实现代码:

 private def loadSegments(): Long = {
        // first do a pass through the files in the log directory and remove any temporary files
        // and find any interrupted swap operations
        val swapFiles = removeTempFilesAndCollectSwapFiles()
    
    
        // Now do a second pass and load all the log and index files.
        // We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
        // this happens, restart loading segment files from scratch.
        retryOnOffsetOverflow {
          // In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
          // loading of segments. In that case, we also need to close all segments that could have been left open in previous
          // call to loadSegmentFiles().
          logSegments.foreach(_.close())
          segments.clear()
          loadSegmentFiles()
        }
    
    
        // Finally, complete any interrupted swap operations. To be crash-safe,
        // log files that are replaced by the swap segment should be renamed to .deleted
        // before the swap file is restored as the new segment file.
        completeSwapOperations(swapFiles)
    
    
        if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
          val nextOffset = retryOnOffsetOverflow {
            recoverLog()
          }
    
    
          // reset the index size of the currently active log segment to allow more entries
          activeSegment.resizeIndexes(config.maxIndexSize)
          nextOffset
        } else {
           if (logSegments.isEmpty) {
              addSegment(LogSegment.open(dir = dir,
                baseOffset = 0,
                config,
                time = time,
                fileAlreadyExists = false,
                initFileSize = this.initFileSize,
                preallocate = false))
           }
          0
        }

这段代码会对分区日志路径遍历两次。

首先,它会移除上次Failure遗留下来的各种临时文件(包括.cleaned、.swap、.deleted文件等),removeTempFilesAndCollectSwapFiles方法实现了这个逻辑。

之后,它会清空所有日志段对象,并且再次遍历分区路径,重建日志段segments Map并删除无对应日志段文件的孤立索引文件。

待执行完这两次遍历之后,它会完成未完成的swap操作,即调用completeSwapOperations方法。等这些都做完之后,再调用recoverLog方法恢复日志段对象,然后返回恢复之后的分区日志LEO值。

如果你现在觉得有点蒙,也没关系,我把这段代码再进一步拆解下,以更小的粒度跟你讲下它们做了什么。理解了这段代码之后,你大致就能搞清楚大部分的分区日志操作了。所以,这部分代码绝对值得我们多花一点时间去学习。

我们首先来看第一步,removeTempFilesAndCollectSwapFiles方法的实现。我用注释的方式详细解释了每行代码的作用:

 private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
    
    // 在方法内部定义一个名为deleteIndicesIfExist的方法,用于删除日志文件对应的索引文件
    
    def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
    
    info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
    
    val offset = offsetFromFile(baseFile)
    
    Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
    
    Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
    
    Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
    
    }
    
    var swapFiles = Set[File]()
    
    var cleanFiles = Set[File]()
    
    var minCleanedFileOffset = Long.MaxValue
    
    // 遍历分区日志路径下的所有文件
    
    for (file <- dir.listFiles if file.isFile) {
    
    if (!file.canRead) // 如果不可读,直接抛出IOException
    
    throw new IOException(s"Could not read file $file")
    
    val filename = file.getName
    
    if (filename.endsWith(DeletedFileSuffix)) { // 如果以.deleted结尾
    
    debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
    
    Files.deleteIfExists(file.toPath) // 说明是上次Failure遗留下来的文件,直接删除
    
    } else if (filename.endsWith(CleanedFileSuffix)) { // 如果以.cleaned结尾
    
    minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) // 选取文件名中位移值最小的.cleaned文件,获取其位移值,并将该文件加入待删除文件集合中
    
    cleanFiles += file
    
    } else if (filename.endsWith(SwapFileSuffix)) { // 如果以.swap结尾
    
    val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
    
    info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.")
    
    if (isIndexFile(baseFile)) { // 如果该.swap文件原来是索引文件
    
    deleteIndicesIfExist(baseFile) // 删除原来的索引文件
    
    } else if (isLogFile(baseFile)) { // 如果该.swap文件原来是日志文件
    
    deleteIndicesIfExist(baseFile) // 删除掉原来的索引文件
    
    swapFiles += file // 加入待恢复的.swap文件集合中
    
    }
    
    }
    
    }
    
    // 从待恢复swap集合中找出那些起始位移值大于minCleanedFileOffset值的文件,直接删掉这些无效的.swap文件
    
    val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
    
    invalidSwapFiles.foreach { file =>
    
    debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
    
    val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
    
    deleteIndicesIfExist(baseFile, SwapFileSuffix)
    
    Files.deleteIfExists(file.toPath)
    
    }
    
    // Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
    
    // 清除所有待删除文件集合中的文件
    
    cleanFiles.foreach { file =>
    
    debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
    
    Files.deleteIfExists(file.toPath)
    
    }
    
    // 最后返回当前有效的.swap文件集合
    
    validSwapFiles
    
    }

执行完了removeTempFilesAndCollectSwapFiles逻辑之后,源码开始清空已有日志段集合,并重新加载日志段文件。这就是第二步。这里调用的主要方法是loadSegmentFiles。

   private def loadSegmentFiles(): Unit = {
    
    // 按照日志段文件名中的位移值正序排列,然后遍历每个文件
    
    for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
    
    if (isIndexFile(file)) { // 如果是索引文件
    
    val offset = offsetFromFile(file)
    
    val logFile = Log.logFile(dir, offset)
    
    if (!logFile.exists) { // 确保存在对应的日志文件,否则记录一个警告,并删除该索引文件
    
    warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
    
    Files.deleteIfExists(file.toPath)
    
    }
    
    } else if (isLogFile(file)) { // 如果是日志文件
    
    val baseOffset = offsetFromFile(file)
    
    val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
    
    // 创建对应的LogSegment对象实例,并加入segments中
    
    val segment = LogSegment.open(dir = dir,
    
    baseOffset = baseOffset,
    
    config,
    
    time = time,
    
    fileAlreadyExists = true)
    
    try segment.sanityCheck(timeIndexFileNewlyCreated)
    
    catch {
    
    case _: NoSuchFileException =>
    
    error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
    
    "recovering segment and rebuilding index files...")
    
    recoverSegment(segment)
    
    case e: CorruptIndexException =>
    
    warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
    
    s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
    
    recoverSegment(segment)
    
    }
    
    addSegment(segment)
    
    }
    
    }
    
    }

第三步是处理第一步返回的有效.swap文件集合。completeSwapOperations方法就是做这件事的:

  private def completeSwapOperations(swapFiles: Set[File]): Unit = {
    
    // 遍历所有有效.swap文件
    
    for (swapFile <- swapFiles) {
    
    val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) // 获取对应的日志文件
    
    val baseOffset = offsetFromFile(logFile) // 拿到日志文件的起始位移值
    
    // 创建对应的LogSegment实例
    
    val swapSegment = LogSegment.open(swapFile.getParentFile,
    
    baseOffset = baseOffset,
    
    config,
    
    time = time,
    
    fileSuffix = SwapFileSuffix)
    
    info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
    
    // 执行日志段恢复操作
    
    recoverSegment(swapSegment)
    
    // We create swap files for two cases:
    
    // (1) Log cleaning where multiple segments are merged into one, and
    
    // (2) Log splitting where one segment is split into multiple.
    
    //
    
    // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
    
    // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
    
    // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
    
    // do a replace with an existing segment.
    
    // 确认之前删除日志段是否成功,是否还存在老的日志段文件
    
    val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
    
    segment.readNextOffset > swapSegment.baseOffset
    
    }
    
    // 将生成的.swap文件加入到日志中,删除掉swap之前的日志段
    
    replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
    
    }
    
    }

最后一步是recoverLog操作:

 private def recoverLog(): Long = {
        // if we have the clean shutdown marker, skip recovery
        // 如果不存在以.kafka_cleanshutdown结尾的文件。通常都不存在
        if (!hasCleanShutdownFile) {
          // 获取到上次恢复点以外的所有unflushed日志段对象
          val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
          var truncated = false
    
    
          // 遍历这些unflushed日志段
          while (unflushed.hasNext && !truncated) {
            val segment = unflushed.next
            info(s"Recovering unflushed segment ${segment.baseOffset}")
            val truncatedBytes =
              try {
                // 执行恢复日志段操作
                recoverSegment(segment, leaderEpochCache)
              } catch {
                case _: InvalidOffsetException =>
                  val startOffset = segment.baseOffset
                  warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
                    s"creating an empty one with starting offset $startOffset")
                  segment.truncateTo(startOffset)
              }
            if (truncatedBytes > 0) { // 如果有无效的消息导致被截断的字节数不为0,直接删除剩余的日志段对象
              warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
              removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
              truncated = true
            }
          }
        }
    
    
        // 这些都做完之后,如果日志段集合不为空
        if (logSegments.nonEmpty) {
          val logEndOffset = activeSegment.readNextOffset
          if (logEndOffset < logStartOffset) { // 验证分区日志的LEO值不能小于Log Start Offset值,否则删除这些日志段对象
            warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
              "This could happen if segment files were deleted from the file system.")
            removeAndDeleteSegments(logSegments, asyncDelete = true)
          }
        }
    
    
        // 这些都做完之后,如果日志段集合为空了
        if (logSegments.isEmpty) {
        // 至少创建一个新的日志段,以logStartOffset为日志段的起始位移,并加入日志段集合中
          addSegment(LogSegment.open(dir = dir,
            baseOffset = logStartOffset,
            config,
            time = time,
            fileAlreadyExists = false,
            initFileSize = this.initFileSize,
            preallocate = config.preallocate))
        }
    
    
        // 更新上次恢复点属性,并返回
        recoveryPoint = activeSegment.readNextOffset
        recoveryPoint

总结

今天,我重点向你介绍了Kafka的Log源码,主要包括:

  1. Log文件的源码结构:你可以看下下面的导图,它展示了Log类文件的架构组成,你要重点掌握Log类及其相关方法。
  2. 加载日志段机制:我结合源码重点分析了日志在初始化时是如何加载日志段的。前面说过了,日志是日志段的容器,弄明白如何加载日志段是后续学习日志段管理的前提条件。

总的来说,虽然洋洋洒洒几千字,但我也只讲了最重要的部分。我建议你多看几遍Log.scala中加载日志段的代码,这对后面我们理解Kafka Broker端日志段管理原理大有裨益。在下节课,我会继续讨论日志部分的源码,带你学习常见的Kafka日志操作。

课后讨论

Log源码中有个maybeIncrementHighWatermark方法,你能说说它的实现原理吗?

欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

精选留言

  • 曾轼麟

    2020-04-18 22:13:51

    先回答老师的问题maybeIncrementHighWatermark的实现:
    【首先需要注意以下几个内容】:
    1、这个方法是通过leaderLog这个实例去调用的,当HW更新的时候follower就会更新自身的HW。

    2、leaderLog 是在Partition.scala中的,是分区维度的概念。

    3、maybeIncrementHighWatermark的入参是newHighWatermark,是新的HW标记,但是可能是更新也可能不是。

    【下面来说实现】
    在maybeIncrementHighWatermark中会先去判断新的newHighWatermark.messageOffset是否大于当前的LEO,如果大于肯定不合理,因为新的HW不可能跑在LEO前面

    然后获取当前高水位的偏移量和元数据。如果偏移元数据不是,已知,将在索引中执行查找并缓存结果,并赋值给oldHighWatermark。

    最后进行判断,如果(oldHighWatermark.messageOffset小于newHighWatermark.messageOffset)
    则跟新HW的元数据

    或者如果(oldHighWatermark.messageOffset 等于 newHighWatermark.messageOffset)并且当前segmentBaseOffset小于newHighWatermark.segmentBaseOffset
    也会更新HW的元数据

    最后这个过程是在synchronized的包围下进行的。
    作者回复

    👍👍👍

    2020-04-20 09:23:45

  • 胡夕

    2020-04-23 10:32:04

    你好,我是胡夕。我来公布上节课的“课后讨论”题答案啦~

    上节课,咱们重点了解了日志段对象,课后我让你思考下如果给定位移值过大truncateTo方法的实现。关于这个问题,我的看法很简单。如果truncateTo的输入offset过大以至于超过了该日志段当前最大的消息位移值,那么这个方法不会执行任何截断操作,因为不会有机会调用log.truncateTo(mapping.position)

    okay,你是怎么考虑的呢?我们可以一起讨论下。
  • Geek_47baf9

    2021-09-20 17:05:06

    在trunk分支提交记录(commitId=db1f581da7f3440cfd5be93800b4a9a2d7327a35)上 Log.scala已经在2021-08-13 7:10被重命名为UnifiedLog.scala,希望大家看专栏的时候注意点
    作者回复

    是啊,感慨Kafka代码依然在保持活力地更新着

    2021-10-12 10:13:04

  • 花开成海

    2020-04-30 23:39:30

    有两个疑问请老师帮忙看下:
    1、为什么broker重启要重建所有的索引文件?
    2、LogSegment的log: FileRecords 表示的是消息内容,加载日志段时候,什么机制使有限内存加载所有segment?
    作者回复

    1. 这些写的确实有点问题。应该是删除孤立的索引文件
    2. 应该说是JVM GC机制:) FileRocords表示日志段对象的底层物理文件,已加载完成的日志段对象对应的FileRecords是可以被GC的。

    2020-05-02 14:14:19

  • 你瞅啥?没见过这么帅的哈士奇吗

    2021-05-08 11:16:21

    清除上一次failure留下的文件,这个failure情况是指什么情况下发生的failure情况,是节点挂了重启时的failure还是其他情况,能举个🌰吗
    作者回复

    broker碰到的任何失败

    2021-05-10 10:28:19

  • 北纬8℃

    2020-04-16 09:24:24

    太难了😂
    作者回复

    哈哈,坚持坚持!

    2020-04-16 09:53:29

  • 云超

    2020-10-19 23:39:16

    老师,问一个语法上的问题,在上面的代码片段中的locally{xxx} ,这个locally的作用是什么啊?
    作者回复

    类似于java中的静态代码块,多用于类或Object初始化执行一段代码之用

    2020-10-22 16:14:09

  • 张子涵

    2020-07-24 13:35:41

    def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
    if (newHighWatermark.messageOffset > logEndOffset) //对高水位的值进行判断,如果new高水位大于LEO,则报错
    throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
    s"log end offset $logEndOffsetMetadata")
    lock.synchronized {
    val oldHighWatermark = fetchHighWatermarkMetadata
    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
    (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
    //若old高水位offset小于new高水位offset
    // 或者(old高水位offset等于new高水位offset,且当前segmentBaseOffset小于new高水位的segmentBaseOffset)
    //则更新高水位值
    updateHighWatermarkMetadata(newHighWatermark)
    Some(oldHighWatermark)
    } else {
    None
    }
    }
    }
  • 小虞

    2020-04-26 17:23:37

    先回答老师的问题:
    如果HW大于LEO,那么直接抛出IllegalArgumentException异常;
    否则做如下操作:
    1. 拿到HW的LogOffsetMetadata
    2. 当存在以下情况的时候,更新HW,并且返回更新之前的HW
    a. old HW的位移小于new HW
    b. old HW的位移等于new HW, 并且old HW的基准位移小于new HW的基准位移,这种情况说明,new HW对应的segment是一个新的segment
    3. 如果old HW的位移大于等于new HW,直接返回None

    再请教老师一个问题,LeaderEpochFileCache中有这样一行代码:
    private var epochs: ArrayBuffer[EpochEntry] = inWriteLock(lock) ......
    其实就是初始化epochs,我理解这边是读leader-epoch-checkpoint文件,为什么要用inWriteLock写锁,而不是inReadLock。
    作者回复

    首先我要声明,源码写得也不一定就是对的!我们完全可以针对源码中可能的问题开放讨论哈。

    在LeaderEpochFileCache中,针对epochs的更新和赋值全部都在write lock下,读取epoch用read lock。

    2020-04-27 09:37:06

  • 曾轼麟

    2020-04-18 22:27:55

    老师下面我想问一下我的一些问题:
    1、为什么要遍历两次文件路径呢?我看了一下,如果在删除的时候顺便去加载segment会有什么问题吗?这样是否可以提高加载效率呢?
    2、我看了一下在removeTempFilesAndCollectSwapFiles方法中minCleanedFileOffset是从文件名filename上面读取的,如果我修改了文件名的offset大小会出现什么意想不到的情况呢?
    3、我发现segments是使用ConcurrentNavigableMap,而这里的ConcurrentNavigableMap是使用JDK的ConcurrentSkipListMap,使用跳表的目的是为了方便使用offset范围查询segments中的对象吗?
    作者回复

    1. 就我个人而言,我觉得也没有什么问题。我觉得作者更多是为了把不同逻辑进行了分组导致遍历多次
    2. 可能造成Broker的崩溃,无法启动。因为我们公司有小伙伴这么干过:(
    3. 对的,这样可以快速根据给定offset找到对应的一上一下日志段对象

    2020-04-20 09:23:35

  • 我是小队长

    2020-04-17 16:00:30

    // 这些都做完之后,如果日志段集合不为空
    // 验证分区日志的LEO值不能小于Log Start Offset值,否则删除这些日志段对象


    想问下什么时候才会出现这种情况呢?
    作者回复

    当底层日志文件被删除或损坏的话就可能出现这种情况,因为无法读取文件去获取LEO了。你可以用2.0版本做个试验:
    1. 发消息到分区日志
    2. 使用Admin的DeleteRecords命令驱动Log start offset前进
    3. 关闭Broker
    4. 删除日志路径
    5. 重启Broker

    2020-04-20 09:37:21

  • 我是小队长

    2020-04-17 15:36:59

    老师,找出需要恢复的swap,直接恢复完成不就行了么,为什么还有下面的操作呢?


    // We create swap files for two cases:
    // (1) Log cleaning where multiple segments are merged into one, and
    // (2) Log splitting where one segment is split into multiple.
    //
    // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
    // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
    // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
    // do a replace with an existing segment.
    // 确认之前删除日志段是否成功,是否还存在老的日志段文件
    val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
    segment.readNextOffset > swapSegment.baseOffset
    }
    // 如果存在,直接把.swap文件重命名成.log
    replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile =
    作者回复

    首先,不管是否要过滤出符合条件的oldSegments,回复之后都要进行替换,这个你实际上这是因为升级Broker版本而做的防御性编程。在老的版本中,代码写入消息后并不会对位移值进行校验。因此log cleaner老代码可能写入一个非常大的位移值(大于Int.MAX_VALUE)。当broker升级后,这些日志段就不能正常被compact了,因为位移值越界了(新版本加入了位移校验)

    代码需要去搜寻在swap start offset和swap LEO之间的所有日志段对象,但这还不够,还要保证这些日志段的LEO比swap的base offset大才行是同意的吧?否则

    2020-04-20 09:57:04

  • 张三丰

    2023-04-10 10:19:06

    老师能否解释下为何加载完日志段还要恢复日志段呢? 加载难道不是加载了所有日志段么?
  • loserwang,no ssp

    2022-12-03 17:32:02

    maybeIncrementHighWatermark用到了lock, 看注释是对log的所有update都用这个锁,是不是锁的粒度比较高,而且读的时候不用锁是否会不一致(ps:对锁这一块一向不清楚)
  • 。。。

    2022-07-19 19:45:33

    恢复点是啥啊 文中好像没有提到老师
  • qgaye

    2021-12-22 01:59:41

    // 从待恢复swap集合中找出那些起始位移值大于minCleanedFileOffset值的文件,直接删掉这些无效的.swap文件
    请问为什么要删除位移值大于minCleanedFileOffset的文件呢
  • 邓斌

    2021-05-12 09:14:08

    你好,我的Windows10系统在启动源码的时候出现java.nio.file.AccessDeniedException异常,错误信息:Error while writing to checkpoint file C:\tmp\kafka\kafka-logs\recovery-point-offset-checkpoint (kafka.server.LogDirFailureChannel)和
    Failed to create or validate data directory C:\tmp\kafka-logs (kafka.server.LogDirFailureChannel)。
    我已经排查了目录和文件的权限都是完全控制,甚至重装系统都没有解决,请教一下,这个错误还有什么办法解决吗?
    作者回复

    最好还是别放在windows上了,有各种各样的问题

    2021-05-12 19:29:08

  • 对与错

    2020-09-25 17:37:16

    请问分区与Log之间的关系是一对多吗?
    作者回复

    一对一。log下面分多个segment

    2020-09-27 09:33:17

  • 鲁·本

    2020-08-25 16:04:36

    用注释的方式给出我的理解:

    def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
    //如果期望设置的高水位offset > endOffset 无效值,抛出异常
    if (newHighWatermark.messageOffset > logEndOffset)
    throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
    s"log end offset $logEndOffsetMetadata")

    lock.synchronized {
    //获取老的高水位元数据
    val oldHighWatermark = fetchHighWatermarkMetadata

    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
    //以下场景会执行高水位元数据的更新
    //如果老的高水位offset < 新的高水位offset 或者
    //老的高水位offset == 新的高水位offset 并且 老的高水位元数据的baseoff小于新的高水位元数据的baseoffset(也就是虽然高水位没变,但是日志段发生了滚动)
    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
    (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
    updateHighWatermarkMetadata(newHighWatermark)
    Some(oldHighWatermark)
    } else {
    None
    }
    }
    }
  • 三颗豆子

    2020-08-18 20:07:06

    请问一下:
    1、检查点是根据什么依据来建立的呢?是定时任务,还是什么。
    2、“unflushed日志段”记录的是未提交位移的日志吗?如果是已经提交位移的日志,删掉这些日志段对象,不就映射不到这些日志了吗?如果是未提交位移的日志,那已经提交位移的日志一定在recoveryPoint之内吗,为什么?
    谢谢。
    作者回复

    1、定时任务
    2、unflushed记录的是未写入到检查点文件的日志段。

    2020-08-20 09:06:31