教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

watermark的作用是什么?怎樣保證數據不丟失?

更新時間:2023年10月10日10時52分 來源:傳智教育 瀏覽次數:

好口碑IT培訓

  在大數據處理中,watermark是一種時間概念,用于衡量事件流數據的進度。它的作用是為了控制事件時間窗口的計算進度以及處理延遲。

  具體而言,watermark可以把事件流數據按照事件發(fā)生的時間進度劃分到不同的時間窗口中。在處理數據的過程中,必須要等到一個時間窗口的所有數據都到達后才能進行計算。而watermark就是用來判定一個時間窗口內的數據是否已經全量到達的標志。

  保證數據不丟失的關鍵是通過合理設置watermark的生成和處理機制。在生成watermark的過程中,可以基于事件數據中的時間戳信息來確定watermark的位置。而在處理時,可以通過比較watermark和事件時間戳的關系,判斷事件數據是否落后于watermark,如果落后則說明有數據丟失。

  以下是使用Apache Flink的Java API示例代碼,展示如何在流式處理中使用Watermark來控制事件時間窗口的計算進度。

// 導入必要的包
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class WatermarkExample {

    public static void main(String[] args) throws Exception {
        // 設置流式執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 設置時間特性為事件時間
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 創(chuàng)建數據源
        DataStream<Event> events = env.fromElements(
                new Event(1, "2021-01-01T00:00:00"),
                new Event(2, "2021-01-01T00:02:00"),
                new Event(3, "2021-01-01T00:01:30")
        );

        // 使用Watermark來指定事件時間
        events.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
            private final long maxOutOfOrderness = 5000; // 最大亂序程度為5秒
            private long currentMaxTimestamp;

            @Override
            public long extractTimestamp(Event event, long previousElementTimestamp) {
                long timestamp = event.getTimestamp().toEpochMilli();
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                return timestamp;
            }

            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }
        });

        // 在這里添加更多的流處理操作,如窗口計算、聚合等

        // 執(zhí)行流式處理
        env.execute("Watermark Example");
    }

    // 定義事件類
    public static class Event {
        private int id;
        private LocalDateTime timestamp;

        public Event(int id, String timestamp) {
            this.id = id;
            this.timestamp = LocalDateTime.parse(timestamp);
        }

        public int getId() {
            return id;
        }

        public LocalDateTime getTimestamp() {
            return timestamp;
        }
    }
}

  在上面的示例中,我們首先設置了流式執(zhí)行環(huán)境,并將時間特性設置為事件時間。然后,我們創(chuàng)建了一個包含三個事件的數據源,并為每個事件指定了事件時間戳。接下來,我們使用AssignerWithPeriodicWatermarks函數來為事件分配時間戳和Watermark。在這個函數中,我們定義了如何提取事件的時間戳,并根據最大亂序程度計算Watermark。最后,我們可以在assignTimestampsAndWatermarks方法后添加更多的流處理操作,如窗口計算、聚合等。

  為了更好地保證數據不丟失,還可以采取一些策略來處理數據落后的情況,比如等待一段時間以等待可能的延遲數據到達,或者設置數據的最大亂序程度,超過亂序程度的數據將被丟棄。同時,還可以通過設置watermark的間隔時間來控制事件時間窗口的大小,以適應不同的處理延遲需求。

0 分享到:
和我們在線交談!