男人的av一区二区资源,亚洲日韩国产精品无码av,蜜桃久久久aaaa成人网一区,亚洲日韩中文字幕一区,在线观看国产亚洲视频免费

【Redis源碼】Redis事件監聽(tīng)

pic

簡(jiǎn)介

Redis服務(wù)器是典型的事件驅動(dòng)程序,而事件又分為文件事件(socket的可讀可寫(xiě)事件)與時(shí)間事件(定時(shí)任務(wù))兩大類(lèi)。無(wú)論是文件事件還是時(shí)間事件都封裝在結構體aeEventLoop中:

typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;
  • stop標識事件循環(huán)是否結束;
  • events為文件事件數組,存儲已經(jīng)注冊的文件事件;
  • fired存儲被觸發(fā)的文件事件;Redis有多個(gè)定時(shí)任務(wù),因此理論上應該有多個(gè)時(shí)間事件,多個(gè)時(shí)間事件形成鏈表,
  • timeEventHead即為時(shí)間事件鏈表頭節點(diǎn);Redis服務(wù)器需要阻塞等待文件事件的發(fā)生,進(jìn)程阻塞之前會(huì )調用beforesleep函數,進(jìn)程因為某種原因被喚醒之后會(huì )調用aftersleep函數。Redis底層可以使用4種I/O多路復用模型(kqueue、epoll等),
  • apidata是對這4種模型的進(jìn)一步封裝。

事件驅動(dòng)程序通常存在while/for循環(huán),循環(huán)等待事件發(fā)生并處理,Redis也不例外,其事件循環(huán)如下:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

函數aeProcessEvents為事件處理主函數,其第2個(gè)參數是一個(gè)標志位,AE_ALL_EVENTS表示函數需要處理文件事件與時(shí)間事件,AE_CALL_AFTER_SLEEP表示阻塞等待文件事件之后需要執行aftersleep函數。

文件監聽(tīng)事件

Redis客戶(hù)端通過(guò)TCP socket與服務(wù)端交互,文件事件指的就是socket的可讀可寫(xiě)事件。socket讀寫(xiě)操作有阻塞與非阻塞之分。采用阻塞模式時(shí),一個(gè)進(jìn)程只能處理一條網(wǎng)絡(luò )連接的讀寫(xiě)事件,為了同時(shí)處理多條網(wǎng)絡(luò )連接,通常會(huì )采用多線(xiàn)程或者多進(jìn)程,效率低下;非阻塞模式下,可以使用目前比較成熟的I/O多路復用模型,如select/epoll/kqueue等,視不同操作系統而定。

根據操作系統選擇I/O復用模型,選擇順序為epoll、kqueue、select。

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

以epoll為例,aeApiCreate函數是對epoll_create的封裝;aeApiAddEvent函數用于添加事件,是對epoll_ctl的封裝;aeApiDelEvent函數用于刪除事件,是對epoll_ctl的封裝;aeApiPoll是對epoll_wait的封裝。

static int aeApiCreate(aeEventLoop *eventLoop);
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);

4個(gè)函數的輸入參數含義如下。

  • eventLoop:事件循環(huán),與文件事件相關(guān)的最主要字段有3個(gè),
  • apidata指向I/O多路復用模型對象,注意4種I/O多路復用模型對象的類(lèi)型不同,因此此字段是void*類(lèi)型;
  • events存儲需要監控的事件數組,以socket文件描述符作為數組索引存取元素;fired存儲已觸發(fā)的事件數組。

時(shí)間監聽(tīng)事件

前面介紹了Redis文件事件,已經(jīng)知道事件循環(huán)執行函數aeProcessEvents的主要邏輯:①查找最早會(huì )發(fā)生的時(shí)間事件,計算超時(shí)時(shí)間;②阻塞等待文件事件的產(chǎn)生;③處理文件事件;④處理時(shí)間事件。時(shí)間事件的執行函數為processTimeEvents。

Redis服務(wù)器內部有很多定時(shí)任務(wù)需要執行,比如定時(shí)清除超時(shí)客戶(hù)端連接,定時(shí)刪除過(guò)期鍵等,定時(shí)任務(wù)被封裝為時(shí)間事件aeTimeEvent對象,多個(gè)時(shí)間事件形成鏈表,存儲在aeEventLoop結構體的timeEventHead字段,它指向鏈表首節點(diǎn)。時(shí)間事件aeTimeEvent定義如下:

typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount; /* refcount to prevent timer events from being
  		   * freed in recursive time event calls. */
} aeTimeEvent;

各字段含義如下。

  • id:時(shí)間事件唯一ID,通過(guò)字段eventLoop->timeEventNextId實(shí)現;
  • when_sec與when_ms:時(shí)間事件觸發(fā)的秒數與毫秒數;
  • timeProc:函數指針,指向時(shí)間事件處理函數;
  • finalizerProc:函數指針,刪除時(shí)間事件節點(diǎn)之前會(huì )調用此函數;
  • clientData:指向對應的客戶(hù)端對象;
  • prev:指向上一個(gè)時(shí)間事件節點(diǎn)。
  • next:指向下一個(gè)時(shí)間事件節點(diǎn)。

時(shí)間事件執行函數processTimeEvents的處理邏輯比較簡(jiǎn)單,只是遍歷時(shí)間事件鏈表,判斷當前時(shí)間事件是否已經(jīng)到期,如果到期則執行時(shí)間事件處理函數timeProc:

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

    /* 校對系統時(shí)間 */
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        /* 刪除已經(jīng)指定要刪除的事件 */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (te->refcount) {
                te = next;
                continue;
            }
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        /* 確保只執行Redis自己創(chuàng  )建的時(shí)間事件 */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            te->refcount++;
            // 處理時(shí)間事件
            retval = te->timeProc(eventLoop, id, te->clientData);
            te->refcount--;
            processed++;
            // 重新設置時(shí)間事件到期時(shí)間
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}   

注意時(shí)間事件處理函數timeProc返回值retval,其表示此時(shí)間事件下次應該被觸發(fā)的時(shí)間,單位為毫秒,且是一個(gè)相對時(shí)間,即從當前時(shí)間算起,retval毫秒后此時(shí)間事件會(huì )被觸發(fā)。

Redis創(chuàng )建時(shí)間事件節點(diǎn)的函數為aeCreateTimeEvent,內部實(shí)現非常簡(jiǎn)單,只是創(chuàng )建時(shí)間事件并添加到時(shí)間事件鏈表。aeCreateTimeEvent函數定義如下:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);

各字段含義如下。

  • eventLoop:輸入參數指向事件循環(huán)結構體;
  • milliseconds:表示此時(shí)間事件觸發(fā)時(shí)間,單位毫秒,注意這是一個(gè)相對時(shí)間,即從當前時(shí)間算起,milliseconds毫秒后此時(shí)間事件會(huì )被觸發(fā);
  • proc:指向時(shí)間事件的處理函數;
  • clientData:指向對應的結構體對象;
  • finalizerProc:同樣是函數指針,刪除時(shí)間事件節點(diǎn)之前會(huì )調用此函數。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    run_with_period(100) {
       //100毫秒周期執行
    }
    run_with_period(5000) {
      //5000毫秒周期執行
    }

    /* Show information about connected clients */
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            serverLog(LL_DEBUG,
                "%lu clients connected (%lu replicas), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    /* 清除無(wú)用客戶(hù)端連接 */
    clientsCron();

    /* 執行數據庫定時(shí)任務(wù) */
    databasesCron();

    /* 檢查是否手動(dòng)執行了AOF寫(xiě)入命令*/
    if (!hasActiveChildProcess() &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* 檢查AOF寫(xiě)入是否完成等 */
    if (hasActiveChildProcess() || ldbPendingChildren())
    {
        checkChildrenDone();
    } else {
  
    }

    /* AOF定時(shí)寫(xiě)入磁盤(pán) */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* 定時(shí)檢查AOF寫(xiě)入結果*/
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }

    /* 清理無(wú)用的客戶(hù)端連接 */
    clientsArePaused(); /* Don't check return value, just use the side effect.*/

    /* cluster 定時(shí)檢查 */
    run_with_period(1000) replicationCron();

    /* 運行cluster定時(shí)任務(wù) */
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    /* 哨兵模式定時(shí)任務(wù)執行 */
    if (server.sentinel_mode) sentinelTimer();

    /* Cleanup expired MIGRATE cached sockets. */
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    /* 停止無(wú)用的I/O線(xiàn)程 */
    stopThreadedIOIfNeeded();

    /* 定時(shí)檢查slot信息 */
    if (server.tracking_clients) trackingLimitUsedSlots();

    /*定時(shí)模塊定時(shí)任務(wù)執行 */
    RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
    moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
                          0,
                          &ei);

    server.cronloops++;
    return 1000/server.hz;
}


標 題:《【Redis源碼】Redis事件監聽(tīng)
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

    評論
    0 評論
avatar

取消
男人的av一区二区资源,亚洲日韩国产精品无码av,蜜桃久久久aaaa成人网一区,亚洲日韩中文字幕一区,在线观看国产亚洲视频免费