Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update(docs): small updates to docs. #19

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,31 @@ jobs:
mkdir build && cd build
cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On ..
make package
ctest -V
ctest -VV --stop-on-failure
cmake -DBUILD_OOT_TEST=On ..
sudo make install
- name: Build and test FS
run: |
mkdir build_fs && cd build_fs
cmake -DBUILD_TESTS=On -DBUILD_SAMPLES=On -DWITH_FS=On ..
make package
ctest -V
ctest -VV --stop-on-failure
cmake -DBUILD_OOT_TEST=On ..
sudo make install
- name: Build and test libkqueue
run: |
mkdir build_kqueue && cd build_kqueue
cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On -DWITH_LIBKQUEUE=true -DWITH_VALGRIND=false ..
make package
ctest -V
ctest -VV --stop-on-failure
cmake -DBUILD_OOT_TEST=On ..
sudo make install
- name: Build and test liburing
run: |
mkdir build_uring && cd build_uring
cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On -DWITH_LIBURING=true -DWITH_VALGRIND=false ..
make package
ctest -V
ctest -VV --stop-on-failure
cmake -DBUILD_OOT_TEST=On ..
sudo make install

Expand All @@ -70,14 +70,14 @@ jobs:
mkdir build && cd build
cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On -DBUILD_OOT_TEST=true ..
make
ctest -V
ctest -VV --stop-on-failure
make install
cd ..
mkdir build_fs && cd build_fs
cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On -DBUILD_OOT_TEST=true -DWITH_FS=true ..
make
kldload fusefs
ctest -V
ctest -VV --stop-on-failure
make install

build-osx-amd64:
Expand All @@ -94,4 +94,4 @@ jobs:
mkdir -p build
cd build && cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On ..
cmake --build .
ctest -V
ctest -VV --stop-on-failure
11 changes: 2 additions & 9 deletions Lib/core/evts.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,12 @@ _public_ ssize_t m_mod_unstash(m_mod_t *mod, size_t len) {
return processed;
}

_public_ int m_mod_set_batch_size(m_mod_t *mod, size_t len) {
_public_ int m_mod_batch_set(m_mod_t *mod, size_t len, uint64_t timeout_ns) {
M_MOD_ASSERT(mod);
M_MOD_CONSUME_TOKEN(mod);

mod->batch.len = len;
return 0;
}

_public_ int m_mod_set_batch_timeout(m_mod_t *mod, uint64_t timeout_ns) {
M_MOD_ASSERT(mod);

// src_deregister and src_register already consume a token


/* If it was already set, remove old timer */
if (mod->batch.timer.ns != 0) {
m_mod_src_deregister_tmr(mod, &mod->batch.timer);
Expand Down
2 changes: 1 addition & 1 deletion Lib/core/mod.c
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ _public_ int m_mod_deregister(m_mod_t **mod) {
return mod_deregister(mod, true);
}

_public_ int m_mod_set_tokenbucket(m_mod_t *mod, uint32_t rate, uint64_t burst) {
_public_ int m_mod_tb_set(m_mod_t *mod, uint32_t rate, uint64_t burst) {
M_MOD_ASSERT(mod);
M_PARAM_ASSERT(rate <= BILLION);

Expand Down
3 changes: 1 addition & 2 deletions Lib/core/poll/kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ int poll_create(poll_priv_t *priv) {
}

int poll_set_new_evt(poll_priv_t *priv, ev_src_t *tmp, const enum op_type flag) {
static int timer_ids = 1;
GET_PRIV_DATA();

/* Eventually alloc kqueue data if needed */
Expand Down Expand Up @@ -53,7 +52,7 @@ int poll_set_new_evt(poll_priv_t *priv, ev_src_t *tmp, const enum op_type flag)
#else
const int flags = 0; // unsupported...
#endif
EV_SET(_ev, timer_ids++, EVFILT_TIMER, f, flags | NOTE_NSECONDS, tmp->tmr_src.its.ns, tmp);
EV_SET(_ev, tmp->tmr_src.its.ns, EVFILT_TIMER, f, flags | NOTE_NSECONDS, tmp->tmr_src.its.ns, tmp);
break;
}
case M_SRC_TYPE_SGN:
Expand Down
4 changes: 2 additions & 2 deletions Lib/core/ps.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ static int send_msg(m_mod_t *mod, const m_mod_t *recipient, const char *topic,
M_PARAM_ASSERT(message);

mod->stats.sent_msgs++;
ps_priv_t m = { { false, mod, topic, message }, flags, NULL };
ps_priv_t m = { { mod, topic, message }, flags, NULL };
return tell_pubsub_msg(&m, recipient, mod->ctx);
}

Expand All @@ -146,7 +146,7 @@ int tell_system_pubsub_msg(const m_mod_t *recipient, m_ctx_t *c, m_mod_t *sender
// A module sent a M_PS_MOD_POISONPILL message to another, or it was stopped
sender->stats.sent_msgs++;
}
ps_priv_t m = { { true, sender, topic, NULL }, 0, NULL };
ps_priv_t m = { { sender, topic, NULL }, 0, NULL };
return tell_pubsub_msg(&m, recipient, c);
}

Expand Down
21 changes: 10 additions & 11 deletions Lib/core/public/module/mod.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ typedef enum {
*/
#define M_SRC_SHIFT(type, val) val << (8 * (type + 1))
typedef enum {
M_SRC_PRIO_LOW = 1 << 0, // PubSub subscription low priority
M_SRC_PRIO_NORM = 1 << 1, // PubSub subscription mid priority (default)
M_SRC_PRIO_HIGH = 1 << 2, // PubSub subscription high priority
M_SRC_PRIO_LOW = 1 << 0, // Src low priority
M_SRC_PRIO_NORM = 1 << 1, // Src mid priority (default)
M_SRC_PRIO_HIGH = 1 << 2, // Src high priority
M_SRC_AUTOFREE = 1 << 3, // Automatically free userdata upon source deregistation.
M_SRC_ONESHOT = 1 << 4, // Run just once then automatically deregister source.
M_SRC_DUP = 1 << 5, // Duplicate PubSub topic, source fd or source path.
Expand All @@ -46,7 +46,7 @@ typedef enum {
#define M_PS_MOD_STOPPED "LIBMODULE_MOD_STOPPED"

/*
* Module's pubsub API flags (m_mod_tell(), m_mod_publish(), m_mod_broadcast())
* Module's pubsub API flags (m_mod_ps_tell(), m_mod_ps_publish(), m_mod_ps_broadcast())
*/
typedef enum {
M_PS_AUTOFREE = 1 << 0, // Autofree PubSub data after every recipient receives message (ie: when ps_evt ref counter goes to 0)
Expand Down Expand Up @@ -87,7 +87,6 @@ typedef struct {

/* PubSub messages */
typedef struct {
bool system; // Is this a system message?
const m_mod_t *sender;
const char *topic;
const void *data; // NULL for system messages
Expand Down Expand Up @@ -232,12 +231,10 @@ m_mod_t *m_mod_lookup(const m_mod_t *mod, const char *name);
int m_mod_become(m_mod_t *mod, m_evt_cb new_on_evt);
int m_mod_unbecome(m_mod_t *mod);

/* Module PubSub interface */
/* Module PubSub interface (Subscribe/unsubscribe API is below under the event sources management) */
int m_mod_ps_tell(m_mod_t *mod, const m_mod_t *recipient, const void *message, m_ps_flags flags);
int m_mod_ps_publish(m_mod_t *mod, const char *topic, const void *message, m_ps_flags flags);
int m_mod_ps_poisonpill(m_mod_t *mod, const m_mod_t *recipient);
int m_mod_ps_subscribe(m_mod_t *mod, const char *topic, m_src_flags flags, const void *userptr);
int m_mod_ps_unsubscribe(m_mod_t *mod, const char *topic);

/* Events' stashing API */
int m_mod_stash(m_mod_t *mod, const m_evt_t *evt);
Expand All @@ -246,6 +243,9 @@ ssize_t m_mod_unstash(m_mod_t *mod, size_t len);
/* Event Sources management */
ssize_t m_mod_src_len(const m_mod_t *mod, m_src_types type);

int m_mod_ps_subscribe(m_mod_t *mod, const char *topic, m_src_flags flags, const void *userptr);
int m_mod_ps_unsubscribe(m_mod_t *mod, const char *topic);

int m_mod_src_register_fd(m_mod_t *mod, int fd, m_src_flags flags, const void *userptr);
int m_mod_src_deregister_fd(m_mod_t *mod, int fd);

Expand All @@ -268,11 +268,10 @@ int m_mod_src_register_thresh(m_mod_t *mod, const m_src_thresh_t *thr, m_src_fla
int m_mod_src_deregister_thresh(m_mod_t *mod, const m_src_thresh_t *thr);

/* Event batch management */
int m_mod_set_batch_size(m_mod_t *mod, size_t len);
int m_mod_set_batch_timeout(m_mod_t *mod, uint64_t timeout_ns);
int m_mod_batch_set(m_mod_t *mod, size_t len, uint64_t timeout_ns);

/* Mod tokenbucket */
int m_mod_set_tokenbucket(m_mod_t *mod, uint32_t rate, uint64_t burst);
int m_mod_tb_set(m_mod_t *mod, uint32_t rate, uint64_t burst);

/* Generic event source registering functions */
#define m_mod_src_register(mod, X, flags, userptr) _Generic((X) + 0, \
Expand Down
59 changes: 33 additions & 26 deletions Lib/core/src.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,56 +200,56 @@ static ev_src_t *create_src(m_mod_t *mod, m_src_types type, process_cb proc,
}

static int fdcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
int fd = *((int *)my_data);
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return fd - src->fd_src.fd;
return other->fd_src.fd - node->fd_src.fd;
}

static int tmrcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_tmr_t *its = (const m_src_tmr_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return its->ns - src->tmr_src.its.ns;
return other->tmr_src.its.ns - node->tmr_src.its.ns;
}

static int sgncmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_sgn_t *sgs = (const m_src_sgn_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return sgs->signo - src->sgn_src.sgs.signo;
return other->sgn_src.sgs.signo - node->sgn_src.sgs.signo;
}

static int pathcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_path_t *pt = (const m_src_path_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return strcmp(pt->path, src->path_src.pt.path);
return strcmp(other->path_src.pt.path, node->path_src.pt.path);
}

static int pidcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_pid_t *pid = (const m_src_pid_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return pid->pid - src->pid_src.pid.pid;
return other->pid_src.pid.pid - node->pid_src.pid.pid;
}

static int taskcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_task_t *tid = (const m_src_task_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

return tid->tid - src->task_src.tid.tid;
return other->task_src.tid.tid - node->task_src.tid.tid;
}

static int threshcmp(void *my_data, void *node_data) {
ev_src_t *src = (ev_src_t *)node_data;
const m_src_thresh_t *thr = (const m_src_thresh_t *)my_data;
const ev_src_t *node = (const ev_src_t *)node_data;
const ev_src_t *other = (const ev_src_t *)my_data;

long double my_val = (long double)thr->activity_freq
+ (long double)thr->inactive_ms;
long double their_val = (long double)src->thresh_src.thr.activity_freq
+ (long double)src->thresh_src.thr.inactive_ms;
return my_val - their_val;
long double other_val = (long double)other->thresh_src.thr.activity_freq
+ (long double)other->thresh_src.thr.inactive_ms;
long double node_val = (long double)node->thresh_src.thr.activity_freq
+ (long double)node->thresh_src.thr.inactive_ms;
return other_val - node_val;
}

static ev_src_t *process_ps(ev_src_t *this, m_ctx_t *c, int idx, evt_priv_t *evt) {
Expand Down Expand Up @@ -394,7 +394,14 @@ int deregister_mod_src(m_mod_t *mod, m_src_types type, void *src_data) {
M_MOD_ASSERT(mod);
M_MOD_CONSUME_TOKEN(mod);

return m_bst_remove(mod->srcs[type], src_data);
// Create onetime src to check the bst
ev_src_t *src = create_src(mod, type, src_procs_map[type], src_data, 0, NULL);
if (!src) {
return -EINVAL;
}
int ret = m_bst_remove(mod->srcs[type], src);
m_mem_unref(src);
return ret;
}

int start_task(m_ctx_t *c, ev_src_t *src) {
Expand Down
58 changes: 18 additions & 40 deletions Lib/utils/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ X_LOG_LEVELS
/** **/

static inline m_logger_level find_level(const char *level_str) {
if (!level_str) {
return -1;
}
static const char *lvl_names[] = {
#define X_LOG_LEVEL(name) #name,
X_LOG_LEVELS
Expand All @@ -47,27 +50,27 @@ static __attribute__((constructor (111))) void libmodule_log_init(void) {
X_LOG_CTXS
#undef X_LOG_CTX
};
char *log_env;

// Load fallback global level
int global_level = find_level(getenv("LIBMODULE_LOG"));
if (global_level == -1) {
// Default value
global_level = ERR;
}

char env_name[64];
bool log_set[X_LOG_CTX_MAX] = {0};
// Now load log levels for each context
for (int i = 0; i < X_LOG_CTX_MAX; i++) {
/* Default values */
// Default noop logger
libmodule_logger.DEBUG[i] = libmodule_log_noop;
libmodule_logger.INFO[i] = libmodule_log_noop;
libmodule_logger.WARN[i] = libmodule_log_noop;
libmodule_logger.ERR[i] = libmodule_log_noop;

int log_level = ERR;

snprintf(env_name, sizeof(env_name), "LIBMODULE_LOG_%s", ctx_names[i]);
log_env = getenv(env_name);
if (log_env) {
log_level = find_level(log_env);
if (log_level != -1) {
log_set[i] = true;
} else {
// Default value
log_level = ERR;
}
int log_level = find_level(getenv(env_name));
if (log_level == -1) {
log_level = global_level;
}
switch (log_level) {
case DEBUG:
Expand All @@ -81,32 +84,7 @@ static __attribute__((constructor (111))) void libmodule_log_init(void) {
break;
}
}

int log_level = ERR;
log_env = getenv("LIBMODULE_LOG");
if (log_env) {
log_level = find_level(log_env);
if (log_level == -1) {
// Default value
log_level = ERR;
}
}
for (int i = 0; i < X_LOG_CTX_MAX; i++) {
if (!log_set[i]) {
switch (log_level) {
case DEBUG:
libmodule_logger.DEBUG[i] = libmodule_log_DEBUG;
case INFO:
libmodule_logger.INFO[i] = libmodule_log_INFO;
case WARN:
libmodule_logger.WARN[i] = libmodule_log_WARN;
default:
libmodule_logger.ERR[i] = libmodule_log_ERR;
break;
}
}
}


const char *log_file_path = getenv("LIBMODULE_LOG_OUTPUT");
if (log_file_path) {
libmodule_logger.log_file = fopen(log_file_path, "w");
Expand Down
2 changes: 1 addition & 1 deletion Samples/Task/pippo.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static bool m_mod_on_start(m_mod_t *mod) {

m_mod_src_register(mod, &((m_src_tmr_t) { CLOCK_MONOTONIC, (uint64_t)1 * 1000 * 1000 * 1000 }), 0, &tmrData); // 1s
m_mod_src_register(mod, &((m_src_task_t) { 8, inc }), 0, &thData);
m_mod_set_batch_timeout(mod, 1500); // 1500ms!
m_mod_batch_set(mod, 0, 1500); // 1500ms!
return true;
}

Expand Down
Loading