2014/09/24

FreeRTOS queue management

1. Introduction


queue 用於 task 之間的溝通, 每個queue能放有限的item
item的大小稱為size, item的數量稱為length
當 queue 被建立時, length & size都會設定好

通常 queue 使用 FIFO, 資料從tail寫入, 從head讀出, (但也有可能從head寫入)
寫入的時候, 會copy data到queue裡面
讀出的時候, 會copy data出來, 並且queue的那一份會刪掉

queue 本身就像個物件, 會自行管理queue的相關功能,
所以多個task可以同時寫入同一個queue, 也可以多個task讀出同一個queue
queue通常有多個writer, 但不常有多個reader
( 這裡的writer指的是嘗試寫入queue的接口, reader指的是嘗試從queue讀出的接口 )

每當 task 嘗試從 queue裡讀出data, 需要設定 block time, 這是因為如果 queue 是空的, task就會進入block state, 直到 bloke time timeout 而回到 ready state, 或者是有其它 task / interrupt 寫入data到queue裡, 這樣也會讓 task 回到 ready state

如果 queue 有多個 reader, 並且有多個 task 都在等空的 queue, 那麼當有新的 data 進來, 只會有一個 task變成 ready state, queue會選出priority最高的task, 如果有多個task有一樣的priority, 那麼會選等最久的

當 task 嘗試寫入 data 到 queue裡, 也需要設定 block time, 這是因為如果 queue 滿了, task就會進入 block state, 直到 block time timeout 而回到 ready state, 或者這期間 queue 騰出空間而讓task回到 ready state

如果 queue 有多個 writer, 並且有多個 task 嘗試寫入滿的 queue, 那麼當空間騰出來時, priority最高的task會變成ready state, 如果有多個 task 有一樣的 priority, 那麼會選等最久的

2. using a queue


2.1 create a queue


xQueueHandle xQueueCreate( unsigned portBASE_TYPE uxQueueLength,
                           unsigned portBASE_TYPE uxItemSize
                           );

其中

  • uxQueueLength : max queue length
  • uxItemSize : item size, 單位是 byte

如果 RAM size 不夠,就會回傳 NULL,
如果成功, 回傳 queue handle

2.2 send data to queue


portBASE_TYPE xQueueSendToBack( xQueueHandle xQueue,
                                const void * pvItemToQueue,
                                portTickType xTicksToWait
                                );

xQueueSendToBack() 會把 item 放到 queue 的後面 (tail), xQueueSend() 跟 xQueueSendToBack() 是同樣的function

portBASE_TYPE xQueueSendToFront( xQueueHandle xQueue,
                                 const void * pvItemToQueue,
                                 portTickType xTicksToWait
                                 );

xQueueSendToFromt 把 data 送到 queue 的前端 (head)

其中它的參數 :

  • xTicksToWait : 如果想讓 send 一直等下去, 可以把 xTicksToWait 設成 portMAX_DELAY, 如果不想等, 就把 xTicksToWait 設成 0

可能的回傳值 :

  • pdPASS : 表示成功寫入, 即使task是在block time timeout之前寫入也算是成功寫入
  • errQUEUE_FULL : 失敗, 如果 xTicksToWait > 0, 那麼表示在 block time timeout之前都沒辦法寫入

2.3 receive data from queue


portBASE_TYPE xQueueReceive( xQueueHandle xQueue,
                             const void * pvBuffer,
                             portTickType xTicksToWait
                             );
xQueueReceive 從 queue head 讀一個 item, 並把 queue 裡的 item 刪掉

portBASE_TYPE xQueuePeek( xQueueHandle xQueue,
                          const void * pvBuffer,
                          portTickType xTicksToWait
                          );

xQueuePeek 從 queue head 讀一個item, 但不會刪掉 item

可能的回傳值 :

  • pdPASS : 表示成功讀出, 即使 task 在 block time timeout 之前讀出也算成功讀出
  • errQUEUE_EMPTY : 失敗, 如果 xTicksToWait > 0, 表示在 block time timeout 之前沒辦法讀出

2.4 query queue


unsigned portBASE_TYPE uxQueueMessagesWaiting( xQueueHandle xQueue );

uxQueueMessagesWaiting 可以拿到目前 queue 裡的 items 數量

要注意的是, 這個不能在interrupt裡面使用,
interrupt應該使用 uxQueueMessagesWaitingFromISR()

3. sample code


/* 宣告一個queue */
xQueueHandle xQueue;

static void vSenderTask( void *pvParameters )
{
    long lValueToSend;
    portBASE_TYPE xStatus;

    /* 我們把要寫入 queue 的資料從 task 的 parameter 帶入, 型態為 long */
    lValueToSend = ( long ) pvParameters;

    for( ;; )
    {
        /* 把data寫入queue, 第1個參數是要寫入的queue,
           第2個參數是要寫入的item pointer, 第3個參數是block time */
        xStatus = xQueueSendToBack( xQueue, &lValueToSend, 0 );
        if( xStatus != pdPASS )
        {
            vPrintString( "Could not send to the queue.\r\n" );
        }

        /* taskYIELD() 通知 scheduler switch 到別的task */
        taskYIELD();
    }
}

static void vReceiverTask( void *pvParameters )
{
    long lReceivedValue;
    portBASE_TYPE xStatus;
    const portTickType xTicksToWait = 100 / portTICK_RATE_MS;

    for( ;; )
    {
        /* 檢查 queue 裡面的 item 數量 */
        if( uxQueueMessagesWaiting( xQueue ) != 0 )
        {
            vPrintString( "Queue should have been empty!\r\n" );
        }

        /* 從 queue 裡讀出data, 第1個參數是要讀出data的queue,
           第2個參數是放 data 的 pointer, 第3個參數是 block time */
        xStatus = xQueueReceive( xQueue, &lReceivedValue, xTicksToWait );
        if( xStatus == pdPASS )
        {
            /* 成功讀出 data, 把它印出來 */
            vPrintStringAndNumber( "Received = ", lReceivedValue );
        }
        else
        {
            /* 100ms內沒有讀出data */
            vPrintString( "Could not receive from the queue.\r\n" );
        }
    }
}

int main( void )
{
    /* 建 queue, 設定成長度為5個item, item大小為 sizeof(long) */
    xQueue = xQueueCreate( 5, sizeof( long ) );
    if( xQueue != NULL )
    {
        /* 建2個 task, 分別對 queue 寫入 data, 
           data 來自於 task 的 parameter, 也就是100和200 */
        xTaskCreate( vSenderTask, "Sender1", 1000, ( void * ) 100, 1, NULL );
        xTaskCreate( vSenderTask, "Sender2", 1000, ( void * ) 200, 1, NULL );

        /* 建1個 task 從 queue 讀出 data, 它的priority比較高, 
           但是在 xQueueReceive(), 如果 queue 裡沒 data, 它就會進 block state */
        xTaskCreate( vReceiverTask, "Receiver", 1000, NULL, 2, NULL );

        vTaskStartScheduler();
    }
    else
    {
        /* 建 queue 失敗 */
    }

    /* we should never reach here */
    for( ;; );
}

4. store compound data


有時候對 receiver task 來說, 它需要知道 sender 是誰, 我們可以把 data type 設計成 struct, 並且裡面加上 sender information

另一種情況是, 如果我們要在 queue 裡放大的 data, 改成用指向該 data 的 pointer 會比較好, 這樣可以避免在寫入和讀出時 copy 大量的 data, 但是要注意 memory 的 allocation & release,




沒有留言:

張貼留言