+7 (812) 670-9095
Обратная связьEnglish
Главная → Статьи → Системное ПО → Статья #23. Очереди: введение и базовые службы
Версия для печати

Статья #23. Очереди: введение и базовые службы

10 декабря 2018

Очереди уже были упомянуты в одной из предыдущих статей (#5). Они предоставляют более гибкий способ передачи простых сообщений между задачами по сравнению с почтовыми ящиками.




Статья #23. Очереди: введение и базовые службы


Использование очередей


В Nucleus SE очереди определяются на этапе сборки. В приложении может быть до 16 очередей. Если очереди в приложении отсутствуют, ни структуры данных, ни служебный код, относящиеся к очередям, в приложение не включаются.

Очередь — это набор областей в памяти, размера которых достаточно для одного элемента типа ADDR и безопасный доступ к которым контролируется таким образом, чтобы его могли пользоваться несколько задач. Задачи могут записывать данные в очередь до тех пор, пока все области не будут заполнены. Задачи могут читать данные из очереди, причем данные обычно поступают по принципу FIFO (First-in-First-Out). Попытка записать данные в переполненную очередь или прочитать данные из пустой очереди может привести к ошибке или приостановке задачи, в зависимости от выбранных параметров вызовов API и конфигурации Nucleus SE.


Очереди и каналы передачи данных


Nucleus SE поддерживает каналы передачи данных, которые также были упомянуты в одной из предыдущих статей (#5) и будут подробно рассмотрены в одной из следующих. Основное отличие очередей от каналов – размер сообщения. Очереди содержат сообщения, состоящие из одной переменной типа ADDR (обычно это указатели). Канал содержит сообщения произвольного размера, индивидуального для каждого канала в приложении и назначаемого во время настройки параметров.


Настройка очередей

Количество очередей


Как и для большинства объектов Nucleus SE, настройка очередей в основном управляется директивами #define в файле nuse_config.h. Основным параметром является NUSE_QUEUE_NUMBER, который определяет количество сконфигурированных очередей в приложении. Значение по умолчанию равно нулю (то есть в приложении нет очередей) и может принимать значения вплоть до 16. Некорректное значение приведет к ошибке во время компиляции, которая будет сгенерирована при проверке в файле nuse_config_check.h (он входит в файл nuse_config.c и компилируется вместе с ним), что приведет к срабатыванию директивы #error.

Выбор ненулевого значения служит главным активатором для очередей. Этот параметр используется при определении структур данных и от его значения зависит их размер (более подробно об этом в следующей статье). Кроме того, ненулевое значение активирует настройки API.


Активация вызовов API


Каждая функция API (служебный вызов) в Nucleus SE имеет активирующую директиву #define в nuse_config.h. Для очередей такими директивами являются:


NUSE_QUEUE_SEND
NUSE_QUEUE_RECEIVE
NUSE_QUEUE_JAM
NUSE_QUEUE_RESET
NUSE_QUEUE_INFORMATION
NUSE_QUEUE_COUNT


По умолчанию им присваивается значение FALSE, таким образом отключая все служебные вызовы и блокируя включение реализующего их кода. Для настройки очередей в приложении нужно выбрать необходимые вызовы API и присвоить им значение TRUE.

Ниже приведен фрагмент кода из файла nuse_config.h:



#define NUSE_QUEUE_NUMBER    0  /* Number of queues in the
                                   system - 0-16 */
                                /* Service call enablers */
#define NUSE_QUEUE_SEND         FALSE 
#define NUSE_QUEUE_RECEIVE      FALSE
#define NUSE_QUEUE_JAM          FALSE
#define NUSE_QUEUE_RESET        FALSE
#define NUSE_QUEUE_INFORMATION  FALSE
#define NUSE_QUEUE_COUNT        FALSE


Если функции API очередей активированы, но в приложении нет очередей (кроме NUSE_Queue_Count(), которая всегда разрешена), появится ошибка компиляции. Если ваш код использует вызов API, который не был активирован, это вызовет ошибку компоновки, так как реализующий код не был включен в приложение.


Служебные вызовы очередей


Nucleus RTOS поддерживает десять служебных вызовов, связанных с очередями, которые предоставляют следующий функционал:

  • Постановка сообщения в очередь. В Nucleus SE реализовано в функции NUSE_Queue_Send().
  • Принятие сообщения из очереди. В Nucleus SE реализовано в функции NUSE_Queue_Receive().
  • Постановка сообщения в голову очереди. В Nucleus SE реализовано в NUSE_Queue_Jam().
  • Восстановление очереди в неиспользуемое состояние с освобождением всех приостановленных задач (сброс). В Nucleus SE реализовано в NUSE_Queue_Reset().
  • Предоставление информации об определенной очереди. В Nucleus SE реализовано в NUSE_Queue_Information().
  • Возврат количества сконфигурированных на данный момент в приложении очередей. В Nucleus SE реализовано в NUSE_Queue_Count().
  • Добавление новой очереди в приложение (создание очереди). В Nucleus SE не реализовано.
  • Удаление очереди из приложения. В Nucleus SE не реализовано.
  • Возврат указателей на все очереди в приложении. В Nucleus SE не реализовано.
  • Отправка сообщения всем приостановленным в очереди задачам (broadcast). В Nucleus SE не реализовано.

Реализация каждого из этих служебных вызовов подробно описана ниже.

Служебные вызовы для записи и чтения из очередей


Базовыми операциями, которые выполняются над очередями, являются запись (которую иногда называют постановкой сообщений в очередь) и чтение (также известное как прием сообщений). Также есть возможность записи в начало очереди (jamming). Nucleus RTOS и Nucleus SE предоставляют три базовых вызова API для этих операций, которые будут рассмотрены ниже.


Запись в очередь


Служебный вызов Nucleus RTOS API для записи в очередь очень гибкий и позволяет приостанавливать задачу неявным образом, либо с определенным таймаутом, если операцию нельзя завершить немедленно (например, при попытке записи в заполненную очередь). Nucleus SE предоставляет те же функции, но приостановка задач опциональна, а таймаут не реализован.


Вызов для постановки сообщения в очередь в Nucleus RTOS
Прототип служебного вызова:
STATUS NU_Send_To_Queue(NU_QUEUE *queue, VOID *message, UNSIGNED size, UNSIGNED suspend);

Параметры:

queue – указатель на блок управления очередью, предоставленный пользователем;
message – указатель на отправляемое сообщение;
size – количество элементов данных типа UNSIGNED в сообщении. Если очередь поддерживает сообщения переменной длины, этот параметр должен быть равен размеру сообщению или быть меньше размера сообщение, поддерживаемого очередью. Если очередь поддерживает сообщения фиксированного размера, этот параметр должен точно совпадать с размером сообщения, поддерживаемого очередью;
suspend – спецификация приостановки задачи, может принимать значения NU_NO_SUSPEND или NU_SUSPEND или значение таймаута.


Возвращаемое значение:
NU_SUCCESS – вызов был успешно завершен;
NU_INVALID_QUEUE – некорректный указатель на очередь;
NU_INVALID_POINTER – нулевой указатель на сообщение (NULL);
NU_INVALID_SIZE – размер сообщения несовместим с размером сообщения, поддерживаемым очередью;
NU_INVALID_SUSPEND – приостановка была выполнена из не связанного с задачей потока;
NU_QUEUE_FULL – очередь переполнена, а приостановка не была указана;
NU_TIMEOUT – очередь переполнена даже после приостановки задачи на указанный таймаут;
NU_QUEUE_DELETED – очередь была удалена, пока задача была приостановлена;
NU_QUEUE_RESET – очередь была сброшена, пока задача была приостановлена.

Вызов для постановки сообщения в очередь в Nucleus SE
Этот служебный вызов API поддерживает основной функционал Nucleus RTOS API.

Прототип служебного вызова:
STATUS NUSE_Queue_Send(NUSE_QUEUE queue, ADDR *message, U8 suspend);

Параметры:
queue – индекс (ID) очереди;
message – указатель на отправляемое сообщение, является одной переменной типа ADDR;
suspend – спецификация для приостановки задач, может принимать значения NUSE_NO_SUSPEND или NUSE_SUSPEND.

Возвращаемое значение:
NUSE_SUCCESS – вызов был успешно завершен;
NUSE_INVALID_QUEUE – некорректный индекс очереди;
NUSE_INVALID_POINTER – нулевой указатель на сообщение (NULL);
NUSE_INVALID_SUSPEND – попытка приостановки задачи из не связанного с задачей потока или при отключенных служебных вызовах API для блокировки задач;
NUSE_QUEUE_FULL – очередь переполнена, а приостановка не была указана;
NUSE_QUEUE_WAS_RESET – очередь была сброшена, пока задача была приостановлена.


Реализация постановки сообщений в очередь в Nucleus SE
Вариант кода функции API NUSE_Queue_Send() (после проверки параметров) выбирается при помощи условной компиляции, в зависимости от того, активирована поддержка блокировки задач или нет. Мы рассмотрим оба варианта.

Если блокировка задач не активирована, код этого служебного вызова довольно прост:



if (NUSE_Queue_Items[queue] == NUSE_Queue_Size[queue])   /* queue
                                                         full */
{
   return_value = NUSE_QUEUE_FULL;
}
else                                 /* queue element available */
{
   NUSE_Queue_Data[queue][NUSE_Queue_Head[queue]++] = *message;
   if (NUSE_Queue_Head[queue] == NUSE_Queue_Size[queue])
   {
      NUSE_Queue_Head[queue] = 0;
   }
   NUSE_Queue_Items[queue]++;
   return_value = NUSE_SUCCESS;
}


Функция просто проверяет, есть ли свободное место в очереди, и использует индекс NUSE_Queue_Head[] для хранения сообщения в области данных очереди.

Если блокировка задач активирована, код становится более сложным:


do
{
   if (NUSE_Queue_Items[queue] == NUSE_Queue_Size[queue])   /*
                                                     queue full */
   {
      if (suspend == NUSE_NO_SUSPEND)
      {
         return_value = NUSE_QUEUE_FULL;
      }
      else
      {                                           /* block task */
         NUSE_Queue_Blocking_Count[queue]++;
         NUSE_Suspend_Task(NUSE_Task_Active,
                          (queue << 4) | NUSE_QUEUE_SUSPEND);
         return_value =
         NUSE_Task_Blocking_Return[NUSE_Task_Active];
         if (return_value != NUSE_SUCCESS)
         {
            suspend = NUSE_NO_SUSPEND;
         }
      }
   }
   else
   {                                /* queue element available */
      NUSE_Queue_Data[queue][NUSE_Queue_Head[queue]++] = *message;
      if (NUSE_Queue_Head[queue] == NUSE_Queue_Size[queue])
      {
         NUSE_Queue_Head[queue] = 0;
      }
      NUSE_Queue_Items[queue]++;
      if (NUSE_Queue_Blocking_Count[queue] != 0)
      {
         U8 index;          /* check whether a task is blocked
                               on this queue */
         NUSE_Queue_Blocking_Count[queue]--;
         for (index=0; index<NUSE_TASK_NUMBER; index++)
         {
             if ((LONIB(NUSE_Task_Status[index]) ==
                  NUSE_QUEUE_SUSPEND)
                  && (HINIB(NUSE_Task_Status[index]) == queue))
             {
                NUSE_Task_Blocking_Return[index] = NUSE_SUCCESS;
                NUSE_Wake_Task(index);
                break;
             }
          }
       }
       return_value = NUSE_SUCCESS;
       suspend = NUSE_NO_SUSPEND;
    }
} while (suspend == NUSE_SUSPEND);

Некоторые пояснения могут быть полезными.

Код заключен в цикл do…while, который выполняется, пока параметр приостановки задач имеет значение NUSE_SUSPEND.

Если очередь переполнена и параметр suspend имеет значение NUSE_NO_SUSPEND, вызов API завершается со значением NUSE_QUEUE_FULL. Если параметр suspend имеет значение NUSE_SUSPEND, задача приостанавливается. При завершении (то есть когда задача возобновляется), если возвращаемое значение равно NUSE_SUCCESS, то есть задача была возобновлена, потому что сообщение было прочитано (а не потому что очередь была сброшена), код возвращается в начало цикла.

Если очередь не переполнена, предоставляемое сообщение хранится, используя индекс NUSE_Queue_Head[], в области данных очереди. Выполняется проверка, есть ли в очереди приостановленные задачи (ожидающие сообщений). Если такие задачи есть, первая из них возобновляется. Переменной suspend присваивается значение NUSE_NO_SUSPEND, а вызов API завершается со значением NUSE_SUCCESS.


Чтение из очереди


Служебный вызов Nucleus RTOS API для чтения из очереди очень гибкий и позволяет приостанавливать задачи неявным образом, либо с определенным таймаутом, если операцию нельзя завершить немедленно (например, при попытке чтения из пустой очереди). Nucleus SE предоставляет тот же функционал, но приостановка задачи опциональна, а таймаут не реализован.


Вызов для получения сообщений из очереди в Nucleus RTOS
Прототип служебного вызова:
STATUS NU_Receive_From_Queue(NU_QUEUE *queue, VOID *message, UNSIGNED size, UNSIGNED *actual_size, UNSIGNED suspend);

Параметры:

queue – указатель на блок управления очередью, предоставляемый пользователем;
message – указатель на хранилище для принимаемых сообщений;
size – количество элементов данных типа UNSIGNED в сообщении. Это число должно соответствовать размеру сообщения, определенному при создании очереди; suspend – спецификация приостановки задачи, может принимать значения NU_NO_SUSPEND или NU_SUSPEND или значение таймаута.


Возвращаемое значение:
NU_SUCCESS – вызов был успешно завершен;
NU_INVALID_QUEUE – некорректный указатель на очередь;
NU_INVALID_POINTER – нулевой указатель на сообщение (NULL);
NU_INVALID_SUSPEND –попытка приостановки задачи из несвязанного с задачей потока;
NU_QUEUE_EMPTY – очередь пуста, а приостановка не была указана;
NU_TIMEOUT – говорит о том, что очередь все еще пуста, даже после приостановки задачи на указанный период времени;
NU_QUEUE_DELETED – очередь была удалена, пока задача была приостановлена;
NU_QUEUE_RESET – очередь была сброшена, пока задача была приостановлена.

Вызов для получения сообщений из очереди Nucleus SE
Этот вызов API поддерживает основной функционал Nucleus RTOS API.

Прототип служебного вызова:
STATUS NUSE_Queue_Receive(NUSE_QUEUE queue, ADDR *message, U8 suspend);

Параметры:
queue – индекс (ID) очереди;
message – указатель на хранилище для принимаемых сообщений, представляет из себя одну переменную типа ADDR;
suspend – спецификация приостановки задачи, может принимать значения NUSE_NO_SUSPEND или NUSE_SUSPEND.

Возвращаемое значение:
NUSE_SUCCESS – вызов был успешно завершен;
NUSE_INVALID_QUEUE – некорректный индекс очереди;
NUSE_INVALID_POINTER – нулевой указатель на сообщение (NULL);
NUSE_INVALID_SUSPEND – попытка приостановки задачи из не связанного с задачей потока или при отключенной поддержке блокировки задач;
NUSE_QUEUE_EMPTY – очередь пуста, а приостановка не была указана;
NUSE_QUEUE_WAS_RESET – очередь была сброшена, пока задача была приостановлена.

Реализация получения сообщений из очередей в Nucleus SE

Вариант кода функции API NUSE_Queue_Receive() (после проверки параметров) выбирается при помощи условной компиляции, в зависимости от того, активирована поддержка блокировки задач или нет. Рассмотрим оба варианта.

Если поддержка блокировки активирована, код этого вызова API довольно прост:


if (NUSE_Queue_Items[queue] == 0)                /* queue empty */
{
   return_value = NUSE_QUEUE_EMPTY;
}
else
{                                          /* message available */
   *message = NUSE_Queue_Data[queue][NUSE_Queue_Tail[queue]++];
   if (NUSE_Queue_Tail[queue] == NUSE_Queue_Size[queue])
   {
      NUSE_Queue_Tail[queue] = 0;
   }
      NUSE_Queue_Items[queue]--;
      return_value = NUSE_SUCCESS;
}

Функция просто проверяет, есть ли в очереди сообщение, и использует индекс NUSE_Queue_Tail[] для получения сообщения из очереди и возврата данных при помощи указателя на сообщение.

Если блокировка задач активирована, код становится более сложным:


do
{
   if (NUSE_Queue_Items[queue] == 0)           /* queue empty */
   {
      if (suspend == NUSE_NO_SUSPEND)
      {
         return_value = NUSE_QUEUE_EMPTY;
      }
      else
      {                                        /* block task */
         NUSE_Queue_Blocking_Count[queue]++;
         NUSE_Suspend_Task(NUSE_Task_Active, (queue << 4) |
         NUSE_QUEUE_SUSPEND);
         return_value =
         NUSE_Task_Blocking_Return[NUSE_Task_Active];
         if (return_value != NUSE_SUCCESS)
         {
            suspend = NUSE_NO_SUSPEND;
         }
      }
   }
   else
   {                                     /* message available */
      *message = NUSE_Queue_Data[queue][NUSE_Queue_Tail[queue]++];
      if (NUSE_Queue_Tail[queue] == NUSE_Queue_Size[queue])
      {
         NUSE_Queue_Tail[queue] = 0;
      }
      NUSE_Queue_Items[queue]--;
      if (NUSE_Queue_Blocking_Count[queue] != 0)
      {
         U8 index;          /* check whether a task is blocked */
                                              /* on this queue */
             NUSE_Queue_Blocking_Count[queue]--;
         for (index=0; index<NUSE_TASK_NUMBER; index++)
         {
            if ((LONIB(NUSE_Task_Status[index]) ==
               NUSE_QUEUE_SUSPEND)
               && (HINIB(NUSE_Task_Status[index]) == queue))
            {
               NUSE_Task_Blocking_Return[index] = NUSE_SUCCESS;
               NUSE_Wake_Task(index);
               break;
            }
         }
      }
      return_value = NUSE_SUCCESS;
      suspend = NUSE_NO_SUSPEND;
   }
} while (suspend == NUSE_SUSPEND);


Некоторые пояснения будут полезны.

Код заключен в цикл do…while, который выполняется, пока параметр приостановки задач имеет значение NUSE_SUSPEND.

Если очередь пуста и параметр suspend имеет значение NUSE_NO_SUSPEND, вызов API завершается со значением NUSE_QUEUE_EMPTY. Если параметр suspend имеет значение NUSE_SUSPEND, задача приостанавливается. При завершении (то есть когда задача возобновляется), если возвращаемое значение равно NUSE_SUCCESS, то есть задача была возобновлена, потому что сообщение было отправлено (а не потому что очередь была сброшена), код возвращается в начало цикла.

Если очередь содержит в себе сообщения, хранимое сообщение возвращается, используя индекс NUSE_Queue_Tail[]. Выполняется проверка, есть ли приостановленные (ожидающие отправки) задачи в этой очереди. Если такие задачи есть, первая из них возобновляется. Переменной suspend присваивается значение NUSE_NO_SUSPEND, и вызов API завершается с кодом NUSE_SUCCESS.


Запись в голову очереди


Служебный вызов Nucleus RTOS API для записи сообщения в голову очереди очень гибкий и позволяет приостановить задачу неявным образом, либо с определенным таймаутом, если операцию нельзя завершить немедленно (например, при попытке записи в переполненную очередь). Nucleus SE предоставляет тот же функционал, но приостановка задач опциональна, а таймаут не реализован.


Вызов для записи сообщения в голову очереди Nucleus RTOS
Прототип служебного вызова:
STATUS NU_Send_To_Front_Of_Queue(NU_QUEUE *queue, VOID *message, UNSIGNED size, UNSIGNED suspend);

Параметры:

queue – указатель на блок управления очередью, предоставленный пользователем;
message – указатель на отправляемое сообщение;
size – количество элементов данных типа UNSIGNED в сообщении. Если очередь поддерживает сообщения переменной длины, этот параметр должен быть равен размеру сообщения или быть меньше размера сообщения, поддерживаемого очередью. Если очередь поддерживает сообщения фиксированной длины, этот параметр должен точно совпадать с размером сообщения, поддерживаемого очередью;
suspend – спецификация приостановки задачи, может принимать значения NU_NO_SUSPEND или NU_SUSPEND или значение таймаута.


Возвращаемое значение:
NU_SUCCESS – вызов был успешно завершен;
NU_INVALID_QUEUE – некорректный указатель на очередь;
NU_INVALID_POINTER – нулевой указатель на сообщение (NULL);
NU_INVALID_SIZE – размер сообщения несовместим с размером сообщения, поддерживаемого очередью;
NU_INVALID_SUSPEND – попытка приостановки из не связанного с задачей потока;
NU_QUEUE_FULL – очередь переполнена, а приостановка не была указана;
NU_TIMEOUT – очередь переполнена, даже после приостановки задачи на определенный таймаут;
NU_QUEUE_DELETED – очередь была удалена, пока задача была приостановлена;
NU_QUEUE_RESET – очередь была сброшена, пока задача была приостановлена.

Вызов для записи сообщения в голову очереди в Nucleus SE
Этот вызов API поддерживает основной функционал Nucleus RTOS API.

Прототип служебного вызова:
STATUS NUSE_Queue_Jam(NUSE_QUEUE queue, ADDR *message, U8 suspend);

Параметры:
queue – индекс (ID) очереди;v message – указатель на сообщение, представляет из себя одну переменную типа ADDR;
suspend – спецификация приостановки задачи, может принимать значения NUSE_NO_SUSPEND или NUSE_SUSPEND.

Возвращаемое значение:
NUSE_SUCCESS – вызов был успешно завершен;
NUSE_INVALID_QUEUE – некорректный индекс очереди;
NUSE_INVALID_POINTER – нулевой указатель на сообщение (NULL);
NUSE_INVALID_SUSPEND – попытка приостановки задачи из не связанного с задачей потока или при отключенной поддержке блокировки задач;
NUSE_QUEUE_FULL – очередь переполнена, а приостановка не была указана;
NUSE_QUEUE_WAS_RESET – очередь была сброшена, пока задача была приостановлена.

Реализация записи сообщения в начало очереди в Nucleus SE
Вариант кода функции API NUSE_Queue_Jam() очень похож на NUSE_Queue_Send(), только данные хранятся при помощи индекса NUSE_Queue_Tail[], таким образом:

if (NUSE_Queue_Items[queue] == NUSE_Queue_Size[queue]) /* queue
                                                       full */
{
   return_value = NUSE_QUEUE_FULL;
}
else                                 /* queue element available */
{
   if (NUSE_Queue_Tail[queue] == 0)
   {
      NUSE_Queue_Tail[queue] = NUSE_Queue_Size[queue] - 1;
   }
   else
   {
      NUSE_Queue_Tail[queue]--;
   }
       NUSE_Queue_Data[queue][NUSE_Queue_Tail[queue]] = *message;
   NUSE_Queue_Items[queue]++;
   return_value = NUSE_SUCCESS;
}


В следующей статье будут рассмотрены дополнительные вызовы API, связанные с очередями, а также структуры данных.

Об авторе: Колин Уоллс уже более тридцати лет работает в сфере электронной промышленности, значительную часть времени уделяя встроенному ПО. Сейчас он – инженер в области встроенного ПО в Mentor Embedded ( подразделение Mentor Graphics). Колин Уоллс часто выступает на конференциях и семинарах, автор многочисленных технических статей и двух книг по встроенному ПО. Живет в Великобритании. Профессиональный блог Колина: https://blogs.mentor.com/colinwalls/, e-mail: colin_walls@mentor.com


Источник: https://www.embedded.com/design/operating-systems/4460985/Queues--introduction-and-basic-services

Теги: